Сделайте ConcurrentHashMap своим бонусом за интервью

Java

在这里插入图片描述из-за предыдущей статьиHashMapЭто было объяснено очень подробно, поэтому эта статья кратко представит идею, и будет намного проще изучить параллельный HashMap, В последней статье мы, наконец, знаемHashMapявляется небезопасным для потоков, поэтому предоставляется в более старых версиях JDK.HashTableДля достижения уровня многопоточности важные изменения заключаются в следующем.

  1. HashTableизput, get,removeи т.п. метод черезsynchronizedИзменить, чтобы обеспечить его потокобезопасность.
  2. HashTableНе допускается, чтобы ключ и значение были нулевыми.
  3. проблема в томsynchronizedЭто ==весовая блокировка== на уровне ключевого слова, и любые операции записи при получении данных запрещены. Относительно плохая производительность. Поэтому в настоящее время основнымConcurrentHashMapдля обеспечения безопасности резьбы.

ConcurrentHashMapВ основном разделен на две версии JDK7 и JDK8,ConcurrentHashMapБолее низкий коэффициент использования пространства обычно составляет от 10% до 20%, который будет введен отдельно.

JDK7

Давайте сначала поговорим об общем составе JDK7, ConcurrentHashMap состоит изSegmentструктура массива иHashEntryСостав массива. Сегмент — это блокировка с повторным входом, которая представляет собой массив и структуру связанного списка. Сегмент содержит массив HashEntry, и каждый HashEntry представляет собой структуру связанного списка. Именно с помощью Segment==segment lock== ConcurrentHashMap обеспечивает эффективный параллелизм. Недостатком является то, что степень параллелизма определяется массивом сегментов, и после инициализации степени параллелизма ее нельзя расширить. нарисуй первымConcurrentHashMapвизуализация изображения.在这里插入图片描述понятьcurrentHashMap, что можно просто понимать как данныеПодтаблица и подбиблиотека.ConcurrentHashMapОтSegmentструктура массива иHashEntryструктура массива.

  • Сегмент представляет собой реентерабельную блокировкуReentrantLockПодкласс , вConcurrentHashMapиграть роль замка,HashEntryОн используется для хранения данных пары ключ-значение.
  • ConcurrentHashMapсодержитSegmentМассив для разделения блокировки,Segmentструктура иHashMapпохожий, аSegmentсодержитHashEntryмассив, каждыйHashEntryявляется элементом структуры связанного списка, каждыйSegmentОпекунHashEntryэлементы в массиве, если они правильныеHashEntryПри изменении данных массива необходимо сначала получить соответствующий емуSegmentЗамок.
  1. Давайте сначала посмотрим на класс сегмента:
static final class Segment<K,V> extends ReentrantLock implements Serializable {
     transient volatile HashEntry<K,V>[] table; //包含一个HashMap 可以理解为
}

можно понимать как каждый из нашихsegmentэто все реализованоLockфункциональныйHashMap. Если у нас есть несколькоsegmentсформированныйsegmentArray, тогда мы можем добиться параллелизма.

  1. ПосмотримcurrentHashMapКонструктор, сначала подытожим несколько моментов.

    1. В конечном итоге размер массива сегментов должен быть степенью двойки.
    2. Начальный размер таблицы (массива HashEntry), содержащейся в каждом сегменте, также должен быть степенью двойки.
    3. Здесь задаются несколько параметров для расчета битов.
    4. InitialCapacity: начальный размер емкости, по умолчанию 16.
    5. loadFactor: коэффициент расширения, значение по умолчанию – 0,75. Если количество элементов, хранящихся в сегменте, больше, чем initialCapacity* loadFactor, сегмент будет расширен один раз.
    6. concurrencyLevel: параллелизм, по умолчанию 16. Параллелизм можно понимать как способность программы выполнятьОбновите заодноМаксимальное количество потоков в ConccurentHashMap без конкуренции блокировок на самом деле равно количеству потоков в ConcurrentHashMap.блокировка сегментаЧисло, то есть длина массива Segment[]. Если параллелизм установлен слишком маленьким, это вызовет серьезные проблемы с конкуренцией блокировок; если параллелизм установлен слишком большой, доступ, изначально расположенный в одном и том же сегменте, будет распространяться на другие сегменты, и частота попаданий в кэш ЦП уменьшится, что приведет к Падает производительность программы.

