структура данных канала
Redis сохраняет все отношения подписки в состоянии сервераpubsub_channels
В словарной статье:
struct redisServer {
...
dict *pubsub_channels; /* Map channels to list of subscribed clients */
...
}
словарной статьиkey
это название канала, на который вы подписаны, словарная статьяvalue
Это связанный список, в котором хранятся все клиенты, подписавшиеся на этот канал:
Подписаться на канал
SUBSCRIBE channel [channel ...]
когда клиент выполняетSUBSCRIBE
команда, при подписке на один или несколько каналов сервер подключит клиента к подписанному каналуpubsub_channels
ассоциации в словаре.pubsub.c
:
void subscribeCommand(client *c) {
int j;
for (j = 1; j < c->argc; j++)
pubsubSubscribeChannel(c,c->argv[j]);
c->flags |= CLIENT_PUBSUB;
}
В зависимости от того, существует ли канал вpubsub_channels
В словаре операция ассоциации делится на два случая:
- Если канал не существует, значит, у канала нет подписчиков.
pubsub_channels
Канал создается в словаре, имя ключа — это имя канала, а значение этого ключа устанавливается в пустой связанный список, а затем клиент добавляется в связанный список. - Если канал существует, значит, у канала есть другие подписчики, то он находится в
pubsub_channels
В словаре должен быть соответствующий список подписчиков, и единственное, что должна сделать программа, это добавить клиента в конец списка подписчиков.
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
* 0 if the client was already subscribed to that channel. */
int pubsubSubscribeChannel(client *c, robj *channel) {
...
/* Add the channel to the client -> channels hash table */
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
/* Add the client to the channel -> list of clients hash table */
de = dictFind(server.pubsub_channels,channel);
if (de == NULL) {
clients = listCreate();
dictAdd(server.pubsub_channels,channel,clients);
incrRefCount(channel);
} else {
clients = dictGetVal(de);
}
listAddNodeTail(clients,c);
}
...
}
Из кода видно, что канал добавляется в состояние клиента до выполнения операции ассоциации.pubsub_channels
словарь, зарезервированный для использования, когда клиент находится в автономном режиме. Ключ словаря — это имя канала, а значение ключа равно null.
// 将频道添加到客户端状态的pubsub_channels字典中,键为channel,键的值为NULL
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {}
Если вы сделаете следующее:
client-9527> subscribe qiuxiang
Когда на канале появляется новое сообщение, Redis просматривает список клиентов канала и по очереди отправляет сообщение каждому клиенту.
отписаться
UNSUBSCRIBE channel [channel ...]
Отписаться, значит удалить клиента изpubsub_channels
Удалено из словаря:
client-9527> unsubscribe qiuxiang
int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
...
// 找到要取消订阅频道对应的字典项
de = dictFind(server.pubsub_channels,channel);
...
// 订阅该频道的所有客户端列表
clients = dictGetVal(de);
// 找到要取消订阅的客户端
ln = listSearchKey(clients,c);
...
// 将该客户端从列表中移除
listDelNode(clients,ln);
// 如果列表项未空,表示没有客户端订阅该频道了,将该频道从pubsub_channels字典中移除
if (listLength(clients) == 0) {
/* Free the list and associated hash entry at all if this was
* the latest client, so that it will be possible to abuse
* Redis PUBSUB creating millions of channels. */
dictDelete(server.pubsub_channels,channel);
}
...
}
Клиент не в сети
Когда клиент находится в автономном режиме, Redis выполнитfreeClient
функция,
void freeClient(client *c) {
...
// 取消所有频道的订阅
pubsubUnsubscribeAllChannels(c,0);
...
}
int pubsubUnsubscribeAllChannels(client *c, int notify) {
// 获取客户端已订阅频道的字典迭代器
dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
...
// 迭代字典
while((de = dictNext(di)) != NULL) {
robj *channel = dictGetKey(de);
// 依次取消订阅
count += pubsubUnsubscribeChannel(c,channel,notify);
}
}
Если клиент «Клиент 1» не в сети:
После того, как «Клиент 1» отключится:
постскриптум
Редисpub/sub
функция, в режиме реального времени можно получать только сообщения канала с подпиской.Когда клиент находится в автономном режиме, сообщения автономного канала не будут сохранены, что отличается от службы MQ.