Потоки, новая функция Redis5, используются в качестве очереди сообщений.

Redis

предисловие

Среди новых функций Redis 5 введение структуры данных Streams можно назвать самой большой функцией в этой итерации. Это делает Redis лучшей и более мощной встроенной поддержкой при использовании в качестве очереди сообщений в этой итерации версии 5.x, особенно постоянной очереди сообщений. В то же время поток опирается на концепцию и дизайн модели группы потребления Kafka, чтобы сделать обработку сообщений о потреблении более эффективной и быстрой. В этой статье анализируются часто используемые API в структуре данных Streams.

Подготовить

В этой статье используется версия Redis 5.0.5. Если вы используете более раннюю версию 5.x, некоторые эффекты использования API немного отличаются от описанных в этой статье.

добавить сообщение

Потоки добавляют использование данныхXADDДобавляются инструкции, и данные в сообщении обрабатываются в виде пар ключ-значение K-V. Сообщение может иметь несколько пар ключ-значение, добавьте формат команды:

XADD key ID field string [field string ...]

Ключ — это имя потоков, ID — уникальный идентификатор сообщения, который нельзя повторить, а строка поля — это пара ключ-значение. Далее мы добавим поток с именем person и будем управлять им.

XADD person * name ytao des https://ytao.top

В добавленном выше случае идентификатор копируется с *, что означает, что сервер автоматически генерирует идентификатор и возвращает данные после добавления"1578238486193-0"

Формат автоматически сгенерированного идентификатора здесь<millisecondsTime>-<sequenceNumber>Идентификатор состоит из двух частей:

  1. millisecondsTime — текущая метка времени сервера в миллисекундах.
  2. sequenceNumber Текущий порядковый номер. Значение определяется порядком, в котором сообщения генерируются в течение текущей миллисекунды. По умолчанию оно начинается с 0 и увеличивается на 1.

Например: 1578238486193-3 представляет 4-е сообщение, добавленное с отметкой времени 1578238486193 миллисекунд.

Помимо автоматической генерации Id сервером, также поддерживается генерация указанного Id, но указанный Id имеет следующие условия:

  1. Начальная и конечная части идентификатора должны быть числами.
  2. Минимальный Id 0-1, не может быть 0-0, но допускаются 2-0, 3-0.....
  3. Добавлено сообщение, первая половина идентификатора не может быть меньше наибольшего значения существующего идентификатора, а вторая половина идентификатора не может быть меньше наибольшей второй половины той же первой половины.

В противном случае, когда вышеуказанное условие не выполнено, он будет бросать исключение после его добавления:

(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Фактически при добавлении сообщения выполняются две операции. Первый шаг, сначала решите, что если потоков нет, создайте имя потоков, добавьте сообщение в потоки. Даже если сообщение добавлено, так как идентификатор ненормальный, в Redis также есть имя текущих потоков. Идентификатор в Streams также можно использовать в качестве указателя, поскольку это упорядоченный тег.

В продакшене, если использовать добавление сообщений таким способом, возникнет проблема, то есть при слишком большом количестве сообщений сервис будет недоступен. Здесь дизайн Streams также изначально учитывал эту проблему, то есть можно было указать мощность Streams. Если емкость манипулирует этим заданным значением, старые сообщения будут заменены местами. При добавлении сообщения установитеMAXLENпараметр.

XADD person MAXLEN 5 * name ytao des https://ytao.top

Это указывает емкость 5 сообщений в этом потоке. также можно использоватьXTRIMПерехватывайте сообщения и удаляйте лишние сообщения от маленьких до больших:

XTRIM person MAXLEN 8

количество сообщений

Просмотр количества использованных сообщенийXLENинструкция по эксплуатации.

XLEN key

Пример: просмотр количества сообщений в потоке пользователей:

> XLEN person
(integer) 5

сообщение запроса

Запрос сообщений в Streams используетXRANGEиXREVRANGEинструкция.

XRANGE

При запросе данных вы можете запросить в соответствии с указанным диапазоном идентификаторов.Формат команды запроса XRANGE:

XRANGE key start end [COUNT count]

Описание параметра:

  • ключ - это имя потоков
  • start — это начальный идентификатор запроса диапазона, включая этот идентификатор.
  • start — это конечный идентификатор запроса диапазона, включая этот идентификатор.
  • Количество — это максимальное количество сообщений, возвращаемых запросом, не обязательно.

Начни здесь и закончи там-и+Два неуказанных значения, они представляют бесконечно малую и бесконечную соответственно, поэтому при использовании этих двух значений будут запрашиваться все сообщения.

> XRANGE person - +
1) 1) "0-1"
   2) 1) "name"
      2) "ytao"
      3) "des"
      4) "https://ytao.top"
