Начало работы с RabbitMQ (3) — пять режимов и четыре переключателя RabbitMQ

RabbitMQ

Предыдущая статья:Начало работы с RabbitMQ (2) — Создание базовой очереди сообщенийКод, показанный в этой статье, был загружен на github.javaWithoutSmoke/rabbitmq-demo

Начало работы с RabbitMQ — пять режимов и четыре переключателя RabbitMQ

Шесть режимов сообщений

В RabbitMQ есть шесть режимов распространения сообщений:Шесть режимов объяснены на официальном сайте RabbitMQ.

  • Простая рабочая очередь (Simple Work Queue): Также известная как двухточечный режим, сообщение потребляется потребителем. (При наличии нескольких потребителей для рассылки сообщений потребителям используется механизм ротации по умолчанию).
  • Work Queues (рабочие очереди): также известные как справедливые очереди, модель очереди сообщений для тех, кто может работать больше. очередь должна получитьРучное подтверждениеможет продолжать отправлять сообщения потребителям.
  • Публикация/подписка (модель публикации-подписки): сообщение используется несколькими потребителями.
  • Маршрутизация: выборочное получение сообщений.
  • Темы: Выборочное получение сообщений по определенным правилам
  • Режим RPC: издатель публикует сообщение и ожидает результата через RPC. В настоящее время сценариев должно быть немного, а код более сложный, поэтому в этой главе мы не будем вдаваться в подробности.
  • Примечание. В конце официального веб-сайта Publisher Confirms — это механизм подтверждения сообщений. Относится к тому, как производители отправляют надежные сообщения.

Четыре биржи RabbitMQ

Для понимания этих шаблонов сообщений была введена концепцияОбмен:

существуетопубликовать подписатьсяЕсть объяснение этому понятию:

Основная идея модели обмена сообщениями RabbitMQ заключается в том, что производитель никогда не отправляет сообщения непосредственно в очередь. На самом деле производители часто даже не знают, доставлять ли сообщения в какие-либо очереди вообще. Вместо этого производители могут отправлять сообщения только на биржи. Общение — это очень просто. С одной стороны, он получает сообщения от производителей, а с другой — отправляет их в очередь. Биржа должна точно знать, что делать с входящими сообщениями. Должен ли он быть прикреплен к определенной очереди? Должен ли он быть добавлен ко многим очередям? Или стоит отбросить. Правила определяются типом переключателя.

Существует четыре типа обмена:

  • прямой (непосредственно подключенный к обмену): привязать очередь к обмену, routeKey сообщения должен совпадать с routeKey, привязанным к очереди.
  • fanout (переключатель в форме веера): routeKey не обрабатывается, и сообщение напрямую перенаправляется во все привязанные к нему очереди.
  • Тема (обмен темами): По определенным правилам пересылать сообщения в очереди, которые соответствуют правилам согласно routeKey, где # используется для соответствия одному или нескольким словам (с более широким диапазоном), а * используется для соответствия слову.
  • заголовки (переключатель заголовков): пересылать сообщение в соответствии с заголовками сообщения вместо пересылки сообщения в соответствии с routeKey, где заголовок является Map, что означает, что он может соответствовать не только строковому типу, но и другим типам данные. Правила можно разделить на все совпадения "ключ-значение" или на отдельные совпадения "ключ-значение".

Фактически, здесь мы почти можем провести сравнение отношений между моделью сообщений и Exchange:

режим сообщений выключатель
Простая рабочая очередь, рабочие очереди пустой переключатель
Публикация/подписка (модель публикации-подписки) разветвитель (переключатель вентилятора)
Маршрутизация (режим маршрутизации) прямой (прямой переключатель)
Темы тема (обмен темами)

Простая очередь работ

Просто посмотрите на предыдущую статьюНачало работы с RabbitMQ (2) — Создание базовой очереди сообщений

Очередь работ

Мы добавляем очередь с именем work-queue в RabbitMQ.

