Реализовать очередь сообщений с помощью Redis (потребление в реальном времени + механизм подтверждения)

Redis задняя часть TCP/IP RabbitMQ

очередь сообщений

Сначала сделайте простой импорт.

MQ в основном используется для:

  • разрозненные приложения,
  • Асинхронное сообщение
  • Сглаживание пиков трафика и заполнение впадин

В настоящее время больше используются ActiveMQ, RabbitMQ, ZeroMQ, Kafka, MetaMQ, RocketMQ и т.д.
В интернет-ресурсах есть подробные пояснения к каждой ситуации, поэтому здесь я не буду вдаваться в подробности. эта статья
Представлен только процесс использования Redis для реализации облегченного MQ.

Зачем использовать Redis для реализации облегченного MQ?

В процессе реализации бизнеса, даже если нет большого объема трафика, почти всегда доступна развязка и асинхронность, в это время особенно важен MQ. Но в то же время MQ тоже тяжелый компонент.Например, если мы используем RabbitMQ, мы должны построить для него сервер.В то же время, если мы хотим учитывать доступность, мы должны построить кластер для сервера, и если есть проблема в производстве, нам нужно найти функцию. В процессе развития малого и среднего бизнеса другие целые реализации бизнеса могут не иметь такого веса. Службы с чрезмерно тяжелыми компонентами умножают рабочую нагрузку.
К счастью,Структура данных списка, предоставляемая Redis, очень подходит для очередей сообщений.
Но как добиться мгновенного потребления? Как реализовать механизм ack? Это ключи к реализации.

Как добиться мгновенного потребления?

Метод, циркулирующий в Интернете, заключается в использовании операции списка BLPOP или BRPOP в Redis, то есть блокировании всплывающего окна списка. Давайте посмотрим, как используются блокирующие всплывающие окна:

BRPOP key [key ...] timeout

Описание этой команды:

1. Когда в заданном списке нет элементов для извлечения, соединение будет заблокировано командой BRPOP до тех пор, пока не истечет время ожидания или не будет найден элемент, который можно извлечь. 2. Когда задано несколько ключевых параметров, каждый список проверяется в порядке ключей параметров, и всплывает хвостовой элемент первого непустого списка.

Кроме того, BRPOP имеет те же характеристики, что и BLPOP, за исключением того, что положение всплывающего элемента отличается от положения BLPOP.

С этой точки зрения,Блокирование всплывающих списков имеет две характеристики:

1. Если задачи нет в списке, соединение будет заблокировано 2. Есть таймаут на блокировку соединения

На данный момент проблема очевидна.Как здесь установить таймаут?Можно ли гарантировать, что только один останется заблокированным, пока сообщение не попадет в очередь? Это, очевидно, трудно сделать, потому что они не связаны друг с другом. К счастью, Redis также поддерживает Pub/Sub (публикацию/подписку). Когда сообщение A попадает в список очереди, в канал канала публикуется (PUBLISH) сообщение B. В это время воркер, подписавшийся на канал, получает сообщение B. Зная, что в списке есть сообщение A, он может зациклить lpop или rpop, чтобы использовать список новостей. Процесс выглядит следующим образом:

Рабочий процесс может быть отдельным потоком или независимой службой, которая действует как потребитель и бизнес-процессор. Пример приведен ниже.

Экземпляр мгновенного потребления

Пример сценария: рабочему процессу необходимо синхронизировать файлы, и он синхронизируется сразу после создания файла.

Сначала запустите поток от имени рабочего, чтобы подписаться на канал канала:

@Service
public class SubscribeService {

    @Resource
    private RedisService redisService;
    @Resource
    private SynListener synListener;//订阅者
    
    @PostConstruct
    public void subscribe() {
        new Thread(new Runnable() {

            @Override
            public void run() {
                LogCvt.info("服务已订阅频道:{}", channel);
                redisService.subscribe(synListener, channel);
            }
        }).start();

    }
}

SynListener в коде — это объявленный подписчик, а канал — это имя канала, на который подписана Конкретная логика подписки выглядит следующим образом:

