Параллельное программирование — построчный анализ расширения ConcurrentHashMap#transfer()

задняя часть исходный код

предисловие

ConcurrentHashMap является высшим приоритетом в параллелизме и наиболее часто используемым результатом данных.В предыдущей статье мы представили метод putVal.ConcurrentHashMap (JDK 1.8) анализ исходного кода putVal параллельного программирования. Он анализирует метод initTable и метод putVal, но также оставляет предложение:

Эта статья — только начало ConcurrentHashMap, в ConcurrentHashMap слишком много сущностей, которые стоит изучить.

Говоря о сущности, его метод расширения определенно является сущностью.Вы должны знать, что расширение ConcurrentHashMap очень параллельное.

Сегодня давайте разберем исходный код построчно.

Вывод первый

Вывод первый. Исходники и комментарии выложу позже. Логика выполнения этого метода следующая:

  1. Подсчитайте, сколько сегментов помогает обрабатывать каждый поток (ЦП), рассчитав количество ядер ЦП и длину массива Map, и здесь обработка каждого потока является средней. По умолчанию каждый поток обрабатывает 16 сегментов. Следовательно, если длина равна 16, при расширении будет расширяться только одна нить.

  2. Инициализируйте временную переменную nextTable. Расширьте его в два раза больше, чем оригинал.

  3. Бесконечная петля начинает сдвигаться. Многопоточная параллельная передача находится в этом бесконечном цикле, оцениваемом в соответствии с конечной переменной, переменная имеет значение true, чтобы указать конец расширения, в противном случае расширение продолжается.

    3.1 входит в цикл While, назначаю массив на резьбу ведра диапазона, по умолчанию используется 16. Выделенный по упуском. При назначении значений получить, для меня уменьшается. Это индекс массива I. (其中有一个 bound 参数,这个参数指的是该线程此次可以处理的区间的最小下标,超过这个下标,就需要重新领取区间或者结束扩容,还有一个 advance 参数,该参数指的是是否继续递减转移下一个桶,如果为 true,表示可以继续向后推进,反之,说明还没有处理好当前桶,不能推进) 3.2 Выйдите из цикла while, введите суждение if и определите, закончилось ли расширение.Если расширение закончилось, очистите умирающую переменную, обновите табличную переменную и обновите порог емкости хранилища. Если он не завершен, но интервал не может быть получен (не более), поток выходит из метода и уменьшает sizeCtl на единицу, указывая, что осталось на один поток меньше для расширения. Если после этого число уменьшается, sizeCtl возвращается в исходное состояние, указывая на то, что потоков для расширения больше нет, и расширение всех потоков в этом методе закончено. (这里主要是判断扩容任务是否结束,如果结束了就让线程退出该方法,并更新相关变量). Затем проверьте все бочки, чтобы не допустить пропусков. 3.3 Если задача не завершена и слот, соответствующий i, пуст, попробуйте CAS вставить заполнитель, чтобы сделать поток метода putVal осведомленным. 3.4 Если слот, соответствующий i, не пуст и есть заполнитель, поток пропускает этот слот и обрабатывает следующий слот. 3.5 Если ничего из вышеперечисленного не соответствует действительности, это означает, что этот слот имеет фактическую стоимость. Начать обработку этого сегмента синхронно. 3.6 До этого момента данные в корзине не передавались, но индекс и интервал обработки были рассчитаны, а затем были завершены некоторые суждения о состоянии. При этом, если в соответствующем индексе нет данных или он занят, он пропускается.

  4. Акт обработки каждого сегмента является синхронным. Предотвращает вставку данных в связанный список при вызове putVal. 4.1 Если ведро представляет собой связанный список, то разделите связанный список на две части в соответствии с длиной, возьмите результат 0 и поместите его в нижнюю позицию новой таблицы, и возьмите результат 1 и поместите его в высокое положение новой таблицы. 4.2 Если ведро представляет собой красно-черное число, оно также разбивается на 2 части так же, как и связанный список.Затем оценивается количество узлов в разделенном дереве.Если число меньше или равно 6 , он преобразуется в связанный список. Вместо этого продолжайте использовать красно-черную древовидную структуру. 4.3 На этом процесс переноса корзины со старой таблицы на новую завершен.

Ну и выше общая логика метода переноса. Еще довольно сложно. Далее он упрощается и делится на 3 этапа:

  1. Вычислите интервал корзины, который может обработать каждый поток. По умолчанию 16.
  2. Инициализируйте временную переменную NEXTTABLE, удвоив расширение.
  3. Бесконечный цикл, вычисление индексов. Завершите общее суждение.
  4. 1 Если в корзине есть данные, передайте данные синхронно. Обычно он разбивается на 2 части, как связанный список.

В основном 3 шага выше.

Давайте посмотрим на исходный код и комментарии.

Посмотрите на анализ исходного кода

