[Перевод] - Введение в Redis-Streams

Redis

макет: сообщение категория: [Redis,Перевод] теги:

  • Redis
  • перевести

дружеское напоминание

Когда я учился в колледже, я не сдал CET 6, поэтому, если вы немного знаете английский, если вы зайдете на эту страницу, попробуйте прочитать оригинальный текст, ссылка находится по исходному адресу ниже. читать против исходного текста, чтобы не ошибиться в чем-то(Это неизбежно), стравливал других друзей.

Если вы обнаружите какую-либо двусмысленность в переводе, пожалуйста, прокомментируйте или отправьте электронное письмо по адресуhuyanshi2580@gmail.com

исходный адрес

Эта статья переведена сВнедрение Stream на официальном сайте Reids.

Недавно мне нужно было узнать о новой структуре данных Redis.Stream.Поскольку это относительно новая технология, китайских материалов относительно мало.Я нашел введение автора в Stream на официальном сайте Redis.Прочитав его, я получил много пользы.

При этом, чтобы зафиксировать и углубить понимание, я решил перевести исходный текст и записать его в блог.


Следующее содержимое представляет собой исходный текст под названием «Введение в потоки Redis».


Поток — это новый тип данных, представленный в Redis 5.0, который более абстрактно имитирует структуру данных журнала, однако сущность журнала остается неизменной: как и файл журнала, он обычно реализуется как файл, открытый в режиме только для добавления. Потоки Redis в основном представляют собой структуру данных только для добавления. По крайней мере концептуально, поскольку Redis Streams — это абстрактный тип данных в памяти, он реализует более мощные операции для преодоления ограничений самого файла журнала.

Что делает Redis Streams очень сложным, так это то, что, хотя сама структура данных Stream очень проста, она реализует дополнительные необязательные функции: набор блокирующих операций, которые позволяют потребителям ждать новых данных, добавленных в поток производителем, в дополнение к There это концепция под названием «Группы потребителей».

Группы потребителей изначально были представлены Kafka (TM), популярной системой обмена сообщениями. Redis реализует аналогичную идею совершенно по-другому, но с той же целью: позволить группе клиентов совместно потреблять разные части одного и того же потока сообщений.

Основы потоков

Чтобы понять, что такое Redis Streams и как их использовать, мы проигнорируем все расширенные функции и вместо этого сосредоточимся на самой структуре данных с точки зрения команд, используемых для управления ею и доступа к ней. Это в основном часть, общая для большинства других типов данных Redis, таких как списки, наборы, отсортированные наборы и т. д. Однако обратите внимание, что у списков также есть необязательный более сложный API блокировки, что-то вродеBLPOPЖдать. Так что Streams в этом отношении мало чем отличается от Lists, просто дополнительный API более сложный и мощный.

Поскольку Stream является структурой данных только для добавления, базовая команда записи (называемая XADD) будет добавлять новые записи в указанный поток. Запись потока представляет собой не просто строку, а состоит из одного или нескольких列-值парная композиция. Таким образом, каждая запись потока уже структурирована как файл, написанный только для добавления в формате CSV, с несколькими отдельными полями в каждой строке.

XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

лицевой стороной вверхXADDвызов команды, где находится ключmystreamПоток добавляется со значениемsensor-id: 123, temperature: 19.8запись, которая использует идентификатор записи1518951480106-0, генерируется автоматически и генерируетсяXADDвозвращается командой. Это будет имя ключаmystreamВ качестве первого параметра вторым параметром является идентификатор записи, который идентифицирует каждую запись в потоке. Однако в приведенном выше примере мы использовали*, потому что мы хотим, чтобы сервер генерировал для нас новые идентификаторы. Каждый новый идентификатор монотонно увеличивается, и, проще говоря, каждая новая добавленная запись будет иметь более высокий идентификатор, чем все записи в прошлом. Автоматически сгенерированные сервером идентификаторы почти всегда являются тем, что вам нужно, и есть очень редкие причины для явного указания идентификаторов. Мы обсудим это подробно позже. Подобно тому, как файлы журналов имеют номера строк или смещения байтов внутри файла, каждая запись имеет идентификатор — это еще одна особенность Stream, аналогичная файлам журналов.Возвращаясь к нашему примеру XADD, после имени ключа и идентификатора следующим параметром является столбец. пары значений, которые составляют нашу запись Stream.

Просто используйте команду XLEN, чтобы получить количество элементов в потоке:

> XLEN mystream
(integer) 1

идентификатор записи

Идентификатор записи задаетсяXADDКоманда возвращает, однозначно идентифицируя каждую запись в данном потоке, состоит из двух частей.

<millisecondsTime>-<sequenceNumber> | 毫秒时间-序列号

Часть времени в миллисекундах — это локальное время узла Redis, который генерирует идентификатор потока, однако, если текущее время в миллисекундах оказывается меньше предыдущего времени входа, используется предыдущее время входа, поэтому, если часы откатываются назад, монотонно возрастающее свойство ID все еще существует. Порядковые номера используются для записей, созданных в течение одной миллисекунды. Поскольку порядковые номера являются 64-битными, количество записей, которые могут быть сгенерированы за одну миллисекунду, не ограничено.

Формат этих идентификаторов поначалу может показаться странным, и благонамеренные читатели могут задаться вопросом, почему время является частью идентификатора. Причина в том, что Redis Stream поддерживает запросы диапазона на основе идентификаторов. Поскольку идентификатор связан со временем создания записи, это делает запросы на основе временного диапазона практически бесплатными == бесплатно== в исходном тексте. мы собираемся использоватьXRANGEЗная это при заказе,

Если по какой-либо причине пользователю требуется инкрементный идентификатор, который не связан со временем, но фактически связан с другим идентификатором внешней системы, как упоминалось ранее,XADDКоманды могут принимать явные идентификаторы вместо использования*Подстановочные знаки для запуска автоматического создания идентификатора, как в следующем примере:

XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Обратите внимание, что в этом случае минимальный идентификатор равен 0-1, и команда не будет принимать идентификаторы, равные или меньшие, чем предыдущий идентификатор:

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Получить данные из потока

