Модель публикации-подписки MQ (разветвление) | Серия RabbitMQ (7)

задняя часть RabbitMQ
Модель публикации-подписки MQ (разветвление) | Серия RabbitMQ (7)

Уже 12-й день участвую в августовском челлендже, подробности события Посмотреть:Испытание августовского обновления


Статьи по Теме

Краткое изложение серии RabbitMQ:Серия RabbitMQ


предисловие

  • Обзор простого режима и рабочего режима

    • Простой режим: один производитель соответствует одному потребителю.
    • Режим работы: Один производитель соответствует нескольким потребителям.
  • Теперь давайте представим, что делает переключатель

  • Основная идея модели обмена сообщениями RabbitMQ заключается в том, что сообщения, созданные производителями, никогда не отправляются напрямую в очереди. На самом деле обычно производители даже не знают, в какие очереди доставляются эти сообщения.

  • Вместо этого производитель может только отправлять сообщения на биржу, и работа биржи очень проста, с одной стороны он получает сообщения от производителя, а с другой стороны помещает их в очередь. Коммутатор должен точно знать, что делать с входящими сообщениями.

  • Должны ли эти сообщения быть помещены в определенную очередь, или они должны быть помещены в несколько очередей, или должны быть отброшены. Это зависит от типа переключателя.

  • image-20210804112052503.png

  • Существует четыре типа переключателей

    • Прямой (прямой)
    • тема
    • заголовки
    • разветвление
    • Как правило, MQ поможет нам создать некоторые переключатели по умолчанию, которые можно использовать напрямую, или вы можете сами создавать различные типы переключателей.
    • image-20210804112606519.png
  • Здесь нужно рассказать о безымянном переключателе, который является переключателем по умолчанию, в предыдущих статьях мы его не указывали.

    • image-20210804112458681.png
    • Первый параметр — это имя коммутатора. Пустая строка указывает на обмен по умолчанию или без имени: сообщения, которые могут быть перенаправлены в очередь, фактически определяются ключом привязки routingKey(bindingkey), если он существует.
    • Тогда в этом случае MQ перейдет на коммутатор по умолчанию (AMQP по умолчанию)
    • image-20210804112542337.png
  • Есть нечто, называемое временной очередью

    • //创建临时队列
      String queueName = channel.queueDeclare().getQueue();
      
    • image-20210804113925048.png
    • Прямое выполнение обязательно сообщит об ошибке: очевидно, скажите нам, что имя очереди не должно быть названо с помощью amq, поэтому код можно немного изменить.
    • image-20210804114330969.png
    • channel.queueDeclare("dy"+queueName,false,false,false,null);
      
    • image-20210804114444954.png
  • Разобравшись с переключением, мы можем официально запустить модель публикации-подписки~

1. Производители

  • Попробуй указать коммутатор у производителя, ведь по нормальной логике он должен быть сначала отправлен, прежде чем его можно будет принять. Иначе смирись с одиночеством~

  •     public static void publishMessageIndividually() throws Exception {
            //工具类获取通道
            Channel channel = RabbitMqUtils.getChannel();
            /**
             * 指定交换机和模式
             * 参数一:指定的交换机名称
             * 参数二:指定的交换机模式
             */
            channel.exchangeDeclare(ChangeNameConstant.FANOUT_MODEL,"fanout");
    ​
            Scanner sc = new Scanner(System.in);
            System.out.println("请输入信息");
            while (sc.hasNext()) {
                String message = sc.nextLine();
                channel.basicPublish(ChangeNameConstant.FANOUT_MODEL, "20210804", null, message.getBytes("UTF-8"));
                System.out.println("生产者发出消息" + message);
            }
        }
    
  • image-20210804140950379.png

  • Из этого видно

    • создал производителя
    • Создал имя коммутатора = fanout_pattern
    • Тип коммутатора - разветвитель

2. Потребители

  • потребитель А

  • /**
     * 这是一个测试的消费者
     *@author DingYongJun
     *@date 2021/8/1
     */
    public class DyConsumerTest_Fanout01 {
    ​
        public static void main(String[] args) throws Exception{
            //使用工具类来创建通道
            Channel channel = RabbitMqUtils.getChannel();
    ​
            /**
             * 生成一个临时的队列 队列的名称是随机的
             * 当消费者断开和该队列的连接时 队列自动删除
             */
            String queueName = channel.queueDeclare().getQueue();
            //把该临时队列绑定我们的 exchange 其中 routingkey(也称之为 binding key)为空字符串
            channel.queueBind(queueName, ChangeNameConstant.FANOUT_MODEL, "20210804");
            System.out.println("交换机A等待接收消息,把接收到的消息打印在屏幕.....");
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("控制台打印接收到的消息"+message);
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
                System.out.println("消息中断了~");
            });
        }
    }
    
  • Код потребителя B такой же, но изменен на потребитель B, ожидающий получения сообщения~

  • image-20210804141203493.png

  • Объявленные временные очереди двух потребителей успешно привязаны к обмену!

  • Результаты

    • режиссер
    • image-20210804141255986.png
    • потребитель А
    • image-20210804141318178.png
    • потребитель Б
    • image-20210804141330836.png
  • Производитель отправляет сообщение коммутатору.

  • Оба потребителя получили сообщение.

3. Резюме

  • image-20210804141425353.png
  • Очевидно, что здесь мы не будем отправлять сообщения напрямую в очередь
  • Вместо этого отправьте сообщение на коммутатор
  • Переключатель, какие очереди связаны, чтобы найти коммутатор
  • Затем отправьте сообщение во все связанные очереди
  • Последняя очередь отправляет сообщение потребителю

Впереди долгий путь, и я обязательно буду его искать вдоль и поперёк~

Если вы думаете, что я блогеры хорошо пишу! Писать нелегко, пожалуйста, ставьте лайки, подписывайтесь и комментируйте, чтобы поощрять блоггеров ~ хахах