Параллелизм Java 5: ConcurrentHashMap

Java

Зачем использовать ConcurrentHashMap

HashMap не является потокобезопасным, операция put может привести к бесконечному циклу. Его решения — HashTable и Collections.synchronizedMap(hashMap) . Обе эти схемы являются блокировками для чтения и записи, которые являются монопольными и неэффективными.

HashMap вызовет бесконечный цикл при одновременном выполнении операции размещения, поскольку многопоточность приводит к тому, что связанный список HashMap Entry формирует кольцевую структуру данных, тогда следующий узел Entry никогда не будет пустым, а Entry будет получен в бесконечном цикле. .

HashTable использует синхронизацию для обеспечения безопасности потоков, но в случае интенсивной конкуренции потоков эффективность очень низкая. Причина этого в том, что все потоки, обращающиеся к контейнеру, должны бороться за блокировку.

В ответ на вышеуказанные проблемы ConcurrentHashMap использует технологию сегментации блокировки.В контейнере несколько блокировок, и каждая блокировка используется для части данных.Когда несколько потоков обращаются к данным в разных сегментах данных, между потоками не будет конкуренции блокировок. .

Реализация ConcurrentHashMap (JDK 1.7)

В JDK 1.7 ConcurrentHashMap реализован с использованием массива Segment и массива HashEntry. Среди них Segment — это своего рода реентерабельная блокировка (ReentrantLock), играющая роль блокировки. А HashEntry используется для хранения пар ключ-значение данных. Структура показана на следующем рисунке:

Сегмент содержит массив HashEntry, каждый HashEntry является элементом структуры связанного списка. Каждый сегмент защищает элемент массива HashEntry.

инициализация

Во время инициализации вычисляются размер ssize массива Segment и максимальный размер массива HashEntry в каждом Segment, а также инициализируется первый элемент массива Segment. Где ssize — это степень числа 2, значение по умолчанию равно 16, размер шапки также является степенью числа 2, а минимальное значение равно 2. Окончательный результат рассчитывается на основе начальной емкости initialCapacity.

//计算segment数组长度
if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        //初始化segmentShift和SegmentMask
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        //计算每个Segment中HashEntry数组大小cap
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // 初始化segment数组和segment[0]
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;

Во-первых, массив segments инициализируется с длиной ssize, вычисленной concurrencyLevel. Необходимо убедиться, что длина ssize равна 2 в N-й степени, а максимальная длина массива сегментов равна 65536.

Затем инициализируются две глобальные переменные, segmentShift и segmentMask, которые используются для поиска алгоритма хеширования сегмента. segmentShift — это количество битов, используемых для хэш-операции, а segmentMask — это маска для хэш-операции.

Затем рассчитайте максимальный размер массива HashEntry в каждом сегменте на основе двух параметров initialCapaicity и loadfactor.

Наконец, в соответствии с параметрами, определенными выше, инициализируется массив сегментов и segment[0].

получить операцию

Весь процесс операции get не требует блокировки, поэтому он очень эффективен. Сначала найдите ключ к сегменту после хэша, а затем найдите конкретный элемент с помощью хэша. Нет необходимости в блокировке, потому что метод get определяет общие переменные, которые должны использоваться как изменчивые типы, поэтому он может поддерживать видимость между потоками и может гарантировать, что значения с истекшим сроком действия не будут считаны, когда несколько потоков читают в одном месте. в то же время.

поставить операцию

Метод put должен записывать в общую переменную, которая должна быть заблокирована для безопасности потоков. Метод put сначала находит сегмент, а затем выполняет операцию вставки в сегмент. Операция вставки сначала определяет, нужно ли расширять массив HashEntry в сегменте, а затем находит позицию добавленного элемента и помещает его в массив HashEntry.

