Эта статья размещена в личном блоге:www.chengxy-nds.top, совместное использование технических ресурсов и совместный прогресс
Недавно отдел призвал всех организовать больше сеансов обмена техническими данными, сказав, что это должно активировать техническую атмосферу компании, но я все видел и знаю, что эта ТМ для чистки.KPI
. Тем не менее, это действительно хорошо: вместо этих скучных совещаний по спорам очень полезно для личностного роста проводить больше технических обменов.
Так что я взял на себя инициативу зарегистрироваться, чтобы участвовать в обмене, кхе-кхе~, на самом деле не для этого.KPI
, Я просто хочу учиться с вами, ребята!
На этот раз я делюсьspringboot
+ rabbitmq
Как реализовать механизм подтверждения сообщения и немного опыта в реальной разработке, на самом деле, общий контент относительно прост, и иногда вещи настолько волшебны, что чем проще, тем легче они могут пойти не так.
можно увидеть б/уRabbitMQ
В дальнейшем наши деловые связи стали значительно длиннее, хотя развязка между системами была достигнута, количество сценариев, которые могут привести к потере сообщений, также увеличилось. Например:
-
производитель сообщений -> сервер rabbitmq (отправка сообщения не удалась)
-
Сбой самого сервера rabbitmq приводит к потере сообщений
-
потребитель сообщений -> служба rabbitmq (не удалось использовать сообщение)
Поэтому по возможности старайтесь не использовать middleware, если вы используете его ради использования, это только добавит проблем. После включения механизма подтверждения сообщения, несмотря на то, что точная доставка сообщения в значительной степени гарантирована из-за частых взаимодействий с подтверждением,rabbitmq
Общая эффективность становится ниже, а пропускная способность серьезно падает.Настоятельно не рекомендуется использовать механизм подтверждения сообщения для сообщений, которые не очень важны.
Далее реализуемspringboot
+ rabbitmq
Механизм подтверждения сообщения, а затем сделать конкретный анализ возникших проблем.
1. Подготовьте среду
1. Представьте пакет зависимостей rabbitmq
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. Измените конфигурацию application.properties.
Его нужно включить в настройках发送端
и消费端
подтверждение сообщения.
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 发送者开启 confirm 确认机制
spring.rabbitmq.publisher-confirms=true
# 发送者开启 return 确认机制
spring.rabbitmq.publisher-returns=true
####################################################
# 设置消费端手动 ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 是否支持重试
spring.rabbitmq.listener.simple.retry.enabled=true
3. Определите обмен и очередь
определить переключательconfirmTestExchange
и очередьconfirm_test_queue
и привязать очередь к обмену.
@Configuration
public class QueueConfig {
@Bean(name = "confirmTestQueue")
public Queue confirmTestQueue() {
return new Queue("confirm_test_queue", true, false, false);
}
@Bean(name = "confirmTestExchange")
public FanoutExchange confirmTestExchange() {
return new FanoutExchange("confirmTestExchange");
}
@Bean
public Binding confirmTestFanoutExchangeAndQueue(
@Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,
@Qualifier("confirmTestQueue") Queue confirmTestQueue) {
return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
}
}
rabbitmq
Подтверждение сообщения делится на две части: подтверждение отправки сообщения и подтверждение получения сообщения.
2. Подтверждение отправки сообщения
Подтверждение отправки сообщения: используется для подтверждения производителяproducer
отправить сообщениеbroker
,broker
включитьexchange
репост в очередьqueue
Во время процесса, было ли сообщение успешно доставлено.
Сообщение отproducer
прибытьrabbitmq broker
существует одинconfirmCallback
Режим подтверждения.
Сообщение отexchange
прибытьqueue
Произошел сбой доставкиreturnCallback
Задний режим.
Мы можем использовать эти дваCallback
Обеспечить 100% доставку товара.
1. ConfirmCallback режим подтверждения
сообщение, покаrabbitmq broker
Срабатывает при полученииconfirmCallback
Перезвони .
@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.error("消息发送异常!");
} else {
log.info("发送者爸爸已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
}
}
}
реализовать интерфейсConfirmCallback
, переписывая егоconfirm()
метод с тремя параметрамиcorrelationData
,ack
,cause
.
-
correlationData
: внутри объекта есть только одинid
Атрибут, используемый для указания уникальности текущего сообщения. -
ack
: сообщение доставленоbroker
статус,true
Указывает на успех. -
cause
: Указывает причину сбоя доставки.
но сообщение былоbroker
Получено может указывать только на то, что оно достигло сервера MQ, и не гарантирует, что сообщение будет доставлено адресату.queue
внутри. Итак, далее вам нужно использоватьreturnCallback
.
2. Режим возврата обратного вызова ReturnCallback
Если сообщение не может быть доставлено адресатуqueue
вызовет обратный вызовreturnCallback
, раз кqueue
Если доставка сообщения не удалась, здесь обычно записываются подробные данные о доставке текущего сообщения, что удобно для последующих операций, таких как повторная передача или компенсация.
@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
}
}
реализовать интерфейсReturnCallback
, переписатьreturnedMessage()
метод, метод имеет пять параметровmessage
(Сообщение),replyCode
(код ответа),replyText
(содержание ответа),exchange
(выключатель),routingKey
(очередь).
Ниже приводится конкретное сообщение, отправленное вrabbitTemplate
установить вConfirm
иReturn
обратный звонок, мы передаемsetDeliveryMode()
Сохраните сообщение и создайте его для последующего тестирования.CorrelationData
объект, добавьтеid
за10000000000
.
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ConfirmCallbackService confirmCallbackService;
@Autowired
private ReturnCallbackService returnCallbackService;
public void sendMessage(String exchange, String routingKey, Object msg) {
/**
* 确保消息发送失败后可以重新返回到队列中
* 注意:yml需要配置 publisher-returns: true
*/
rabbitTemplate.setMandatory(true);
/**
* 消费者确认收到消息后,手动ack回执回调处理
*/
rabbitTemplate.setConfirmCallback(confirmCallbackService);
/**
* 消息投递到队列失败回调处理
*/
rabbitTemplate.setReturnCallback(returnCallbackService);
/**
* 发送消息
*/
rabbitTemplate.convertAndSend(exchange, routingKey, msg,
message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
},
new CorrelationData(UUID.randomUUID().toString()));
}
3. Подтверждение получения сообщения
Подтверждение получения сообщения немного проще, чем подтверждение отправки сообщения, потому что есть только одно подтверждение сообщения (ack
) процесс. использовать@RabbitHandler
Метод аннотации аннотации должен быть увеличенchannel
(канал),message
два параметра.
@Slf4j
@Component
@RabbitListener(queues = "confirm_test_queue")
public class ReceiverMessage1 {
@RabbitHandler
public void processHandler(String msg, Channel channel, Message message) throws IOException {
try {
log.info("小富收到消息:{}", msg);
//TODO 具体业务
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
log.error("消息已重复处理失败,拒绝再次接收...");
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
} else {
log.error("消息即将再次返回队列处理...");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
}
Существует три метода получения сообщений, давайте проанализируем значение каждого метода.
1. Базовый доступ
basicAck
: Указывает на успешное подтверждение.После использования этого метода получения сообщение будетrabbitmq broker
Удалить.
void basicAck(long deliveryTag, boolean multiple)
deliveryTag
: указывает порядковый номер доставки сообщения.После того, как каждое сообщение использовано или сообщение повторно доставлено,deliveryTag
повысится. В ручном режиме подтверждения сообщения мы можем указатьdeliveryTag
новости оack
,nack
,reject
и так далее.
multiple
: Подтверждать ли партиями, значение равноtrue
будет одноразовымack
Все сообщения меньше текущегоdeliveryTag
Новости.
Возьмите каштан:Предположим, я сначала отправляю три сообщенияdeliveryTag
Их 5, 6, 7, но ни один из них не подтвердился, когда я отправляю четвертое сообщениеdeliveryTag
8,multiple
Если установлено значение true, все сообщения 5, 6, 7 и 8 будут подтверждены.
2. Базовый Нак
basicNack
: Указывает на подтверждение ошибки.Этот метод обычно используется, когда бизнес сообщений потребления ненормальный, и сообщение может быть повторно доставлено в очередь.
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
deliveryTag
: Указывает порядковый номер доставки сообщения.
multiple
: Подтверждать ли партиями.
requeue
: значениеtrue
Сообщение будет повторно поставлено в очередь.
3. Базовый отказ
basicReject
: отклонить сообщение, сbasicNack
Разница в том, что нельзя выполнять пакетные операции, другие способы использования очень похожи.
void basicReject(long deliveryTag, boolean requeue)
deliveryTag
: Указывает порядковый номер доставки сообщения.
requeue
: значениеtrue
Сообщение будет повторно поставлено в очередь.
4. Тест
Отправьте сообщение, чтобы проверить, действует ли механизм подтверждения сообщения.Из результата выполнения видно, что отправитель успешно перезванивает после отправки сообщения, а потребитель успешно использует сообщение.Используйте инструмент захвата пакетовWireshark
наблюдатьrabbitmq
Много изменений во взаимодействии протокола amqpack
процесс.
5. Наступить на бревно ямы
1. Нет подтверждения сообщения
Это очень нетехническая яма, но здесь очень легко ошибиться.
Включите механизм подтверждения сообщений, не забывайте потреблять сообщенияchannel.basicAck
, в противном случае сообщение будет существовать всегда, что приведет к повторному потреблению.
2. Неограниченная доставка сообщений
Когда я впервые столкнулся с механизмом подтверждения сообщения, код потребителя был написан следующим образом, идея очень проста: подтвердить сообщение после обработки бизнес-логики,int a = 1 / 0
Повторно поставить сообщение в очередь после возникновения исключения.
@RabbitHandler
public void processHandler(String msg, Channel channel, Message message) throws IOException {
try {
log.info("消费者 2 号收到:{}", msg);
int a = 1 / 0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
Но есть проблема, как только появляется бизнес-кодbug
В 99,9% случаев оно не будет автоматически восстановлено, сообщение будет доставлено в очередь на неопределенное время, а потребитель будет бесконечно его выполнять, в результате чего получится бесконечный цикл.
местныйCPU
Он был заполнен в одно мгновение, вы можете себе представить, как я паниковал, когда в то время сервис рухнул в производственной среде.
иrabbitmq management
Есть только одно неподтвержденное сообщение.
После тестирования и анализа обнаружено, что при повторной доставке сообщения в очередь сообщений сообщение не возвращается в хвост очереди, а все еще находится в начале очереди.
Потребитель немедленно обработает сообщение, бизнес-процесс выдаст исключение, сообщение будет повторно поставлено в очередь и так далее. Блокирует обработку очереди сообщений, в результате чего обычные сообщения не выполняются.
Наше решение в то время состояло в том, чтобы сначала ответить на сообщение. В это время очередь сообщений удалит сообщение. В то же время мы снова отправляем сообщение в очередь сообщений. Сообщение об исключении помещается в конец очередь сообщений, которая гарантирует, что сообщение не потеряется, но и для обеспечения нормального ведения бизнеса.
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 重新发送消息到队尾
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
JSON.toJSONBytes(msg));
Однако этот метод не решает фундаментальной проблемы, и сообщение об ошибке все равно будет время от времени сообщаться. Позже количество повторных попыток сообщения оптимизируется и устанавливается. Когда лимит повторных попыток достигнут, вручную подтвердите сообщение, удалите сообщение из очереди и сохранить сообщение вMySQL
И нажмите будильник, выполните ручную обработку и временные задачи для компенсации.
3. Повторное потребление
Как обеспечить идемпотентность потребления MQ, это необходимо определить в соответствии с конкретным бизнесом, вы можете использоватьMySQL
,илиredis
Сохраните сообщение и проверьте атрибут уникальности в сообщении.
demo
изGitHub
Адрес https://github.com/chengxy-nds/Springboot-Notebook/tree/master/springboot-rabbitmq-confirm
Оригинальность не так проста, содержание вывода волос горит, если есть потеря, пожалуйста, нажмите и поддержите его!
Разобраны и розданы друзьям сотни различных технических электронных книг. Подпишитесь на официальный аккаунт, чтобы ответить【
666
] Самовывоз. Мы создали группу технического обмена с некоторыми друзьями, чтобы обсуждать технологии и делиться технической информацией, стремясь учиться и развиваться вместе.Если вам интересно, отсканируйте код, чтобы присоединиться к нам!
В этой статье используетсяmdniceнабор текста