@Service
public class SynListener extends JedisPubSub {

    @Resource
    private DispatchMessageHandler dispatchMessageHandler;

    @Override
    public void onMessage(String channel, String message) {
        LogCvt.info("channel:{},receives message:{}",channel,message);
        try {
            //处理业务(同步文件)
            dispatchMessageHandler.synFile();
        } catch (Exception e) {
            LogCvt.error(e.getMessage(),e);
        }
    }
}

При обработке бизнеса перейдите к списку для использования сообщений:

@Service
public class DispatchMessageHandler {
    
    @Resource
    private RedisService redisService;
    @Resource
    private MessageHandler messageHandler;
    
    public void synFile(){
        while(true){
            try {
                String message = redisService.lpop(RedisKeyUtil.syn_file_queue_key());
                if (null == message){
                    break;
                }
                Thread.currentThread().setName(Tools.uuid());
                // 队列数据处理
                messageHandler.synfile(message);
            } catch (Exception e) {
                LogCvt.error(e.getMessage(),e);
            }
        }
    }

}

Таким образом, мы достигаем цели потребления сообщений в реальном времени.

Как реализовать механизм ack?

ack, то есть механизм подтверждения сообщения (Acknowledge).

Сначала посмотрите на механизм ack RabbitMQ:

  • Издатель уведомляет Потребителя о сообщении, и, если Потребитель завершил обработку задачи, он отправит сообщение ACK Брокеру, чтобы сообщить, что сообщение успешно обработано и может быть удалено из очереди. Если Потребитель не отправит обратно сообщение ACK, Брокер посчитает, что обработка сообщения не удалась, и передаст сообщение и последующие сообщения другим Потребителям для обработки (флаг повторной доставки установлен в значение true).
  • Этот механизм подтверждения аналогичен механизму, установленному протоколом TCP/IP. Разница в том, что TCP/IP необходимо пройти три рукопожатия для установления соединения, а RabbitMQ требуется только один ACK.
  • Стоит отметить, что RabbitMQ будет перераспределять сообщение другим Потребителям тогда и только тогда, когда обнаружит, что сообщение ACK не было отправлено и соединение Потребителя разорвано, поэтому нет необходимости беспокоиться о том, что время обработки сообщения слишком велико и перераспределяется.

Так что же нам делать, когда мы используем Redis для реализации механизма ack очереди сообщений? Два момента, на которые следует обратить внимание:

  1. После сбоя обработки работы сообщение должно быть возвращено в исходную очередь ожидания.
  2. Если рабочий процесс зависает, сообщение также следует откатить в исходную очередь ожидания.

Первый пункт выше можно сделать в деле, то есть сообщение отката выполняется после сбоя.

План реализации

(Это решение в основном решает ситуацию, когда рабочий зависает)

  1. Поддерживаются две очереди: очередь ожидания и очередь выполнения.
  2. рабочий определяется как ThreadPool.
  3. После исключения из ожидающей очереди рабочий процесс назначает поток для обработки сообщения — добавляет текущую метку времени и имя текущего потока к целевому сообщению, а затем ставит очередь выполнения.
  4. Включите запланированную задачу, сканируйте очередь выполнения через равные промежутки времени и проверяйте метку времени каждого элемента. Если время ожидания истекло, ThreadPoolExecutor рабочего процесса проверит, существует ли поток. Если он существует, отмените выполнение текущей задачи и выполните откат сделка. Наконец, извлеките задачу из очереди выполнения, а затем снова поместите ее в очередь ожидания.
  5. В потоке работника, если обработка дела не удалась, он будет активно откатываться, удалять задачу из очереди выполнения и снова помещать ее в очередь ожидания.

Суммировать

Redis как очередь сообщений имеет большие ограничения. Из-за своих основных функций и применений он может реализовывать только облегченные очереди сообщений. Написано в конце: Нет абсолютно лучшей технологии, есть только самая удобная для бизнеса технология, предназначенная для всех разработчиков.