Расширение сегмента является более подходящим, чем HashMap, потому что последний должен определить, была ли достигнута емкость после вставки элементов, и если она будет достигнута, она будет расширена, но она может не быть вставлена ​​после расширения, и было выполнено недопустимое расширение. Когда сегмент расширяется, он сначала создает массив с вдвое большей исходной емкостью, а затем повторно хэширует элементы исходного массива и вставляет их в новый массив. При этом для эффективности ConcurrentHashMap не расширяет весь контейнер, а расширяет только определенный сегмент.

метод размера

Каждый Segment имеет изменяемую volatile глобальную переменную count .При расчете размера всего ConcurrentHashMap очевидно, что все counts можно суммировать. Однако переменные, измененные с помощью volatile, не могут гарантировать атомарность многопоточности, а любое прямое накопление чревато проблемами параллелизма. Но если остальные операции заблокированы при вызове метода size, это тоже неэффективно.

Метод ConcurrentHashMap состоит в том, чтобы сначала дважды попытаться вычислить без блокировки.Если результат один и тот же дважды, результат правильный. Если результаты расчетов отличаются, заблокируйте каждый сегмент для статистики.

Реализация ConcurrentHashMap (JDK 1.8)

В JDK 1.8 идея сегментированных блокировок была изменена, и для обеспечения безопасности параллелизма использовался массив Node + CAS + Synchronized. Нижний слой по-прежнему использует структуру хранения массив + связанный список + красно-черное дерево.

Node

В JDK 1.8 используйте Node вместо HashEntry, оба работают одинаково. В Node переменные val и next имеют изменчивое оформление, чтобы обеспечить видимость.

Используйте табличную переменную для хранения массива узлов Node, значение по умолчанию — null, размер по умолчанию — 16, а размер всегда равен степени 2 при каждом расширении. При расширении используйте nextTable для хранения вновь сгенерированных данных, а массив в два раза больше, чем таблица.

ForwardingNode — это специальный узел Node с хэш-значением -1, в котором хранится ссылка на nextTable. Он работает только тогда, когда таблица развернута и помещается в таблицу в качестве заполнителя, чтобы указать, что текущий узел является нулевым или был перемещен.

TreeNode

В HashMap основной структурой данных является связанный список. В ConcurrentHashMap, если данные в связанном списке слишком длинные, они будут преобразованы в красно-черное дерево для обработки. Оборачивая узлы связанного списка в TreeNodes и помещая их в TreeBin, преобразование красно-черного дерева завершается через TreeBin.

TreeBin

TreeBin не отвечает за упаковку пар ключ-значение, он используется для упаковки узлов TreeNode, когда связанный список преобразуется в красно-черное дерево, и используется для построения красно-черного дерева.

Инициализация: initTable()

В конструкторе ConcurrentHashMap устанавливает только некоторые параметры. Когда элемент вставляется впервые, он инициализируется с помощью метода initTable().

private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
        //有其他线程在初始化,挂起当前线程
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
        //获得了初始化的权利,使用CAS将sizeCtl设置为-1,表示本线程正在初始化
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            //进行初始化
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        //下次扩容的大小,相当于0.75*n
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

Ключом к этому методу является sizeCtl.

sizeCtl: Идентификатор элемента управления, используемый для управления операциями инициализации и расширения таблицы, имеет разное применение в разных местах, его значения тоже разные, и значения тоже разные:

  • Отрицательные числа указывают на то, что выполняется инициализация или расширение.
  • -1 означает инициализацию
  • -N означает, что потоки N-1 расширяют емкость
  • Положительное число или 0 указывает на то, что хэш-таблица не была инициализирована Это значение указывает размер инициализации или следующего расширения.

sizeCtl по умолчанию равен 0. Если значение меньше 0, это означает, что инициализируются другие потоки, и поток необходимо приостановить. Если поток получил право на инициализацию, сначала установите его в -1. Наконец, установите для sizeCtl значение 0,75*n, которое представляет собой порог расширения.

поставить операцию