Теперь мы можем, наконец, пройтиXADDДобавлена ​​запись в наш поток. Однако добавление данных к потоку очень явное, но запрос потока для извлечения данных не такой явный. Если мы продолжим аналогию с файлом журнала, очевидный способ сделать это — имитировать команды Unix, которые мы обычно используем.tail -fdo, то есть мы можем начать слушать новые сообщения, прикрепленные к Stream. Обратите внимание, что это отличается от операции блокировки списков Redis.BLPOPТакие операции в популярном стиле блокируют его доступ к одному клиенту, тогда как в Stream мы хотим, чтобы несколько потребителей видели новые сообщения, добавленные к Stream, точно так же, как несколько потребителей.tail -fТаким образом, процессы могут видеть, что добавлено в журнал. Используя традиционную терминологию, мы хотим, чтобы Stream мог разветвлять сообщения == разветвлять == нескольким клиентам.

Однако это только один из возможных режимов доступа. Мы также можем посмотреть на Stream совсем по-другому: не как на систему обмена сообщениями, а как на хранилище временных рядов. Получение вновь добавленной информации также полезно в этом случае, но другим естественным шаблоном запроса является получение сообщений по временному диапазону или использование курсора для перебора сообщений для пошагового просмотра всей истории. Это определенно еще один полезный режим доступа.

Наконец, если мы посмотрим на поток с точки зрения потребителя, мы можем захотеть получить доступ к потоку другим способом, т. е. как к потоку сообщений, который может отделить нескольких потребителей от обработки этих сообщений. увидеть подмножество информации, поступающей в поток.Таким образом, обработка сообщений может выполняться между разными потребителями, не требуя от одного потребителя обработки всех сообщений: каждому потребителю нужно только обработать отдельное сообщение. По сути, это группа потребителей в Kafka(TM). Чтение сообщений через группы потребителей — еще один интересный шаблон для чтения из Redis Streams.

Redis Stream поддерживает три вышеуказанных режима запросов с помощью разных команд. В следующих разделах они будут показаны, начиная с самого простого использования: запросов диапазона.

Запрос диапазона:XRANGEиXREVRANGE.

Чтобы запросить поток по диапазону, нам просто нужно указать два идентификатора, начало и конец. Возвращаемый диапазон будет включать элементы с начальным и конечным идентификаторами, поэтому диапазон включает первый и последний элементы. Эти два специальных идентификатора-и+означает наименьший и наибольший возможный идентификатор соответственно.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Каждая возвращаемая запись представляет собой массив из двух элементов: идентификатора и списка пар столбец-значение. Мы уже говорили, что идентификаторы записей связаны со временем, потому что-Левая часть — это Unix-время (в миллисекундах) локального узла, на котором была создана запись Stream (но обратите внимание, что Stream реплицируется с использованием полностью указанной команды XADD, поэтому ведомое устройство будет иметь тот же идентификатор, что и ведущее). Это означает, что я могу использоватьXRANGEЗапросите диапазон раз. Однако для этого я могу захотеть опустить порядковую часть идентификатора: если она опущена, предполагается, что минимальное значение диапазона равно 0, а максимальное значение будет считаться максимально доступным порядковым номером. Таким образом, всего за два миллисекундных запроса Unix мы можем получить все записи, сгенерированные за этот период времени. Например, я могу запросить:

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

У меня есть только одна запись в этом временном диапазоне, но в фактическом наборе данных я могу запросить диапазон часов или, может быть, много элементов за две миллисекунды, поэтому возвращаемые результаты могут быть большими. следовательно,XRANGEНаконец поддерживает необязательныйCOUNTопции. Указав число, я могу получить только первые N элементов. Если мне нужно больше, я могу получить последний идентификатор, увеличить серийный номер на единицу и запросить снова. Давайте поймем это в следующем примере, мы начнем сXADDДобавьте 10 элементов (я не указал это, предполагая, что Stream mystream уже заполнен 10 элементами). Чтобы начать свой обход, получая по 2 элемента на команду, я начинаю поиск с полного диапазона, но указываю число 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Чтобы продолжить обход следующих двух элементов, мне нужно вернуть последний идентификатор, т.е.1519073279157-0и увеличить его часть серийного номера на 1. Обратите внимание, что цифры серийного номера имеют длину 64 бита, поэтому нет необходимости проверять наличие переполнения. Сгенерированный идентификатор,1519073279157-1теперь можно использовать как следующийXRANGEНовые параметры начала звонка:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

Как выше. так какXRANGEВременная сложность поиска составляет O(log(N)), затем он возвращает M элементов за время O(M), поэтому эта команда имеет логарифмическую временную сложность, что означает, что каждый шаг обхода выполняется быстро. следовательноXRANGEТакже фактический итератор потока == де-факто не переводит ==, нет необходимостиXSCANЗаказ.

ЗаказXREVRANGEиXRANGEаналогично, просто возвращает элементы в обратном порядке, поэтомуXREVRANGEФактическое использование состоит в том, чтобы проверить, что является последним элементом в потоке:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Уведомление,XREVRANGEКоманда принимает параметры запуска и остановки в обратном порядке.

использоватьSREADСледите за новыми проектами

Когда мы не хотим анализировать элементы в потоке по области действия, обычно нам нужно подписаться на новые элементы, поступающие в поток. Эта концепция может появиться в связи с публикацией/подпиской Redis, когда вы подписываетесь на канал или список блокировки Reids, а затем ждете определенного ключа, чтобы получить последний поступивший элемент, Но это в основном связано с вашим потреблением Поток другой:

  1. У потока может быть несколько клиентов (потребителей), ожидающих данных. По умолчанию каждый новый элемент будет доставлен каждому потребителю, ожидающему данные в указанном потоке. Это поведение отличается от черного списка, где каждый потребитель получает отдельный элемент. Однако возможность разветвления для нескольких потребителей аналогична публикации/подписке.

  2. В pub/sub сообщения направляются автономно и никогда не сохраняются, в списках блокировки, когда клиент получает сообщение, оно выталкивается из списка (фактически удаляется), поток работает совершенно по-другому, все сообщения добавляются к потоку на неопределенный срок ( если пользователь явно не просит удалить запись): разные потребители определяют, что нового, запоминая идентификатор последнего полученного сообщения.

  3. Группы потребителей потоков (==Группа потребителей потока==) обеспечивают уровень контроля, недоступный для списков публикации/подписки или списков блокировки, различных групп в одном потоке, однозначного подтверждения обработанных элементов, возможности проверки ожидающих элементов, необработанных. сообщения и согласованная историческая видимость отдельных клиентов могут только просматривать их частные исторические записи потребления сообщений.

