Как создать надежную очередь сообщений с помощью Redis

Redis задняя часть

List

Это наиболее часто используемый метод очереди, который заключается в использовании lpush и rpop для списка, как показано на следующем рисунке:

1.png

Из-за проблемы с пустыми очередями следует ввести цикл for плюс определенное время ожидания.Псевдокод выглядит следующим образом:

for {
  if msg:=redis.rpop();msg!=nil{
    handle(msg)
  }else{
    time.sleep(1000)
  }
}

Это решение может иметь риск того, что оно не будет обработано вовремя в течение 1 с (хотя в большинстве сценариев оно не имеет никакого эффекта).

Однако в redis есть блочный оператор, который реализует блокировку pull через brpop и может получать данные вовремя, псевдокод выглядит следующим образом:

for {
  # 超时时间为 0,代表无限等待
  if msg:=redis.brpop(0);msg!=nil{
    handle(msg)
  }
}

Выглядит идеально и решает проблему несвоевременной обработки, но так как клиент redis устанавливает время тайм-аута на 0, сервер redis может определить, что клиент является недействительной ссылкой, когда он не получил сообщение в течение длительного времени, поэтому принудительно выкинул отключено от линии, поэтому, когда сообщение не очень плотное, все еще существует определенный риск, когда оно напрямую установлено на 0. Рекомендуется сохранять минимальное значение ожидания, отличное от 0 (достаточно 1 с).Псевдокод выглядит как следует:

for {
  if msg:=redis.brpop(1000);msg!=nil{
    handle(msg)
  }
}

Это не только обеспечивает производительность в реальном времени, но и позволяет избежать разрыва соединения.

Есть такая простая очередь, но надо заметить, что это не надежная очередь, в основном проблема потери сообщений: из-за отсутствия акк механизма, если потребитель падает (или новая версия выходит в онлайн) после вытягивания( rpop) messages , сообщение, скорее всего, отсутствует

Во-вторых, мы рассматриваем некоторые функции очереди сообщений:

  • Поддержка нескольких групп потребителей: поскольку это простой список, он исчезает при извлечении, что приведет к тому, что сообщение будет использовано определенным потребителем только один раз. В некоторых сценариях группы потребления это невозможно (несколько разных предприятий используют одну и ту же очередь потребления).
  • Воспроизведение сообщений: иногда необходимо откатывать сообщения (исторические данные онлайн-отслеживания ошибок, тестирование и т. д.), и в настоящее время список повторных обращений не поддерживает эту функцию.

Суммировать

  • Простой и легкий в реализации, поэтому он широко используется
  • Нет механизма подтверждения, ненадежный
  • Несколько групп потребителей не поддерживаются
  • Воспроизведение сообщений не поддерживается

Pub/Sub

Как следует из названия, это команда, сгенерированная Redis для решения проблемы распространения/подписки — опубликовать и подписаться.

# 生产者
127.0.0.1:6379> publish queue 1
(integer) 1

# 消费者 1
127.0.0.1:6379> subscribe queue
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "queue"
3) (integer) 1
1) "message"
2) "queue"
3) "1"

# 消费者 2
127.0.0.1:6379> subscribe queue
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "queue"
3) (integer) 1
1) "message"
2) "queue"
3) "1"

Это в основном решает проблему поддержки нескольких групп потребителей следующим образом:

2.png

Однако, поскольку сам pub/sub не может сохраняться, это только пересылка данных в реальном времени, что вызовет следующие проблемы:

  • После того, как потребитель выйдет из сети и снова подключится к сети, поскольку предыдущие сообщения не сохраняются, старые сообщения будут потеряны, и можно будет получать только новые сообщения.
  • После того, как все потребители отключены, никто не потребляет, и все сообщения будут потеряны.

Следовательно, для этого требуется, чтобы потребитель вышел в сеть раньше производителя, иначе сообщение будет потеряно.

Во-вторых, каждое отношение подписки является буфером.Поскольку у буфера есть верхний предел, как только верхний предел будет превышен, Redis заставит потребителя отключиться, что приведет к потере сообщения.

Здесь мы видим, что очередь на основе списка является моделью извлечения, а pub/sub — моделью, основанной на проталкивании, которая сначала помещается в буфер, а затем ожидает, пока потребители ее извлекут.

Суммировать

  • Поддержка публикации/подписки, поддержка нескольких групп потребителей
  • Потребители теряют данные, когда выходят из сети
  • Накопление сообщений запускает потребителей, что приводит к потере данных
  • Нет механизма подтверждения, ненадежный

В целом это очень безвкусно, бессмысленно, не рассматривайте этот план

Stream

Обратите внимание, что эта функция доступна только в Redis 5.0.

Stream реализует производство и потребление сообщений через xadd и xreadgroup.

