Как спроектировать и реализовать поточно-ориентированную карту? (Часть 1)

Java Redis задняя часть Go Objective-C

Карта — это очень распространенная структура данных, используемая для хранения некоторых неупорядоченных пар ключ-значение. В основных языках программирования его реализация идет по умолчанию. STL в C и C++ реализует Map, в JavaScript также есть Map, в Java есть HashMap, в Swift и Python есть Dictionary, в Go есть Map, а в Objective-C есть NSDictionary и NSMutableDictionary.

Являются ли вышеуказанные Карты потокобезопасными? Ответ - нет, не все потокобезопасны. Итак, как мы можем реализовать поточно-ориентированную карту? Чтобы ответить на этот вопрос, нам нужно начать с того, как реализовать карту.

1. Какую структуру данных следует использовать для реализации Map?

Карта — это очень распространенная структура данных, неупорядоченный набор пар ключ/значение, в котором все ключи карты различны, а затем заданный ключ можно искать и обновлять за постоянное время сложности O(1) Или удалить соответствующий ценность.

Что нужно использовать для поиска на постоянном уровне? Читатели должны быстро подумать о хеш-таблицах. Действительно, нижний слой Map обычно реализуется с использованием массива, которому помогает алгоритм хеширования. Для данного ключа обычно сначала выполняется операция хеширования, а затем по модулю длины хэш-таблицы для сопоставления ключа с указанным местом.

Алгоритмов хеширования много, какой из них эффективнее выбрать?

1. Хэш-функция

Можно сказать, что MD5 и SHA1 являются наиболее широко используемыми алгоритмами хеширования, и все они разработаны на основе MD4.

MD4 (RFC 1320) был разработан Рональдом Л. Ривестом из Массачусетского технологического института в 1990 году, MD — это аббревиатура от Message Digest (Дайджест сообщения). Он подходит для высокоскоростной реализации программного обеспечения на 32-битных текстовых процессорах — он основан на битовых манипуляциях с 32-битными операндами.
MD5 (RFC 1321) — это улучшенная версия MD4, разработанная Rivest в 1991 году. Он по-прежнему группирует ввод в 512 бит, а его вывод представляет собой конкатенацию 4 32-битных слов, как и MD4. MD5 сложнее и медленнее, чем MD4, но он более безопасен и более устойчив к анализу и дифференциации.

SHA1 разработан NIST NSA для использования с DSA, он создает хэш-значение длиной 160 бит для входных данных длиной менее 264, поэтому он устойчив к грубой силе.
Секс лучше. SHA-1 разработан на основе тех же принципов, что и MD4, и имитирует алгоритм.

Обычно используемые хеш-функции: SHA-1, SHA-256, SHA-512, MD5. Это все классические хеш-алгоритмы. В современном производстве также используются современные алгоритмы хеширования. Перечислите несколько ниже для сравнения производительности и, наконец, выберите один из исходных кодов для анализа процесса реализации.

(1) Дженкинс Хэш и SpookyHash

1997 г.Bob JenkinsОпубликовал статью о хеш-функциях в журнале Dr. Dobbs."Хэш-функция для поиска в хэш-таблице". В этой статье Боб включил широкий спектр существующих хеш-функций, в том числе то, что он назвал «lookup2». Затем в 2006 году Боб выпустилlookup3. lookup3 — это хэш Дженкинса. Узнайте больше о хеш-функции Боба в Википедии:Jenkins hash function. Хэш-алгоритм memcached поддерживает два алгоритма: jenkins, murmur3, по умолчанию — jenkins.

В 2011 году Боб Дженкинс выпустил новую собственную хеш-функцию.
SpookyHash (назван так потому, что он был выпущен на Хэллоуин). Оба они в 2 раза быстрее, чем MurmurHash, но оба они используют только 64-битные математические функции и не используют 32-битные версии, SpookyHash дает 128-битный результат.

(2) Мурмурхэш

MurmurHash не являетсяшифрованиетипхэш-функция, подходит для общих операций извлечения хэша.
Остин Эпплби выпустил новую хеш-функцию в 2008 году —MurmurHash. Последняя версия Lookup3 Около 2 раза скорость (около 1 байта / цикл), которая имеет две 32-битные и 64-битные версии. 32-битная только 32-битная версия математических функций и дает 32-битное значение хеша, а 64-битная 64-битная версия математической функции, учитывая значение 64 хеша. Согласно анализу Остина, Мурмурхаша, отличное выступление, хотя Боб Дженкинс утверждал, что «я предсказываю Мурмурхашу слабее, чем поиск3, но я не знаю точного значения, потому что я не проверял его» на журнале «Доктор Доббс». Мурмурхаш быстро стал популярным благодаря своей превосходной скорости и статистическим свойствам. Текущая версия - Murmurhash3, Redis, Memcached, Cassandra, HBase, Lucene, используют его.

Вот версия MurmurHash, реализованная на C:


uint32_t murmur3_32(const char *key, uint32_t len, uint32_t seed) {
        static const uint32_t c1 = 0xcc9e2d51;
        static const uint32_t c2 = 0x1b873593;
        static const uint32_t r1 = 15;
        static const uint32_t r2 = 13;
        static const uint32_t m = 5;
        static const uint32_t n = 0xe6546b64;

        uint32_t hash = seed;

        const int nblocks = len / 4;
        const uint32_t *blocks = (const uint32_t *) key;
        int i;
        for (i = 0; i < nblocks; i++) {
                uint32_t k = blocks[i];
                k *= c1;
                k = (k << r1) | (k >> (32 - r1));
                k *= c2;

                hash ^= k;
                hash = ((hash << r2) | (hash >> (32 - r2))) * m + n;
        }

        const uint8_t *tail = (const uint8_t *) (key + nblocks * 4);
        uint32_t k1 = 0;

        switch (len & 3) {
        case 3:
                k1 ^= tail[2] << 16;
        case 2:
                k1 ^= tail[1] << 8;
        case 1:
                k1 ^= tail[0];

                k1 *= c1;
                k1 = (k1 << r1) | (k1 >> (32 - r1));
                k1 *= c2;
                hash ^= k1;
        }

        hash ^= len;
        hash ^= (hash >> 16);
        hash *= 0x85ebca6b;
        hash ^= (hash >> 13);
        hash *= 0xc2b2ae35;
        hash ^= (hash >> 16);

        return hash;
}

(3) CityHash и FramHash

Оба алгоритма являются строковыми алгоритмами, опубликованными Google.

CityHashЭто алгоритм хеширования строк, выпущенный Google в 2011 году. Как и murmurhash, это незашифрованный алгоритм хэширования. Разработка алгоритма CityHash была вдохновлена ​​MurmurHash. Его главное преимущество заключается в том, что большинство шагов включают как минимум две отдельные математические операции. Современные процессоры обычно получают максимальную производительность от этого кода. Есть у CityHash и свои недостатки: код сложнее, чем у популярных алгоритмов в своем роде. Google хочет оптимизировать скорость, а не простоту, поэтому он не заботится об особых случаях для более коротких входных данных. Google опубликовал два алгоритма: cityhash64 и cityhash128. Они вычисляют 64- и 128-битные хеш-значения из строк соответственно. Эти алгоритмы не подходят для шифрования, но подходят для использования в хеш-таблицах и т. д. Скорость CityHash зависит от инструкций CRC32, в настоящее время SSE 4.2 (Intel Nehalem и более поздние версии).

По сравнению с Murmurhash, который поддерживает 32, 64 и 128 бит, Cityhash поддерживает 64, 128 и 256 бит.

В 2014 году Google снова выпустилFarmHash, новое семейство хеш-функций для строк. FarmHash унаследовал множество приемов и приемов от CityHash и является его преемником. FarmHash преследует несколько целей и претендует на улучшение CityHash несколькими способами. Еще одно улучшение FarmHash по сравнению с CityHash заключается в том, что он предоставляет интерфейс поверх нескольких реализаций для конкретных платформ. Таким образом, FarmHash также отвечает всем требованиям, когда разработчикам просто нужна быстрая и надежная хэш-функция для хеш-таблиц, которая не обязательно должна быть одинаковой на каждой платформе. В настоящее время FarmHash включает хеш-функции только для байтовых массивов на 32-, 64- и 128-битных платформах. Планы на будущее включают поддержку целых чисел, кортежей и других данных.

(4) хххэш

xxHash — это некриптографическая хеш-функция, созданная Яном Колле. Первоначально он использовался для алгоритма сжатия LZ4 в качестве окончательной сигнатуры проверки ошибок. Скорость этого алгоритма хеширования близка к пределу оперативной памяти. И дает две версии 32-битную и 64-битную. Сейчас он широко используется вPrestoDB,RocksDB,MySQL,ArangoDB,PGroonga,SparkЭти базы данных, также используемые вCocos2D,Dolphin,Cxbx-reloadedВ этих игровых рамках

Ниже приведен эксперимент по сравнению производительности. Тестовая средаOpen-Source SMHasher program by Austin Appleby, он был скомпилирован с помощью Visual C в Windows 7 и имеет только один поток. Ядром процессора является Core 2 Duo с частотой 3,0 ГГц.

Хэш-функции в таблице выше — это не все хэш-функции, перечислены только некоторые распространенные алгоритмы. Второй столбец — сравнение скорости, видно, что самый быстрый — это xxHash. Третий столбец — качество хэша, всего 5 хэшей самого высокого качества, все 5 звезд, xxHash, MurmurHash 3a, CityHash64, MD5-32, SHA1-32. Судя по данным в таблице, качество хеширования самое высокое, а самым быстрым является xxHash.

(4) Мемхаш

Автор этого хеш-алгоритма не нашел в Интернете очень четкой информации об авторе. В документах Google Go есть всего несколько строк комментариев, объясняющих его источник вдохновения:


// Hashing algorithm inspired by
//   xxhash: https://code.google.com/p/xxhash/
// cityhash: https://code.google.com/p/cityhash/

В нем говорится, что memhash вдохновлен xxhash и cityhash. Тогда давайте посмотрим, как memhash хэширует строки.



const (
    // Constants for multiplication: four random odd 32-bit numbers.
    m1 = 3168982561
    m2 = 3339683297
    m3 = 832293441
    m4 = 2336365089
)