Команда, обеспечивающая возможность прослушивания новых сообщений, поступающих в поток, называетсяXREAD. это лучше, чемXRANGEЭто немного сложнее, поэтому мы начнем показывать простую форму, а позже предоставим весь макет команды.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

ВышеXREADнеблокирующая форма. Уведомление,COUNTпараметры не требуются, на самом деле единственными обязательными параметрами для этой команды являютсяSTREAMSпараметр, который указывает список ключей и максимальный идентификатор, который потребитель видел для указанного потока, поэтому команда будет обслуживать сообщения только для клиента с идентификатором, превышающим тот, который мы указали.

В приведенной выше команде мы написалиSTREAMS mystream 0, мы хотим получитьmystreamВсе идентификаторы в потоке больше, чем0-0Новости. Как вы можете видеть в приведенном выше примере, команда возвращает имя ключа, поскольку на самом деле эту команду можно вызвать с несколькими ключами для одновременного чтения из разных потоков. Я могу написать,STREAMS mystream otherstream 0 0, Обратите внимание, что после опции STREAMS нам нужно указать ключ, а затем идентификатор. следовательно,STREAMSОпция всегда должна быть последней.

КромеXREADПомимо возможности одновременного доступа к нескольким потокам и нашей возможности указать последний идентификатор, который нам нужен для получения обновленных сообщений, в этой простой форме нет ничего общего сXRANGEразные вещи. Однако интересная часть заключается в том, что мы можем сделать это, указавBLOCKАргументы легко используются в блокирующих командах.XREAD:

> XREAD BLOCK 0 STREAMS mystream $

Обратите внимание, что в приведенном выше примере, в дополнение к удалениюCOUNпараметры, я указал новыеBLOCKвариант с тайм-аутом 0 мс (что означает никогда тайм-аут). и,mystreamВместо обычного идентификатора потока я использовал специальный идентификатор.$. Этот специальный идентификатор означаетXREADследует использовать потокmystreamСамый большой ID, который в нем сохранился, так что начинаем слушать и будем получать только новые сообщения. Это чем-то похоже на команду Unixtail -f.

Обратите внимание, что использованиеBLOCKвариант, нам не нужно использовать специальный идентификатор$. Мы можем использовать любой действительный идентификатор. Если команда может обслужить наш запрос немедленно без блокировки, она это сделает, в противном случае она будет заблокирована. Как правило, если мы хотим начать использовать поток из новой записи, мы начинаем с идентификатора$Старт, после этого продолжаем использовать ID последнего полученного сообщения для совершения следующего звонка и так далее.

XREADБлокирующая форма также может прослушивать несколько потоков, указав несколько имен ключей. Возвращает результат, если запрос может быть обслужен синхронно, потому что хотя бы в одном потоке есть элемент с большим идентификатором, чем тот, который мы указали. В противном случае команда заблокируется и вернет первый элемент Stream, получивший новые данные (согласно указанному ID).

Подобно операциям со списками блокировки, блокирующий поток является справедливым с точки зрения клиента, ожидающего чтения данных, поскольку применяется политика FIFO. Первый заблокированный клиент данного потока также первым получает новый элемент.

XREADне кромеCOUNTиBLOCK, так что это очень простая команда со специальной функциональностью для подключения потребителей к одному или нескольким потокам.Более мощной функцией использования потоков является использование API группы потребителей. Но использование группы потребителей для чтения информации требует другой команды,XREADGROUP, Это будет рассмотрено в следующем разделе этого руководства.

группа потребителей

Когда задача состоит в том, чтобы использовать один и тот же поток с использованием разных клиентов,XREADПредусмотрено разветвление для N клиентов, а также используются подчиненные серверы для обеспечения большей масштабируемости чтения. Однако существует определенная проблема: мы хотим не предоставлять один и тот же поток сообщений многим клиентам, а предоставлять многим клиентам разные подмножества сообщений из одного и того же потока. Очевидным примером является медленная обработка сообщений: возможность иметь N разных рабочих процессов, получающих разные части потока, путем перенаправления разных сообщений одному, который может выполнять больше работы (== мощная вычислительная мощность или простаивающие в данный момент ==) от разных рабочих процессов к расширить работу по обработке сообщений.

На самом деле, если мы представим, что есть три потребителя C1, C2, C3 и поток, содержащий сообщения 1, 2, 3, 4, 5, 6, 7, то нам нужна служба сообщений, подобная показанной ниже:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Для достижения этого эффекта Redis использует концепцию, называемую группами потребителей. Очень важно понимать, что группы потребителей Redis не имеют ничего общего с тем, как реализованы группы потребителей Kafka(TM), просто по концепции реализации они просто похожи, поэтому я решил сравнить с программными продуктами, популяризировавшими эту идею. во-первых, не меняйте терминологию.

Группа потребителей действует как псевдопотребитель, извлекая данные из потока и фактически обслуживая несколько потребителей, предоставляя следующие гарантии:

  1. Каждое сообщение обслуживается отдельному потребителю, поэтому невозможно доставить одно и то же сообщение нескольким потребителям.
  2. Потребители в группе потребителей идентифицируются по имени, которое представляет собой строку с учетом регистра, которую должен выбрать клиент-потребитель. Это означает, что группа потребителей потока сохраняет все состояние даже после отключения, поскольку клиент снова будет утверждать, что является тем же потребителем. Однако это также означает, что уникальный идентификатор предоставляется клиентом.
  3. Каждая группа потребителей имеет понятие первого идентификатора, который никогда не использовал go, поэтому, когда потребитель запрашивает новое сообщение, он может доставлять только те сообщения, которые никогда не доставлялись ранее.
  4. Однако для явного подтверждения потребляемого сообщения требуется специальная команда: это сообщение было обработано правильно и, следовательно, может быть исключено из группы получателей.
  5. Группа потребителей отслеживает все сообщения, которые в настоящее время ожидают обработки, то есть сообщения, доставленные потребителю группы потребителей, которые еще не были подтверждены как обработанные. Благодаря этой функции каждый потребитель будет видеть только доставленные ему сообщения при доступе к истории сообщений потока.

В некотором смысле группу потребителей можно представить как некое состояние о потоке:

+----------------------------------------+
| consumer_group_name: mygroup           |
| consumer_group_stream: somekey         |
| last_delivered_id: 1292309234234-92    |
|                                        |
| consumers:                             |
|    "consumer-1" with pending messages  |
|       1292309234234-4                  |
|       1292309234232-8                  |
|    "consumer-42" with pending messages |
|       ... (and so forth)               |
+----------------------------------------+

Если вы посмотрите на это с этой точки зрения, станет очень просто понять, что может сделать группа потребителей, как предоставить потребителям их ожидающую историю и как обрабатывать только запросы потребителей на новые сообщения, только если идентификатор сообщения больше, чем идентификатор сообщения.last_delivered_id. В то же время, если рассматривать группу потребителей как вспомогательную структуру данных Redis Stream, очевидно, что один поток может иметь несколько групп потребителей с разными наборами потребителей. На самом деле тот же Stream может даже пропустить клиентаXREADЧтение клиентов без потребительских групп и клиентов черезXREADGROUPчитать из разных групп потребителей.

Теперь пришло время подробно рассмотреть основные команды для использования групп потребителей, а именно:

  • XGROUPИспользуется для создания, уничтожения и управления группами потребителей.
  • XREADGROUPИспользуется для чтения из потока по группе групп потребителей.
  • XACKэто команда, которая позволяет потребителям помечать ожидающие сообщения для надлежащей обработки.

Создайте группу потребителей

Предположим, у меня уже есть файл с именемmystreamStream, чтобы создать группу потребителей, мне нужно сделать следующее:

> XGROUP CREATE mystream mygroup $
OK

Примечание. В настоящее время невозможно создать группы потребителей для несуществующих потоков, но в краткосрочной перспективе мы можемXGROUPВ этом случае добавьте в команду параметр для создания пустого потока.

Как вы можете видеть в приведенной выше команде, при создании группы потребителей мы должны указать идентификатор, в примере это$. Это необходимо, потому что группа потребителей помимо других состояний должна знать, какие сообщения обрабатывать после подключения, т.е. какой был последний идентификатор сообщения, когда группа только что создавалась? Если по нашему$, то отныне потребителям в этой группе будут обслуживаться только новые сообщения, поступающие в поток. Если мы укажем0, группа потребителей будет потреблять все записи сообщений в истории Stream. Конечно, вы можете указать любой другой действительный идентификатор. Все, что вы знаете, это то, что группа потребителей начнет потреблять сообщения с идентификатором, превышающим указанный вами. так как$Указывает текущий самый большой идентификатор в потоке, поэтому укажите$Будут потребляться только новые сообщения.

Теперь, когда группа потребителей создана, мы можем использоватьXREADGROUPКоманда сразу же начинает пытаться читать сообщения через группу потребителей. Мы прочитаем от потребителя, что потребитель названAliceиBob, посмотрите, как система будет возвращать разные сообщения Алисе и Бобу.

XREADGROUPочень похоже наXREAD, также обеспечивает то же самоеBLOCKвариант, в противном случае это синхронная инструкция. Однако обязательная опция всегда должна быть указанаGROUP, который принимает два параметра: имя группы потребителей и имя потребителя, пытающегося прочитать. Также поддерживает опцииCOUNT, что то же самое, чтоXREADВ то же самое.

Прежде чем читать информацию из Stream, давайте поместим в него несколько сообщений:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Примечание: здесь,message— это имя столбца, а плод — это значение.Помните, что элементы Stream — это небольшой словарь.

Теперь пришло время попробовать что-нибудь прочитать, используя группы потребителей.

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

XREADGROUPответ такойXREADОтветьте тем же. Но обратите внимание на приведенное вышеGROUPсередина<group-name> <consumer-name>, это показывает, что я хочу использовать группу потребителей изmystreamчитать сообщения в и я потребитель Алиса. Каждый раз, когда потребитель выполняет операцию с группой потребителей, он должен указать свое имя, которое однозначно идентифицирует этого потребителя в этой группе.

В приведенной выше команде есть еще одна очень важная деталь, в обязательной опцииSTREAMSПосле этого запрошенный ID является специальным ID>. Этот специальный идентификатор действителен только в контексте группы потребителей, это означает: до сих пор сообщение никогда не было доставлено другим потребителям.

Это почти всегда то, что вам нужно, но также можно указать реальный идентификатор, например.0или любое другое действительное удостоверение личности. Но в этом случае мы просимXREADGROUPДайте нам историю ожидающих сообщений, чтобы никогда не видеть новых сообщений в группе. Так что в основномXREADGROUPИмеет следующее поведение в зависимости от указанного нами идентификатора:

  • Если идентификатор является специальным идентификатором>, то команда будет возвращать только новые сообщения, которые до сих пор не были доставлены другим потребителям, и обновит идентификатор последнего сообщения группы потребителей.
  • Если идентификатор является любым другим действительным числовым идентификатором, эта команда позволит нам получить доступ к истории ожидающих сообщений. То есть набор сообщений (идентифицируемых по предоставленному имени), доставленных этому указанному потребителю, никогда не использовавшихся до сих пор.XACKПодтвержденный.

Теперь мы можем протестировать это поведение, указав идентификатор как0, не любойCOUNTВариант: Мы увидим только одно ожидающее сообщение, про яблоко:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Однако, если мы подтвердим обработанное сообщение, оно не будет отсортировано в историю ожидающих сообщений, поэтому система ничего не сообщит.

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Разве ты еще не знаешьXACKПринцип работы заключается в том, что обработанные сообщения больше не являются частью истории, к которой у нас есть доступ.

Теперь твоя очередьBobПрочтите некоторую информацию.

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Бобу требуется не более двух сообщений, и он читает одну и ту же группу mygroup. Что ж, Redis будет сообщать только о новых сообщениях. Как вы видете,appleСообщение не доставлено, потому что оно было доставлено Алисе, поэтому Боб получает апельсины, клубнику и так далее.

Таким образом, Алиса, Боб и любые другие потребители в группе могут читать разные сообщения из одного и того же потока, читать историю сообщений, которые они еще не обработали, или помечать сообщения как обработанные. Это позволяет создавать различные топологии и семантику для использования сообщений Stream.

