Подтверждение сообщения RabbitMQ (9)

RabbitMQ
Подтверждение сообщения RabbitMQ (9)

Это первый раз, когда я участвую в Gengwen Challenge.27День, подробности о событии уточняйте:Обновить вызов

Со временем капли воды и камни изнашиваются 😄

предисловие

在上一篇中,笔者介绍了怎么让 RabbitMQ 如何保证数据不丢失, 但除此之外,我们还会遇到一个问题,当生产者将消息发送出去之后,消息到底有没有正确地到达RabbitMQ 服务器呢?如果不进行特殊配置,默认情况下发送消息的操作是不会返回任何信息给生产者的,也就是默认情况下生产者是不知道消息有没有正确地到达 RabbitMQ 服务器的。如果在消息到达服务器之前己经丢失,持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?

RabbitMQЕсть два решения этой проблемы:

  • Реализуется механизмом транзакций

  • Реализуется через механизм подтверждения отправителя

механизм транзакции

注:事务机制是确认生产者是否成功发送消息到交换机

RabbitMQСуществует три метода, связанных с механизмом транзакций в клиенте:channel.txSelect,channel.txCommit,channel.txRollback.

channel.txSelectИспользуется для запуска транзакции;

channel.txCommitдля совершения сделок;

channel.txRollbackИспользуется для отката транзакции.

проходя черезchannel.txSelectПосле того, как метод запустит транзакцию, мы можем отправить сообщение наRabbitMQЕсли транзакция успешно завершена, сообщение должно быть полученоRabbitMQ, если перед фиксацией транзакции выполняется из-заRabbitMQЕсли исключение вылетает или выбрасывает исключение по другим причинам, в это время мы можем его перехватить, а затем выполнитьchannel.txRollbackспособ реализации отката транзакции.

совершить транзакцию

public static void main(String[] args) throws IOException {
        Connection conn = RabbitMQUtil.createConn();
        Channel channel = conn.createChannel();
        String exchange = "exchange-1";
        String key = "key-1";

        // 创建交换机
        channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC,true);
        //开启事务
        channel.txSelect();
        try{
            // 发送消息到交换机
            channel.basicPublish(exchange,key,null,"发送路由key为 = key-1 的消息".getBytes());
            //提交事务
            channel.txCommit();
            System.out.println("发送成功");
        }catch (Exception e){
            System.out.println("发送失败,进行日志记录");
            //回滚事务
            channel.txRollback();
        }
    }

бегатьmainметод, вывод发送成功. Это потому, что коммутатор уже существует.流程图По приведенному выше рисунку видно, что есть еще четыре шага, когда механизм транзакций включен и механизм транзакций не включен (прямая отправка):

  • 1. Клиент отправляет Tx.Select, чтобы перевести канал в режим транзакции.
  • 2. Брокер отвечает на Tx.Select-Ok, чтобы подтвердить, что канал был установлен в режим транзакций.
  • 3. После отправки сообщения клиент отправляет Tx.Commit для подтверждения транзакции.
  • 4. Брокер отвечает на Tx.Commit.Ok, чтобы подтвердить фиксацию транзакции.

откат транзакции

Давайте посмотрим на откат транзакции и код. будетexchangeЗначение изменяется наexchange-122, и будет создан комментарий к коду для переключателя.

public static void main(String[] args) throws IOException {
        Connection conn = RabbitMQUtil.createConn();
        Channel channel = conn.createChannel();
        String exchange = "exchange-122";
        String key = "key-1";

        // 创建交换机
       //channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC,true);
        //开启事务
        channel.txSelect();
        try{
            // 发送消息到交换机
            channel.basicPublish(exchange,key,null,"发送路由key为 = key-1 的消息".getBytes());
            //提交事务
            channel.txCommit();
            System.out.println("发送成功");
        }catch (Exception e){
            System.out.println("发送失败,进行日志记录");
            //回滚事务
            channel.txRollback();
        }
    }