func memhash(p unsafe.Pointer, seed, s uintptr) uintptr {
    if GOARCH == "386" && GOOS != "nacl" && useAeshash {
        return aeshash(p, seed, s)
    }
    h := uint32(seed + s*hashkey[0])
tail:
    switch {
    case s == 0:
    case s < 4:
        h ^= uint32(*(*byte)(p))
        h ^= uint32(*(*byte)(add(p, s>>1))) << 8
        h ^= uint32(*(*byte)(add(p, s-1))) << 16
        h = rotl_15(h*m1) * m2
    case s == 4:
        h ^= readUnaligned32(p)
        h = rotl_15(h*m1) * m2
    case s <= 8:
        h ^= readUnaligned32(p)
        h = rotl_15(h*m1) * m2
        h ^= readUnaligned32(add(p, s-4))
        h = rotl_15(h*m1) * m2
    case s <= 16:
        h ^= readUnaligned32(p)
        h = rotl_15(h*m1) * m2
        h ^= readUnaligned32(add(p, 4))
        h = rotl_15(h*m1) * m2
        h ^= readUnaligned32(add(p, s-8))
        h = rotl_15(h*m1) * m2
        h ^= readUnaligned32(add(p, s-4))
        h = rotl_15(h*m1) * m2
    default:
        v1 := h
        v2 := uint32(seed * hashkey[1])
        v3 := uint32(seed * hashkey[2])
        v4 := uint32(seed * hashkey[3])
        for s >= 16 {
            v1 ^= readUnaligned32(p)
            v1 = rotl_15(v1*m1) * m2
            p = add(p, 4)
            v2 ^= readUnaligned32(p)
            v2 = rotl_15(v2*m2) * m3
            p = add(p, 4)
            v3 ^= readUnaligned32(p)
            v3 = rotl_15(v3*m3) * m4
            p = add(p, 4)
            v4 ^= readUnaligned32(p)
            v4 = rotl_15(v4*m4) * m1
            p = add(p, 4)
            s -= 16
        }
        h = v1 ^ v2 ^ v3 ^ v4
        goto tail
    }
    h ^= h >> 17
    h *= m3
    h ^= h >> 13
    h *= m4
    h ^= h >> 16
    return uintptr(h)
}

// Note: in order to get the compiler to issue rotl instructions, we
// need to constant fold the shift amount by hand.
// TODO: convince the compiler to issue rotl instructions after inlining.
func rotl_15(x uint32) uint32 {
    return (x << 15) | (x >> (32 - 15))
}

m1, m2, m3, m4 — 4 случайно выбранных нечетных числа в качестве коэффициента умножения хеша.


// used in hash{32,64}.go to seed the hash function
var hashkey [4]uintptr

func alginit() {
    // Install aes hash algorithm if we have the instructions we need
    if (GOARCH == "386" || GOARCH == "amd64") &&
        GOOS != "nacl" &&
        cpuid_ecx&(1<<25) != 0 && // aes (aesenc)
        cpuid_ecx&(1<<9) != 0 && // sse3 (pshufb)
        cpuid_ecx&(1<<19) != 0 { // sse4.1 (pinsr{d,q})
        useAeshash = true
        algarray[alg_MEM32].hash = aeshash32
        algarray[alg_MEM64].hash = aeshash64
        algarray[alg_STRING].hash = aeshashstr
        // Initialize with random data so hash collisions will be hard to engineer.
        getRandomData(aeskeysched[:])
        return
    }
    getRandomData((*[len(hashkey) * sys.PtrSize]byte)(unsafe.Pointer(&hashkey))[:])
    hashkey[0] |= 1 // make sure these numbers are odd
    hashkey[1] |= 1
    hashkey[2] |= 1
    hashkey[3] |= 1
}

В этой функции инициализации инициализируются 2 массива, и массивы заполняются случайными хеш-ключами. На платформах 386, amd64, не-nacl будет использоваться aeshash. Здесь будет сгенерирован случайный ключ и сохранен в массиве aeskeysched. Точно так же в массиве хэш-ключей будет 4 случайных числа. В конце побитовое и 1 используются, чтобы гарантировать, что сгенерированные случайные числа являются нечетными числами.

Далее, давайте возьмем пример, чтобы увидеть, как memhash вычисляет значение хеш-функции.


func main() {
    r := [8]byte{'h', 'a', 'l', 'f', 'r', 'o', 's', 't'}
    pp := memhashpp(unsafe.Pointer(&r), 3, 7)
    fmt.Println(pp)
}

Для простоты имя автора используется в качестве примера для вычисления хеш-значения, а начальное число просто установлено равным 3.

Первый шаг вычисляет значение h.


h := uint32(seed + s*hashkey[0])

Предположим здесь, что hashkey[0] = 1, тогда значение h равно 3 + 7 * 1 = 10. Поскольку s


    case s <= 8:
        h ^= readUnaligned32(p)
        h = rotl_15(h*m1) * m2
        h ^= readUnaligned32(add(p, s-4))
        h = rotl_15(h*m1) * m2

Функция readUnaligned32() дважды преобразует входящий указатель unsafe.Pointer, сначала в тип *uint32, а затем в тип *(*uint32).

Затем выполните операцию XOR:

Затем второй шаг h m1 = 1718378850 3168982561 = 3185867170

Поскольку это 32-битное умножение, конечный результат будет 64-битным, а верхнее 32-битное переполнение напрямую отбрасывается.

Умноженный результат используется в качестве входного параметра rotl_15().


func rotl_15(x uint32) uint32 {
    return (x << 15) | (x >> (32 - 15))
}

Эта функция выполняет две операции смещения над входными параметрами.

Наконец, выполните логическую операцию ИЛИ над результатами двух сдвигов:

Затем выполните еще одно преобразование readUnaligned32():

После завершения преобразования снова выполняется XOR. На данный момент h = 2615762644.

Затем выполните еще одно преобразование rotl_15(). Здесь нет демонстрации рисунка. После завершения преобразования h = 2932930721.

Наконец, выполните последний шаг hash:


    h ^= h >> 17
    h *= m3
    h ^= h >> 13
    h *= m4
    h ^= h >> 16

Сначала сдвиг вправо на 17 бит, затем XOR, затем умножение на m3, затем сдвиг вправо на 13 бит, затем XOR, затем умножение на m4, затем сдвиг вправо на 16 бит и, наконец, XOR.

С помощью такой серии операций наконец может быть сгенерировано хеш-значение. Наконец, h = 1870717864. Заинтересованные студенты могут считать.

(5) Хэш AES

При анализе алгоритма хэширования Go выше мы видим, что он оценивает, поддерживает ли ЦП набор инструкций AES.Если ЦП поддерживает набор инструкций AES, он выберет алгоритм хеширования AES.Когда ЦП не поддерживает инструкцию AES установите Когда, переключитесь на алгоритм memhash.

Полное название набора инструкций AES:Расширенный стандартный набор инструкций шифрования(или ИнтелНовые директивы Advanced Encryption Standard, именуемыйAES-NI)Являетсяx86архитектура набора инструкцийрасширение дляIntelиAMDмикропроцессор.

Производительность хеш-алгоритма при использовании реализации AES будет очень хорошей, поскольку она обеспечивает аппаратное ускорение.

Конкретный код реализован следующим образом, ассемблер и комментарии см. в следующей программе:


// aes hash 算法通过 AES 硬件指令集实现
TEXT runtime·aeshash(SB),NOSPLIT,$0-32
    MOVQ    p+0(FP), AX    // 把ptr移动到data数据段中
    MOVQ    s+16(FP), CX    // 长度
    LEAQ    ret+24(FP), DX
    JMP    runtime·aeshashbody(SB)

TEXT runtime·aeshashstr(SB),NOSPLIT,$0-24
    MOVQ    p+0(FP), AX    // 把ptr移动到字符串的结构体中
    MOVQ    8(AX), CX    // 字符串长度
    MOVQ    (AX), AX    // 字符串的数据
    LEAQ    ret+16(FP), DX
    JMP    runtime·aeshashbody(SB)

Окончательная реализация хэша находится в aeshashbody:


// AX: 数据
// CX: 长度
// DX: 返回的地址
TEXT runtime·aeshashbody(SB),NOSPLIT,$0-0
    // SSE 寄存器中装填入我们的随机数种子
    MOVQ    h+8(FP), X0            // 每个table中hash种子有64 位
    PINSRW    $4, CX, X0            // 长度占16位
    PSHUFHW $0, X0, X0            // 压缩高位字乱序,重复长度4次
    MOVO    X0, X1                // 保存加密前的种子
    PXOR    runtime·aeskeysched(SB), X0    // 对每一个处理中的种子进行逻辑异或
    AESENC    X0, X0                // 加密种子

    CMPQ    CX, $16
    JB    aes0to15
    JE    aes16
    CMPQ    CX, $32
    JBE    aes17to32
    CMPQ    CX, $64
    JBE    aes33to64
    CMPQ    CX, $128
    JBE    aes65to128
    JMP    aes129plus

// aes 从 0 - 15
aes0to15:
    TESTQ    CX, CX
    JE    aes0

    ADDQ    $16, AX
    TESTW    $0xff0, AX
    JE    endofpage

    //当前加载的16位字节的地址不会越过一个页面边界,所以我们可以直接加载它。
    MOVOU    -16(AX), X1
    ADDQ    CX, CX
    MOVQ    $masks<>(SB), AX
    PAND    (AX)(CX*8), X1
final1:
    PXOR    X0, X1    // 异或数据和种子
    AESENC    X1, X1    // 连续加密3次
    AESENC    X1, X1
    AESENC    X1, X1
    MOVQ    X1, (DX)
    RET

endofpage:
    // 地址结尾是1111xxxx。 这样就可能超过一个页面边界,所以在加载完最后一个字节后停止加载。然后使用pshufb将字节向下移动。
    MOVOU    -32(AX)(CX*1), X1
    ADDQ    CX, CX
    MOVQ    $shifts<>(SB), AX
    PSHUFB    (AX)(CX*8), X1
    JMP    final1

aes0:
    // 返回输入的并且已经加密过的种子
    AESENC    X0, X0
    MOVQ    X0, (DX)
    RET

aes16:
    MOVOU    (AX), X1
    JMP    final1

aes17to32:
    // 开始处理第二个起始种子
    PXOR    runtime·aeskeysched+16(SB), X1
    AESENC    X1, X1

    // 加载要被哈希算法处理的数据
    MOVOU    (AX), X2
    MOVOU    -16(AX)(CX*1), X3

    // 异或种子
    PXOR    X0, X2
    PXOR    X1, X3

    // 连续加密3次
    AESENC    X2, X2
    AESENC    X3, X3
    AESENC    X2, X2
    AESENC    X3, X3
    AESENC    X2, X2
    AESENC    X3, X3

    // 拼接并生成结果
    PXOR    X3, X2
    MOVQ    X2, (DX)
    RET

aes33to64:
    // 处理第三个以上的起始种子
    MOVO    X1, X2
    MOVO    X1, X3
    PXOR    runtime·aeskeysched+16(SB), X1
    PXOR    runtime·aeskeysched+32(SB), X2
    PXOR    runtime·aeskeysched+48(SB), X3
    AESENC    X1, X1
    AESENC    X2, X2
    AESENC    X3, X3

    MOVOU    (AX), X4
    MOVOU    16(AX), X5
    MOVOU    -32(AX)(CX*1), X6
    MOVOU    -16(AX)(CX*1), X7

    PXOR    X0, X4
    PXOR    X1, X5
    PXOR    X2, X6
    PXOR    X3, X7

    AESENC    X4, X4
    AESENC    X5, X5
    AESENC    X6, X6
    AESENC    X7, X7

    AESENC    X4, X4
    AESENC    X5, X5
    AESENC    X6, X6
    AESENC    X7, X7

    AESENC    X4, X4
    AESENC    X5, X5
    AESENC    X6, X6
    AESENC    X7, X7

    PXOR    X6, X4
    PXOR    X7, X5
    PXOR    X5, X4
    MOVQ    X4, (DX)
    RET

