Разбор принципа ConcurrentHashMap

Java

1. Введение

Зачем использовать ConcurrentHashMap

В основном по двум причинам:

  1. Использование HashMap в параллельном программировании может привести к бесконечному циклу (jdk1.7, jdk1.8 приведет к потере данных)
  2. HashTable очень неэффективен

2. Структура ConcurrentHashMap

В jdk 1.7 и jdk 1.8 структура ConcurrentHashMap сильно изменилась, что будет объяснено позже.

2.1 Структура в jdk 1.7

jdk1.7 ConcurrentHashMap结构

В jdk 1.7 ConcurrentHashMap состоит из структуры данных Segment и структуры массива HashEntry. Возьмите сегментные замки для обеспечения безопасности.

Segment — это реентерабельная блокировка ReentrantLock, играющая роль блокировки в ConcurrentHashMap; HashEntry используется для хранения данных о паре ключ-значение.

ConcurrentHashMap содержит массив Segments, а Segment содержит массив HashEntry s. Структура Segment аналогична структуре HashMap, которая представляет собой массив и структуру связанного списка.

2.2 Структура в jdk 1.8

jdk1.8 ConcurrentHashMap结构

Реализация JDK1.8 отказалась от концепции сегмента, но напрямую реализовала ее со структурой данных массива узлов + связанный список + красно-черное дерево.Управление параллелизмом осуществляется с использованием Synchronized и CAS.Все выглядит оптимизированным и потоковым safe.HashMap, хотя структуру данных Segment все еще можно увидеть в JDK1.8, атрибуты были упрощены, чтобы быть совместимыми со старой версией.

3. Реализация

3.1 Реализация в JDK 1.7

3.1.1 Инициализация

Инициализация ConcurrentHashMap заключается в инициализации размера сегмента (представленного ssize) посредством битовой операции, которая вычисляется concurrentLevel.

int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
    ++sshift;
    ssize <<= 1;
}

