[RabbitMQ] Как надежно доставлять сообщения [Часть 1]

RabbitMQ
[RabbitMQ] Как надежно доставлять сообщения [Часть 1]

иллюстрировать

Несколько дней назад внезапно возникла онлайн-тревога, и Dingding отправил несколько сообщений подряд.Когда я увидел, что это связано с RabbitMQ, я занервничал.Не перевернулась ли машина?

u=1091165172,1855706818&fm=26&gp=0.jpg

[橙色报警] 应用[xxx]在[08-15 16:36:04]发生[错误日志异常],alertId=[xxx]。由[org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620]触发。
应用xxx 可能原因如下
服务名为:
 异常为:org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620
 产生原因如下:
1.org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
||Consumer received fatal=false exception on startup:
...
应用xxx 可能原因如下
服务名为:
 异常为:org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer:run:1160
 产生原因如下:
1.Stopping container from aborted consumer||Stopping container from aborted consumer:

При ближайшем рассмотрении кажется, что потребитель отключился по необъяснимым причинам, так же, как компания переезжает, потому что компьютерный зал обманул. . . . Отключение электричества? Поэтому я быстро связался с эксплуатацией и техническим обслуживанием, чтобы узнать, был ли настроен RabbitMQ. Через несколько минут я получил ответ от службы эксплуатации и обслуживания. По каким-то непонятным причинам RabbitMQ перезапустился. Эммм, хотя перезагрузка длилась всего 10 минут, это привело к зависанию всех потребителей в кластере. быть перезапущен.нормальное потребление.

После перезапуска проекта все вроде бы снова пошло нормально, но хорошие времена длились недолго. Заказ на работу не заставил себя долго ждать. После расследования выяснилось, что производитель не справился. для доставки сообщения во время перезапуска RabbitMQ, что приводит к потере сообщения и необходимости ручной обработки и восстановления.

Итак, я начал думать, как можно надежно доставлять сообщения RabbitMQ? Особенно в такой экстремальной ситуации, когда кластер RabbitMQ недоступен, как быть с недоставленными сообщениями?

надежная доставка

Давайте сначала объясним концепцию, что такое надежная доставка? В RabbitMQ сообщение, отправленное производителем на сервер RabbitMQ, должно пройти следующие этапы:

  1. Производитель готов доставить сообщение.
  2. Производитель устанавливает соединение с сервером RabbitMQ.
  3. Производитель отправляет сообщение.
  4. Сервер RabbitMQ получает сообщение и направляет его в указанную очередь.
  5. Сервер RabbitMQ инициирует обратный вызов, чтобы сообщить производителю, что сообщение было успешно отправлено.

Так называемая надежная доставка гарантирует, что 100% сообщения могут быть отправлены от производителя к серверу.

{6582FAF9-A46E-4239-810B-E1D6883ED070}.png.jpg

Во избежание споров добавим, что если не установлен обязательный параметр, то обратный вызов не нужно маршрутизировать заранее, а сервер подтвердит обратный вызов после получения сообщения.

Шаги 2, 3 и 5 взаимодействуют через TCP-соединения.При наличии сетевых вызовов могут возникнуть аварии, а сетевые колебания могут произойти в любое время.Будь то отключение питания во внутреннем компьютерном зале или внешний оптический кабель. разрез, сетевые аварии не могут быть предсказаны, хотя это маловероятные события, но для обработки конфиденциальных данных, таких как заказы, потеря сообщений в этих случаях недопустима.

20170716034945131.jpg

Надежная доставка сообщений в RabbitMQ

По умолчанию операция отправки сообщения не возвращает производителю никакой информации, то есть по умолчанию производитель не знает, правильно ли сообщение поступило на сервер.

Итак, как решить эту проблему?

Для этого в RabbitMQ есть несколько связанных решений:

  1. Используйте механизм транзакций, чтобы сообщить производителю, что сообщение было успешно доставлено на сервер.
  2. Реализуется через механизм подтверждения производителя.

В RabbitMQ все механизмы обеспечения надежной доставки сообщений будут иметь определенное влияние на производительность.При неправильном использовании это может оказать значительное влияние на пропускную способность.Только путем выполнения эталонных тестов производительности можно определить баланс между производительностью и надежной доставкой.

Перед использованием надежной доставки необходимо подумать над следующими вопросами:

  1. Когда сообщение публикуется, насколько важно гарантировать, что сообщение попадет в очередь?
  2. Если сообщение не может быть маршрутизировано, следует ли вернуть сообщение издателю?
  3. Если сообщение не может быть маршрутизировано, следует ли отправить его в другое место и перенаправить позже?
  4. Можно ли терять сообщения в случае сбоя сервера RabbitMQ?
  5. Должен ли RabbitMQ при обработке новых сообщений подтверждать, что он выполнил маршрутизацию и сохранение всех запросов для издателя?
  6. Могут ли издатели сообщений доставлять сообщения пакетами?
  7. Существует ли приемлемый баланс надежной доставки? Допустимо ли иметь некоторую ненадежность для повышения производительности?