aes65to128:
    // 处理第七个以上的起始种子
    MOVO    X1, X2
    MOVO    X1, X3
    MOVO    X1, X4
    MOVO    X1, X5
    MOVO    X1, X6
    MOVO    X1, X7
    PXOR    runtime·aeskeysched+16(SB), X1
    PXOR    runtime·aeskeysched+32(SB), X2
    PXOR    runtime·aeskeysched+48(SB), X3
    PXOR    runtime·aeskeysched+64(SB), X4
    PXOR    runtime·aeskeysched+80(SB), X5
    PXOR    runtime·aeskeysched+96(SB), X6
    PXOR    runtime·aeskeysched+112(SB), X7
    AESENC    X1, X1
    AESENC    X2, X2
    AESENC    X3, X3
    AESENC    X4, X4
    AESENC    X5, X5
    AESENC    X6, X6
    AESENC    X7, X7

    // 加载数据
    MOVOU    (AX), X8
    MOVOU    16(AX), X9
    MOVOU    32(AX), X10
    MOVOU    48(AX), X11
    MOVOU    -64(AX)(CX*1), X12
    MOVOU    -48(AX)(CX*1), X13
    MOVOU    -32(AX)(CX*1), X14
    MOVOU    -16(AX)(CX*1), X15

    // 异或种子
    PXOR    X0, X8
    PXOR    X1, X9
    PXOR    X2, X10
    PXOR    X3, X11
    PXOR    X4, X12
    PXOR    X5, X13
    PXOR    X6, X14
    PXOR    X7, X15

    // 连续加密3次
    AESENC    X8, X8
    AESENC    X9, X9
    AESENC    X10, X10
    AESENC    X11, X11
    AESENC    X12, X12
    AESENC    X13, X13
    AESENC    X14, X14
    AESENC    X15, X15

    AESENC    X8, X8
    AESENC    X9, X9
    AESENC    X10, X10
    AESENC    X11, X11
    AESENC    X12, X12
    AESENC    X13, X13
    AESENC    X14, X14
    AESENC    X15, X15

    AESENC    X8, X8
    AESENC    X9, X9
    AESENC    X10, X10
    AESENC    X11, X11
    AESENC    X12, X12
    AESENC    X13, X13
    AESENC    X14, X14
    AESENC    X15, X15

    // 拼装结果
    PXOR    X12, X8
    PXOR    X13, X9
    PXOR    X14, X10
    PXOR    X15, X11
    PXOR    X10, X8
    PXOR    X11, X9
    PXOR    X9, X8
    MOVQ    X8, (DX)
    RET

aes129plus:
    // 处理第七个以上的起始种子
    MOVO    X1, X2
    MOVO    X1, X3
    MOVO    X1, X4
    MOVO    X1, X5
    MOVO    X1, X6
    MOVO    X1, X7
    PXOR    runtime·aeskeysched+16(SB), X1
    PXOR    runtime·aeskeysched+32(SB), X2
    PXOR    runtime·aeskeysched+48(SB), X3
    PXOR    runtime·aeskeysched+64(SB), X4
    PXOR    runtime·aeskeysched+80(SB), X5
    PXOR    runtime·aeskeysched+96(SB), X6
    PXOR    runtime·aeskeysched+112(SB), X7
    AESENC    X1, X1
    AESENC    X2, X2
    AESENC    X3, X3
    AESENC    X4, X4
    AESENC    X5, X5
    AESENC    X6, X6
    AESENC    X7, X7

    // 逆序开始,从最后的block开始处理,因为可能会出现重叠的情况
    MOVOU    -128(AX)(CX*1), X8
    MOVOU    -112(AX)(CX*1), X9
    MOVOU    -96(AX)(CX*1), X10
    MOVOU    -80(AX)(CX*1), X11
    MOVOU    -64(AX)(CX*1), X12
    MOVOU    -48(AX)(CX*1), X13
    MOVOU    -32(AX)(CX*1), X14
    MOVOU    -16(AX)(CX*1), X15

    // 异或种子
    PXOR    X0, X8
    PXOR    X1, X9
    PXOR    X2, X10
    PXOR    X3, X11
    PXOR    X4, X12
    PXOR    X5, X13
    PXOR    X6, X14
    PXOR    X7, X15

    // 计算剩余128字节块的数量
    DECQ    CX
    SHRQ    $7, CX

aesloop:
    // 加密状态
    AESENC    X8, X8
    AESENC    X9, X9
    AESENC    X10, X10
    AESENC    X11, X11
    AESENC    X12, X12
    AESENC    X13, X13
    AESENC    X14, X14
    AESENC    X15, X15

    // 在同一个block块中加密状态,进行异或运算
    MOVOU    (AX), X0
    MOVOU    16(AX), X1
    MOVOU    32(AX), X2
    MOVOU    48(AX), X3
    AESENC    X0, X8
    AESENC    X1, X9
    AESENC    X2, X10
    AESENC    X3, X11
    MOVOU    64(AX), X4
    MOVOU    80(AX), X5
    MOVOU    96(AX), X6
    MOVOU    112(AX), X7
    AESENC    X4, X12
    AESENC    X5, X13
    AESENC    X6, X14
    AESENC    X7, X15

    ADDQ    $128, AX
    DECQ    CX
    JNE    aesloop

    // 最后一步,进行3次以上的加密
    AESENC    X8, X8
    AESENC    X9, X9
    AESENC    X10, X10
    AESENC    X11, X11
    AESENC    X12, X12
    AESENC    X13, X13
    AESENC    X14, X14
    AESENC    X15, X15
    AESENC    X8, X8
    AESENC    X9, X9
    AESENC    X10, X10
    AESENC    X11, X11
    AESENC    X12, X12
    AESENC    X13, X13
    AESENC    X14, X14
    AESENC    X15, X15
    AESENC    X8, X8
    AESENC    X9, X9
    AESENC    X10, X10
    AESENC    X11, X11
    AESENC    X12, X12
    AESENC    X13, X13
    AESENC    X14, X14
    AESENC    X15, X15

    PXOR    X12, X8
    PXOR    X13, X9
    PXOR    X14, X10
    PXOR    X15, X11
    PXOR    X10, X8
    PXOR    X11, X9
    PXOR    X9, X8
    MOVQ    X8, (DX)
    RET

2. Обработка коллизий хэшей

(1) метод массива связанных списков

Метод массива связанного списка относительно прост.Каждое значение ключа определяется по модулю длины таблицы.Если результаты одинаковы, связанный список используется для их последовательной вставки.

Предположим, что набор значений ключа для вставки равен {2, 3, 5, 7, 11, 13, 19}, а длина таблицы — MOD 8. Предположим, что хэш-функция равномерно распределена на [0,9). Как показано выше.

Далее сосредоточиться на анализе производительности:

Найдите значение ключа k, предполагая, что значение ключа k отсутствует в хеш-таблице, h(k) равномерно распределено в [0, M), то есть P(h(k) = i) = 1/M . Пусть Xi будет количеством ключей, содержащихся в хеш-таблице ht[i]. Если h(k) = i , количество сравнений ключ-значение для неудачного нахождения k равно Xi, поэтому:

Анализ успешного поиска немного сложнее. Чтобы рассмотреть порядок, в котором добавляется хеш-таблица, независимо от случая с одним и тем же значением ключа, предположим, что K = {k1,k2,...kn}, и предположим, что пустая хеш-таблица добавляется в хеш-таблицу в этом порядке. Введем случайные величины, если h(ki) = h(kj), то Xij = 1, если h(ki) ! = h(kj), то Xij = 0 .

Поскольку предыдущее предположение состоит в том, что хеш-таблица распределена равномерно, P(Xij = i) = E(Xij) = 1/M , где E(X) представляет собой математическое ожидание случайной величины X. Предположим, что каждый раз, когда вы добавляете значение ключа, оно добавляется в конец связанного списка. Пусть Ci будет количеством сравнений ключ-значение, необходимых для нахождения Ki, Поскольку вероятность нахождения Ki не может быть определена заранее, предполагая, что вероятность нахождения различных значений ключа одинакова, 1/n, есть:

Отсюда видно, что производительность хэш-таблицы мало связана с количеством элементов в таблице, но связана с коэффициентом заполнения α.Если длина хэш-таблицы пропорциональна количеству элементов в хэш-таблице, то сложность поиска в хеш-таблице составляет O (1).

Подводя итог, можно сказать, что среднее количество успешных и неудачных сравнений ключ-значение для массива связанных списков выглядит следующим образом:

(2) Метод открытого адреса — линейное обнаружение

Правило линейного обнаружения hi = (h (k) + i) MOD M. Например, я = 1, М = 9.

При таком способе разрешения конфликтов при возникновении конфликта позиция увеличивается на 1 до тех пор, пока не будет найдена пустая позиция.

Например, предположим, что набор значений ключа, который нужно вставить, равен {2, 3, 5, 7, 11, 13, 19}, а значение, добавленное после конфликта линейного обнаружения, равно 1, тогда окончательный результат будет таким: следует:

Анализ производительности хеш-таблицы линейного обнаружения более сложен, и здесь приводятся только результаты.

(3) Метод открытого адреса — обнаружение квадрата

Правило линейного зондирования: h0 = h(k), hi = (h0 + i * i) MOD M.

Например, если предположить, что набор значений ключа, который необходимо вставить, равен {2, 3, 5, 7, 11, 13, 20}, а значение, добавленное после обнаружения столкновения квадратов, представляет собой квадрат числа поиска, окончательный результат выглядит следующим образом:

Квадратное обнаружение добавляет вторичную кривую на основе линейного обнаружения. После возникновения конфликта добавляется уже не линейный параметр, а количество зондов.

Одна вещь, которую следует отметить при обнаружении квадратов, заключается в том, что размер M является особым. Если M не является нечетным простым числом, может возникнуть следующая проблема, даже если в хеш-таблице есть пустые позиции, но некоторые элементы не могут найти позицию для вставки.

Например, если предположить, что M = 10, набор значений ключа, который необходимо вставить, равен {0, 1, 4, 5, 6, 9, 10}. После того, как предыдущие 6 значений ключа будут вставлены в хеш-таблицу, 10 уже нельзя вставить. .

Таким образом, при обнаружении квадратов действует следующее правило:

Если M — нечетное простое число, то следующие ⌈M / 2⌉ позиций h0, h1, h2 ... h⌊M/2⌋ отличны друг от друга. где hi = (h0 + i * i ) MOD M.

Это правило можно доказать от противного. Если предположить, что hi = hj, i > j, 0 i = ( h0 + j j ) MOD M такой, что M делится на ( i + j )( i - j ). Поскольку M простое число и 0