# 生产者
127.0.0.1:6379> xadd queue * k1 v1
1636289446933-0
127.0.0.1:6379> xadd queue * k2 v2
1636295029388-0
127.0.0.1:6379> xadd queue * k3 v3
1636291571597-0

# 消费者 1
127.0.0.1:6379> xreadgroup group g1 c1 COUNT 1 streams queue >
1) 1) "queue"
   2) 1) 1) "1636289446933-0"
         2) 1) "k1"
            2) "v1"      
127.0.0.1:6379> xreadgroup group g1 c1 COUNT 1 streams queue >
1) 1) "queue"
   2) 1) 1) "1636295029388-0"
         2) 1) "k2"
            2) "v2"
# 消费者 2
127.0.0.1:6379> xreadgroup group g1 c2 COUNT 1 streams queue >
1) 1) "queue"
   2) 1) 1) "1636291571597-0"
         2) 1) "k3"
            2) "v3"

Подтверждение и восстановление сообщений с помощью xack и xreadgroup

# 手动 ack
127.0.0.1:6379> xack queue g1 1636289446933-0
(integer) 1

# 查询尚未提交位于 pending 中的消息
127.0.0.1:6379> xreadgroup group g1 c1 COUNT 1 streams queue 0
1) 1) "queue"
   2) 1) 1) "1636295029388-0"
         2) 1) "k2"
            2) "v2"

Большая часть потребления восстановления реализована на основе следующего кода

for {
  # 从未ack的消息开始消费
  id:=getLastUnackIDByPending()
  if id=0 {
  # 从未推送的消息开始消费
    id=">"
  }
  msg:=xreadgroup(lastUnAckID)
  handle(msg)
  xack(msg)
}

Это очень удобное решение, когда потребители остаются неизменными (номер, уникальный идентификатор и т. д.), однако, если есть изменения, необходимо внедрить дополнительное решение для запланированного опроса или решение zookeeper для обеспечения согласованности потребителей. сложнее, я не буду его здесь слишком расширять, а отдельная тема стрима будет открыта позже.

По сравнению с профессиональными очередями сообщений

Мы сравниваем очередь Redis с профессиональной очередью rabbitMQ и kafka и проводим общую инвентаризацию следующих двух функций:

  • Сообщение не потеряно
  • Низкая стоимость невыполненных сообщений

Сообщение не потеряно

Это общая тема, для достижения которой требуется сотрудничество производителей, потребителей и промежуточного программного обеспечения.

1. Производитель теряет сообщение

  • Не удалось отправить: попробуйте еще раз
  • Отправлено, но отчет потерян: в настоящее время его можно только повторить, но это может привести к повторению нисходящего сообщения.

Таким образом, производитель может только повторить попытку без потери сообщения.В это время потребитель должен рассмотреть возможность повторной попытки обработки сообщения для достижения идемпотентности.

С этой точки зрения тот факт, что производитель не теряет сообщения, не имеет ничего общего со всем промежуточным программным обеспечением, это полностью вопрос реализации бизнеса, учитываются ли вышеперечисленные исключения.

2. Потребители теряют сообщения

В основном потому, что потребитель не работает, и после его извлечения нет квитанции.

В этом сценарии промежуточное программное обеспечение требуется для обеспечения механизма подтверждения, чтобы гарантировать, что сообщения были использованы, чтобы гарантировать, что сообщения не будут потеряны.

На данный момент поток redis совместим с kafka и rabbitMQ, и оба имеют идеальный механизм подтверждения.

3. Случай с потерянными сообщениями посередине

Именно так реализовано промежуточное ПО.

По словам босса@kaitoТем не менее, у Redis есть две точки риска:

  • Aof периодически обновляет диск, этот процесс асинхронный, и есть риск потери
  • Переключение master-slave, ведомая библиотека будет отнесена к основной библиотеке перед синхронизацией (если есть сомнения, можно ли ее повысить до основной библиотеки, даже если она не синхронизирована 🤔️)

С другой стороны, kafka и rabbitMQ используют один узел записи и несколько узлов для подтверждения одновременно, прежде чем они сочтут запись успешной, что еще больше повышает надежность сообщения.

ожидание сообщения

  • Redis: в зависимости от памяти, ограниченное пространство для хранения, сообщения будут удаляться после того, как невыполненная работа достигнет определенного уровня.
  • kafka, rabbitMQ: на основе жесткого диска считается неограниченным пространством для хранения

Суммировать

  • Поддержка ack на основе публикации/подписки
  • Поддержка воспроизведения сообщений
  • При изменении потребителя необходимо вводить дополнительное кодирование для обеспечения достоверности сообщения, что более сложно
  • Сообщения по-прежнему подвержены риску потери из-за сбоя компонента

Ссылаться на

Как Redis работает с очередями сообщений?

Официальный сайт Redis Stream тема