image.pngбегатьmainметод, результат вывода:发送失败,进行日志记录.image.pngЭтапы процесса:

  • 1. Клиент отправляет Tx.Select, чтобы перевести канал в режим транзакции.
  • 2. Брокер отвечает на Tx.Select-Ok, чтобы подтвердить, что канал был установлен в режим транзакций.
  • 3. После отправки сообщения обнаруживается исключение, и клиент отправляет Tx.Rollback для отката транзакции.
  • 4. Брокер отвечает на Tx.Rollback.Ok, чтобы подтвердить откат транзакции.

пакетная транзакция

Если нужно отправить несколько сообщений, тоchannel.basicPublish,channel.txCommitИ другие методы могут быть завернуты в цикл.

Пример:

отправить сообщениеexchange-1Коммутатор, коммутатор уже существует, но после отправки сообщения возникает исключение, которое также войдет в операцию отката транзакции.

 String exchange = "exchange-1";
 //开启事务
        channel.txSelect();
        for (int a = 0; a < 10; a++) {
            try{
                channel.basicPublish(exchange,key,null,"发送路由key为 = key-1 的消息".getBytes());
                int i = 1/0;
                //提交事务
                channel.txCommit();
                System.out.println("发送成功");
            }catch (Exception e){
                System.out.println("发送失败,进行日志记录");
                //回滚事务
                channel.txRollback();
            }
        }

image.png

Транзакции разрешают отправителя сообщения иRabbitMQПроблема между подтверждением сообщения, только сообщение успешноRabbitMQПосле получения транзакция может быть успешно отправлена, в противном случае транзакция может быть отброшена после перехвата исключения, и в то же время сообщение может быть повторно отправлено. Но использование механизма транзакций уменьшитRabbitMQпроизводительность, так есть ли лучший способ убедиться, что отправитель сообщения подтверждает, что сообщение было доставлено правильно, без существенной потери производительности? Давайте представимRabbitMQУкажите другой способ:Механизм подтверждения отправителя.

Механизм подтверждения отправителя

注:发送方确认机制是确认生产者是否成功发送消息到交换机

принцип

Продюсер звонитchannel.confirmSelectметод будетchannelустановлен вconfirmрежим, один разchannelВходитьconfirmрежиме, все вchannelСообщениям, опубликованным выше, будет присвоен уникальный идентификатор (начиная с 1), как только сообщение будет доставлено в соответствующую очередь,brokerотправит подтверждение(Basic.Ack)Дайте производителю (включая уникальный идентификатор сообщения), который сообщает производителю, что сообщение прибыло в очередь назначения правильно.Если сообщение и очередь являются постоянными, подтверждающее сообщение будет отправлено после записи сообщения на диск .brokerв подтверждающем сообщении, возвращенном производителюdeliver-tagсодержит порядковый номер сообщения подтверждения, кромеbrokerтакже можно установитьbasic.ackизmultipleпараметр, указывающий, что все сообщения до этого порядкового номера обработаны.

Механизм транзакции блокирует отправителя после отправки сообщения, ожидаяRabbitMQ, прежде чем продолжить отправку следующего сообщения.confirmСамым большим преимуществом шаблона является то, что он асинхронный, после публикации сообщения приложение-производитель может подождать.channelПродолжайте отправлять следующее сообщение, возвращая подтверждение. Когда сообщение будет окончательно подтверждено, приложение-производитель может обработать подтверждающее сообщение с помощью метода обратного вызова. ЕслиRabbitMQСообщение потеряно из-за собственной внутренней ошибки, сообщение отправленоnack(Basic.Nack)команда, приложение-производитель также может обрабатывать это в методе обратного вызоваnackИнформация.

существуетchannelустановлен наconfirmрежиме, все последующие отправленные сообщения будутackили бытьnackоднажды. , сообщение не появляетсяackБылnackситуация, иRabbitMQсообщения не былоconfirmСкорость не дает никаких гарантий.

собственный API

Обычное подтверждение

Вызывается каждый раз при отправке сообщенияchanne.waitForConfirmsметод, ожидающий подтверждения от сервера, что на самом деле является способом последовательного синхронного ожидания. То же, что и механизм транзакций. То есть медленно.

