Анализ исходного кода Redis с прогрессивным перефразированием

Redis

Роль перефразирования

При непрерывном выполнении наших операций Redis пары ключ-значение, хранящиеся в хеш-таблице, будут постепенно увеличиваться или уменьшаться.Когда данные в словаре слишком велики, это приведет к большему количеству конфликтов ключей и увеличению стоимости запроса данных. Когда данные уменьшаются, выделенная память все еще занята, что приведет к трате памяти. Чтобы поддерживать коэффициент загрузки хеш-таблицы в разумных пределах, программе необходимо соответственно увеличивать или уменьшать размер хеш-таблицы.

Принцип перефразирования

  • rehash: Во-первых, давайте посмотрим на определение структуры словаря и хеш-таблицы.
/*
 * 字典
 */
typedef struct dict {

    // 类型特定函数
    dictType *type;

    // 私有数据
    void *privdata;

    // 哈希表
    dictht ht[2];

    // rehash 索引
    // 当 rehash 不在进行时,值为 -1
    int rehashidx; /* rehashing not in progress if rehashidx == -1 */

    // 目前正在运行的安全迭代器的数量
    int iterators; /* number of iterators currently running */

} dict;
// 哈希表
typedef struct dictht {
    
    // 哈希表数组
    dictEntry **table;

    // 哈希表大小
    unsigned long size;
    
    // 哈希表大小掩码,用于计算索引值
    // 总是等于 size - 1
    unsigned long sizemask;

    // 该哈希表已有节点的数量
    unsigned long used;

} dictht;

Глядя на определение структуры, мы сначала примерно понимаем, что для определения того, пересчитывается ли поле, путем оценкиif rehashidx == -1, перефразировка вht[1]перераспределить память наht[0]данные перенесены вht[1]

  • Прогрессивный: процесс повторного хеширования не завершается за один раз, но каждый раз выполняется определенный объем миграции в операциях чтения и записи словаря и в синхронизированных событиях.

Анализ исходного кода процесса расширения

Таким образом, это операция, связанная со словарем, и раскрытие обычно происходит, когда необходимо установить значение ключа, поэтому сначала пойдем прямо.dict.cПроверьте файл, чтобы увидеть, есть ли функция для добавления или установки содержимого строки.Посмотрев и проверив код, мы обнаруживаем, что логика функции очень похожа.

int dictAdd(dict *d, void *key, void *val)
{
    // 尝试添加键到字典,并返回包含了这个键的新哈希节点
    // T = O(N)
    dictEntry *entry = dictAddRaw(d,key);

    // todo ...
}

Эта функция вызоветdictAddRaw(...)способ датьdictВыделение памятиПродолжить просмотрdictAddRaw(...)код

dictEntry *dictAddRaw(dict *d, void *key)
{
    int index;
    dictEntry *entry;
    dictht *ht;

    // 如果条件允许的话,进行单步 rehash
    // T = O(1)
    if (dictIsRehashing(d)) _dictRehashStep(d);

    /* Get the index of the new element, or -1 if
     * the element already exists. */
    // 计算键在哈希表中的索引值
    // 如果值为 -1 ,那么表示键已经存在
    // T = O(N)
    if ((index = _dictKeyIndex(d, key)) == -1)
        return NULL;

    // T = O(1)
    /* Allocate the memory and store the new entry */
    // 如果字典正在 rehash ,那么将新键添加到 1 号哈希表
    // 否则,将新键添加到 0 号哈希表
    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
    // 为新节点分配空间
    entry = zmalloc(sizeof(*entry));
    // 将新节点插入到链表表头
    entry->next = ht->table[index];
    ht->table[index] = entry;
    // 更新哈希表已使用节点数量
    ht->used++;

    /* Set the hash entry fields. */
    // 设置新节点的键
    // T = O(1)
    dictSetKey(d, entry, key);

    return entry;
}

продолжить просмотр_dictKeyIndexкод внутри