Приведенные выше правила объяснят это.Пока M является нечетным простым числом, квадратное зондирование может пройти по крайней мере через общую позицию хеш-таблицы. Таким образом, пока коэффициент заполнения хэш-таблицы α

В приведенном выше примере причина, по которой значение ключа 10 не может быть вставлено, также заключается в том, что α > 1/2, поэтому нет гарантии, что существует вставляемая позиция.

(4) Метод открытого адреса — обнаружение двойного хэша

Обнаружение двойного хэша предназначено для решения проблемы агрегации. Будь то линейное обнаружение или квадратичное обнаружение, если h(k1) и h(k2) являются смежными, их последовательности обнаружения также являются смежными, что является так называемым явлением агрегации. Чтобы избежать этого явления, вводится двойная хеш-функция h2, так что расстояние между двумя обнаружениями равно h2(k). Таким образом, тестовая последовательность такова: h0 = h1(k), hi = (h0 + i * h2(k)) MOD M . Эксперименты показывают, что эффективность обнаружения двойного хеша аналогична случайному обнаружению.

В среднем длина поиска для двойных хеш-зондов и квадратичных зондов сложнее, чем для линейных зондов. Таким образом, для аппроксимации этих двух обнаружений вводится понятие случайного обнаружения. Случайное обнаружение означает, что последовательность обнаружения { hi } выбирается случайным образом независимо с умеренной вероятностью в интервале [0, M], так что P(hi = j) = 1/M .

Предположим, что тестовая последовательность такова: h0, h1, ..., hi. Позиция hi хеш-таблицы пуста, и хэш-таблица не пуста в позициях h0, h1, ..., hi-1.Количество сравнений значений ключа в этом поиске равно i. Пусть случайная величина X будет количеством сравнений ключ-значение, необходимых для неудачного поиска. Поскольку коэффициент заполнения хэш-таблицы равен α, вероятность наличия нулевого значения в хэш-таблице в позиции равна 1 - α, а вероятность ненулевого значения равна α, поэтому P(X = i) = α ^ я * ( 1 - α ) .

В теории вероятностей вышеупомянутое распределение называется геометрическим распределением.

Предполагая, что порядок добавления элементов хэш-таблицы {k1, k2,..., kn}, пусть Xi будет количеством сравнений ключ-значение для неудачного поиска, когда хеш-таблица содержит только {k1, k2,... , ki}. Обратите внимание, что коэффициент заполнения хеш-таблицы в это время равен i/M, а число раз, которое нужно найти значение ключа k(i+1), равно Yi = 1 + Xi. Предполагая, что вероятность нахождения любого значения ключа равна 1/n, среднее количество сравнений значений ключа для успешного поиска равно:

Подводя итог, можно сказать, что среднее количество успешных и неудачных сравнений ключ-значение для квадратичных зондов и зондов с двойным хешированием выглядит следующим образом:

В общем, в случае очень большого объема данных простая хэш-функция не может неизбежно вызвать коллизии, даже если будет принят подходящий метод для обработки коллизий, все равно есть определенная временная сложность. Поэтому, если вы хотите максимально избежать коллизий, вам все равно нужно выбрать высокопроизводительную хеш-функцию или увеличить количество хэш-битов, например, 64-бит, 128-бит, 256-бит, чтобы вероятность столкновения будет намного меньше.

3. Стратегия расширения хеш-таблицы

По мере увеличения коэффициента загрузки хеш-таблицы количество коллизий становится все больше и больше, а производительность хэш-таблицы становится все хуже и хуже. Для хеш-таблицы, реализованной методом раздельного связанного списка, это еще можно терпеть, но для метода открытой адресации такое снижение производительности неприемлемо, поэтому необходимо найти способ решения этой проблемы для метода открытой адресации.

В практических приложениях решение этой проблемы заключается в динамическом увеличении длины хеш-таблицы.Когда коэффициент загрузки превышает определенный порог, увеличение длины хэш-таблицы и автоматическое расширение емкости. Всякий раз, когда длина хеш-таблицы изменяется, индексы индексов, соответствующие всем ключам в хэш-таблице, должны быть пересчитаны и не могут быть напрямую скопированы из исходной хэш-таблицы в новую хэш-таблицу. Хэш-значения ключей в исходной хэш-таблице должны быть вычислены одно за другим и вставлены в новую хеш-таблицу. Этот метод определенно не соответствует требованиям производственной среды, потому что временная сложность слишком высока, O (n), когда объем данных велик, производительность будет очень низкой. Redis придумал способ завершить операцию вставки за постоянное время O(1) даже при запуске роста. Решение состоит в том, чтобы сделать несколько инкрементных копий старой хэш-таблицы в новую хеш-таблицу вместо одной копии.

Затем возьмем Redis в качестве примера, чтобы рассказать о том, как это хеш-таблица, которую можно расширить без ущерба для производительности.

Redis определяет словарь следующим образом:


/*
 * 字典
 *
 * 每个字典使用两个哈希表,用于实现渐进式 rehash
 */
typedef struct dict {
    // 特定于类型的处理函数
    dictType *type;
    // 类型处理函数的私有数据
    void *privdata;
    // 哈希表(2 个)
    dictht ht[2];
    // 记录 rehash 进度的标志,值为 -1 表示 rehash 未进行
    int rehashidx;
    // 当前正在运作的安全迭代器数量
    int iterators;
} dict;

Из определения видно, что словарь Redis сохраняет две хеш-таблицы, а хеш-таблица ht[1] используется для повторного хеширования.

В Redis определена следующая структура данных хеш-таблицы:


/*
 * 哈希表
 */
typedef struct dictht {
    // 哈希表节点指针数组(俗称桶,bucket)
    dictEntry **table;
    // 指针数组的大小
    unsigned long size;
    // 指针数组的长度掩码,用于计算索引值
    unsigned long sizemask;
    // 哈希表现有的节点数量
    unsigned long used;

} dictht;

Свойство table представляет собой массив, и каждый элемент массива является указателем на структуру dictEntry.

Каждый dictEntry содержит пару ключ-значение и указатель на другую структуру dictEntry:

/*
 * 哈希表节点
 */
typedef struct dictEntry {
    // 键
    void *key;
    // 值
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
    } v;
    // 链往后继节点
    struct dictEntry *next;

} dictEntry;

Атрибут next указывает на другую структуру dictEntry, а несколько dictEntry могут быть объединены в связанный список с помощью указателя next.Из этого видно, что dicttht использует метод цепных адресов для решения проблемы коллизии ключей.

Dictadd Перед каждой новой парой клавишной валюты добавляется в словарь, будет ли хэш Table HT [0] проверяет HT [0] размера и использованное свойство, если соотношение между ними = используется / размер для удовлетворения любого из следующего состояния Затем процесс RefaSh будет активирован:

Естественная перефразировка: отношение >= 1 и переменная dict_can_resize имеет значение true.
Force rehash: ratio больше, чем переменная dict_force_resize_ratio (в текущей версии значение dict_force_resize_ratio равно 5).

Предполагая, что текущий словарь нуждается в расширении перехеширования, Redis сначала установит rehashidx словаря равным 0, отметив начало перехеширования, затем выделит место для ht[1]->table, а размер будет как минимум в два раза больше размера из ht[0]->используется .

Как показано выше, ht[1]->table выделило 8 пробелов.

Далее начните перефразировать. Переместите значение ключа в ht[0]->table в ht[1]->table Перемещение значения ключа выполняется не за один раз, а несколько раз.

Как видно из приведенного выше рисунка, некоторые значения ключа в ht[0] были перенесены в ht[1], и в это время вставлены новые значения ключа, которые напрямую вставлены в ht[1] , не вставляется в ht[0] снова. Гарантируется, что ht[0] только убывает, а не увеличивается.

В процессе повторного хеширования постоянно вставляются новые значения ключа, а значения ключа в ht[0] непрерывно переносятся до тех пор, пока не будут перенесены все значения ключа в ht[0]. Обратите внимание, что Redis использует метод вставки головы, и новое значение всегда вставляется в первую позицию связанного списка, поэтому нет необходимости переходить к концу связанного списка, что экономит временную сложность O(n ). В случае с приведенным выше рисунком все узлы были перенесены.

Rehash очистит до конца, освободит пространство ht[0]; заменит ht[0] на ht[1], сделает исходный ht[1] новым ht[0]; создаст новый пустой хэш-таблицу Rehash , и присвойте ему значение ht[1]; присвойте атрибуту rehashidx словаря значение -1, указывая на то, что перефразирование остановлено;

После окончательного перефразирования ситуация выглядит так, как показано выше. Если вам все еще нужно перефразировать в следующий раз, повторите описанный выше процесс. Этот метод многократного последовательного повторения также обеспечивает высокую производительность Redis.

Стоит отметить, что Redis поддерживает операцию повторного сжатия словарей. Шаги
Обратный процесс перефразирования.

2. Оптимизация красно-черного дерева

Прочитав это, читатель должен был понять, как управлять картой, чтобы
Вероятность хеш-столкновения мала, а массив хэша ведра занимает меньше места. Ответ заключается в том, чтобы выбрать хороший хеш-алгоритм и увеличить механизм расширения.

Java снова оптимизировала базовую реализацию HashMap в JDK1.8.

Изображение выше представляет собой сводку из блога Meituan. Отсюда мы можем найти:

Количество начальных сегментов в нижней части Java равно 16, а коэффициент загрузки по умолчанию равен 0,75, то есть, когда значение ключа впервые достигает 12, оно будет расширено и изменено. Критическое значение расширения равно 64. Когда оно превышает 64, а конфликтный узел равен 8 или больше 8, в это время будет запущено преобразование красно-черного дерева. Чтобы базовый связанный список не был слишком длинным, связанный список преобразуется в красно-черное дерево.

Другими словами, когда общее количество бакетов не достигает 64, даже если длина связанного списка равна 8, преобразование красно-черного дерева выполняться не будет.

Если количество узлов меньше 6, красно-черное дерево снова выродится в связный список.

Конечно, причина, по которой мы решили использовать здесь красно-черные деревья для оптимизации, гарантирует, что наихудший случай не выродится в
O (n), красно-черное дерево гарантирует, что наихудшая временная сложность также равна o (log n).

В блоге Meituan также упоминалось, что у Java есть еще одна оптимизация, которую стоит изучить в JDK1.8. Java не нужно снова вычислять вычисление хэша во время процесса миграции узла ключ-значение rehash!

Поскольку используется расширение степени 2 (имеется в виду, что длина увеличивается в 2 раза по сравнению с исходной), положение элемента либо находится в исходном положении, либо перемещается в положение степени 2 в исходном положении. Вы можете понять смысл этого предложения, взглянув на рисунок ниже. n — длина таблицы. На рисунке (a) показаны key1 и
Пример определения позиции индекса для двух ключей ключа 2. На рисунке (б) показано, что ключ 1 и
key2 Пример определения индексной позиции двух ключей, где hash1 — результат хэширования и операции высокого порядка, соответствующей key1.