public static void main(String[] args) throws Exception {
    Connection conn = RabbitMQUtil.createConn();
    Channel channel = conn.createChannel();
    String quequ = "queue-2";
    String exchange = "exchange-2";
    String key = "key-2";
    //创建交换机
    channel.exchangeDeclare(exchange, 
    BuiltinExchangeType.TOPIC, true);
    //创建队列
    channel.queueDeclare(quequ, true, false, false, null);
    //队列与交换机绑定
    channel.queueBind(quequ, exchange, key);
    //将信道置为 publisher confirm 模式
    channel.confirmSelect();
    String message = "发送路由key为 = "+ key + "的消息";
    channel.basicPublish(exchange,key,null,
    message.getBytes());
    boolean b = channel.waitForConfirms();
    System.out.println("发送成功 = " + b);
}
结果:发送成功 = true

Измените ключ маршрутизации на:key-22121, создайте переключатель, создайте очередь, аннотируйте очередь и привязку переключателя и посмотрите, будет ли результат успешным.

public static void main(String[] args) throws Exception {
    Connection conn = RabbitMQUtil.createConn();
    Channel channel = conn.createChannel();
    String quequ = "queue-2";
    String exchange = "exchange-2";
    String key = "key-22121";
    //创建交换机
    //channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true);
    //创建队列
   // channel.queueDeclare(quequ, true, false, false, null);
    //队列与交换机绑定
   // channel.queueBind(quequ, exchange, key);
    //将信道置为 publisher confirm 模式
    channel.confirmSelect();
    String message = "发送路由key为 = "+ key + "的消息";
    channel.basicPublish(exchange,key,null,message.getBytes());
    boolean b = channel.waitForConfirms();
    System.out.println("发送成功 = " + b);
}
结果:发送成功 = true

Вы можете видеть, что результат отправки успешен. Затем снова измените код и измените значение обмена наexchange-2222, остальной код не шевелится, наблюдайте за результатами.

image.pngЗапустите прямую ошибку!

Если вы отправляете несколько сообщений, вам нужно толькоchannel.basicPublish,channel.waitForConfirmsМетод можно завернуть в цикл. Но он по-прежнему вызывается после отправки каждого сообщения.channe.waitForConfirmsметод, ожидая подтверждения от сервера.

channel.confirmSelect();
for (int i = 1; i < 10; i++) {
    String message = "发送路由key为 = "+ key + "的消息";
    channel.basicPublish(exchange,key,null,message.getBytes());
    boolean b = channel.waitForConfirms();
    System.out.println("发送成功" + b);
}

Пакетное подтверждение

После отправки каждого пакета сообщений звонитеchannel.waitForConfirmsметод, дождитесь подтверждения от сервера (также синхронный, просто отправьте несколько фрагментов информации за раз, а затем подтвердите их единообразно).

channel.confirmSelect();
for (int i = 1; i < 10; i++) {
    String message = "发送路由key为 = "+ key + "的消息";
    channel.basicPublish(exchange,key,null,message.getBytes());
}
//批量确认信息,发送的消息中,如果有失败的,不知道是哪一条失败了
boolean b = channel.waitForConfirms();
System.out.println("发送成功" + b);

Асинхронное подтверждение

асинхронныйconfirmПрограммная реализация метода является наиболее сложной и эффективной. на стороне клиентаChannelпредоставляется в интерфейсеaddConfirmListenerможно добавить методConfirmListenerЭтот интерфейс обратного вызова, этотConfirmListenerИнтерфейс содержит два метода:handleAck,handleNack, которые используются для решенияRabbitMQвернулсяBasic.Ack,Basic.Nack. Оба метода содержат два параметраdeliveryTag(标记消息的唯一有序序号) ,multiple(是否批量confirm true代表是)

