Постоянство MQ | Серия RabbitMQ (4)

задняя часть RabbitMQ
Постоянство MQ | Серия RabbitMQ (4)

Это 9-й день моего участия в августовском испытании обновлений, подробности о событии:Испытание августовского обновления


Статьи по Теме

Краткое изложение серии RabbitMQ:Серия RabbitMQ


предисловие

  • Мы только что увидели, как поступить в ситуации, когда задача не потеряна, но как гарантировать, что сообщение, отправленное производителем сообщения, не будет потеряно при остановке службы RabbitMQ.
  • По умолчанию RabbitMQ игнорирует очереди и сообщения, когда он выходит или аварийно завершает работу по какой-либо причине, если только не указано не делать этого.
  • Чтобы сообщения не были потеряны, необходимы две вещи: нам нужно пометить и очередь, и сообщение как долговременные.

1. Сохранение очереди

  • Очереди, которые мы создали ранее, все непостоянны. Если RabbitMQ будет перезапущен, очередь будет удалена. Если вы хотите, чтобы очередь была постоянной, вам нужно установить постоянный параметр на постоянный при объявлении очереди.

  • image-20210803110851867.png

  • Перезапускаем RabbitMQ

  • image-20210803111540124.png

  • Успешно перезапустите и обновите страницу управления

  • image-20210803111603038.png

  • Да, наша очередь потеряна~

  • Очевидно, что в производственной среде мы не можем гарантировать, что сервер никогда не перезапустится и никогда не выключится. Поэтому, если очередь не сохраняется, данные легко теряются.

  • Постоянные данные — это то, что мы должны сделать.

  • Но следует отметить, что если объявленная ранее очередь не является постоянной, необходимо сначала удалить исходную очередь или создать постоянную очередь заново, иначе произойдет ошибка.

  • Исходный код выглядит следующим образом

  •     public com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException {
            validateQueueNameLength(queue);
            return (com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk)this.exnWrappingRpc((new com.rabbitmq.client.AMQP.Queue.Declare.Builder()).queue(queue).durable(durable).exclusive(exclusive).autoDelete(autoDelete).arguments(arguments).build()).getMethod();
        }
    
  • Изменить код производителя

  • /**
     * 这是一个测试的生产者
     *@author DingYongJun
     *@date 2021/8/1
     */
    public class DyProducerTest_xiaoxiyingda {
        /**
         * 这里为了方便,我们使用main函数来测试
         * 纯属看你个人选择
         * @param args
         */
        public static void main(String[] args) throws Exception{
            //使用工具类来创建通道
            Channel channel = RabbitMqUtils.getChannel();
    
            /**
             * 生成一个队列
             * 1.队列名称
             * 2.队列里面的消息是否持久化 默认消息存储在内存中
             * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
             * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
             * 5.其他参数
             */
            //这是持久化的参数。false不进行持久化,true进行持久化
            boolean durable = true;
            channel.queueDeclare(QueueNameConstant.XIAOXIYINGDA_MODEL,durable,false,false,null);
    
            /**
             * 发送一个消息
             * 1.发送到那个交换机
             * 2.路由的 key 是哪个
             * 3.其他的参数信息
             * 4.发送消息的消息体
             */
            Scanner sc = new Scanner(System.in);
            System.out.println("请输入信息");
            while (sc.hasNext()) {
                String message = sc.nextLine();
                channel.basicPublish("",QueueNameConstant.XIAOXIYINGDA_MODEL,null,message.getBytes());
                System.out.println("生产者发出消息" + message);
            }
        }
    }
    
  • Как упоминалось выше, исходная очередь должна быть удалена, иначе будет сообщено об ошибке.

  • image-20210803113239329.png

  • Подробное сообщение об ошибке

    • Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'xiaoxiyingda' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
      
  • ОК, удалите исходную очередь и выполните ее снова.

  • image-20210803113338349.png

  • Буква D появляется в Features, что доказывает, что очередь успешно сохранена!

  • Попробуйте снова перезапустить RabbitMQ. Экономьте место, не наклеивайте картинки.

  • image-20210803113628870.png

  • Идеально! Очередь еще есть! Приятно!

2. Сохранение сообщения

  • Изменить код производителя

    • /**
       * 这是一个测试的生产者
       *@author DingYongJun
       *@date 2021/8/1
       */
      public class DyProducerTest_xiaoxiyingda {
          /**
           * 这里为了方便,我们使用main函数来测试
           * 纯属看你个人选择
           * @param args
           */
          public static void main(String[] args) throws Exception{
              //使用工具类来创建通道
              Channel channel = RabbitMqUtils.getChannel();
      
              /**
               * 生成一个队列
               * 1.队列名称
               * 2.队列里面的消息是否持久化 默认消息存储在内存中
               * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
               * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
               * 5.其他参数
               */
              //这是持久化的参数。false不进行持久化,true进行持久化
              boolean durable = true;
              channel.queueDeclare(QueueNameConstant.XIAOXIYINGDA_MODEL,durable,false,false,null);
      
              /**
               * 发送一个消息
               * 1.发送到那个交换机
               * 2.路由的 key 是哪个
               * 3.其他的参数信息
               * 4.发送消息的消息体
               */
              Scanner sc = new Scanner(System.in);
              System.out.println("请输入信息");
              while (sc.hasNext()) {
                  String message = sc.nextLine();
                  //MessageProperties.PERSISTENT_TEXT_PLAIN;这个代表消息持久化到硬盘
                  channel.basicPublish("",QueueNameConstant.XIAOXIYINGDA_MODEL,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
                  System.out.println("生产者发出消息" + message);
              }
          }
      
      }
      
    • MessageProperties.PERSISTENT_TEXT_PLAIN

      • Посмотрите на исходный код, что это?

      •     public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException {
                this.basicPublish(exchange, routingKey, false, props, body);
            }
        
      • public class MessageProperties {
            public static final BasicProperties MINIMAL_BASIC = new BasicProperties((String)null, (String)null, (Map)null, (Integer)null, (Integer)null, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
            public static final BasicProperties MINIMAL_PERSISTENT_BASIC = new BasicProperties((String)null, (String)null, (Map)null, 2, (Integer)null, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
            public static final BasicProperties BASIC = new BasicProperties("application/octet-stream", (String)null, (Map)null, 1, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
            public static final BasicProperties PERSISTENT_BASIC = new BasicProperties("application/octet-stream", (String)null, (Map)null, 2, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
            public static final BasicProperties TEXT_PLAIN = new BasicProperties("text/plain", (String)null, (Map)null, 1, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
            public static final BasicProperties PERSISTENT_TEXT_PLAIN = new BasicProperties("text/plain", (String)null, (Map)null, 2, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
        
            public MessageProperties() {
            }
        }
        
      • То есть при отправке сообщения нам нужно записать параметр, и тогда mq сохранит его на диск.


Впереди долгий путь, и я обязательно буду его искать вдоль и поперёк~

Если вы думаете, что я блогеры хорошо пишу! Писать нелегко, пожалуйста, ставьте лайки, подписывайтесь и комментируйте, чтобы поощрять блоггеров ~ хахах