Детали конструктора:

   //initialCapacity 是我们保存所以KV数据的初始值
   //loadFactor这个就是HashMap的负载因子
   // 我们segment数组的初始化大小
      @SuppressWarnings("unchecked")
       public ConcurrentHashMap(int initialCapacity,
                                float loadFactor, int concurrencyLevel) {
           if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
               throw new IllegalArgumentException();
           if (concurrencyLevel > MAX_SEGMENTS) // 最大允许segment的个数,不能超过 1< 24
               concurrencyLevel = MAX_SEGMENTS;
           int sshift = 0; // 类似扰动函数
           int ssize = 1; 
           while (ssize < concurrencyLevel) {
               ++sshift;
               ssize <<= 1; // 确保segment一定是2次幂
           }
           this.segmentShift = 32 - sshift;  
           //有点类似与扰动函数,跟下面的参数配合使用实现 当前元素落到那个segment上面。
           this.segmentMask = ssize - 1; // 为了 取模 专用
           if (initialCapacity > MAXIMUM_CAPACITY) //不能大于 1< 30
               initialCapacity = MAXIMUM_CAPACITY;
   
           int c = initialCapacity / ssize; //总的数组大小 被 segment 分散后 需要多少个table
           if (c * ssize < initialCapacity)
               ++c; //确保向上取值
           int cap = MIN_SEGMENT_TABLE_CAPACITY; 
           // 每个table初始化大小为2
           while (cap < c) // 单独的一个segment[i] 对应的table 容量大小。
               cap <<= 1;
           // 将table的容量初始化为2的次幂
           Segment<K,V> s0 =
               new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]);
               // 负载因子,阈值,每个segment的初始化大小。跟hashmap 初始值类似。
               // 并且segment的初始化是懒加载模式,刚开始只有一个s0,其余的在需要的时候才会增加。
           Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
           UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
           this.segments = ss;
       }
  1. хэш Независимо от того, является ли это нашей операцией получения или операции размещения, нам нужно найти данные с помощью хэша.
   //  整体思想就是通过多次不同方式的位运算来努力将数据均匀的分不到目标table中,都是些扰动函数
   private int hash(Object k) {
       int h = hashSeed;
       if ((0 != h) && (k instanceof String)) {
           return sun.misc.Hashing.stringHash32((String) k);
       }
       h ^= k.hashCode();
       // single-word Wang/Jenkins hash.
       h += (h <<  15) ^ 0xffffcd7d;
       h ^= (h >>> 10);
       h += (h <<   3);
       h ^= (h >>>  6);
       h += (h <<   2) + (h << 14);
       return h ^ (h >>> 16);
   }
  1. получить Условно говоря, это относительно просто, не более чем черезhashнайти соответствующийsegment, продолжить черезhashнайти соответствующийtable, а затем просмотрите связанный список, чтобы увидеть, можно ли его найти, и обратите вниманиеgetКогда это == нет блокировки ==.
   public V get(Object key) {
       Segment<K,V> s;
       HashEntry<K,V>[] tab;
       int h = hash(key); // JDK7中标准的hash值获取算法
       long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; // hash值如何映射到对应的segment上
       if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {
           //  无非就是获得hash值对应的segment 是否存在,
           for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                    (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                e != null; e = e.next) {
               // 看下这个hash值对应的是segment(HashEntry)中的具体位置。然后遍历查询该链表
               K k;
               if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                   return e.value;
           }
       }
       return null;
   }
  1. ставить Та же идея, сначала найтиhashзначение, соответствующееsegmentрасположение, затем посмотрите наsegmentИнициализирована ли позиция (поскольку сегмент находится в режиме ==ленивой загрузки==). Выборочная инициализация и, наконец, выполнение операции размещения.
   @SuppressWarnings("unchecked")
   public V put(K key, V value) {
       Segment<K,V> s;
       if (value == null)
           throw new NullPointerException();
       int hash = hash(key);// 还是获得最终hash值
       int j = (hash >>> segmentShift) & segmentMask; // hash值位操作对应的segment数组位置
       if ((s = (Segment<K,V>)UNSAFE.getObject          
            (segments, (j << SSHIFT) + SBASE)) == null)
           s = ensureSegment(j); 
       // 初始化时候因为只有第一个segment,如果落在了其余的segment中 则需要现初始化。
       return s.put(key, hash, value, false);
       // 直接在数据中执行put操作。
   }

вputОсновная идея работы иHashMapПочти то же самое, только заблокируйте операцию в начале и в концеtryLock and unlock, а затем в JDK7 сначала расширяется емкость, а затем добавляются данные, и если блокировка не получена, блокировка tryLock или блокировки == spin== будет заблокирована и поставлена ​​в очередь ожидания (при этом, новые данные будут генерироваться заранее до получения блокировки).

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 在往该 segment 写入前,需要先获取该 segment 的独占锁,获取失败尝试获取自旋锁
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        // segment 内部的数组
        HashEntry<K,V>[] tab = table;
        // 利用 hash 值,求应该放置的数组下标
        int index = (tab.length - 1) & hash;
        // first 是数组该位置处的链表的表头
        HashEntry<K,V> first = entryAt(tab, index);
 
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        // 覆盖旧值
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                // 继续顺着链表走
                e = e.next;
            }
            else {
                // node 是不是 null,这个要看获取锁的过程。没获得锁的线程帮我们创建好了节点,直接头插法
                // 如果不为 null,那就直接将它设置为链表表头;如果是 null,初始化并设置为链表表头。
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
 
                int c = count + 1;
                // 如果超过了该 segment 的阈值,这个 segment 需要扩容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node); // 扩容
                else
                    // 没有达到阈值,将 node 放到数组 tab 的 index 位置,
                    // 将新的结点设置成原链表的表头
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // 解锁
        unlock();
    }
    return oldValue;
}

Если замок не вызываетscanAndLockForPut, завершает работу по поиску или созданию нового узла. Когда блокировка получена, узел может быть непосредственно добавлен в связанный список.продвигатьДля выполнения операции put здесь задействован ==spin==. Грубый процесс:

  1. Когда я не могу получить блокировку, я попытаюсь заблокировать и подготовить новые данные.В то же время существует определенное количество раз.Мне также необходимо учитывать другие узлы, которые получили потоки для изменения головного узла.
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
 
    // 循环获取锁
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            if (e == null) {
                if (node == null) // speculatively create node
              // 进到这里说明数组该位置的链表是空的,没有任何元素
             // 当然,进到这里的另一个原因是 tryLock() 失败,所以该槽存在并发,不一定是该位置
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                retries = 0;
            else
                // 顺着链表往下走
                e = e.next;
        }
    // 重试次数如果超过 MAX_SCAN_RETRIES(单核 1 次多核 64 次),那么不抢了,进入到阻塞队列等待锁
    //    lock() 是阻塞方法,直到获取锁后返回
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                 // 进入这里,说明有新的元素进到了链表,并且成为了新的表头
                 // 这边的策略是,重新执行 scanAndLockForPut 方法
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}
  1. Size

    Этот метод размера более интересен.Он сначала подсчитывает все объемы данных без блокировок, чтобы увидеть, совпадают ли данные до и после.Если они одинаковы, возвращает данные.Если они разные, блокирует, считает и разблокирует все сегменты. А метод size просто возвращает статистическое число, поэтому используйте размер с осторожностью.