После того, как элемент пересчитает хэш, поскольку n удваивается, диапазон маски n-1 на 1 бит больше в старшем разряде (красный), поэтому новый индекс изменится следующим образом:

Поэтому после расширения нужно только посмотреть, равно ли значение бита после расширения 0 или 1. Если он равен 0, значит, индекс остается неизменным, если равен 1, значит, новое значение индекса равно исходному значению индекса плюс Просто перейдите к oldCap, поэтому вам не нужно снова вычислять хэш.

ФИГ. 16 находится на расширении корпуса 32.

3. Конкретный пример реализации Map in Go

Прочитав это, у читателей должны появиться некоторые идеи о том, как создать карту. Выберите отличный хэш-алгоритм, используйте связанный список + массив в качестве базовой структуры данных, а как расширить и оптимизировать, вы все должны знать. Прочитав это, читатели могут подумать, что содержание этой статьи составляет больше половины, но предыдущие — это частичная теория, а затем, возможно, ключевая часть этой статьи — анализ полной реализации Map с нуля.

Далее автор анализирует базовую реализацию карты в Go, которая также является примером конкретной реализации карты и нескольких важных операций, добавления ключевых значений, удаления ключевых значений и стратегий расширения.

Карта Go реализована в файле /src/runtime/hashmap.go.

Основная сущность карты по-прежнему является хэш-таблицей.

Давайте сначала посмотрим, как Go определяет некоторые константы.



const (
    // 一个桶里面最多可以装的键值对的个数,8对。
    bucketCntBits = 3
    bucketCnt     = 1 << bucketCntBits

    // 触发扩容操作的最大装载因子的临界值
    loadFactor = 6.5

    // 为了保持内联,键 和 值 的最大长度都是128字节,如果超过了128个字节,就存储它的指针
    maxKeySize   = 128
    maxValueSize = 128

    // 数据偏移应该是 bmap 的整数倍,但是需要正确的对齐。
    dataOffset = unsafe.Offsetof(struct {
        b bmap
        v int64
    }{}.v)

    // tophash 的一些值
    empty          = 0 // 没有键值对
    evacuatedEmpty = 1 // 没有键值对,并且桶内的键值被迁移走了。
    evacuatedX     = 2 // 键值对有效,并且已经迁移了一个表的前半段
    evacuatedY     = 3 // 键值对有效,并且已经迁移了一个表的后半段
    minTopHash     = 4 // 最小的 tophash

    // 标记
    iterator     = 1 // 当前桶的迭代子
    oldIterator  = 2 // 旧桶的迭代子
    hashWriting  = 4 // 一个goroutine正在写入map
    sameSizeGrow = 8 // 当前字典增长到新字典并且保持相同的大小

    // 迭代子检查桶ID的哨兵
    noCheck = 1<<(8*sys.PtrSize) - 1
)

Здесь стоит пояснить, как достигается критическое значение 6,5 для запуска операции расширения. Если это значение слишком велико, это приведет к слишком большому количеству сегментов переполнения и эффективность поиска будет снижена. Если это значение слишком мало, пространство для хранения будет потрачено впустую.

По словам разработчиков Google, это значение представляет собой тестовую программу, которая измеряет выбранное эмпирическое значение.

% переполнение:
Скорость переполнения, сколько ключей kv в ведре в среднем переполнится.

байт/запись:
Сколько дополнительных байтов данных необходимо хранить в среднем для хранения значения ключа kv.

хитпроб:
Найдите среднее количество поиска для существующего ключа.

промах:
Найдите среднее количество поисков несуществующего ключа.

После нескольких наборов тестовых данных в качестве критического коэффициента нагрузки, наконец, был выбран 6,5.

Затем посмотрите определение заголовка карты в Go:



type hmap struct {
    count     int // map 的长度
    flags     uint8
    B         uint8  // log以2为底,桶个数的对数 (总共能存 6.5 * 2^B 个元素)
    noverflow uint16 // 近似溢出桶的个数
    hash0     uint32 // 哈希种子

    buckets    unsafe.Pointer // 有 2^B 个桶的数组. count==0 的时候,这个数组为 nil.
    oldbuckets unsafe.Pointer // 旧的桶数组一半的元素
    nevacuate  uintptr        // 扩容增长过程中的计数器

    extra *mapextra // 可选字段
}

В структуре заголовка карты Go он также содержит два указателя на массивы сегментов, Buckets указывает на новый массив сегментов, а oldbuckets указывает на старый массив сегментов. Это похоже на то, что в словаре Redis есть два массива dicttht.

Последнее поле hmap — это указатель на структуру mapextra, которая определяется следующим образом:


type mapextra struct {
    overflow [2]*[]*bmap
    nextOverflow *bmap
}

Если пара ключ-значение не находит соответствующий указатель, они сначала будут сохранены в корзине переполнения.
перелив внутрь. В mapextra также есть указатель на следующую доступную область переполнения.

Переполнение ведра переполнения — это массив, в котором хранятся 2 указателя на массив *bmap. overflow[0] заполнен hmap.buckets . overflow[1] содержит hmap.oldbuckets.

Давайте посмотрим на определение структуры данных бакета.bmap — это тип структуры, соответствующий бакету на карте в Go.



type bmap struct {
    tophash [bucketCnt]uint8
}

Определение ведра относительно простое, оно просто содержит массив типа uint8, содержащий 8 элементов. Эти 8 элементов хранят старшие 8 бит хеш-значения.

В схеме памяти есть еще 2 блока после tophash. Сразу после тофаша идут 8 пар ключ-значение. А расположение таково, что 8 ключей и 8 значений собраны вместе.

8 Пары пар клавишных значений Конец пары ключа-значение сопровождается указателем переполнения, указывая на следующую BMAP. Из этого также можно увидеть, что карта в Go использует связанный список для решения хэш-конфликтов.

Почему значение ключа хранилища Go не является обычным ключом/значением, ключом/значением, ключом/значением... так что хранилище? Это ключевой ключ для хранения его вместе, тогда значение значения хранится вместе, почему?

В Redis при использовании REDIS_ENCODING_ZIPLIST для кодирования хэш-таблицы программа формирует структуру пары ключ-значение, необходимую для сохранения хэш-таблицы, путем объединения ключа и значения в сжатый список, как показано выше. Недавно добавленные пары ключ-значение добавляются в нижний колонтитул сжатого списка.

У этой структуры есть недостаток: если хранимые ключи и значения имеют разные типы и занимают разные байты в структуре памяти, требуется выравнивание. Например, сохраните словарь типа map[int64]int8.

Чтобы уменьшить потребление памяти при выравнивании памяти, Go разработал его, как показано на рисунке выше.

Если на карте хранятся триллионы больших данных, объем памяти, сэкономленный здесь, весьма значителен.

1. Создайте новую карту

Makemap создает новую карту. Если входной параметр h не пуст, то hmap карты является hmap входного параметра. Если ведро входного параметра не пусто, то это ведро ведра используется как первое ведро.



func makemap(t *maptype, hint int64, h *hmap, bucket unsafe.Pointer) *hmap {
    // hmap 的 size 大小的值非法
    if sz := unsafe.Sizeof(hmap{}); sz > 48 || sz != t.hmap.size {
        println("runtime: sizeof(hmap) =", sz, ", t.hmap.size =", t.hmap.size)
        throw("bad hmap size")
    }

    // 超过范围的 hint 值都为0
    if hint < 0 || hint > int64(maxSliceCap(t.bucket.size)) {
        hint = 0
    }

    // key 值的类型不是 Go 所支持的
    if !ismapkey(t.key) {
        throw("runtime.makemap: unsupported map key type")
    }

    // 通过编译器和反射检车 key 值的 size 是否合法
    if t.key.size > maxKeySize && (!t.indirectkey || t.keysize != uint8(sys.PtrSize)) ||
        t.key.size <= maxKeySize && (t.indirectkey || t.keysize != uint8(t.key.size)) {
        throw("key size wrong")
    }
    // 通过编译器和反射检车 value 值的 size 是否合法
    if t.elem.size > maxValueSize && (!t.indirectvalue || t.valuesize != uint8(sys.PtrSize)) ||
        t.elem.size <= maxValueSize && (t.indirectvalue || t.valuesize != uint8(t.elem.size)) {
        throw("value size wrong")
    }

    // 虽然以下的变量我们不依赖,而且可以在编译阶段检查下面这些值的合法性,
    // 但是我们还是在这里检测。

    // key 值对齐超过桶的个数
    if t.key.align > bucketCnt {
        throw("key align too big")
    }
    // value 值对齐超过桶的个数
    if t.elem.align > bucketCnt {
        throw("value align too big")
    }
    // key 值的 size 不是 key 值对齐的倍数
    if t.key.size%uintptr(t.key.align) != 0 {
        throw("key size not a multiple of key align")
    }
    // value 值的 size 不是 value 值对齐的倍数
    if t.elem.size%uintptr(t.elem.align) != 0 {
        throw("value size not a multiple of value align")
    }
    // 桶个数太小,无法正确对齐
    if bucketCnt < 8 {
        throw("bucketsize too small for proper alignment")
    }
    // 数据偏移量不是 key 值对齐的整数倍,说明需要在桶中填充 key
    if dataOffset%uintptr(t.key.align) != 0 {
        throw("need padding in bucket (key)")
    }
    // 数据偏移量不是 value 值对齐的整数倍,说明需要在桶中填充 value
    if dataOffset%uintptr(t.elem.align) != 0 {
        throw("need padding in bucket (value)")
    }

    B := uint8(0)
    for ; overLoadFactor(hint, B); B++ {
    }

    // 分配内存并初始化哈希表
    // 如果此时 B = 0,那么 hmap 中的 buckets 字段稍后分配
    // 如果 hint 值很大,初始化这块内存需要一段时间。
    buckets := bucket
    var extra *mapextra
    if B != 0 {
        var nextOverflow *bmap
        // 初始化 bucket 和 nextOverflow 
        buckets, nextOverflow = makeBucketArray(t, B)
        if nextOverflow != nil {
            extra = new(mapextra)
            extra.nextOverflow = nextOverflow
        }
    }

    // 初始化 hmap
    if h == nil {
        h = (*hmap)(newobject(t.hmap))
    }
    h.count = 0
    h.B = B
    h.extra = extra
    h.flags = 0
    h.hash0 = fastrand()
    h.buckets = buckets
    h.oldbuckets = nil
    h.nevacuate = 0
    h.noverflow = 0

    return h
}

Самое главное для создания новой карты — это выделение памяти и инициализация хеш-таблицы.Если B не равно 0, mapextra также будет инициализирована, а корзины будут перегенерированы.


