1. Введение
Зачем использовать ConcurrentHashMap
В основном по двум причинам:
- Использование HashMap в параллельном программировании может привести к бесконечному циклу (jdk1.7, jdk1.8 приведет к потере данных)
- HashTable очень неэффективен
2. Структура ConcurrentHashMap
В jdk 1.7 и jdk 1.8 структура ConcurrentHashMap сильно изменилась, что будет объяснено позже.
2.1 Структура в jdk 1.7
В jdk 1.7 ConcurrentHashMap состоит из структуры данных Segment и структуры массива HashEntry. Возьмите сегментные замки для обеспечения безопасности.
Segment — это реентерабельная блокировка ReentrantLock, играющая роль блокировки в ConcurrentHashMap; HashEntry используется для хранения данных о паре ключ-значение.
ConcurrentHashMap содержит массив Segments, а Segment содержит массив HashEntry s. Структура Segment аналогична структуре HashMap, которая представляет собой массив и структуру связанного списка.
2.2 Структура в jdk 1.8
Реализация JDK1.8 отказалась от концепции сегмента, но напрямую реализовала ее со структурой данных массива узлов + связанный список + красно-черное дерево.Управление параллелизмом осуществляется с использованием Synchronized и CAS.Все выглядит оптимизированным и потоковым safe.HashMap, хотя структуру данных Segment все еще можно увидеть в JDK1.8, атрибуты были упрощены, чтобы быть совместимыми со старой версией.
3. Реализация
3.1 Реализация в JDK 1.7
3.1.1 Инициализация
Инициализация ConcurrentHashMap заключается в инициализации размера сегмента (представленного ssize) посредством битовой операции, которая вычисляется concurrentLevel.
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
ssize вычисляется операцией определения местоположения (ssize
Инициализация HashEntry для каждого элемента Segment также рассчитывается в соответствии с операцией местоположения, которая представлена заглавной буквой.
int cap = 1;
while (cap < c)
cap <<= 1;
Вычисление размера HashEntry также является степенью N числа 2 (cap
3.1.2 получить операцию
Операция получения сегмента очень проста и эффективна: он хешируется один раз, затем значение хеш-функции используется для поиска сегмента с помощью операции хеширования, а затем для поиска элемента с помощью алгоритма хеширования.
public V get(Object key){
int hash = hash(key.hashCode());
return segmentFor(hash).get(key,hash);
}
Эффективность операции get заключается в том, что весь процесс get не нужно блокировать, он будет заблокирован и перечитан, пока не будет прочитано пустое значение. Причина в том, чтобы определить общую переменную, используемую какvolatile
тип.
transient volatile int count;
volatile V value;
3.1.3 поставить операцию
Для вставки данных ConcurrentHashMap здесь выполняются два хэша, чтобы определить место хранения данных.
static class Segment<K,V> extends ReentrantLock implements Serializable {
//省略
}
При выполнении операции put она проходит два этапа:
- Определите, требуется ли расширение
- Перейдите к расположению, где был добавлен элемент и поместил его в массив Hashentry
Процесс вставки выполнит первый ключевой хеш, чтобы найти позицию сегмента.Если сегмент не был инициализирован, то есть присвоить значение с помощью операции CAS, а затем выполнить вторую хэш-операцию, чтобы найти соответствующую позицию HashEntry, которая будет быть унаследованы здесь.Характеристики блокировки, при вставке данных в указанную позицию HashEntry (метод вставки хвоста), она будет наследовать ReentrantLock'stryLock()
Метод пытается получить блокировку. Если получение прошло успешно, она будет вставлена непосредственно в соответствующую позицию. Если поток уже получил блокировку сегмента, текущий поток продолжит вызов в режиме вращения.tryLock()
метод для получения блокировки, приостановки после указанного количества раз и ожидания пробуждения.
3.1.4 операция размера
Вычисление размера элемента ConcurrentHashMap является параллельной операцией, то есть при вычислении размера данные вставляются одновременно, что может привести к тому, что рассчитанный размер будет отличаться от вашего фактического размера.
Решение, принятое ConcurrentHashMap, состоит в том, чтобы сначала попытаться подсчитать размер каждого сегмента, не блокируя сегмент дважды.Если количество изменяется во время статистического процесса, метод блокировки используется для подсчета размера всех сегментов.
try {
for (; ; ) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K, V> seg = segmentAt(segments, j);
if (seg != null) {
/* 在put、remove、clean方法里操作
* 元素都会将变量modCount进行加一,
* 统计也是依靠这个变量的前后变化来进行的 */
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0) overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
3.2 Реализация в JDK 1.8
3.2.1 Основные атрибуты и концепции
посмотриОсновные свойства:
//node数组最大容量:2^30=1073741824
private static final int MAXIMUM_CAPACITY = 1 << 30;
//默认初始值,必须是2的幂数
private static final int DEFAULT_CAPACITY = 16;
//数组可能最大值,需要与toArray()相关方法关联
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//并发级别,遗留下来的,为兼容以前的版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//负载因子
private static final float LOAD_FACTOR = 0.75f;
//链表转红黑树阀值,> 8 链表转换为红黑树
static final int TREEIFY_THRESHOLD = 8;
//树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量,<=UNTREEIFY_THRESHOLD 则untreeify(lo))
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
private static final int MIN_TRANSFER_STRIDE = 16;
private static int RESIZE_STAMP_BITS = 16;
//2^15-1,help resize的最大线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
//32-16=16,sizeCtl中记录size大小的偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
//forwarding nodes的hash值
static final int MOVED = -1;
//树根节点的hash值
static final int TREEBIN = -2;
//ReservationNode的hash值
static final int RESERVED = -3;
//可用处理器数量
static final int NCPU = Runtime.getRuntime().availableProcessors();
//存放node的数组
transient volatile Node<K,V>[] table;
/*控制标识符,用来控制table的初始化和扩容的操作,不同的值有不同的含义
*当为负数时:-1代表正在初始化,-N代表有N-1个线程正在 进行扩容
*当为0时:代表当时的table还没有被初始化
*当为正数时:表示初始化或者下一次进行扩容的大小
*/
private transient volatile int sizeCtl;
Важные понятия:
- table: значение по умолчанию равно нулю. Инициализация происходит при первой операции вставки. Размер по умолчанию — это массив из 16, который используется для хранения данных узла Node. При расширении размер всегда равен степени 2.
- nextTable: значение по умолчанию равно нулю, вновь сгенерированный массив во время расширения в два раза превышает размер исходного массива.
- Node: Структура данных, которая хранит ключ, значение и значение хеширования ключа.
class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
//省略部分代码
}
где используются оба значения и nextvolatile
Украшено для обеспечения одновременной видимости.
- ForwardingNode: специальный узел с хеш-значением -1, в котором хранится ссылка на nextTable.
final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
}
ForwardingNode будет играть роль только при расширении таблицы.Он помещается в таблицу в качестве заполнителя, чтобы указать, что текущий узел является нулевым или был перемещен.
- Класс TreeNode и класс TreeBin: Класс TreeNode представляет каждый узел красно-черного дерева. Когда количество узлов в связанном списке превышает указанное значение, связанный список будет превращен в красно-черное дерево, и, конечно, каждый узел будет преобразован в TreeNode. В отличие от HashMap, ConcurrentHashMap напрямую хранит в ведре не TreeNode, а TreeBin, который поддерживает красно-черное дерево внутри TreeBin, а это значит, что TreeNode используется внутри TreeBin.
3.2.2 Инициализация
При создании экземпляра ConcurrentHashMap с параметрами размер таблицы будет скорректирован в соответствии с параметрами.Предполагая, что параметр равен 100, он в конечном итоге будет скорректирован до 256, гарантируя, что размер таблицы всегда будет степенью двойки.
инициализация таблицы
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//如果一个线程发现sizeCtl<0,意味着另外的线程执行CAS操作成功,当前线程只需要让出cpu时间片
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
3.2.3 операция ввода
Предполагая, что таблица была инициализирована, операция размещения использует CAS + synchronized для реализации параллельных операций вставки или обновления.
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
...省略部分代码
}
addCount(1L, binCount);
return null;
}
хеш-алгоритм
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
Найдите позицию индекса в таблице, n — размер таблицы.
int index = (n - 1) & hash
Получить элемент f соответствующего индекса в таблице
Unsafe.getObjectVolatile может напрямую получать данные из указанной памяти, гарантируя, что данные будут самыми последними при каждом их получении.
Если f равно null, это означает, что элемент впервые вставляется в эту позицию в таблице, а узел Node вставляется с помощью метода Unsafe.compareAndSwapObject.
Если CAS прошел успешно, узел Node был вставлен, а затем метод addCount(1L, binCount) проверит, нужно ли расширить текущую емкость.
Если CAS дает сбой, это означает, что другой поток вставил узел раньше времени, и спин повторяет попытку вставить узел в эту позицию.
Если хэш-значение f равно -1, это означает, что текущий f является узлом ForwardingNode, что означает, что другие потоки расширяются, и операция расширения будет выполняться вместе.
В других случаях новый узел Node вставляется в соответствующую позицию в виде связанного списка или красно-черного дерева.Этот процесс использует синхронные встроенные блокировки для достижения параллелизма.Код выглядит следующим образом:
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
Синхронизируйте на узле f, прежде чем узел будет вставлен, снова используяtabAt(tab, i) == f
Решение для предотвращения модификации другими потоками.
Если f.hash >= 0, это означает, что f является головным узлом структуры связанного списка, пройдите по связанному списку, если соответствующий узел узла найден, измените значение, в противном случае добавьте узел в конец связанного списка. . Если f является узлом типа TreeBin, указывающим, что f является корневым узлом красно-черного дерева, то необходимо пройтись по элементам древовидной структуры, обновить или добавить узлы. Если количество узлов в связанном списке binCount >= TREEIFY_THRESHOLD (по умолчанию 8), то преобразовать связанный список в красно-черную древовидную структуру.
расширение таблицы
Когда емкости таблицы недостаточно, то есть количество элементов в таблице достигает порога емкостиsizeCtl
, таблицу нужно расширить.
Все расширение разделено на две части:
Создайте NEXTTABLE с удвоенным размером таблицы. Скопируйте данные таблицы в NextTable.
Эти два процесса очень просто реализовать в одном потоке, но ConcurrentHashMap поддерживает одновременную вставку, и операция расширения, естественно, будет происходить одновременно.В этом случае второй шаг может поддерживать параллельную репликацию узлов, так что производительность естественным образом улучшится. Меньше, но и сложность реализации выросла на уровень.
Посмотрите на первый шаг,построить следующую таблицу, нет никаких сомнений в том, что только один поток может инициализировать nextTable в этом процессе Конкретная реализация выглядит следующим образом:
private final void addCount(long x, int check) {
... 省略部分代码
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
Измените значение sizeCtl с помощью Unsafe.compareAndSwapInt, чтобы убедиться, что только один поток может инициализировать nextTable.Длина развернутого массива в два раза больше исходной, но емкость равна 1,5.
Узел перемещается из таблицы к Taillttable, общая идея - это процесс прохождения и копирования.
Во-первых, на основе расчета, необходимого для получения количества проходов i, затем с помощью метода tabAt для получения позиции элемента F i инициализируйте пример forwardNode fwd.
Если f == null, поместите fwd в позицию i в таблице Этот процесс реализуется методом Unsafe.compareAndSwapObjectf, который ловко реализует параллельное перемещение узлов.
Если f является головным узлом связанного списка, создайте перевернутый связанный список, поместите их в позиции i и i+n nextTable соответственно, и перемещение будет завершено.Используйте метод Unsafe.putObjectVolatile, чтобы присвоить fwd исходной позиции. стола. Если f является узлом TreeBin, также выполните обработку в обратном порядке и решите, требуется ли untreeify, поместите результаты обработки в позиции i и i+n nextTable соответственно, и перемещение будет завершено, а также используйте метод Unsafe. putObjectVolatile, чтобы указать исходное положение таблицы. Назначьте fwd. После обхода всех узлов работа копирования завершается, таблица указывает на nextTable, а sizeCtl обновляется до 0,75-кратного размера нового массива, и расширение завершается.
красно-черная структура дерева
Примечание. Если элементы в структуре связанного списка превышают пороговое значение TREEIFY_THRESHOLD, по умолчанию8
, то связанный список преобразуется в красно-черное дерево для повышения эффективности обхода и запросов.
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
Далее давайте посмотрим, как построить древовидную структуру, код выглядит следующим образом:
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
Видно, что кодовый блок узла связующего дерева синхронизирован.После ввода синхронизированного кодового блока проверьте еще раз, был ли изменен элемент в позиции индекса в таблице.
- В соответствии со связанным списком узлов в позиции индекса в таблице повторно сгенерируйте связанный список TreeNode с hd в качестве головного узла.
- В соответствии с головным узлом hd создается древовидная структура TreeBin, а корневой узел древовидной структуры записывается в память в позиции индекса таблицы Конкретная реализация выглядит следующим образом:
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
this.first = b;
TreeNode<K,V> r = null;
for (TreeNode<K,V> x = b, next; x != null; x = next) {
next = (TreeNode<K,V>)x.next;
x.left = x.right = null;
if (r == null) {
x.parent = null;
x.red = false;
r = x;
}
else {
K k = x.key;
int h = x.hash;
Class<?> kc = null;
for (TreeNode<K,V> p = r;;) {
int dir, ph;
K pk = p.key;
if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
r = balanceInsertion(r, x);
break;
}
}
}
}
this.root = r;
assert checkInvariants(root);
}
Двоичное дерево в основном строится в соответствии с хеш-значением узла Node.
3.2.4 получить операцию
Операция get намного проще, чем операция put.
public V get(Object key) {
Node<K,V>[] tab;
Node<K,V> e, p;
int n, eh;
K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
- Определение того, пуста ли таблица, если она пуста, напрямую возвращает значение null.
- Вычислите хеш-значение ключа и получите узел Node в указанной позиции в указанной таблице, найдите соответствующий узел, пройдя по связанному списку или древовидной структуре, и верните значение значения.
3.2.4 операция размера
Размер JDK1.8 рассчитывается CAS по baseCount и counterCell, и, наконец, размер получается по baseCount и обходу массива CounterCell. Конкретная ссылка:Принципиальный анализ метода размера ConcurrentHashMap
4. Почему в JDK 1.8 следует отказаться от блокировок сегментов
Многие люди не понимают, почему Дуг Ли сделал такое большое изменение в JDK1.8.При использовании синхронизации с блокировкой тяжелого уровня производительность выше.Причины следующие:
- Детализация блокировок в jdk1.8 лучше.. ConcurrentLevel (одновременное число) ConcurrentHashMap в jdk1.7 в основном фиксирован. ConcurrentLevel в jdk1.8 соответствует размеру массива, и каждый раз, когда емкость увеличивается, параллелизм удваивается.
- Интродукция красно-черных деревьев, оптимизация связанного списка делает размещение и получение более эффективными при возникновении конфликта хэшей
- Получить поддержку JVM, ReentrantLock все-таки находится на уровне API, и места для последующей оптимизации производительности мало. Synchronized напрямую поддерживается JVM, и JVM может выполнять соответствующие меры по оптимизации во время выполнения: огрубление блокировки, устранение блокировки, прокручивание блокировки и так далее. Это позволяет синхронизироваться для достижения повышения производительности при обновлении версии JDK без изменения кода.
5. Резюме и ссылки
резюме
Видно, что структура данных ConcurrentHashMap версии JDK1.8 близка к HashMap, условно говоря, ConcurrentHashMap лишь добавляет операции синхронизации для контроля параллелизма.ReentrantLock+Segment+HashEntry
, до версии JDK1.8synchronized+CAS+HashEntry+红黑树
, оптимизация действительно отличная.
использованная литература
- Проблема с бесконечным циклом, вызванная многопоточным расширением в HashMap 1.8
- Основной принцип ConcurrentHashMap
- Глубокое объяснение ConcurrenthashMap1.8
- Принципиальный анализ метода размера ConcurrentHashMap
- Искусство параллельного программирования на Java
- Производительность ConcurrentHashMap выше, чем у jdk1.7 в jdk1.8.