public int size() {
       // Try a few times to get accurate count. On failure due to
       // continuous async changes in table, resort to locking.
       final Segment<K,V>[] segments = this.segments;
       int size;
       boolean overflow; // true if size overflows 32 bits
       long sum;         // sum of modCounts
       long last = 0L;   // previous sum
       int retries = -1; // first iteration isn't retry
       try {
           for (;;) {
               if (retries++ == RETRIES_BEFORE_LOCK) {  //  超过2次则全部加锁
                   for (int j = 0; j < segments.length; ++j)
                       ensureSegment(j).lock(); // 直接对全部segment加锁消耗性太大
               }
               sum = 0L;
               size = 0;
               overflow = false;
               for (int j = 0; j < segments.length; ++j) {
                   Segment<K,V> seg = segmentAt(segments, j);
                   if (seg != null) {
                       sum += seg.modCount; // 统计的是modCount,涉及到增删该都会加1
                       int c = seg.count;
                       if (c < 0 || (size += c) < 0)
                           overflow = true;
                   }
               }
               if (sum == last) // 每一个前后的修改次数一样 则认为一样,但凡有一个不一样则直接break。
                   break;
               last = sum;
           }
       } finally {
           if (retries > RETRIES_BEFORE_LOCK) {
               for (int j = 0; j < segments.length; ++j)
                   segmentAt(segments, j).unlock();
           }
       }
       return overflow ? Integer.MAX_VALUE : size;
   }
  1. rehash segmentПосле того, как массив инициализирован, он неизменяем, то естьПараллелизм неизменен,ноsegmentвнутреннийtableЕго можно масштабировать до 2 раз.Этот метод не учитывает параллелизм, поскольку блокировка была получена до выполнения метода. Который в JDK7rehashИдея та же, что и идея обработки связанного списка после расширения в JDK8, но лично мне кажется, что она не так хороша, как суть, написанная в 8.
// 方法参数上的 node 是这次扩容后,需要添加到新的数组中的数据。
private void rehash(HashEntry<K,V> node) {
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    // 2 倍
    int newCapacity = oldCapacity << 1;
    threshold = (int)(newCapacity * loadFactor);
    // 创建新数组
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    // 新的掩码,如从 16 扩容到 32,那么 sizeMask 为 31,对应二进制 ‘000...00011111’
    int sizeMask = newCapacity - 1;
    // 遍历原数组,将原数组位置 i 处的链表拆分到 新数组位置 i 和 i+oldCap 两个位置
    for (int i = 0; i < oldCapacity ; i++) {
        // e 是链表的第一个元素
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            // 计算应该放置在新数组中的位置,
            // 假设原数组长度为 16,e 在 oldTable[3] 处,那么 idx 只可能是 3 或者是 3 + 16 = 19
            int idx = e.hash & sizeMask; // 新位置
            if (next == null)   // 该位置处只有一个元素
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                // e 是链表表头
                HashEntry<K,V> lastRun = e;
                // idx 是当前链表的头结点 e 的新位置
                int lastIdx = idx;
                // for 循环找到一个 lastRun 结点,这个结点之后的所有元素是将要放到一起的
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                // 将 lastRun 及其之后的所有结点组成的这个链表放到 lastIdx 这个位置
                newTable[lastIdx] = lastRun;
                // 下面的操作是处理 lastRun 之前的结点,
                //这些结点可能分配在另一个链表中,也可能分配到上面的那个链表中
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    // 将新来的 node 放到新数组中刚刚的 两个链表之一 的 头部
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}
  1. CAS-операция В JDK7 вConcurrentHashMapчерез атомарные операцииsun.misc.UnsafeПоиск элементов, замена элементов и установка элементов. Получение данных через такой аппаратный уровень может обеспечить своевременную многопоточную обработку, и я также каждый раз получаю самые свежие данные. Эти атомарные операции играют очень важную роль, вы можетеConcurrentHashMapКак видно из базовой функции, случайное расстояние выглядит следующим образом:
     final void setNext(HashEntry<K,V> n) {
            UNSAFE.putOrderedObject(this, nextOffset, n);
        }
    static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
        return (tab == null) ? null :
            (HashEntry<K,V>) UNSAFE.getObjectVolatile
            (tab, ((long)i << TSHIFT) + TBASE);
    }
   static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
                                       HashEntry<K,V> e) {
        UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
    }

Общая проблема

  1. Каков принцип реализации ConcurrentHashMap или как ConcurrentHashMap обеспечивает повышение производительности при одновременном обеспечении безопасности потоков при высокой степени параллелизма?

ConcurrentHashMapКлючом к одновременному выполнению нескольких операций модификации является использование технологии == разделения блокировки ==. Он использует несколько блокировок для управления модификациями различных частей хеш-таблицы. Внутренний сегмент (Segment) для представления этих разных частей каждый сегмент на самом деле представляет собой небольшойHashTable, если в разных сегментах выполняется несколько операций модификации, они могут выполняться одновременно.

  1. Как обеспечить, чтобы полученные элементы были самыми последними в случае высокой параллелизма?

для хранения данных типа "ключ-значение"HashEntry, по замыслу за его значением переменной-члена следуетnextобаvolatileтип, который гарантирует, что изменение значения value другими потоками может быть немедленно замечено методом get.

  1. Слабая непротиворечивость ConcurrentHashMap отражается в методах итератора, очистки и получения, поскольку блокировка отсутствует.

    1. Например, когда итератор проходит данные, он проходит сегмент за сегментом.Если поток вставит данные в только что пройденный сегмент после прохождения сегмента, он покажет несогласованность. То же самое касается ясности.
    2. Метод get и метод containsKey обходят все узлы в соответствующей позиции индекса, и они оцениваются без блокировки.Если это модификация, последнее значение может быть получено напрямую из-за существования видимости, но если это новое значение, затемнеспособность поддерживать постоянство.

JDK8

Основные различия между JDK8 и JDK7 заключаются в следующем:

  1. Массив сегментов отменяется, а данные сохраняются непосредственно в таблице, степень детализации блокировки меньше, а вероятность параллельного конфликта снижается. Элементы массива таблиц используются в качестве замков для блокировки каждой строки данных, что еще больше снижает вероятность конфликтов параллелизма.Для управления параллелизмом используются Synchronized и CAS.
  2. При хранении данных используется форма массив + связанный список + красно-черное дерево.
  1. Важные параметры CurrentHashMap:

private static final int MAXIMUM_CAPACITY = 1 [] table;// Для сохранения элементов используется неинициализированный массив по умолчанию private transient volatile Node[] nextTable; // Массив, используемый при передаче static final int NCPU = Runtime.getRuntime(). AvailableProcessors();// Получаем количество доступных процессоров private transient volatile Node[] nextTable; // Таблица соединений, используемая для расширения хэш-таблицы, будет сброшена до нуля после завершения расширения private transient volatile long baseCount; содержит сумму всех узлов, хранящихся во всей хеш-таблице, что несколько похоже на атрибут размера HashMap. частный переходный volatile intsizeCtl; Отрицательное число: указывает на инициализацию или расширение, -1: указывает на инициализацию, -N: указывает на расширение N-1 потоков. Положительное число: 0 указывает на то, что оно не было инициализировано, а число > 0: инициализация или пороговое значение для следующего расширения, что-то вроде того, что в HashMapthreshold, но функцияБолее могущественный.

  1. несколько важных классов
  • Базовый класс, из которого состоит каждый элементNode
      static class Node<K,V> implements Map.Entry<K,V> {
              final int hash;    // key的hash值
              final K key;       // key
              volatile V val;    // value
              volatile Node<K,V> next; 
               //表示链表中的下一个节点
      }
  • TreeNode наследуется от Node и используется для хранения красно-черных узлов дерева.
      static final class TreeNode<K,V> extends Node<K,V> {
              TreeNode<K,V> parent;  
              // 红黑树的父亲节点
              TreeNode<K,V> left;
              // 左节点
              TreeNode<K,V> right;
             // 右节点
              TreeNode<K,V> prev;    
             // 前节点
              boolean red;
             // 是否为红点
      }
  • узел переадресации в подклассах узловForwardingNodeВ конструкторе вы можете увидеть хэш этой переменной =-1, класс также хранитnextTableцитаты. Метод инициализации толькоtransferвызывается метод, если для класса задан этот случай и хэш = -1, то размер узла не нужно изменять.
static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            //注意这里
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
 //.....
}
  • ДеревоБин TreeBin можно понимать буквально как контейнер для хранения древовидной структуры, а древовидная структура относится к TreeNode, поэтому TreeBin — это контейнер, который инкапсулирует TreeNode, который обеспечивает некоторые условия для преобразования черно-красного дерева и управления блокировками.
static final class TreeBin<K,V> extends Node<K,V> {
        TreeNode<K,V> root;
        volatile TreeNode<K,V> first;
        volatile Thread waiter;
        volatile int lockState;
        // values for lockState
        static final int WRITER = 1; // set while holding write lock
        static final int WAITER = 2; // set when waiting for write lock
        static final int READER = 4; // increment value for setting read lock
}

Конструктор

Общая конструкция в основном похожа на HashMap, и для совместимости с оригинальным JDK7 можно передать параллелизм. Однако параллелизм в JDK8 уже контролируется конкретной длиной таблицы.

  1. ConcurrentHashMap(): создает новую пустую карту с начальной емкостью по умолчанию (16), коэффициентом загрузки (0,75) и уровнем параллелизма (16).
  2. ConcurrentHashMap(int): создает карту с указанной начальной емкостью.tableSizeFor, новая пустая карта для коэффициента загрузки по умолчанию (0,75) и concurrencyLevel (16)
  3. ConcurrentHashMap(Map extends K, ? extends V> m): создает новую карту с тем же отношением отображения, что и данная карта.
  4. ConcurrentHashMap(int initialCapacity, float loadFactor): создает новую пустую карту с указанной начальной емкостью, коэффициентом загрузки и уровнем параллелизма по умолчанию (1).
  5. ConcurrentHashMap(int, float, int): создает новую пустую карту с указанной начальной емкостью, коэффициентом загрузки и уровнем параллелизма.

put

Предполагая, что таблица была инициализирована, операция размещения использует синхронизацию CAS + для реализации параллельных операций вставки или обновления.Конкретная реализация выглядит следующим образом.

  1. Выполните некоторую граничную обработку и получите хеш-значение.
  2. Если он не инициализирован, он будет инициализирован.После инициализации проверьте, не пуст ли соответствующий бакет.Если он пуст, попробуйте вставить его атомарно.
  3. Если текущий узел расширяется, необходимо помогать расширяться и работать.
  4. использоватьsynЧтобы заблокировать текущий узел, а затем операция почти такая же, как с хэш-картой.

// Node 节点的 hash值在HashMap中存储的就是hash值,在currenthashmap中可能有多种情况哦!
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException(); //边界处理
    int hash = spread(key.hashCode());// 最终hash值计算
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) { //循环表
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable(); // 初始化表 如果为空,懒汉式
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
        // 如果对应桶位置为空
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) 
                         // CAS 原子性的尝试插入
                break;
        } 
        else if ((fh = f.hash) == MOVED) 
        // 如果当前节点正在扩容。还要帮着去扩容。
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            synchronized (f) { //  桶存在数据 加锁操作进行处理
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) { // 如果存储的是链表 存储的是节点的hash值
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 遍历链表去查找,如果找到key一样则选择性
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {// 找到尾部插入
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {// 如果桶节点类型为TreeBin
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) { 
                             // 尝试红黑树插入,同时也要防止节点本来就有,选择性覆盖
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) { // 如果链表数量
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i); //  链表转红黑树哦!
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount); // 统计大小 并且检查是否要扩容。
    return null;
}

включает в себя важные функцииinitTable,tabAt,casTabAt,helpTransfer,putTreeVal,treeifyBin,addCountфункция.

initTable

