Представьте Spring Boot + RabbitMQ для подробной реализации очереди задержки.

Spring Boot Java RabbitMQ

задний план

Что такое очередь задержки?

Как следует из названия, отложенная очередь — это очередь, в которой сообщения, поступающие в очередь, будут задержаны для использования. В общих очередях, как только сообщения поставлены в очередь, они будут немедленно обработаны потребителями.

Что может сделать очередь задержки?

Очереди с задержкой в ​​основном используются в сценариях, требующих отложенной работы. Наиболее распространены следующие два сценария:

  1. отсроченное потребление. Например:
    • После того, как пользователь формирует заказ, требуется некоторое время для проверки статуса оплаты заказа.Если заказ не был оплачен, заказ необходимо закрыть вовремя.
    • После успешной регистрации пользователя требуется период времени, например неделя, для проверки использования пользователем. Если активность пользователя оказывается низкой, пользователю будет отправлено электронное письмо или SMS с напоминанием об использовании.
  2. Отложить повтор. Например, потребитель не может использовать сообщение из очереди, но хочет автоматически повторить попытку после задержки.

Если мы не используем отложенную очередь, то мы можем сделать это только с помощью опросного сканера. Это решение не является ни элегантным, ни удобным для использования разработчиками в виде единого сервиса. Но с очередью задержки мы можем сделать это легко.

Как добиться?

Не волнуйтесь, далее мы подробно расскажем, как воспользоваться преимуществамиSpring BootдобавлятьRabbitMQреализовать очередь с задержкой.

Пример кода, представленный в этой статье, был отправлен в репозиторий Github:GitHub.com/love LCP/Нет…

Реализовать идеи

Прежде чем представить конкретные идеи реализации, давайте сначала представим две функции RabbitMQ: одна — Time-To-Live Extensions, а другая — обмен недоставленными письмами.

Time-To-Live Extensions

RabbitMQ позволяет нам устанавливать TTL (время жизни) для сообщений или очередей, то есть время истечения срока действия. TTL указывает максимальное время в миллисекундах, в течение которого сообщение может оставаться в очереди. То есть, когда сообщение установлено с TTL или когда сообщение входит в очередь с установленным TTL, сообщение «умирает» через TTL секунд и становится недоставленным письмом. Если настроены и TTL сообщения, и TTL очереди, будет использоваться меньшее значение. Для получения дополнительной информации см.официальная документация.

Dead Letter Exchange

Как только что упоминалось, сообщение с установленным TTL станет недоставленным письмом после истечения срока его действия. На самом деле в RabbitMQ существует три формы сообщений «смерти»:

  1. Сообщение было отклонено. Вызвав basic.reject или basic.nack и установив для параметра requeue значение false.
  2. Срок действия сообщения истек, поскольку был установлен срок жизни.
  3. Сообщение попало в очередь, которая достигла максимальной длины.

Если очередь настроена на обмен недоставленными письмами (DLX), то эти недоставленные письма будут повторно опубликованы в обмене недоставленными письмами и перенаправлены в другие очереди через обмен недоставленными письмами. Для получения дополнительной информации см.официальная документация.

блок-схема

Вы, должно быть, думали о том, как объединить функции TTL и DLX RabbitMQ для реализации очереди с задержкой.

Для приведенных выше двух сценариев очереди с задержкой у нас есть следующие две блок-схемы:

отсроченное потребление

Отложенное потребление — наиболее распространенный шаблон использования отложенных очередей. Как показано на рисунке ниже, сообщение, сгенерированное производителем, сначала попадет в буферную очередь (красная очередь на рисунке). Благодаря расширению TTL, предоставляемому RabbitMQ, эти сообщения будут установлены на срок действия, то есть на время задержки потребления. После истечения срока действия сообщений эти сообщения будут перенаправлены в очередь фактического потребления (синяя очередь на рисунке) через настроенный DLX, чтобы добиться эффекта отложенного потребления.

отложенный повтор

Отложенный повтор — это тип отложенного потребления, но структура этого режима отличается от обычной блок-схемы отложенного потребления, поэтому он вводится отдельно.

Как показано на рисунке ниже, потребитель обнаруживает, что обработка сообщения ненормальна, например, ненормальность, вызванная колебаниями сети. Так что, если вы не будете ждать какое-то время и попытаетесь снова напрямую, очень вероятно, что в течение этого периода ничего не получится, что приведет к определенной трате ресурсов. Затем мы можем сначала поместить его в буферную очередь (красная очередь на рисунке) и подождать, пока сообщение не войдет в фактическую очередь потребления (синяя очередь на рисунке) после времени задержки. колебания обычно восстанавливаются, и эти сообщения можно нормально использовать.

Код

Далее мы расскажем, как реализовать очередь задержки на основе RabbitMQ в Spring Boot. Мы предполагаем, что читатель уже имеет базовые знания о Spring Boot и RabbitMQ. Если вы хотите быстро понять основы Spring Boot, вы можете обратиться к тому, что я писал ранеестатья.

проект инициализации

Сначала мы создаем проект Spring Boot в Intellij и добавляемspring-boot-starter-amqpрасширение.

