Три стратегии подтверждения выпуска MQ | Серия RabbitMQ (6)

задняя часть RabbitMQ
Три стратегии подтверждения выпуска MQ | Серия RabbitMQ (6)

Это 11-й день моего участия в августовском испытании обновлений. Узнайте подробности события:Испытание августовского обновления


Статьи по Теме

Краткое изложение серии RabbitMQ:Серия RabbitMQ


предисловие

  • Производитель устанавливает канал в режим подтверждения.Как только канал переходит в режим подтверждения, всем сообщениям, опубликованным на канале, будет присвоен уникальный идентификатор (начиная с 1), как только сообщение будет доставлено во все соответствующие очереди.

  • Брокер отправит подтверждение (содержащее уникальный идентификатор сообщения) производителю, что позволит производителю узнать, что сообщение прибыло в очередь назначения правильно.Если сообщение и очередь являются устойчивыми, сообщение подтверждения будет записано перед записью сообщения Выдается после диска.

  • Поле тега доставки в подтверждающем сообщении, возвращаемом посредником производителю, содержит серийный номер подтверждающего сообщения. Кроме того, посредник может также установить множественное поле basic.ack, чтобы указать, что все сообщения до этого серийного номера имеют обработано.

  • Самым большим преимуществом режима подтверждения является то, что он является асинхронным.После публикации сообщения приложение-производитель может продолжать отправлять следующее сообщение, ожидая, пока канал вернет подтверждение, когда сообщение будет окончательно подтверждено.

  • Приложение-производитель может обработать подтверждающее сообщение с помощью метода обратного вызова. Если RabbitMQ потеряет сообщение из-за собственной внутренней ошибки, он отправит сообщение nack. Приложение-производитель также может обработать сообщение nack в методе обратного вызова.

  • Включить подтверждение выпуска

    • Опубликованное подтверждение не включено по умолчанию, и если вам нужно вызвать метод открытия для подтверждения, всякий раз, когда вы хотите использовать релиз для подтверждения, вам нужно вызвать этот метод на канале

    • //开启发布确认
      channel.confirmSelect();
      

1. Разовое подтверждение выпуска

  • Это простой метод подтверждения. Это метод синхронного подтверждения выпуска, то есть после выпуска сообщения подтверждается только его выпуск, а последующие сообщения могут продолжать выпускаться. Он вернется, когда сообщение будет не подтверждено в течение указанного периода времени, будет выдано исключение.

  • режиссер

    • /**
       * 这是一个测试的生产者
       *@author DingYongJun
       *@date 2021/8/1
       */
      public class DyProducerTest_dingyuefabu {
      
          //设置执行次数
          public static final int MESSAGE_COUNT = 888;
          /**
           * 这里为了方便,我们使用main函数来测试
           * 纯属看你个人选择
           * @param args
           */
          public static void main(String[] args) throws Exception {
              //单个发布确认执行
              publishMessageIndividually();
          }
      
          /**
           * 单个发布确认
           */
          public static void publishMessageIndividually() throws Exception {
              Channel channel = RabbitMqUtils.getChannel();
              String queueName = UUID.randomUUID().toString();
              channel.queueDeclare(queueName, false, false, false, null);
              //开启发布确认
              channel.confirmSelect();
              long begin = System.currentTimeMillis();
              for (int i = 0; i < MESSAGE_COUNT; i++) {
                  String message = i + "";
                  channel.basicPublish("", queueName, null, message.getBytes());
                  //服务端返回 false 或超时时间内未返回,生产者可以消息重发
                  boolean flag = channel.waitForConfirms();
                  if(flag){
                      System.out.println("消息发送成功");
                  }
              }
              long end = System.currentTimeMillis();
              System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) +
                      "ms");
          }
      }
      
  • Результаты

    • image-20210803162231439.png
    • image-20210803162251045.png
  • Одним из самых больших минусов этого метода подтверждения является то, что скорость публикации крайне низкая, так как в случае отсутствия подтверждения опубликованного сообщения блокируется публикация всех последующих сообщений.Этот метод обеспечивает пропускную способность не более сотни опубликованных сообщений в секунду не более. . Конечно, для некоторых приложений этого может быть достаточно.

  • Конечно, теперь, когда я медлительный, вы не должны это воспринимать.Если вы сравните следующие типы вместе, вы узнаете, насколько он неэффективен!