String quequ = "queue-2";
String exchange = "exchange-2";
String key = "key-2";
//创建交换机
channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true);
//创建队列
channel.queueDeclare(quequ, true, false, false, null);
//队列与交换机绑定
channel.queueBind(quequ, exchange, key);
channel.confirmSelect();
final ConcurrentSkipListMap<Long,String> map = new ConcurrentSkipListMap();
// 添加一个异步确认的监听器
channel.addConfirmListener(new ConfirmListener() {
    //参数一:deliveryTag: 消息的编号
    //参数二:multiple:是否批量confirm true 是
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("map 数据:" + map.size());
        if (multiple) {
            //如果是批量确认  返回的是小于等于当前序列号的消息 是一个 map
            ConcurrentNavigableMap<Long, String> confirmed =
                    map.headMap(deliveryTag, true);
            //清除该部分未确认消息
            confirmed.clear();
            System.out.println("批量确认清楚 map 数据:" + map.size());
        }else{
            //只清除当前序列号的消息
            map.remove(deliveryTag);
            System.out.println("只清除当前序列号的消息 map 数据:" + map.size());
        }
        System.out.println("消息发送到交换机成功,deliveryTag: " + deliveryTag + ", multiple: " + multiple);
    }

    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("消息发送到交换机失败, deliveryTag: " + deliveryTag + ", multiple: " + multiple);
        String message = map.get(deliveryTag);
        System.out.println("消息发送到交换机失败,发布的消息:"+message+"未被确认,序列号为:"+deliveryTag);
        //拿到了未确认的信息,可以进行其他逻辑,比如添加处理消息重发
    }
});

for (int i = 1; i < 6; i++) {
    String message = "发送路由key为 = "+ key + "的消息";
    // channel.getNextPublishSeqNo()获取下一个消息的序列号
    map.put(channel.getNextPublishSeqNo(),message);
    channel.basicPublish(exchange,key,null,message.getBytes());

}
System.out.println("其他逻辑");

image.png

Затем проверьте случай, когда переключатель не существует, установитеexchangeимя изменено наexchange-9527, аннотируйте код, создающий переключатель.

Connection conn = RabbitMQUtil.createConn();
Channel channel = conn.createChannel();
String quequ = "queue-2";
String exchange = "exchange-9527";
String key = "key-2";
//创建交换机
// channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true);
//创建队列
// channel.queueDeclare(quequ, true, false, false, null);
//队列与交换机绑定
//  channel.queueBind(quequ, exchange, key);
channel.confirmSelect();
final ConcurrentSkipListMap<Long,String> map = new ConcurrentSkipListMap();
// 添加一个异步确认的监听器
channel.addConfirmListener(new ConfirmListener() {
    //参数一:deliveryTag: 消息的编号
    //参数二:multiple:是否批量confirm true 是
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("map 数据:" + map.size());
        if (multiple) {
            //如果是批量确认  返回的是小于等于当前序列号的消息 是一个 map
            ConcurrentNavigableMap<Long, String> confirmed =
                    map.headMap(deliveryTag, true);
            //清除该部分未确认消息
            confirmed.clear();
            System.out.println("批量确认清楚 map 数据:" + map.size());
        }else{
            //只清除当前序列号的消息
            map.remove(deliveryTag);
            System.out.println("只清除当前序列号的消息 map 数据:" + map.size());
        }
        System.out.println("消息发送到交换机成功,deliveryTag: " + deliveryTag + ", multiple: " + multiple);
    }

    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("消息发送到交换机失败, deliveryTag: " + deliveryTag + ", multiple: " + multiple);
        String message = map.get(deliveryTag);
        System.out.println("消息发送到交换机失败,发布的消息:"+message+"未被确认,序列号为:"+deliveryTag);
        //拿到了未确认的信息,可以进行其他逻辑,比如添加处理消息重发
    }
});

for (int i = 1; i < 6; i++) {
    String message = "发送路由key为 = "+ key + "的消息";
    // channel.getNextPublishSeqNo()获取下一个消息的序列号
    map.put(channel.getNextPublishSeqNo(),message);
    channel.basicPublish(exchange,key,null,message.getBytes());

}
System.out.println("其他逻辑");

image.pngКак видите, ни один код внутри слушателя не выполняется. То есть ни один коммутатор не получает сообщение.

Суммировать

Обычное подтверждение: синхронно ожидает подтверждения, простая, но очень ограниченная пропускная способность.