2) 1) "0-2"
   2) 1) "name"
      2) "luffy"
      3) "des"
      4) "valiant!"
3) 1) "2-0"
   2) 1) "name"
      2) "gaga"
      3) "des"
      4) "fishion!"

Можно видеть, что данные сообщения, запрошенные выше, запрашиваются в порядке «первым пришел — первым вышел».

Используйте COUNT, чтобы указать число, возвращаемое запросом:

# 查询所有的消息,并且返回一条数据
> XRANGE person - + COUNT 1
1) 1) "0-1"
   2) 1) "name"
      2) "ytao"
      3) "des"
      4) "https://ytao.top"

В запросе диапазона вторую половину идентификатора можно опустить, и будут запрошены все данные второй половины.

XREVRANGE

XREVRANGEзапрос иXRANGEИспользование в директиве похоже, но порядок начального и конечного параметров запроса обратный:

XREVRANGE key end start [COUNT count]

Случаи применения:

> XREVRANGE person +  -
1) 1) "2-0"
   2) 1) "name"
      2) "gaga"
      3) "des"
      4) "fishion!"
2) 1) "0-2"
   2) 1) "name"
      2) "luffy"
      3) "des"
      4) "valiant!"
3) 1) "0-1"
   2) 1) "name"
      2) "ytao"
      3) "des"
      4) "https://ytao.top"

Результат запроса такой же, какXRANGEПорядок результатов прямо противоположный, в остальном то же самое, эти две инструкции могут возвращать сообщения в порядке возрастания и убывания.

удаленное сообщение

удалить сообщение с помощьюXDELИнструкция операции, просто укажите имя и идентификатор потоков, которые нужно удалить, и поддерживает удаление нескольких сообщений за раз.

XDEL key ID [ID ...]

Удалить случай:

# 查询所有消息
> XRANGE person - +
1) 1) "0-1"
   2) 1) "name"
      2) "ytao"
      3) "des"
      4) "https://ytao.top"
2) 1) "0-2"
   2) 1) "name"
      2) "luffy"
      3) "des"
      4) "valiant!"
3) 1) "2-0"
   2) 1) "name"
      2) "gaga"
      3) "des"
      4) "fishion!"
# 删除消息      
> XDEL person 2-0
(integer) 1
# 再次查询删除后的所有消息
> XRANGE person - +
1) 1) "0-1"
   2) 1) "name"
      2) "ytao"
      3) "des"
      4) "https://ytao.top"
2) 1) "0-2"
   2) 1) "name"
      2) "luffy"
      3) "des"
      4) "valiant!"
# 查询删除后的长度      
> XLEN person
(integer) 2            

Как видно из вышеприведенного, при удалении сообщения длина также уменьшается на соответствующую величину.

Использование сообщений

В PUB/SUB Redis мы потребляем сообщения по подписке. В структуре данных Streams мы также можем реализовать ту же функцию. Когда нет нового сообщения, мы можем заблокировать и ждать. Поддерживает не только индивидуальное потребление, но и групповое потребление.

только потребление

Использовать отдельноXREADинструкция. Как видите, в следующей команде требуются ПОТОКИ, ключ и идентификатор. ID указывает, что будут прочитаны сообщения большего размера, чем этот ID. Когда используется значение идентификатора$При задании указывает максимальное значение идентификатора существующего сообщения.

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

вышеCOUNTПараметр используется для указания максимального количества чтений сXRANGEиспользование такое же.

> XREAD COUNT 1 STREAMS person 0
1) 1) "person"
   2) 1) 1) "0-1"
         2) 1) "name"
            2) "ytao"
            3) "des"
            4) "https://ytao.top"

> XREAD COUNT 2 STREAMS person 0
1) 1) "person"
   2) 1) 1) "0-1"
         2) 1) "name"
            2) "ytao"
            3) "des"
            4) "https://ytao.top"
      2) 1) "0-2"
         2) 1) "name"
            2) "luffy"
            3) "des"
            4) "valiant!"

существуетXREADесть еще одинBLOCKпараметр, используется для блокировки сообщений о подписке,BLOCKПереносимым параметром является время блокировки в миллисекундах.Если в течение этого времени не будет потребляться новое сообщение, блокировка будет снята. Когда время здесь указано как 0, оно будет блокироваться до тех пор, пока не появится новое сообщение для потребления.