Несколько замечаний:

  • Потребители создаются автоматически при первом упоминании, и их не нужно создавать явно.
  • использоватьXREADGROUP, вы также можете читать несколько ключей одновременно, но для этого вам нужно создать группу потребителей с одинаковым именем в каждом потоке. Это не является общим требованием, но стоит отметить, что эта функция технически доступна.
  • XREADGROUPявляется командой записи, потому что даже если она читает из потока, ее побочный эффект изменяет группу потребителей, поэтому ее можно вызывать только в основном экземпляре.

Ниже приведен пример реализации потребителя с использованием групп потребителей, написанных на языке Ruby. Код Ruby написан таким образом, что почти любой опытный программист, не знающий Ruby, может его прочитать:

require 'redis'

if ARGV.length == 0
    puts "Please specify a consumer name"
    exit 1
end

ConsumerName = ARGV[0]
GroupName = "mygroup"
r = Redis.new

def process_message(id,msg)
    puts "[#{ConsumerName}] #{id} = #{msg.inspect}"
end

$lastid = '0-0'

puts "Consumer #{ConsumerName} starting..."
check_backlog = true
while true
    # Pick the ID based on the iteration: the first time we want to
    # read our pending messages, in case we crashed and are recovering.
    # Once we consumer our history, we can start getting new messages.
    if check_backlog
        myid = $lastid
    else
        myid = '>'
    end

    items = r.xreadgroup('GROUP',GroupName,ConsumerName,'BLOCK','2000','COUNT','10','STREAMS',:my_stream_key,myid)

    if items == nil
        puts "Timeout!"
        next
    end

    # If we receive an empty reply, it means we were consuming our history
    # and that the history is now empty. Let's start to consume new messages.
    check_backlog = false if items[0][1].length == 0

    items[0][1].each{|i|
        id,fields = i

        # Process the message
        process_message(id,fields)

        # Acknowledge the message as processed
        r.xack(:my_stream_key,GroupName,id)

        $lastid = id
    }
end

Как видите, идея здесь в том, чтобы начать потреблять историю, наш список ожидающих сообщений. Это полезно, потому что потребитель мог падать раньше, поэтому в случае перезапуска мы хотим снова прочитать сообщения, которые были отправлены нам, но не подтверждены. Таким образом, мы можем обработать сообщение несколько раз или один раз (в случае сбоя потребителя, но Redis также имеет ограничения по долговечности и репликации, см. специальный раздел по этой теме).

После потребления истории мы получаем пустой список сообщений и можем переключиться на использование специального ID>потреблять новые сообщения.

восстановиться после постоянной неудачи

Приведенный выше пример позволяет нам создавать потребителей, которые входят в одну и ту же группу потребителей, обрабатывать каждое подмножество сообщений и восстанавливаться после сбоев. Однако в реальном мире потребитель может выйти из строя навсегда и никогда не восстановиться.Что происходит с ожидающими сообщениями потребителя после того, как он по какой-либо причине останавливается и не может восстановиться?

Группы потребителей Redis предоставляют функцию, которая используется именно в этой ситуации, объявляя ожидающие сообщения для данного потребителя, так что такие сообщения меняют владельца и переназначаются другим потребителям. Функция очень явная, потребитель должен проверить список ожидающих сообщений и должен объявить конкретное сообщение с помощью специальной команды, иначе сервер навсегда назначит ожидающее сообщение старому потребителю, чтобы разные приложения могли выбирать, использовать ли такое сообщение. функции и как ею пользоваться.

Первым шагом в этом процессе является команда, которая обеспечивает наблюдаемость ожидающих записей в группе потребителей, называемаяXPENDING. Это только команда только для чтения, ее вызов всегда безопасен и не изменяет владельца сообщения. В своей простейшей форме команда вызывается только с двумя аргументами: именем потока и именем группы потребителей.

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

При таком вызове команда просто выводит общее количество ожидающих сообщений в группе потребителей, в текущем случае есть только два сообщения, младшие и старшие идентификаторы сообщений в ожидающих сообщениях и, наконец, список потребителей и их количество ожидающих сообщений. У нас есть только два сообщения для Боба, потому что единственное сообщение, которое запрашивает Алиса, используетXACKподтверждено.

Мы можем сделать это, давXPENDINGУкажите больше параметров, чтобы запросить дополнительную информацию, поскольку полная сигнатура команды выглядит следующим образом:

XPENDING <key> <groupname> [<start-id> <end-id> <count> [<conusmer-name>]]

Предоставляя начальный и конечный ID (также доступны, как вXRANGEТолько то же самое-и+) и количество информации, возвращенной командой, управляемой количеством, мы смогли узнать больше об ожидающих сообщениях. Необязательный конечный параметр (имя группы потребителей) используется, если мы хотим ограничить вывод только ожидающими сообщениями для данной группы потребителей, но мы не будем использовать его в приведенном ниже примере.

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Теперь у нас есть детали для каждого сообщения: идентификатор, имя потребителя, время простоя в миллисекундах (т. е. сколько миллисекунд прошло с момента последней доставки сообщения потребителю) и, наконец, получатель данного сообщения. Сколько раз оно было отправлено. . У нас есть два сообщения от Боба, они бездействуют74170458миллисекунд, около 20 часов.

Обратите внимание, что никто не мешает нам проверить содержание первого сообщения, используяXRANGEможет.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Нам просто нужно дважды повторить один и тот же идентификатор в параметре. Теперь, когда у нас есть некоторые идеи, Алиса может решить, что после 20 часов отсутствия обработки сообщений Боб может не успеть вовремя восстановиться, и пришло время потребовать эти сообщения и продолжить обработку от имени Боба. Для этого мы используемXCLAIMЗаказ.

Форма полных параметров этой команды довольно сложна, поскольку она используется для репликации изменений в группу потребителей, но мы будем использовать только те параметры, которые нам обычно нужны. В этом случае просто вызовите его следующим образом:

XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>

По сути, для данного ключа и группы я хочу изменить право собственности на сообщение с указанным идентификатором и назначить его указанному имени как<consumer>потребителей. Однако мы также предоставляем минимальное время простоя, поэтому операция будет работать только в том случае, если вышеуказанное сообщение имеет время простоя больше указанного времени простоя. Это полезно, потому что могут быть два клиента, которые одновременно пытаются запросить сообщение:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Однако при получении сообщения побочный эффект сбросит его время простоя! и увеличит свой счетчик количества принятых сообщений, поэтому второй клиент не сможет заявить об этом. Таким образом мы избегаем нежелательной повторной обработки сообщения (хотя в общем случае вы не получаете однократной обработки).

