Жизнь в конце концов станет одиночным путешествием, до одиночества — смятение, после одиночества — рост.
клин
Эта статья представляет собой очередь сообщенийRabbitMQ
четвертая пуля.
RabbitMQ
Я написал уже три статьи.Я уже написал основные понятия отправки и получения сообщений и основные понятия.Это случай с обучением чему-либо.Сначала основы можно использовать,а потом если возникнут проблемы,можно их решить , Если вы не можете понять это, углубитесь в исходный код, понимание этой технологии также улучшится со временем.
Я считаю, что после того, как основные операции будут освоены, у каждого обязательно найдется мужество подняться на более высокое место.Сегодня я перечислю некоторые из них.RabbitMQ
Более продвинутое использование, некоторые из них полезны, а некоторые нет, но вы должны понимать, потому что в большинстве случаев мы учимся для интервью~
- Как обеспечить достоверность сообщения?
- Как ограничить поток очереди сообщений?
- Как настроить отложенную очередь для отложенного потребления?
Желаю вам хорошего урожая, как сначала, так и потом смотреть, и иметь бесконечное счастье.
Код для этой статьи: Адрес облака кода Адрес GitHub
1. 📖Как обеспечить достоверность сообщения?
Давайте посмотрим на наш десятитысячелетний график.Из графика мы, вероятно, видим, что сообщение будет проходить через четыре узла.Только обеспечив надежность этих четырех узлов, можно гарантировать надежность всей системы.
- После выдачи прибыли производители MQ.
- MQ получает сообщение и гарантирует, что оно будет распространено на Exchange, соответствующем сообщению.
- После того, как Exchange поместит сообщение в очередь, он обеспечивает сохранение сообщения.
- После того, как потребитель получит сообщение, гарантируется правильное потребление сообщения.
После этих четырех гарантий мы можем гарантировать надежность сообщения, чтобы гарантировать, что сообщение не будет потеряно.
2. 🔍Продюсеру не удалось отправить сообщение в MQ
После того, как наш продюсер отправит сообщение, это может привести к тому, что наши новости будут отправлены в MQ из-за различных причин, таких как сетевое потрясение, но на этот раз мы не знали, что наши новости не отправили их, это приведет к потере новостей.
Для решения этой проблемы,RabbitMQ
представилмеханизм транзакциииМеханизм подтверждения отправителя (подтверждение издателя), поскольку механизм транзакций слишком требователен к производительности, он обычно не используется.Здесь я сосредоточусь наМеханизм подтверждения отправителя.
Этот механизм хорошо изучен, т.После того, как сообщение будет отправлено на сторону MQ, MQ вернет нам подтверждающее сообщение..
Включение этой функции требует настройки.Далее позвольте мне продемонстрировать конфигурацию:
spring:
rabbitmq:
addresses: 127.0.0.1
host: 5672
username: guest
password: guest
virtual-host: /
# 打开消息确认机制
publisher-confirm-type: correlated
Нам нужно только открыть подтверждение сообщения в конфигурации.
Режиссер:
public void sendAndConfirm() {
User user = new User();
log.info("Message content : " + user);
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(Producer.QUEUE_NAME,user,correlationData);
log.info("消息发送完毕。");
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("CorrelationData content : " + correlationData);
log.info("Ack status : " + ack);
log.info("Cause content : " + cause);
if(ack){
log.info("消息成功发送,订单入库,更改订单状态");
}else{
log.info("消息发送失败:"+correlationData+", 出现异常:"+cause);
}
}
});
}
В коде производителя мы видим еще один параметр:CorrelationData
, этот параметр используется для однозначной идентификации сообщения, в то же время, после того, как мы открываем подтверждение сообщения, нам нужноrabbitTemplate
установить еще одинsetConfirmCallback
, параметр является анонимным классом, и обработка после того, как наше сообщение подтверждает успех или неудачу, записывается в этот анонимный класс.
Например, для сообщения заказа, когда сообщение будет подтверждено MQ, оно будет помещено на склад или статус узла заказа будет изменен.Если сообщение не достигает MQ успешно, запись может быть сделана или статус заказа может быть изменен.
Tip: ошибка подтверждения сообщения срабатывает не только в том случае, если сообщение не отправлено, но также срабатывает, если сообщение отправлено, но соответствующий Exchange не может быть найден.
3. 📔Сбой получения MQ или сбой маршрутизации
После обработки сообщения, отправленного производителем, мы можем посмотреть на обработку на стороне MQ.С MQ могут быть две проблемы:
- Сообщение не может найти соответствующий Exchange.
- Обмен был найден, но не удалось найти соответствующую Очередь.
Оба случая можно использоватьRabbitMQ
который предоставилmandatory
параметр для решения, он установит политику отказа доставки сообщения, есть две стратегии: автоматическое удаление или возврат клиенту.
Так как мы хотим сделать надежность, естественно, он настроен на возврат к клиенту (trueвозвращается клиенту,falseавтоматически удаляется).
Конфигурация:
spring:
rabbitmq:
addresses: 127.0.0.1
host: 5672
username: guest
password: guest
virtual-host: /
# 打开消息确认机制
publisher-confirm-type: correlated
# 打开消息返回
publisher-returns: true
template:
mandatory: true
Нам нужно только открыть возврат сообщения в конфигурации,template.mandatory: true
Не пропустите этот шаг~
Режиссер:
public void sendAndReturn() {
User user = new User();
log.info("Message content : " + user);
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("被退回的消息为:{}", message);
log.info("replyCode:{}", replyCode);
log.info("replyText:{}", replyText);
log.info("exchange:{}", exchange);
log.info("routingKey:{}", routingKey);
});
rabbitTemplate.convertAndSend("fail",user);
log.info("消息发送完毕。");
}
Здесь мы можем получить всю информацию о возвращенном сообщении, а затем обработать его, например, поместить в новую очередь для отдельной обработки.Ошибка маршрутизации обычно является проблемой конфигурации.
4. 📑MQ аварийно завершает работу после постановки сообщения в очередь
На этом этапе в основном возникают проблемы с малой вероятностью. Например, MQ внезапно выходит из строя или закрывается. Для такого рода проблем сообщение должно сохраняться, чтобы его можно было восстановить после перезапуска MQ.
Необходимо обеспечить сохранение сообщений, но не только сохранение сообщений, но также сохранение очередей и сохранение Exchange.
@Bean
public DirectExchange directExchange() {
// 三个构造参数:name durable autoDelete
return new DirectExchange("directExchange", false, false);
}
@Bean
public Queue erduo() {
// 其三个参数:durable exclusive autoDelete
// 一般只设置一下持久化即可
return new Queue("erduo",true);
}
Если при создании Exchange и очереди установлено сохранение, отправляемые сообщения по умолчанию являются постоянными сообщениями.
При настройке постоянства убедитесь, что и Exchange, и очередь являются постоянными:
Установите только сохраняемость Exchange, очередь будет потеряна после перезапуска. Просто поставьте постоянство очереди.После перезагрузки Exchange пропадет, и сообщение тоже потеряется, так что будет бессмысленно, если вы не поставите постоянство на две части.
Подсказка:Это все проблемы, вызванные простоем MQ.Если сервер не работает или диск поврежден, все вышеперечисленные методы будут недействительны.Нужно ввести зеркальные очереди, чтобы выполнять больше работы в разных местах, чтобы противостоять таким форс-мажорам.
5. 📌Потребители не могут нормально потреблять
Проблема на последнем этапе находится на стороне потребителя, но решение этой проблемы было упомянуто в нашей предыдущей статье, что является подтверждением сообщения потребителя.
spring:
rabbitmq:
addresses: 127.0.0.1
host: 5672
username: guest
password: guest
virtual-host: /
# 手动确认消息
listener:
simple:
acknowledge-mode: manual
После того, как ручное подтверждение сообщения включено, пока наше сообщение не будет успешно использовано, независимо от того, есть ли время простоя потребителя или исключение кода в середине, пока сообщение не было использовано после отключения соединения, сообщение будет снова помещено в очередь.
Конечно, может быть и повторное потребление, но идемпотентность должна быть реализована в распределенных системах, поэтому, как правило, повторное потребление будет блокироваться идемпотентностью интерфейса.
Так называемая идемпотентность:Многократное выполнение операции приводит к тому же результату, что и однократное выполнение.
Содержание идемпотентности выходит за рамки этой главы, поэтому я не буду вдаваться в подробности.
6. 💡Надежность сообщений
Эта картинка была нарисована давным-давно, чтобы зафиксировать использованиеRabbitMQ
Конкретный метод создания достоверных новостей, здесь я просто привожу пример для всеобщего обозрения.
Сообщение в этом примере сначала помещается в базу данных, а затем производитель получает данные из БД, упаковывает их в сообщение и отправляет в MQ.После того, как потребитель потребляет его, состояние данных БД изменяется, и затем он снова сохраняется.
Если какие-либо шаги завершаются сбоем в середине, статус данных не обновляется.В это время база данных постоянно обновляется с помощью запланированного задания, а проблемные данные обнаруживаются и повторно отправляются производителю для повторной доставки.
По сути, это решение похоже на многие решения в Интернете.После того, как базовая надежность гарантирована, запланированные задачи выполняют сканирование снизу вверх для обеспечения 100% надежности.
постскриптум
Чем больше я пишу, тем дольше я пишу, текущая очередь лимита и задержки будет помещена в следующую статью из-за места, и я отправлю ее как можно скорее, чтобы все прочитали.Честно говоря, я действительно не не хочу писать более одной статьи! ! !
Наконец, я буду рекламировать на Youhu.Недавно Nuggets установили программу с открытым исходным кодом на GitHub -open-source, целью которого является включение всевозможных забавных и полезных библиотек с открытым исходным кодом. Если у вас есть библиотеки с открытым исходным кодом, которые вы хотите порекомендовать или поделиться, вы можете принять участие и внести свой вклад в этот план с открытым исходным кодом.Start
Он также неуклонно растет, и участие в нем также может увеличить экспозицию ваших собственных проектов, убивая двух зайцев одним выстрелом.
В то же время у этой библиотеки с открытым исходным кодом есть родственный проект —open-source-translation, целью которого является найм волонтеров по переводу технических статей для перевода технических статей, Стремитесь быть лучшим переводчиком с открытым исходным кодом, переводите высококачественные документы в отрасли и вносите свой вклад в рост технических специалистов.
В эти дни произошло много всего. Youhu заставил меня перейти на уровень 3 до конца августа, поэтому лайки читателей очень важны для меня. Надеюсь, вы поднимете руку и поможете мне~
Ну вот и все содержание этого выпуска.Спасибо, что вы здесь.Приглашаем лайкать,собирать и комментировать эту статью.👍Каждый ваш лайк-самая большая мотивация для моего творчества.
Я Эр, псевдолитературный программист, который всегда хотел выводить знания, до встречи в следующем выпуске.
Код для этой статьи:Адрес облака кода Адрес GitHub