Пакетное подтверждение: пакетная синхронизация с ожиданием подтверждения, простая, разумная пропускная способность, когда возникает проблема, но трудно определить, в каком сообщении проблема.

Асинхронное подтверждение: лучшая производительность и использование ресурсов, хороший контроль в случае ошибок, но немного громоздкий в реализации.

Метод загрузки

Настройте в yml, требуется ли подтверждение сообщения

spring:
  application:
    name: info-config-boot
  rabbitmq:
    host: 47.105.*
    port: 5672
    virtual-host: /test-1
    username: *
    password: *
    # 开启消息确认
    publisher-confirm-type: correlated

publisher-confirm-typeЕсть три варианта:

  • NONE: отключить режим подтверждения выпуска, по умолчанию.
  • КОРРЕЛЯЦИЯ: метод обратного вызова будет активирован после того, как сообщение будет успешно опубликовано на бирже.
  • ПРОСТО: тестируются два эффекта, один эффект иCORRELATEDОдно и то же значение вызовет метод обратного вызова, а второе будет использовано после успешной публикации сообщения.rabbitTemplateперечислитьwaitForConfirmsилиwaitForConfirmsOrDieМетод ожидает, пока узел-брокер вернет результат отправки, и определяет логику следующего шага в соответствии с возвращенным результатом.waitForConfirmsOrDieметод, если возвращаетсяfalseзакроетсяchannel, следующее сообщение не может быть отправленоbroker.

кодирование

выполнитьConfirmCallback

@Component
public class InfoConfirm implements RabbitTemplate.ConfirmCallback {

   Logger logger = LoggerFactory.getLogger(InfoConfirm.class);

   @Autowired
   private RabbitTemplate rabbitTemplate;

    /**
     * 需要给ConfirmCallback赋值 不然不会走回调方法,默认是null
     */
   @PostConstruct
   public void init(){
       rabbitTemplate.setConfirmCallback(this);
   }

    /**
     * 此方法用于监听消息是否发送到交换机
     * @param correlationData
     * @param ack
     * @param cause
     */
   @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            logger.info("消息成功发送到交换机");
            logger.info("id = {} ",correlationData.getId());
            if(correlationData.getReturnedMessage() == null){
                logger.info("消息被确认");
            }else{
                byte[] body = correlationData.getReturnedMessage().getBody();
                logger.info("message = {}",new String(body));
            }

        }else {
            logger.info("消息发送到交换机失败");
            logger.info("cause = {}",cause);
            logger.info("id = {} ",correlationData.getId());
            if(correlationData.getReturnedMessage() == null){
                logger.info("消息异常");
            }else{
                byte[] body = correlationData.getReturnedMessage().getBody();
                logger.info("message = {}",new String(body));
            }

        }
    }
}

реализовать интерфейсConfirmCallback, переписывая егоconfirm()метод с тремя параметрамиcorrelationData,ack,cause.

  • корреляцияДанные: внутри объекта есть только одно свойство id, которое используется для представления уникальности текущего сообщения.
  • ack: статус доставки сообщения брокеру, true указывает на успех.
  • причина: указывает причину сбоя доставки.

Укажите внешний способ доставки

   @GetMapping("/send")
    public void send(){
        CorrelationData correlation = new CorrelationData("设置:" + UUID.randomUUID().toString());
        // exchange-1 的交换机之前已经存在了
        rabbitTemplate.convertAndSend("exchange-1","key-55","发送消息",correlation);
    }

Интерфейс вызова:http://localhost:8080/send

image.pngОтправьте еще одно сообщение о том, что переключатель не существует, и измените значение переключателя наexchange-12222.

Интерфейс вызова:http://localhost:8080/send image.pngМожно обнаружить, что даже если переключатель не существует,confirmтакже можно контролировать методы. Умнее всех вышеперечисленных.Boot YYDS.

  • Если у вас есть какие-либо вопросы по этой статье или есть ошибки в этой статье, пожалуйста, оставьте комментарий. Если вы считаете, что эта статья была вам полезна, ставьте лайк и подписывайтесь на нее.