Это первый раз, когда я участвую в Gengwen Challenge.23День, подробности о событии уточняйте:Обновить вызов
Со временем капли воды и камни изнашиваются 😄
Что такое очередь недоставленных сообщений
DLX ,全称为 Dead-Letter-Exchange
, который можно назвать переключателем мертвой буквы. Когда сообщение становится мертвой буквой в очереди, его можно повторно отправить на другой обмен, которым является DLX.Очередь, связанная с DLX, называется очередью недоставленных сообщений..
Сообщения становятся мертвыми буквами, как правило, из-за следующих условий:
-
1. Срок действия сообщения истекает, что является TTL, упомянутым автором в предыдущей статье. Время существования сообщения в очереди превышает установленное время TTL.
-
2. Сообщение было отклонено и вызвано
channel.basicNack
илиchannel.basicReject
способ, ну и наборrequeue
Параметрыfalse
. -
3. Сообщение очереди достигает максимальной длины.
DLX это тоже обычный обмен, который ничем не отличается от общего обмена, его можно указать на любой очереди, по сути это установка свойств определенной очереди. Когда в этой очереди есть мертвые письмаRabbitMQ
Это сообщение будет автоматически опубликовано в установленном DLX, а затем перенаправлено в другую очередь, очередь недоставленных сообщений. Сообщения в этой очереди можно отслеживать для соответствующей обработки.
Настройка очереди недоставленных сообщений
существуетchannel.queueDeclare
набор методовx-dead-letter-exchange
параметр для добавления DLX в эту очередь. Далее описывается настройка и использование очередей недоставленных сообщений в первом и втором случаях.
срок действия сообщения истек
добавить конфигурацию
mq:
queueBinding:
queue: prod_queue_pay
dlQueue: dl-queue
exchange:
name: exchang_prod_pay
dlTopicExchange: dl-topic-exchange
key: prod_pay
dlRoutingKey: dl-routing-key
Создавайте обмен недоставленными сообщениями, очереди недоставленных сообщений и привязки между двумя
@Value("${mq.queueBinding.exchange.dlTopicExchange}")
private String dlTopicExchange;
@Value("${mq.queueBinding.dlRoutingKey}")
private String dlRoutingKey;
@Value("${mq.queueBinding.dlQueue}")
private String dlQueue;
//创建死信交换机
@Bean
public TopicExchange dlTopicExchange(){
return new TopicExchange(dlTopicExchange,true,false);
}
//创建死信队列
@Bean
public Queue dlQueue(){
return new Queue(dlQueue,true);
}
//死信队列与死信交换机进行绑定
@Bean
public Binding BindingErrorQueueAndExchange(Queue dlQueue, TopicExchange dlTopicExchange){
return BindingBuilder.bind(dlQueue).to(dlTopicExchange).with(dlRoutingKey);
}
Вы можете видеть, что в приведенном выше коде нет ничего особенного, просто создайте переключатель и очередь и свяжите их вместе.
Создавайте бизнес-очереди, бизнес-обмены и привязки между двумя
@Value("${mq.queueBinding.queue}")
private String queueName;
@Value("${mq.queueBinding.exchange.name}")
private String exchangeName;
@Value("${mq.queueBinding.key}")
private String key;
private final String dle = "x-dead-letter-exchange";
private final String dlk = "x-dead-letter-routing-key";
private final String ttl = "x-message-ttl";
/**
* 业务队列
* @return
*/
@Bean
public Queue payQueue(){
Map<String,Object> params = new HashMap<>();
//设置队列的过期时间
//队列中所有消息都有相同的过期时间
params.put(ttl,10000);
//声明当前队列绑定的死信交换机
params.put(dle,dlTopicExchange);
//声明当前队列的死信路由键 如果没有指定,则使用原队列的路由键:
params.put(dlk,dlRoutingKey);
return QueueBuilder.durable(queueName).withArguments(params).build();
}
@Bean
public TopicExchange payTopicExchange(){
return new TopicExchange(exchangeName,true,false);
}
//队列与交换机进行绑定
@Bean
public Binding BindingPayQueueAndPayTopicExchange(Queue payQueue, TopicExchange payTopicExchange){
return BindingBuilder.bind(payQueue).to(payTopicExchange).with(key);
}
Приведенный выше код ключа业务队列
создание. Указано время истечения очереди и настроена очередь недоставленных сообщений.
код производителя
/*
* 生产者
*/
@Component
@Slf4j
public class RabbitSender {
@Value("${mq.queueBinding.exchange.name}")
private String exchangeName;
@Value("${mq.queueBinding.key}")
private String key;
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String msg){
log.info("RabbitSender.send() msg = {}",msg);
// 将消息发送给业务交换机
rabbitTemplate.convertAndSend(exchangeName,key,msg);
}
}
Обеспечьте внешний подход
@Autowired
private RabbitSender rabbitSender;
@GetMapping
public void test(@RequestParam String msg){
rabbitSender.send(msg);
}
Запустив службу, вы увидите, что очередь службы, переключатель службы, очередь недоставленных сообщений и переключатель недоставленных сообщений создаются одновременно. И вы можете увидеть метки DLX и DLK в бизнес-очереди.
Затем вызовите интерфейс:http://localhost:8080/?msg=Бум, сообщение будет отправлено наprod_queue_pay
эта очередь.
Если ни один потребитель не потребляет сообщение в течение 10 секунд, срок действия сообщения определяется как просроченный. Поскольку DLX установлен, сообщения выбрасываются по истечении срока действия.dlxExchange
переключатель, в соответствии с настроеннымdlRoutingKey
найти сdlxExchange
соответствующая очередьdlQueue
После сохранения сообщения вdlxQueue
в очереди мертвых писем.
сообщение отклонено
Удалите время истечения срока действия бизнес-очереди.
/**
* 业务队列
* @return
*/
@Bean
public Queue payQueue(){
Map<String,Object> params = new HashMap<>();
//声明当前队列绑定的死信交换机
params.put(dle,dlTopicExchange);
//声明当前队列的死信路由键 如果没有指定,则使用原队列的路由键:
params.put(dlk,dlRoutingKey);
return QueueBuilder.durable(queueName).withArguments(params).build();
}
новый файл конфигурации
rabbitmq:
listener:
simple:
#消息确认方式 manual 手动确认 auto 自动确认 none 不管
acknowledge-mode: manual
Добавить потребителей
/**
* 消费者
*/
@Component
@Slf4j
public class RabbitReceiver {
//测试消费者进行消费发送异常 是否进入死信队列
@RabbitListener(queues = "${mq.queueBinding.queue}")
public void infoConsumption(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
log.info("收到信息:{}",data);
boolean ack = false;
Exception exception = null;
try {
if(data.contains("888")){
throw new RuntimeException("信息敏感");
}
} catch (RuntimeException e) {
ack = true;
exception = e;
}
if (ack){
log.error("消息消费发生异常,error msg:{}", exception.getMessage(), exception);
//注意第三个参数需要为false
//true则重新放入原队列,否则丢弃或者进入死信队列。
channel.basicNack(tag, false, false);
} else {
//进行手动确认信息已经被消费
channel.basicAck(tag, false);
}
}
}
В приведенном выше кодеchannel.basicNack
метод, если третий параметр установлен вtrue
, возникнет проблема с бесконечными повторами. В следующей статье автор расскажет, как решить проблему бесконечных повторных попыток.
Удалите предыдущую очередь, затем перезапустите и повторно создайте бизнес-очередь. можно увидеть业务队列
Метка имеет меньше TTL.
Интерфейс запроса:http://localhost:8080/?msg=888Из-за конфиденциального сообщения бизнес будет ненормальным, и сообщение будет перенаправлено в очередь недоставленных сообщений.
- Если у вас есть какие-либо вопросы по этой статье или есть ошибки в этой статье, пожалуйста, оставьте комментарий. Если вы считаете, что эта статья была вам полезна, ставьте лайк и подписывайтесь на нее.