func makeBucketArray(t *maptype, b uint8) (buckets unsafe.Pointer, nextOverflow *bmap) {
    base := uintptr(1 << b)
    nbuckets := base
    if b >= 4 {
        nbuckets += 1 << (b - 4)
        sz := t.bucket.size * nbuckets
        up := roundupsize(sz)
        // 如果申请 sz 大小的桶,系统只能返回 up 大小的内存空间,那么桶的个数为 up / t.bucket.size
        if up != sz {
            nbuckets = up / t.bucket.size
        }
    }
    buckets = newarray(t.bucket, int(nbuckets))
    // 当 b > 4 并且计算出来桶的个数与 1 << b 个数不等的时候,
    if base != nbuckets {
        // 此时 nbuckets 比 base 大,那么会预先分配 nbuckets - base 个 nextOverflow 桶
        nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
        last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
        last.setoverflow(t, (*bmap)(buckets))
    }
    return buckets, nextOverflow
}

Новый массив здесь уже mallocgc.

Как видно из приведенного выше кода, только при B>=4 MakebucketArray генерирует указатель nextoverflow на BMAP, который будет генерировать MaPextra, когда MAP генерирует HMAP.

Когда B = 3 ( B

Когда B = 4 ( B >= 4 ), mapextra дополнительно генерируется при инициализации hmap и инициализируется nextOverflow. Указатель nextOverflow mapextra будет указывать на конец 16-го сегмента и первый адрес 17-го сегмента. Bucketsize - адрес sys.PtrSize 17-го бакета (начиная с 0, то есть бакета с индексом 16) хранит указатель, указывающий на первый адрес всего текущего бакета. Этот указатель является указателем переполнения bmap.

Когда B = 5 (B> = ​​4), MapExtra дополнительно генерируется при инициализации HMAP и NextOverFlow инициализируется. В это время будет сгенерировано в общей сложности 34 ведра. Аналогично, адрес последнего размера ведра минус размер указателя начинает хранить указатель переполнения.

2. Найти ключ

В Go, если вы ищете несуществующий ключ в словаре, он не вернет nil, если он не будет найден, а вернет нулевое значение текущего типа. Например, строка возвращает пустую строку, а тип int возвращает 0.



func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
    if raceenabled && h != nil {
        // 获取 caller 的 程序计数器 program counter
        callerpc := getcallerpc(unsafe.Pointer(&t))
        // 获取 mapaccess1 的程序计数器 program counter
        pc := funcPC(mapaccess1)
        racereadpc(unsafe.Pointer(h), callerpc, pc)
        raceReadObjectPC(t.key, key, callerpc, pc)
    }
    if msanenabled && h != nil {
        msanread(key, t.key.size) 
    }
    if h == nil || h.count == 0 {
        return unsafe.Pointer(&zeroVal[0])
    }
    // 如果多线程读写,直接抛出异常
    // 并发检查 go hashmap 不支持并发访问
    if h.flags&hashWriting != 0 {
        throw("concurrent map read and map write")
    }
    alg := t.key.alg
    // 计算 key 的 hash 值
    hash := alg.hash(key, uintptr(h.hash0))
    m := uintptr(1)<<h.B - 1
    // hash % (1<<B - 1) 求出 key 在哪个桶
    b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
    // 如果当前还存在 oldbuckets 桶
    if c := h.oldbuckets; c != nil {
        // 当前扩容不是等量扩容
        if !h.sameSizeGrow() {
            // 如果 oldbuckets 未迁移完成 则找找 oldbuckets 中对应的 bucket(低 B-1 位)
            // 否则为 buckets 中的 bucket(低 B 位)
            // 把 mask 缩小 1 倍
            m >>= 1
        }

        oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
        if !evacuated(oldb) {
            // 如果 oldbuckets 桶存在,并且还没有扩容迁移,就在老的桶里面查找 key 
            b = oldb
        }
    }
    // 取出 hash 值的高 8 位
    top := uint8(hash >> (sys.PtrSize*8 - 8))
    // 如果 top 小于 minTopHash,就让它加上 minTopHash 的偏移。
    // 因为 0 - minTopHash 这区间的数都已经用来作为标记位了
    if top < minTopHash {
        top += minTopHash
    }
    for {
        for i := uintptr(0); i < bucketCnt; i++ {
            // 如果 hash 的高8位和当前 key 记录的不一样,就找下一个
            // 这样比较很高效,因为只用比较高8位,不用比较所有的 hash 值
            // 如果高8位都不相同,hash 值肯定不同,但是高8位如果相同,那么就要比较整个 hash 值了
            if b.tophash[i] != top {
                continue
            }
            // 取出 key 值的方式是用偏移量,bmap 首地址 + i 个 key 值大小的偏移量
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            if t.indirectkey {
                k = *((*unsafe.Pointer)(k))
            }
            // 比较 key 值是否相等
            if alg.equal(key, k) {
                // 如果找到了 key,那么取出 value 值
                // 取出 value 值的方式是用偏移量,bmap 首地址 + 8 个 key 值大小的偏移量 + i 个 value 值大小的偏移量
                v := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))
                if t.indirectvalue {
                    v = *((*unsafe.Pointer)(v))
                }
                return v
            }
        }
        // 如果当前桶里面没有找到相应的 key ,那么就去下一个桶去找
        b = b.overflow(t)
        // 如果 b == nil,说明桶已经都找完了,返回对应type的零值
        if b == nil {
            return unsafe.Pointer(&zeroVal[0])
        }
    }
}

Конкретный код реализации приведен выше, пожалуйста, обратитесь к коду для подробного объяснения.

Как показано выше, это и есть весь процесс поиска ключа.

Сначала вычисляется хеш-значение, соответствующее ключу, а хеш-значение — это остаток от B.

Здесь есть пункт оптимизации. m % n Этот шаг расчета, если n кратно 2, то этот шаг операции остатка можно опустить.


m % n = m & ( n - 1 )

Эта оптимизация может сэкономить время на выполнение остаточных операций. В этом примере вычисленное значение равно 0010, что равно 2, поэтому оно соответствует третьему сегменту в массиве сегментов. Почему третье ведро? Первый адрес указывает на 0-е ведро, и он смещен на размер 2 ведра, поэтому он смещается на первый адрес третьего ведра Конкретную реализацию см. В приведенном выше коде.

Младшие биты B хэш-значения определяют количество сегментов в массиве сегментов, а старшие 8 битов хэш-значения определяют количество битов в массиве tophash, где существует ключ в массиве сегментов bmap. Как показано на рисунке выше, старшие 8 бит хеша используются для сравнения с каждым значением в массиве tophash.Если старшие 8 бит не равны tophash[i], оно напрямую сравнивается со следующим. Если они равны, извлеките соответствующий полный ключ из bmap и снова сравните его, чтобы убедиться, что он полностью согласован.

Весь процесс поиска сначала находится в олдбакете (если есть лодбакет), а потом переходим в новый бмап, чтобы его найти.

У кого-то могут возникнуть вопросы, а почему здесь добавлен tophash для еще одного сравнения?

Введение tophash должно ускорить поиск. Поскольку он сохраняет только старшие 8 бит хеш-значения, это намного быстрее, чем поиск полных 64 бит. Сравнивая старшие 8 бит, быстро найти индекс согласованного хеш-значения старших 8 бит, а затем выполнить полное сравнение.Если оно все еще непротиворечиво, то определяется, что ключ найден.

Если ключ найден, возвращается соответствующее значение. Если он не найден, он будет продолжать поиск в переполненном сегменте, пока не будет найден последний сегмент, если он не будет найден, он вернет нулевое значение соответствующего типа.

3. Вставьте ключ

Вставьте ключевые процессы и процесс поиска ключей в целом согласованы.


func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
    if h == nil {
        panic(plainError("assignment to entry in nil map"))
    }
    if raceenabled {
        // 获取 caller 的 程序计数器 program counter
        callerpc := getcallerpc(unsafe.Pointer(&t))
        // 获取 mapassign 的程序计数器 program counter
        pc := funcPC(mapassign)
        racewritepc(unsafe.Pointer(h), callerpc, pc)
        raceReadObjectPC(t.key, key, callerpc, pc)
    }
    if msanenabled {
        msanread(key, t.key.size)
    }
    // 如果多线程读写,直接抛出异常
    // 并发检查 go hashmap 不支持并发访问
    if h.flags&hashWriting != 0 {
        throw("concurrent map writes")
    }
    alg := t.key.alg
    // 计算 key 值的 hash 值
    hash := alg.hash(key, uintptr(h.hash0))

    // 在计算完 hash 值以后立即设置 hashWriting 变量的值,如果在计算 hash 值的过程中没有完全写完,可能会导致 panic
    h.flags |= hashWriting

    // 如果 hmap 的桶的个数为0,那么就新建一个桶
    if h.buckets == nil {
        h.buckets = newarray(t.bucket, 1)
    }

again:
    // hash 值对 B 取余,求得所在哪个桶
    bucket := hash & (uintptr(1)<<h.B - 1)
    // 如果还在扩容中,继续扩容
    if h.growing() {
        growWork(t, h, bucket)
    }
    // 根据 hash 值的低 B 位找到位于哪个桶
    b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + bucket*uintptr(t.bucketsize)))
    // 计算 hash 值的高 8 位
    top := uint8(hash >> (sys.PtrSize*8 - 8))
    if top < minTopHash {
        top += minTopHash
    }

    var inserti *uint8
    var insertk unsafe.Pointer
    var val unsafe.Pointer
    for {
        // 遍历当前桶所有键值,查找 key 对应的 value
        for i := uintptr(0); i < bucketCnt; i++ {
            if b.tophash[i] != top {
                if b.tophash[i] == empty && inserti == nil {
                    // 如果往后找都没有找到,这里先记录一个标记,方便找不到以后插入到这里
                    inserti = &b.tophash[i]
                    // 计算出偏移 i 个 key 值的位置
                    insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
                    // 计算出 val 所在的位置,当前桶的首地址 + 8个 key 值所占的大小 + i 个 value 值所占的大小
                    val = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))
                }
                continue
            }
            // 依次取出 key 值
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            // 如果 key 值是一个指针,那么就取出改指针对应的 key 值
            if t.indirectkey {
                k = *((*unsafe.Pointer)(k))
            }
            // 比较 key 值是否相等
            if !alg.equal(key, k) {
                continue
            }
            // 如果需要更新,那么就把 t.key 拷贝从 k 拷贝到 key
            if t.needkeyupdate {
                typedmemmove(t.key, k, key)
            }
            // 计算出 val 所在的位置,当前桶的首地址 + 8个 key 值所占的大小 + i 个 value 值所占的大小
            val = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))
            goto done
        }
        ovf := b.overflow(t)
        if ovf == nil {
            break
        }
        b = ovf
    }

    // 没有找到当前的 key 值,并且检查最大负载因子,如果达到了最大负载因子,或者存在很多溢出的桶
    if !h.growing() && (overLoadFactor(int64(h.count), h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
        // 开始扩容
        hashGrow(t, h)
        goto again // Growing the table invalidates everything, so try again
    }
    // 如果找不到一个空的位置可以插入 key 值
    if inserti == nil {
        // all current buckets are full, allocate a new one.
        // 意味着当前桶已经全部满了,那么就生成一个新的
        newb := h.newoverflow(t, b)
        inserti = &newb.tophash[0]
        insertk = add(unsafe.Pointer(newb), dataOffset)
        val = add(insertk, bucketCnt*uintptr(t.keysize))
    }

    // store new key/value at insert position
    if t.indirectkey {
        // 如果是存储 key 值的指针,这里就用 insertk 存储 key 值的地址
        kmem := newobject(t.key)
        *(*unsafe.Pointer)(insertk) = kmem
        insertk = kmem
    }
    if t.indirectvalue {
        // 如果是存储 value 值的指针,这里就用 val 存储 key 值的地址
        vmem := newobject(t.elem)
        *(*unsafe.Pointer)(val) = vmem
    }
    // 将 t.key 从 insertk 拷贝到 key 的位置
    typedmemmove(t.key, insertk, key)
    *inserti = top
    // hmap 中保存的总 key 值的数量 + 1
    h.count++

done:
    // 禁止并发写
    if h.flags&hashWriting == 0 {
        throw("concurrent map writes")
    }
    h.flags &^= hashWriting
    if t.indirectvalue {
        // 如果 value 里面存储的是指针,那么取值该指针指向的 value 值
        val = *((*unsafe.Pointer)(val))
    }
    return val
}