2. Пакетное подтверждение выпуска

  • Публикация пакета сообщений, а затем их совместное подтверждение может значительно повысить пропускную способность по сравнению с одним сообщением, ожидающим подтверждения.

  • режиссер

    • /**
      * 批量发布确认
      */
      public static void publishMessageBatch() throws Exception {
              Channel channel = RabbitMqUtils.getChannel();
              //队列名使用uuid来获取不重复的值,不需要自己再进行命名了。
              String queueName = UUID.randomUUID().toString();
              channel.queueDeclare(queueName, false, false, false, null);
              //开启发布确认
              channel.confirmSelect();
              //批量确认消息大小
              int batchSize = 88;
              //未确认消息个数
              int outstandingMessageCount = 0;
              long begin = System.currentTimeMillis();
              for (int i = 0; i < MESSAGE_COUNT; i++) {
                  String message = i + "";
                  channel.basicPublish("", queueName, null, message.getBytes());
                  outstandingMessageCount++;
                  if (outstandingMessageCount == batchSize) {
                      channel.waitForConfirms();//确认代码
                      outstandingMessageCount = 0;
                  }
      
              }
              //为了确保还有剩余没有确认消息 再次确认
              if (outstandingMessageCount > 0) {
                  channel.waitForConfirms();
              }
              long end = System.currentTimeMillis();
              System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) +
                      "ms");
          }
      
  • Результаты

    • image-20210803163106931.png
    • image-20210803163121282.png
  • Недостаток: когда сбой вызывает публикацию проблемы, мы не знаем, какое сообщение является проблемой, мы должны хранить весь пакет в памяти, чтобы записать важную информацию, а затем повторно опубликовать сообщение.

  • Конечно, это решение по-прежнему синхронно, и оно также блокирует выпуск сообщений.

3. Асинхронный выпуск подтверждения

  • Асинхронное подтверждение Хотя логика программирования сложна, чем две, цена является самой высокой, будь то надежность или эффективность, он использует функцию обратного вызова для достижения надежности сообщения, этот средний кусок также является функцией обратного вызова, чтобы обеспечить успешную доставку, давайте позволим нам объяснить Как подтвердить асинхронное подтверждение.

  • режиссер

    •    /**
           * 异步发布确认
           */
          public static void publishMessageAsync() throws Exception {
              try (Channel channel = RabbitMqUtils.getChannel()) {
                  String queueName = UUID.randomUUID().toString();
                  channel.queueDeclare(queueName, false, false, false, null);
                  //开启发布确认
                  channel.confirmSelect();
                  /**
                   * 线程安全有序的一个哈希表,适用于高并发的情况
                   * 1.轻松的将序号与消息进行关联
                   * 2.轻松批量删除条目 只要给到序列号
                   * 3.支持并发访问
                   */
                  ConcurrentSkipListMap<Long, String> outstandingConfirms = new
                          ConcurrentSkipListMap<>();
                  /**
                   * 确认收到消息的一个回调
                   * 1.消息序列号
                   * 2.true 可以确认小于等于当前序列号的消息
                   * false 确认当前序列号消息
                   */
                  ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
                      if (multiple) {
                          //返回的是小于等于当前序列号的未确认消息 是一个 map
                          ConcurrentNavigableMap<Long, String> confirmed =
                                  outstandingConfirms.headMap(sequenceNumber, true);
                          //清除该部分未确认消息
                          confirmed.clear();
                      }else{
                          //只清除当前序列号的消息
                          outstandingConfirms.remove(sequenceNumber);
                      }
                  };
                  ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
                      String message = outstandingConfirms.get(sequenceNumber);
                      System.out.println("发布的消息"+message+"未被确认,序列号"+sequenceNumber);
                  };
                  /**
                   * 添加一个异步确认的监听器
                   * 1.确认收到消息的回调
                   * 2.未收到消息的回调
                   */
                  channel.addConfirmListener(ackCallback, null);
                  long begin = System.currentTimeMillis();
                  for (int i = 0; i < MESSAGE_COUNT; i++) {
                      String message = "消息" + i;
                      /**
                       * channel.getNextPublishSeqNo()获取下一个消息的序列号
                       * 通过序列号与消息体进行一个关联
                       * 全部都是未确认的消息体
                       */
                      outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
                      channel.basicPublish("", queueName, null, message.getBytes());
                  }
                  long end = System.currentTimeMillis();
                  System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) +
                          "ms");
              }
          }
      
  • Результаты

    • image-20210803163759179.png
    • image-20210803163826281.png
  • Легко видеть, что этот метод быстро летает!

  • Как обрабатывать неподтвержденные сообщения?

    • Лучшее решение — поместить неподтвержденные сообщения в очередь в памяти, к которой может получить доступ поток публикации, например, используя ConcurrentLinkedQueue для передачи сообщений между обратными вызовами подтверждения и потоком публикации.

4. Резюме

  • отдельное сообщение

    • Время: 21210 мс
    • Синхронное ожидание подтверждения, простое, но очень ограниченная пропускная способность.
  • Отправляйте сообщения партиями

    • Время: 525 мс
    • Пакетная синхронизация с ожиданием подтверждения, простая, разумная пропускная способность, когда есть проблема, но трудно сделать вывод, что есть проблема с сообщением.
  • Асинхронная обработка

    • Время: 45 мс
    • Лучшая производительность и использование ресурсов, хорошо контролируемый в случае ошибок, но немного сложнее в реализации

Впереди долгий путь, и я обязательно буду его искать вдоль и поперёк~

Если вы думаете, что я блогеры хорошо пишу! Писать нелегко, пожалуйста, ставьте лайки, подписывайтесь и комментируйте, чтобы поощрять блоггеров ~ хахах