Четыре стратегии для обеспечения надежности отправки сообщений RabbitMQ! Что вы используете?

Java задняя часть RabbitMQ
Четыре стратегии для обеспечения надежности отправки сообщений RabbitMQ! Что вы используете?

@[toc] Микросервисы могут быть разработаны как микросервисы, управляемые сообщениями, а реагирующие системы также могут быть основаны на промежуточном программном обеспечении сообщений.С этой точки зрения промежуточное программное обеспечение сообщений действительно важно при разработке интернет-приложений.

Сегодня на примере RabbitMQ Сонг Гэ пришел поговорить с вами о надежности отправки сообщения в середине сообщения.

Обратите внимание, что в следующем содержании я в основном обсуждаю с вами, как гарантировать, что производитель сообщения успешно отправляет сообщение, и не затрагивает проблему потребления сообщения.

1. Механизм отправки сообщений RabbitMQ

Как мы все знаем, отправка сообщения в RabbitMQ вводит понятие Exchange (обмена), отправка сообщения сначала поступает на биржу, а затем по установленным правилам маршрутизации биржа направляет сообщение в разные Queues (очереди) , а затем потреблять разными потребителями.

Общий процесс таков, поэтому для обеспечения надежности отправки сообщения оно в основном подтверждается с двух сторон:

  1. Сообщение успешно доставлено на Exchange
  2. Сообщение успешно доставлено в очередь

Если эти два шага могут быть подтверждены, то мы можем считать, что сообщение отправлено успешно.

Если есть проблема на любом из этих двух шагов, значит, сообщение не было успешно доставлено. В это время нам, возможно, придется повторить попытку и использовать другие средства для повторной отправки сообщения. После нескольких попыток, если сообщение все еще не может быть доставлено, оно может быть необходимо вмешательство человека.

После приведенного выше анализа мы можем подтвердить, что для успешной отправки сообщения нам нужно сделать всего три вещи:

  1. Убедитесь, что сообщение поступает в Exchange.
  2. Убедитесь, что сообщение поступает в очередь.
  3. Включите запланированные задачи для доставки сообщений, которые не отправляются регулярно.

2. Усилия RabbitMQ

Три шага предложенных выше, третий шаг нужно реализовать самим, а первые два шага у RabbitMQ есть готовые решения.

Как я могу гарантировать, что сообщение успешно дойдет до RabbitMQ? RabbitMQ дает два варианта:

  1. Механизм открытых транзакций
  2. Механизм подтверждения отправителя

Это два разных решения. Их нельзя включить одновременно. Можно выбрать только одно из них. Если оба включены одновременно, будет выдано сообщение об ошибке:

Давайте рассмотрим их отдельно. Все следующие кейсы разработаны в Spring Boot, и соответствующий исходный код можно скачать в конце статьи.

2.1 Включите механизм транзакций

Способ открытия механизма транзакций RabbitMQ в Spring Boot следующий:

Во-первых, вам необходимо предоставить диспетчер транзакций следующим образом:

@Bean
RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {
    return new RabbitTransactionManager(connectionFactory);
}

Затем сделайте две вещи в генераторе сообщений: добавьте аннотации транзакций и установите канал связи в режим транзакций:

@Service
public class MsgService {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Transactional
    public void send() {
        rabbitTemplate.setChannelTransacted(true);
        rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,RabbitConfig.JAVABOY_QUEUE_NAME,"hello rabbitmq!".getBytes());
        int i = 1 / 0;
    }
}

Обратите внимание на две вещи:

  1. Добавьте способ отправки сообщения@TransactionalАннотации помечают транзакции.
  2. Вызовите метод setChannelTransacted со значением true, чтобы включить режим транзакций.

Вот и все.

В приведенном выше случае у нас есть 1/0 в конце, что должно вызвать исключение во время выполнения, мы можем попытаться запустить метод и обнаружить, что сообщение не было отправлено успешно.

Когда мы включаем режим транзакций, у производителей RabbitMQ есть еще четыре шага для отправки сообщений:

  1. Клиент делает запрос на перевод канала в режим транзакций.
  2. Сервер отвечает, соглашаясь перевести канал в режим транзакций.
  3. Клиент отправляет сообщение.
  4. Клиент фиксирует транзакцию.
  5. Сервер отвечает, подтверждая транзакцию.