Между процессом вставки ключа и поиском ключа есть несколько отличий, которые следует отметить:

    1. Если вы найдете ключ для вставки, просто обновите соответствующее значение напрямую.
    1. Если вставляемый ключ не найден в bmap, то возможны несколько ситуаций.
      Случай 1: В bmap еще есть вакансии.При обходе bmap заранее отметить вакансию.После того, как поиск окончен и ключ не найден, поставить ключ на отмеченную при предварительном обходе вакансию.
      Случай 2: Вакансий в bmap больше нет. В это время bmap очень заполнен. В этот момент необходимо проверить, достигнут ли максимальный коэффициент нагрузки. Если оно достигнуто, операция расширения выполняется немедленно. После расширения вставьте ключ в новое ведро, и процесс будет таким же, как описано выше. Если максимальный коэффициент загрузки не достигнут, создается новый bmap, и указатель переполнения предыдущего bmap указывает на новый bmap.
    1. В процессе расширения старая корзина t замораживается, и при поиске ключа
      Найти в oldbucket, но не вставлять данные в oldbucket. если в
      oldbucket находит соответствующий ключ, перенося его на новый bmap и добавляя его
      оцененный тег.

Остальные процессы в основном аналогичны поиску ключей, поэтому я не буду их здесь повторять.

3. Удалить ключ


func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
    if raceenabled && h != nil {
        // 获取 caller 的 程序计数器 program counter
        callerpc := getcallerpc(unsafe.Pointer(&t))
        // 获取 mapdelete 的程序计数器 program counter
        pc := funcPC(mapdelete)
        racewritepc(unsafe.Pointer(h), callerpc, pc)
        raceReadObjectPC(t.key, key, callerpc, pc)
    }
    if msanenabled && h != nil {
        msanread(key, t.key.size)
    }
    if h == nil || h.count == 0 {
        return
    }
    // 如果多线程读写,直接抛出异常
    // 并发检查 go hashmap 不支持并发访问
    if h.flags&hashWriting != 0 {
        throw("concurrent map writes")
    }

    alg := t.key.alg
    // 计算 key 值的 hash 值
    hash := alg.hash(key, uintptr(h.hash0))

    // 在计算完 hash 值以后立即设置 hashWriting 变量的值,如果在计算 hash 值的过程中没有完全写完,可能会导致 panic
    h.flags |= hashWriting

    bucket := hash & (uintptr(1)<<h.B - 1)
    // 如果还在扩容中,继续扩容
    if h.growing() {
        growWork(t, h, bucket)
    }
    // 根据 hash 值的低 B 位找到位于哪个桶
    b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + bucket*uintptr(t.bucketsize)))
    // 计算 hash 值的高 8 位
    top := uint8(hash >> (sys.PtrSize*8 - 8))
    if top < minTopHash {
        top += minTopHash
    }
    for {
        // 遍历当前桶所有键值,查找 key 对应的 value
        for i := uintptr(0); i < bucketCnt; i++ {
            if b.tophash[i] != top {
                continue
            }
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            // 如果 k 是指向 key 的指针,那么这里需要取出 key 的值
            k2 := k
            if t.indirectkey {
                k2 = *((*unsafe.Pointer)(k2))
            }
            if !alg.equal(key, k2) {
                continue
            }
            // key 的指针置空
            if t.indirectkey {
                *(*unsafe.Pointer)(k) = nil
            } else {
            // 清除 key 的内存
                typedmemclr(t.key, k)
            }
            v := unsafe.Pointer(uintptr(unsafe.Pointer(b)) + dataOffset + bucketCnt*uintptr(t.keysize) + i*uintptr(t.valuesize))
            // value 的指针置空
            if t.indirectvalue {
                *(*unsafe.Pointer)(v) = nil
            } else {
            // 清除 value 的内存
                typedmemclr(t.elem, v)
            }
            // 清空 tophash 里面的值
            b.tophash[i] = empty
            // map 里面 key 的总个数减1
            h.count--
            goto done
        }
        // 如果没有找到,那么就继续查找 overflow 桶,一直遍历到最后一个
        b = b.overflow(t)
        if b == nil {
            goto done
        }
    }

done:
    if h.flags&hashWriting == 0 {
        throw("concurrent map writes")
    }
    h.flags &^= hashWriting
}

Основной процесс операции удаления аналогичен процессу поиска ключа: после нахождения соответствующего ключа, если указатель указывает на исходный ключ, установить указатель на ноль. Если это значение, память, в которой оно находится, очищается. Также очистите значение в tophash и, наконец, уменьшите общее количество ключей в карте на 1.

Если в процессе расширения операция удаления будет удалена в новом bmap после расширения.

Процесс поиска все равно должен перейти к списку последнего BMAP Barrel.

4. Постепенное удвоение и расширение

Эта часть является основной частью всей реализации карты. Мы все знаем, что когда Map постоянно загружает значения Key, эффективность поиска будет становиться все ниже и ниже.Если операция расширения не выполняется в это время, коллизия хэшей приведет к тому, что связанный список будет становиться все длиннее и длиннее, а производительность станет все больше и больше. Расширение обязательно.

Однако, если запись значения ключа заблокирована в процессе расширения, будет период бездействия при обработке больших данных.Если он используется в системе с высоким уровнем реального времени, каждое расширение будет задержано на несколько секунд. В течение этого времени не может ответить ни на один запрос. Такое исполнение явно неприемлемо. Поэтому необходимо не затрагивать написание, но и расширять возможности одновременно. В это время должно быть сделано постепенное расширение.

Инкрементное расширение здесь на самом деле используется для самых разных целей.Стратегия инкрементного расширения, принятая Redis в предыдущем примере.

Далее давайте посмотрим, как поэтапно масштабируется Go.

Когда mapassign в Go вставляет значение ключа, а mapdelete удаляет значение ключа, он проверяет, расширяется ли он в данный момент.


func growWork(t *maptype, h *hmap, bucket uintptr) {
    // 确保我们迁移了所有 oldbucket
    evacuate(t, h, bucket&h.oldbucketmask())

    // 再迁移一个标记过的桶
    if h.growing() {
        evacuate(t, h, h.nevacuate)
    }
}

Отсюда мы видим, каждый раз расти будет мигрировать 2 ведра. Одним из них является текущее ведро, это частичная миграция, а другая - бочка невацитов указывает в HMAP, что является инкрементной миграцией.

При вставке значения ключа, если старая корзина зависла в процессе расширения, старая корзина будет искаться первой, но данные не будут вставлены в старую корзину. Только в том случае, если соответствующий ключ найден в старом сегменте, добавьте оценочную метку после его переноса в новый сегмент.

При удалении значения ключа, если в данный момент находится в процессе расширения, сначала найдите ведро, то есть новое ведро, и, найдя его, установите для соответствующего ключа и значения пустое значение. Если он не найден в ведре, он отправится в старое ведро, чтобы найти его.

Каждый раз, когда вставляется значение ключа, будет оцениваться, превышает ли текущий коэффициент загрузки 6,5.Если этот предел достигнут, операция расширения hashGrow будет выполняться немедленно. Это подготовительная работа перед расширением.



func hashGrow(t *maptype, h *hmap) {
    // 如果达到了最大装载因子,就需要扩容。
    // 不然的话,一个桶后面链表跟着一大堆的 overflow 桶
    bigger := uint8(1)
    if !overLoadFactor(int64(h.count), h.B) {
        bigger = 0
        h.flags |= sameSizeGrow
    }
    // 把 hmap 的旧桶的指针指向当前桶
    oldbuckets := h.buckets
    // 生成新的扩容以后的桶,hmap 的 buckets 指针指向扩容以后的桶。
    newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger)

    flags := h.flags &^ (iterator | oldIterator)
    if h.flags&iterator != 0 {
        flags |= oldIterator
    }
    // B 加上新的值
    h.B += bigger
    h.flags = flags
    // 旧桶指针指向当前桶
    h.oldbuckets = oldbuckets
    // 新桶指针指向扩容以后的桶
    h.buckets = newbuckets
    h.nevacuate = 0
    h.noverflow = 0

    if h.extra != nil && h.extra.overflow[0] != nil {
        if h.extra.overflow[1] != nil {
            throw("overflow is not nil")
        }
        // 交换 overflow[0] 和 overflow[1] 的指向
        h.extra.overflow[1] = h.extra.overflow[0]
        h.extra.overflow[0] = nil
    }
    if nextOverflow != nil {
        if h.extra == nil {
            // 生成 mapextra
            h.extra = new(mapextra)
        }
        h.extra.nextOverflow = nextOverflow
    }

    // 实际拷贝键值对的过程在 evacuate() 中
}

Диаграмма, показывающая его поток:

Операция hashGrow является подготовительной работой перед расширением, а фактический процесс копирования находится в стадии эвакуации.

Операция hashGrow сначала создаст новый массив сегментов после расширения. Новый массив сегментов в два раза больше предыдущего. Тогда сегменты hmap будут указывать на этот новый расширенный сегмент, а старые сегменты будут указывать на текущий массив сегментов.

После обработки hmap, а затем обработки mapextra, nextOverflow указывает на исходный указатель переполнения, а указатель переполнения устанавливается равным нулю.

На этом подготовка перед расширением завершена.


