SortedSet для анализа исходного кода Redis

Redis исходный код

Сортированный набор SortedSet — очень характерная структура данных в Redis.Эта статья обобщает этот пункт знаний.

Оригинальный адрес:woo woo Краткое описание.com/afraid/75 руб 5 ач 359…

1. Введение в команду SortedSet

Отсортированные наборы в Redis позволяют пользователям сортировать элементы, которые они помещают, используя указанные значения, и предоставляют богатый набор API-интерфейсов для управления наборами на основе отсортированных наборов.
Пример выглядит следующим образом:

//添加元素,table1为有序集的名字,100为用于排序字段(redis把它叫做score),a为我们要存储的元素
127.0.0.1:6379> zadd table1 100 a
(integer) 1
127.0.0.1:6379> zadd table1 200 b
(integer) 1
127.0.0.1:6379> zadd table1 300 c
(integer) 1
//按照元素索引返回有序集中的元素,索引从0开始
127.0.0.1:6379> zrange table1 0 1
1) "a"
2) "b"
//按照元素排序范围返回有序集中的元素,这里用于排序的字段在redis中叫做score
127.0.0.1:6379> zrangebyscore table1 150 400
1) "b"
2) "c"
//删除元素
127.0.0.1:6379> zrem table1 b
(integer) 1

В отсортированном наборе значение, используемое для сортировки, называется оценкой, а фактически сохраненное значение называется членом.

Поскольку в упорядоченном наборе представлено много API, здесь перечислены лишь некоторые из них.Подробности см. в документации по Redis.

Очень распространенный вариант использования заказанных наборов — отзывы пользователей. Опубликуйте сообщение в приложении или на веб-сайте, и ниже будет много комментариев.Обычно отображение устроено в порядке, обратном времени выпуска.Для этого требования можно использовать упорядоченный набор, и используется метка времени опубликованного комментария в качестве оценки, а затем количество отображаемых комментариев ищется в обратном порядке.

Во-вторых, анализ исходного кода команды упорядоченного набора SortedSet.

По старому правилу мы по-прежнему находим функции обработки связанных команд из таблицы команд в файле server.c, а затем анализируем их одну за другой.
Все еще начиная с добавления элементов, функция zaddCommand:

void zaddCommand(client *c) {
    zaddGenericCommand(c,ZADD_NONE);
}

Здесь вы можете видеть, что поток превращается в zaddGenericCommand и передается тег шаблона.
Здесь кратко объясняется режим работы SortedSet.Давайте сначала рассмотрим полную команду zadd:

zadd key [NX|XX] [CH] [INCR] score member [score member ...]

Рассмотрим варианты по очереди:

  1. NX означает, что если элемент существует, выполняется возврат напрямую без выполнения операции замены.
  2. XX означает работать только с существующими элементами.
  3. CH означает возврат количества измененных (включая добавленные, обновленные) элементов и может использоваться только командой ZADD.
  4. INCR означает добавление новой оценки к исходной, а не ее замену.

ZADD_NONE в приведенном выше фрагменте кода означает нормальную работу.

Далее взгляните на исходный код функции zaddGenericCommand, который очень длинный, так что наберитесь терпения и посмотрите на него немного:

void zaddGenericCommand(client *c, int flags) {
    //一条错误提示信息
    static char *nanerr = "resulting score is not a number (NaN)";
    //有序集名字
    robj *key = c->argv[1];
    robj *zobj;
    sds ele;
    double score = 0, *scores = NULL;
    int j, elements;
    int scoreidx = 0;
    //记录元素操作个数
    int added = 0;     
    int updated = 0;    
    int processed = 0;  

    //查找score的位置,默认score在位置2上,但由于有各种模式,所以需要判断
    scoreidx = 2;
    while(scoreidx < c->argc) {
        char *opt = c->argv[scoreidx]->ptr;
        //判断命令中是否设置了各种模式
        if (!strcasecmp(opt,"nx")) flags |= ZADD_NX;
        else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX;
        else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH;
        else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR;
        else break;
        scoreidx++;
    }

    //设置模式
    int incr = (flags & ZADD_INCR) != 0;
    int nx = (flags & ZADD_NX) != 0;
    int xx = (flags & ZADD_XX) != 0;
    int ch = (flags & ZADD_CH) != 0;

    //通过上面的解析,scoreidx为真实的初始score的索引位置
    //这里客户端参数数量减去scoreidx就是剩余所有元素的数量
    elements = c->argc - scoreidx;
    //由于有序集中score,member成对出现,所以加一层判断
    if (elements % 2 || !elements) {
        addReply(c,shared.syntaxerr);
        return;
    }
    //这里计算score,member有多少对
    elements /= 2; 

    //参数合法性校验
    if (nx && xx) {
        addReplyError(c,
            "XX and NX options at the same time are not compatible");
        return;
    }
    //参数合法性校验
    if (incr && elements > 1) {
        addReplyError(c,
            "INCR option supports a single increment-element pair");
        return;
    }

    //这里开始解析score,先初始化scores数组
    scores = zmalloc(sizeof(double)*elements);
    for (j = 0; j < elements; j++) {
        //填充数组,这里注意元素是成对出现,所以各个score之间要隔一个member
        if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
            != C_OK) goto cleanup;
    }

    //这里首先在client对应的db中查找该key,即有序集
    zobj = lookupKeyWrite(c->db,key);
    if (zobj == NULL) {
        //没有指定有序集且模式为XX(只操作已存在的元素),直接返回
        if (xx) goto reply_to_client; 
        //根据元素数量选择不同的存储结构初始化有序集
        if (server.zset_max_ziplist_entries == 0 ||
            server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
        {
            //哈希表 + 跳表的组合模式
            zobj = createZsetObject();
        } else {
            //ziplist(压缩链表)模式
            zobj = createZsetZiplistObject();
        }
        //加入db中
        dbAdd(c->db,key,zobj);
    } else {
        //如果ZADD操作的集合类型不对,则返回
        if (zobj->type != OBJ_ZSET) {
            addReply(c,shared.wrongtypeerr);
            goto cleanup;
        }
    }
    //这里开始往有序集中添加元素
    for (j = 0; j < elements; j++) {
        double newscore;
        //取出client传过来的score
        score = scores[j];
        int retflags = flags;
        //取出与之对应的member
        ele = c->argv[scoreidx+1+j*2]->ptr;
        //向有序集中添加元素,参数依次是有序集,要添加的元素的score,要添加的元素,操作模式,新的score
        int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
        //添加失败则返回
        if (retval == 0) {
            addReplyError(c,nanerr);
            goto cleanup;
        }
        //记录操作
        if (retflags & ZADD_ADDED) added++;
        if (retflags & ZADD_UPDATED) updated++;
        if (!(retflags & ZADD_NOP)) processed++;
        //设置新score值
        score = newscore;
    }
    //操作记录
    server.dirty += (added+updated);

//返回逻辑
reply_to_client:
    if (incr) {
        if (processed)
            addReplyDouble(c,score);
        else
            addReply(c,shared.nullbulk);
    } else {
        addReplyLongLong(c,ch ? added+updated : added);
    }
//清理逻辑
cleanup:
    zfree(scores);
    if (added || updated) {
        signalModifiedKey(c->db,key);
        notifyKeyspaceEvent(NOTIFY_ZSET,
            incr ? "zincr" : "zadd", key, c->db->id);
    }
}

Код длинноват, давайте посмотрим на структуру хранилища:

有序集存储结构
упорядоченная структура хранения набора

Примечание. Каждая запись состоит из партитуры + участника.

С приведенной выше структурной диаграммой можно подумать, что операция удаления должна выполняться в соответствии с различными структурами хранения.Если это ziplist, связанный список удаляется.Если это структура хеш-таблицы + таблица пропуска, оба набора должны быть удалены. Какова реальная логика?
Давайте посмотрим на исходный код функции удаления zremCommand, который относительно короткий:

void zremCommand(client *c) {
    //获取有序集名
    robj *key = c->argv[1];
    robj *zobj;
    int deleted = 0, keyremoved = 0, j;
    //做校验
    if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
        checkType(c,zobj,OBJ_ZSET)) return;

    for (j = 2; j < c->argc; j++) {
        //一次删除指定元素
        if (zsetDel(zobj,c->argv[j]->ptr)) deleted++;
        //如果有序集中全部元素都被删除,则回收有序表
        if (zsetLength(zobj) == 0) {
            dbDelete(c->db,key);
            keyremoved = 1;
            break;
        }
    }
    //同步操作
    if (deleted) {
        notifyKeyspaceEvent(NOTIFY_ZSET,"zrem",key,c->db->id);
        if (keyremoved)
            notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
        signalModifiedKey(c->db,key);
        server.dirty += deleted;
    }
    //返回
    addReplyLongLong(c,deleted);
}

Взгляните на исходный код конкретной операции удаления:

//参数zobj为有序集,ele为要删除的元素
int zsetDel(robj *zobj, sds ele) {
    //与添加元素相同,根据不同的存储结构执行不同的删除逻辑
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *eptr;
        //ziplist是一个简单的链表删除节点操作
        if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) {
            zobj->ptr = zzlDelete(zobj->ptr,eptr);
            return 1;
        }
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        dictEntry *de;
        double score;

        de = dictUnlink(zs->dict,ele);
        if (de != NULL) {
            //查询该元素的score
            score = *(double*)dictGetVal(de);
            //从哈希表中删除元素
            dictFreeUnlinkedEntry(zs->dict,de);

            //从跳表中删除元素
            int retval = zslDelete(zs->zsl,score,ele,NULL);
            serverAssert(retval);
            //如果有需要则对哈希表进行resize操作
            if (htNeedsResize(zs->dict)) dictResize(zs->dict);
            return 1;
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
    //没有找到指定元素返回0
    return 0;
}

Наконец, взгляните на исходный код функции запроса zrangeCommand, который также очень длинный, пот~~~, но будьте уверены, с приведенным выше основанием вы можете примерно предположить, как должна выглядеть логика запроса:

void zrangeCommand(client *c) {
    //第二个参数,0表示顺序,1表示倒序
    zrangeGenericCommand(c,0);
}

void zrangeGenericCommand(client *c, int reverse) {
    //有序集名
    robj *key = c->argv[1];
    robj *zobj;
    int withscores = 0;
    long start;
    long end;
    int llen;
    int rangelen;
    //参数校验
    if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
        (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return;

    //根据参数附加信息判断是否需要返回score
    if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
        withscores = 1;
    } else if (c->argc >= 5) {
        addReply(c,shared.syntaxerr);
        return;
    }
    //有序集校验
    if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
         || checkType(c,zobj,OBJ_ZSET)) return;

    //索引值重置
    llen = zsetLength(zobj);
    if (start < 0) start = llen+start;
    if (end < 0) end = llen+end;
    if (start < 0) start = 0;
     //返回空集
    if (start > end || start >= llen) {
        addReply(c,shared.emptymultibulk);
        return;
    }
    if (end >= llen) end = llen-1;
    rangelen = (end-start)+1;

    //返回给客户端结果长度
    addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
    //同样是根据有序集的不同结构执行不同的查询逻辑
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;
        unsigned char *vstr;
        unsigned int vlen;
        long long vlong;
        //根据正序还是倒序计算起始索引
        if (reverse)
            eptr = ziplistIndex(zl,-2-(2*start));
        else
            eptr = ziplistIndex(zl,2*start);

        serverAssertWithInfo(c,zobj,eptr != NULL);
        sptr = ziplistNext(zl,eptr);

        while (rangelen--) {
            serverAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL);
            //注意嵌套的ziplistGet方法就是把eptr索引的值读出来保存在后面三个参数中
            serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
            //返回value
            if (vstr == NULL)
                addReplyBulkLongLong(c,vlong);
            else
                addReplyBulkCBuffer(c,vstr,vlen);
            //如果需要则返回score
            if (withscores)
                addReplyDouble(c,zzlGetScore(sptr));
            //倒序从后往前,正序从前往后
            if (reverse)
                zzlPrev(zl,&eptr,&sptr);
            else
                zzlNext(zl,&eptr,&sptr);
        }

    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *ln;
        sds ele;

        //找到起始节点
        if (reverse) {
            ln = zsl->tail;
            if (start > 0)
                ln = zslGetElementByRank(zsl,llen-start);
        } else {
            ln = zsl->header->level[0].forward;
            if (start > 0)
                ln = zslGetElementByRank(zsl,start+1);
        }
         //遍历并返回给客户端
        while(rangelen--) {
            serverAssertWithInfo(c,zobj,ln != NULL);
            ele = ln->ele;
            addReplyBulkCBuffer(c,ele,sdslen(ele));
            if (withscores)
                addReplyDouble(c,ln->score);
            ln = reverse ? ln->backward : ln->level[0].forward;
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
}

Выше приведен исходный код для добавления, удаления и поиска в отсортированном наборе SortedSet. Видно, что SortedSet выберет ziplist или хеш-таблицу + пропустить две структуры данных для реализации в соответствии с количеством сохраненных элементов.Основная причина, по которой исходный код выглядит очень длинным, заключается в том, что для разных структур данных требуются разные реализации кода. . Пока вы овладеваете этой основной идеей, вам не составит труда взглянуть на исходный код.

3. Сводка команд SortedSet

Логика упорядоченных наборов несложная, но код немного длинный, включающий три набора структур данных: ziplist, skiplist и dict.Помимо обычного dict, две другие структуры данных имеют много содержимого. Я собираюсь написать специальную статью, чтобы подвести итоги, я не буду вдаваться в подробности здесь. Основная цель этой статьи — обобщить принцип реализации SortedSet.