Углубленный анализ реализации ConcurrentHashMap, повесить интервьюера? нет проблем

Java

В разработке мы часто используем контейнер HashMap для хранения пар ключ-значение K-V, но в случае параллельной многопоточности контейнер HashMap небезопасен,Потому что при размещении элемента, если срабатывает операция расширения, то есть перехеширование, содержимое исходного массива будет повторно хешировано в новый массив расширения, но в процессе расширения другие потоки также выполняют операцию размещения , Если эти два хеш-значения каждого элемента одинаковы, и они могут быть представлены связанным списком в одном и том же массиве в одно и то же время, что приводит к замкнутому циклу, что приводит к бесконечному циклу во время получения, поэтому HashMap является потоком -небезопасно.

Так существует ли безопасный контейнер карты? Да, в настоящее время в JDK есть три безопасных контейнера карт:

  • HashTable
  • Collections.SynchronizedMap (метод, предоставляемый оболочкой синхронизации)
  • ConcurrentHashMap

Давайте взглянем на первые два контейнера, они оба используют очень грубый метод синхронизации, а производительность относительно низкая в случае высокой параллелизма. Hashtable добавляет «синхронизированные» блокировки к различным методам, таким как put, get и size, для обеспечения безопасности, в результате чего все параллельные операции конкурируют за одну и ту же блокировку. Когда поток выполняет операцию синхронизации, другие потоки могут только ждать. Это значительно снижает эффективность параллельных операций.

Давайте взглянем на оболочку синхронизации SynchronizedMap, предоставленную Collections.Сначала мы можем взглянуть на исходный код SynchronizedMap:

private static class SynchronizedMap<K,V>
        implements Map<K,V>, Serializable {
        private static final long serialVersionUID = 1978198479659022715L;

        private final Map<K,V> m;     // Backing Map
        final Object      mutex;        // Object on which to synchronize

        SynchronizedMap(Map<K,V> m) {
            this.m = Objects.requireNonNull(m);
            mutex = this;
        }

        SynchronizedMap(Map<K,V> m, Object mutex) {
            this.m = m;
            this.mutex = mutex;
        }
        public int size() {
            synchronized (mutex) {return m.size();}
        }
        public boolean isEmpty() {
            synchronized (mutex) {return m.isEmpty();}
        }
        public boolean containsKey(Object key) {
            synchronized (mutex) {return m.containsKey(key);}
        }
        public boolean containsValue(Object value) {
            synchronized (mutex) {return m.containsValue(value);}
        }
        public V get(Object key) {
            synchronized (mutex) {return m.get(key);}
        }

        public V put(K key, V value) {
            synchronized (mutex) {return m.put(key, value);}
        }
        public V remove(Object key) {
            synchronized (mutex) {return m.remove(key);}
        }
        public void putAll(Map<? extends K, ? extends V> map) {
            synchronized (mutex) {m.putAll(map);}
        }
        public void clear() {
            synchronized (mutex) {m.clear();}
        }
        ......
 }

Из исходного кода мы видим, что, хотя SynchronizedMap не имеет синхронизированных блокировок, он использует this как мьютекс для взаимного исключения, так что, строго говоря, SynchronizedMap — это то же самое, что и HashTable, и в нем нет никаких улучшений.

Третий ConcurrentHashMap также является главным героем этой статьи.По сравнению с первыми двумя безопасными контейнерами Map, он имеет большие изменения в дизайне и мышлении, а также значительно повышает эффективность параллелизма Map. Что касается реализации самого контейнера ConcurrentHashMap, между версиями будут большие различия, типичными версиями являются JDK1.7 и JDK1.8, которые, можно сказать, претерпели кардинальные изменения, которые также будут представлены в этой статье.Реализация этих двух версий ConcurrentHashMap в основном сосредоточена на версии JDK 1.8.Я лично считаю, что JDK 1.7 стал прошедшим временем, и нет необходимости его глубоко изучать.