ssize вычисляется операцией определения местоположения (ssize

Инициализация HashEntry для каждого элемента Segment также рассчитывается в соответствии с операцией местоположения, которая представлена ​​заглавной буквой.

int cap = 1;
while (cap < c)
    cap <<= 1;

Вычисление размера HashEntry также является степенью N числа 2 (cap

3.1.2 получить операцию

Операция получения сегмента очень проста и эффективна: он хешируется один раз, затем значение хеш-функции используется для поиска сегмента с помощью операции хеширования, а затем для поиска элемента с помощью алгоритма хеширования.

public V get(Object key){
    int hash = hash(key.hashCode());
    return segmentFor(hash).get(key,hash);
}

Эффективность операции get заключается в том, что весь процесс get не нужно блокировать, он будет заблокирован и перечитан, пока не будет прочитано пустое значение. Причина в том, чтобы определить общую переменную, используемую какvolatileтип.

transient volatile int count;
volatile V value;

3.1.3 поставить операцию

Для вставки данных ConcurrentHashMap здесь выполняются два хэша, чтобы определить место хранения данных.

static class Segment<K,V> extends ReentrantLock implements Serializable {
    //省略
}

При выполнении операции put она проходит два этапа:

  1. Определите, требуется ли расширение
  2. Перейдите к расположению, где был добавлен элемент и поместил его в массив Hashentry

Процесс вставки выполнит первый ключевой хеш, чтобы найти позицию сегмента.Если сегмент не был инициализирован, то есть присвоить значение с помощью операции CAS, а затем выполнить вторую хэш-операцию, чтобы найти соответствующую позицию HashEntry, которая будет быть унаследованы здесь.Характеристики блокировки, при вставке данных в указанную позицию HashEntry (метод вставки хвоста), она будет наследовать ReentrantLock'stryLock()Метод пытается получить блокировку. Если получение прошло успешно, она будет вставлена ​​непосредственно в соответствующую позицию. Если поток уже получил блокировку сегмента, текущий поток продолжит вызов в режиме вращения.tryLock()метод для получения блокировки, приостановки после указанного количества раз и ожидания пробуждения.

3.1.4 операция размера

Вычисление размера элемента ConcurrentHashMap является параллельной операцией, то есть при вычислении размера данные вставляются одновременно, что может привести к тому, что рассчитанный размер будет отличаться от вашего фактического размера.

Решение, принятое ConcurrentHashMap, состоит в том, чтобы сначала попытаться подсчитать размер каждого сегмента, не блокируя сегмент дважды.Если количество изменяется во время статистического процесса, метод блокировки используется для подсчета размера всех сегментов.

try {
    for (; ; ) {
        if (retries++ == RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                ensureSegment(j).lock(); // force creation  
        }
        sum = 0L;
        size = 0;
        overflow = false;
        for (int j = 0; j < segments.length; ++j) {
            Segment<K, V> seg = segmentAt(segments, j);
            if (seg != null) {
                /* 在put、remove、clean方法里操作
                * 元素都会将变量modCount进行加一,
                * 统计也是依靠这个变量的前后变化来进行的 */
                sum += seg.modCount;
                int c = seg.count;
                if (c < 0 || (size += c) < 0) overflow = true;
            }
        }
        if (sum == last)
            break;
        last = sum;
    }
} finally {
    if (retries > RETRIES_BEFORE_LOCK) {
        for (int j = 0; j < segments.length; ++j)
            segmentAt(segments, j).unlock();
    }
}

3.2 Реализация в JDK 1.8

3.2.1 Основные атрибуты и концепции

посмотриОсновные свойства:

//node数组最大容量:2^30=1073741824
private static final int MAXIMUM_CAPACITY = 1 << 30;
//默认初始值,必须是2的幂数
private static final int DEFAULT_CAPACITY = 16;
//数组可能最大值,需要与toArray()相关方法关联
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//并发级别,遗留下来的,为兼容以前的版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//负载因子
private static final float LOAD_FACTOR = 0.75f;
//链表转红黑树阀值,> 8 链表转换为红黑树
static final int TREEIFY_THRESHOLD = 8;
//树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量,<=UNTREEIFY_THRESHOLD 则untreeify(lo))
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
private static final int MIN_TRANSFER_STRIDE = 16;
private static int RESIZE_STAMP_BITS = 16;
//2^15-1,help resize的最大线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
//32-16=16,sizeCtl中记录size大小的偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
//forwarding nodes的hash值
static final int MOVED = -1;
//树根节点的hash值
static final int TREEBIN = -2;
//ReservationNode的hash值
static final int RESERVED = -3;
//可用处理器数量
static final int NCPU = Runtime.getRuntime().availableProcessors();
//存放node的数组
transient volatile Node<K,V>[] table;
/*控制标识符,用来控制table的初始化和扩容的操作,不同的值有不同的含义
    *当为负数时:-1代表正在初始化,-N代表有N-1个线程正在 进行扩容
    *当为0时:代表当时的table还没有被初始化
    *当为正数时:表示初始化或者下一次进行扩容的大小
    */
private transient volatile int sizeCtl;

Важные понятия:

  1. table: значение по умолчанию равно нулю. Инициализация происходит при первой операции вставки. Размер по умолчанию — это массив из 16, который используется для хранения данных узла Node. При расширении размер всегда равен степени 2.
  2. nextTable: значение по умолчанию равно нулю, вновь сгенерированный массив во время расширения в два раза превышает размер исходного массива.
  3. Node: Структура данных, которая хранит ключ, значение и значение хеширования ключа.
class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
    //省略部分代码
}

где используются оба значения и nextvolatileУкрашено для обеспечения одновременной видимости.

  1. ForwardingNode: специальный узел с хеш-значением -1, в котором хранится ссылка на nextTable.
final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }
}

ForwardingNode будет играть роль только при расширении таблицы.Он помещается в таблицу в качестве заполнителя, чтобы указать, что текущий узел является нулевым или был перемещен.

  1. Класс TreeNode и класс TreeBin:  Класс TreeNode представляет каждый узел красно-черного дерева. Когда количество узлов в связанном списке превышает указанное значение, связанный список будет превращен в красно-черное дерево, и, конечно, каждый узел будет преобразован в TreeNode. В отличие от HashMap, ConcurrentHashMap напрямую хранит в ведре не TreeNode, а TreeBin, который поддерживает красно-черное дерево внутри TreeBin, а это значит, что TreeNode используется внутри TreeBin.

3.2.2 Инициализация

При создании экземпляра ConcurrentHashMap с параметрами размер таблицы будет скорректирован в соответствии с параметрами.Предполагая, что параметр равен 100, он в конечном итоге будет скорректирован до 256, гарантируя, что размер таблицы всегда будет степенью двойки.

инициализация таблицы

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        //如果一个线程发现sizeCtl<0,意味着另外的线程执行CAS操作成功,当前线程只需要让出cpu时间片
        if ((sc = sizeCtl) < 0) 
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

3.2.3 операция ввода

Предполагая, что таблица была инициализирована, операция размещения использует CAS + synchronized для реализации параллельных операций вставки или обновления.

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        ...省略部分代码
    }
    addCount(1L, binCount);
    return null;
}

хеш-алгоритм

static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

Найдите позицию индекса в таблице, n — размер таблицы.

int index = (n - 1) & hash

Получить элемент f соответствующего индекса в таблице

Unsafe.getObjectVolatile может напрямую получать данные из указанной памяти, гарантируя, что данные будут самыми последними при каждом их получении.

