HashMap — это контейнер для хранения K-V, который часто используется в работе. В многопоточной среде использование HashMap небезопасно и может привести к различным нежелательным результатам.
Относительно потокобезопасности HashMap, пожалуйста, обратитесь к другой статье автора:Углубленная интерпретация вопросов безопасности потоков HashMap.
В ответ на проблему, что HashMap небезопасен в многопоточной среде, автор HashMap считает, что это не ошибка, а следует использовать потокобезопасный HashMap.
В настоящее время есть несколько способов получить потокобезопасный HashMap:
- Collections.synchronizedMap
- HashTable
- ConcurrentHashMap
Среди них первые два метода имеют серьезные проблемы с производительностью из-за проблемы глобальных блокировок. Поэтому известный гуру параллельного программирования Дуг Ли добавил множество параллельных инструментов в пакет java.util.concurrent JDK1.5. Среди них — потокобезопасный HashMap ConcurrentHashMap.
В этой статье будет кратко представлен принцип реализации ConcurrentHashMap.
PS: на основе JDK8
0 Обзор ConcurrentHashMap в JDK7
Существует большая разница в том, как ConcurrentHashMap реализован в JDK7 и JDK8. Прежде всего, давайте кратко рассмотрим принцип ConcurrentHashMap в JDK7.
0.1 Сегментная технология блокировки
В ответ на проблему, связанную с блокировкой HashTable всей хеш-таблицы, ConcurrentHashMap предлагает решение блокировки сегментации.
Идея сегментной блокировки такова: при блокировке блокируется не вся хеш-таблица, а блокируется только ее часть.
Как этого добиться? При этом используется наиболее важный сегмент в ConcurrentHashMap.
Массив сегментов поддерживается в ConcurrentHashMap, и каждый сегмент можно рассматривать как HashMap.
А сам Segment наследует ReentrantLock, который сам является замком.
Сегмент поддерживает свою внутреннюю хеш-таблицу через массив HashEntry.
Каждый HashEntry представляет K-V на карте.HashEntry можно использовать для формирования структуры связанного списка и обращения к ее следующему элементу через следующее поле.
Вышеупомянутое содержимое представлено в исходном коде следующим образом:
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {
// ... 省略 ...
/**
* The segments, each of which is a specialized hash table.
*/
final Segment<K,V>[] segments;
// ... 省略 ...
/**
* Segment是ConcurrentHashMap的静态内部类
*
* Segments are specialized versions of hash tables. This
* subclasses from ReentrantLock opportunistically, just to
* simplify some locking and avoid separate construction.
*/
static final class Segment<K,V> extends ReentrantLock implements Serializable {
// ... 省略 ...
/**
* The per-segment table. Elements are accessed via
* entryAt/setEntryAt providing volatile semantics.
*/
transient volatile HashEntry<K,V>[] table;
// ... 省略 ...
}
// ... 省略 ...
/**
* ConcurrentHashMap list entry. Note that this is never exported
* out as a user-visible Map.Entry.
*/
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
// ... 省略 ...
}
}
Поэтому в JDK7 общую структуру ConcurrentHashMap можно описать следующим рисунком.
Как видно из приведенного выше рисунка, пока наше хеш-значение достаточно рассредоточено, каждый раз, когда мы его помещаем, оно будет помещено в другой сегмент. Сам сегмент является замком. При установке текущий сегмент сам себя заблокирует. В это время другие потоки не могут управлять сегментом. Но это не повлияет на работу других сегментов. Это преимущество блокировки сегментации.
0.2 Безопасный поток
Исходный код метода put ConcurrentHashMap выглядит следующим образом:
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
// 根据key的hash定位出一个segment,如果指定index的segment还没初始化,则调用ensureSegment方法初始化
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
// 调用segment的put方法
return s.put(key, hash, value, false);
}
Наконец, будет вызван метод put сегмента, чтобы поместить элементы в массив HashEntry.Здесь в комментариях приведены только инструкции, связанные с блокировкой.
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 因为segment本身就是一个锁
// 这里调用tryLock尝试获取锁
// 如果获取成功,那么其他线程都无法再修改这个segment
// 如果获取失败,会调用scanAndLockForPut方法根据key和hash尝试找到这个node,如果不存在,则创建一个node并返回,如果存在则返回null
// 查看scanAndLockForPut源码会发现他在查找的过程中会尝试获取锁,在多核CPU环境下,会尝试64次tryLock(),如果64次还没获取到,会直接调用lock()
// 也就是说这一步一定会获取到锁
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
// 扩容
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
// 释放锁
unlock();
}
return oldValue;
}
0.3 Поточно-безопасное расширение (Rehash)
Большинство проблем с потокобезопасностью HashMap связаны с процессом повторного хеширования.
Расширение ConcurrentHashMap предназначено только для массива HashEntry в каждом сегменте.
Как видно из исходного кода put выше, ConcurrentHashMap блокируется во время перехеширования, поэтому в процессе перехеширования другие потоки не могут работать с хеш-таблицей сегмента, что обеспечивает потокобезопасность.
1 Инициализация ConcurrentHashMap в JDK8
Возьмите в качестве примера конструктор без параметров, чтобы увидеть, что делает класс ConcurrentHashMap при его инициализации.
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
Сначала выполняется блок статического кода и инициализируются переменные класса. В основном инициализируются следующие переменные класса:
// Unsafe mechanics
private static final sun.misc.Unsafe U;
private static final long SIZECTL;
private static final long TRANSFERINDEX;
private static final long BASECOUNT;
private static final long CELLSBUSY;
private static final long CELLVALUE;
private static final long ABASE;
private static final int ASHIFT;
static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ConcurrentHashMap.class;
SIZECTL = U.objectFieldOffset
(k.getDeclaredField("sizeCtl"));
TRANSFERINDEX = U.objectFieldOffset
(k.getDeclaredField("transferIndex"));
BASECOUNT = U.objectFieldOffset
(k.getDeclaredField("baseCount"));
CELLSBUSY = U.objectFieldOffset
(k.getDeclaredField("cellsBusy"));
Class<?> ck = CounterCell.class;
CELLVALUE = U.objectFieldOffset
(ck.getDeclaredField("value"));
Class<?> ak = Node[].class;
ABASE = U.arrayBaseOffset(ak);
int scale = U.arrayIndexScale(ak);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
}
Здесь используется класс Unsafe, где метод objectFieldOffset используется для получения смещения указанного поля (например, sizeCtl) в памяти.
Для чего в основном используется полученное смещение? Не беспокойтесь, в приведенном ниже анализе просто изучите его, когда столкнетесь с ним.
PS: Для ознакомления и использования Unsafe вы можете ознакомиться с другой статьей автора.Введение и использование класса Unsafe
2 Внутренняя структура данных
Во-первых, давайте посмотрим, как структура хранения определяется в JDK8 с точки зрения исходного кода.
/**
* The array of bins. Lazily initialized upon first insertion.
* Size is always a power of two. Accessed directly by iterators.
*
* hash表,在第一次put数据的时候才初始化,他的大小总是2的倍数。
*/
transient volatile Node<K,V>[] table;
/**
* 用来存储一个键值对
*
* Key-value entry. This class is never exported out as a
* user-mutable Map.Entry (i.e., one supporting setValue; see
* MapEntry below), but can be used for read-only traversals used
* in bulk tasks. Subclasses of Node with a negative hash field
* are special, and contain null keys and values (but are never
* exported). Otherwise, keys and vals are never null.
*/
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}
Можно обнаружить, что реализация JDK8 и JDK7 сильно отличается, концепция Segment не используется в JDK8, это больше похоже на реализацию HashMap.
PS: За принципом работы HashMap можно обратиться к другой статье автораПринцип HashMap и внутренняя структура хранилища
Эту структуру можно описать следующим рисунком
3 Инициализация поточно-безопасной хеш-таблицы
Из приведенного выше видно, что ConcurrentHashMap использует таблицу переменных-членов для хранения хэш-таблицы.
Инициализация таблицы использует стратегию ленивой инициализации, которая инициализирует таблицу, когда пут выполняется в первый раз.
Исходный код метода put выглядит следующим образом (временно неактуальный код опущен):
/**
* Maps the specified key to the specified value in this table.
* Neither the key nor the value can be null.
*
* <p>The value can be retrieved by calling the {@code get} method
* with a key that is equal to the original key.
*
* @param key key with which the specified value is to be associated
* @param value value to be associated with the specified key
* @return the previous value associated with {@code key}, or
* {@code null} if there was no mapping for {@code key}
* @throws NullPointerException if the specified key or value is null
*/
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 计算key的hash值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果table是空,初始化之
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 省略...
}
// 省略...
}
Исходный код initTable выглядит следующим образом
/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// #1
while ((tab = table) == null || tab.length == 0) {
// sizeCtl的默认值是0,所以最先走到这的线程会进入到下面的else if判断中
// #2
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// 尝试原子性的将指定对象(this)的内存偏移量为SIZECTL的int变量值从sc更新为-1
// 也就是将成员变量sizeCtl的值改为-1
// #3
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 双重检查,原因会在下文分析
// #4
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 默认初始容量为16
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
// #5
table = tab = nt; // 创建hash表,并赋值给成员变量table
sc = n - (n >>> 2);
}
} finally {
// #6
sizeCtl = sc;
}
break;
}
}
return tab;
}
Одна из функций переменной-члена sizeCtl в ConcurrentHashMap эквивалентна порогу в HashMap.Когда количество элементов в хэш-таблице превышает sizeCtl, запускается расширение; Другая его функция аналогична идентификатору, например, когда он равен -1, это означает, что поток уже инициализирует хеш-таблицу, а значение меньше -1 означает, что поток выполняет изменение размера хеш-таблицы. . . .
Этот метод сначала определяет, меньше ли sizeCtl 0, и если меньше 0, напрямую изменяет текущий поток на поток в состоянии готовности.
Когда sizeCtl больше или равен 0, текущий поток попытается изменить значение sizeCtl на -1 через CAS. Поток, который не может измениться, войдет в следующий раунд цикла, решит, что sizeCtl
Перед созданием нового Node[] необходимо еще раз проверить, пуста ли таблица, причина двойной проверки здесь в том, что если другой поток зависает после выполнения кода № 1, другой инициализированный поток закончил выполнение кода № 6. , в это время time sizeCtl имеет значение больше 0, то при переключении обратно на этот поток для выполнения есть возможность повторить инициализацию. Эта проблема будет объяснена в сценарии параллелизма на рисунке ниже.
Затем инициализируйте хэш-таблицу, пересчитайте значение sizeCtl и, наконец, верните инициализированную хеш-таблицу.
На следующем рисунке показаны несколько сценариев параллелизма, которые могут привести к повторной инициализации хэш-таблицы. Мы предполагаем, что Thread2 в конечном итоге успешно инициализирует хэш-таблицу.
- Thread1 имитирует параллельный сценарий, в котором CAS обновляет переменную sizeCtl.
- Thread2 имитирует необходимость перепроверки таблицы
Как видно из приведенного выше рисунка, если Thread1 не выполняет управление параллелизмом при обновлении значения sizeCtl, Thread1 может перейти к шагу new Node[]. В Thread3, если нет двойного решения, Thread3 также перейдет к шагу new Node[].
4 Потокобезопасная установка
операции пут можно разделить на следующие две категории
- Когда в индексе текущей хеш-таблицы нет элемента, соответствующего текущему ключу
- Когда элемент уже существует в индексе текущей хэш-таблицы, соответствующей текущему ключу (коллизия хэшей)
4.1 Когда в хеш-таблице нет элементов
Соответствующий исходный код выглядит следующим образом
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
Метод tabAt получает элементы по соответствующему индексу массива с помощью Unsafe.getObjectVolatile().getObjectVolatile действует на соответствующем смещении памяти и имеет семантику volatile памяти.
Если результат пустой, попробуйте создать новый Node по указанному индексу массива способом cas.
4.2 Когда хеш сталкивается
Соответствующий исходный код выглядит следующим образом
else {
V oldVal = null;
// 锁f是在4.1中通过tabAt方法获取的
// 也就是说,当发生hash碰撞时,会以链表的头结点作为锁
synchronized (f) {
// 这个检查的原因在于:
// tab引用的是成员变量table,table在发生了rehash之后,原来index上的Node可能会变
// 这里就是为了确保在put的过程中,没有收到rehash的影响,指定index上的Node仍然是f
// 如果不是f,那这个锁就没有意义了
if (tabAt(tab, i) == f) {
// 确保put没有发生在扩容的过程中,fh=-1时表示正在扩容
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
// 在链表后面追加元素
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 如果链表长度超过8个,将链表转换为红黑树,与HashMap相同,相对于JDK7来说,优化了查找效率
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
В отличие от концепции сегмента в JDK7, JDK8 напрямую использует головной узел связанного списка в качестве блокировки. В JDK7 HashMap может формировать круговой связанный список в случае многопоточного параллельного размещения. ConcurrentHashMap использует этот метод блокировки, чтобы разрешить только одному потоку одновременно выполнять размещение связанного списка, что решает проблему параллелизма.
5 Поточно-безопасное масштабирование
Последним шагом метода put является подсчет количества элементов в хеш-таблице, если оно превышает значение sizeCtl, срабатывает расширение.
Код расширения немного длиннее, вы можете ознакомиться с китайскими комментариями внутри, а затем обратиться к следующему анализу. На самом деле наша главная цель — понять, как ConcurrentHashMap решает проблему параллелизма HashMap. Просто посмотрите на исходный код с этим вопросом. По поводу проблем с HashMap, пожалуйста, обратитесь к другой статье автора, упомянутой в начале этой статьи.
На самом деле, проблема параллелизма HashMap в основном вызвана параллелизмом размещения и расширения.
Здесь мы рассмотрим, как решается ConcurrentHashMap.
Код, участвующий в расширении, выглядит следующим образом:
/**
* The array of bins. Lazily initialized upon first insertion.
* Size is always a power of two. Accessed directly by iterators.
* 业务中使用的hash表
*/
transient volatile Node<K,V>[] table;
/**
* The next table to use; non-null only while resizing.
* 扩容时才使用的hash表,扩容完成后赋值给table,并将nextTable重置为null。
*/
private transient volatile Node<K,V>[] nextTable;
/**
* Adds to count, and if table is too small and not already
* resizing, initiates transfer. If already resizing, helps
* perform transfer if work is available. Rechecks occupancy
* after a transfer to see if another resize is already needed
* because resizings are lagging additions.
*
* @param x the count to add
* @param check if <0, don't check resize, if <= 1 only check if uncontended
*/
private final void addCount(long x, int check) {
// ----- 计算键值对的个数 start -----
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
// ----- 计算键值对的个数 end -----
// ----- 判断是否需要扩容 start -----
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 当上面计算出来的键值对个数超出sizeCtl时,触发扩容,调用核心方法transfer
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 如果有已经在执行的扩容操作,nextTable是正在扩容中的新的hash表
// 如果并发扩容,transfer直接使用正在扩容的新hash表,保证了不会出现hash表覆盖的情况
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 更新sizeCtl的值,更新成功后为负数,扩容开始
// 此时没有并发扩容的情况,transfer中会new一个新的hash表来扩容
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
// ----- 判断是否需要扩容 end -----
}
/**
* Moves and/or copies the nodes in each bin to new table. See
* above for explanation.
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
// 初始化新的hash表,大小为之前的2倍,并赋值给成员变量nextTable
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 扩容完成时,将成员变量nextTable置为null,并将table替换为rehash后的nextTable
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// 接下来是遍历每个链表,对每个链表的元素进行rehash
// 仍然用头结点作为锁,所以在扩容的时候,无法对这个链表执行put操作
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// setTabAt方法调用了Unsafe.putObjectVolatile来完成hash表元素的替换,具备volatile内存语义
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
По приведенному выше коду дается краткое описание того, как ConcurrentHashMap решает проблему параллелизма HashMap.
- Сначала выходит новая хеш-таблица (nextTable), размером в два раза больше исходного размера. Следующие операции повторного хеширования предназначены для этой новой хэш-таблицы и не затрагивают исходную хеш-таблицу (таблицу).
- Затем выполняется перехеширование каждого связанного списка в исходной хеш-таблице (таблице) и предпринимается попытка получить блокировку головного узла. Этот шаг гарантирует, что операция размещения не может быть выполнена в связанном списке во время процесса повторного хеширования.
- Благодаря элементу управления sizeCtl в процессе расширения не будет создаваться несколько новых хеш-таблиц.
- Наконец, после повторного хэширования всех пар ключ-значение в новую таблицу (nextTable) замените таблицу на nextTable. Это позволяет избежать проблемы получения нулевого значения, когда в HashMap одновременно выполняется получение и расширение.
- В течение всего процесса хранение и чтение общих переменных осуществляется через volatile или CAS, что обеспечивает безопасность потоков.
6 Резюме
В многопоточной среде необходимо соблюдать осторожность при работе с общими переменными. Полностью рассмотреть проблему с точки зрения модели памяти Java.
В ConcurrentHashMap используется большое количество методов класса Unsafe, хотя мы и можем сами получить экземпляры Unsafe, в продакшене делать это не рекомендуется. В большинстве случаев мы можем реализовать это с помощью инструментов, предоставляемых в пакете concurrent, например, пакет Atomic можно использовать для реализации операций CAS, а пакет блокировки — для реализации операций, связанных с блокировкой.
Эффективно используйте потокобезопасные инструменты контейнера, такие как ConcurrentHashMap, CopyOnWriteArrayList, ConcurrentLinkedQueue и т. д., поскольку мы не можем передать это обновление элементам Unsafe array atomic getObjectVolatile и setObjectVolatile, поэтому это очень важные инструменты, работающие одновременно, такие как ConcurrentHashMap.
Добро пожаловать в мой публичный аккаунт WeChat