Реализация ConcurrentHashMap в JDK 1.7

В JDK 1.7 и более ранних версиях ConcurrentHashMap был предложен для решения проблемы, связанной с блокировкой HashTable всей хеш-таблицы.Решение для блокировки сегментации, блокировка сегмента предназначена для разбиения большой хеш-таблицы на несколько небольших хэш-таблиц и блокировки маленькой хеш-таблицы, когда она должна быть заблокирована, чтобы повысить производительность хеш-таблицы. ConcurrentHashMap в JDK1.7 представляет объект Segment, который разбивает всю хэш-таблицу на объекты сегментов один за другим.Каждый объект Segment можно рассматривать как детальный HashMap.

Объект Segment наследует класс ReentrantLock, поскольку объект Segment становится замком, обеспечивающим безопасность данных.Внутренняя хэш-таблица поддерживается в объекте Segment через массив HashEntry. Каждый HashEntry представляет K-V на карте, и в случае возникновения конфликта хэшей в этом месте будет сформирован связанный список.

В JDK1.7 общую структуру ConcurrentHashMap можно описать следующим образом:

ConcurrentHashMap 1.7 存储结构

Что нас больше всего волнует в ConcurrentHashMap, так это то, как решить проблему небезопасности, вызванную расширением HashMap при установке? Давайте посмотрим, как ConcurrentHashMap в JDK 1.7 решает эту проблему, начнем с метода put:

 public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        // 二次哈希,以保证数据的分散性,避免哈希冲突
        int hash = hash(key.hashCode());
        int j = (hash >>> segmentShift) & segmentMask;
        // Unsafe 调用方式,直接获取相应的 Segment
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

В методе put первым шагом является уменьшение возможности коллизий хэшей посредством вторичного хеширования.В соответствии со значением хеша соответствующий сегмент напрямую получается в методе вызова Unsafe, и, наконец, данные добавляются в контейнер с помощью метода put метод объекта сегмента Готово. Исходный код метода put объекта Segment выглядит следующим образом:

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 无论如何,确保获取锁 scanAndLockForPut会去查找是否有key相同Node
    ConcurrentHashMap.HashEntry<K,V> node = tryLock() ? null :
            scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        ConcurrentHashMap.HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        ConcurrentHashMap.HashEntry<K,V> first = entryAt(tab, index);
        for (ConcurrentHashMap.HashEntry<K,V> e = first;;) {
            // 更新已存在的key
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                if (node != null)
                    node.setNext(first);
                else
                    node = new ConcurrentHashMap.HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                // 判断是否需要扩容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}

Поскольку сам объект Segment является замком, при добавлении данных соответствующий блок объекта Segment блокируется, и другие потоки не могут оперировать объектом Segment, что обеспечивает безопасность данных.То же самое относится и к расширению емкости.Расширение ConcurrentHashMap в JDK1.7 предназначено только для расширения массива HashEntry в объекте Segment или, поскольку объект Segment является блокировкой, другие потоки не могут обновлять хэш-таблицу сегмента во время процесса повторного хеширования. Выполните операцию, которая решает проблему замкнутого цикла, вызванную помещением данных в HashMap..

Так много разговоров о ConcurrentHashMap в JDK1.7, нам просто нужно подождать, пока ConcurrentHashMap будет принят в JDK1.7.блокировка сегментаспособ решить проблему ненадежности HashMap.

Реализация ConcurrentHashMap в JDK1.8

В JDK1.8 ConcurrentHashMap претерпел колоссальные изменения, по объему реализованного кода видно, что в 1.7 меньше 2000 строк кода, а в 1.8 больше 6000 строк кода. Без лишних слов, давайте посмотрим на изменения.

Начнем с безопасности контейнера.С точки зрения безопасности контейнера, ConcurrentHashMap в 1.8 отказался от технологии сегментации в JDK1.7, но принял механизм CAS + синхронизированный для обеспечения безопасности параллелизма, но сохранил определение сегмента в реализации ConcurrentHashMap, которое предназначено только для обеспечения последовательности. совместимость и не имеет никакой структурной полезности.Вот знание механизма CAS:

CAS-механизм

Типичным применением CAS является AtomicInteger, CAS — это своего рода атомарная операция, которая может гарантировать, что операция чтения и записи является атомарной. CAS сравнивает значение в памяти с ожидаемым значением и изменяет значение в памяти только тогда, когда они равны.CAS обеспечивает безопасность потоков в параллельных сценариях, обеспечивая при этом производительность. В Java реализация CAS находится в классе sun.misc.Unsafe, который определяет большое количество нативных методов.Реализация CAS имеет следующие методы:

public final native boolean compareAndSwapObject(Object o, long offset, Object expected, Object x);
public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);
public final native boolean compareAndSwapLong(Object o, long offset, long expected, long x);