Настроить очередь

Из приведенной выше блок-схемы видно, что для реализации очереди с задержкой требуется буферная очередь и фактическая очередь потребления. И поскольку в RabbitMQ у нас есть два метода настройки для истечения срока действия сообщения, поэтому в коде мы настраиваем всего три очереди:

  • delay_queue_per_message_ttl: TTL, настроенный для очереди буфера сообщений.
  • delay_queue_per_queue_ttl: буферная очередь с настроенным сроком жизни очереди.
  • delay_process_queue: Фактическая очередь потребления.

Мы настраиваем вышеуказанную очередь как Bean с помощью Java Config. Поскольку мы добавилиspring-boot-starter-amqpВ качестве расширения Spring Boot автоматически создаст эти очереди на основе нашей конфигурации при запуске. Чтобы облегчить следующие тесты, мы настраиваем DLX delay_queue_per_message_ttl и delay_queue_per_queue_ttl, чтобы они были одинаковыми, а сообщения с истекшим сроком действия будут пересылаться в delay_process_queue через DLX.

delay_queue_per_message_ttl

Сначала введите код конфигурации delay_queue_per_message_ttl:

@Bean
Queue delayQueuePerMessageTTL() {
    return QueueBuilder.durable(DELAY_QUEUE_PER_MESSAGE_TTL_NAME)
                       .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX,dead letter发送到的exchange
                       .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter携带的routing key
                       .build();
}

в,x-dead-letter-exchangeобъявляет имя DLX, на которое пересылаются недоставленные сообщения в очереди,x-dead-letter-routing-keyОбъявляет имя ключа маршрутизации, которое эти мертвые буквы несут при пересылке.

delay_queue_per_queue_ttl

Точно так же код конфигурации для delay_queue_per_queue_ttl:

@Bean
Queue delayQueuePerQueueTTL() {
    return QueueBuilder.durable(DELAY_QUEUE_PER_QUEUE_TTL_NAME)
                       .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX
                       .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter携带的routing key
                       .withArgument("x-message-ttl", QUEUE_EXPIRATION) // 设置队列的过期时间
                       .build();
}

Конфигурация очереди delay_queue_per_queue_ttl на единицу больше, чем конфигурация очереди delay_queue_per_message_ttl.x-message-ttl, который используется для установки времени истечения срока действия очереди.

delay_process_queue

Конфигурация delay_process_queue самая простая:

@Bean
Queue delayProcessQueue() {
    return QueueBuilder.durable(DELAY_PROCESS_QUEUE_NAME)
                       .build();
}

Настроить обмен

Настроить DLX

Для начала нам нужно настроить DLX, код такой:

@Bean
DirectExchange delayExchange() {
    return new DirectExchange(DELAY_EXCHANGE_NAME);
}

Затем привяжите DLX к фактической очереди потребления, а именно delay_process_queue. Таким образом, все мертвые письма будут пересылаться в delay_process_queue через DLX:

@Bean
Binding dlxBinding(Queue delayProcessQueue, DirectExchange delayExchange) {
    return BindingBuilder.bind(delayProcessQueue)
                         .to(delayExchange)
                         .with(DELAY_PROCESS_QUEUE_NAME);
}

Настройка Exchange, требуемой для отложенной повторной попытки

Из блок-схемы отложенных повторных попыток мы видим, что после сбоя обработки сообщения нам необходимо переслать сообщение в очередь буфера, поэтому очередь буфера также должна быть привязана к Exchange.В этом примере мы используем delay_process_per_queue_ttl в качестве очереди буфера при отложенной повторной попытке.. Как настроить конкретный код, я не буду здесь вдаваться в подробности, вы можете обратиться ко мнеGithubкод в .

определить потребителей

Создаем простейшего потребителя ProcessReceiver, этот потребитель слушает очередь delay_process_queue, для полученных сообщений он будет:

  • Если тело сообщения в сообщении не равно FAIL_MESSAGE, будет выведено тело сообщения.
  • Если тело сообщения в сообщении оказывается FAIL_MESSAGE, то оно имитирует создание исключения, а затем перенаправляет сообщение в буферную очередь (соответствует сценарию отложенной повторной попытки).

Кроме того, нам также необходимо создать новый прослушивающий контейнер для хранения потребителей Код выглядит следующим образом:

@Bean
SimpleMessageListenerContainer processContainer(ConnectionFactory connectionFactory, ProcessReceiver processReceiver) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(DELAY_PROCESS_QUEUE_NAME); // 监听delay_process_queue
    container.setMessageListener(new MessageListenerAdapter(processReceiver));
    return container;
}

На данный момент наш предварительно сконфигурированный код написан, а затем нам нужно написать тестовые примеры для проверки нашей очереди задержки.

Пишите тестовые случаи

Сценарии отложенного потребления

Сначала мы напишем тестовый код, который проверяет значение TTL для сообщения.

мы используемspring-rabbitКласс RabbitTemplate, предоставляемый в пакете, используется для отправки сообщений. Поскольку мы добавилиspring-boot-starter-amqpExtension, Spring Boot автоматически загрузит RabbitTemplate как bean-компонент в контейнер во время инициализации.

