Автоматический ответ и ручной ответ на сообщения MQ | Серия RabbitMQ (3)

задняя часть RabbitMQ
Автоматический ответ и ручной ответ на сообщения MQ | Серия RabbitMQ (3)

Это 8-й день моего участия в августовском испытании обновлений.Подробности о событии:Испытание августовского обновления


Статьи по Теме

Краткое изложение серии RabbitMQ:Серия RabbitMQ


предисловие

  • Несколько вопросов, над которыми следует подумать, прежде чем начинать отвечать на сообщение
  • Потребителям может потребоваться некоторое время для завершения задачи, что произойдет, если один из потребителей обрабатывает длинную задачу и только частично завершает ее, а затем внезапно умирает?
  • Как только RabbitMQ доставляет сообщение потребителю, он немедленно помечает сообщение для удаления. В этом случае внезапно умирает потребитель, и мы теряем сообщение, которое обрабатывали. и последующие сообщения, отправленные потребителю, потому что он не мог их получить.
  • Чтобы гарантировать, что сообщение не будет потеряно в процессе отправки, rabbitmq вводит механизм ответа на сообщение, а ответ на сообщение
    • После того, как потребитель получит сообщение и обработает его, он сообщает RabbitMQ, что оно было обработано, и RabbitMQ может удалить сообщение.

1. Автоматический ответ

  • Сообщение считается успешно доставленным сразу после его отправки.Этот режим требует компромисса между высокой пропускной способностью и безопасностью передачи данных, так как в этом режиме при наличии соединения на стороне потребителя или закрытии канала до сообщение получено, затем сообщение потеряно.
  • Конечно, с другой стороны, потребители в этом режиме могут доставлять перегруженные сообщения, а количество доставляемых сообщений не ограничено.Конечно, это может привести к тому, что потребители будут получать слишком много сообщений, которые слишком поздно обрабатывать. , что приводит к задержке этих сообщений, в конечном итоге к нехватке памяти, и в конечном итоге эти потребительские потоки уничтожаются операционной системой.
  • Таким образом, этот шаблон подходит для использования только в тех случаях, когда потребитель может эффективно и с определенной скоростью обрабатывать эти сообщения.

①, продюсер

  • /**
     * 这是一个测试的生产者
     *@author DingYongJun
     *@date 2021/8/1
     */
    public class DyProducerTest_xiaoxiyingda {
        /**
         * 这里为了方便,我们使用main函数来测试
         * 纯属看你个人选择
         * @param args
         */
        public static void main(String[] args) throws Exception{
            //使用工具类来创建通道
            Channel channel = RabbitMqUtils.getChannel();
    
            /**
             * 生成一个队列
             * 1.队列名称
             * 2.队列里面的消息是否持久化 默认消息存储在内存中
             * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
             * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
             * 5.其他参数
             */
            channel.queueDeclare(QueueNameConstant.XIAOXIYINGDA_MODEL,false,false,false,null);
    
            /**
             * 发送一个消息
             * 1.发送到那个交换机
             * 2.路由的 key 是哪个
             * 3.其他的参数信息
             * 4.发送消息的消息体
             */
            for (int i=0;i<6;i++){
                String message="我是生产者,我告诉你一个好消息!"+i;
                Thread.sleep( 1000 );
                channel.basicPublish("",QueueNameConstant.XIAOXIYINGDA_MODEL,null,message.getBytes());
                System.out.println("消息发送完毕");
            }
        }
    
    }
    

②, потребители

  • /**
     * 这是一个测试的消费者
     *@author DingYongJun
     *@date 2021/8/1
     */
    public class DyConsumerTest_xiaoxiyingda01 {
    
        public static void main(String[] args) throws Exception{
            //使用工具类来创建通道
            Channel channel = RabbitMqUtils.getChannel();
    
            System.out.println("我是消费者A,我在等待接收消息!");
            DeliverCallback deliverCallback = (String var1, Delivery var2)->{
                String message= new String(var2.getBody());
                System.out.println(message);
            };
            CancelCallback cancelCallback = (String var1)->{
                System.out.println("消息消费被中断");
            };
    
            /**
             * 消费者消费消息
             * 1.消费哪个队列
             * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
             * 3.消费者未成功消费的回调
             */
            Thread.sleep(1000);
            channel.basicConsume(QueueNameConstant.XIAOXIYINGDA_MODEL,true,deliverCallback,cancelCallback);
        }
    }
    

③、Тест

  • Результаты

    • image-20210803100143222.png
    • image-20210803100154456.png
  • в заключении

    •     public String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException {
              return this.basicConsume(queue, autoAck, "", this.consumerFromDeliverCancelCallbacks(deliverCallback, cancelCallback));
          }
      
    • Когда autoAck истинно, это автоматический ответ

    • Когда autoAck ложно, это ручной ответ

  • Не рекомендуется использовать автоматический ответ. В реальных бизнес-сценариях мы обычно используем ручной ответ.

