Самое наглядное практическое упражнение по надежной передаче сообщений RabbitMQ в истории архитектуры кэширования
1. Исходная информация: важность надежной доставки сообщений
Например: рекламодатель (такой как Tmall) хочет размещать рекламу на нашей платформе (такой как Toutiao), когда новая реклама создается через нашу рекламную систему, сообщение теряется при синхронизации с кешем Redis (es). , а мы не выяснили что реклама не может нормально отображаться,тогда будет убыток.Если нет записи рекламы за один день,может быть убыток на миллионы,поэтому новость достоверна.Это Также важно передавать больше нашей рекламной системы. На самом деле такая сцена в жизни очень сцена, другой пример: система транзакций и система заказов должны обеспечивать надежную передачу сообщений, иначе потери огромные! ! !
Во-вторых, как обеспечить надежную доставку сообщений?
1. Настройка переключателей, очередей и сообщений для сохранения
**Постоянство: **Убедитесь, что соответствующая информация не будет потеряна при перезапуске сервера, сосредоточив внимание на решении проблемы потери сообщений, вызванной нештатными сбоями сервера. Однако установка всех сообщений как постоянных серьезно повлияет на производительность RabbitMQ, а скорость записи на диск не только немного медленнее, чем скорость записи в память. Для сообщений, которые не столь надежны, обработка сохраняемости может не использоваться для повышения общей пропускной способности.При выборе сохранения сообщений необходимо найти компромисс между надежностью и пропускной способностью. В определенном сценарии приложения, таком как: система торговли ордерами с большим трафиком, чтобы не влиять на производительность, мы не можем устанавливать постоянство, но мы будем регулярно сканировать базу данных на предмет неудачных сообщений и повторять отправку. на самом деле, у нас есть много решений, не останавливайтесь на достигнутом, подумайте об этом с другой стороны, и только у вас будет больше опыта, чтобы применить его легче.
1) Постоянство переключателей
@Bean
DirectExchange advanceExchange() {
return new DirectExchange(exchangeName);
}
Примечание. Просмотрите исходный код, который легко понять, по умолчанию используется постоянный
2) Сохранение очередей
@Bean
public Queue advanceQueue() {
return new Queue(queueName);
}
Примечание. Просмотрите исходный код, который легко понять, по умолчанию используется постоянный
3) Сохранение сообщений
Когда мы вызываем метод convertAndSend(String exchange, String routingKey, final Object object), используя RabbitTemplate. По умолчанию используется режим сохранения
Уведомление:
- Постоянные сообщения записываются на диск, когда они поступают в очередь, и, если возможно, постоянные сообщения также сохраняют резервную копию в памяти, что может в определенной степени повысить производительность, и будут удаляться из памяти только при нехватке памяти. .
- Непостоянные сообщения, как правило, хранятся только в памяти и будут выгружены на диск, когда памяти будет мало, чтобы сэкономить место в памяти.
2. Механизм подтверждения сообщения производителя
После того, как сообщение отправлено, как мы узнаем, правильно ли оно прибыло на биржу? Если сообщение потеряно во время этого процесса, мы не знаем, что произошло или что послужило причиной сбоя отправки сообщения.Для решения этой проблемы есть два основных решения:
- Реализуется механизмом транзакций
- Реализовано через механизм подтверждения сообщения производителя (подтверждение издателя)
но использоватьмеханизм транзакцииРеализация серьезно снизит пропускную способность сообщений RabbitMQ, мы принимаем облегченное решение —Механизм подтверждения сообщения производителя
Что такое механизм подтверждения сообщения? Вкратце: как только сообщение, отправленное производителем, доставлено во все соответствующие очереди, сообщение подтверждения отправляется производителю, которое позволяет производителю узнать, что сообщение прибыло в пункт назначения правильно. Если сообщения и очереди сохраняются, подтверждения отправляются после записи сообщения на диск. добавить еще одинMandatoryПараметры: Когда обязательный параметр установлен в значение true, если цель недостижима, производителю будет отправлено сообщение, производителю для получения информации через функцию обратного вызова.
3, механизм подтверждения сообщения потребителя
Чтобы гарантировать, что сообщения из очереди надежно достигают потребителей, RabbitMQ предоставляет механизм подтверждения сообщения потребителя (подтверждение сообщения). После принятия механизма подтверждения сообщения потребитель имеет достаточно времени для обработки сообщения, и не нужно беспокоиться о потере сообщения после того, как процесс-потребитель зависнет в процессе обработки сообщения, потому что RabbitMQ будет ждать и удерживайте сообщение до тех пор, пока потребитель не подтвердит сообщение.
4. Очередь недоставленных писем
DLX, аббревиатура от Dead Letter Exchange, также почтовый ящик для недоставленных писем, обмен недоставленными письмами. DLX — это обычный коммутатор, который ничем не отличается от обычных коммутаторов. Когда сообщение становится мертвым письмом в очереди, мертвое письмо отправляется в очередь недоставленных сообщений через этот переключатель (укажите соответствующие параметры, rabbitmq отправит его автоматически).
Что такое мертвая буква? Какие новости становятся мертвой буквой?
- Сообщение было отклонено (basic.reject или basic.nack) с requeue=false.
- Срок жизни сообщения истек
- Очередь достигла максимальной длины (очередь заполнена, дополнительные данные не добавляются в MQ)
Анализ сценария применения:При определении бизнес-очереди вы можете рассмотреть возможность указания обмена недоставленными сообщениями и привязки очереди недоставленных сообщений.Когда сообщение становится недоставленным письмом, сообщение будет отправлено в очередь недоставленных сообщений, что нам удобно для проверки отказа сообщения , Причина ** Как использовать переключатель мертвой буквы?
Укажите параметры при определении деловой (обычной) очереди:
- x-dead-letter-exchange: используется для настройки обмена для отправки после недоставленного письма
- x-dead-letter-routing-key: routingKey, используемый для установки недоставленной буквы
@Bean
public Queue helloQueue() {
//将普通队列绑定到私信交换机上
Map<String, Object> args = new HashMap<>(2);
args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName);
args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey);
Queue queue = new Queue(queueName, true, false, false, args);
return queue;
}
3. Практические занятия
Скачать код проекта:git ee.com/срочные клиенты/J IK э… Имя элемента: spring-boot-rabbitmq-надежность
1. Включите механизм подтверждения сообщения производителя.
# 开启发送确认
spring.rabbitmq.publisher-confirms=true
# 开启发送失败退回
spring.rabbitmq.publisher-returns=true
2. Включите механизм подтверждения потребительских сообщений.
# 开启ACK
spring.rabbitmq.listener.simple.acknowledge-mode=manual
3. Базовая конфигурация
@Configuration
public class RabbitConfig {
public final static String queueName = "hello_queue";
/**
* 死信队列:
*/
public final static String deadQueueName = "dead_queue";
public final static String deadRoutingKey = "dead_routing_key";
public final static String deadExchangeName = "dead_exchange";
/**
* 死信队列 交换机标识符
*/
public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
/**
* 死信队列交换机绑定键标识符
*/
public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
@Bean
public Queue helloQueue() {
//将普通队列绑定到私信交换机上
Map<String, Object> args = new HashMap<>(2);
args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName);
args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey);
Queue queue = new Queue(queueName, true, false, false, args);
return queue;
}
/**
* 死信队列:
*/
@Bean
public Queue deadQueue() {
Queue queue = new Queue(deadQueueName, true);
return queue;
}
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(deadExchangeName);
}
@Bean
public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) {
return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey);
}
}
Примечания: Hell_Queue, чтобы настроить переключатели мертвой буквы, очереди мертвой буквы
4. Основной код производителя
@Component
public class HelloSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String exchange, String routingKey) {
String context = "你好现在是 " + new Date();
System.out.println("send content = " + context);
this.rabbitTemplate.setMandatory(true);
this.rabbitTemplate.setConfirmCallback(this);
this.rabbitTemplate.setReturnCallback(this);
this.rabbitTemplate.convertAndSend(exchange, routingKey, context);
}
/**
* 确认后回调:
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
System.out.println("send ack fail, cause = " + cause);
} else {
System.out.println("send ack success");
}
}
/**
* 失败后return回调:
*
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);
}
}
5. Базовый код потребителя
@Component
@RabbitListener(queues = RabbitConfig.queueName)
public class HelloReceiver {
@RabbitHandler
public void process(String hello, Channel channel, Message message) throws IOException {
try {
Thread.sleep(2000);
System.out.println("睡眠2s");
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("receiver success = " + hello);
} catch (Exception e) {
e.printStackTrace();
//丢弃这条消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
System.out.println("receiver fail");
}
}
}
6. Функция подтверждения сообщения производителя теста: разделена на 4 сценария для тестирования.
//1、exchange, queue 都正确, confirm被回调, ack=true
@RequestMapping("/send1")
@ResponseBody
public String send1() {
helloSender.send(null, RabbitConfig.queueName);
return "success";
}
//2、exchange 错误, queue 正确, confirm被回调, ack=false
@RequestMapping("/send2")
@ResponseBody
public String send2() {
helloSender.send("fail-exchange", RabbitConfig.queueName);
return "success";
}
//3、exchange 正确, queue 错误, confirm被回调, ack=true; return被回调 replyText:NO_ROUTE
@RequestMapping("/send3")
@ResponseBody
public String send3() {
helloSender.send(null, "fail-queue");
return "success";
}
//4、exchange 错误, queue 错误, confirm被回调, ack=false
@RequestMapping("/send4")
@ResponseBody
public String send4() {
helloSender.send("fail-exchange", "fail-queue");
return "success";
}
7. Протестируйте функцию подтверждения сообщения потребителя.
- 1) При добавлении этой строки кода:
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);Результат теста: сообщение потребляется нормально и удаляется из очереди. - 2) При выходе из этой строки кода:
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);Результат теста: сообщение будет повторно обработано и сохранено в очереди.
8. Протестируйте очередь недоставленных сообщений
При выполнении этой строки кода:channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);Сообщение будет добавлено в очередь недоставленных сообщений:
4. Расширение
В дополнение к базовым гарантиям надежности, о которых мы упоминали выше, на самом деле существует множество схем оптимизации производительности и схем гарантии надежности: мониторинг кластера, управление потоком, зеркальные очереди, высоконадежная балансировка нагрузки HAProxy+Keeplived, и мы продолжим делиться вышеуказанным контентом. в будущем добро пожаловать в продолжение Следуйте... В следующем уроке мы применим эту возможность к архитектуре кэша
Для получения дополнительной информации, пожалуйста, обратите внимание на номер заголовка: Geek Hui или посетитеwww.jikeh.cn