MQ недобросовестно распределяет сообщения и предварительную выборку | Серия RabbitMQ (5)

задняя часть RabbitMQ
MQ недобросовестно распределяет сообщения и предварительную выборку | Серия RabbitMQ (5)

Это 10-й день моего участия в августовском испытании обновлений. Узнайте подробности события:Испытание августовского обновления


Статьи по Теме

Краткое изложение серии RabbitMQ:Серия RabbitMQ


предисловие

  • В самом начале мы изучили циклическое распределение, которое RabbitMQ использует для распространения сообщений, но эта стратегия не очень хороша в определенных сценариях.

  • Например, есть два потребителя, обрабатывающие задачи. Один из потребителей A обрабатывает задачи очень быстро, а скорость обработки другого потребителя B очень низкая. В настоящее время мы все еще используем циклическое распределение. Быстрый потребитель бездействует большую часть времени, в то время как медленный потребитель всегда работает.

  • Этот метод распределения на самом деле не очень хорош в данном случае, но RabbitMQ не знает об этой ситуации и все равно распределяет справедливо.

  • В настоящее время мы можем использовать несправедливое распределение для достижения этого, что является способом увеличения труда для тех, кто способен! Вы можете сделать это, о, тогда вы можете сделать больше работы. Я больше ем овощи, поэтому меньше работаю.