static int _dictKeyIndex(dict *d, const void *key)
{
    unsigned int h, idx, table;
    dictEntry *he;

    /* Expand the hash table if needed */
    // 单步 rehash
    // T = O(N)
    if (_dictExpandIfNeeded(d) == DICT_ERR)
        return -1;

        /* Compute the key hash value */
    // 计算 key 的哈希值
    h = dictHashKey(d, key);
    // T = O(1)
    for (table = 0; table <= 1; table++) {

        // 计算索引值
        idx = h & d->ht[table].sizemask;

        /* Search if this slot does not already contain the given key */
        // 查找 key 是否存在
        // T = O(1)
        he = d->ht[table].table[idx];
        while(he) {
            if (dictCompareKeys(d, key, he->key))
                return -1;
            he = he->next;
        }

        // 如果运行到这里时,说明 0 号哈希表中所有节点都不包含 key
        // 如果这时 rehahs 正在进行,那么继续对 1 号哈希表进行 rehash
        if (!dictIsRehashing(d)) break;
    }

    // 返回索引值
    return idx;
}

Взглянув на приведенный выше код, мы можем найти несколько ключевых моментов.Вычисление значения индекса в словаре выполняется

    // 计算 key 的哈希值
    h = dictHashKey(d, key);
    idx = h & d->ht[table].sizemask;

Подсчитано, и мы также можем видеть, что при конфликте ключей стоимость поиска ключа

        he = d->ht[table].table[idx];
        while(he) {
            if (dictCompareKeys(d, key, he->key))
                return -1;
            he = he->next;
        }

Ключ_dictExpandIfNeededСудя по названию функции, мы думаем, что это связано с расширением