Вот результат выполнения команды:

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Алиса успешно забрала сообщение и теперь может обработать и подтвердить сообщение и двигаться дальше, даже если первоначальный потребитель не восстановился.

Как видно из приведенного выше примера, в качестве побочного эффекта успешного запроса данного сообщенияXCLAIMКоманда также возвращает его. Но это не обязательно.JUSTIDВозможность вернуть сообщение об успешном завершении претензии. Эта опция полезна, если вы хотите уменьшить пропускную способность, используемую между клиентом и сервером, а также повысить производительность команды, и вас не интересует сообщение, потому что позже способ реализации вашего потребителя будет повторно сканировать ожидающие информация об истории.

Претензии также могут быть получены с помощью отдельного процесса: тот, который просто проверяет список ожидающих сообщений и назначает незанятые сообщения кажущимся активным потребителям. Активных потребителей можно получить с помощью функции наблюдения Redis Stream. Это тема следующего раздела.

Претензия и отправка счетчика

ВыXPENDINGСчетчик, наблюдаемый в выводе команды, — это количество доставок на сообщение. Этот счетчик увеличивается в двух случаях: при передачеXCLAIMпри успешном запросе сообщения или при использованииXREADGROUPПри вызове для доступа к истории необработанных сообщений.

Это нормально, когда сообщения доставляются несколько раз в случае сбоя, но в конечном итоге они обычно обрабатываются. Однако обработка данного сообщения иногда может быть проблематичной, поскольку оно повреждено или создано таким образом, что вызывает ошибку в коде обработки (== не совсем нормально ==). В этом случае происходит то, что потребитель постоянно не может обработать это конкретное сообщение. Поскольку у нас есть счетчик попыток доставки, мы можем использовать этот счетчик, чтобы определить, почему сообщение не может быть обработано. Поэтому может быть разумнее поместить эти сообщения в другой поток и отправить уведомление системному администратору, как только счетчик отправки достигнет числа по вашему выбору. Это в основном реализация потоковой передачи Redis.мертвое сообщениепуть концепции.

Наблюдать за потоком

С системами обмена сообщениями, которым не хватает наблюдаемости, трудно иметь дело. Не зная, кто потребляет сообщения, какие сообщения ожидают и какие группы потребителей активны в данном потоке, все становится непрозрачным. По этой причине у Redis Streams и групп потребителей есть разные способы наблюдения за происходящим. Мы уже представилиXPENDING, что позволяет нам проверить список сообщений, обрабатываемых в данный момент, а также их время простоя и количество доставленных сообщений.

Однако мы можем пожелать сделать больше,XINFOКоманда — это интерфейс наблюдения, который можно использовать с подкомандами для получения информации о потоке или группе потребителей.

Эта команда использует подкоманды для отображения различной информации о состоянии Stream и его групп потребителей. Например, используйтеXINFO STREAMСообщить информацию о самом потоке.

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Вывод показывает информацию о том, как кодируется внутри потока, а также показывает первое и последнее сообщение в потоке. Другая часть доступной информации — это количество групп потребителей, связанных с этим потоком. Мы можем копать дальше для получения дополнительной информации о группах потребителей.

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Как вы можете видеть в этом выводе и предыдущем,XINFOКоманда выводит ряд элементов значений столбца. Поскольку это команда для наблюдения, она позволяет пользователю-человеку сразу понять сообщаемую информацию и позволяет команде сообщать больше информации, добавляя дополнительные поля, не нарушая совместимости со старыми клиентами. Другие команды, которые должны повысить эффективность полосы пропускания, такие какXPENDING, который сообщает только информацию без имен полей.

Вывод приведенного выше примера (с использованиемGROUPSподкоманда) должны четко соблюдать имена полей. Мы можем изучить статус определенной группы потребителей более подробно, изучив потребителей, зарегистрированных в этой группе.

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Если вы не помните синтаксис команды, просто вызовите справку по самой команде:

> XINFO HELP
1) XINFO <subcommand> arg arg ... arg. Subcommands are:
2) CONSUMERS <key> <groupname>  -- Show consumer groups of group <groupname>.
3) GROUPS <key>                 -- Show the stream consumer groups.
4) STREAM <key>                 -- Show information about the stream.
5) HELP                         -- Print this help.

Отличия от Kafka Partitioning

Группы потребителей в Redis Stream могут быть чем-то похожи на группы потребителей на основе разделов Kafka(TM), но обратите внимание, что Redis Stream на самом деле сильно отличается. Разделы — это просто логические разделы, сообщения просто помещаются в ключ Redis, поэтому то, как обслуживаются разные клиенты, зависит от того, кто может обрабатывать новые сообщения, а не от того, какой клиент, разделенный на разделы, читает. Например, если потребитель C3 в какой-то момент выйдет из строя навсегда, Redis продолжит обслуживать C1 и C2, и все новые сообщения будут поступать так, как если бы теперь было только два логических раздела.

Точно так же, если данный потребитель обрабатывает сообщения намного быстрее, чем другие, этот потребитель получит соответственно больше сообщений в ту же единицу времени. Это возможно, потому что Redis явно отслеживает все неподтвержденные сообщения и запоминает, кто какое сообщение получил, а также идентификатор первого сообщения, которое так и не было доставлено потребителям.

Однако это также означает, что в Redis, если вы действительно хотите разделить сообщения об одном и том же потоке на несколько экземпляров Redis, вам нужно использовать несколько ключей и некоторую систему сегментирования, такую ​​как Redis Cluster или другую, специфичную для какой-либо системы сегментирования приложения). Один поток Redis не разделяется автоматически на несколько экземпляров.

Можно сказать, что следующий график верен:

  • Если вы используете 1 поток -> 1 потребитель, сообщения обрабатываются последовательно.
  • Если вы используете N потоков для N потребителей, то только данный потребитель потребляет подмножество из N потоков, вы можете масштабировать приведенную выше модель до 1 потока -> 1 потребитель.
  • Если вы используете 1 поток -> N потребителей, балансировка нагрузки на N потребителей, но в этом случае обработка сообщений может быть не в порядке, так как данный потребитель может обрабатывать сообщение 3 больше, чем другой потребитель, обрабатывающий сообщение 4 быстрее.