# 窗口 1 开启阻塞,等待新消息的到来
> XREAD BLOCK 0 STREAMS person $

# 另开一个连接窗口 2,添加一条新的消息
> XADD person 2-2 name tao des coder
"2-2"

# 窗口 1,获取到有新的消息来消费,并且带有阻塞的时间
> XREAD BLOCK 0 STREAMS person $
1) 1) "person"
   2) 1) 1) "2-2"
         2) 1) "name"
            2) "tao"
            3) "des"
            4) "coder"
(60.81s)

когда используешьXREADПри последовательном потреблении необходимо дополнительно записать идентификатор места чтения, чтобы в следующий раз продолжить его потребление.

групповое потребление

Основной целью группового потребления является распространение сообщений между разными клиентами для обработки и обработки сообщений с более высокой скоростью. Чтобы удовлетворить это требование функции печени, нам нужно сделать три вещи:Создать группу,Группы читать сообщения,Подтверждение сообщения серверу для обработки.

групповое действие

Использование группы операцийXGROUPинструкция:

XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]

В приведенной выше команде операции включают:

  • CREATE создает группу потребителей.
  • SETID Изменяет идентификатор следующего обрабатываемого сообщения.
  • DESTROY уничтожает группу потребителей.
  • DELCONSUMER удаляет указанного потребителя в группе потребителей.

Что нам в настоящее время необходимо использовать, - это создать потребительскую группу:

# 以当前存在的最大 Id 作为消费起始 
> XGROUP CREATE person group1 $
OK

группа прочитала сообщение

использование группового чтенияXREADGROUPинструкция,COUNTиBLOCKиспользовать аналогичноXREADОперация - это просто обозначение большего количества групп и потребителей:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

Так как групповое потребление аналогично индивидуальному потреблению, здесь выполняется только блокирующий анализ, и Id здесь тоже имеет особое значение>Означает, что сообщения не были потреблены:

# 窗口 1,消费群组中,taotao 消费者建立阻塞监听
XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >

# 窗口 2,消费群组中,yangyang 消费者建立阻塞监听 
XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >

# 窗口 3,添加消费消息
> XADD person 3-1 name tony des 666
"3-1"

# 窗口 1,读取到新消息,此时 窗口 2 没有任何反应
> XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >
1) 1) "person"
   2) 1) 1) "3-1"
         2) 1) "name"
            2) "tony"
            3) "des"
            4) "666"
(77.54s)

# 窗口 3,再次添加消费消息
> XADD person 3-2 name james des abc!
"3-2"

# 窗口 2,读取到新消息,此时 窗口 1 没有任何反应
> XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >
1) 1) "person"
   2) 1) 1) "3-2"
         2) 1) "name"
            2) "james"
            3) "des"
            4) "abc!"
(76.36s)

В приведенном выше процессе выполнения есть два потребителя в группе group1.Когда добавляются два сообщения, два потребителя потребляют по очереди.

сообщение ACK

После потребления сообщения, чтобы избежать повторного использования, необходимо отправить ACK на сервер, чтобы гарантировать, что сообщение будет использовано. Например, в следующей ситуации мы использовали последние два сообщения выше, но когда мы снова читаем сообщение, оно все еще прочитано:

>  XREADGROUP GROUP group1 yangyang STREAMS person 0
1) 1) "person"
   2) 1) 1) "3-2"
         2) 1) "name"
            2) "james"
            3) "des"
            4) "abc!"

В это время мы используемXACKДиректива сообщает серверу, что мы обработали сообщение:

XACK key group ID [ID ...]0

Пусть сервер обрабатывает флаг 3-2:

> XACK person group1 3-2
(integer) 1

Получите сообщение о прочтении группы снова:

>  XREADGROUP GROUP group1 yangyang STREAMS person 0
1) 1) "person"
   2) (empty list or set)

В очереди больше нет читаемых сообщений. В дополнение к API, описанным выше, для просмотра информации о группе потребителей вы можете использоватьXINFOПросмотр инструкции, в этой статье анализ не делается.

Суммировать

Из анализа общих API-интерфейсов Streams мы можем почувствовать, что Redis становится все более и более мощным на пути поддержки очередей сообщений. Если вы использовали его функцию PUB/SUB, вы почувствуете, что итерация 5.x оптимизировала некоторые ваши болевые точки.


личный блог: ytao.top

Обратите внимание на паблик [ytao], больше оригинальных хороших статей

我的公众号