在这里插入图片描述
工作队列

  • режиссер
/**
 * 生产者
 */
public class Producer {

    private static final String QUEUE_NAME = "work-queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        while (true) {
//            System.out.println("请输入消息:");
            Scanner scanner = new Scanner(System.in);
            //1、创建连接
            Connection connection = RabbitMQConnection.getConnection();
            //2、创建通道
            Channel channel = connection.createChannel();
            //3、发送消息,这里使用Scanner通过控制台输入的内容来作为消息
            //nextLine() 以回车结束当前的输入,会接收空格
            String message = scanner.nextLine();
            /*
            参数说明:
            exchange:当期尚未指定exchange,又不能为null,这里用空字符串表示为一个默认的exchange或者匿名的exchange
            routingKey: 就是队列名称
            props:消息的额外属性
            body: 消息主体
             */
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("消息已被发送:" + message);
            //发送完记得关闭连接
            channel.close();
            connection.close();
        }
    }
}
  • потребитель Очередь работ должна определить наличие нескольких потребителей. Здесь мы устанавливаем потребителя 1 для обработки сообщений за 1 с, а потребителя 2 для обработки сообщений за 3 с.

/**
 * 消费者1
 */
public class Consumer1{

    private static final String QUEUE_NAME = "work-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接
        Connection connection = RabbitMQConnection.getConnection();
        // 2、创建通道
        Channel channel = connection.createChannel();

        // 3、同一时刻服务器只会发送一条消息给消费者
        channel.basicQos(1);

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            //接收到一个消息时会使用这个方法,这里进行重写,用来输出接收到的消息
            /*
            参数说明:
            consumerTag:消费者关联的标签
            envelope: 消息包数据
            BasicProperties:消息的额外属性
            body: 消息主体,当前为二进制
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    // 模拟处理请求耗时较长的情况
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String messageBody = new String(body);
                System.out.println("消费者消费消息:"+messageBody);
                // 手动确认,
                // 第一个参数: 默认的消息的唯一标志
                // 第二个参数:是否批量.当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 4、添加监听,改成手动ack
        channel.basicConsume(QUEUE_NAME,false, defaultConsumer);
    }
}

  • Потребитель 2

/**
 * 消费者2
 */
public class Consumer2 {

    private static final String QUEUE_NAME = "work-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接
        Connection connection = RabbitMQConnection.getConnection();
        // 2、创建通道
        Channel channel = connection.createChannel();

        // 3、同一时刻服务器只会发送一条消息给消费者
        channel.basicQos(1);

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            //接收到一个消息时会使用这个方法,这里进行重写,用来输出接收到的消息
            /*
            参数说明:
            consumerTag:消费者关联的标签
            envelope: 消息包数据
            BasicProperties:消息的额外属性
            body: 消息主体,当前为二进制
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    // 模拟处理请求耗时较长的情况
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String messageBody = new String(body);
                System.out.println("消费者消费消息:"+messageBody);
                // 手动确认,
                // 第一个参数: 默认的消息的唯一标志
                // 第二个参数:是否批量.当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 4、添加监听,改成手动ack
        channel.basicConsume(QUEUE_NAME,false, defaultConsumer);
    }
}

Сначала запустите двух потребителей, а затем запустите производителя.Вы можете видеть, что потребитель 1 обрабатывает быстрее, поэтому ему нужно обработать несколько сообщений.

在这里插入图片描述
在这里插入图片描述

Публикация/подписка (модель публикации-подписки)

Сначала мы создаем две очереди подписки1 и подписки2 ,

在这里插入图片描述
Также существует обмен по типу fanout Publish-Subscribe
在这里插入图片描述

Затем привяжите Exchange к двум очередям, без привязки сообщения не могут быть доставлены в очереди.

  • режиссер
/**
* 生产者
*/
public class Producer {

   private static final String EXCHANGE_NAME = "Publish-Subscribe";