Мы можем видеть только определение, но не конкретную реализацию. Конкретная реализация зависит от операционной системы, поэтому нам это не важно. Просто поймите, что означают параметры в методе:

  • o : целевой объект операции
  • offset : адрес смещения памяти операнда назначения
  • ожидаемое : ожидаемое значение
  • х: обновить значение

Хотя механизм CAS не требует блокировки, безопасен и эффективен, он также имеет некоторые недостатки, которые сводятся к следующему:

  • Время циклической проверки может быть больше, но количество циклических проверок может быть ограничено.
  • Атомарные операции могут выполняться только с общей переменной.
  • Существует проблема ABA (проблема ABA означает, что во время двух проверочных операций CAS значение целевой переменной меняется с A на B и обратно на A, но CAS не может увидеть изменение между ними, для него значение целевая переменная не изменяется. Ничего не изменилось, это всегда A, поэтому операция CAS продолжает обновлять значение целевой переменной.)

В структуре хранения,В JDK1.8 ConcurrentHashMap отказался от структуры HashEntry и принял структуру, очень похожую на HashMap, используя массив узлов и связанный список (когда длина связанного списка больше 8, он преобразуется в красно-черное дерево) ., узел Node устроен следующим образом:

static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
        ...省略...
 }       

Как и в HashMap, поле Key модифицируется с помощью final, что указывает на неизменяемость ключа в течение жизненного цикла, а поле val модифицируется с помощью volatile, что обеспечивает видимость поля val.

Структура ConcurrentHashMap в JDK1.8 показана на следующем рисунке:

JDK1.8 ConcurrentHashMap 结构图

Здесь я упоминаю конструктор ConcurrentHashMap по умолчанию, я думаю, что это место более интересное Конструктор по умолчанию ConcurrentHashMap выглядит следующим образом:

public ConcurrentHashMap() {
}

Выяснено, что без этого конструктора делать нечего, почему он так устроен? Преимущество этого заключается в том, что он реализует ленивую загрузку (форма ленивой загрузки), что эффективно позволяет избежать накладных расходов на инициализацию, на что многие люди жалуются на ConcurrentHashMap в JDK1.7.

Структурные изменения будут говорить о двух вышеупомянутых пунктах.Как и выше, давайте посмотрим на волнующие нас вопросы, как решить проблему небезопасности при расширении HashMap, и прочитаем исходный код ConcurrentHashMap с этой проблемой, о источник кода ConcurrentHashMap, в этой статье мы в основном говорим о двух методах добавления (putVal) и расширения (переноса), а другие методы не будут представлены один за другим.

метод putVal

Добавление элементов в ConcurrentHashMap напрямую не вызывает метод putVal, а использует метод put

public V put(K key, V value) {
    return putVal(key, value, false);
}

