Это первый раз, когда я участвую в Gengwen Challenge.27День, подробности о событии уточняйте:Обновить вызов
Со временем капли воды и камни изнашиваются 😄
предисловие
在上一篇中,笔者介绍了怎么让 RabbitMQ 如何保证数据不丢失, 但除此之外,我们还会遇到一个问题,当生产者将消息发送出去之后,消息到底有没有正确地到达RabbitMQ 服务器呢?如果不进行特殊配置,默认情况下发送消息的操作是不会返回任何信息给生产者的,也就是默认情况下生产者是不知道消息有没有正确地到达 RabbitMQ 服务器的。如果在消息到达服务器之前己经丢失,持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?
RabbitMQ
Есть два решения этой проблемы:
-
Реализуется механизмом транзакций
-
Реализуется через механизм подтверждения отправителя
механизм транзакции
注:事务机制是确认生产者是否成功发送消息到交换机
RabbitMQ
Существует три метода, связанных с механизмом транзакций в клиенте:channel.txSelect,channel.txCommit,channel.txRollback
.
channel.txSelect
Используется для запуска транзакции;
channel.txCommit
для совершения сделок;
channel.txRollback
Используется для отката транзакции.
проходя черезchannel.txSelect
После того, как метод запустит транзакцию, мы можем отправить сообщение наRabbitMQ
Если транзакция успешно завершена, сообщение должно быть полученоRabbitMQ
, если перед фиксацией транзакции выполняется из-заRabbitMQ
Если исключение вылетает или выбрасывает исключение по другим причинам, в это время мы можем его перехватить, а затем выполнитьchannel.txRollback
способ реализации отката транзакции.
совершить транзакцию
public static void main(String[] args) throws IOException {
Connection conn = RabbitMQUtil.createConn();
Channel channel = conn.createChannel();
String exchange = "exchange-1";
String key = "key-1";
// 创建交换机
channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC,true);
//开启事务
channel.txSelect();
try{
// 发送消息到交换机
channel.basicPublish(exchange,key,null,"发送路由key为 = key-1 的消息".getBytes());
//提交事务
channel.txCommit();
System.out.println("发送成功");
}catch (Exception e){
System.out.println("发送失败,进行日志记录");
//回滚事务
channel.txRollback();
}
}
бегатьmain
метод, вывод发送成功
. Это потому, что коммутатор уже существует.По приведенному выше рисунку видно, что есть еще четыре шага, когда механизм транзакций включен и механизм транзакций не включен (прямая отправка):
- 1. Клиент отправляет Tx.Select, чтобы перевести канал в режим транзакции.
- 2. Брокер отвечает на Tx.Select-Ok, чтобы подтвердить, что канал был установлен в режим транзакций.
- 3. После отправки сообщения клиент отправляет Tx.Commit для подтверждения транзакции.
- 4. Брокер отвечает на Tx.Commit.Ok, чтобы подтвердить фиксацию транзакции.
откат транзакции
Давайте посмотрим на откат транзакции и код. будетexchange
Значение изменяется наexchange-122
, и будет создан комментарий к коду для переключателя.
public static void main(String[] args) throws IOException {
Connection conn = RabbitMQUtil.createConn();
Channel channel = conn.createChannel();
String exchange = "exchange-122";
String key = "key-1";
// 创建交换机
//channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC,true);
//开启事务
channel.txSelect();
try{
// 发送消息到交换机
channel.basicPublish(exchange,key,null,"发送路由key为 = key-1 的消息".getBytes());
//提交事务
channel.txCommit();
System.out.println("发送成功");
}catch (Exception e){
System.out.println("发送失败,进行日志记录");
//回滚事务
channel.txRollback();
}
}
бегатьmain
метод, результат вывода:发送失败,进行日志记录
.Этапы процесса:
- 1. Клиент отправляет Tx.Select, чтобы перевести канал в режим транзакции.
- 2. Брокер отвечает на Tx.Select-Ok, чтобы подтвердить, что канал был установлен в режим транзакций.
- 3. После отправки сообщения обнаруживается исключение, и клиент отправляет Tx.Rollback для отката транзакции.
- 4. Брокер отвечает на Tx.Rollback.Ok, чтобы подтвердить откат транзакции.
пакетная транзакция
Если нужно отправить несколько сообщений, тоchannel.basicPublish
,channel.txCommit
И другие методы могут быть завернуты в цикл.
Пример:
отправить сообщениеexchange-1
Коммутатор, коммутатор уже существует, но после отправки сообщения возникает исключение, которое также войдет в операцию отката транзакции.
String exchange = "exchange-1";
//开启事务
channel.txSelect();
for (int a = 0; a < 10; a++) {
try{
channel.basicPublish(exchange,key,null,"发送路由key为 = key-1 的消息".getBytes());
int i = 1/0;
//提交事务
channel.txCommit();
System.out.println("发送成功");
}catch (Exception e){
System.out.println("发送失败,进行日志记录");
//回滚事务
channel.txRollback();
}
}
Транзакции разрешают отправителя сообщения иRabbitMQ
Проблема между подтверждением сообщения, только сообщение успешноRabbitMQ
После получения транзакция может быть успешно отправлена, в противном случае транзакция может быть отброшена после перехвата исключения, и в то же время сообщение может быть повторно отправлено. Но использование механизма транзакций уменьшитRabbitMQ
производительность, так есть ли лучший способ убедиться, что отправитель сообщения подтверждает, что сообщение было доставлено правильно, без существенной потери производительности? Давайте представимRabbitMQ
Укажите другой способ:Механизм подтверждения отправителя.
Механизм подтверждения отправителя
注:发送方确认机制是确认生产者是否成功发送消息到交换机
принцип
Продюсер звонитchannel.confirmSelect
метод будетchannel
установлен вconfirm
режим, один разchannel
Входитьconfirm
режиме, все вchannel
Сообщениям, опубликованным выше, будет присвоен уникальный идентификатор (начиная с 1), как только сообщение будет доставлено в соответствующую очередь,broker
отправит подтверждение(Basic.Ack)
Дайте производителю (включая уникальный идентификатор сообщения), который сообщает производителю, что сообщение прибыло в очередь назначения правильно.Если сообщение и очередь являются постоянными, подтверждающее сообщение будет отправлено после записи сообщения на диск .broker
в подтверждающем сообщении, возвращенном производителюdeliver-tag
содержит порядковый номер сообщения подтверждения, кромеbroker
также можно установитьbasic.ack
изmultiple
параметр, указывающий, что все сообщения до этого порядкового номера обработаны.
Механизм транзакции блокирует отправителя после отправки сообщения, ожидаяRabbitMQ
, прежде чем продолжить отправку следующего сообщения.confirm
Самым большим преимуществом шаблона является то, что он асинхронный, после публикации сообщения приложение-производитель может подождать.channel
Продолжайте отправлять следующее сообщение, возвращая подтверждение. Когда сообщение будет окончательно подтверждено, приложение-производитель может обработать подтверждающее сообщение с помощью метода обратного вызова. ЕслиRabbitMQ
Сообщение потеряно из-за собственной внутренней ошибки, сообщение отправленоnack(Basic.Nack)
команда, приложение-производитель также может обрабатывать это в методе обратного вызоваnack
Информация.
существуетchannel
установлен наconfirm
режиме, все последующие отправленные сообщения будутack
или бытьnack
однажды. , сообщение не появляетсяack
Былnack
ситуация, иRabbitMQ
сообщения не былоconfirm
Скорость не дает никаких гарантий.
собственный API
Обычное подтверждение
Вызывается каждый раз при отправке сообщенияchanne.waitForConfirms
метод, ожидающий подтверждения от сервера, что на самом деле является способом последовательного синхронного ожидания. То же, что и механизм транзакций. То есть медленно.
public static void main(String[] args) throws Exception {
Connection conn = RabbitMQUtil.createConn();
Channel channel = conn.createChannel();
String quequ = "queue-2";
String exchange = "exchange-2";
String key = "key-2";
//创建交换机
channel.exchangeDeclare(exchange,
BuiltinExchangeType.TOPIC, true);
//创建队列
channel.queueDeclare(quequ, true, false, false, null);
//队列与交换机绑定
channel.queueBind(quequ, exchange, key);
//将信道置为 publisher confirm 模式
channel.confirmSelect();
String message = "发送路由key为 = "+ key + "的消息";
channel.basicPublish(exchange,key,null,
message.getBytes());
boolean b = channel.waitForConfirms();
System.out.println("发送成功 = " + b);
}
结果:发送成功 = true
Измените ключ маршрутизации на:key-22121
, создайте переключатель, создайте очередь, аннотируйте очередь и привязку переключателя и посмотрите, будет ли результат успешным.
public static void main(String[] args) throws Exception {
Connection conn = RabbitMQUtil.createConn();
Channel channel = conn.createChannel();
String quequ = "queue-2";
String exchange = "exchange-2";
String key = "key-22121";
//创建交换机
//channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true);
//创建队列
// channel.queueDeclare(quequ, true, false, false, null);
//队列与交换机绑定
// channel.queueBind(quequ, exchange, key);
//将信道置为 publisher confirm 模式
channel.confirmSelect();
String message = "发送路由key为 = "+ key + "的消息";
channel.basicPublish(exchange,key,null,message.getBytes());
boolean b = channel.waitForConfirms();
System.out.println("发送成功 = " + b);
}
结果:发送成功 = true
Вы можете видеть, что результат отправки успешен. Затем снова измените код и измените значение обмена наexchange-2222
, остальной код не шевелится, наблюдайте за результатами.
Запустите прямую ошибку!
Если вы отправляете несколько сообщений, вам нужно толькоchannel.basicPublish
,channel.waitForConfirms
Метод можно завернуть в цикл. Но он по-прежнему вызывается после отправки каждого сообщения.channe.waitForConfirms
метод, ожидая подтверждения от сервера.
channel.confirmSelect();
for (int i = 1; i < 10; i++) {
String message = "发送路由key为 = "+ key + "的消息";
channel.basicPublish(exchange,key,null,message.getBytes());
boolean b = channel.waitForConfirms();
System.out.println("发送成功" + b);
}
Пакетное подтверждение
После отправки каждого пакета сообщений звонитеchannel.waitForConfirms
метод, дождитесь подтверждения от сервера (также синхронный, просто отправьте несколько фрагментов информации за раз, а затем подтвердите их единообразно).
channel.confirmSelect();
for (int i = 1; i < 10; i++) {
String message = "发送路由key为 = "+ key + "的消息";
channel.basicPublish(exchange,key,null,message.getBytes());
}
//批量确认信息,发送的消息中,如果有失败的,不知道是哪一条失败了
boolean b = channel.waitForConfirms();
System.out.println("发送成功" + b);
Асинхронное подтверждение
асинхронныйconfirm
Программная реализация метода является наиболее сложной и эффективной. на стороне клиентаChannel
предоставляется в интерфейсеaddConfirmListener
можно добавить методConfirmListener
Этот интерфейс обратного вызова, этотConfirmListener
Интерфейс содержит два метода:handleAck
,handleNack
, которые используются для решенияRabbitMQ
вернулсяBasic.Ack
,Basic.Nack
. Оба метода содержат два параметраdeliveryTag(标记消息的唯一有序序号)
,multiple(是否批量confirm true代表是)
String quequ = "queue-2";
String exchange = "exchange-2";
String key = "key-2";
//创建交换机
channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true);
//创建队列
channel.queueDeclare(quequ, true, false, false, null);
//队列与交换机绑定
channel.queueBind(quequ, exchange, key);
channel.confirmSelect();
final ConcurrentSkipListMap<Long,String> map = new ConcurrentSkipListMap();
// 添加一个异步确认的监听器
channel.addConfirmListener(new ConfirmListener() {
//参数一:deliveryTag: 消息的编号
//参数二:multiple:是否批量confirm true 是
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("map 数据:" + map.size());
if (multiple) {
//如果是批量确认 返回的是小于等于当前序列号的消息 是一个 map
ConcurrentNavigableMap<Long, String> confirmed =
map.headMap(deliveryTag, true);
//清除该部分未确认消息
confirmed.clear();
System.out.println("批量确认清楚 map 数据:" + map.size());
}else{
//只清除当前序列号的消息
map.remove(deliveryTag);
System.out.println("只清除当前序列号的消息 map 数据:" + map.size());
}
System.out.println("消息发送到交换机成功,deliveryTag: " + deliveryTag + ", multiple: " + multiple);
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息发送到交换机失败, deliveryTag: " + deliveryTag + ", multiple: " + multiple);
String message = map.get(deliveryTag);
System.out.println("消息发送到交换机失败,发布的消息:"+message+"未被确认,序列号为:"+deliveryTag);
//拿到了未确认的信息,可以进行其他逻辑,比如添加处理消息重发
}
});
for (int i = 1; i < 6; i++) {
String message = "发送路由key为 = "+ key + "的消息";
// channel.getNextPublishSeqNo()获取下一个消息的序列号
map.put(channel.getNextPublishSeqNo(),message);
channel.basicPublish(exchange,key,null,message.getBytes());
}
System.out.println("其他逻辑");
Затем проверьте случай, когда переключатель не существует, установитеexchange
имя изменено наexchange-9527
, аннотируйте код, создающий переключатель.
Connection conn = RabbitMQUtil.createConn();
Channel channel = conn.createChannel();
String quequ = "queue-2";
String exchange = "exchange-9527";
String key = "key-2";
//创建交换机
// channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true);
//创建队列
// channel.queueDeclare(quequ, true, false, false, null);
//队列与交换机绑定
// channel.queueBind(quequ, exchange, key);
channel.confirmSelect();
final ConcurrentSkipListMap<Long,String> map = new ConcurrentSkipListMap();
// 添加一个异步确认的监听器
channel.addConfirmListener(new ConfirmListener() {
//参数一:deliveryTag: 消息的编号
//参数二:multiple:是否批量confirm true 是
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("map 数据:" + map.size());
if (multiple) {
//如果是批量确认 返回的是小于等于当前序列号的消息 是一个 map
ConcurrentNavigableMap<Long, String> confirmed =
map.headMap(deliveryTag, true);
//清除该部分未确认消息
confirmed.clear();
System.out.println("批量确认清楚 map 数据:" + map.size());
}else{
//只清除当前序列号的消息
map.remove(deliveryTag);
System.out.println("只清除当前序列号的消息 map 数据:" + map.size());
}
System.out.println("消息发送到交换机成功,deliveryTag: " + deliveryTag + ", multiple: " + multiple);
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息发送到交换机失败, deliveryTag: " + deliveryTag + ", multiple: " + multiple);
String message = map.get(deliveryTag);
System.out.println("消息发送到交换机失败,发布的消息:"+message+"未被确认,序列号为:"+deliveryTag);
//拿到了未确认的信息,可以进行其他逻辑,比如添加处理消息重发
}
});
for (int i = 1; i < 6; i++) {
String message = "发送路由key为 = "+ key + "的消息";
// channel.getNextPublishSeqNo()获取下一个消息的序列号
map.put(channel.getNextPublishSeqNo(),message);
channel.basicPublish(exchange,key,null,message.getBytes());
}
System.out.println("其他逻辑");
Как видите, ни один код внутри слушателя не выполняется. То есть ни один коммутатор не получает сообщение.
Суммировать
Обычное подтверждение: синхронно ожидает подтверждения, простая, но очень ограниченная пропускная способность.
Пакетное подтверждение: пакетная синхронизация с ожиданием подтверждения, простая, разумная пропускная способность, когда возникает проблема, но трудно определить, в каком сообщении проблема.
Асинхронное подтверждение: лучшая производительность и использование ресурсов, хороший контроль в случае ошибок, но немного громоздкий в реализации.
Метод загрузки
Настройте в yml, требуется ли подтверждение сообщения
spring:
application:
name: info-config-boot
rabbitmq:
host: 47.105.*
port: 5672
virtual-host: /test-1
username: *
password: *
# 开启消息确认
publisher-confirm-type: correlated
publisher-confirm-type
Есть три варианта:
- NONE: отключить режим подтверждения выпуска, по умолчанию.
- КОРРЕЛЯЦИЯ: метод обратного вызова будет активирован после того, как сообщение будет успешно опубликовано на бирже.
- ПРОСТО: тестируются два эффекта, один эффект и
CORRELATED
Одно и то же значение вызовет метод обратного вызова, а второе будет использовано после успешной публикации сообщения.rabbitTemplate
перечислитьwaitForConfirms
илиwaitForConfirmsOrDie
Метод ожидает, пока узел-брокер вернет результат отправки, и определяет логику следующего шага в соответствии с возвращенным результатом.waitForConfirmsOrDie
метод, если возвращаетсяfalse
закроетсяchannel
, следующее сообщение не может быть отправленоbroker
.
кодирование
выполнитьConfirmCallback
@Component
public class InfoConfirm implements RabbitTemplate.ConfirmCallback {
Logger logger = LoggerFactory.getLogger(InfoConfirm.class);
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 需要给ConfirmCallback赋值 不然不会走回调方法,默认是null
*/
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
}
/**
* 此方法用于监听消息是否发送到交换机
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
logger.info("消息成功发送到交换机");
logger.info("id = {} ",correlationData.getId());
if(correlationData.getReturnedMessage() == null){
logger.info("消息被确认");
}else{
byte[] body = correlationData.getReturnedMessage().getBody();
logger.info("message = {}",new String(body));
}
}else {
logger.info("消息发送到交换机失败");
logger.info("cause = {}",cause);
logger.info("id = {} ",correlationData.getId());
if(correlationData.getReturnedMessage() == null){
logger.info("消息异常");
}else{
byte[] body = correlationData.getReturnedMessage().getBody();
logger.info("message = {}",new String(body));
}
}
}
}
реализовать интерфейсConfirmCallback
, переписывая егоconfirm()
метод с тремя параметрамиcorrelationData
,ack
,cause
.
- корреляцияДанные: внутри объекта есть только одно свойство id, которое используется для представления уникальности текущего сообщения.
- ack: статус доставки сообщения брокеру, true указывает на успех.
- причина: указывает причину сбоя доставки.
Укажите внешний способ доставки
@GetMapping("/send")
public void send(){
CorrelationData correlation = new CorrelationData("设置:" + UUID.randomUUID().toString());
// exchange-1 的交换机之前已经存在了
rabbitTemplate.convertAndSend("exchange-1","key-55","发送消息",correlation);
}
Интерфейс вызова:http://localhost:8080/send
Отправьте еще одно сообщение о том, что переключатель не существует, и измените значение переключателя наexchange-12222
.
Интерфейс вызова:http://localhost:8080/send
Можно обнаружить, что даже если переключатель не существует,confirm
также можно контролировать методы. Умнее всех вышеперечисленных.Boot YYDS.
- Если у вас есть какие-либо вопросы по этой статье или есть ошибки в этой статье, пожалуйста, оставьте комментарий. Если вы считаете, что эта статья была вам полезна, ставьте лайк и подписывайтесь на нее.