Недостаточно просто учитывать баланс без учета производительности.Что касается того, как понять степень баланса, необходимо проанализировать конкретную ситуацию.Например, конфиденциальная информация, такая как данные заказа, требует большей надежности, чем общие деловые сообщения. Требования к надежности намного выше, потому что данные заказа напрямую связаны с деньгами, что может привести к прямым экономическим потерям.

Поэтому вы должны не только знать, какие решения доступны для обеспечения надежности сообщений, но и знать степень влияния каждого решения на производительность, чтобы выбрать решение.

Механизм транзакций RabbitMQ

RabbitMQ поддерживает механизм транзакций AMQP.До того, как производитель подтвердит механизм, транзакция является единственным способом гарантировать успешную доставку сообщения.

В проекте SpringBoot использование транзакций RabbitMQ на самом деле очень просто: вам нужно только объявить bean-компонент, управляемый транзакциями, и установить транзакцию RabbitTemplate в значение true.

Файл конфигурации выглядит следующим образом:

spring:
  rabbitmq:
    host: localhost
    password: guest
    username: guest
    listener:
      type: simple
      simple:
        default-requeue-rejected: false
        acknowledge-mode: manual

Сначала настроим коммутаторы и очереди, а также менеджер транзакций.

@Configuration
public class RabbitMQConfig {

    public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.tx.demo.simple.business.exchange";
    public static final String BUSINESS_QUEUEA_NAME = "rabbitmq.tx.demo.simple.business.queue";

    // 声明业务Exchange
    @Bean("businessExchange")
    public FanoutExchange businessExchange(){
        return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
    }