Но метод put вызывает метод putVal. Другими словами, putVal — это конкретный новый метод и конкретная реализация метода put. К исходному коду метода putVal добавляются комментарии. Конкретный код выглядит следующим образом:

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        // 如果 key 为空,直接返回
        if (key == null || value == null) throw new NullPointerException();
        // 两次 hash ,减少碰撞次数
        int hash = spread(key.hashCode());
        // 记录链表节点得个数
        int binCount = 0;
        // 无条件得循环遍历整个 node 数组,直到成功
        for (ConcurrentHashMap.Node<K,V>[] tab = table;;) {
            ConcurrentHashMap.Node<K,V> f; int n, i, fh;
            // lazy-load 懒加载的方式,如果当前 tab 容器为空,则初始化 tab 容器
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();

            // 通过Unsafe.getObjectVolatile()的方式获取数组对应index上的元素,如果元素为空,则直接无所插入
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //// 利用CAS去进行无锁线程安全操作
                if (casTabAt(tab, i, null,
                        new ConcurrentHashMap.Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            // 如果 fh == -1 ,说明正在扩容,那么该线程也去帮扩容
            else if ((fh = f.hash) == MOVED)
                // 协作扩容操作
                tab = helpTransfer(tab, f);
            else {
                // 如果上面都不满足,说明存在 hash 冲突,则使用 synchronized 加锁。锁住链表或者红黑树的头结点,来保证操作安全
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {

                        if (fh >= 0) {// 表示该节点是链表
                            binCount = 1;
                            // 遍历该节点上的链表
                            for (ConcurrentHashMap.Node<K,V> e = f;; ++binCount) {
                                K ek;
                                //这里涉及到相同的key进行put就会覆盖原先的value
                                if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                                (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                ConcurrentHashMap.Node<K,V> pred = e;
                                if ((e = e.next) == null) {//插入链表尾部
                                    pred.next = new ConcurrentHashMap.Node<K,V>(hash, key,
                                            value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof ConcurrentHashMap.TreeBin) {// 该节点是红黑树节点
                            ConcurrentHashMap.Node<K,V> p;
                            binCount = 2;
                            if ((p = ((ConcurrentHashMap.TreeBin<K,V>)f).putTreeVal(hash, key,
                                    value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                // 插入完之后,判断链表长度是否大于8,大于8就需要转换为红黑树
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    // 如果存在相同的key ,返回原来的值
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        //统计 size,并且检测是否需要扩容
        addCount(1L, binCount);
        return null;
    }

В исходном коде есть более подробные комментарии.Если вы хотите узнать подробную реализацию, вы можете прочитать исходный код построчно.Здесь мы сделаем краткий обзор метода putVal.Метод putVal в основном выполняет следующие действия:

  • первый шаг, В ConcurrentHashMap поле key val не может быть пустым, поэтому первым шагом является проверка значения значения ключа, два поля key и val не могут быть нулевыми, прежде чем продолжить вниз, в противном случае будет возвращена ошибка NullPointerException, который похож на HashMap. Есть разница, HashMap может быть пустым.
  • второй шаг, Определите, инициализирован ли контейнер. Если контейнер не инициализирован, вызовите метод initTable для инициализации. Метод initTable выглядит следующим образом:
    /**
     * Initializes table, using the size recorded in sizeCtl.
     */
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            // 负数表示正在初始化或扩容,等待
            if ((sc = sizeCtl) < 0)
                // 自旋等待
                Thread.yield(); // lost initialization race; just spin
            // 执行 CAS 操作,期望将 sizeCtl 设置为 -1,-1 是正在初始化的标识
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            // CAS 抢到了锁
                try {
                // 对 table 进行初始化,初始化长度为指定值,或者默认值 16
                    if ((tab = table) == null || tab.length == 0) {
                        // sc 在初始化的时候用户可能会自定义,如果没有自定义,则是默认的
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        // 创建数组
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        // 指定下次扩容的大小,相当于 0.75 × n
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

Таблица по существу является массивом узлов, а процесс ее инициализации является процессом инициализации массива узлов.Метод использует стратегию CAS для выполнения операции инициализации. Процесс инициализации:

1. Определяем, меньше ли значение sizeCtl 0. Если оно меньше 0, значит ConcurrentHashMap выполняет операцию инициализации, поэтому нужно подождать некоторое время.Если инициализация других потоков не удалась, можно заменить его 2. Если значение sizeCtl больше или равно 0, флаг вытеснения sizeCtl равен -1 на основе политики CAS, указывая, что ConcurrentHashMap выполняет инициализацию, затем создает таблицу и обновляет значение sizeCtl.

  • третий шаг, Найдите позицию нижнего индекса, соответствующую массиву, по хэш-значению после двойного хэша.Если узел не хранится в этой позиции, то есть нет конфликта хэшей, используйте метод CAS без блокировки, чтобы добавить данные в контейнер и завершить цикл.
  • четвертый шаг, Если третий шаг не выполнен, будет оцениваться, расширяется ли контейнер другими потоками.Если он расширяется другими потоками, операция добавления будет отменена и добавлена ​​в армию расширения (операция расширения ConcurrentHashMap принимает многопоточный метод, о нем мы поговорим позже), бесконечный цикл при расширении не выскочил,Это гарантирует, что никакие другие потоки не будут выполнять операции добавления данных при расширении контейнера, что также обеспечивает безопасность контейнера..
  • пятый шаг, Если хэш конфликтует, выполните операцию связанного списка или операцию красно-черного дерева (если дерево связанного списка превышает 8, измените связанный список на красно-черное дерево),При выполнении операций со связным списком или красно-черным деревом синхронизированная блокировка используется для блокировки головного узла, что гарантирует, что только один поток одновременно изменяет связанный список, и предотвращает формирование связанного списка в цикле..
  • Шаг 6, выполните операцию addCount(1L, binCount), которая обновит размер, чтобы определить, требуется ли расширение.Исходный код метода addCount выглядит следующим образом:
    // X传入的是1,check 传入的是 putVal 方法里的 binCount,没有hash冲突的话为0,冲突就会大于1
    private final void addCount(long x, int check) {
        ConcurrentHashMap.CounterCell[] as; long b, s;
        // 统计ConcurrentHashMap里面节点个数
        if ((as = counterCells) != null ||
                !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            ConcurrentHashMap.CounterCell a; long v; int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                    !(uncontended =
                            U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        // check就是binCount,binCount 最小都为0,所以这个条件一定会为true
        if (check >= 0) {
            ConcurrentHashMap.Node<K,V>[] tab, nt; int n, sc;
            // 这儿是自旋,需同时满足下面的条件
            // 1. 第一个条件是map.size 大于 sizeCtl,也就是说需要扩容
            // 2. 第二个条件是`table`不为null
            // 3. 第三个条件是`table`的长度不能超过最大容量
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                    (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                // 该判断表示已经有线程在进行扩容操作了
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0)
                        break;
                    // 如果可以帮助扩容,那么将 sc 加 1. 表示多了一个线程在帮助扩容
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                // 如果不在扩容,将 sc 更新:标识符左移 16 位 然后 + 2. 也就是变成一个负数。高 16 位是标识符,低 16 位初始是 2
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                        (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }

Метод addCount выполняет две задачи: 1. Добавьте единицу к размеру карты 2. Проверьте, требуется ли расширение или выполняется ли расширение. Если ему нужно расшириться, вызовите метод расширения, и если он расширяется, помогите ему расшириться.

Способ переноса расширения

Метод переноса расширения — очень мощный метод. Прежде чем смотреть на конкретный исходный код переноса,Давайте сначала поймем, когда будет запущена операция расширения. Если ничего другого, операция расширения может быть запущена в следующих двух ситуациях.:

  • После вызова метода put для добавления элемента будет вызван метод addCount для обновления размера и проверки необходимости расширения.Когда количество элементов массива достигает порогового значения, срабатывает метод передачи

  • Запущена операция tryPresize, операция tryPresize вызовет операцию расширения, есть две ситуации, которые запускают операцию tryPresize:

    • Первый случай: Когда количество элементов связанного списка узла достигает порога 8, необходимо преобразовать связанный список в красно-черное дерево, Перед преобразованием структуры он сначала определит, является ли длина массива n меньше порогового значения MIN_TREEIFY_CAPACITY, значение по умолчанию равно 64. Если оно меньше этого значения, будет вызван метод tryPresize, чтобы увеличить длину массива до удвоенного исходного размера, и будет запущен метод передачи для повторной настройки положения узел.
    • Второй случай: операция tryPresize срабатывает первой во время операции putAll.

Исходный код метода tryPresize выглядит следующим образом:

tryPresize 方法源码

Что ж, узнав, когда будет запущено расширение, давайте взглянем на исходный код метода переноса расширения. Это тоже твердая кость, и ее очень трудно разжевать. Надеюсь, я смогу объяснить это максимально понятно. исходный код метода передачи выглядит следующим образом:

    private final void transfer(ConcurrentHashMap.Node<K,V>[] tab, ConcurrentHashMap.Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        // 多线程扩容,每核处理的量小于16,则强制赋值16
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        // nextTab 为空,先实例化一个新的数组
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                // 新数组的大小是原来的两倍
                ConcurrentHashMap.Node<K,V>[] nt = (ConcurrentHashMap.Node<K,V>[])new ConcurrentHashMap.Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            // 更新成员变量
            nextTable = nextTab;
            // 更新转移下标,就是 老的 tab 的 length
            transferIndex = n;
        }
        // bound :该线程此次可以处理的区间的最小下标,超过这个下标,就需要重新领取区间或者结束扩容
        // advance: 该参数
        int nextn = nextTab.length;
        // 创建一个 fwd 节点,用于占位。当别的线程发现这个槽位中是 fwd 类型的节点,则跳过这个节点。
        ConcurrentHashMap.ForwardingNode<K,V> fwd = new ConcurrentHashMap.ForwardingNode<K,V>(nextTab);
        // advance 变量指的是是否继续递减转移下一个桶,如果为 true,表示可以继续向后推进,反之,说明还没有处理好当前桶,不能推进
        boolean advance = true;
        // 完成状态,如果是 true,表示扩容结束
        boolean finishing = false; // to ensure sweep before committing nextTab
        // 死循环,i 表示下标,bound 表示当前线程可以处理的当前桶区间最小下标
        for (int i = 0, bound = 0;;) {
            ConcurrentHashMap.Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                        (this, TRANSFERINDEX, nextIndex,
                                nextBound = (nextIndex > stride ?
                                        nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
                synchronized (f) {
                // 这儿多判断一次,是否为了防止可能出现的remove()操作
                    if (tabAt(tab, i) == f) {
                        // 旧链表上该节点的数据,会被分成低位和高位,低位就是在新链表上的位置跟旧链表上一样,
                        // 高位就是在新链表的位置是旧链表位置加上旧链表的长度
                        ConcurrentHashMap.Node<K,V> ln, hn;
                        if (fh >= 0) {
                            int runBit = fh & n;
                            ConcurrentHashMap.Node<K,V> lastRun = f;
                            for (ConcurrentHashMap.Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (ConcurrentHashMap.Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                // 该节点哈希值与旧链表长度与运算,结果为0,则在低位节点上,反之,在高位节点上
                                if ((ph & n) == 0)
                                    ln = new ConcurrentHashMap.Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new ConcurrentHashMap.Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            // 在nextTable i + n 位置处插上链表
                            setTabAt(nextTab, i + n, hn);
                            // 在table i 位置处插上ForwardingNode 表示该节点已经处理过了
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        else if (f instanceof ConcurrentHashMap.TreeBin) {
                            // 如果是TreeBin,则按照红黑树进行处理,处理逻辑与上面一致
                            // 红黑树的逻辑跟节点一模一样,最后也会分高位和低位
                            ConcurrentHashMap.TreeBin<K,V> t = (ConcurrentHashMap.TreeBin<K,V>)f;
                            ConcurrentHashMap.TreeNode<K,V> lo = null, loTail = null;
                            ConcurrentHashMap.TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (ConcurrentHashMap.Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                ConcurrentHashMap.TreeNode<K,V> p = new ConcurrentHashMap.TreeNode<K,V>
                                        (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            // 如果树的节点数小于等于 6,那么转成链表,反之,创建一个新的树
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                    (hc != 0) ? new ConcurrentHashMap.TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                    (lc != 0) ? new ConcurrentHashMap.TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

Если вы хотите узнать конкретные детали реализации, пожалуйста, прочитайте исходный код построчно. Если вы не понимаете его, пожалуйста, оставьте сообщение для связи. Как и метод putVal, мы также суммируем метод передачи. Передача обычно делает следующие события:

  • первый шаг: подсчитайте количество сегментов, которые каждый поток может обрабатывать каждый раз. В соответствии с длиной карты рассчитайте сегменты (количество табличных массивов), которые необходимо обработать каждому потоку (ЦП). По умолчанию каждый поток обрабатывает 16 сегментов. каждый раз.Если он меньше 16, он будет вынужден стать 16 ведрами.

  • второй шаг: Инициализировать nextTab, если входящая новая таблица nextTab пуста, затем инициализировать nextTab, по умолчанию в два раза больше исходной таблицы

  • третий шаг: ForwardingNode, переменные продвижения и завершения введены для расширения емкости. ForwardingNode указывает, что узел был обработан и не нуждается в обработке. Advance указывает, можно ли переместить поток вниз к следующему сегменту (true: указывает, что он можно сдвинуть вниз), а завершение указывает, закончилось ли оно.

  • четвертый шаг: пропустите некоторые другие детали и перейдите непосредственно к переносу данных,В процессе передачи данных будет добавлена ​​синхронизированная блокировка, головной узел будет заблокирован, а операция будет синхронизирована, чтобы предотвратить вставку данных в связанный список во время putVal.

  • пятый шаг: выполнить миграцию данных,Если узел в этой корзине представляет собой связанный список или красно-черное дерево, данные узла будут разделены на младшие и старшие биты.Правило вычисления состоит в том, чтобы выполнить битовую операцию (&) над хеш-значением узел, за которым следует длина контейнера таблицы до расширения. Если результат равен 0, поместите данные в нижнюю позицию новой таблицы (i-я позиция в текущей таблице и i-я позиция в новая таблица), если результат не 0, поставить данные в верхнюю позицию новой таблицы (в текущей таблице i-я позиция, а позиция в новой таблице i + длина текущей таблицы контейнер).

  • Шаг 6: Если ведро смонтировано красно-черным деревом, необходимо не только разделить младшие и старшие узлы, но и определить, хранятся ли младшие и старшие узлы в связанном списке или красно-черное дерево в новой таблице.

В этом предложении все еще сложно расширить метод передачи. Надеюсь, я ясно выразился. Я не буду здесь говорить о других методах ConcurrentHashMap. ConcurrentHashMap широк и глубок, и для его тщательного изучения потребуется время. ConcurrentHashMap реализует инсайдерский обмен , я надеюсь, что содержание этой статьи может помочь вам в учебе или работе, спасибо за вашу поддержку.

Наконец

В настоящее время многие большие ребята в Интернете имеют статьи, связанные с ConcurrentHashMap, Если есть какие-то сходства, пожалуйста, потерпите меня. Нелегко быть оригинальным, и нелегко кодировать слова, я также надеюсь, что вы поддержите это. Если в тексте будут ошибки, надеюсь сообщить о них, спасибо.

Добро пожаловать, чтобы отсканировать код и подписаться на общедоступную учетную запись WeChat: «Технический блог брата Пинтоу». Брат Пинтоу будет учиться и добиваться успехов вместе.

平头哥的技术博文