Таким образом, в основном разделение Kafka больше похоже на использование N различных ключей Redis. Группа потребителей Redis — это балансировка нагрузки системы обмена сообщениями от заданного потока к N различным потребителям.

верхний предел потока

Многие приложения не хотят постоянно собирать данные в поток. Иногда полезно иметь не более заданного количества элементов в потоке, а иногда полезно переместить данные из Redis на носитель, который не находится в памяти и не такой быстрый, но подходит для хранения исторических сообщений один раз в заданное время. размер достигнут. Redis Stream имеет некоторую поддержку для этого. одинXADDкомандаMAXLENопции. Этот вариант очень прост в использовании.

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

использоватьMAXLENПри достижении указанной длины старые записи автоматически удаляются, чтобы поток имел постоянный размер. В настоящее время нет возможности указать Stream, чтобы он сохранял только заданное количество элементов, поскольку для последовательного выполнения такая команда должна была бы блокироваться в течение длительного времени для удаления элементов. Представьте, например, что если есть всплеск вставки, за которым следует длинная пауза и еще одна вставка, все они имеют одинаковое максимальное время. Поток будет заблокирован, чтобы удалить данные, которые станут слишком старыми во время паузы. Поэтому пользователю необходимо выполнить некоторое планирование и определить максимальную требуемую длину потока. Кроме того, хотя длина Stream пропорциональна используемой памяти, отсечение по времени не так просто контролировать и прогнозировать: оно зависит от скорости вставки, которая является переменной, часто изменяющейся со временем (когда она не изменяется, то это просто по размеру тривиально вносить коррективы).

Однако, используяMAXLENОбрезка обходится дорого: потоки представлены макроузлами в виде дерева счисления, чтобы очень эффективно использовать память. Изменение одного узла макроса, состоящего из десятков элементов, не является оптимальным. Таким образом, команды могут быть предоставлены с использованием следующих специальных форм:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

существуетMAXLENмежду вариантами и реальными технологиями~Параметры означают: мне на самом деле не нужно, чтобы это было ровно 1000 элементов, это может быть 1000, 1010 или 1030, просто убедитесь, что вы сохранили не менее 1000 элементов. С этим параметром обрезка выполняется только тогда, когда мы можем удалить целые узлы. Это делает его более эффективным, что обычно и требуется.

Также доступныXTRIMкоманда, которая выполняется так же, как и вышеMAXLENпараметры очень похожи, но этой команде не нужно ничего добавлять, и ее можно запускать в любом потоке в автономном режиме.

> XTRIM mystream MAXLEN 10

В качестве альтернативы используйтеXADD:

> XTRIM mystream MAXLEN ~ 10

Однако, даже если только реализоватьMAXLEN,XTRIMПредназначен для использования различных стратегий обрезки. Учитывая, что это явная команда, в будущем можно будет разрешить указывать сокращение времени, поскольку пользователь должен знать, что он или она делает, когда вызывает эту команду независимым образом.

XTRIMПолезной стратегией выселения, которая должна была бы быть, может быть возможность удаления по диапазону идентификаторов. В настоящее время это невозможно, но может быть реализовано в будущем, чтобы упростить интеграцию.XRANGEиXTRIMИспользуется вместе для перемещения данных из Redis в другие системы хранения (при необходимости).

Специальные идентификаторы в Streams API

Возможно, вы заметили, что в API Redis доступно несколько специальных идентификаторов. Вот краткий обзор, чтобы он мог быть более значимым в будущем.

Первые два специальных идентификатора-и+,существуетXRANGEКоманда, используемая в запросах диапазона. Два идентификатора представляют наименьший возможный идентификатор (в основном 0-1) и наибольший возможный идентификатор (например, 18446744073709551615-18446744073709551615). Как вы видете,-и+Это яснее писать, чем эти цифры.

Затем есть API, который мы хотим сказать, идентификатор элемента с самым большим идентификатором в потоке. Это代表着什么。因此,如果我只想要使用`XREADGROUP`的新内容,我使用这样的ID来告诉系统我已经拥有所有现有条目,但是没有将要插入的新消息。类似地,当我创建或设置消费者组的ID时,我可以将最后交付的项目设置``, используйте эту группу только для доставки нового контента потребителям.

как вы видете$это не означает+, это две разные вещи,+это максимально возможный идентификатор в каждом возможном потоке, и$это самый большой идентификатор, уже содержащийся в данном потоке. Дополнительные API обычно известны только+или$, потому что полезно избегать загрузки данного символа несколькими значениями.

Другой специальный идентификатор>, только в разрезе групп потребителей и только с использованиемXREADGROUPОн имеет особое значение только при использовании команды. Этот специальный идентификатор означает, что нам нужны только те записи, которые до сих пор не были предоставлены другим потребителям. Так что в основном>это последний идентификатор доставки группы потребителей.

Наконец, специальный идентификатор*, только сXADDвместе, означает, что идентификатор автоматически выбирается для новой записи, которую мы хотим создать.

Таким образом, у нас есть-,+,$,>и*, они имеют разные значения и в большинстве случаев могут использоваться только в разных контекстах.

Постоянство, репликация и безопасность сообщений

Как и другие структуры данных Redis, потоки асинхронно реплицируются на подчиненные устройства и сохраняются в файлах AOF и RDB. Однако, что может быть менее очевидным, так это то, что полное состояние группы потребителей также распространяется на AOF, RDB и ведомые устройства, поэтому, если сообщение в ведущем устройстве не обрабатывается, ведомое устройство также будет иметь ту же информацию. Аналогично, после перезапуска AOF восстановит состояние группы потребителей.

Обратите внимание, однако, что Redis Streams и группы потребителей используют репликацию Redis по умолчанию для сохраняемости и репликации, поэтому:

  • Если в вашем приложении важна сохраняемость сообщений, AOF следует использовать с надежной стратегией синхронизации.
  • Асинхронная репликация не гарантирует репликацию по умолчаниюXADDСостояние группы потребителей изменяется из-за команд: после аварийного переключения что-то может быть потеряно, в зависимости от способности ведомого получать данные от ведущего.
  • WAITКоманда может принудительно распространить эти изменения на ряд книжных серверов. Обратите внимание, однако, что, хотя это снижает вероятность потери данных,SentinelилиRedis ClusterПроцесс аварийного переключения Redis операции выполняет только проверку с максимальной эффективностью для переключения на последний подчиненный сервер, и при некоторых конкретных сбоях подчиненный может потерять некоторые данные.

