Недавно автор неожиданно выпустил Redis 5.0, добавив много новых функций. Самая большая новая функция Redis 5.0 — это добавление структуры данных Stream, которая представляет собой новую мощную и надежную очередь сообщений, поддерживающую многоадресную рассылку.Автор признает, что Redis Stream в значительной степени заимствовал дизайн Kafka.
Структура Redis Stream показана на рисунке выше.Он имеет связанный список сообщений, который объединяет все добавленные сообщения.Каждое сообщение имеет уникальный идентификатор и соответствующий контент. Сообщение сохраняется, и содержимое остается после перезапуска Redis.
Каждый поток имеет уникальное имя, это ключ Redis, когда мы впервые используемxadd
Создается автоматически, когда инструкция добавляет сообщение.
Каждый поток может быть связан с несколькими группами потребителей, и у каждой группы потребителей будет свой курсор.last_delivered_id
Переместитесь вперед по массиву Stream, чтобы указать, какое сообщение использовала текущая группа потребителей. Каждая группа потребителей имеет уникальное имя внутри потока, группа потребителей не будет создана автоматически, для этого требуется отдельная инструкцияxgroup create
Для создания необходимо указать ID сообщения Stream для начала потребления, этот ID используется для инициализацииlast_delivered_id
Переменная.
Состояние каждой группы потребителей независимо и не зависит друг от друга. Другими словами, сообщения в одном и том же потоке будут потребляться каждой группой потребителей.
Одна и та же группа потребителей (Consumer Group) может быть присоединена к нескольким потребителям (Consumer), эти потребители находятся в конкурентных отношениях, любой потребитель, прочитав сообщение, сделает курсорlast_delivered_id
Двигаться вперед. Каждый потребитель имеет уникальное имя в группе.
Внутри потребителя будет переменная состояния (Consumer)pending_ids
, который записывает сообщения, которые в данный момент читаются клиентом, но еще не подтверждены. Если у клиента нет подтверждения, идентификатор сообщения в этой переменной будет становиться все больше и больше, а как только сообщение будет подтверждено, он начнет уменьшаться. Эта переменная pending_ids официально называется в Redis.PEL
, это,Pending Entries List
, которая является очень важной структурой данных, она используется для гарантии того, что клиент использует сообщение хотя бы один раз и не потеряет его в процессе передачи по сети.
идентификатор сообщения
Идентификатор сообщения имеет видtimestampInMillis-sequence
,Например1527846880572-5
, который представляет отметку времени текущего сообщения в мм1527846880572
, и является 5-м сообщением, сгенерированным за эту миллисекунду. Идентификатор сообщения может быть автоматически сгенерирован сервером или указан самим клиентом, но форма должна быть整数-整数
, а идентификатор сообщения, добавленного позже, должен быть больше, чем идентификатор предыдущего сообщения.
Содержание сообщения
Содержимое сообщения представляет собой пару ключ-значение, которая похожа на пару ключ-значение в хеш-структуре, в которой нет ничего особенного.
CRUD
- xadd добавить сообщение
- xdel удаляет сообщение, удаление здесь только устанавливает бит флага и не влияет на общую длину сообщения
- xrange получает список сообщений и автоматически фильтрует удаленные сообщения
- xlen длина сообщения
- удалить поток
# *号表示服务器自动生成ID,后面顺序跟着一堆key/value
# 名字叫laoqian,年龄30岁
127.0.0.1:6379> xadd codehole * name laoqian age 30
1527849609889-0 # 生成的消息ID
127.0.0.1:6379> xadd codehole * name xiaoyu age 29
1527849629172-0
127.0.0.1:6379> xadd codehole * name xiaoqian age 1
1527849637634-0
127.0.0.1:6379> xlen codehole
(integer) 3
# -表示最小值, +表示最大值
127.0.0.1:6379> xrange codehole - +
127.0.0.1:6379> xrange codehole - +
1) 1) 1527849609889-0
2) 1) "name"
2) "laoqian"
3) "age"
4) "30"
2) 1) 1527849629172-0
2) 1) "name"
2) "xiaoyu"
3) "age"
4) "29"
3) 1) 1527849637634-0
2) 1) "name"
2) "xiaoqian"
3) "age"
4) "1"
# 指定最小消息ID的列表
127.0.0.1:6379> xrange codehole 1527849629172-0 +
1) 1) 1527849629172-0
2) 1) "name"
2) "xiaoyu"
3) "age"
4) "29"
2) 1) 1527849637634-0
2) 1) "name"
2) "xiaoqian"
3) "age"
4) "1"
# 指定最大消息ID的列表
127.0.0.1:6379> xrange codehole - 1527849629172-0
1) 1) 1527849609889-0
2) 1) "name"
2) "laoqian"
3) "age"
4) "30"
2) 1) 1527849629172-0
2) 1) "name"
2) "xiaoyu"
3) "age"
4) "29"
127.0.0.1:6379> xdel codehole 1527849609889-0
(integer) 1
# 长度不受影响
127.0.0.1:6379> xlen codehole
(integer) 3
# 被删除的消息没了
127.0.0.1:6379> xrange codehole - +
1) 1) 1527849629172-0
2) 1) "name"
2) "xiaoyu"
3) "age"
4) "29"
2) 1) 1527849637634-0
2) 1) "name"
2) "xiaoqian"
3) "age"
4) "1"
# 删除整个Stream
127.0.0.1:6379> del codehole
(integer) 1
самостоятельное потребление
Мы можем самостоятельно потреблять сообщения Stream, не определяя группу потребления, а когда новых сообщений в Stream нет, мы можем даже блокировать и ждать. Redis разработал отдельную инструкцию по потреблениюxread
, Поток можно использовать как обычную очередь сообщений (список). При использовании xread мы можем полностью игнорировать существование Consumer Group, точно так же, как Stream — это обычный список.
# 从Stream头部读取两条消息
127.0.0.1:6379> xread count 2 streams codehole 0-0
1) 1) "codehole"
2) 1) 1) 1527851486781-0
2) 1) "name"
2) "laoqian"
3) "age"
4) "30"
2) 1) 1527851493405-0
2) 1) "name"
2) "yurui"
3) "age"
4) "29"
# 从Stream尾部读取一条消息,毫无疑问,这里不会返回任何消息
127.0.0.1:6379> xread count 1 streams codehole $
(nil)
# 从尾部阻塞等待新消息到来,下面的指令会堵住,直到新消息到来
127.0.0.1:6379> xread block 0 count 1 streams codehole $
# 我们从新打开一个窗口,在这个窗口往Stream里塞消息
127.0.0.1:6379> xadd codehole * name youming age 60
1527852774092-0
# 再切换到前面的窗口,我们可以看到阻塞解除了,返回了新的消息内容
# 而且还显示了一个等待时间,这里我们等待了93s
127.0.0.1:6379> xread block 0 count 1 streams codehole $
1) 1) "codehole"
2) 1) 1) 1527852774092-0
2) 1) "name"
2) "youming"
3) "age"
4) "60"
(93.11s)
Если клиент хочет использовать xread для последовательного потребления, он должен помнить, где находится текущее потребление, то есть возвращаемый идентификатор сообщения. В следующий раз, когда вы продолжите вызывать xread, передайте идентификатор последнего сообщения, возвращенный в прошлый раз, в качестве параметра, и вы сможете продолжать использовать последующие сообщения.
блок 0 означает блокировку навсегда, пока не придет сообщение, блок 1000 означает блокировку на 1 с, если сообщение не приходит в течение 1 с, вернуть nil
127.0.0.1:6379> xread block 1000 count 1 streams codehole $
(nil)
(1.07s)
Создайте группу потребителей
Поток черезxgroup create
Инструкция создает группу потребителей, и для инициализации необходимо передать параметр идентификатора стартового сообщения.last_delivered_id
Переменная.
# 表示从头开始消费
127.0.0.1:6379> xgroup create codehole cg1 0-0
OK
# $表示从尾部开始消费,只接受新消息,当前Stream消息会全部忽略
127.0.0.1:6379> xgroup create codehole cg2 $
OK
# 获取Stream信息
127.0.0.1:6379> xinfo codehole
1) length
2) (integer) 3 # 共3个消息
3) radix-tree-keys
4) (integer) 1
5) radix-tree-nodes
6) (integer) 2
7) groups
8) (integer) 2 # 两个消费组
9) first-entry # 第一个消息
10) 1) 1527851486781-0
2) 1) "name"
2) "laoqian"
3) "age"
4) "30"
11) last-entry # 最后一个消息
12) 1) 1527851498956-0
2) 1) "name"
2) "xiaoqian"
3) "age"
4) "1"
# 获取Stream的消费组信息
127.0.0.1:6379> xinfo groups codehole
1) 1) name
2) "cg1"
3) consumers
4) (integer) 0 # 该消费组还没有消费者
5) pending
6) (integer) 0 # 该消费组没有正在处理的消息
2) 1) name
2) "cg2"
3) consumers # 该消费组还没有消费者
4) (integer) 0
5) pending
6) (integer) 0 # 该消费组没有正在处理的消息
Потребление
Stream предоставляет инструкцию xreadgroup для использования в группе потребителей. Вам необходимо указать имя группы потребителей, имя потребителя и идентификатор начального сообщения. Как и xread, он также может блокировать ожидание новых сообщений. После прочтения нового сообщения соответствующий идентификатор сообщения войдет в структуру PEL (сообщение обрабатывается) потребителя.После того, как клиент завершит обработку, он использует команду xack, чтобы уведомить сервер о том, что сообщение было обработано, и идентификатор сообщения будет изменено с PEL. удалено в.
# >号表示从当前消费组的last_delivered_id后面开始读
# 每当消费者读取一条消息,last_delivered_id变量就会前进
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >
1) 1) "codehole"
2) 1) 1) 1527851486781-0
2) 1) "name"
2) "laoqian"
3) "age"
4) "30"
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >
1) 1) "codehole"
2) 1) 1) 1527851493405-0
2) 1) "name"
2) "yurui"
3) "age"
4) "29"
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 2 streams codehole >
1) 1) "codehole"
2) 1) 1) 1527851498956-0
2) 1) "name"
2) "xiaoqian"
3) "age"
4) "1"
2) 1) 1527852774092-0
2) 1) "name"
2) "youming"
3) "age"
4) "60"
# 再继续读取,就没有新消息了
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >
(nil)
# 那就阻塞等待吧
127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams codehole >
# 开启另一个窗口,往里塞消息
127.0.0.1:6379> xadd codehole * name lanying age 61
1527854062442-0
# 回到前一个窗口,发现阻塞解除,收到新消息了
127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams codehole >
1) 1) "codehole"
2) 1) 1) 1527854062442-0
2) 1) "name"
2) "lanying"
3) "age"
4) "61"
(36.54s)
# 观察消费组信息
127.0.0.1:6379> xinfo groups codehole
1) 1) name
2) "cg1"
3) consumers
4) (integer) 1 # 一个消费者
5) pending
6) (integer) 5 # 共5条正在处理的信息还有没有ack
2) 1) name
2) "cg2"
3) consumers
4) (integer) 0 # 消费组cg2没有任何变化,因为前面我们一直在操纵cg1
5) pending
6) (integer) 0
# 如果同一个消费组有多个消费者,我们可以通过xinfo consumers指令观察每个消费者的状态
127.0.0.1:6379> xinfo consumers codehole cg1 # 目前还有1个消费者
1) 1) name
2) "c1"
3) pending
4) (integer) 5 # 共5条待处理消息
5) idle
6) (integer) 418715 # 空闲了多长时间ms没有读取消息了
# 接下来我们ack一条消息
127.0.0.1:6379> xack codehole cg1 1527851486781-0
(integer) 1
127.0.0.1:6379> xinfo consumers codehole cg1
1) 1) name
2) "c1"
3) pending
4) (integer) 4 # 变成了5条
5) idle
6) (integer) 668504
# 下面ack所有消息
127.0.0.1:6379> xack codehole cg1 1527851493405-0 1527851498956-0 1527852774092-0 1527854062442-0
(integer) 4
127.0.0.1:6379> xinfo consumers codehole cg1
1) 1) name
2) "c1"
3) pending
4) (integer) 0 # pel空了
5) idle
6) (integer) 745505
Что делать, если потоковых сообщений слишком много?
Читателям легко подумать, что если накопится слишком много сообщений, связанный список Stream будет очень длинным, а не взорвется ли контент — это проблема. Команда xdel не удаляет сообщение, она просто устанавливает для сообщения флаг.
Redis, естественно, учитывает это, поэтому предоставляет функцию Stream фиксированной длины. Укажите фиксированную длину maxlen в инструкции xadd, вы можете уничтожить старое сообщение, чтобы оно не превышало максимально указанную длину.
127.0.0.1:6379> xlen codehole
(integer) 5
127.0.0.1:6379> xadd codehole maxlen 3 * name xiaorui age 1
1527855160273-0
127.0.0.1:6379> xlen codehole
(integer) 3
Мы видим, что длина Stream обрубается.
Что произойдет, если сообщение забудет ACK
Поток сохраняет список PEL идентификаторов сообщений, обрабатываемых в каждой структуре потребителя. Если потребитель получает сообщение и завершает обработку, но не отвечает ack, список PEL будет продолжать расти. Если групп потребителей много, то этот PEL Занятый память увеличится.
Как PEL предотвращает потерю сообщений
Когда клиент-потребитель читает сообщение Stream, а сервер Redis отвечает на сообщение клиенту, клиент внезапно отключается, и сообщение теряется. Но идентификатор отправленного сообщения уже хранится в PEL. После повторного подключения клиент может снова получить список идентификаторов сообщений в PEL. Однако в настоящее время идентификатор начального сообщения xreadgroup не может быть параметр>, а должен быть любым допустимым идентификатором сообщения.Как правило, параметр устанавливается в значение 0-0, что означает чтение всех сообщений PEL и самостоятельную проверку.last_delivered_id
Новые новости позже.
в заключении
Модель потребления Stream опирается на концепцию группировки потребления Kafka, которая компенсирует дефект, заключающийся в том, что Redis Pub/Sub не может сохранять сообщения. Но это отличается от kafka, сообщения Kafka можно разделить на партиции, а Streams — нет. Если вам нужно разделить раздел, вы должны сделать это на стороне клиента, указать разные имена потоков и выполнить хеширование по модулю сообщения, чтобы выбрать, какой поток вставить. Если читатель изучал Disque, еще один open source проект автора Redis, то, скорее всего, автор понял, что проект Disque недостаточно активен, поэтому пересадил содержимое Disque в Redis. Это только мое предположение, не обязательно изначальное намерение автора. Если у читателей есть другие идеи, они могут вместе принять участие в обсуждении в области комментариев.
Читайте более продвинутые статьи, обратите внимание на паблик-аккаунт "Code Cave"