Принцип реализации Redis pub/sub

Redis

структура данных канала

Redis сохраняет все отношения подписки в состоянии сервераpubsub_channelsВ словарной статье:

struct redisServer {
    ...
    dict *pubsub_channels;  /* Map channels to list of subscribed clients */
    ...
}

словарной статьиkeyэто название канала, на который вы подписаны, словарная статьяvalueЭто связанный список, в котором хранятся все клиенты, подписавшиеся на этот канал:pub/sub字典示例

Подписаться на канал

SUBSCRIBE channel [channel ...]когда клиент выполняетSUBSCRIBEкоманда, при подписке на один или несколько каналов сервер подключит клиента к подписанному каналуpubsub_channelsассоциации в словаре.pubsub.c:

void subscribeCommand(client *c) {
    int j;

    for (j = 1; j < c->argc; j++)
        pubsubSubscribeChannel(c,c->argv[j]);
    c->flags |= CLIENT_PUBSUB;
}

В зависимости от того, существует ли канал вpubsub_channelsВ словаре операция ассоциации делится на два случая:

  • Если канал не существует, значит, у канала нет подписчиков.pubsub_channelsКанал создается в словаре, имя ключа — это имя канала, а значение этого ключа устанавливается в пустой связанный список, а затем клиент добавляется в связанный список.
  • Если канал существует, значит, у канала есть другие подписчики, то он находится вpubsub_channelsВ словаре должен быть соответствующий список подписчиков, и единственное, что должна сделать программа, это добавить клиента в конец списка подписчиков.
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
 * 0 if the client was already subscribed to that channel. */
int pubsubSubscribeChannel(client *c, robj *channel) {
    ...
    /* Add the channel to the client -> channels hash table */
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        /* Add the client to the channel -> list of clients hash table */
        de = dictFind(server.pubsub_channels,channel);
        if (de == NULL) {
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }
        listAddNodeTail(clients,c);
    }
    ...
}

Из кода видно, что канал добавляется в состояние клиента до выполнения операции ассоциации.pubsub_channelsсловарь, зарезервированный для использования, когда клиент находится в автономном режиме. Ключ словаря — это имя канала, а значение ключа равно null.

// 将频道添加到客户端状态的pubsub_channels字典中,键为channel,键的值为NULL
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {}

Если вы сделаете следующее:

client-9527> subscribe qiuxiang 

Когда на канале появляется новое сообщение, Redis просматривает список клиентов канала и по очереди отправляет сообщение каждому клиенту.

отписаться

UNSUBSCRIBE channel [channel ...]Отписаться, значит удалить клиента изpubsub_channelsУдалено из словаря:

client-9527> unsubscribe qiuxiang 

pub/sub字典示例

int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
   ...
        // 找到要取消订阅频道对应的字典项
        de = dictFind(server.pubsub_channels,channel);
        ...
        // 订阅该频道的所有客户端列表
        clients = dictGetVal(de);
        // 找到要取消订阅的客户端
        ln = listSearchKey(clients,c);
        ...
        // 将该客户端从列表中移除
        listDelNode(clients,ln);
        // 如果列表项未空,表示没有客户端订阅该频道了,将该频道从pubsub_channels字典中移除
        if (listLength(clients) == 0) {
            /* Free the list and associated hash entry at all if this was
             * the latest client, so that it will be possible to abuse
             * Redis PUBSUB creating millions of channels. */
            dictDelete(server.pubsub_channels,channel);
        }
    ...
}

Клиент не в сети

Когда клиент находится в автономном режиме, Redis выполнитfreeClientфункция,

void freeClient(client *c) {
    ...
    // 取消所有频道的订阅
    pubsubUnsubscribeAllChannels(c,0);
    ...
}
int pubsubUnsubscribeAllChannels(client *c, int notify) {
    // 获取客户端已订阅频道的字典迭代器
    dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
    ...
    // 迭代字典
    while((de = dictNext(di)) != NULL) {
        robj *channel = dictGetKey(de);

        // 依次取消订阅
        count += pubsubUnsubscribeChannel(c,channel,notify);
    }
}

Если клиент «Клиент 1» не в сети:Client 1将离线

После того, как «Клиент 1» отключится:

постскриптум

Редисpub/subфункция, в режиме реального времени можно получать только сообщения канала с подпиской.Когда клиент находится в автономном режиме, сообщения автономного канала не будут сохранены, что отличается от службы MQ.