Предыдущая статья:Начало работы с RabbitMQ (2) — Создание базовой очереди сообщенийКод, показанный в этой статье, был загружен на github.javaWithoutSmoke/rabbitmq-demo
Начало работы с RabbitMQ — пять режимов и четыре переключателя RabbitMQ
Шесть режимов сообщений
В RabbitMQ есть шесть режимов распространения сообщений:Шесть режимов объяснены на официальном сайте RabbitMQ.
- Простая рабочая очередь (Simple Work Queue): Также известная как двухточечный режим, сообщение потребляется потребителем. (При наличии нескольких потребителей для рассылки сообщений потребителям используется механизм ротации по умолчанию).
- Work Queues (рабочие очереди): также известные как справедливые очереди, модель очереди сообщений для тех, кто может работать больше. очередь должна получитьРучное подтверждениеможет продолжать отправлять сообщения потребителям.
- Публикация/подписка (модель публикации-подписки): сообщение используется несколькими потребителями.
- Маршрутизация: выборочное получение сообщений.
- Темы: Выборочное получение сообщений по определенным правилам
- Режим RPC: издатель публикует сообщение и ожидает результата через RPC. В настоящее время сценариев должно быть немного, а код более сложный, поэтому в этой главе мы не будем вдаваться в подробности.
- Примечание. В конце официального веб-сайта Publisher Confirms — это механизм подтверждения сообщений. Относится к тому, как производители отправляют надежные сообщения.
Четыре биржи RabbitMQ
Для понимания этих шаблонов сообщений была введена концепцияОбмен:
существуетопубликовать подписатьсяЕсть объяснение этому понятию:
Основная идея модели обмена сообщениями RabbitMQ заключается в том, что производитель никогда не отправляет сообщения непосредственно в очередь. На самом деле производители часто даже не знают, доставлять ли сообщения в какие-либо очереди вообще. Вместо этого производители могут отправлять сообщения только на биржи. Общение — это очень просто. С одной стороны, он получает сообщения от производителей, а с другой — отправляет их в очередь. Биржа должна точно знать, что делать с входящими сообщениями. Должен ли он быть прикреплен к определенной очереди? Должен ли он быть добавлен ко многим очередям? Или стоит отбросить. Правила определяются типом переключателя.
Существует четыре типа обмена:
- прямой (непосредственно подключенный к обмену): привязать очередь к обмену, routeKey сообщения должен совпадать с routeKey, привязанным к очереди.
- fanout (переключатель в форме веера): routeKey не обрабатывается, и сообщение напрямую перенаправляется во все привязанные к нему очереди.
- Тема (обмен темами): По определенным правилам пересылать сообщения в очереди, которые соответствуют правилам согласно routeKey, где # используется для соответствия одному или нескольким словам (с более широким диапазоном), а * используется для соответствия слову.
- заголовки (переключатель заголовков): пересылать сообщение в соответствии с заголовками сообщения вместо пересылки сообщения в соответствии с routeKey, где заголовок является Map, что означает, что он может соответствовать не только строковому типу, но и другим типам данные. Правила можно разделить на все совпадения "ключ-значение" или на отдельные совпадения "ключ-значение".
Фактически, здесь мы почти можем провести сравнение отношений между моделью сообщений и Exchange:
режим сообщений | выключатель |
---|---|
Простая рабочая очередь, рабочие очереди | пустой переключатель |
Публикация/подписка (модель публикации-подписки) | разветвитель (переключатель вентилятора) |
Маршрутизация (режим маршрутизации) | прямой (прямой переключатель) |
Темы | тема (обмен темами) |
Простая очередь работ
Просто посмотрите на предыдущую статьюНачало работы с RabbitMQ (2) — Создание базовой очереди сообщений
Очередь работ
Мы добавляем очередь с именем work-queue в RabbitMQ.
- режиссер
/**
* 生产者
*/
public class Producer {
private static final String QUEUE_NAME = "work-queue";
public static void main(String[] args) throws IOException, TimeoutException {
while (true) {
// System.out.println("请输入消息:");
Scanner scanner = new Scanner(System.in);
//1、创建连接
Connection connection = RabbitMQConnection.getConnection();
//2、创建通道
Channel channel = connection.createChannel();
//3、发送消息,这里使用Scanner通过控制台输入的内容来作为消息
//nextLine() 以回车结束当前的输入,会接收空格
String message = scanner.nextLine();
/*
参数说明:
exchange:当期尚未指定exchange,又不能为null,这里用空字符串表示为一个默认的exchange或者匿名的exchange
routingKey: 就是队列名称
props:消息的额外属性
body: 消息主体
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("消息已被发送:" + message);
//发送完记得关闭连接
channel.close();
connection.close();
}
}
}
- потребитель Очередь работ должна определить наличие нескольких потребителей. Здесь мы устанавливаем потребителя 1 для обработки сообщений за 1 с, а потребителя 2 для обработки сообщений за 3 с.
/**
* 消费者1
*/
public class Consumer1{
private static final String QUEUE_NAME = "work-queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接
Connection connection = RabbitMQConnection.getConnection();
// 2、创建通道
Channel channel = connection.createChannel();
// 3、同一时刻服务器只会发送一条消息给消费者
channel.basicQos(1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
//接收到一个消息时会使用这个方法,这里进行重写,用来输出接收到的消息
/*
参数说明:
consumerTag:消费者关联的标签
envelope: 消息包数据
BasicProperties:消息的额外属性
body: 消息主体,当前为二进制
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
// 模拟处理请求耗时较长的情况
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String messageBody = new String(body);
System.out.println("消费者消费消息:"+messageBody);
// 手动确认,
// 第一个参数: 默认的消息的唯一标志
// 第二个参数:是否批量.当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 4、添加监听,改成手动ack
channel.basicConsume(QUEUE_NAME,false, defaultConsumer);
}
}
- Потребитель 2
/**
* 消费者2
*/
public class Consumer2 {
private static final String QUEUE_NAME = "work-queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接
Connection connection = RabbitMQConnection.getConnection();
// 2、创建通道
Channel channel = connection.createChannel();
// 3、同一时刻服务器只会发送一条消息给消费者
channel.basicQos(1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
//接收到一个消息时会使用这个方法,这里进行重写,用来输出接收到的消息
/*
参数说明:
consumerTag:消费者关联的标签
envelope: 消息包数据
BasicProperties:消息的额外属性
body: 消息主体,当前为二进制
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
// 模拟处理请求耗时较长的情况
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String messageBody = new String(body);
System.out.println("消费者消费消息:"+messageBody);
// 手动确认,
// 第一个参数: 默认的消息的唯一标志
// 第二个参数:是否批量.当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 4、添加监听,改成手动ack
channel.basicConsume(QUEUE_NAME,false, defaultConsumer);
}
}
Сначала запустите двух потребителей, а затем запустите производителя.Вы можете видеть, что потребитель 1 обрабатывает быстрее, поэтому ему нужно обработать несколько сообщений.
Публикация/подписка (модель публикации-подписки)
Сначала мы создаем две очереди подписки1 и подписки2 ,
Также существует обмен по типу fanout Publish-SubscribeЗатем привяжите Exchange к двум очередям, без привязки сообщения не могут быть доставлены в очереди.
- режиссер
/**
* 生产者
*/
public class Producer {
private static final String EXCHANGE_NAME = "Publish-Subscribe";
public static void main(String[] args) throws IOException, TimeoutException {
for (int i = 1; i < 7; i++) {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
// 绑定交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
channel.basicPublish(EXCHANGE_NAME, "", null, String.valueOf(i).getBytes());
System.out.println("消息已被发送:" + i);
channel.close();
connection.close();
}
}
}
- Потребитель 1
/**
* 消费者1
*/
public class Consumer1{
private static final String QUEUE_NAME = "subscribe1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String messageBody = new String(body);
System.out.println("消费者1消费消息:"+messageBody);
}
};
channel.basicConsume(QUEUE_NAME,true, defaultConsumer);
}
}
- Потребитель 2
/**
* 消费者subscribe2
*/
public class Consumer2 {
private static final String QUEUE_NAME = "subscribe2";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String messageBody = new String(body);
System.out.println("消费者2消费消息:"+messageBody);
}
};
channel.basicConsume(QUEUE_NAME,true, defaultConsumer);
}
}
После запуска потребителя снова запустите производителя. Вы можете видеть, что оба потребителя потребляют все сообщения от производителя.
Думаю о публикации и подписке
-
Модель публикации-подписки на самом деле очень похожа на то, когда мы обычно используем официальную учетную запись WeChat.Официальная учетная запись публикует статьи, и все фанаты, которые подписываются на официальную учетную запись, могут получать сообщения, отправленные официальной учетной записью. Но мы обращаем внимание на официальный аккаунт, а официальный аккаунт не передаст нам данные. Так здесь то же самое?
сцена первая: Теперь мы сначала запускаем производителя, заканчиваем создание сообщения, а затем запускаем потребителя. Обнаружено, что два потребителя все еще могут потреблять созданные сообщения.
На самом деле, когда производитель публикует сообщение, он доставляет сообщение коммутатору, а коммутатор помогает нам доставить сообщение в две очереди, поэтому со временем мой производитель остановился, но в это время в моей очереди уже есть информация.Итак, как только мы запустим потребитель для прослушивания очереди, мы сможем нормально потреблять данные.Сценарий второй: Сначала мы развязываем связь между Потребителем 2 и Exchange, а затем запускаем Потребителя 1 и Производителя.
В это время мы повторно связываем очередь subscribe2 с обменом Publish-Subscribe и запускаем Consumer 2. В настоящее время вы можете видеть, что у Consumer 2 нет данных для потребления. На этот раз вернемся к теме.На самом деле эта сцена больше соответствует упомянутой в заголовке сцене, потому что болельщики здесь видят, что толчок получен, а командные потребители фактически завершили потребление. Вновь подписанный пользователь эквивалентен добавлению новой очереди в существующую Exchagne, и новое сообщение должно быть доставлено, прежде чем новый пользователь сможет его получить.
Маршрутизация (режим маршрутизации)
Создание коммутаторов с прямым подключением Очереди маршрутизации и потребителей, три очереди RoutingConsumer1, RoutingConsumer3, RoutingConsumer3
Затем отношения привязки: обратите внимание на ключ маршрутизации ниже, почему вы хотите установить его именно так? Потому что я хочу знать, во сколько очередей будет доставлено сообщение, если ключ маршрутизации одинаков?- режиссер
/**
* 生产者
*/
public class Producer {
private static final String EXCHANGE_NAME = "Routing";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
// 绑定直连交换机
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
channel.basicPublish(EXCHANGE_NAME, "key1", null, String.valueOf("key1").getBytes());
channel.basicPublish(EXCHANGE_NAME, "key2", null, String.valueOf("key2").getBytes());
System.out.println("消息已发送");
channel.close();
connection.close();
}
}
- Потребитель 1 (два других кода потребителя почти одинаковы, просто измените серийный номер)
public class RoutingConsumer1 {
private static final String QUEUE_NAME = "RoutingConsumer1";
private static final String EXCHANGE_NAME = "Routing";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key1");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String messageBody = new String(body);
System.out.println("消费者1消费消息:"+messageBody);
}
};
channel.basicConsume(QUEUE_NAME,true, defaultConsumer);
}
}
Сначала запустите трех потребителей, а затем запустите производителя. Установлено, что оба потребителя 1 и 2 потребляют ключ1, а потребитель 3 выдает ключ3.
Вывод вышел: Когда для передачи сообщения используется режим сообщения маршрутизации, прямой обмен доставляет сообщение на соответствующий ключ маршрутизации.
Мысли о коммутаторах с прямым подключением
- Если ключ маршрутизации совпадает лишь частично, будет ли он доставлен?
Нам нужно только позволить производителю передать значение Routing Key = key
Оказалось, что никто из потребителей его не употреблял. то естьВ случае прямой связи такого частичного совпадения нет.channel.basicPublish(EXCHANGE_NAME, "key", null, String.valueOf("key").getBytes());
Темы
Когда нам нужно частично сопоставить ключ маршрутизации, на этот раз в игру вступает тематический режим. В режиме тем обратите внимание на три символа
символ | эффект |
---|---|
. | разделить слова |
* | сопоставить слово |
# | соответствовать одному или нескольким словам |
Результат выглядит следующим образом: routeKey сообщения производителя: java.without.smoke
серийный номер | routeKey | потреблять ли |
---|---|---|
TopicConsumer1 | java.without.smoke | Y |
TopicConsumer2 | java.* | N |
TopicConsumer3 | java.without.* | Y |
TopicConsumer4 | java.# | Y |
Режим заголовка
Главное — установить свойство заголовка и метод сопоставления в Channel.
- режиссер
public static Map<String, Object> map = new HashMap<>();
map = new HashMap<>();
map.put("ID", 1);
map.put("Name", "aaaa");
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(map).build();
channel.basicPublish(EXCHANGE_NAME, "java.without.smoke", props, String.valueOf("key1").getBytes());
- потребитель
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(map).build();
channel.queueBind(QUEUE_NAME, Producer.EXCHANGE_NAME, "java.*", props.getHeaders());
Суммировать
-
RabbitMQ предоставил нам так много режимов сообщений, среди которых режим темы может достигать эффекта режима маршрутизации и режима публикации-подписки. Каждый режим имеет свои особенности. В случае, когда у очереди есть только один потребитель, тематический режим также может достигать эффекта однорангового режима.
Сцены план Сообщения фиксируются в очереди, и нет необходимости в нескольких потребителях для ускорения потребления. Использовать одноранговый режим Сообщения помещаются в очередь, поэтому для ускорения потребления требуется несколько потребителей. Использовать режим рабочей очереди Размещать сообщения в нескольких очередях в соответствии с определенными правилами тематический режим -
Доставка сообщений RabbitMQ основана на режиме «производитель -> обмен -> очередь -> потребитель», но мы можем понимать двухточечный режим и режим рабочей очереди как анонимный обмен для очередей доставки.
-
Переключатель Exchange на самом деле очень похож на сервер nginx, который мы используем при обратном проксировании: nginx отвечает за пересылку запросов, а Exchagne — за пересылку сообщений.