Тематический режим MQ (тема) | Серия RabbitMQ (9)

задняя часть RabbitMQ
Тематический режим MQ (тема) | Серия RabbitMQ (9)

Это 14-й день моего участия в августовском испытании обновлений.Подробности о событии:Испытание августовского обновления


Статьи по Теме

Краткое содержание серии MyBatis:Серия MyBatis


предисловие

  • Например, типы журналов, которые мы хотим получать, — это info.base и info.advantage, а очереди требуются сообщения только из info.base, поэтому в настоящее время direct этого сделать не может.

  • В настоящее время можно использовать только тип темы.

  • Ключ routing_key сообщений, отправляемых на переключатель тем, не может быть записан произвольно и должен соответствовать определенным требованиям.Должен быть список слов, разделенных точками.

  • Этими словами могут быть любые слова, например: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit".

  • Конечно, этот список слов не может превышать 255 байт.

  • список правил

    • * (звездочка) может заменить слово
    • # (знак решетки) может заменить ноль или более слов

1. Производители

  • /**
     * 这是一个测试的生产者
     *@author DingYongJun
     *@date 2021/8/6
     */
    public class DyProducerTest_topic {
    ​
        private static final String EXCHANGE_NAME = "topic_logs";
        /**
         * 这里为了方便,我们使用main函数来测试
         * 纯属看你个人选择
         * @param args
         */
        public static void main(String[] args) throws Exception {
            publishMessageIndividually();
        }
    ​
        public static void publishMessageIndividually() throws Exception {
            //使用工具类来创建通道
            Channel channel = RabbitMqUtils.getChannel();
            /**
             * Q1-->绑定的是
             * 中间带 orange 带 3 个单词的字符串(*.orange.*)
             * Q2-->绑定的是
             * 最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)
             * 第一个单词是 lazy 的多个单词(lazy.#)
             *
             */
            Map<String, String> bindingKeyMap = new HashMap<>();
            bindingKeyMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");
            bindingKeyMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");
            bindingKeyMap.put("quick.orange.fox", "被队列 Q1 接收到");
            bindingKeyMap.put("lazy.brown.fox", "被队列 Q2 接收到");
            bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
            bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
            bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
            bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");
    ​
            for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){
                String bindingKey = bindingKeyEntry.getKey();
                String message = bindingKeyEntry.getValue();
                channel.basicPublish(EXCHANGE_NAME,bindingKey, null,
                        message.getBytes("UTF-8"));
                System.out.println("生产者发出消息" + message);
            }
        }
    }
    

2. Потребители

  • A

    • /**
       * 这是一个测试的消费者
       *
       * @author DingYongJun
       * @date 2021/8/6
       */
      public class DyConsumerTest_topic01 {
      ​
          private static final String EXCHANGE_NAME = "topic_logs";
      ​
          public static void main(String[] args) throws Exception {
                  Channel channel = RabbitMqUtils.getChannel();
                  channel.exchangeDeclare(EXCHANGE_NAME, "topic");
                  //声明 Q1 队列与绑定关系
                  String queueName="Q1";
                  channel.queueDeclare(queueName, false, false, false, null);
                  channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
                  System.out.println("等待接收消息.....");
                  DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                      String message = new String(delivery.getBody(), "UTF-8");
                      System.out.println(" 接收队列 :"+queueName+" 绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);
                  };
                  channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
                  });
              }
      }
      
  • B

    • /**
       * 这是一个测试的消费者
       *@author DingYongJun
       *@date 2021/8/1
       */
      public class DyConsumerTest_topic02 {
          private static final String EXCHANGE_NAME = "topic_logs";
          public static void main(String[] args) throws Exception{
              Channel channel = RabbitMqUtils.getChannel();
              channel.exchangeDeclare(EXCHANGE_NAME, "topic");
              //声明 Q2 队列与绑定关系
              String queueName="Q2";
              channel.queueDeclare(queueName, false, false, false, null);
              channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
              channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
              System.out.println("等待接收消息.....");
              DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                  String message = new String(delivery.getBody(), "UTF-8");
                  System.out.println(" 接收队列 :"+queueName+" 绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);
              };
              channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
              });
          }
      }
      
  • Результаты

    • image-20210806105943532.png
    • режиссер
    • image-20210806110004336.png
    • потребитель А
    • image-20210806110022889.png
    • потребитель Б
    • image-20210806110045841.png

3. Резюме

  • image-20210806110059072.png
  • Если ключом привязки очереди является #, то очередь будет получать все данные, что немного похоже на разветвление.
  • Если нет ключа привязки очереди#и*появляется, то тип привязки этой вещи прямой.
  • Так что тема - самый гибкий режим! Плюс асинхронные ответы. Также самый используемый узор в нашей работе!

Впереди долгий путь, и я обязательно буду его искать вдоль и поперёк~

Если вы думаете, что я блогеры хорошо пишу! Писать нелегко, пожалуйста, ставьте лайки, подписывайтесь и комментируйте, чтобы поощрять блоггеров ~ хахах