Исходный код с комментариями:

/**
 * Moves and/or copies the nodes in each bin to new table. See
 * above for explanation.
 * 
 * transferIndex 表示转移时的下标,初始为扩容前的 length。
 * 
 * 我们假设长度是 32
 */
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // 将 length / 8 然后除以 CPU核心数。如果得到的结果小于 16,那么就使用 16。
    // 这里的目的是让每个 CPU 处理的桶一样多,避免出现转移任务不均匀的现象,如果桶较少的话,默认一个 CPU(一个线程)处理 16 个桶
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range 细分范围 stridea:TODO
    // 新的 table 尚未初始化
    if (nextTab == null) {            // initiating
        try {
            // 扩容  2 倍
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            // 更新
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            // 扩容失败, sizeCtl 使用 int 最大值。
            sizeCtl = Integer.MAX_VALUE;
            return;// 结束
        }
        // 更新成员变量
        nextTable = nextTab;
        // 更新转移下标,就是 老的 tab 的 length
        transferIndex = n;
    }
    // 新 tab 的 length
    int nextn = nextTab.length;
    // 创建一个 fwd 节点,用于占位。当别的线程发现这个槽位中是 fwd 类型的节点,则跳过这个节点。
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    // 首次推进为 true,如果等于 true,说明需要再次推进一个下标(i--),反之,如果是 false,那么就不能推进下标,需要将当前的下标处理完毕才能继续推进
    boolean advance = true;
    // 完成状态,如果是 true,就结束此方法。
    boolean finishing = false; // to ensure sweep before committing nextTab
    // 死循环,i 表示下标,bound 表示当前线程可以处理的当前桶区间最小下标
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        // 如果当前线程可以向后推进;这个循环就是控制 i 递减。同时,每个线程都会进入这里取得自己需要转移的桶的区间
        while (advance) {
            int nextIndex, nextBound;
            // 对 i 减一,判断是否大于等于 bound (正常情况下,如果大于 bound 不成立,说明该线程上次领取的任务已经完成了。那么,需要在下面继续领取任务)
            // 如果对 i 减一大于等于 bound(还需要继续做任务),或者完成了,修改推进状态为 false,不能推进了。任务成功后修改推进状态为 true。
            // 通常,第一次进入循环,i-- 这个判断会无法通过,从而走下面的 nextIndex 赋值操作(获取最新的转移下标)。其余情况都是:如果可以推进,将 i 减一,然后修改成不可推进。如果 i 对应的桶处理成功了,改成可以推进。
            if (--i >= bound || finishing)
                advance = false;// 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进
            // 这里的目的是:1. 当一个线程进入时,会选取最新的转移下标。2. 当一个线程处理完自己的区间时,如果还有剩余区间的没有别的线程处理。再次获取区间。
            else if ((nextIndex = transferIndex) <= 0) {
                // 如果小于等于0,说明没有区间了 ,i 改成 -1,推进状态变成 false,不再推进,表示,扩容结束了,当前线程可以退出了
                // 这个 -1 会在下面的 if 块里判断,从而进入完成状态判断
                i = -1;
                advance = false;// 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进
            }// CAS 修改 transferIndex,即 length - 区间值,留下剩余的区间值供后面的线程使用
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;// 这个值就是当前线程可以处理的最小当前区间最小下标
                i = nextIndex - 1; // 初次对i 赋值,这个就是当前线程可以处理的当前区间的最大下标
                advance = false; // 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进,这样对导致漏掉某个桶。下面的 if (tabAt(tab, i) == f) 判断会出现这样的情况。
            }
        }// 如果 i 小于0 (不在 tab 下标内,按照上面的判断,领取最后一段区间的线程扩容结束)
        //  如果 i >= tab.length(不知道为什么这么判断)
        //  如果 i + tab.length >= nextTable.length  (不知道为什么这么判断)
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) { // 如果完成了扩容
                nextTable = null;// 删除成员变量
                table = nextTab;// 更新 table
                sizeCtl = (n << 1) - (n >>> 1); // 更新阈值
                return;// 结束方法。
            }// 如果没完成
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {// 尝试将 sc -1. 表示这个线程结束帮助扩容了,将 sc 的低 16 位减一。
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)// 如果 sc - 2 不等于标识符左移 16 位。如果他们相等了,说明没有线程在帮助他们扩容了。也就是说,扩容结束了。
                    return;// 不相等,说明没结束,当前线程结束方法。
                finishing = advance = true;// 如果相等,扩容结束了,更新 finising 变量
                i = n; // 再次循环检查一下整张表
            }
        }
        else if ((f = tabAt(tab, i)) == null) // 获取老 tab i 下标位置的变量,如果是 null,就使用 fwd 占位。
            advance = casTabAt(tab, i, null, fwd);// 如果成功写入 fwd 占位,再次推进一个下标
        else if ((fh = f.hash) == MOVED)// 如果不是 null 且 hash 值是 MOVED。
            advance = true; // already processed // 说明别的线程已经处理过了,再次推进一个下标
        else {// 到这里,说明这个位置有实际值了,且不是占位符。对这个节点上锁。为什么上锁,防止 putVal 的时候向链表插入数据
            synchronized (f) {
                // 判断 i 下标处的桶节点是否和 f 相同
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;// low, height 高位桶,低位桶
                    // 如果 f 的 hash 值大于 0 。TreeBin 的 hash 是 -2
                    if (fh >= 0) {
                        // 对老长度进行与运算(第一个操作数的的第n位于第二个操作数的第n位如果都是1,那么结果的第n为也为1,否则为0)
                        // 由于 Map 的长度都是 2 的次方(000001000 这类的数字),那么取于 length 只有 2 种结果,一种是 0,一种是1
                        //  如果是结果是0 ,Doug Lea 将其放在低位,反之放在高位,目的是将链表重新 hash,放到对应的位置上,让新的取于算法能够击中他。
                        int runBit = fh & n;
                        Node<K,V> lastRun = f; // 尾节点,且和头节点的 hash 值取于不相等
                        // 遍历这个桶
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            // 取于桶中每个节点的 hash 值
                            int b = p.hash & n;
                            // 如果节点的 hash 值和首节点的 hash 值取于结果不同
                            if (b != runBit) {
                                runBit = b; // 更新 runBit,用于下面判断 lastRun 该赋值给 ln 还是 hn。
                                lastRun = p; // 这个 lastRun 保证后面的节点与自己的取于值相同,避免后面没有必要的循环
                            }
                        }
                        if (runBit == 0) {// 如果最后更新的 runBit 是 0 ,设置低位节点
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun; // 如果最后更新的 runBit 是 1, 设置高位节点
                            ln = null;
                        }// 再次循环,生成两个链表,lastRun 作为停止条件,这样就是避免无谓的循环(lastRun 后面都是相同的取于结果)
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            // 如果与运算结果是 0,那么就还在低位
                            if ((ph & n) == 0) // 如果是0 ,那么创建低位节点
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else // 1 则创建高位
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        // 其实这里类似 hashMap 
                        // 设置低位链表放在新链表的 i
                        setTabAt(nextTab, i, ln);
                        // 设置高位链表,在原有长度上加 n
                        setTabAt(nextTab, i + n, hn);
                        // 将旧的链表设置成占位符
                        setTabAt(tab, i, fwd);
                        // 继续向后推进
                        advance = true;
                    }// 如果是红黑树
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        // 遍历
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            // 和链表相同的判断,与运算 == 0 的放在低位
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            } // 不是 0 的放在高位
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        // 如果树的节点数小于等于 6,那么转成链表,反之,创建一个新的树
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        // 低位树
                        setTabAt(nextTab, i, ln);
                        // 高位数
                        setTabAt(nextTab, i + n, hn);
                        // 旧的设置成占位符
                        setTabAt(tab, i, fwd);
                        // 继续向后推进
                        advance = true;
                    }
                }
            }
        }
    }
}