   public static void main(String[] args) throws IOException, TimeoutException {
       for (int i = 1; i < 7; i++) {
           Connection connection = RabbitMQConnection.getConnection();
           Channel channel = connection.createChannel();
           // 绑定交换机
           channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
           channel.basicPublish(EXCHANGE_NAME, "", null, String.valueOf(i).getBytes());
           System.out.println("消息已被发送:" + i);
           channel.close();
           connection.close();
       }
   }
}
  • Потребитель 1

/**
 * 消费者1
 */
public class Consumer1{

    private static final String QUEUE_NAME = "subscribe1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQConnection.getConnection();
        Channel channel = connection.createChannel();
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String messageBody = new String(body);
                System.out.println("消费者1消费消息:"+messageBody);
            }
        };
        channel.basicConsume(QUEUE_NAME,true, defaultConsumer);
    }
}

  • Потребитель 2

/**
 * 消费者subscribe2
 */
public class Consumer2 {

    private static final String QUEUE_NAME = "subscribe2";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQConnection.getConnection();
        Channel channel = connection.createChannel();
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String messageBody = new String(body);
                System.out.println("消费者2消费消息:"+messageBody);
            }
        };
        channel.basicConsume(QUEUE_NAME,true, defaultConsumer);
    }
}

После запуска потребителя снова запустите производителя. Вы можете видеть, что оба потребителя потребляют все сообщения от производителя.

在这里插入图片描述
在这里插入图片描述

Думаю о публикации и подписке

  1. Модель публикации-подписки на самом деле очень похожа на то, когда мы обычно используем официальную учетную запись WeChat.Официальная учетная запись публикует статьи, и все фанаты, которые подписываются на официальную учетную запись, могут получать сообщения, отправленные официальной учетной записью. Но мы обращаем внимание на официальный аккаунт, а официальный аккаунт не передаст нам данные. Так здесь то же самое?

    сцена первая: Теперь мы сначала запускаем производителя, заканчиваем создание сообщения, а затем запускаем потребителя. Обнаружено, что два потребителя все еще могут потреблять созданные сообщения.

    在这里插入图片描述
    На самом деле, когда производитель публикует сообщение, он доставляет сообщение коммутатору, а коммутатор помогает нам доставить сообщение в две очереди, поэтому со временем мой производитель остановился, но в это время в моей очереди уже есть информация.
    在这里插入图片描述
    Итак, как только мы запустим потребитель для прослушивания очереди, мы сможем нормально потреблять данные.

    Сценарий второй: Сначала мы развязываем связь между Потребителем 2 и Exchange, а затем запускаем Потребителя 1 и Производителя.

    在这里插入图片描述
    В это время мы повторно связываем очередь subscribe2 с обменом Publish-Subscribe и запускаем Consumer 2. В настоящее время вы можете видеть, что у Consumer 2 нет данных для потребления. На этот раз вернемся к теме.На самом деле эта сцена больше соответствует упомянутой в заголовке сцене, потому что болельщики здесь видят, что толчок получен, а командные потребители фактически завершили потребление. Вновь подписанный пользователь эквивалентен добавлению новой очереди в существующую Exchagne, и новое сообщение должно быть доставлено, прежде чем новый пользователь сможет его получить.

Маршрутизация (режим маршрутизации)

Создание коммутаторов с прямым подключением Очереди маршрутизации и потребителей, три очереди RoutingConsumer1, RoutingConsumer3, RoutingConsumer3

在这里插入图片描述
Затем отношения привязки: обратите внимание на ключ маршрутизации ниже, почему вы хотите установить его именно так? Потому что я хочу знать, во сколько очередей будет доставлено сообщение, если ключ маршрутизации одинаков?
在这里插入图片描述

  • режиссер

/**
 * 生产者
 */
public class Producer {

    private static final String EXCHANGE_NAME = "Routing";