    // 声明业务队列
    @Bean("businessQueue")
    public Queue businessQueue(){
        return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).build();
    }

    // 声明业务队列绑定关系
    @Bean
    public Binding businessBinding(@Qualifier("businessQueue") Queue queue,
                                    @Qualifier("businessExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }


    /**
     * 配置启用rabbitmq事务
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }
}

Затем создайте потребителя для прослушивания сообщения, чтобы определить, было ли сообщение успешно отправлено.

@Slf4j
@Component
public class BusinessMsgConsumer {


    @RabbitListener(queues = BUSINESS_QUEUEA_NAME)
    public void receiveMsg(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("收到业务消息:{}", msg);
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
    }
}

И тогда производитель сообщения:

@Slf4j
@Component
public class BusinessMsgProducer{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    private void init() {
        rabbitTemplate.setChannelTransacted(true);
    }

    @Transactional
    public void sendMsg(String msg) {
        rabbitTemplate.convertAndSend(BUSINESS_EXCHANGE_NAME, "key", msg);
        log.info("msg:{}", msg);
        if (msg != null && msg.contains("exception"))
            throw new RuntimeException("surprise!");
        log.info("消息已发送 {}" ,msg);
    }
}

Здесь следует отметить две вещи:

  1. В методе инициализации с помощьюrabbitTemplate.setChannelTransacted(true);чтобы начать бизнес.
  2. Добавить в способ отправки сообщения@TransactionalАннотация, когда такая аномалия возникает в процессе, сообщение не будет отправлено.

Добавьте интерфейс к контроллеру для создания сообщений:

@RestController
public class BusinessController {

    @Autowired
    private BusinessMsgProducer producer;

    @RequestMapping("send")
    public void sendMsg(String msg){
        producer.sendMsg(msg);
    }
}

Давайте проверим:

msg:1
消息已发送 1
收到业务消息:1
msg:2
消息已发送 2
收到业务消息:2
msg:3
消息已发送 3
收到业务消息:3
msg:exception

Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.RuntimeException: surprise!] with root cause

java.lang.RuntimeException: surprise!
	at com.mfrank.rabbitmqdemo.producer.BusinessMsgProducer.sendMsg(BusinessMsgProducer.java:30)
    ...

когдаmsgценностьexceptionпри звонкеrabbitTemplate.convertAndSendПосле метода программа выбрасывает исключение, сообщение не отправляется, а откатывается текущей транзакцией.

Конечно, вы можете закомментировать диспетчер транзакций или закомментировать открывающую транзакцию метода инициализации, чтобы транзакция не вступила в силу.Даже если программа выйдет из строя после вызова метода отправки сообщения, сообщение будет отправлено и использовано. нормально. .

Хотя транзакция в RabbitMQ проста в использовании, ее влияние на производительность нельзя игнорировать, потому что каждая отправка транзакции блокируется в ожидании обработки сервером возвращенного результата.В режиме по умолчанию клиенту не нужно ждать, прямая отправка сделано.Кроме того, транзакционные сообщения требуют на 4 взаимодействия с сервером больше, чем обычные сообщения, а это значит, что они будут занимать больше времени на обработку.Поэтому, если есть высокие требования к скорости обработки сообщений, старайтесь не использовать механизм транзакций.

Механизм подтверждения производителя RabbitMQ

Функция подтверждения производителя в RabbitMQ является усовершенствованием спецификации AMQP, базовым, когда маршрутизируемые сообщения, опубликованные производителями во все очереди, потребляются непосредственно приложениями-потребителями или когда сообщения помещаются в очереди и сохраняются по мере необходимости. Запрос .Ack отправляется на поставщику, и если сообщение не может быть маршрутизировано, прокси-сервер отправит запрос RPC Basic.Nack, чтобы указать на сбой. Затем производитель должен решить, что делать с сообщением.

То есть с помощью механизма подтверждения производителя производитель может получить обратную связь, когда сообщение будет успешно получено сервером, и иметь возможность обработать сообщение, которое не было успешно получено.

Также очень просто включить режим подтверждения производителя RabbitMQ в Springboot только с еще одной строкой конфигурации:

spring:
  rabbitmq:
    host: localhost
    password: guest
    username: guest
    listener:
      type: simple
      simple:
        default-requeue-rejected: false
        acknowledge-mode: manual
    publisher-confirms: true

publisher-confirms: trueЭто означает, что включен режим подтверждения производителя.

Затем частично измените делегат производителя сообщения:

@Slf4j
@Component
public class BusinessMsgProducer implements RabbitTemplate.ConfirmCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    private void init() {
//        rabbitTemplate.setChannelTransacted(true);
        rabbitTemplate.setConfirmCallback(this);
    }

    public void sendCustomMsg(String exchange, String msg) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        log.info("消息id:{}, msg:{}", correlationData.getId(), msg);

        rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData);
    }
    
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (b) {
            log.info("消息确认成功, id:{}", id);
        } else {
            log.error("消息未成功投递, id:{}, cause:{}", id, s);
        }
    }
}

Пусть производитель наследует отRabbitTemplate.ConfirmCallbackкласс, а затем реализовать егоconfirmметод, который вы можете использовать для получения обратных вызовов сервера.

Важно отметить, что код корректируется и при отправке сообщения:

CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData);

Здесь мы устанавливаем идентификатор сообщения для сообщения, чтобы идентификатор можно было использовать для определения того, какое сообщение вызывается обратно во время обратного вызова.Потому что в функции обратного вызова мы не можем напрямую получить содержимое сообщения, поэтому нам нужно временно сначала сохраните сообщение.В зависимости от важности сообщения вы можете рассмотреть возможность использования локального кеша или сохранения его в Redis или Mysql, а затем обновить его состояние или удалить его из кеша при обратном вызове и, наконец, использовать временная задача для сброса сообщения, которое не было отправлено в течение определенного периода времени.

Ниже приведена картинка, которую я украл, простите меня за лень и нежелание рисовать [ручную собачью голову]:

5b65729e0001439305000294.jpg

Кроме того, следует отметить, что если сообщение публикуется на несуществующем обмене, канал, используемый для публикации, будет закрыт RabbitMQ.

Кроме того, механизм подтверждения производителя не работает с транзакциями и является облегченной альтернативой транзакциям. Поскольку режимы подтверждения транзакций и подтверждения издателя — это оба режима, которые необходимо сначала согласовать с сервером, режим включен для канала, и эти два режима нельзя использовать для одного и того же канала одновременно.

В режиме подтверждения производителя подтверждение сообщений может быть асинхронным и пакетным, поэтому производительность будет выше, чем при использовании транзакций.

Как механизм транзакций, так и механизм подтверждения производителя могут гарантировать, что сообщение будет правильно отправлено в RabbitMQ. «Правильно отправлено в RabbitMQ» здесь означает, что сообщение успешно получено обменом, но если очередь, которая может получить сообщение, не может быть найдено, эти сообщения также теряются. А как быть с теми сообщениями, которые не могут быть доставлены в очередь, будет рассказано в следующей статье.

Заключение

Итак, когда компьютерный зал компании «выключается», что делать с теми сообщениями, которые необходимо отправить? Я верю, что после прочтения вышеизложенного у вас уже есть ответ в сердце.

Вообще говоря, этот «сбой питания» не будет длиться долго, обычно от нескольких минут до получаса, и его можно будет быстро восстановить, поэтому, если это важное сообщение, его можно сохранить в базе данных и если это неважное сообщение, его можно использовать, Redis сохраняет, конечно, в соответствии с порядком величины сообщения.

Если объем сообщения относительно велик, вы можете рассмотреть возможность отправки сообщения в очередь недоставленных сообщений другого кластера.На самом деле у компании есть два кластера RabbitMQ, поэтому, когда один кластер недоступен, вы можете отправлять сообщения в другой кластер, эммм, Если две компьютерные комнаты выключены, когда я не сказал.

111.png.jpg