Основная идея операции put по-прежнему состоит в том, чтобы вставить узел в таблицу в соответствии с вычислением хэша, если он пустой, вставить его напрямую, иначе вставить в связанный список или дерево.

Сначала вычислите хеш-значение, затем войдите в цикл для обхода таблицы и попробуйте вставить.

int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
    Node<K,V> f; int n, i, fh;
//详细代码接下来分别讲述
}

Сначала определите, пуста ли таблица, если она пуста или равна нулю, сначала инициализируйте ее.

if (tab == null || (n = tab.length) == 0)
                tab = initTable();

Если он был инициализирован и в позиции вставки нет узла, вставьте узел напрямую. Попытайтесь вставить узел с помощью CAS.

else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                        new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }

Если есть расширяющиеся темы, сначала помогите расшириться.

//当前位置的hashcode等于-1,需要扩容
 else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);

Если ни один из них не удовлетворен, используйте синхронизированную блокировку для записи данных. В зависимости от структуры данных, если это связанный список, вставьте хвост, если это узел дерева, используйте операцию вставки дерева.

else {
V oldVal = null;
//对该节点进行加锁处理(hash值相同的链表的头节点)
synchronized (f) {
    if (tabAt(tab, i) == f) {
        //fh > 0 表示为链表,将该节点插入到链表尾部
        if (fh >= 0) {
            binCount = 1;
            for (Node<K,V> e = f;; ++binCount) {
                K ek;
                //hash 和 key 都一样,替换value
                if (e.hash == hash &&
                        ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                    oldVal = e.val;
                    //putIfAbsent()
                    if (!onlyIfAbsent)
                        e.val = value;
                    break;
                }
                Node<K,V> pred = e;
                //链表尾部  直接插入
                if ((e = e.next) == null) {
                    pred.next = new Node<K,V>(hash, key,
                            value, null);
                    break;
                }
            }
        }
        //树节点,按照树的插入操作进行插入
        else if (f instanceof TreeBin) {
            Node<K,V> p;
            binCount = 2;
            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                    value)) != null) {
                oldVal = p.val;
                if (!onlyIfAbsent)
                    p.val = value;
            }
        }
    }
}

В конце цикла for определите, нужно ли преобразовать длину связанного списка в древовидную структуру.

if (binCount != 0) {
    // 如果链表长度已经达到临界值8,把链表转换为树结构
    if (binCount >= TREEIFY_THRESHOLD)
        treeifyBin(tab, i);
    if (oldVal != null)
        return oldVal;
    break;
}

Наконец, если это узел обновления, oldVal уже был возвращен ранее, в противном случае вставляется новый узел. Вам также необходимо использовать метод addCount(), чтобы добавить единицу к размеру.

Обобщенные шаги следующие:

  1. Определите, являются ли ключ и значение нулевыми, если да, создайте исключение
  2. Рассчитать хэш
  3. Пройдите по таблице и вставьте узлы
    • Если таблица пуста, инициализировать
    • Позиция вставки пуста, вставьте напрямую без блокировки
    • Если это узел ForwardingNode, это означает, что другие потоки расширяются, помогая потокам расширяться вместе.
    • Если это структура связанного списка, перейдите по связанному списку, если есть ключ, обновите значение, в противном случае вставьте его в конец связанного списка; если это узел TreeBin, обновите или добавьте узлы в соответствии с красным Метод черного дерева.
    • Если длина связанного списка после завершения окажется больше установленного порога, замените его красно-черным деревом.
  4. Если это обновление, верните oddVal; если это вставка, используйте метод addCount(), чтобы увеличить размер и вернуть null.

получить метод

  1. Рассчитать хэш-значение
  2. Определить, пуста ли таблица, вернуть null, если она пуста
  3. Получить узел Node по хешу, если это узел, вернуть значение
  4. Найдите соответствующий узел по связанному списку или красно-черному дереву и верните значение
  5. Возвращает ноль, если не найден

использованная литература