Код и комментарии относительно длинные.Если интересно,можете сравнить построчно.Есть 2 судьи,которые арендодатель не может понять,почему вынесено такое суждение.Знающие студенты могут напомнить.

Тогда поговорим о сути.

  1. Cmap поддерживает параллельное расширение, разделяя таблицу и позволяя каждому потоку обрабатывать свой собственный диапазон. Как показано ниже:

Предполагая, что общая длина равна 64, каждый поток можно разделить на 16 сегментов, которые будут обрабатываться отдельно и не будут влиять друг на друга.

  1. Когда каждый поток обрабатывает данные в своем сегменте, это выглядит следующим образом:

Состояние до расширения.

При переносе корзины №4 или №10 связанный список будет разбит на две части.Правило брать длину по хеш-значению узла.Если результат равен 0, ставить его в младшем порядке, в противном случае поместите его в высокий порядок.

Следовательно, для данных в корзине 10 черный узел будет помещен в позицию 10 новой таблицы, а белый узел будет помещен в позицию 26 новой корзины.

На следующем рисунке показана логика циклического просмотра данных в корзине:

После обработки данные в новой корзине выглядят так:

image.png

Суммировать

Можно сказать, что метод передачи очень мощный, очень важный, а внутренняя производительность многопоточного расширения очень высока.

Назначая интервалы сегментов каждому потоку, можно избежать конфликтов между потоками, а блокируя каждый узел сегмента, можно избежать несогласованности данных, вызванной методом putVal. В то же время при расширении связанный список также будет разделен на две части, что аналогично методу изменения размера HashMap.

И если новый поток захочет поместить данные, это также поможет ему расшириться. Удивительное мастерство.