Оригинальная статья, краткое изложение опыта и жизненные перипетии на всем пути от набора в школу до фабрики.
Нажмите, чтобы узнать подробностиwww.codercc.com[1]
1. Введение в ConcurrentHashmap
При использовании HashMap загрузка ЦП будет близка к 100% в случае многопоточности, потому что hashmap не является потокобезопасным, обычно мы можем использовать древний класс hashtable в системе java, в основном все методы этого класса синхронизированы. Потокобезопасное управление, вполне возможно, что в случае высокого уровня параллелизма только один поток может одновременно получить блокировку монитора объекта.Такая производительность параллелизма действительно неудовлетворительна. Другой способ — через КоллекцииMap<K,V> synchronizedMap(Map<K,V> m)
Оберните хэш-карту в потокобезопасную карту. Например, исходный код метода put SynchronzedMap:
public V put(K key, V value) {
synchronized (mutex) {return m.put(key, value);}
}
На самом деле реализация SynchronizedMap по-прежнему использует синхронизированные эксклюзивные блокировки для поточно-ориентированного управления параллелизмом. Кроме того, производительность этого решения не является удовлетворительной. В ответ на эту ситуацию мастер Дуг Ли не пожалел усилий, чтобы создать для нас несколько потокобезопасных параллельных контейнеров, осчастливив каждого разработчика Java. По сравнению с hashmap, ConcurrentHashMap является поточно-ориентированной картой, гдеИспользование идеи сегментации блокировки для улучшения параллелизма.
В сети много информации о ConcurrentHashMap по версии JDK1.6, если интересно, можете глянуть. Ключевые элементы выпуска JDK 1.6:
- Сегмент наследует роль ReentrantLock в качестве блокировки, обеспечивая гарантию потокобезопасности для каждого сегмента;
- Сегмент поддерживает несколько сегментов хэш-таблицы, и каждый сегмент состоит из связанного списка HashEntry.
В JDK 1.8 ConcurrentHashMap сильно изменился, и объем кода значительно увеличился. Версия 1.8 отказалась от сегмента и широко использовала синхронизированные операции и операции без блокировки CAS для обеспечения потокобезопасности операций ConcurrentHashMap. А почему бы не использовать ReentrantLock, но Synchronzied? На самом деле, synchronzied сделал много оптимизаций, включая предвзятые блокировки, облегченные блокировки и тяжеловесные блокировки, которые могут повышать статус блокировки в свою очередь, но не понижать (о синхронизированном можетсм. эту статью[2]), поэтому производительность использования synchronized по сравнению с ReentrantLock будет такой же или даже лучше в некоторых случаях.Для конкретных тестов производительности вы можете проверить некоторую информацию в Интернете. Кроме того, базовая структура данных изменена на форму данных массив + связанный список + красно-черное дерево.
2. Ключевые атрибуты и классы
Прежде чем понять конкретную реализацию метода ConcurrentHashMap, нам нужно систематически просмотреть несколько ключевых мест.
Ключевые свойства ConcurrentHashMap
-
tablevolatile Node
[] table://Загрузить массив Node в качестве контейнера данных ConcurrentHashMap, использовать метод ленивой загрузки, операция инициализации не будет выполняться до тех пор, пока данные не будут вставлены в первый раз, размер массива всегда равна степени 2. -
nextTablevolatile Node
[] nextTable; //Используется при расширении, обычно null, только ненулевое значение при расширении -
sizeCtlизменчивый int sizeCtl; Это свойство используется для управления размером табличного массива.Существует несколько ситуаций в зависимости от того, инициализируется ли он и расширяется ли: **Когда значение отрицательное: **Если это -1, это означает инициализацию, если это -N, это означает, что в настоящее время имеется N-1 потоков для операции расширения; ** Когда значение положительное: ** Если текущий массив равен нулю, это означает, что таблица находится в процессе инициализации, а sizeCtl означает длину создаваемого нового массива; Если он был инициализирован, это означает, что доступная емкость текущего контейнера данных (табличного массива) также может быть понята как критическое значение (количество вставленных узлов необходимо расширить, если количество вставленных узлов превышает критическое значение) , который конкретно относится к длине массива, умноженной на коэффициент нагрузки loadFactor; Когда значение равно 0, длина массива является начальным значением по умолчанию.
-
sun.misc.Unsafe UВ реализации ConcurrentHashMapde вы можете увидеть большое количество методов U.compareAndSwapXXXX для изменения некоторых свойств ConcurrentHashMap. Эти методы фактически используют алгоритм CAS для обеспечения безопасности потоков, что является оптимистичной стратегией, предполагающей, что каждая операция не приведет к конфликту, и повторить попытку, если и только если возникнет конфликт. В то время как операции CAS основаны на современных наборах инструкций процессора, через лежащий в их основеCMPXCHGРеализация инструкции. Основная идея CAS(V,O,N):Если фактическое значение V текущей переменной совпадает с ожидаемым старым значением O, это означает, что переменная не была изменена другими потоками, поэтому безопасно присвоить переменной новое значение N; если фактическое значение V текущей переменной отличается от ожидаемого старого значения O Если оно такое же, это означает, что переменная была обработана другими потоками В настоящее время небезопасно присваивать переменной новое значение N. Повторная попытка. Использование CAS при реализации большого количества синхронных компонентов и параллельных контейнеров достигается за счет
sun.misc.Unsafe
Реализуется классом, предоставляющим некоторые низкоуровневые операции, которые могут напрямую манипулировать памятью и потоками, которые можно понимать как «указатели» в java. Доступ к этой переменной-члену находится в блоке статического кода:static { try { U = sun.misc.Unsafe.getUnsafe(); ....... } catch (Exception e) { throw new Error(e); } }
Ключевые внутренние классы в ConcurrentHashMap
-
NodeКласс Node реализует интерфейс Map.Entry, который в основном хранит пары ключ-значение и имеет следующее поле
static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; volatile V val; volatile Node<K,V> next; ...... }
Кроме того, видно, что многие атрибуты модифицируются с помощью volatile, то есть для обеспечения видимости памяти.
-
TreeNodeУзел дерева, унаследованный от класса Node, несущего данные. Операция красно-черного дерева для класса TreeBin, из комментариев этого класса также видно, что TreeBin снова будет инкапсулировать TreeNode.
** * Nodes for use in TreeBins */ static final class TreeNode<K,V> extends Node<K,V> { TreeNode<K,V> parent; // red-black tree links TreeNode<K,V> left; TreeNode<K,V> right; TreeNode<K,V> prev; // needed to unlink next upon deletion boolean red; ...... }
-
TreeBinЭтот класс не отвечает за перенос информации о ключе и значении пользователя, но является переносом многих узлов TreeNode. Фактический «массив» ConcurrentHashMap хранит объекты TreeBin, а не объекты TreeNode.
static final class TreeBin<K,V> extends Node<K,V> { TreeNode<K,V> root; volatile TreeNode<K,V> first; volatile Thread waiter; volatile int lockState; // values for lockState static final int WRITER = 1; // set while holding write lock static final int WAITER = 2; // set when waiting for write lock static final int READER = 4; // increment value for setting read lock ...... }
-
ForwardingNodeКлюч, значение и хеш-хестные узлы, которые появляются только во время расширения, все ноль. И иметь указатель Nallytable, ссылаясь на новый массив таблицы.
static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } ..... }
Ключевые операции CAS
Выше мы упоминали, что CAS будет широко использоваться в ConcurrentHashMap для изменения его свойств и некоторых операций. Поэтому, прежде чем понять метод ConcurrentHashMap, нам необходимо понять следующие часто используемые алгоритмы CAS для обеспечения потокобезопасных операций.
-
tabAt
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); }
Этот метод используется для получения элемента Node с индексом i в массиве таблиц.
-
casTabAt
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); }
Используйте операцию CAS, чтобы установить элемент с индексом i в массиве таблиц.
-
setTabAt
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); }
Этот метод используется для установки элемента с индексом i в массиве таблиц
3. Объяснение основных методов
Ознакомившись с приведенной выше основной информацией, давайте посмотрим, как последовательно реализуются несколько часто используемых методов.
3.1 Методы конструктора экземпляра
Первое, что нужно сделать при использовании ConcurrentHashMap, — это создать новый объект ConcurrentHashMap, который предоставляет следующие методы конструктора:
// 1. 构造一个空的map,即table数组还未初始化,初始化放在第一次插入数据时,默认大小为16
ConcurrentHashMap()
// 2. 给定map的大小
ConcurrentHashMap(int initialCapacity)
// 3. 给定一个map
ConcurrentHashMap(Map<? extends K, ? extends V> m)
// 4. 给定map的大小以及加载因子
ConcurrentHashMap(int initialCapacity, float loadFactor)
// 5. 给定map大小,加载因子以及并发度(预计同时操作数据的线程)
ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel)
ConcurrentHashMap предоставляет нам в общей сложности 5 методов конструктора.Подробности см. в комментариях.Давайте взглянем на второй конструктор.Когда указанный размер передается, исходный код конструктора:
public ConcurrentHashMap(int initialCapacity) {
//1. 小于0直接抛异常
if (initialCapacity < 0)
throw new IllegalArgumentException();
//2. 判断是否超过了允许的最大值,超过了话则取最大值,否则再对该值进一步处理
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
//3. 赋值给sizeCtl
this.sizeCtl = cap;
}
Пожалуйста, смотрите комментарии для логики этого кода.Это легко понять.Если оно меньше 0, будет выброшено исключение напрямую.Если указанное значение больше максимально допустимого, будет взято максимальное значение.В противном случае , указанное значение будет дополнительно обработано. Наконец, присвойте размеру sizeCtl ограничение. Пожалуйста, обратитесь к приведенному выше описанию для описания sizeCtl.При вызове метода конструктора размер sizeCtl должен представлять размер ConcurrentHashMap, то есть длину массива таблицы.. Что делает tableSizeFor? Исходный код:
/**
* Returns a power of two table size for the given desired capacity.
* See Hackers Delight, sec 3.2
*/
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
Из комментариев видно, что этот метод будет преобразовывать размер, указанный при вызове метода-конструктора, в степень двойки, то есть размер ConcurrentHashMap должен быть степенью двойки, например, когда указанный размер равен 18. Чтобы удовлетворить характеристику степени 2, размер concurrentHashMapd фактически равен 2 в 5-й степени (32). Кроме того, следует отметить, что,При вызове метода-конструктора массив таблиц не строится (что можно понимать как контейнер данных ConcurrentHashMap), а только вычисляется длина массива таблиц. инициализация и создание массива таблиц действительно завершены..
3.2 Метод initTable
Перейдите непосредственно к исходному коду:
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
// 1. 保证只有一个线程正在进行初始化操作
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
// 2. 得出数组的大小
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 3. 这里才真正的初始化数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 4. 计算数组中可用的大小:实际大小n*0.75(加载因子)
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
Пожалуйста, обратитесь к комментариям для логики кода.Возможна ситуация, когда несколько потоков обращаются к этому методу одновременно.Чтобы обеспечить правильную инициализацию, на шаге 1 сначала будет оцениваться if.Если есть в настоящее время является потоком, который инициализируется Значение sizeCtl становится равным 1. В это время другие потоки оценивают If как истинное и вызывают Thread.yield(), чтобы отказаться от кванта времени ЦП. Инициализирующийся поток вызовет метод U.compareAndSwapInt, чтобы изменить sizeCtl на -1, то есть состояние инициализации. Следует также отметить, что на четвертом шаге будет дополнительно рассчитан доступный размер массива, который представляет собой фактический размер массива, умноженный на коэффициент загрузки 0,75. Вы можете видеть, как здесь вычисляется умножение 0,75, а 0,75 четверть три, здесьn - (n >>> 2)
Это просто n-(1/4)n=(3/4)n, что интересно :). Если выбран конструктор без параметров, здесь будет использоваться размер нового массива Node по умолчанию.DEFAULT_CAPACITY
(16), затем умножаем на коэффициент загрузки 0,75, чтобы получить 12, что означает, что доступный размер массива равен 12.
3.3 ставить метод
Наиболее продолжительное использование ConcurrentHashMap должно быть методами put и get.Давайте посмотрим, как реализован метод put. При вызове метода put фактической конкретной реализацией является метод putVal.Исходный код выглядит следующим образом:
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
//1. 计算key的hash值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//2. 如果当前table还没有初始化先调用initTable方法将tab进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//3. tab中索引为i的位置的元素为null,则直接使用CAS将值插入即可
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//4. 当前正在扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
//5. 当前为链表,在链表中插入新的键值对
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 6.当前为红黑树,将新的键值对插入到红黑树中
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 7.插入完键值对后再根据实际大小看是否需要转换成红黑树
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//8.对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就需要扩容
addCount(1L, binCount);
return null;
}
Объем кода метода put немного велик, поэтому давайте пошагово рассмотрим приведенные выше шаги декомпозиции.В целом, чтобы решить проблему безопасности потоков, ConcurrentHashMap использует синхронизированные и CAS-методы.. Если вы знали о HashMap до и о ConcurrenHashMap до версии 1.8, то должны знать структурную схему ConcurrentHashMap.Для последующего пояснения она будет приведена прямо здесь.Если у вас есть какие-либо сомнения по этому поводу, вы можете поискать ее на Интернет.
Как показано на рисунке (картинка взята из сети), ConcurrentHashMap представляет собой массив хеш-багет, при отсутствии хэш-конфликта каждый элемент равномерно распределяется в массиве хеш-багет. Когда происходит коллизия хэшей, даСтандартное решение для цепочек адресов, узлы с одинаковым значением хеш-функции образуют связанный список, который называется «метод застежки».Кроме того, в версии 1.8, чтобы предотвратить слишком длинную застежку-молнию, когда длина связанного списка больше 8 , связанный список будет преобразован в красно-черное дерево. Каждый элемент массива таблиц фактически является головным узлом односвязного списка или корневым узлом красно-черного дерева. При вставке пары ключ-значение он должен сначала найти вставляемое ведро, то есть вставить индекс i массива таблиц. Итак, как рассчитать индекс i? Конечно, это основано на значении hashCode ключа.
- Повторное хэширование spread() для уменьшения коллизий хэшей
Мы знаем, что для хеш-таблицы, если значение хеш-функции распределено неравномерно, это значительно увеличит вероятность коллизии хэшей, что повлияет на производительность хэш-таблицы. Таким образом, повторное хеширование выполняется с помощью метода распространения, чтобы значительно снизить вероятность коллизий хэшей. Метод распространения:
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
Метод в основномXOR младших 16 бит хэш-кода ключа в старших 16 битах, который может не только разогнать хеш-значение и равномерно снизить вероятность конфликта хэшей, но и использовать только операцию XOR, которая также может учитывать накладные расходы на производительность и достигать сбалансированного компромисса.
2. Инициализируйте таблицу
Сразу после второго шага он определит, инициализирован ли текущий массив таблиц, если нет, то вызовем initTable для его инициализации, этот метод упоминался выше.
3. Можно ли вставить новое значение прямо в массив таблицы
Из приведенной выше структурной схемы видно, что существует такая ситуация, если массив таблиц, в который должно быть вставлено значение, является нулевым, значение может быть вставлено напрямую. Итак, как определить индекс i, который нужно вставить в таблицу, по хэшу? Очевидно, вы можете определить, где новое значение вставлено в массив, взяв операцию по модулю хеш-значения и длины массива. Ранее мы упоминали, что размер ConcurrentHashMap всегда равен степени 2, а операция (n - 1) & hash эквивалентна взятию по модулю длины n, то есть hash%n, но битовая операция более эффективна. чем операция по модулю.Многие мастера Дуг Ли также максимально оптимизировали производительность при разработке параллельных контейнеров, что достойно восхищения.
После определения индекса i массива вы можете использовать метод tabAt() (этот метод был объяснен выше, вы можете вернуться и посмотреть на него, если у вас есть какие-либо сомнения), чтобы получить элемент в этой позиции. Узел f имеет значение null, вы можете вставить новое значение напрямую с помощью метода casTabAt.
4. Расширяется ли емкость в настоящее время
Если текущий узел не нулевой, а узел является специальным узлом (forwardingNode), это означает, что текущий concurrentHashMap подвергается операции расширения.Операция расширения будет объяснена как конкретный метод ниже. Итак, как определить, является ли текущий узел особым узлом? Судя по тому, равно ли хеш-значение узла -1 (MOVED), код (fh = f.hash) == MOVED, и объяснение MOVED также четко написано в исходном коде:
static final int MOVED = -1; // hash for forwarding nodes
5. Когда таблица [I] является узлом головного узла связанного списка, вставьте новое значение в связанный список
Когда table[i] не равен null и не forwardingNode, а значение хеш-функции текущего узла f больше 0 (fh >= 0), это означает, что текущий узел f является головным узлом связанного списка, состоящего из всех узлы текущего сегмента. Затем, если вы хотите вставить новое значение в ConcurrentHashMap, вы вставите новое значение в этот связанный список. Блокировка выполняется с помощью synchronized (f) для обеспечения безопасности потоков. Часть кода для вставки узла в связанный список:
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 找到hash值相同的key,覆盖旧值即可
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
//如果到链表末尾仍未找到,则直接将新值插入到链表末尾即可
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
Эта часть кода проста для понимания, есть два случая: 1. Если в связанном списке найден узел с тем же ключом, что и вставляемая пара ключ-значение, его можно перезаписать напрямую 2. Если есть не является узлом, пока не будет найден конец связанного списка. Если он найден, просто добавьте пару ключ-значение, которая будет вставлена в конец связанного списка.
6. Когда table[i] является корневым узлом красно-черного дерева, вставьте новое значение в красно-черное дерево.
Согласно предыдущей схеме проектирования массив + связанный список, здесь есть проблема. Даже если коэффициент загрузки и алгоритм хеширования спроектированы разумно, неизбежно возникнет ситуация, когда застежка-молния слишком длинная. даже в крайних случаях найти узел будет иметь временную сложность O(n), что серьезно повлияет на производительность ConcurrentHashMap, поэтому в версии JDK1.8 структура данных дополнительно оптимизирована и красно-черное дерево вводится. Когда длина связанного списка слишком велика (по умолчанию превышает 8), связанный список преобразуется в красно-черное дерево, а производительность ConcurrentHashMap повышается за счет использования характеристик быстрого добавления, удаления, модификации и запроса красно-черное дерево, которое будет использовать красно-черное дерево для алгоритма вставки, удаления, поиска и т. д. Когда table[i] является узлом дерева красно-черного дерева, операция такова:
if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
Первый проход в iff instanceof TreeBin
Судя по тому, является ли текущая таблица[i] узлом дерева, это просто подтверждает, что TreeBin, о котором мы упоминали выше, будет дополнительно инкапсулировать TreeNode, а операция красно-черного дерева нацелена на TreeBin, а не на TreeNode. Этот код очень прост, вызовите метод putTreeVal для завершения вставки новых узлов в красно-черное дерево, та же логика,Если в красно-черном дереве есть узел с таким же ключом вставляемой пары ключ-значение (значение хеш-функции равно и метод equals оценивается как истинный), старое значение будет перезаписано, в противном случае к красно-черному дереву добавится новый узел.
7. Отрегулируйте в соответствии с текущим количеством узлов
Когда вставка нового узла данных будет завершена, текущий размер связанного списка будет дополнительно скорректирован.Эта часть кода:
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
Нетрудно понять, что если текущее количество узлов связанного списка больше или равно 8 (TREEIFY_THRESHOLD), будет вызван метод treeifyBin для преобразования tabel[i] (i-го хэш-сегмента) в красно-черное дерево.
На данный момент логика метода Put в основном такая же.Теперь давайте сделаем некоторые выводы:
Общий процесс:
- Во-первых, для каждого значения put сначала используйте метод распространения, чтобы выполнить хеш-вычисление хэш-кода ключа, тем самым определив позицию значения в таблице;
- Если текущий массив таблиц не был инициализирован, сначала инициализируйте массив таблиц;
- Если эта позиция равна нулю, используйте операцию CAS, чтобы указать ее напрямую;
- Если в этой позиции есть узел, значит, произошла хеш-коллизия.Сначала определите тип этого узла. Если узел fh==MOVED (представляющий forwardingNode, массив расширяется), это означает, что расширение выполняется;
- Если это узел связанного списка (fh>0), полученный узел является головным узлом связанного списка, состоящего из узлов с одинаковым значением хеш-функции. Необходимо по очереди пройти назад, чтобы определить местонахождение вновь добавленной стоимости. Если вы встретите узел с тем же ключом, вам нужно только перезаписать значение узла. В противном случае идите назад по очереди, пока конец связанного списка не будет вставлен в этот узел;
- Если тип этого узла — TreeBin, напрямую вызовите метод вставки красно-черного дерева, чтобы вставить новый узел;
- После вставки узла снова проверьте длину связанного списка, если длина больше 8, преобразуйте связанный список в красно-черное дерево;
- Проверьте текущий размер емкости, если он превышает критическое значение (фактический размер * коэффициент загрузки), вам необходимо увеличить емкость.
3.4 получить метод
После прочтения метода put очень легко посмотреть на метод get.Просто посмотрите на него обратным мышлением.Если он будет сохранен, я вытащу его в обратном порядке. Исходный код метода get:
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 1. 重hash
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 2. table[i]桶节点的key与查找的key相同,则直接返回
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 3. 当前节点hash小于0说明为树节点,在红黑树中查找即可
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
//4. 从链表中查找,查找到则返回该节点的value,否则就返回null即可
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
Пожалуйста, обратитесь к комментариям для ознакомления с логикой кода. Во-первых, проверьте, является ли текущий узел массива хеш-контейнеров, то есть table[i], искомым узлом. Если да, верните его напрямую, если нет, продолжайте посмотрите, является ли это узлом дерева? Увидев, что хеш-значение узла меньше 0, если оно меньше 0, это узел дерева. Если это узел дерева найти узел в красно-черном дереве, если это не узел дерева, остается только одна возможность в виде связанного списка, то пройти по узлу поиска в обратном направлении, и если он найден, вернуть значение узла. , или ноль, если не найдено.
3.5 способ передачи
Когда емкости ConcurrentHashMap недостаточно, таблицу необходимо расширить. Основная идея этого метода очень похожа на HashMap, но поскольку он поддерживает параллельное расширение, он намного сложнее. Причина в том, что он поддерживает многопоточные операции расширения без блокировки. Я думаю, что цель этого состоит не только в том, чтобы удовлетворить требования параллельной обработки, но и в том, чтобы использовать параллельную обработку для сокращения времени, затрачиваемого на расширение. Исходный код метода передачи:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//1. 新建Node数组,容量为之前的两倍
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
//2. 新建forwardingNode引用,在之后会用到
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 3. 确定遍历中的索引i
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//4.将原数组中的元素复制到新数组中去
//4.5 for循环退出,扩容结束修改sizeCtl属性
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
//4.1 当前数组中第i个元素为null,用CAS设置成特殊节点forwardingNode(可以理解成占位符)
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//4.2 如果遍历到ForwardingNode节点 说明这个点已经被处理过了 直接跳过 这里是控制并发扩容的核心
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
//4.3 处理当前节点为链表的头结点的情况,构造两个链表,一个是原链表 另一个是原链表的反序排列
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//在nextTable的i位置上插入一个链表
setTabAt(nextTab, i, ln);
//在nextTable的i+n的位置上插入另一个链表
setTabAt(nextTab, i + n, hn);
//在table的i位置上插入forwardNode节点 表示已经处理过该节点
setTabAt(tab, i, fwd);
//设置advance为true 返回到上面的while循环中 就可以执行i--操作
advance = true;
}
//4.4 处理当前节点是TreeBin时的情况,操作和上面的类似
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
Пожалуйста, смотрите комментарии для логики кода Вся операция расширения делится надве части:
первая частьЭто построить nextTable, его емкость в два раза больше исходной, эта операция выполняется одним потоком. Код для создания нового массива таблиц:Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]
, исходя из исходной емкости, сдвиг вправо на единицу.
вторая частьЭто копирование элементов исходной таблицы в nextTable, в основном через процесс копирования. Получите позицию i текущего пройденного массива в соответствии с операцией, а затем используйте метод tabAt, чтобы получить элемент в позиции i, а затем оцените:
- Если эта позиция пуста, поместите узел forwardNode в позицию i в исходной таблице, которая также является ключевой точкой для запуска параллельного расширения;
- Если эта позиция является узлом узла (fh>=0), отметьте исходный узел связанного списка через fh и n, а затем создайте обратный связанный список и поместите их в позиции i и i+n следующей таблицы соответственно (см. этот блог.: https://www.cnblogs.com/yangchunchun/p/7279881.html
- Если эта позиция является узлом TreeBin (fh
- После обхода всех узлов работа по репликации завершена. В это время пусть nextTable будет новой таблицей, и обновите sizeCtl до 0,75-кратной новой емкости, чтобы завершить расширение. Установите в 0,75 раза больше новой емкости Код
sizeCtl = (n << 1) - (n >>> 1)
, если очень умно понять внимательно, n>>1, и сдвигу n вправо эквивалентно делению n на 2, что равно 0,5n, а затем и тому, и другому. Вычитание равно 2n-0,5n=1,5n, точно ли оно равно 0,75-кратной новой емкости, то есть 2n*0,75=1,5n. Наконец, для подведения итогов используется принципиальная схема (картинка взята из сети):
3.6 Некоторые методы, связанные с размером
Для ConcurrentHashMap количество вещей в этой таблице на самом деле является неопределенной суммой, потому чтоНевозможно остановить другие потоки, чтобы вы считали, как сборщик мусора «остановит мир» при вызове метода size(), поэтому это число можно назвать только оценочным. для этой оценкиConcurrenthathMap также рассчитывается с большим усилием.
Чтобы подсчитать количество элементов, ConcurrentHashMap определяет некоторые переменные и внутренний класс.
/** * A padded cell for distributing counts. Adapted from LongAdder * and Striped64. See their internal docs for explanation. */ @sun.misc.Contended static final class CounterCell { volatile long value; CounterCell(long x) { value = x; } }
/******************************************/
/**
- 实际上保存的是hashmap中的元素个数 利用CAS锁进行更新
但它并不用返回当前hashmap的元素个数
/ private transient volatile long baseCount; /*
- Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
*/ private transient volatile int cellsBusy;
/**
- Table of counter cells. When non-null, size is a power of 2.
*/ private transient volatile CounterCell[] counterCells;
методы отображения и размера
mappingCountиsizeСходство метода Из приведённых комментариев следует использовать mapCount вместо метода size.Оба метода напрямую не возвращают basecount, а подсчитывают значение один раз, и это значение на самом деле является приблизительным значением, поэтому могут быть и другие статистика во время вычисления Поток выполняет операцию вставки или удаления.
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
/**
* Returns the number of mappings. This method should be used
* instead of {@link #size} because a ConcurrentHashMap may
* contain more mappings than can be represented as an int. The
* value returned is an estimate; the actual count may differ if
* there are concurrent insertions or removals.
*
* @return the number of mappings
* @since 1.8
*/
public long mappingCount() {
long n = sumCount();
return (n < 0L) ? 0L : n; // ignore transient negative values
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;//所有counter的值求和
}
}
return sum;
}
метод addCount
В конце метода put вызывается метод addCount, который добавляет к количеству элементов текущего ConcurrentHashMap 1. Всего этот метод делает две вещи: обновляет значение baseCount и определяет, нужно ли расширять.
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//利用CAS方法更新baseCount的值
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
//如果check值大于等于0 则需要检验是否需要进行扩容操作
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
//
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//如果已经有其他线程在执行扩容操作
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//当前线程是唯一的或是第一个发起扩容的线程 此时nextTable=null
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
4. Резюме
ConcurrentHashmap в JDK6 и 7 в основном использует сегмент для уменьшения детализации блокировки и разделен на несколько сегментов. Сегменты должны быть заблокированы при вводе, но не блокироваться при получении, а volatile используется для обеспечения видимости. Когда требуется глобальная статистика ( для Например, размер), во-первых, он несколько раз попытается вычислить количество модов, чтобы определить, изменили ли его другие потоки за эти несколько попыток, и если нет, то вернет размер напрямую. Если есть, вам нужно заблокировать все Сегменты по очереди для расчета.
До версии 1.8 при размещении узла для поиска узла он должен сначала найти определенный сегмент, а затем определить местонахождение определенного сегмента в сегменте. В версии 1.8 от раздутого сегмента отказались, и он был напрямую нацелен на каждую корзину в массиве историй Node[], что еще больше уменьшило детализацию блокировок. И чтобы предотвратить слишком большую производительность застежки-молнии, когда длина связанного списка больше 8, используется красно-черный дизайн дерева.
Основные изменения конструкции заключаются в следующем:
- Вместо использования сегментов используются узлы, а узлы блокируются, чтобы уменьшить степень детализации блокировки.
- Состояние MOVED разработано. Когда поток 2 все еще помещает данные в процесс изменения размера, поток 2 помогает изменить размер.
- Используйте 3 операции CAS, чтобы обеспечить атомарность некоторых операций на узле вместо блокировок.
- Разные значения sizeCtl представляют разные значения и играют контролирующую роль.
- Используйте синхронизированный вместо ReentrantLock
Дополнительные сведения о сравнении реализации ConcurrentHashMap между версиями 1.7 и 1.8 см.эта статья[3].
Справочная статья
Версия 1.8 ConcurrentHashMap
- http://www.importnew.com/22007.html[4]
- http://www.jianshu.com/p/c0642afe03e0[5]
Версия 1.8.
http://www.importnew.com/20386.html[6]
использованная литература
[1]www.codercc.com: https://www.codercc.com
[2]Смотрите эту статью:https://juejin.im/post/6844903600334831629
[3]Эта статья:http://www.jianshu.com/p/e694f1e868ec
[4]http://www.importnew.com/22007.html: http://www.importnew.com/22007.html
[5]http://www.jianshu.com/p/c0642afe03e0: http://www.jianshu.com/p/c0642afe03e0
[6]http://www.importnew.com/20386.html: http://www.importnew.com/20386.html