Поэтому, используяRedis StreamПри разработке приложения с группами потребителей убедитесь, что понимаете семантические свойства, которыми должно обладать ваше приложение во время сбоев, и настройте его соответствующим образом, оценив, достаточно ли оно безопасно для вашего случая.

Удалить один элемент из потока

В потоках также есть специальная команда для удаления элементов из середины потока по идентификатору. В общем, это может показаться странной функцией для структур данных, предназначенных только для добавления, но на самом деле она полезна для приложений, связанных, например, с правилами конфиденциальности. Команда названаXDEL, просто нужно получить имя потока и идентификатор для удаления:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Но в текущей реализации память не высвобождается до тех пор, пока узел макроса не станет полностью пустым, так что злоупотреблять этой возможностью не стоит.

Длина потока

Одно различие между потоками и другими структурами данных Redis заключается в том, что когда в другой структуре данных больше нет элементов, команда удаления элемента также удаляет сам ключ. Например, когдаZREMВызов полностью удалит отсортированный набор, когда будет удален последний элемент в отсортированном наборе. Stream позволяет сохранять нулевые элементы при использованииMAXLENвариант и количество равно нулю (XADDиXTRIMкоманда), или потому что вызовXDEL.

Причина этой асимметрии заключается в том, что потоки могут иметь связанные группы потребителей, и мы не хотим терять состояние, определенное группой потребителей, только потому, что в потоке нет элементов.В настоящее время, даже если нет связанной группы потребителей, это не удалит The Stream, но это может измениться в будущем.

Общая задержка в потреблении сообщений

нетBLOCKопции для неблокирующих команд Stream (например,XRANGEиXREADилиXREADGROUP) обслуживается синхронно, как и любая другая команда Redis, поэтому обсуждать задержку таких команд нецелесообразно: интереснее проверить временную сложность команд в документации Redis. Можно сказать, что при извлечении диапазона потокXADDКоманды выполняются очень быстро, и если вы используете конвейерную обработку, вы можете легко вставлять от 500 000 до 1 миллиона элементов в секунду на обычной машине.

Однако, если мы хотим понять задержку обработки сообщений в контексте потребителя в блокирующей группе потребителей, отXADDЗадержка становится интересным параметром с момента создания сообщения до момента получения сообщения потребителем. так какXREADGROUPвернуть эту информацию.

Как работает блокировка клиентов

Прежде чем предоставлять результаты выполнения тестов, необходимо понять, какую модель Redis использует для маршрутизации Stream-сообщений (и, собственно, как управляются любые блокирующие операции, ожидающие данных).

  • На заблокированных клиентов ссылается хэш-таблица, которая сопоставляет ключ по крайней мере с одним заблокированным потребителем со списком потребителей, ожидающих этот ключ. Таким образом, имея ключ, который получает данные, мы можем разрешить все клиенты, ожидающие этих данных.

  • Когда происходит запись, в данном случае при вызовеXADD命команда, он вызоветsignalKeyAsReady()функция. Эта функция поместит ключи в список ключей, которые необходимо обработать, поскольку эти ключи могут предоставлять новые данные заблокированным потребителям. Обратите внимание, что такие готовые ключи обрабатываются позже, поэтому ключи могут получить дополнительные записи в течение того же цикла цикла событий.

  • Наконец, прежде чем цикл обработки событий завершится, обработайте ключ готовности. Для каждого ключа запустите список клиентов, ожидающих данных, которые будут получать новые данные по мере их поступления, если это применимо. В Stream данные — это сообщение в рамках запроса потребителя.

Как видите, в основном перед возвратом в цикл событий все вызовыXADDКлиент блокируется, ожидая получения сообщения, поэтомуXADDВызывающий должен одновременно получить ответ от Redis, а потребитель получит новое сообщение.

Эта модель основана на push, добавление данных в потребительский буфер будет выполняться напрямую вызовомXADDвыполняются операции, поэтому задержка имеет тенденцию быть предсказуемой.

Задержка результатов теста

Чтобы изучить эту характеристику задержки, мы протестировали несколько экземпляров программы Ruby, используя время компьютера в качестве информации для дополнительных сообщений, и программу Ruby, считывающую сообщения группы потребителей и обрабатывающие их. Этап обработки сообщения включает сравнение текущего времени компьютера с отметкой времени сообщения, чтобы понять общую задержку.

Такие программы не оптимизированы и работают на небольшом двухъядерном экземпляре Redis, пытаясь обеспечить показатели задержки, которые могут возникнуть в неоптимальных условиях. Сообщения создаются со скоростью 10 000 в секунду, при этом 10 потребителей одновременно потребляют и подтверждают сообщения из одного и того же Redis Stream и группы потребителей.

Полученный результат:

Processed between 0 and 1 ms -> 74.11%
Processed between 1 and 2 ms -> 25.80%
Processed between 2 and 3 ms -> 0.06%
Processed between 3 and 4 ms -> 0.01%
Processed between 4 and 5 ms -> 0.02%

99,9% запросов имеют задержку 2 мс или меньше, а выбросы все еще очень близки к среднему значению.

Добавление миллионов неподтвержденных сообщений в Stream не меняет сути теста, большинство запросов по-прежнему обрабатываются с очень низкой задержкой.

Несколько комментариев:

  • Здесь мы обрабатываем до 10 тыс. сообщений за обход, что означаетXREADGROUPизcountПараметр имеет значение 10000. Это увеличивает задержку, но необходимо для медленных потребителей, чтобы не отставать от потока сообщений. Таким образом, вы можете ожидать меньшую задержку в реальном мире.
  • Система, использованная для этого теста (==конфигурация ПК==), очень медленная по сравнению с сегодняшними стандартами.

Заканчивать.





ChangeLog

04.04.2019 Начать перевод 2019-04-07 Завершено

Все вышеизложенное является личными мыслями, если есть какие-либо ошибки, пожалуйста, исправьте их в комментариях.

Добро пожаловать на перепечатку, пожалуйста, подпишите и сохраните исходную ссылку.

Контактный адрес электронной почты: huyanshi2580@gmail.com

Дополнительные заметки об обучении см. в личном блоге ------>Хуян тен