Управляемое чтение
Впервые опубликовано в публичном аккаунте:Пиратский корабль Ява, оригинальность непроста, читатели, которым это нравится, могут обратить на это внимание! Публичный аккаунт для обмена учебными ресурсами по Java, практическим опытом и техническими статьями!
предисловие
На то, чтобы в свободное время написать две статьи о rabbitMq, ушло больше недели, одна — нативная версия, а другая — версия для springboot. Новичкам лучше посмотреть оригинальную звуковую версию, чтобы было понятнее, а если есть желание применить, то можно и напрямую прочитать эту статью. В этой статье много содержания. После прочтения применения пяти основных моделей сообщений есть дополнительные главы, связанные друг с другом. Если исследование не является тщательным, пожалуйста, дайте мне дополнительные советы.версия кролика MQ-springboot
Официальная справочная документация
основные основные понятия
Server:Также известный как брокер, клиент принимает соединение, внедрение организации службы AMQP.
Connection:Connect, сетевое подключение приложения к брокеру.
Channel:В сетевом канале почти все операции выполняются в канале, который является каналом для чтения и записи сообщений. Клиент может установить несколько каналов, и каждый канал представляет задачу сеанса. Если соединение устанавливается каждый раз при доступе к RabbitMQ, накладные расходы на установление TCP-соединения будут огромными, а эффективность будет низкой при большом объеме сообщений. Канал — это логическое соединение, установленное внутри соединения. Если приложение поддерживает многопоточность, обычно каждый поток создает отдельный канал для связи. Метод AMQP включает идентификатор канала, чтобы помочь клиенту и брокеру сообщений идентифицировать канал, поэтому каналы полностью изолированы. Являясь упрощенным соединением, Channel значительно снижает нагрузку операционной системы на установление TCP-соединения.
Сообщение:Сообщение, данные, передаваемые между сервером и приложением, состоит из свойств сообщения и тела. Свойства могут изменять сообщение, такие как приоритет сообщения, задержка и другие дополнительные функции, а тело — это содержимое тела сообщения.
Virtual Host:Виртуальные адреса используются для логической изоляции и маршрутизации сообщений верхнего уровня. Виртуальный хост может иметь несколько обменов и очередей, и один и тот же виртуальный хост не может иметь обменов или очередей с одинаковым именем.
Обмен:Коммутатор, который имеет только возможность пересылки, но не может хранить сообщения, пересылает сообщения в связанную очередь в соответствии с ключом маршрутизации.
Binding:Виртуальное соединение между Exchange и Queue, ключ маршрутизации может быть включен в привязку.
Routing key:Правило маршрутизации, которое виртуальная машина может использовать для определения способа маршрутизации определенного сообщения.
Очередь:Также известная как очередь сообщений, она хранит сообщения и пересылает их потребителям.
импортировать зависимости
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>
</dependencies>
настроить
spring:
rabbitmq:
host: 127.0.0.1
username: admin123
password: 123456
virtual-host: /test
Пример кода модели сообщения
Что касается использования некоторых методов, все описания атрибутов параметров прокомментированы в коде.
Простая модель очереди
образец графика
P (производитель/издатель): производитель, например экспресс-доставка
C (потребитель): потребители, такие как экспресс-доставка
Красная зона: очередь, такая как экспресс-зона, ожидающая, пока потребители заберут экспресс.
резюме в одном предложении
Производители отправляют сообщения в очереди, потребители получают сообщения из очередей, а очереди — это буферы, в которых хранятся сообщения.
Инициализировать очередь
package com.ao.springbootamqp.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class RabbitMqConfig {
/*队列*/
public static final String TEST_QUEUE = "simple-amqp_queue";
/**声明队列
* public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
* this(name, durable, exclusive, autoDelete, (Map)null);
* }
* String name: 队列名
* boolean durable: 持久化消息队列,rabbitmq 重启的时候不需要创建新的队列,默认为 true
* boolean exclusive: 表示该消息队列是否只在当前的connection生效,默认为 false
* boolean autoDelete: 表示消息队列在没有使用时将自动被删除,默认为 false*/
@Bean(TEST_QUEUE)
public Queue testQueue() {
return new Queue(TEST_QUEUE, true);
}
отправить класс сообщения
package com.ao.springbootamqp.service;
import com.ao.springbootamqp.config.RabbitMqConfig;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.UUID;
@Component
@Slf4j
public class RabbitMqService {
@Autowired
private RabbitTemplate rabbitTemplate;
/*发送消息到队列*/
public String sendQueue(Object payload){
return baseSend("", RabbitMqConfig.TEST_QUEUE, payload, null, null);
}
/**
* MQ 公用发送方法
*
* @param exchange 交换机
* @param routingKey 队列
* @param payload 消息体
* @param messageId 消息id(唯一性)
* @param messageExpirationTime 持久化时间
* @return 消息编号
*/
public String baseSend(String exchange, String routingKey, Object payload, String messageId, Long messageExpirationTime) {
/*若为空,则自动生成*/
if (messageId == null) {
messageId = UUID.randomUUID().toString();
}
String finalMessageId = messageId;
/*设置消息属性*/
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
/*消息属性中写入消息id*/
message.getMessageProperties().setMessageId(finalMessageId);
/*设置消息持久化时间*/
if (!StringUtils.isEmpty(messageExpirationTime)){
message.getMessageProperties().setExpiration(messageExpirationTime.toString());
}
/*设置消息持久化*/
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}
};
/*构造消息体,转换json数据格式*/
Message message = null;
try {
ObjectMapper objectMapper = new ObjectMapper();
String json = objectMapper.writeValueAsString(payload);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentEncoding(MessageProperties.CONTENT_TYPE_JSON);
message = new Message(json.getBytes(), messageProperties);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
/*表示当前消息唯一性*/
CorrelationData correlationData = new CorrelationData(finalMessageId);
/**
* public void convertAndSend(String exchange, String routingKey, Object message,
* MessagePostProcessor messagePostProcessor, @Nullable CorrelationData correlationData) throws AmqpException
* exchange: 路由
* routingKey: 绑定key
* message: 消息体
* messagePostProcessor: 消息属性处理器
* correlationData: 表示当前消息唯一性
*/
rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor, correlationData);
return finalMessageId;
}
}
тестовое задание
отправлять сообщения
@SpringBootTest
class RabbitMqTest {
@Autowired
private RabbitMqService rabbitMqService;
@Test
public void tt(){
String s = "顺丰快递";
rabbitMqService.sendQueue(s);
}
}
Посмотреть интерфейс управления
Как видите, сообщение было успешно отправлено на сервер, и свойства сообщения внутри именно такие, какие мы установили. Поскольку сообщение было отправлено на сервер, потребитель может быть использован позже.
потребитель
@Component
public class RecService {
/*队列*/
public static final String TEST_QUEUE = "simple-amqp_queue";
@RabbitListener(queues = TEST_QUEUE)
public void t2(Message message){
try {
String msg = new String(message.getBody());
if (msg == null) {
System.out.println("消息为空");
}
System.out.println("我收到了=-=" + msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Запуск и просмотр
модель рабочего сообщения
образец графика
P (производитель/издатель): производитель, например экспресс-доставка
C1, C2 (потребитель): потребители, такие как экспресс-доставка
Красная зона: очередь, такая как экспресс-зона, ожидающая, пока потребители заберут экспресс.
Структура передачи сообщений 10
@SpringBootTest
class RabbitMqTest {
@Autowired
private RabbitMqService rabbitMqService;
@Test
public void tt(){
for (int i = 0;i < 10; i++){
String s = "消息" + i;
rabbitMqService.sendQueue(s);
}
}
}
Потребитель 1/2
@Component
public class RecService1 {
/*队列*/
public static final String TEST_QUEUE = "work-amqp-queue";
@RabbitListener(queues = TEST_QUEUE)
public void t2(Message message){
try {
String msg = new String(message.getBody());
if (msg == null) {
System.out.println("消息为空");
}
System.out.println("消费者1收到=-=" + msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Посмотреть консоль
Видно, что потребление одинаковое.Если хотите возможности работать больше, добавляйте конфигурацию следующим образом:
#指定一个请求能够处理多少个消息
listener:
simple:
#测试消费者1值为3,消费者2值为1
prefetch: 1
Или добавьте канал.basicQos(1) к потребителю. Это говорит RabbitMq не продолжать отправлять сообщения потребителю, а ждать, пока потребитель подтвердит предыдущее сообщение.
@Component
public class RecService1 {
/*队列*/
public static final String TEST_QUEUE = "work-amqp-queue";
@RabbitListener(queues = TEST_QUEUE)
public void t2(Message message,Channel channel){
try {
String msg = new String(message.getBody());
if (msg == null) {
System.out.println("消息为空");
}
System.out.println("消费者1收到=-=" + msg);
channel.basicQos(1);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Перезапустите двух потребителей и в цикле отправьте сообщения 10. Просмотрите консоль следующим образом:
Вы можете видеть, что Потребитель 1 тратит больше
Модель подписки — разветвление (режим вещания)
В этой модели подписки производители публикуют сообщения, и все потребители могут получать все сообщения.
образец графика
P: производитель, например, экспресс-доставка
X: Switch, эквивалент курьерской компании.
Красная зона: очередь, такая как экспресс-зона, ожидающая, пока потребители заберут экспресс.
C1, C2: потребители, такие как экспресс-доставка
Измените следующую конфигурацию в RabbitMqConfig, объявите очередь 1 и очередь 2 и привяжите переключатель к этим двум очередям.
/*交换机*/
public static final String TEST_EXCHANGE = "fanout_amqp_exchange";
/*声明一个fanout交换机*/
@Bean(TEST_EXCHANGE)
public Exchange testExchange() {
// durable(true)持久化,mq重启之后,交换机还在
return ExchangeBuilder.fanoutExchange(TEST_EXCHANGE).durable(true).build();
}
/*队列1*/
public static final String TEST_QUEUE_1 = "fanout_amqp_queue_1";
/*队列2*/
public static final String TEST_QUEUE_2 = "fanout_amqp_queue_2";
/**声明队列1
* public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
* this(name, durable, exclusive, autoDelete, (Map)null);
* }
* String name: 队列名
* boolean durable: 持久化消息队列,rabbitmq 重启的时候不需要创建新的队列,默认为 true
* boolean exclusive: 表示该消息队列是否只在当前的connection生效,默认为 false
* boolean autoDelete: 表示消息队列在没有使用时将自动被删除,默认为 false*/
@Bean(TEST_QUEUE_1)
public Queue testQueue1() {
return new Queue(TEST_QUEUE_1, true);
}
/*声明队列2*/
@Bean(TEST_QUEUE_2)
public Queue testQueue2() {
return new Queue(TEST_QUEUE_2, true);
}
/*队列1与路由进行绑定*/
@Bean
Binding bindingTest1(@Qualifier(TEST_QUEUE_1) Queue queue,
@Qualifier(TEST_EXCHANGE) Exchange exchange) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with("")
.noargs();
}
/*队列2与路由进行绑定*/
@Bean
Binding bindingTest2(@Qualifier(TEST_QUEUE_2) Queue queue,
@Qualifier(TEST_EXCHANGE) Exchange exchange) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with("")
.noargs();
}
существуетRabbitMqServiceДобавить способ доставки:отправить на переключение
/*发送到交换器*/
public String sendExchange(Object payload,String routingKey){
return baseSend(RabbitMqConfig.TEST_EXCHANGE, routingKey, payload, null, null);
}
отправлять сообщения
@Test
public void t1(){
String s = "广播快递";
rabbitMqService.sendExchange(s,"");
}
Просмотр отношения привязки коммутатора
Проверьте, успешно ли отправлено сообщение
Мы видим, что он был успешно отправлен
начать потребитель
Измените соответствующее имя очереди и перезапустите
@RabbitListener(queues = TEST_QUEUE)
Модель подписки — прямая (шаблон маршрута)
В этой модели подписки производители публикуют сообщения, а потребители выборочно получают сообщения. Привязка между очередью и коммутатором не может быть какой-либо привязкой, но должен быть указан RoutingKey (ключ маршрутизации). Отправитель сообщения также должен указать ключ маршрутизации сообщения при отправке сообщения в Exchange.
образец графика
P: производитель, например, экспресс-доставка
X: Switch, эквивалент курьерской компании.
Красная зона: очередь, такая как экспресс-зона, ожидающая, пока потребители заберут экспресс.
C1, C2: потребители, такие как экспресс-доставка
ошибка, информация, что мы говорим о RoutingKey
Измените конфигурацию RabbitMqConfig,В основном ключ маршрутизации указывается, когда коммутатор привязан к этим двум очередям.Очередь 1 принимает только SF Express, а очередь 2 принимает только JD Express.
/*交换机*/
public static final String TEST_EXCHANGE = "direct_amqp_exchange";
/*声明一个direct交换机*/
@Bean(TEST_EXCHANGE)
public Exchange testExchange() {
// durable(true)持久化,mq重启之后,交换机还在
return ExchangeBuilder.directExchange(TEST_EXCHANGE).durable(true).build();
}
/*队列1*/
public static final String TEST_QUEUE_1 = "direct_amqp_queue_1";
/*队列2*/
public static final String TEST_QUEUE_2 = "direct_amqp_queue_2";
/**声明队列
* public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
* this(name, durable, exclusive, autoDelete, (Map)null);
* }
* String name: 队列名
* boolean durable: 持久化消息队列,rabbitmq 重启的时候不需要创建新的队列,默认为 true
* boolean exclusive: 表示该消息队列是否只在当前的connection生效,默认为 false
* boolean autoDelete: 表示消息队列在没有使用时将自动被删除,默认为 false*/
@Bean(TEST_QUEUE_1)
public Queue testQueue1() {
return new Queue(TEST_QUEUE_1, true);
}
@Bean(TEST_QUEUE_2)
public Queue testQueue2() {
return new Queue(TEST_QUEUE_2, true);
}
/*队列1路由进行绑定*/
@Bean
Binding bindingTest1(@Qualifier(TEST_QUEUE_1) Queue queue,
@Qualifier(TEST_EXCHANGE) Exchange exchange) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with("SF")
.noargs();
}
/*队列2路由进行绑定*/
@Bean
Binding bindingTest2(@Qualifier(TEST_QUEUE_2) Queue queue,
@Qualifier(TEST_EXCHANGE) Exchange exchange) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with("JD")
.noargs();
}
отправлять сообщения
@Test
public void t2(){
String s = "京东快递";
String s1 = "顺丰快递";
rabbitMqService.sendExchange(s,"JD");
rabbitMqService.sendExchange(s1,"SF");
}
Просмотр отношения привязки коммутатора
Проверьте, успешно ли отправлено сообщение
начать потребитель
Измените соответствующее имя очереди и перезапустите, само собой разумеется, что потребитель 1 должен получить SF Express, а потребитель 2 должен получить JD Express, результаты следующие:
Результат ожидаемый.
Модель подписки — Тема (шаблон подстановочного знака)
образец графика
Topic
ТипExchange
а такжеDirect
В сравнении можноRoutingKey
Направлять сообщения в разные очереди. ТолькоTopic
ТипыExchange
Очередь может быть связанаRouting key
когда используешьподстановочный знак!
Routingkey
Обычно он состоит из одного или нескольких слов, а несколько слов разделяются знаком «.»
Правила подстановочных знаков:
#
: соответствует одному или нескольким словам
*
: Соответствует не более чем не менее точно 1 слову
Изменение RabbitMqConfig в основном такое же, как и прямое, изменяются только имя очереди и переключатель, а ключ маршрутизации изменяется наОчередь 1 принимает только SF Express, а Очередь 2 принимает любого курьера.
/*声明一个direct交换机*/
@Bean(TEST_EXCHANGE)
public Exchange testExchange() {
// durable(true)持久化,mq重启之后,交换机还在
return ExchangeBuilder.topicExchange(TEST_EXCHANGE).durable(true).build();
}
/*队列1路由进行绑定*/
@Bean
Binding bindingTest1(@Qualifier(TEST_QUEUE_1) Queue queue,
@Qualifier(TEST_EXCHANGE) Exchange exchange) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with("SF.kd")
.noargs();
}
/*队列2路由进行绑定*/
@Bean
Binding bindingTest2(@Qualifier(TEST_QUEUE_2) Queue queue,
@Qualifier(TEST_EXCHANGE) Exchange exchange) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with("#.kd")
.noargs();
}
отправлять сообщения
@Test
public void t2(){
String s = "EMS快递";
String s1 = "顺丰快递";
String s2 = "京东快递";
rabbitMqService.sendExchange(s,"EMS.kd");
rabbitMqService.sendExchange(s1,"SF.kd");
rabbitMqService.sendExchange(s2,"JD.kd");
}
Просмотр отношения привязки коммутатора
Проверьте, успешно ли отправлено сообщение
начать потребитель
Результат такой, как и ожидалось!
Передовой
Советы: следующие примеры кода демонстрируются в режиме маршрутизации.
надежная доставка сообщений
Для достижения надежности сообщений RabbitMQ должны быть гарантированы следующие три пункта:
- Механизм подтверждения сообщения RabbitMQ:Существует два типа подтверждения сообщения RabbitMQ: подтверждение отправки сообщения и подтверждение получения потребления. Подтверждение отправки сообщения должно подтвердить, надежно ли доставлено сообщение в процессе, когда производитель отправляет сообщение на Exchange, а Exchange распространяет сообщение в очередь. Первый шаг заключается в том, чтобы достичь Exchange, а второй шаг — подтвердить, достигает ли он Queue.
- Обмены, очереди и сообщения сохраняются:Предотвратите отправку сообщения брокеру, и брокер зависнет до того, как потребитель его обработает.
- Механизм подтверждения потребителя: Есть 3 режима:нет (ответ не будет отправлен),авто (автоматический ответ),ручной (ручной ответ). Дабы обеспечить достоверность сообщения, мы настроили ручной ответ, зачем это? В методе автоматического ответа каждый раз, когда потребитель получает сообщение, независимо от того, завершена обработка или нет, брокер устанавливает сообщение как завершенное, а затем удаляет его из Очереди. Если возникает исключение, когда потребитель использует сообщение, потребитель не может использовать сообщение, что приводит к потере сообщения. В ручном режиме ответа могут быть вызваны методы basicAck, basicNack и basicReject, а ACK отправляется только при правильной обработке сообщения.
Механизм подтверждения сообщения Rabbitmq
Изменить настройку
spring:
rabbitmq:
host: 127.0.0.1
username: admin123
password: 123456
virtual-host: /test
# 确认消息发送成功,通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调
publisher-confirms: true
# 实现ReturnCallback接口,如果消息从交换器发送到对应队列失败时触发
publisher-returns: true
listener:
# 消息消费确认,可以手动确认
simple:
acknowledge-mode: manual
Изменить RabbitMqService
Добавьте код для реализации интерфейса ConfirmCallBack и реализации интерфейса ReturnCallback.
// 消息发送到交换器Exchange后触发回调
private final RabbitTemplate.ConfirmCallback confirmCallback =
new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
//成功业务逻辑
log.info("消息投递到及交换机成功啦!!!");
} else {
//失败业务逻辑
log.info("消息投递到及交换机失败啦!!");
}
}
};
// 如果消息从交换器发送到对应队列失败时触发
private final RabbitTemplate.ReturnCallback returnCallback =
new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
//失败业务逻辑
log.info("message=" + message.toString());
log.info("replyCode=" + replyCode);
log.info("replyText=" + replyText);
log.info("exchange=" + exchange);
log.info("routingKey=" + routingKey);
}
};
Добавьте следующий код перед rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor, корреляцияДанные):
rabbitTemplate.setConfirmCallback(this.confirmCallback);
rabbitTemplate.setReturnCallback(this.returnCallback);
Проверьте, успешно ли отправлено сообщение коммутатору.
Для облегчения тестирования используйте контроллер для отправки сообщений. Если сообщение не может быть перенаправлено на соответствующий Exchange, ACK, возвращенный механизмом Confirm, вернет false, и будет выполнена обработка исключений, а также будет выполнена некоторая бизнес-логика, например, повторная попытка или компенсация.
@RestController
public class TestController {
@Autowired
private RabbitMqService sender;
@PostMapping("/tt")
public String sendMsg(String msg){
sender.sendExchange(msg,"");
return "ok";
}
}
Посмотреть консоль
Проверьте, успешно ли отправлено сообщение в очередь
Здесь при отправке сообщения укажите несуществующий ключ маршрутизации для имитации обратного вызова с ошибкой.
sender.sendExchange(msg,"XXX");
Посмотреть консоль
Обмены, очереди, сообщения сохраняются
Это упоминается в коде выше, опущено.
Механизм подтверждения потребителя
Как упоминалось ранее, существует три метода ручного ответа: basicAck, basicNack и basicReject, так что давайте посмотрим.
basicAck
Когда множественное значение равно false, подтверждается только текущее сообщение. Когда множественное значение равно true, пакетно подтверждается все сообщения, размер которых меньше текущего тега доставки. deliveryTag используется для идентификации сообщения, доставленного в канале. RabbitMQ гарантирует, что в каждом канале deliveryTag сообщения увеличивается с 1.
public void basicAck(long deliveryTag, boolean multiple) throws IOException {
this.transmit(new Ack(deliveryTag, multiple));
this.metricsCollector.basicAck(this, deliveryTag, multiple);
}
basicNack
Этот метод можно использовать, когда возникает исключение, когда потребитель потребляет сообщение. Когда requeue равно true, ошибочное сообщение повторно попадает в очередь, которая обычно используется в сочетании с механизмом повторных попыток (когда количество повторных попыток превышает максимальное значение, сообщение будет отброшено) или в очередь недоставленных сообщений + очередь повторных попыток. Когда requeue имеет значение false, сообщение отбрасывается.
public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException {
this.transmit(new Nack(deliveryTag, multiple, requeue));
this.metricsCollector.basicNack(this, deliveryTag);
}
basicReject
То же, что и при использовании basicNack.
тестовое задание
Сначала закомментируйте ручное подтверждение
@RabbitListener(queues = TEST_QUEUE)
public void t2(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
System.out.println("消费者1收到=-=" + msg);
// long deliveryTag = message.getMessageProperties().getDeliveryTag();
// channel.basicAck(deliveryTag,false);
}
Посмотреть интерфейс управления
сообщение становится неподтвержденным
Остановите потребительскую программу, и сообщение снова становится готовым, потому что, хотя мы настроили руководство ACK, в коде нет подтверждения сообщения! Таким образом, сообщение на самом деле не потребляется. Когда мы закрываем этот потребитель, состояние сообщения называется снова
Механизм повторных попыток потребителя
настроить
Добавьте следующую конфигурацию: повторная попытка потребителя — настроить параметры, связанные с повтором, в прослушивателе, повторная попытка производителя — настроить параметры, связанные с повтором, в шаблоне, не запутайтесь.
listener:
# 消息消费确认,可以手动确认
simple:
acknowledge-mode: manual
#是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息)
retry:
enabled: true
#初始重试间隔为1s
initial-interval: 1000
#重试的最大次数
max-attempts: 3
#重试间隔最多1s
max-interval: 1000
#每次重试的因子是1.0 等差
multiplier: 1.0
тестовое задание
Смоделируйте исключение для потребительского потребления, добавьте int i=1/0;
Посмотреть консоль
Как видите, потребление было повторено 3 раза.
Что делать, когда количество попыток исчерпано
Что делать, если listener.retry был опробован, но все равно выдает исключение? Сообщения об исключениях можно обрабатывать, настроив MessageRecoverer.По умолчанию есть две реализации:
- RepublishMessageRecoverer: повторно отправить сообщение в указанную очередь, которую необходимо настроить вручную. есть тест:
Добавьте следующее в RabbitMqConfig: сначала объявите обмен повторными попытками (RETRY_EXCHANGE) и очередь повторных попыток объявления (RETRY_QUEUE), а затем привяжите, ключ маршрутизации: повторная попытка
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, RETRY_EXCHANGE, "retry");
}
Добавьте потребителя следующим образом:
@RabbitListener(queues = RETRY_QUEUE)
public void t3(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
System.out.println("重试消费者收到了=-=" + msg);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag,false);
}
Количество повторных попыток исчерпано (поскольку максимальное количество повторных попыток настроено как 3), результаты теста следующие:
- RejectAndDontRequeueRecoverer: если вы не настроите MessageRecoverer вручную, это будет использоваться по умолчанию. Реализация просто выдает вывод исключения. Исходный код выглядит следующим образом (тест опущен):
public class RejectAndDontRequeueRecoverer implements MessageRecoverer {
protected Log logger = LogFactory.getLog(RejectAndDontRequeueRecoverer.class);
@Override
public void recover(Message message, Throwable cause) {
if (this.logger.isWarnEnabled()) {
this.logger.warn("Retries exhausted for message " + message, cause);
}
throw new ListenerExecutionFailedException("Retry Policy Exhausted", new AmqpRejectAndDontRequeueException(cause), message);
}
}
Повторное потребление сообщений
Механизм повторных попыток может вызывать задержки, что приводит к повторным проблемам с потреблением, таким как оплата, push-SMS, электронная почта и т. д.
Решение — Глобально уникальный идентификатор
-
Отправитель сообщения добавляет в заголовок сообщения уникальный идентификатор при отправке, например UUID, номер заказа, метку времени, traceId и т. д. Инкапсуляция сообщения была обработана выше:
-
После получения сообщения получатель сначала получает уникальный идентификатор заголовка сообщения и определяет, включен ли уже уникальный идентификатор в Redis.Если он включен, это указывает на то, что потребление было успешным, и сообщение не обрабатывается напрямую. . Если redis не содержит уникальный идентификатор, обработайте сообщение и сохраните уникальный идентификатор в кеше после успеха.
очередь недоставленных сообщений
Что такое очередь недоставленных сообщений
Мертвое письмо, как следует из названия, — это сообщение, которое не может быть использовано.Если потребитель имеет какую-либо аномалию и сообщение не используется, сообщение будет повторно доставлено на другой Exchange (обмен недоставленными письмами), и Exchange будет перенаправляется на другой в соответствии с routingKey.Queue, где сообщение повторно обрабатывается.
Распространенные источники очередей недоставленных писем
- Сообщение было отклонено (basic.reject или basic.nack) с requeue=false.
- Срок жизни сообщения истек
- Очередь достигла максимальной длины (очередь заполнена, больше нельзя добавить данные в mq)
Как бороться с мертвыми письмами
- Сообщение отклонено (basic.reject или basic.nack) с requeue=false без параметра requeue или достигнуто максимальное количество повторных запросов
- TTL (время жить) сообщения - время для жизни истек
- Превышен предел длины очереди (очередь заполнена, "x-max-length"параметр)
В этом примере используется третий.
пример кода
Инициализировать очереди недоставленных сообщений, обмены и привязки
Объявите переключатель недоставленных сообщений (DL_EXCHANGE) и очередь недоставленных сообщений (DL_QUEUE), затем привяжите и объявите бизнес-очередь (TEST_QUE_1) для присоединенияx-dead-letter-exchangeа такжеx-dead-letter-routing-keyпараметры, код выглядит следующим образом:
/*业务交换机*/
public static final String TEST_EXCHANGE = "test_amqp_exchange";
/*声明业务交换机*/
@Bean(TEST_EXCHANGE)
public Exchange testExchange() {
// durable(true)持久化,mq重启之后,交换机还在
return ExchangeBuilder.directExchange(TEST_EXCHANGE).durable(true).build();
}
/*队列1*/
public static final String TEST_QUEUE_1 = "test_amqp_queue_1";
@Bean(TEST_QUEUE_1)
public Queue testQueue1() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 声明 死信交换机
args.put("x-dead-letter-exchange", DL_EXCHANGE);
// x-dead-letter-routing-key 声明 死信路由键
args.put("x-dead-letter-routing-key", "dlk");
return QueueBuilder.durable(TEST_QUEUE_1).withArguments(args).build();
}
/*队列1路由进行绑定*/
@Bean
Binding bindingTest1(@Qualifier(TEST_QUEUE_1) Queue queue,
@Qualifier(TEST_EXCHANGE) Exchange exchange) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with("SF")
.noargs();
}
/*死信交换机*/
public static final String DL_EXCHANGE = "deadLetterExchange";
/*声明死信交换机*/
@Bean(DL_EXCHANGE)
public Exchange deadLetterExchange() {
return ExchangeBuilder.directExchange(DL_EXCHANGE).durable(true).build();
}
/*死信队列*/
public static final String DL_QUEUE = "deadLetterQueue";
/*声明死信队列*/
@Bean(DL_QUEUE)
public Queue deadLetterQueue() {
return new Queue(DL_QUEUE,true);
}
/*死信队列绑定死信交换机*/
@Bean
Binding bindingDead(@Qualifier(DL_QUEUE) Queue queue,
@Qualifier(DL_EXCHANGE) Exchange exchange) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with("dlk")
.noargs();
}
потребитель
//业务消费者
@RabbitListener(queues = TEST_QUEUE)
public void t2(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
try {
int i = 1/0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e){
System.out.println("消费者1出错啦");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}}
//死信消费者
@RabbitListener(queues = DL_QUEUE)
public void t3( Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
System.out.println("死信队列收到了=-=" + msg);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag,false);
}
Посмотреть консоль
Примерный процесс
Вероятно, процесс заключается в том, что сообщение потребляется бизнес-потребителем.В это время бизнес-потребитель вешает трубку и переходит к коду отлова basicNack.Когда mq получает nack, он повторно доставляет сообщение в мертвую букву, связанную Затем коммутатор сопоставляет ключ маршрутизации недоставленных сообщений, привязанный к сервисной очереди x-dead-letter-routing-key, с очередью недоставленных сообщений и, наконец, потребляется недоставленными сообщениями. письмо потребителя.
очередь задержки
Что такое очередь задержки
Как следует из названия, очередь с задержкой означает, что сообщения, помещенные в очередь, не требуют немедленного использования, а извлекаются и потребляются после ожидания в течение определенного периода времени.
Сценарии применения отложенной очереди
- Заказы, не оплаченные в течение десяти минут, автоматически аннулируются.
- Если пользователь сделает возврат, а продавец не обработает его в течение трех дней, продавец будет уведомлен об этом по SMS или на платформе, где он находится.
Реализовать очередь задержки
Метод 1: TTL (время жизни) + DLX (обмен недоставленными письмами)
Мертвая буква (DLX) была понята выше, так что же такое TTL? RabbitMQ может установить x-message-ttl для Queue или setExpiration для Message, чтобы контролировать время существования сообщения. письмо).).
Есть два способа установить TTL
-
Все сообщения в очереди имеют одинаковое время истечения срока действия, заданное в свойствах очереди.
Недостаток: если вы используете этот метод для установки TTL сообщения, когда градиент времени задержки относительно велик, например, 1 минута, 2 минуты, 5 минут, 12 минут... Для маршрутизации необходимо создать множество обменов и очередей. Сообщения.
@Bean(TEST_QUEUE_1)
public Queue testQueue1() {
Map<String, Object> args = new HashMap<>(2);
//声明过期时间5秒
args.put("x-message-ttl", 5000);
// x-dead-letter-exchange 声明 死信交换机
args.put("x-dead-letter-exchange", DL_EXCHANGE);
//x-dead-letter-routing-key 声明 死信路由键
args.put("x-dead-letter-routing-key", "dlk");
return QueueBuilder.durable(TEST_QUEUE_1).withArguments(args).build();
}
-
Устанавливайте сообщения индивидуально, и у каждого сообщения TTL может быть разным.
Недостаток: если TTL сообщения устанавливается отдельно, это может привести к блокировке сообщения в очереди, поскольку очередь находится в порядке поступления, предыдущее сообщение не удаляется из очереди (не потребляется), а последующие сообщения не может быть доставлено. Сообщение может не «умереть» вовремя, потому что RabbitMQ проверит только, просрочено ли первое сообщение, и если оно истечет, оно будет брошено в очередь недоставленных сообщений.Если время задержки очень мало, второе сообщение не будет выполняться в первую очередь.
Способ 2: плагин rabbitmq-delayed-message-exchange
Это может решить проблему установки TTL для сообщений отдельно и приоритета короткого времени задержки.
пример кода
Следующая демонстрация — недоставленные сообщения + TTL, код по-прежнему основан на очереди недоставленных сообщений, приведенной выше.
Срок действия очереди
Добавьте конфигурацию x-message-ttl в бизнес-очередь и установите ее на одну секунду; потребитель удаляет бизнес-потребителя (срок действия смоделированного сообщения истекает без использования), оставляя только получателя недоставленных сообщений; остальные остаются без изменений.
@Bean(TEST_QUEUE_1)
public Queue testQueue1() {
Map<String, Object> args = new HashMap<>(2);
//声明过期时间5秒
args.put("x-message-ttl", 1000);
// x-dead-letter-exchange 声明 死信交换机
args.put("x-dead-letter-exchange", DL_EXCHANGE);
//x-dead-letter-routing-key 声明 死信路由键
args.put("x-dead-letter-routing-key", "dlk");
return QueueBuilder.durable(TEST_QUEUE_1).withArguments(args).build();
}
Посмотреть консоль
Видно, что время потребляется потребителем недоставленных писем через 1 секунду.
Срок действия набора сообщений
Закомментируйте время истечения очереди, а затем измените метод отправки следующим образом;
/*发送到交换器*/
public String sendExchange(Object payload,String routingKey,Long messageExpirationTime){
return baseSend(RabbitMqConfig.TEST_EXCHANGE, routingKey, payload, null, messageExpirationTime);
}
Контроллер работает следующим образом, потребитель остается неизменным
@Autowired
private RabbitMqService sender;
@PostMapping("/tt")
public String sendMsg(String msg){
sender.sendExchange(msg,"SF",5000L);
System.out.println("【5秒过期时间测试】发送时间是:"+LocalDateTime.now());
return "ok";
}
Посмотреть консоль
Видно, что сообщение потребляется потребителем недоставленных писем через 5 секунд.