1. Несправедливое распределение

  • режиссер

    • /**
       * 这是一个测试的生产者
       *@author DingYongJun
       *@date 2021/8/1
       */
      public class DyProducerTest_xiaoxiyingda {
          /**
           * 这里为了方便,我们使用main函数来测试
           * 纯属看你个人选择
           * @param args
           */
          public static void main(String[] args) throws Exception{
              //使用工具类来创建通道
              Channel channel = RabbitMqUtils.getChannel();
      
              /**
               * 生成一个队列
               * 1.队列名称
               * 2.队列里面的消息是否持久化 默认消息存储在内存中
               * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
               * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
               * 5.其他参数
               */
              //这是持久化的参数。false不进行持久化,true进行持久化
              boolean durable = true;
              channel.queueDeclare(QueueNameConstant.XIAOXIYINGDA_MODEL,durable,false,false,null);
      
              /**
               * 发送一个消息
               * 1.发送到那个交换机
               * 2.路由的 key 是哪个
               * 3.其他的参数信息
               * 4.发送消息的消息体
               */
              Scanner sc = new Scanner(System.in);
              System.out.println("请输入信息");
              while (sc.hasNext()) {
                  String message = sc.nextLine();
                  //MessageProperties.PERSISTENT_TEXT_PLAIN;这个代表消息持久化到硬盘
                  channel.basicPublish("",QueueNameConstant.XIAOXIYINGDA_MODEL,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
                  System.out.println("生产者发出消息" + message);
              }
          }
      }
      
  • потребитель

    • /**
       * 这是一个测试的消费者
       *@author DingYongJun
       *@date 2021/8/1
       */
      public class DyConsumerTest_xiaoxiyingda01 {
      
          public static void main(String[] args) throws Exception{
              //使用工具类来创建通道
              Channel channel = RabbitMqUtils.getChannel();
      
              System.out.println("我是消费者A,我在等待接收消息!");
              DeliverCallback deliverCallback = (String var1, Delivery var2)->{
                  String message= new String(var2.getBody());
                  try {
                      Thread.sleep(1000);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  System.out.println(message);
                  //true 代表批量应答 channel 上未应答的消息  false 单条应答
                  boolean multiple = false;
                  channel.basicAck(var2.getEnvelope().getDeliveryTag(),multiple);
              };
              CancelCallback cancelCallback = (String var1)->{
                  System.out.println("消息消费被中断");
              };
              //不公平分发
              int prefetchCount = 1;
              channel.basicQos(prefetchCount);
              /**
               * 消费者消费消息
               * 1.消费哪个队列
               * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
               * 3.消费者未成功消费的回调
               */
              channel.basicConsume(QueueNameConstant.XIAOXIYINGDA_MODEL,false,deliverCallback,cancelCallback);
          }
      }
      
    • /**
       * 这是一个测试的消费者
       *@author DingYongJun
       *@date 2021/8/1
       */
      public class DyConsumerTest_xiaoxiyingda02 {
      
          public static void main(String[] args) throws Exception{
              //使用工具类来创建通道
              Channel channel = RabbitMqUtils.getChannel();
      
              System.out.println("我是消费者B,我在等待接收消息!");
              DeliverCallback deliverCallback = (String var1, Delivery var2)->{
                  String message= new String(var2.getBody());
                  try {
                      Thread.sleep(3000);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  System.out.println(message);
                  //true 代表批量应答 channel 上未应答的消息  false 单条应答
                  boolean multiple = false;
                  channel.basicAck(var2.getEnvelope().getDeliveryTag(),multiple);
              };
              CancelCallback cancelCallback = (String var1)->{
                  System.out.println("消息消费被中断");
              };
      
              //不公平分发
              int prefetchCount = 1;
              channel.basicQos(prefetchCount);
      
              /**
               * 消费者消费消息
               * 1.消费哪个队列
               * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
               * 3.消费者未成功消费的回调
               */
              channel.basicConsume(QueueNameConstant.XIAOXIYINGDA_MODEL,false,deliverCallback,cancelCallback);
          }
      }
      
    • Результаты

      • image-20210803143650779.png
      • image-20210803143701013.png
      • image-20210803143711503.png
    • По результатам можно сделать некоторые выводы

      • Очевидно, что A выполнил 7, а B выполнил 3.
      • Оба потребителя создали несправедливую модель распределения.
      • Когда потребитель A эффективен (то есть выполняется за одну секунду), потребитель B работает медленно (выполняется за три секунды).
      • MQ автоматически определит, кто работает быстрее всех, а затем выделит больше тем, кто быстр.
      • По сути, это не отдых. Когда вы закончите, идите вперед и получите новые миссии!
      • Ослики производственной бригады не посмеют так протискиваться! ха-ха~
    • Это означает, что если я не закончил это задание или не ответил вам, пожалуйста, не давайте его мне, я могу выполнить только одно задание в данный момент.

    • Затем rabbitmq назначит задачу неактивному потребителю, который не так занят, конечно, если все потребители не выполнили поставленную задачу.

    • В очередь постоянно добавляются новые задачи, и очередь может столкнуться с ситуацией, что очередь переполнена, в это время можно добавлять только новых воркеров или менять стратегию других задач хранения.

    • Код установки:

      • //不公平分发
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);
        
      • //源码为证    
        public void basicQos(int prefetchCount) throws IOException {
                this.basicQos(0, prefetchCount, false);
            }
        
      • Значение int по умолчанию равно 0. Это не нужно объяснять. То есть, если мы его не установим, значение равно нулю.

      • Если установлено значение 1, это доказывает, что режим недобросовестного распределения включен.

2. Распределение значений предварительной выборки

  • Отправка самого сообщения является асинхронной, поэтому в любой момент на канале должно быть более одного сообщения, и ручное подтверждение от потребителя также носит асинхронный характер.

  • Итак, здесь есть буфер неподтвержденных сообщений, поэтому я надеюсь, что разработчики смогут ограничить размер этого буфера, чтобы избежать проблемы неограниченного количества неподтвержденных сообщений в буфере.

  • Конечно, это сделает пропускную способность очень низкой, особенно если задержка клиентских соединений велика, особенно в средах с высокой задержкой потребительских соединений. Немного более высокое значение будет оптимальным для большинства приложений.

  • Производитель может быть последовательным с приведенным выше примером. Нет необходимости менять

  • потребитель

    • /**
       * 这是一个测试的消费者
       *@author DingYongJun
       *@date 2021/8/1
       */
      public class DyConsumerTest_xiaoxiyingda01 {
      
          public static void main(String[] args) throws Exception{
              //使用工具类来创建通道
              Channel channel = RabbitMqUtils.getChannel();
      
              System.out.println("我是消费者A,我在等待接收消息!");
              DeliverCallback deliverCallback = (String var1, Delivery var2)->{
                  String message= new String(var2.getBody());
                  try {
                      Thread.sleep(1000);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  System.out.println(message);
                  //true 代表批量应答 channel 上未应答的消息  false 单条应答
                  boolean multiple = false;
                  channel.basicAck(var2.getEnvelope().getDeliveryTag(),multiple);
              };
              CancelCallback cancelCallback = (String var1)->{
                  System.out.println("消息消费被中断");
              };
              //不公平分发
              int prefetchCount = 5;
              channel.basicQos(prefetchCount);
              /**
               * 消费者消费消息
               * 1.消费哪个队列
               * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
               * 3.消费者未成功消费的回调
               */
              channel.basicConsume(QueueNameConstant.XIAOXIYINGDA_MODEL,false,deliverCallback,cancelCallback);
          }
      }
      
    • Код потребителя B такой же, то есть время сна больше, а значение prefetchCount установлено равным 3, что удобно видеть разницу.

  • Результаты

    • image-20210803151058489.png
    • image-20210803151107784.png
    • image-20210803151137282.png
  • Из вышеприведенных результатов можно сделать вывод, что

    • prefetchCount, когда для этого значения установлено значение 3, это означает, что в текущем канале есть не более трех входящих каналов, и еще больше будет поставлено в очередь снаружи.
    • То есть, когда мы отправили восемь частей данных, у A было 5, а у B было 3. Когда есть еще один, тот, кто съест первый, съест новый.
    • Если в очереди всегда есть данные. Тогда потребители АВ всегда будут дополняться новостями. Количество сообщений, которые сохранят полное значение prefetchCount в канале.

Впереди долгий путь, и я обязательно буду его искать вдоль и поперёк~

Если вы думаете, что я блогеры хорошо пишу! Писать нелегко, пожалуйста, ставьте лайки, подписывайтесь и комментируйте, чтобы поощрять блоггеров ~ хахах