    public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = RabbitMQConnection.getConnection();
            Channel channel = connection.createChannel();
            // 绑定直连交换机
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
            channel.basicPublish(EXCHANGE_NAME, "key1", null, String.valueOf("key1").getBytes());
            channel.basicPublish(EXCHANGE_NAME, "key2", null, String.valueOf("key2").getBytes());
            System.out.println("消息已发送");
            channel.close();
            connection.close();
    }
}
  • Потребитель 1 (два других кода потребителя почти одинаковы, просто измените серийный номер)

public class RoutingConsumer1 {

    private static final String QUEUE_NAME = "RoutingConsumer1";

    private static final String EXCHANGE_NAME = "Routing";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQConnection.getConnection();
        Channel channel = connection.createChannel();
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key1");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String messageBody = new String(body);
                System.out.println("消费者1消费消息:"+messageBody);
            }
        };
        channel.basicConsume(QUEUE_NAME,true, defaultConsumer);
    }
}

Сначала запустите трех потребителей, а затем запустите производителя. Установлено, что оба потребителя 1 и 2 потребляют ключ1, а потребитель 3 выдает ключ3.

Вывод вышел: Когда для передачи сообщения используется режим сообщения маршрутизации, прямой обмен доставляет сообщение на соответствующий ключ маршрутизации.

Мысли о коммутаторах с прямым подключением

  1. Если ключ маршрутизации совпадает лишь частично, будет ли он доставлен? Нам нужно только позволить производителю передать значение Routing Key = key
       channel.basicPublish(EXCHANGE_NAME, "key", null, String.valueOf("key").getBytes());
    
    Оказалось, что никто из потребителей его не употреблял. то естьВ случае прямой связи такого частичного совпадения нет.

Темы

Когда нам нужно частично сопоставить ключ маршрутизации, на этот раз в игру вступает тематический режим. В режиме тем обратите внимание на три символа

символ эффект
. разделить слова
* сопоставить слово
# соответствовать одному или нескольким словам

Результат выглядит следующим образом: routeKey сообщения производителя: java.without.smoke

серийный номер routeKey потреблять ли
TopicConsumer1 java.without.smoke Y
TopicConsumer2 java.* N
TopicConsumer3 java.without.* Y
TopicConsumer4 java.# Y

Режим заголовка

Главное — установить свойство заголовка и метод сопоставления в Channel.

  • режиссер
 public static Map<String, Object> map = new HashMap<>();
 map = new HashMap<>();
 map.put("ID", 1);
 map.put("Name", "aaaa");

 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(map).build();
 channel.basicPublish(EXCHANGE_NAME, "java.without.smoke", props, String.valueOf("key1").getBytes());
  • потребитель
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(map).build();
        channel.queueBind(QUEUE_NAME, Producer.EXCHANGE_NAME, "java.*", props.getHeaders());

Суммировать

  1. RabbitMQ предоставил нам так много режимов сообщений, среди которых режим темы может достигать эффекта режима маршрутизации и режима публикации-подписки. Каждый режим имеет свои особенности. В случае, когда у очереди есть только один потребитель, тематический режим также может достигать эффекта однорангового режима.

    Сцены план
    Сообщения фиксируются в очереди, и нет необходимости в нескольких потребителях для ускорения потребления. Использовать одноранговый режим
    Сообщения помещаются в очередь, поэтому для ускорения потребления требуется несколько потребителей. Использовать режим рабочей очереди
    Размещать сообщения в нескольких очередях в соответствии с определенными правилами тематический режим
  2. Доставка сообщений RabbitMQ основана на режиме «производитель -> обмен -> очередь -> потребитель», но мы можем понимать двухточечный режим и режим рабочей очереди как анонимный обмен для очередей доставки.

  3. Переключатель Exchange на самом деле очень похож на сервер nginx, который мы используем при обратном проксировании: nginx отвечает за пересылку запросов, а Exchagne — за пересылку сообщений.