Допускается только один потокИнициализируйте таблицу. Если появится другой поток, он позволит другим потокам передать ЦП и дождаться следующего системного планирования.Thread.yield. Таким образом, гарантируется, что таблица будет инициализирована только одним потоком одновременно.Что касается размера таблицы, он будет определяться в соответствии сsizeCtlУстановите значение szieCtl, если значение szieCtl не установлено, то размер сгенерированной таблицы по умолчанию равен 16, в противном случае он будет основан наsizeCtlРазмер задает размер таблицы.

// 容器初始化 操作
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0) // 如果正在初始化-1,-N 正在扩容。
            Thread.yield(); // 进行线程让步等待
     // 让掉当前线程 CPU 的时间片,使正在运行中的线程重新变成就绪状态,并重新竞争 CPU 的调度权。
     // 它可能会获取到,也有可能被其他线程获取到。
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { 
          //  比较sizeCtl的值与sc是否相等,相等则用 -1 替换,这表明我这个线程在进行初始化了!
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 默认为16
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2); // sc = 0.75n
                }
            } finally {
                sizeCtl = sc; //设置sizeCtl 类似threshold
            }
            break;
        }
    }
    return tab;
}

unsafe

существуетConcurrentHashMapиспользуется вunSafeметод, который обеспечивает безопасность параллельной обработки за счет непосредственного использования памяти, используя механизм безопасности ==hardware==.

 // 用来返回节点数组的指定位置的节点的原子操作
@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

// cas原子操作,在指定位置设定值
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

// 原子操作,在指定位置设定值
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

// 比较table数组下标为i的结点是否为c,若为c,则用v交换操作。否则,不进行交换操作。
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

Вы можете видеть, что данные table[i] получены черезUnsafeОбъект получается через рефлексию, нельзя ли напрямую вывести данные в table[index], почему так сложно? В модели памяти Java мы уже знаем, что у каждого потока есть рабочая память, в которой хранятся данные таблицы.копировать, хотя таблицаvolatileИзменено, но не гарантируется, что поток каждый раз будет получать последний элемент в таблице, Unsafe.getObjectVolatile может напрямую получать данные указанной памяти.Убедитесь, что данные актуальны каждый раз, когда вы их получаете.

helpTransfer

// 可能有多个线程在同时帮忙运行helpTransfer
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        // table不是空  且 node节点是转移类型,并且转移类型的nextTable 不是空 说明还在扩容ing
        int rs = resizeStamp(tab.length); 
        // 根据 length 得到一个前16位的标识符,数组容量大小。
        // 确定新table指向没有变,老table数据也没变,并且此时 sizeCtl小于0 还在扩容ing
        while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) {
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0)
            // 1. sizeCtl 无符号右移16位获得高16位如果不等 rs 标识符变了
            // 2. 如果扩容结束了 这里可以看 trePresize 函数第一次扩容操作:
            // 默认第一个线程设置 sc = rs 左移 16 位 + 2,当第一个线程结束扩容了,
            // 就会将 sc 减一。这个时候,sc 就等于 rs + 1。
            // 3. 如果达到了最大帮助线程个数 65535个
            // 4. 如果转移下标调整ing 扩容已经结束了
                break;
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
            // 如果以上都不是, 将 sizeCtl + 1,增加一个线程来扩容
                transfer(tab, nextTab); // 进行转移
                break;// 结束循环
            }
        }
        return nextTab;
    }
    return table;
}
  • Integer.numberOfLeadingZeros(n)

Что делает этот методВозвращает количество нулей, предшествующих старшему ненулевому биту целого числа без знака i., включая бит знака; Если i отрицательное, этот метод вернет 0, а бит знака равен 1. Например, двоичное представление числа 10 равно 0000 0000 0000 0000 0000 0000 0000 1010. Целочисленная длина Java составляет 32 бита. Затем этот метод возвращает 28

  • изменить размерштамп В основном он используется для получения идентификатора, который можно просто понимать как мониторинг текущей пропускной способности системы.
static final int resizeStamp(int n) {
   return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); 
   //RESIZE_STAMP_BITS = 16
}

addCount

Есть две основные вещи: одна — обновить baseCount, а другая — определить, нужно ли расширение.

private final void addCount(long x, int check) {
 CounterCell[] as; long b, s;
 // 首先如果没有并发 此时countCells is null, 此时尝试CAS设置数据值。
 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
     // 如果 counterCells不为空以为此时有并发的设置 或者 CAS设置 baseCount 失败了
     CounterCell a; long v; int m;
     boolean uncontended = true;
     if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
         !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
         // 1. 如果没出现并发 此时计数盒子为 null
         // 2. 随机取出一个数组位置发现为空
         // 3. 出现并发后修改这个cellvalue 失败了
         // 执行funAddCount
         fullAddCount(x, uncontended);// 死循环操作
         return;
     }
     if (check <= 1)
         return;
     s = sumCount(); // 吧counterCells数组中的每一个数据进行累加给baseCount。
 }
 // 如果需要扩容
 if (check >= 0) {
  Node<K,V>[] tab, nt; int n, sc;
  while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) {
   int rs = resizeStamp(n);// 获得高位标识符
   if (sc < 0) { // 是否需要帮忙去扩容
    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
     sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0)
     break;
    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
     transfer(tab, nt);
   } // 第一次扩容
   else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
    transfer(tab, null);
   s = sumCount();
  }
 }
}
  1. baseCount добавитьConcurrentHashMapпри условииbaseCount,counterCellsдве вспомогательные переменные и однаCounterCellВспомогательный внутренний класс. sumCount() выполняет итерациюcounterCellsдля подсчета процесса суммы. Когда положишь операцию, это обязательно повлияетsize(),существуетput()В конечном итоге метод будет называтьсяaddCount()метод. целостный образ мышленияLongAdderТочно так же мышление используется для справкиConcurrentHashMap. КаждыйCellиспользовать обаContendedУкрашено, чтобы избежать ложного обмена.
  1. Расчет размера отличается в JDK1.7 и JDK1.8. В версии 1.7 он вычисляется три раза без предварительной блокировки, и если три результата различаются, он блокируется.
  2. Размер JDK1.8 рассчитывается CAS по baseCount и counterCell, и, наконец, размер получается по baseCount и обходу массива CounterCell.
  3. JDK 8 рекомендует использовать метод mappingCount, так как возвращаемое значение этого метода имеет тип long и не ограничивает максимальное значение, поскольку метод size имеет тип int.
  1. О расширении在这里插入图片描述существуетaddCountБудет буйная операция, когда выйдет первое дополнениеsc=rs << RESIZE_STAMP_SHIFT) + 2)вrs = resizeStamp(n). Вот ключевой момент, чтобы сказать,