Если f равно null, это означает, что элемент впервые вставляется в эту позицию в таблице, а узел Node вставляется с помощью метода Unsafe.compareAndSwapObject.

Если CAS прошел успешно, узел Node был вставлен, а затем метод addCount(1L, binCount) проверит, нужно ли расширить текущую емкость.

Если CAS дает сбой, это означает, что другой поток вставил узел раньше времени, и спин повторяет попытку вставить узел в эту позицию.

Если хэш-значение f равно -1, это означает, что текущий f является узлом ForwardingNode, что означает, что другие потоки расширяются, и операция расширения будет выполняться вместе.

В других случаях новый узел Node вставляется в соответствующую позицию в виде связанного списка или красно-черного дерева.Этот процесс использует синхронные встроенные блокировки для достижения параллелизма.Код выглядит следующим образом:

synchronized (f) {
    if (tabAt(tab, i) == f) {
        if (fh >= 0) {
            binCount = 1;
            for (Node<K,V> e = f;; ++binCount) {
                K ek;
                if (e.hash == hash &&
                    ((ek = e.key) == key ||
                     (ek != null && key.equals(ek)))) {
                    oldVal = e.val;
                    if (!onlyIfAbsent)
                        e.val = value;
                    break;
                }
                Node<K,V> pred = e;
                if ((e = e.next) == null) {
                    pred.next = new Node<K,V>(hash, key,
                                              value, null);
                    break;
                }
            }
        }
        else if (f instanceof TreeBin) {
            Node<K,V> p;
            binCount = 2;
            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                           value)) != null) {
                oldVal = p.val;
                if (!onlyIfAbsent)
                    p.val = value;
            }
        }
    }
}

Синхронизируйте на узле f, прежде чем узел будет вставлен, снова используяtabAt(tab, i) == fРешение для предотвращения модификации другими потоками.

Если f.hash >= 0, это означает, что f является головным узлом структуры связанного списка, пройдите по связанному списку, если соответствующий узел узла найден, измените значение, в противном случае добавьте узел в конец связанного списка. . Если f является узлом типа TreeBin, указывающим, что f является корневым узлом красно-черного дерева, то необходимо пройтись по элементам древовидной структуры, обновить или добавить узлы. Если количество узлов в связанном списке binCount >= TREEIFY_THRESHOLD (по умолчанию 8), то преобразовать связанный список в красно-черную древовидную структуру.

расширение таблицы

Когда емкости таблицы недостаточно, то есть количество элементов в таблице достигает порога емкостиsizeCtl, таблицу нужно расширить.

Все расширение разделено на две части:

Создайте NEXTTABLE с удвоенным размером таблицы. Скопируйте данные таблицы в NextTable.

Эти два процесса очень просто реализовать в одном потоке, но ConcurrentHashMap поддерживает одновременную вставку, и операция расширения, естественно, будет происходить одновременно.В этом случае второй шаг может поддерживать параллельную репликацию узлов, так что производительность естественным образом улучшится. Меньше, но и сложность реализации выросла на уровень.

Посмотрите на первый шаг,построить следующую таблицу, нет никаких сомнений в том, что только один поток может инициализировать nextTable в этом процессе Конкретная реализация выглядит следующим образом:

private final void addCount(long x, int check) {
    ... 省略部分代码
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

Измените значение sizeCtl с помощью Unsafe.compareAndSwapInt, чтобы убедиться, что только один поток может инициализировать nextTable.Длина развернутого массива в два раза больше исходной, но емкость равна 1,5.

Узел перемещается из таблицы к Taillttable, общая идея - это процесс прохождения и копирования.

Во-первых, на основе расчета, необходимого для получения количества проходов i, затем с помощью метода tabAt для получения позиции элемента F i инициализируйте пример forwardNode fwd.

Если f == null, поместите fwd в позицию i в таблице Этот процесс реализуется методом Unsafe.compareAndSwapObjectf, который ловко реализует параллельное перемещение узлов.

Если f является головным узлом связанного списка, создайте перевернутый связанный список, поместите их в позиции i и i+n nextTable соответственно, и перемещение будет завершено.Используйте метод Unsafe.putObjectVolatile, чтобы присвоить fwd исходной позиции. стола. Если f является узлом TreeBin, также выполните обработку в обратном порядке и решите, требуется ли untreeify, поместите результаты обработки в позиции i и i+n nextTable соответственно, и перемещение будет завершено, а также используйте метод Unsafe. putObjectVolatile, чтобы указать исходное положение таблицы. Назначьте fwd. После обхода всех узлов работа копирования завершается, таблица указывает на nextTable, а sizeCtl обновляется до 0,75-кратного размера нового массива, и расширение завершается.

красно-черная структура дерева

Примечание. Если элементы в структуре связанного списка превышают пороговое значение TREEIFY_THRESHOLD, по умолчанию8, то связанный список преобразуется в красно-черное дерево для повышения эффективности обхода и запросов.

if (binCount != 0) {
    if (binCount >= TREEIFY_THRESHOLD)
        treeifyBin(tab, i);
    if (oldVal != null)
        return oldVal;
    break;
}

Далее давайте посмотрим, как построить древовидную структуру, код выглядит следующим образом:

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

Видно, что кодовый блок узла связующего дерева синхронизирован.После ввода синхронизированного кодового блока проверьте еще раз, был ли изменен элемент в позиции индекса в таблице.

  1. В соответствии со связанным списком узлов в позиции индекса в таблице повторно сгенерируйте связанный список TreeNode с hd в качестве головного узла.
  2. В соответствии с головным узлом hd создается древовидная структура TreeBin, а корневой узел древовидной структуры записывается в память в позиции индекса таблицы Конкретная реализация выглядит следующим образом:
TreeBin(TreeNode<K,V> b) {
    super(TREEBIN, null, null, null);
    this.first = b;
    TreeNode<K,V> r = null;
    for (TreeNode<K,V> x = b, next; x != null; x = next) {
        next = (TreeNode<K,V>)x.next;
        x.left = x.right = null;
        if (r == null) {
            x.parent = null;
            x.red = false;
            r = x;
        }
        else {
            K k = x.key;
            int h = x.hash;
            Class<?> kc = null;
            for (TreeNode<K,V> p = r;;) {
                int dir, ph;
                K pk = p.key;
                if ((ph = p.hash) > h)
                    dir = -1;
                else if (ph < h)
                    dir = 1;
                else if ((kc == null &&
                          (kc = comparableClassFor(k)) == null) ||
                         (dir = compareComparables(kc, k, pk)) == 0)
                    dir = tieBreakOrder(k, pk);
                    TreeNode<K,V> xp = p;
                if ((p = (dir <= 0) ? p.left : p.right) == null) {
                    x.parent = xp;
                    if (dir <= 0)
                        xp.left = x;
                    else
                        xp.right = x;
                    r = balanceInsertion(r, x);
                    break;
                }
            }
        }
    }
    this.root = r;
    assert checkInvariants(root);
}

Двоичное дерево в основном строится в соответствии с хеш-значением узла Node.

3.2.4 получить операцию

Операция get намного проще, чем операция put.

public V get(Object key) {
    Node<K,V>[] tab; 
    Node<K,V> e, p;
    int n, eh; 
    K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
  1. Определение того, пуста ли таблица, если она пуста, напрямую возвращает значение null.
  2. Вычислите хеш-значение ключа и получите узел Node в указанной позиции в указанной таблице, найдите соответствующий узел, пройдя по связанному списку или древовидной структуре, и верните значение значения.

3.2.4 операция размера

Размер JDK1.8 рассчитывается CAS по baseCount и counterCell, и, наконец, размер получается по baseCount и обходу массива CounterCell. Конкретная ссылка:Принципиальный анализ метода размера ConcurrentHashMap

4. Почему в JDK 1.8 следует отказаться от блокировок сегментов

Многие люди не понимают, почему Дуг Ли сделал такое большое изменение в JDK1.8.При использовании синхронизации с блокировкой тяжелого уровня производительность выше.Причины следующие:

  1. Детализация блокировок в jdk1.8 лучше.. ConcurrentLevel (одновременное число) ConcurrentHashMap в jdk1.7 в основном фиксирован. ConcurrentLevel в jdk1.8 соответствует размеру массива, и каждый раз, когда емкость увеличивается, параллелизм удваивается.
  2. Интродукция красно-черных деревьев, оптимизация связанного списка делает размещение и получение более эффективными при возникновении конфликта хэшей
  3. Получить поддержку JVM, ReentrantLock все-таки находится на уровне API, и места для последующей оптимизации производительности мало. Synchronized напрямую поддерживается JVM, и JVM может выполнять соответствующие меры по оптимизации во время выполнения: огрубление блокировки, устранение блокировки, прокручивание блокировки и так далее. Это позволяет синхронизироваться для достижения повышения производительности при обновлении версии JDK без изменения кода.

5. Резюме и ссылки

резюме

Видно, что структура данных ConcurrentHashMap версии JDK1.8 близка к HashMap, условно говоря, ConcurrentHashMap лишь добавляет операции синхронизации для контроля параллелизма.ReentrantLock+Segment+HashEntry, до версии JDK1.8synchronized+CAS+HashEntry+红黑树, оптимизация действительно отличная.

использованная литература