static int _dictExpandIfNeeded(dict *d)
{
    /* Incremental rehashing already in progress. Return. */
    // 渐进式 rehash 已经在进行了,直接返回
    if (dictIsRehashing(d)) return DICT_OK;

    /* If the hash table is empty expand it to the initial size. */
    // 如果字典(的 0 号哈希表)为空,那么创建并返回初始化大小的 0 号哈希表
    // T = O(1)
    if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

    /* If we reached the 1:1 ratio, and we are allowed to resize the hash
     * table (global setting) or we should avoid it but the ratio between
     * elements/buckets is over the "safe" threshold, we resize doubling
     * the number of buckets. */
    // 一下两个条件之一为真时,对字典进行扩展
    // 1)字典已使用节点数和字典大小之间的比率接近 1:1
    //    并且 dict_can_resize 为真
    // 2)已使用节点数和字典大小之间的比率超过 dict_force_resize_ratio
    if (d->ht[0].used >= d->ht[0].size &&
        (dict_can_resize ||
         d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
    {
        // 新哈希表的大小至少是目前已使用节点数的两倍
        // T = O(N)
        return dictExpand(d, d->ht[0].used*2);
    }

    return DICT_OK;
}

Как видно из приведенного выше кода, наиболее фундаментальная операция выделения памяти выполняется в_dictExpandIfNeeded(...)выполняется внутри функции. Эта функция будет определять, когда количество используемых ключей в хеш-таблице больше, чем выделенная память.dict_force_resize_ratio(представляя константу 5) раз, память будет перераспределена, а размер памяти будет в два раза больше исходного используемого числа.

Суммировать

  • Прочитав весь процесс исходного кода, мы обнаружили, что выполнениеdictAdd(...)При добавлении значения ключа в словарь он будет вызываться_dictExpandIfNeeded(...)Проверитьht[0].used/ht[0].size > 5Этоtrue, если да, перераспределите память размеромht[0].used * 2
  • viewingdictAddRaw(...)код функции, есть команда
    // 如果条件允许的话,进行单步 rehash
    // T = O(1)
if (dictIsRehashing(d)) _dictRehashStep(d);

_dictRehashStepЭффект заключается в выполнении значения ключа изh[0]прибытьh[1]миграция, вdict.cЕсли вы будете искать эту функцию внутри, вы обнаружите, что операции чтения и записи, связанные с dict, будут вызывать эту функцию, которая также проверяет, что процесс rehahs завершается не за один шаг, а постепенно.

Анализ исходного кода процесса сжатия

Сокращение словарной памяти в основном связано с синхронизированным событием, регулярной проверкой, суждением, соответствующий код выглядит следующим образом.

void databasesCron(void) {

    // todo ...
    
    // 在没有 BGSAVE 或者 BGREWRITEAOF 执行时,对哈希表进行 rehash
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
        /* We use global counters so if we stop the computation at a given
         * DB we'll be able to start from the successive in the next
         * cron loop iteration. */
        static unsigned int resize_db = 0;
        static unsigned int rehash_db = 0;
        unsigned int dbs_per_call = REDIS_DBCRON_DBS_PER_CALL;
        unsigned int j;

        /* Don't test more DBs than we have. */
        // 设定要测试的数据库数量
        if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;

        /* Resize */
        // 调整字典的大小
        for (j = 0; j < dbs_per_call; j++) {
            tryResizeHashTables(resize_db % server.dbnum);
            resize_db++;
        }

        /* Rehash */
        // 对字典进行渐进式 rehash
        if (server.activerehashing) {
            for (j = 0; j < dbs_per_call; j++) {
                int work_done = incrementallyRehash(rehash_db % server.dbnum);
                rehash_db++;
                if (work_done) {
                    /* If the function did some work, stop here, we'll do
                     * more at the next cron loop. */
                    break;
                }
            }
        }
    }
}

В дополнение к циклу и оценке, приведенный выше код имеет две специальные функции.

  1. tryResizHashTables, связанный исходный код
void tryResizeHashTables(int dbid) {
    if (htNeedsResize(server.db[dbid].dict))
        dictResize(server.db[dbid].dict);
    if (htNeedsResize(server.db[dbid].expires))
        dictResize(server.db[dbid].expires);
}

//htNeedsResize
int htNeedsResize(dict *dict) {
    long long size, used;

    size = dictSlots(dict);
    used = dictSize(dict);
    return (size && used && size > DICT_HT_INITIAL_SIZE &&
            (used*100/size < REDIS_HT_MINFILL));
}

Анализируя исходный код, мы видим, что функция будет вызываться первой.htNeedsResize,судитьused* 100 / size < REDIS_HT_MINFILLЕсли это правда, он будет называтьсяdictResizeперераспределить память

  1. incrementallyRehash, связанный исходный код
int incrementallyRehash(int dbid) {

    /* Keys dictionary */
    if (dictIsRehashing(server.db[dbid].dict)) {
        dictRehashMilliseconds(server.db[dbid].dict,1);
        return 1; /* already used our millisecond for this loop... */
    }

    /* Expires */
    if (dictIsRehashing(server.db[dbid].expires)) {
        dictRehashMilliseconds(server.db[dbid].expires,1);
        return 1; /* already used our millisecond for this loop... */
    }

    return 0;
}


//dictRehashMillisecnods
 /* 在给定毫秒数内,以 100 步为单位,对字典进行 rehash 。
 *
 * T = O(N)
 */
int dictRehashMilliseconds(dict *d, int ms) {
    // 记录开始时间
    long long start = timeInMilliseconds();
    int rehashes = 0;

    while(dictRehash(d,100)) {
        rehashes += 100;
        // 如果时间已过,跳出
        if (timeInMilliseconds()-start > ms) break;
    }

    return rehashes;
}

В результате анализа видно, что функция этой функции заключается в переносе данных хеш-таблицы словаря, который перехэшируется каждый раз в течение 1 миллисекунды и повторяется 100 раз каждый раз.

Суммировать

  • Сокращение хеш-таблицы в основном реализуется в рамках временного события.
  • Когда используемое значение ключа составляет менее 0,1 выделенной памяти, объем памяти уменьшается.
  • В дополнение к переносу данных, упомянутому в приведенном выше расширении, перенос данных будет выполняться во время чтения и записи, а перенос данных также будет выполняться во время событий, рассчитанных по времени. В то же время это позволяет избежать проблемы, связанной с невозможностью переноса данных без операций чтения и записи в течение длительного времени.