在这里插入图片描述Если это не первое расширение, просто добавьте 1 к младшему 16-битному числу.

putTreeVal

Эта операция практическиHashMapОперация точно такая же, основная идея состоит в том, чтобы решить, идти ли влево или вправо, и, наконец, попытаться разместить новые данные, а затем сбалансировать. Разница в том, что есть соображения блокировки.

treeifyBin

Основная идея здесьHashMapПочти то же самое, разница в том, что сначала он становится TreeNode, затемОдносвязный списокобъединить.

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        //如果整个table的数量小于64,就扩容至原来的一倍,不转红黑树了
        //因为这个阈值扩容可以减少hash冲突,不必要去转红黑树
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) { //锁定当前桶
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        //遍历这个链表然后将每个节点封装成TreeNode,最终单链表串联起来,
                        // 最终 调用setTabAt 放置红黑树
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    //通过TreeBin对象对TreeNode转换成红黑树
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

TreeBin

Основная функция состоит в том, чтобы преобразовать связанный список в красно-черное дерево.Это красно-черное дерево используетTreeBinупаковать. И следует отметить, что после преобразования в красно-черное дерево информация о структуре предыдущего связанного списка все еще существует, и окончательная информация выглядит следующим образом:

  1. TreeBin.first = первый узел в связанном списке.
  2. TreeBin.root = корневой узел красно-черного дерева.
TreeBin(TreeNode<K,V> b) {
            super(TREEBIN, null, null, null);   
            //创建空节点 hash = -2 
            this.first = b;
            TreeNode<K,V> r = null; // root 节点
            for (TreeNode<K,V> x = b, next; x != null; x = next) {
                next = (TreeNode<K,V>)x.next;
                x.left = x.right = null;
                if (r == null) {
                    x.parent = null;
                    x.red = false;
                    r = x; // root 节点设置为x 
                }
                else {
                    K k = x.key;
                    int h = x.hash;
                    Class<?> kc = null;
                    for (TreeNode<K,V> p = r;;) {
                   // x代表的是转换为树之前的顺序遍历到链表的位置的节点,r代表的是根节点
                        int dir, ph;
                        K pk = p.key;
                        if ((ph = p.hash) > h)
                            dir = -1;
                        else if (ph < h)
                            dir = 1;
                        else if ((kc == null &&
                                  (kc = comparableClassFor(k)) == null) ||
                                 (dir = compareComparables(kc, k, pk)) == 0)
                            dir = tieBreakOrder(k, pk);    
                            // 当key不可以比较,或者相等的时候采取的一种排序措施
                            TreeNode<K,V> xp = p;
                        // 放一定是放在叶子节点上,如果还没找到叶子节点则进行循环往下找。
                        // 找到了目前叶子节点才会进入 再放置数据
                        if ((p = (dir <= 0) ? p.left : p.right) == null) {
                            x.parent = xp;
                            if (dir <= 0)
                                xp.left = x;
                            else
                                xp.right = x;
                            r = balanceInsertion(r, x); 
                     // 每次插入一个元素的时候都调用 balanceInsertion 来保持红黑树的平衡
                            break;
                        }
                    }
                }
            }
            this.root = r;
            assert checkInvariants(root);
        }

tryPresize

Когда длина массива меньше 64, длина расширенного массива удваивается, и вызывается эта функция. Проверка размера емкости после расширения может включать инициализацию размера контейнера. И когда мощность была увеличена, она снова была привязана к мощности 2! , когда карта передается во время инициализации, будет вызван метод putAll, чтобы напрямую поместить карту вputAllЭтот метод не вызывает метод initTable для инициализации таблицы, а напрямую вызывает метод tryPresize, поэтому здесь нам нужно решить, нужно ли инициализировать таблицу.

PS: По умолчанию первый поток устанавливает sc = rs и сдвигается влево на 16 бит + 2. Когда первый поток заканчивает расширение, sc будет уменьшен на единицу. В это время sc равно rs + 1, что означает, что расширение завершено.

     /**
     * 扩容表为指可以容纳指定个数的大小(总是2的N次方)
     * 假设原来的数组长度为16,则在调用tryPresize的时候,size参数的值为16<<1(32),此时sizeCtl的值为12
     * 计算出来c的值为64, 则要扩容到 sizeCtl ≥ c
     *  第一次扩容之后 数组长:32 sizeCtl:24
     *  第三次扩容之后 数组长:128  sizeCtl:96 退出
     */
    private final void tryPresize(int size) {
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1); // 合理范围
        int sc;
        while ((sc = sizeCtl) >= 0) {
            Node<K,V>[] tab = table; int n;
                if (tab == null || (n = tab.length) == 0) {
                // 初始化传入map,今天putAll会直接调用这个。
                n = (sc > c) ? sc : c;
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {    
                //初始化tab的时候,把 sizeCtl 设为 -1
                    try {
                        if (table == tab) {
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = nt;
                            sc = n - (n >>> 2); // sc=sizeCtl = 0.75n
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                }
            }
             // 初始化时候如果  数组容量<=sizeCtl 或 容量已经最大化了则退出
            else if (c <= sc || n >= MAXIMUM_CAPACITY) {
                    break;//退出扩张
            }
            else if (tab == table) {
                int rs = resizeStamp(n);

                if (sc < 0) { // sc = siztCtl 如果正在扩容Table的话,则帮助扩容
                    Node<K,V>[] nt;
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break; // 各种条件判断是否需要加入扩容工作。
                     // 帮助转移数据的线程数 + 1
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                 // 没有在初始化或扩容,则开始扩容
                 // 此处切记第一次扩容 直接 +2 
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                      (rs << RESIZE_STAMP_SHIFT) + 2)) {
                        transfer(tab, null);
                }
            }
        }
    }

transfer

Объем кода здесь относительно велик, в основном разделен на три части, и я чувствую, что идея очень важна, особенноЭто эффектная операция, которую другие потоки помогают расширить.

  1. В основном это расчет минимального количества узлов бакетов, которые может обработать один поток, и инициализация некоторых атрибутов.
  2. Каждый поток приходит и сначала получает свой собственный диапазон задач[bound,i], а затем запустите --i, чтобы обойти собственный диапазон задач и обработать каждую корзину. Если головной узел корзины столкновений пуст, используйтеForwardingNodeУказывает, что сегмент в старой таблице обработан. Если вы столкнулись с уже обработанным сегментом, пропустите обработку следующего сегмента. Если это обычное ведро, заблокируйте первый узел ведра, а затем выполните миграцию в обычном режиме (та же идея, что и в третьей части HashMap).После миграции флаг местоположения исходной таблицы все равно будет обработан.

в этой функцииfinish= trueЭто означает, что операция миграции всей таблицы былавсеГотово, нам просто нужно сброситьtableцитаты и воляnextTableНазначьте его пустым. в противном случае,CASбудетsizeCtlУменьшение на единицу, указывающее, что текущий поток завершил задачу и выходит из операции расширения. Если выход успешен, необходимо дополнительно определить, является ли текущий поток последним, выполняющим расширение.

f ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
   return;

Когда было первое расширениеaddCountнаписал(resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2Указывает, что в настоящее время работает только один поток,Соответствующий,если(sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT, указывающий, что текущий поток является == последним == потоком, который все еще расширяется, тогда флаг завершения будет истинным, и метод расширения будет завершен в следующем цикле.

  1. почти какHashMapОбщая идея аналогична обходу связанного списка/красно-черного дерева, а затем расширению емкости.
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)    //MIN_TRANSFER_STRIDE 用来控制不要占用太多CPU
        stride = MIN_TRANSFER_STRIDE; // subdivide range    //MIN_TRANSFER_STRIDE=16 每个CPU处理最小长度个数

    if (nextTab == null) { // 新表格为空则直接新建二倍,别的辅助线程来帮忙扩容则不会进入此if条件
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n; // transferIndex 指向最后一个桶,方便从后向前遍历
    }
    int nextn = nextTab.length; // 新表长度
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 创建一个fwd节点,这个是用来控制并发的,当一个节点为空或已经被转移之后,就设置为fwd节点
    boolean advance = true;    //是否继续向前查找的标志位
    boolean finishing = false; // to ensure sweep(清扫) before committing nextTab,在完成之前重新在扫描一遍数组,看看有没完成的没
     // 第一部分
    // i 指向当前桶, bound 指向当前线程需要处理的桶结点的区间下限【bound,i】 这样来跟线程划分任务。
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
       // 这个 while 循环的目的就是通过 --i 遍历当前线程所分配到的桶结点
       // 一个桶一个桶的处理
        while (advance) {//  每一次成功处理操作都会将advance设置为true,然里来处理区间的上一个数据
            int nextIndex, nextBound;
            if (--i >= bound || finishing) { //通过此处进行任务区间的遍历
                advance = false;
            }
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;// 任务分配完了
                advance = false;
            }
            // 更新 transferIndex
           // 为当前线程分配任务,处理的桶结点区间为(nextBound,nextIndex)
            else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
               // nextIndex本来等于末尾数字,
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        // 当前线程所有任务完成 
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {  // 已经完成转移 则直接赋值操作
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);    //设置sizeCtl为扩容后的0.75
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // sizeCtl-1 表示当前线程任务完成。
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) { 
                // 判断当前线程完成的线程是不是最后一个在扩容的,思路精髓
                        return;
                }
                finishing = advance = true;// 如果是则相应的设置参数
                i = n; 
            }
        }
        else if ((f = tabAt(tab, i)) == null) // 数组中把null的元素设置为ForwardingNode节点(hash值为MOVED[-1])
            advance = casTabAt(tab, i, null, fwd); // 如果老节点数据是空的则直接进行CAS设置为fwd
        else if ((fh = f.hash) == MOVED) //已经是个fwd了,因为是多线程操作 可能别人已经给你弄好了,
            advance = true; // already processed
        else {
            synchronized (f) { //加锁操作
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) { //该节点的hash值大于等于0,说明是一个Node节点
                    // 关于链表的操作整体跟HashMap类似不过 感觉好像更扰一些。
                        int runBit = fh & n; // fh= f.hash first hash的意思,看第一个点 放老位置还是新位置
                        Node<K,V> lastRun = f;

                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;    //n的值为扩张前的数组的长度
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;//最后导致发生变化的节点
                            }
                        }
                        if (runBit == 0) { //看最后一个变化点是新还是旧 旧
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun; //看最后一个变化点是新还是旧 旧
                            ln = null;
                        }
                        /*
                         * 构造两个链表,顺序大部分和原来是反的,不过顺序也有差异
                         * 分别放到原来的位置和新增加的长度的相同位置(i/n+i)
                         */
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                    /*
                                     * 假设runBit的值为0,
                                     * 则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash变化的节点(该节点后面可能还有hash计算后同为0的节点)设置到旧的table的第一个hash计算后为0的节点下一个节点
                                     * 并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点
                                     */
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                    /*
                                     * 假设runBit的值不为0,
                                     * 则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash变化的节点(该节点后面可能还有hash计算后同不为0的节点)设置到旧的table的第一个hash计算后不为0的节点下一个节点
                                     * 并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点
                                     */
                                hn = new Node<K,V>(ph, pk, pv, hn);    
                        }
                        setTabAt(nextTab, i, ln);    
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof TreeBin) { // 该节点hash值是个负数否则的话是一个树节点
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null; // 旧 头尾
                        TreeNode<K,V> hi = null, hiTail = null; //新头围
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p; //旧头尾设置
                                loTail = p;
                                ++lc;
                            }
                            else { // 新头围设置
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                         //ln  如果老位置数字<=6 则要对老位置链表进行红黑树降级到链表,否则就看是否还需要对老位置数据进行新建红黑树
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd); //老表中i位置节点设置下
                        advance = true;
                    }
                }
            }
        }
    }
}