2. Ручной ответ

  • режиссер

    • /**
       * 这是一个测试的生产者
       *@author DingYongJun
       *@date 2021/8/1
       */
      public class DyProducerTest_xiaoxiyingda {
          /**
           * 这里为了方便,我们使用main函数来测试
           * 纯属看你个人选择
           * @param args
           */
          public static void main(String[] args) throws Exception{
              //使用工具类来创建通道
              Channel channel = RabbitMqUtils.getChannel();
      
              /**
               * 生成一个队列
               * 1.队列名称
               * 2.队列里面的消息是否持久化 默认消息存储在内存中
               * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
               * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
               * 5.其他参数
               */
              channel.queueDeclare(QueueNameConstant.XIAOXIYINGDA_MODEL,false,false,false,null);
      
              /**
               * 发送一个消息
               * 1.发送到那个交换机
               * 2.路由的 key 是哪个
               * 3.其他的参数信息
               * 4.发送消息的消息体
               */
              Scanner sc = new Scanner(System.in);
              System.out.println("请输入信息");
              while (sc.hasNext()) {
                  String message = sc.nextLine();
                  channel.basicPublish("",QueueNameConstant.XIAOXIYINGDA_MODEL,null,message.getBytes());
                  System.out.println("生产者发出消息" + message);
              }
          }
      
      }
      
  • потребитель А

    • /**
       * 这是一个测试的消费者
       *@author DingYongJun
       *@date 2021/8/1
       */
      public class DyConsumerTest_xiaoxiyingda01 {
      
          public static void main(String[] args) throws Exception{
              //使用工具类来创建通道
              Channel channel = RabbitMqUtils.getChannel();
      
              System.out.println("我是消费者A,我在等待接收消息!");
              DeliverCallback deliverCallback = (String var1, Delivery var2)->{
                  String message= new String(var2.getBody());
                  try {
                      Thread.sleep(1000);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  System.out.println(message);
                  //true 代表批量应答 channel 上未应答的消息  false 单条应答
                  boolean multiple = false;
                  channel.basicAck(var2.getEnvelope().getDeliveryTag(),multiple);
              };
              CancelCallback cancelCallback = (String var1)->{
                  System.out.println("消息消费被中断");
              };
      
              /**
               * 消费者消费消息
               * 1.消费哪个队列
               * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
               * 3.消费者未成功消费的回调
               */
              channel.basicConsume(QueueNameConstant.XIAOXIYINGDA_MODEL,false,deliverCallback,cancelCallback);
          }
      }
      
    • Установите ручной ответ и уменьшите время ожидания, что означает, что бизнес обрабатывается очень быстро.

  • потребитель Б

    • /**
       * 这是一个测试的消费者
       *@author DingYongJun
       *@date 2021/8/1
       */
      public class DyConsumerTest_xiaoxiyingda02 {
      
          public static void main(String[] args) throws Exception{
              //使用工具类来创建通道
              Channel channel = RabbitMqUtils.getChannel();
      
              System.out.println("我是消费者B,我在等待接收消息!");
              DeliverCallback deliverCallback = (String var1, Delivery var2)->{
                  String message= new String(var2.getBody());
                  try {
                      Thread.sleep(30000);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  System.out.println(message);
                  //true 代表批量应答 channel 上未应答的消息  false 单条应答
                  boolean multiple = false;
                  channel.basicAck(var2.getEnvelope().getDeliveryTag(),multiple);
              };
              CancelCallback cancelCallback = (String var1)->{
                  System.out.println("消息消费被中断");
              };
      
              /**
               * 消费者消费消息
               * 1.消费哪个队列
               * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
               * 3.消费者未成功消费的回调
               */
              channel.basicConsume(QueueNameConstant.XIAOXIYINGDA_MODEL,false,deliverCallback,cancelCallback);
          }
      }
      
    • Установка ручного ответа и установка более длительного времени ожидания означает более медленную обработку услуг.

  • исполнительный лист

    • 1. Консоль отправляет два сообщения соответственно
    • 2. Теоретически у каждого из A и B есть сообщение. Если B отключится до его обработки в это время, что произойдет с сообщением?
  • Результаты

    • image-20210803103509216.png
    • image-20210803103952067.png

3. Резюме

  • Правда и ложь множественного представляют разные значения
    • ture означает пакетный ответ, который находится в единицах каналов.Например, если вы заполните один, я отвечу все завершено. Эффективно, но не безопасно.
    • ложь означает один ответ, полный ответ я отвечаю один. Безопаснее и надежнее. Но менее эффективен.
    • image-20210803104147982.png
  • Сообщения автоматически помещаются в очередь
    • Если потребитель по какой-либо причине теряет соединение (его канал закрыт, соединение закрыто или соединение TCP потеряно), из-за чего сообщение не отправляет подтверждение ACK, RabbitMQ узнает, что сообщение не было полностью обработано, и будет повторно поставить его в очередь.
    • Если другие потребители могут справиться с этим в этот момент, он вскоре перераспределит его другому потребителю. Это гарантирует, что никакие сообщения не будут потеряны, даже если потребитель время от времени умирает.
    • image-20210803104235068.png

Впереди долгий путь, и я обязательно буду его искать вдоль и поперёк~

Если вы думаете, что я блогеры хорошо пишу! Писать нелегко, пожалуйста, ставьте лайки, подписывайтесь и комментируйте, чтобы поощрять блоггеров ~ хахах