Проблема с отправкой сообщения решена, так как выставить TTL для каждого сообщения? Здесь нам нужно использовать MessagePostProcessor. MessagePostProcessor обычно используется для установки заголовка сообщения и свойств сообщения. Мы создаем новый класс ExpirationMessagePostProcessor, который будет отвечать за установку свойства TTL сообщения:

/**
 * 设置消息的失效时间
 */
public class ExpirationMessagePostProcessor implements MessagePostProcessor {
    private final Long ttl; // 毫秒

    public ExpirationMessagePostProcessor(Long ttl) {
        this.ttl = ttl;
    }

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties()
               .setExpiration(ttl.toString()); // 设置per-message的失效时间
        return message;
    }
}

Затем при вызове метода convertAndSend RabbitTemplate передайте ExpirationMessagePostPorcessor. Мы отправляем 3 сообщения в буферную очередь со временем истечения 1 секунда, 2 секунды и 3 секунды. Конкретный код выглядит следующим образом:

@Test
public void testDelayQueuePerMessageTTL() throws InterruptedException {
    ProcessReceiver.latch = new CountDownLatch(3);
    for (int i = 1; i <= 3; i++) {
        long expiration = i * 1000;
        rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_MESSAGE_TTL_NAME,
                (Object) ("Message From delay_queue_per_message_ttl with expiration " + expiration), new ExpirationMessagePostProcessor(expiration));
    }
    ProcessReceiver.latch.await();
}

Внимательные друзья наверняка спросят, а зачем добавлять в код CountDownLatch? Это связано с тем, что если нет защелки для блокировки тестового метода, тестовый пример завершится напрямую, программа завершится, и мы не увидим производительность отложенного потребления сообщений.

Затем аналогичным образом код для проверки настройки TTL в очереди выглядит следующим образом:

@Test
public void testDelayQueuePerQueueTTL() throws InterruptedException {
    ProcessReceiver.latch = new CountDownLatch(3);
    for (int i = 1; i <= 3; i++) {
        rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_QUEUE_TTL_NAME,
                "Message From delay_queue_per_queue_ttl with expiration " + QueueConfig.QUEUE_EXPIRATION);
    }
    ProcessReceiver.latch.await();
}

Мы отправляем 3 сообщения в буферную очередь. Теоретически эти 3 сообщения истекают одновременно через 4 секунды.

Сценарий отложенной повторной попытки

Нам также необходимо протестировать сценарий отложенного повтора.

@Test
public void testFailMessage() throws InterruptedException {
    ProcessReceiver.latch = new CountDownLatch(6);
    for (int i = 1; i <= 3; i++) {
        rabbitTemplate.convertAndSend(QueueConfig.DELAY_PROCESS_QUEUE_NAME, ProcessReceiver.FAIL_MESSAGE);
    }
    ProcessReceiver.latch.await();
}

Мы отправляем 3 сообщения в delay_process_queue, которые вызовут FAIL, и теоретически эти 3 сообщения будут автоматически повторены через 4 секунды.

Посмотреть результаты теста

Сценарии отложенного потребления

Для проверки сценария отложенного потребления мы разделяем параметр TTL для сообщения и параметр TTL для очереди. Во-первых, давайте посмотрим на результаты тестирования настройки TTL в сообщении:

На рисунке выше видно, что ProcessReceiver получает сообщения через 1 секунду, 2 секунды и 3 секунды соответственно. Результаты теста показывают, что сообщения не только потребляются с задержкой, но и время задержки каждого сообщения можно настроить. Тест сценария отложенного потребления с установленным сроком жизни сообщения прошел успешно.
Затем результат проверки настройки TTL в очереди выглядит следующим образом:

Как видно из изображения выше, ProcessReceiver получил 3 сообщения одновременно с задержкой в ​​4 секунды. Результаты теста показывают, что сообщение не только потребляется с опозданием, но также и то, что время истечения срока действия сообщения фиксируется, когда для очереди установлено значение TTL. Тест сценария отложенного потребления с установленным TTL в очереди прошел успешно.

Сценарий отложенной повторной попытки

Далее давайте посмотрим на результаты теста отложенного повтора:

ProcessReceiver сначала получил 3 сообщения, которые вызовут FAIL, а затем переместил его в буферную очередь.Через 4 секунды он получил 3 сообщения только сейчас. Тест сценария отложенного повтора прошел успешно.

Суммировать

В этой статье впервые представлены концепция и использование отложенных очередей, а также подробно объясняется, как реализовать отложенную очередь с помощью Spring Boot и RabbitMQ с помощью кода. Я надеюсь, что эта статья сможет вдохновить и помочь вам в вашей обычной учебе и работе. Если у вас есть какие-либо комментарии или вопросы, пожалуйста, оставьте сообщение под комментариями, спасибо!

Эта статья была впервые опубликована впоцелуй с.org/2017/11/18/…
Комментарии и перепечатки приветствуются!
Подпишитесь на общедоступную учетную запись WeChat ниже, чтобы получать информацию из первых рук!