func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
    b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
    // 在准备扩容之前桶的个数
    newbit := h.noldbuckets()
    alg := t.key.alg
    if !evacuated(b) {
        // TODO: reuse overflow buckets instead of using new ones, if there
        // is no iterator using the old buckets.  (If !oldIterator.)

        var (
            x, y   *bmap          // 在新桶里面 低位桶和高位桶
            xi, yi int            // key 和 value 值的索引值分别为 xi , yi 
            xk, yk unsafe.Pointer // 指向 x 和 y 的 key 值的指针 
            xv, yv unsafe.Pointer // 指向 x 和 y 的 value 值的指针  
        )
        // 新桶中低位的一些桶
        x = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
        xi = 0
        // 扩容以后的新桶中低位的第一个 key 值
        xk = add(unsafe.Pointer(x), dataOffset)
        // 扩容以后的新桶中低位的第一个 key 值对应的 value 值
        xv = add(xk, bucketCnt*uintptr(t.keysize))
        // 如果不是等量扩容
        if !h.sameSizeGrow() {
            y = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
            yi = 0
            yk = add(unsafe.Pointer(y), dataOffset)
            yv = add(yk, bucketCnt*uintptr(t.keysize))
        }
        // 依次遍历溢出桶
        for ; b != nil; b = b.overflow(t) {
            k := add(unsafe.Pointer(b), dataOffset)
            v := add(k, bucketCnt*uintptr(t.keysize))
            // 遍历 key - value 键值对
            for i := 0; i < bucketCnt; i, k, v = i+1, add(k, uintptr(t.keysize)), add(v, uintptr(t.valuesize)) {
                top := b.tophash[i]
                if top == empty {
                    b.tophash[i] = evacuatedEmpty
                    continue
                }
                if top < minTopHash {
                    throw("bad map state")
                }
                k2 := k
                // key 值如果是指针,则取出指针里面的值
                if t.indirectkey {
                    k2 = *((*unsafe.Pointer)(k2))
                }
                useX := true
                if !h.sameSizeGrow() {
                    // 如果不是等量扩容,则需要重新计算 hash 值,不管是高位桶 x 中,还是低位桶 y 中
                    hash := alg.hash(k2, uintptr(h.hash0))
                    if h.flags&iterator != 0 {
                        if !t.reflexivekey && !alg.equal(k2, k2) {
                            // 如果两个 key 不相等,那么他们俩极大可能旧的 hash 值也不相等。
                            // tophash 对要迁移的 key 值也是没有多大意义的,所以我们用低位的 tophash 辅助扩容,标记一些状态。
                            // 为下一个级 level 重新计算一些新的随机的 hash 值。以至于这些 key 值在多次扩容以后依旧可以均匀分布在所有桶中
                            // 判断 top 的最低位是否为1
                            if top&1 != 0 {
                                hash |= newbit
                            } else {
                                hash &^= newbit
                            }
                            top = uint8(hash >> (sys.PtrSize*8 - 8))
                            if top < minTopHash {
                                top += minTopHash
                            }
                        }
                    }
                    useX = hash&newbit == 0
                }
                if useX {
                    // 标记低位桶存在 tophash 中
                    b.tophash[i] = evacuatedX
                    // 如果 key 的索引值到了桶最后一个,就新建一个 overflow
                    if xi == bucketCnt {
                        newx := h.newoverflow(t, x)
                        x = newx
                        xi = 0
                        xk = add(unsafe.Pointer(x), dataOffset)
                        xv = add(xk, bucketCnt*uintptr(t.keysize))
                    }
                    // 把 hash 的高8位再次存在 tophash 中
                    x.tophash[xi] = top
                    if t.indirectkey {
                        // 如果是指针指向 key ,那么拷贝指针指向
                        *(*unsafe.Pointer)(xk) = k2 // copy pointer
                    } else {
                        // 如果是指针指向 key ,那么进行值拷贝
                        typedmemmove(t.key, xk, k) // copy value
                    }
                    // 同理拷贝 value
                    if t.indirectvalue {
                        *(*unsafe.Pointer)(xv) = *(*unsafe.Pointer)(v)
                    } else {
                        typedmemmove(t.elem, xv, v)
                    }
                    // 继续迁移下一个
                    xi++
                    xk = add(xk, uintptr(t.keysize))
                    xv = add(xv, uintptr(t.valuesize))
                } else {
                    // 这里是高位桶 y,迁移过程和上述低位桶 x 一致,下面就不再赘述了
                    b.tophash[i] = evacuatedY
                    if yi == bucketCnt {
                        newy := h.newoverflow(t, y)
                        y = newy
                        yi = 0
                        yk = add(unsafe.Pointer(y), dataOffset)
                        yv = add(yk, bucketCnt*uintptr(t.keysize))
                    }
                    y.tophash[yi] = top
                    if t.indirectkey {
                        *(*unsafe.Pointer)(yk) = k2
                    } else {
                        typedmemmove(t.key, yk, k)
                    }
                    if t.indirectvalue {
                        *(*unsafe.Pointer)(yv) = *(*unsafe.Pointer)(v)
                    } else {
                        typedmemmove(t.elem, yv, v)
                    }
                    yi++
                    yk = add(yk, uintptr(t.keysize))
                    yv = add(yv, uintptr(t.valuesize))
                }
            }
        }
        // Unlink the overflow buckets & clear key/value to help GC.
        if h.flags&oldIterator == 0 {
            b = (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
            // Preserve b.tophash because the evacuation
            // state is maintained there.
            if t.bucket.kind&kindNoPointers == 0 {
                memclrHasPointers(add(unsafe.Pointer(b), dataOffset), uintptr(t.bucketsize)-dataOffset)
            } else {
                memclrNoHeapPointers(add(unsafe.Pointer(b), dataOffset), uintptr(t.bucketsize)-dataOffset)
            }
        }
    }

    // Advance evacuation mark
    if oldbucket == h.nevacuate {
        h.nevacuate = oldbucket + 1
        // Experiments suggest that 1024 is overkill by at least an order of magnitude.
        // Put it in there as a safeguard anyway, to ensure O(1) behavior.
        stop := h.nevacuate + 1024
        if stop > newbit {
            stop = newbit
        }
        for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {
            h.nevacuate++
        }
        if h.nevacuate == newbit { // newbit == # of oldbuckets
            // Growing is all done. Free old main bucket array.
            h.oldbuckets = nil
            // Can discard old overflow buckets as well.
            // If they are still referenced by an iterator,
            // then the iterator holds a pointers to the slice.
            if h.extra != nil {
                h.extra.overflow[1] = nil
            }
            h.flags &^= sameSizeGrow
        }
    }
}

Вышеуказанные функции являются основной работой по копированию процесса миграции.

Весь процесс миграции не сложен. Здесь необходимо объяснить значение x и y. После расширения новый массив сегментов в два раза превышает размер исходного массива сегментов. Используйте x для представления младшей половины нового массива сегментов и y для представления старшей половины. Другими переменными являются некоторые маркеры, курсор и клавиша маркера - значение, где исходная позиция. Подробности смотрите в комментариях к коду.

На приведенной выше диаграмме показан процесс после начала миграции. Вы можете видеть, что корзины из старого массива корзин мигрируют в новую корзину, а новая корзина постоянно записывает новые значения ключей.

Продолжайте копировать пары ключ-значение, пока все ключи-значения из старого сегмента не будут скопированы в новый сегмент.

Последним шагом является освобождение старого сегмента, а указатель oldbuckets устанавливается равным нулю. На этом процесс миграции полностью завершен.

5. Эквивалентное расширение

Строго говоря, этот метод нельзя рассматривать как расширение. Но имя функции — Grow, так что давайте пока назовем ее так.

Начиная с версии go1.8 добавлена ​​функция sameSizeGrow при переполнении контейнеров.
Когда количество превышает определенное число (2 ^ B), но коэффициент загрузки не достигает 6,5, могут быть некоторые пустые ведра, то есть скорость использования ведра низкая, и будет запущен тот же размер размера, то есть B остается без изменений, но данные отправляются В процессе миграции данные старых бакетов переупорядочиваются и улучшается использование бакетов. Конечно, во время процесса sameSizeGrow loadFactorGrow не будет запущен.

4. Некоторые оптимизации в реализации Map

Я считаю, что после прочтения этого у читателя должно быть четкое представление о том, как спроектировать и реализовать карту. Включает реализации различных операций в Map. Прежде чем исследовать, как реализовать карту, безопасную для потоков, давайте подытожим некоторые из основных моментов и моментов оптимизации, упомянутых ранее.

В Redis Chash Collisions обрабатываются постепенно. Когда средняя длина поиска превышает 5, инкрементная операция расширения будет запущена для обеспечения высокой производительности хэш-таблицы.

В то же время Redis использует метод вставки заголовка, чтобы обеспечить производительность при вставке значений ключа.

В Java, когда количество сегментов превышает 64, а количество конфликтующих узлов равно 8 или больше, в это время будет запущено преобразование красно-черного дерева. Это может гарантировать, что длина поиска связанного списка все еще не слишком велика, когда связанный список очень длинный, а красно-черное дерево гарантирует, что временная сложность O (log n) также поддерживается в худшем случае.

У Java после миграции очень хороший дизайн, нужно только сравнить, равен ли старший бит числа бакетов после миграции 0. Если он равен 0, то относительное положение ключа в новом бакете остается неизменным. равно 1, добавьте значение ведра.Количество старых ведер oldCap может получить новую позицию.

В Go много точек оптимизации:

  1. Алгоритм хеширования использует эффективный алгоритм memhash и набор инструкций CPU AES. Набор инструкций AES в полной мере использует аппаратные характеристики ЦП, а эффективность вычисления хеш-значений очень высока.
  2. Расположение ключ-значение устроено так, что вместе складываются ключи и складываются значения, а не ключи, а значения располагаются попарно. Это облегчает выравнивание памяти и экономит некоторые потери, вызванные выравниванием памяти, когда объем данных велик.
  3. Когда размер памяти ключа и значения превышает 128 байт, они автоматически преобразуются в указатель для сохранения.
  4. Дизайн массива tophash ускоряет процесс поиска ключа. Кроме того, tophash повторно используется для обозначения состояния операции расширения.
  5. Используйте битовую операцию для преобразования операции остатка, m % n , когда n = 1
  6. Инкрементальное расширение.
  7. Эквивалентное расширение, компактная работа.
  8. Начиная с Go 1.9 Map изначально поддерживает потокобезопасность. (рассмотрите этот вопрос в следующей главе)

Конечно, в Go еще есть места, которые нужно оптимизировать:

  1. В процессе переноса текущая версия не будет повторно использовать переполнение сегмента, а напрямую повторно подаст заявку на новый сегмент. Это можно оптимизировать, чтобы предпочтительно повторно использовать корзину переполнения без указателей и применять новую, когда она больше недоступна. Автор уже писал об этом в TODO.
  2. Динамически объединяйте несколько пустых сегментов.
  3. В текущей версии нет операции сжатия, карта может только увеличиваться, но не уменьшаться. Эта часть Redis имеет связанную реализацию.

(Учитывая длину одной статьи, вся часть, посвященная безопасности резьбы, будет рассмотрена в следующей статье, а следующая статья будет обновлена ​​позже)


Ссылка:
«Алгоритмы и структуры данных»
«Проектирование и реализация Redis»
xxHash
Строковая хэш-функция
General Purpose Hash Function Algorithms
Серия Java 8 рекомендует Hashmap

Репозиторий GitHub:Halfrost-Field

Follow: halfrost · GitHub

Source: halfrost.com/go_map/