Вышеуказанные шаги, кроме третьего шага, уже есть, а остальные шаги лишние без причины. Итак, вы видите, что режим транзакций на самом деле немного неэффективен, и это не оптимальное решение. Мы можем подумать о том, какие проекты будут использовать промежуточное программное обеспечение сообщений? Вообще говоря, это некоторые проекты с высокой степенью параллелизма, и производительность параллелизма особенно важна в настоящее время.

Поэтому RabbitMQ также предоставляет механизм подтверждения отправителя (подтверждение издателя), чтобы убедиться, что сообщение отправлено успешно.Таким образом, производительность намного выше, чем в режиме транзакций.Давайте посмотрим.

2.2 Механизм подтверждения отправителя

2.2.1 Обработка одного сообщения

Во-первых, мы удаляем код о транзакции прямо сейчас, а затем настраиваем и включаем механизм подтверждения отправителя сообщения в application.properties следующим образом:

spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true

Первая строка является обратным вызовом подтверждения для поступления сообщения конфигурации на биржу, а вторая строка — обратным вызовом для поступления сообщения конфигурации в очередь.

Конфигурация атрибута первой строки имеет три значения:

  1. нет: указывает, что режим подтверждения выпуска отключен, что является значением по умолчанию.
  2. коррелированный: указывает метод обратного вызова, который будет активирован после успешной публикации сообщения на бирже.
  3. простой: похож на коррелированный и поддерживаетwaitForConfirms()иwaitForConfirmsOrDie()вызов метода.

Далее нам нужно открыть два монитора, конкретная конфигурация выглядит следующим образом:

@Configuration
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    public static final String JAVABOY_EXCHANGE_NAME = "javaboy_exchange_name";
    public static final String JAVABOY_QUEUE_NAME = "javaboy_queue_name";
    private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Bean
    Queue queue() {
        return new Queue(JAVABOY_QUEUE_NAME);
    }
    @Bean
    DirectExchange directExchange() {
        return new DirectExchange(JAVABOY_EXCHANGE_NAME);
    }
    @Bean
    Binding binding() {
        return BindingBuilder.bind(queue())
                .to(directExchange())
                .with(JAVABOY_QUEUE_NAME);
    }

    @PostConstruct
    public void initRabbitTemplate() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            logger.info("{}:消息成功到达交换器",correlationData.getId());
        }else{
            logger.error("{}:消息发送失败", correlationData.getId());
        }
    }

    @Override
    public void returnedMessage(ReturnedMessage returned) {
        logger.error("{}:消息未成功路由到队列",returned.getMessage().getMessageProperties().getMessageId());
    }
}

Относительно этого класса конфигурации скажу следующее:

  1. Определить класс конфигурации, реализоватьRabbitTemplate.ConfirmCallbackиRabbitTemplate.ReturnsCallbackДва интерфейса, эти два интерфейса, обратный вызов первого используется для определения того, что сообщение прибыло на биржу, а второй вызывается, когда сообщение не может быть направлено в очередь.
  2. Определите метод initRabbitTemplate и добавьте аннотацию @PostConstruct, а также настройте два обратных вызова для rabbitTemplate в этом методе.

Вот и все.

Далее мы тестируем отправку сообщения.

Сначала мы пытаемся отправить сообщение на несуществующий обмен следующим образом:

rabbitTemplate.convertAndSend("RabbitConfig.JAVABOY_EXCHANGE_NAME",RabbitConfig.JAVABOY_QUEUE_NAME,"hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));

Обратите внимание, что первый параметр — это строка, а не переменная, переключатель не существует, и консоль выдаст следующую ошибку:

Далее даем реальный обмен, но очередь, которой не существует, вот так:

rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,"RabbitConfig.JAVABOY_QUEUE_NAME","hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));

Обратите внимание, что второй параметр теперь является строкой, а не переменной.

Видно, что хотя сообщение успешно дошло до биржи, оно не было успешно перенаправлено в очередь (поскольку очереди не существует).

Это отправка сообщения, давайте рассмотрим пакетную отправку сообщений.

2.2.2 Пакетная обработка сообщений

Если это пакетная обработка сообщений, контроль обратного вызова для успешной отправки такой же, и здесь повторяться не будет.

Это шаблон подтверждения издателя.

По сравнению с транзакциями пропускная способность сообщений в этом режиме будет значительно улучшена.

3. Повторите попытку в случае неудачи

Есть два случая неудачной повторной попытки: первый — MQ вообще не найден, а другой — MQ найден, но сообщение не отправлено.

Давайте рассмотрим два типа повторных попыток отдельно.

3.1 Встроенный механизм повторных попыток

Механизм транзакций и механизм подтверждения отправителя, упомянутые выше, позволяют отправителю подтвердить успешную отправку сообщения. Если отправитель не может подключиться к MQ с самого начала, в Spring Boot также есть соответствующий механизм повторных попыток, но этот механизм повторных попыток не имеет ничего общего с самим MQ. Это делается с помощью механизма повторных попыток в Spring. Конкретная конфигурация следующая: следует:

spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=1000ms
spring.rabbitmq.template.retry.max-attempts=10
spring.rabbitmq.template.retry.max-interval=10000ms
spring.rabbitmq.template.retry.multiplier=2

Смысл конфигурации сверху вниз:

  • Включить механизм повтора.
  • Интервал запуска повтора.
  • Максимальное количество повторных попыток.
  • Максимальный интервал между попытками.
  • Множитель интервального времени. (Настроенный здесь множитель интервала равен 2, затем первый интервал равен 1 секунде, второй интервал повторных попыток равен 2 секундам, третий раз равен 4 секундам и т. д.)

После завершения настройки снова запустите проект Spring Boot, а затем отключите MQ. Если вы попытаетесь отправить сообщение в это время, оно не будет отправлено, что приведет к автоматическому повтору.

3.2 Повторная попытка обслуживания

Повторная попытка обслуживания в основном предназначена для случая, когда сообщение не достигает коммутатора.

Если сообщение не доходит до обмена успешно, согласно нашему объяснению во втором разделе, будет запущен обратный вызов ошибки отправки сообщения, В этом обратном вызове мы можем поднять шум!

Общая идея такова:

  1. Сначала создайте таблицу для записи сообщений, отправляемых промежуточному программному обеспечению, например:

Каждый раз, когда сообщение отправляется, запись добавляется в базу данных. Поля здесь хорошо понятны, и я добавлю три:

  • статус: Указывает статус сообщения. Имеется три значения: 0, 1 и 2 означают, что сообщение отправляется, сообщение было успешно отправлено и сообщение не удалось отправить.
  • tryTime: указывает время первой повторной попытки сообщения (после отправки сообщения оно не было успешно отправлено во время tryTime, и в это время можно начать повторную попытку).
  • count: указывает количество повторных попыток сообщения.

Остальные поля хорошо изучены, поэтому я не буду вдаваться в подробности по отдельности.

  1. Когда сообщение отправлено, мы сохраняем запись об отправке сообщения в таблице и устанавливаем статус состояния на 0, а tryTime на 1 минуту позже.
  2. В методе обратного вызова с подтверждением, если получен обратный вызов о том, что сообщение отправлено успешно, статус сообщения устанавливается равным 1 (для сообщения при отправке устанавливается msgId, а msgId используется для уникальной блокировки сообщение, когда сообщение отправлено успешно. ).
  3. Кроме того, откройте временную задачу, и временная задача будет отправляться в базу данных каждые 10 с для извлечения сообщений из базы данных, в частности, для извлечения тех записей, статус которых равен 0, а время tryTime истекло.После получения этих сообщений сначала определите превысило ли количество повторных попыток 3 раза, если более 3 раз, изменить статус сообщения на 2, что означает, что сообщение не может быть отправлено и не будет повторной попытки. Для записей с не более чем 3 повторными попытками сообщение отправляется повторно и значение его счетчика +1.

Общая идея та же, что и выше. Сонге не будет давать здесь код. Отправка почты в vhr Сонге обрабатывается таким образом. Полный код можно найти в проекте vhr (github.com/lenve/vhr).

Конечно, у этого подхода есть два недостатка:

  1. Переход к базе данных может замедлить Qos MQ, но иногда нам не нужны высокие Qos MQ, поэтому приложение зависит от конкретной ситуации.
  2. Согласно изложенной выше идее, одно и то же сообщение может быть отправлено несколько раз, но это не проблема, мы можем решить проблему идемпотентности при потреблении сообщений.

Конечно, каждый также должен обращать внимание на то, чтобы сообщение было отправлено на 100% успешно, в зависимости от конкретной ситуации.

4. Резюме

Что ж, вот некоторые распространенные проблемы с производителями сообщений и соответствующие решения.В следующей статье Сонг Гэ обсудит с вами, как обеспечить успешное потребление сообщений и решить проблему идемпотентности.

Соответствующий исходный код, использованный в этой статье, можно скачать здесь:GitHub.com/Len VE/Java не…

Категории