get

Это очень просто: получите хэш-значение, а затем решите, существует оно или нет, просто пройдитесь по связанному списку, обратите внимание, что у get нет операции блокировки!

    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        // 计算key的hash值
        int h = spread(key.hashCode()); 
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) { // 表不为空并且表的长度大于0并且key所在的桶不为空
            if ((eh = e.hash) == h) { // 表中的元素的hash值与key的hash值相等
                if ((ek = e.key) == key || (ek != null && key.equals(ek))) // 键相等
                    // 返回值
                    return e.val;
            }
            else if (eh < 0) // 是个TreeBin hash = -2 
                // 在红黑树中查找,因为红黑树中也保存这一个链表顺序
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) { // 对于结点hash值大于0的情况链表
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

clear

Очистка относительно проста, это не что иное, как обход массива сегментов, а затем его очистка через CAS.

public void clear() {
    long delta = 0L;
    int i = 0;
    Node<K,V>[] tab = table;
    while (tab != null && i < tab.length) {
        int fh;
        Node<K,V> f = tabAt(tab, i);
        if (f == null)
            ++i; //这个桶是空的直接跳过
        else if ((fh = f.hash) == MOVED) { // 这个桶的数据还在扩容中,要去扩容同时等待。
            tab = helpTransfer(tab, f);
            i = 0; // restart
        }
        else {
            synchronized (f) { // 真正的删除
                if (tabAt(tab, i) == f) {
                    Node<K,V> p = (fh >= 0 ? f :(f instanceof TreeBin) ?((TreeBin<K,V>)f).first : null);
                        //循环到链表/者红黑树的尾部
                        while (p != null) {
                            --delta; // 记录删除了多少个
                            p = p.next;
                        } 
                        //利用CAS无锁置null  
                        setTabAt(tab, i++, null);
                    }
                }
            }
        }
        if (delta != 0L)
            addCount(delta, -1); //调整count
    }

end

ConcurrentHashMap — это что, еслиПараллельная безопасность, и какэффективныйКак насчет одновременного?

  1. Первый — это операция чтения.После прочтения исходного кода обнаруживается, что механизм синхронизации в методе get вообще не используется и не используется.unsafeметод, поэтому операции чтения поддерживают параллельные операции.

  2. операция записи

  • , Функция расширения данныхtransfer, единственный методaddCount,helpTransferиtryPresizeЭти три метода называются.
  1. addCount — это метод, который будет вызываться при манипулировании массивом и изменении количества элементов, хранящихся в массиве.
  2. helpTransferКогда поток хочет работать с элементами в таблице, если он обнаружит, что хэш узла = MOVED, он вызоветhelpTransferметод, вhelpTransferОтзыватьtransferметод, помогающий завершить расширение массива
  1. tryPresizeвtreeIfybinиputAllвызываемый метод,treeIfybinв основном вputПосле добавления элементов оцените, превысило ли количество связанных элементов узла массива 8. Если оно превышает, этот метод будет вызываться для расширения массива или преобразования связанного списка в дерево. УведомлениеputAllВызывается при инициализации большой карты. ·

Кратко опишите ситуацию с расширением:

  1. При добавлении элементов на карту расширение массива сработает только тогда, когда номер определенного узла превысил 8, а длина массива меньше 64.
  2. Когда количество элементов в массиве достигнет sizeCtl, будет вызван метод переноса для расширения емкости

3. Можно ли выполнять чтение и запись во время расширения.

Для операций чтения это возможно, поскольку оно не заблокировано. Для операций записи JDK8 расширил диапазон блокировок доtable[i]l, когда массив расширяется, если текущий узел не был обработан (то есть он не был установлен в узел ==fwd==), тогда может быть выполнена операция установки. Если узел был обработан, текущий поток также == присоединится == к операции раскрытия.

  1. Как синхронизируются несколько потоков? существуетConcurrentHashMap, процесс синхронизации в основном черезSynchronizedиunsafeАтомарность аппаратного уровня выполняется в обоих направлениях.
  1. При получении sizeCtl и Node в определенной позиции используются обаunsafeметод для достижения цели безопасности параллелизма
  2. Когда узел должен быть установлен в определенной позиции, он будет передан черезSynchronizedМеханизм синхронизации для блокировки узла в этом месте.
  3. Когда массив расширяется, он обрабатывается步长иfwdУзел достигает цели параллельной безопасности, устанавливая хеш-значение в MOVED=-1.
  4. При копировании узла в определенной позиции в развернутую таблицу также проходитSynchronizedмеханизм синхронизации для обеспечения безопасности потоков

рутина

  1. Расскажите о HashMap, который вы понимаете, и обсудите с ним процесс get put.
  2. 1.8 Какие оптимизации были сделаны?
  3. Это потокобезопасно?
  4. Какие проблемы может вызвать неуверенность?
  5. Как решить? Существуют ли потокобезопасные параллельные контейнеры?
  6. Как реализован ConcurrentHashMap? 1.7, 1.8 чем отличается реализация и почему.
  7. Роль sizeCtl в ConcurrentHashMap в 1.8 примерно заключается в том, чтобы помочь увеличить емкость и биты флага.
  8. Почему HashMap не заменяет красно-черное дерево таблицей переходов?

Ссылаться на

Передача CurrentHashMap CurrentHashMapDetails Анализ принципа LongAdder

В этой статье используетсяmdniceнабор текста