Полный анализ HashMap и ConcurrentHashMap в Java7/8
Время обновления: 2018-02-24
Сегодня я публикую "гидрологию", многие читатели могут ее не понять, но я хочу представить ее как неотъемлемую часть статьи о параллельной последовательности. Я думал, что это не займет много времени, но в итоге я потратил много времени, чтобы закончить эту статью.
В Интернете действительно есть много статей о HashMap и ConcurrentHashMap, но есть много статей, которых не хватает, поэтому я хочу написать одну и четко объяснить детали, особенно как ConcurrentHashMap в Java8, в большинстве статей говорится, что не ясно . В конце концов, я надеюсь удешевить обучение каждого, и я не хочу, чтобы все находили всевозможные не очень достоверные статьи, и читали одну за другой, но они все равно были расплывчатыми.
Рекомендации по прочтению: четыре раздела в основном можно читать независимо друг от друга.Начинающим рекомендуется читать в порядке Java7 HashMap -> Java7 ConcurrentHashMap -> Java8 HashMap -> Java8 ConcurrentHashMap, что может соответствующим образом снизить порог чтения.
Предварительные требования для прочтения: в этой статье анализируется исходный код, поэтому читатели должны быть знакомы с использованием их интерфейсов. не будут представлены в этой статье. В Java8 используются красно-черные деревья, но эта статья не будет расширяться, заинтересованные читатели должны найти нужную информацию самостоятельно.
Java7 HashMap
HashMap самый простой, во-первых, мы с ним хорошо знакомы, во-вторых, он не поддерживает параллельные операции, поэтому исходный код тоже очень простой.
Во-первых, мы используем следующую картинку, чтобы представить структуру HashMap.
Это только принципиальная схема, так как расширение массива не рассматривается, а подробности будут рассмотрены позже.
В общем направлении внутри есть HashMapмножество, то каждый элемент массива являетсяОдносвязный список.
На приведенном выше рисунке каждая зеленая сущность является экземпляром вложенного класса Entry. Entry содержит четыре свойства: ключ, значение, хэш-значение и следующий для односвязного списка.
емкость: Текущая емкость массива, которая всегда поддерживается на уровне 2^n, может быть увеличена.После расширения размер массива будет в два раза больше текущего размера.
loadFactor: коэффициент нагрузки, по умолчанию 0,75.
порог: Порог для расширения, равный емкости * loadFactor
поставить анализ процесса
Это все еще относительно просто, просто следуйте коду.
public V put(K key, V value) {
// 当插入第一个元素的时候,需要先初始化数组大小
if (table == EMPTY_TABLE) {
inflateTable(threshold);
}
// 如果 key 为 null,感兴趣的可以往里看,最终会将这个 entry 放到 table[0] 中
if (key == null)
return putForNullKey(value);
// 1. 求 key 的 hash 值
int hash = hash(key);
// 2. 找到对应的数组下标
int i = indexFor(hash, table.length);
// 3. 遍历一下对应下标处的链表,看是否有重复的 key 已经存在,
// 如果有,直接覆盖,put 方法返回旧值就结束了
for (Entry<K,V> e = table[i]; e != null; e = e.next) {
Object k;
if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
V oldValue = e.value;
e.value = value;
e.recordAccess(this);
return oldValue;
}
}
modCount++;
// 4. 不存在重复的 key,将此 entry 添加到链表中,细节后面说
addEntry(hash, key, value, i);
return null;
}
инициализация массива
При вставке первого элемента в HashMap сначала выполняется инициализация массива, которая заключается в определении начального размера массива и вычислении порога расширения массива.
private void inflateTable(int toSize) {
// 保证数组大小一定是 2 的 n 次方。
// 比如这样初始化:new HashMap(20),那么处理成初始数组大小是 32
int capacity = roundUpToPowerOf2(toSize);
// 计算扩容阈值:capacity * loadFactor
threshold = (int) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1);
// 算是初始化数组吧
table = new Entry[capacity];
initHashSeedAsNeeded(capacity); //ignore
}
Вот способ сохранить размер массива в n-й степени 2. HashMap и ConcurrentHashMap в Java7 и Java8 имеют соответствующие требования, но реализованный код немного отличается, и вы узнаете, когда увидите его позже.
Вычислить конкретную позицию массива
Это просто, мы можем сделать это сами: использовать хеш-значение ключа по модулю длины массива.
static int indexFor(int hash, int length) {
// assert Integer.bitCount(length) == 1 : "length must be a non-zero power of 2";
return hash & (length-1);
}
Этот метод очень прост, проще говоря, он берет младшие n бит хеш-значения. Например, когда длина массива равна 32, младшие 5 бит хеш-значения ключа фактически берутся в качестве его позиции в индексе в массиве.
добавить узел в связанный список
После нахождения нижнего индекса массива ключ будет оцениваться первым, если повторения нет, новое значение будет помещено в связанный список.заголовок.
void addEntry(int hash, K key, V value, int bucketIndex) {
// 如果当前 HashMap 大小已经达到了阈值,并且新值要插入的数组位置已经有元素了,那么要扩容
if ((size >= threshold) && (null != table[bucketIndex])) {
// 扩容,后面会介绍一下
resize(2 * table.length);
// 扩容以后,重新计算 hash 值
hash = (null != key) ? hash(key) : 0;
// 重新计算扩容后的新的下标
bucketIndex = indexFor(hash, table.length);
}
// 往下看
createEntry(hash, key, value, bucketIndex);
}
// 这个很简单,其实就是将新值放到链表的表头,然后 size++
void createEntry(int hash, K key, V value, int bucketIndex) {
Entry<K,V> e = table[bucketIndex];
table[bucketIndex] = new Entry<>(hash, key, value, e);
size++;
}
Основная логика этого метода заключается в том, чтобы сначала определить, требуется ли расширение, и при необходимости сначала расширить емкость, а затем вставить новые данные в заголовок связанного списка в соответствующую позицию расширяемого массива.
Расширение массива
Ранее мы видели, что при вставке нового значения, еслиТекущий размер достиг порогового значения, и в позиции массива, который нужно вставить, уже есть элемент., то расширение сработает, после расширения размер массива удвоится.
void resize(int newCapacity) {
Entry[] oldTable = table;
int oldCapacity = oldTable.length;
if (oldCapacity == MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return;
}
// 新的数组
Entry[] newTable = new Entry[newCapacity];
// 将原来数组中的值迁移到新的更大的数组中
transfer(newTable, initHashSeedAsNeeded(newCapacity));
table = newTable;
threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
}
Расширение заключается в замене исходного небольшого массива новым большим массивом и переносе значений исходного массива в новый массив.
Из-за двойного расширения в процессе миграции все узлы связанного списка в исходной таблице[i] будут разбиты на позиции newTable[i] и newTable[i + oldLength] нового массива. Если исходная длина массива равна 16, то после расширения все элементы в исходном связанном списке в таблице [0] будут размещены в двух позициях newTable[0] и newTable[16] в новом массиве. Код относительно прост и не будет здесь расширяться.
получить анализ процесса
По сравнению с процессом размещения процесс получения очень прост.
- Вычислить хеш-значение на основе ключа.
- Найдите соответствующий индекс массива: hash & (длина - 1).
- Просматривайте связанный список в этой позиции массива, пока не будет найден ключ, равный (== или равный).
public V get(Object key) {
// 之前说过,key 为 null 的话,会被放到 table[0],所以只要遍历下 table[0] 处的链表就可以了
if (key == null)
return getForNullKey();
//
Entry<K,V> entry = getEntry(key);
return null == entry ? null : entry.getValue();
}
getEntry(key):
final Entry<K,V> getEntry(Object key) {
if (size == 0) {
return null;
}
int hash = (key == null) ? 0 : hash(key);
// 确定数组下标,然后从头开始遍历链表,直到找到为止
for (Entry<K,V> e = table[indexFor(hash, table.length)];
e != null;
e = e.next) {
Object k;
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
return e;
}
return null;
}
Java7 ConcurrentHashMap
ConcurrentHashMap и HashMap имеют схожие идеи, но поскольку они поддерживают параллельные операции, они немного сложнее.
Весь ConcurrentHashMap состоит из сегментов, а «сегмент» означает «часть» или «сегмент», поэтому во многих местах он будет описан какблокировка сегмента. Обратите внимание, что в тексте я использовал "канавка" для представления сегмента.
Простое понимание состоит в том, что ConcurrentHashMap — это массив сегментов, а сегменты заблокированы путем наследования ReentrantLock, поэтому каждая операция, которую необходимо заблокировать, блокирует сегмент, поэтому, пока каждый сегмент гарантированно является потокобезопасным, глобальная потокобезопасность .
concurrencyLevel: Параллельный уровень, параллелизм, номер сегмента, как перевести не важно, разбирайтесь. Значение по умолчанию — 16, что означает, что ConcurrentHashMap имеет 16 сегментов, поэтому теоретически в настоящее время одновременно могут писать до 16 потоков, если их операции распределены по разным сегментам. Этому значению можно присвоить другие значения во время инициализации, но после инициализации его нельзя расширить.
Чтобы быть конкретным внутри каждого сегмента, на самом деле каждый сегмент очень похож на HashMap, представленный ранее, но он должен обеспечивать безопасность потоков, поэтому его сложнее обрабатывать.
инициализация
InitialCapacity: начальная емкость, это значение относится к начальной емкости всего ConcurrentHashMap, которая должна быть равномерно распределена между каждым сегментом в реальной работе.
loadFactor: коэффициент загрузки Как мы уже говорили ранее, массив сегментов не может быть расширен, поэтому этот коэффициент загрузки используется внутри каждого сегмента.
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
// 计算并行级别 ssize,因为要保持并行级别是 2 的 n 次方
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// 我们这里先不要那么烧脑,用默认值,concurrencyLevel 为 16,sshift 为 4
// 那么计算出 segmentShift 为 28,segmentMask 为 15,后面会用到这两个值
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// initialCapacity 是设置整个 map 初始的大小,
// 这里根据 initialCapacity 计算 Segment 数组中每个位置可以分到的大小
// 如 initialCapacity 为 64,那么每个 Segment 或称之为"槽"可以分到 4 个
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
// 默认 MIN_SEGMENT_TABLE_CAPACITY 是 2,这个值也是有讲究的,因为这样的话,对于具体的槽上,
// 插入一个元素不至于扩容,插入第二个的时候才会扩容
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// 创建 Segment 数组,
// 并创建数组的第一个元素 segment[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
// 往数组写入 segment[0]
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
Инициализация завершена, и мы получаем массив сегментов.
Мы просто используем новый конструктор без аргументов ConcurrentHashMap() для инициализации, а затем после завершения инициализации:
- Длина массива сегментов равна 16 и не может быть расширена
- Размер Segment[i] по умолчанию равен 2, коэффициент загрузки равен 0,75, а начальный порог равен 1,5, то есть вставка первого элемента в будущем не вызовет раскрытие, а вставка второго элемента выполнит первое раскрытие. .
- Сегмент [0] инициализируется здесь, а другие позиции по-прежнему равны нулю.Что касается того, почему инициализируется сегмент [0], следующий код представит
- Текущее значение segmentShift 32 - 4 = 28, а segmentMask 16 - 1 = 15. Давайте просто переведем их какколичество смена такжемаска, эти два значения будут использованы сразу
поставить анализ процесса
Давайте посмотрим на основной процесс размещения первыми. Некоторые ключевые детали мы подробно рассмотрим позже.
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
// 1. 计算 key 的 hash 值
int hash = hash(key);
// 2. 根据 hash 值找到 Segment 数组中的位置 j
// hash 是 32 位,无符号右移 segmentShift(28) 位,剩下低 4 位,
// 然后和 segmentMask(15) 做一次与操作,也就是说 j 是 hash 值的最后 4 位,也就是槽的数组下标
int j = (hash >>> segmentShift) & segmentMask;
// 刚刚说了,初始化的时候初始化了 segment[0],但是其他位置还是 null,
// ensureSegment(j) 对 segment[j] 进行初始化
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
// 3. 插入新值到 槽 s 中
return s.put(key, hash, value, false);
}
Первый слой скина очень прост, по значению хеша можно быстро найти соответствующий Сегмент, а затем выполнить операцию put внутри Сегмента.
Сегмент внутренне определяетсямассив + связанный списоксостоит из.
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 在往该 segment 写入前,需要先获取该 segment 的独占锁
// 先看主流程,后面还会具体介绍这部分内容
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
// 这个是 segment 内部的数组
HashEntry<K,V>[] tab = table;
// 再利用 hash 值,求应该放置的数组下标
int index = (tab.length - 1) & hash;
// first 是数组该位置处的链表的表头
HashEntry<K,V> first = entryAt(tab, index);
// 下面这串 for 循环虽然很长,不过也很好理解,想想该位置没有任何元素和已经存在一个链表这两种情况
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
// 覆盖旧值
e.value = value;
++modCount;
}
break;
}
// 继续顺着链表走
e = e.next;
}
else {
// node 到底是不是 null,这个要看获取锁的过程,不过和这里都没有关系。
// 如果不为 null,那就直接将它设置为链表表头;如果是null,初始化并设置为链表表头。
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
// 如果超过了该 segment 的阈值,这个 segment 需要扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node); // 扩容后面也会具体分析
else
// 没有达到阈值,将 node 放到数组 tab 的 index 位置,
// 其实就是将新的节点设置成原链表的表头
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
// 解锁
unlock();
}
return oldValue;
}
В целом процесс относительно прост, благодаря защите эксклюзивных блокировок операции внутри сегмента не усложняются. Что касается проблемы параллелизма здесь, мы представим ее позже.
На этом операция put завершена, теперь поговорим о некоторых ключевых операциях.
Слот инициализации: sureSegment
Сегмент первого слота [0] инициализируется при инициализации ConcurrentHashMap, а для других слотов он инициализируется при вставке первого значения.
Здесь необходимо учитывать параллелизм, потому что вполне вероятно, что несколько потоков войдут одновременно для инициализации одного и того же сегмента слота [k], но до тех пор, пока один из них будет успешным.
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
// 这里看到为什么之前要初始化 segment[0] 了,
// 使用当前 segment[0] 处的数组长度和负载因子来初始化 segment[k]
// 为什么要用“当前”,因为 segment[0] 可能早就扩容过了
Segment<K,V> proto = ss[0];
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
// 初始化 segment[k] 内部的数组
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // 再次检查一遍该槽是否被其他线程初始化了。
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
// 使用 while 循环,内部用 CAS,当前线程成功设值或其他线程成功设值后,退出
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
В целом, методsureSegment(int k) относительно прост, и CAS используется для управления одновременными операциями.
Я не понимаю, почему здесь цикл while.Означает ли сбой CAS, что другие потоки добились успеха?Почему мы должны судить снова?
Получить блокировку записи: scanAndLockForPut
Как мы видели ранее, при помещении в сегмент сначала вызовите node = tryLock() ? null : scanAndLockForPut(key, hash, value), то есть сначала выполните tryLock(), чтобы быстро получить монопольное владение сегментом. lock, если это не удается, введите метод scanAndLockForPut, чтобы получить блокировку.
Разберем, как управлять блокировкой в этом методе.
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
// 循环获取锁
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
// 进到这里说明数组该位置的链表是空的,没有任何元素
// 当然,进到这里的另一个原因是 tryLock() 失败,所以该槽存在并发,不一定是该位置
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
// 顺着链表往下走
e = e.next;
}
// 重试次数如果超过 MAX_SCAN_RETRIES(单核1多核64),那么不抢了,进入到阻塞队列等待锁
// lock() 是阻塞方法,直到获取锁后返回
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
// 这个时候是有大问题了,那就是有新的元素进到了链表,成为了新的表头
// 所以这边的策略是,相当于重新走一遍这个 scanAndLockForPut 方法
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
Этот метод имеет два выхода, один из которых состоит в том, что tryLock() завершается успешно, цикл завершается, другой заключается в том, что количество повторных попыток превышает MAX_SCAN_RETRIES, и вводится метод lock().Этот метод будет блокироваться и ждать, пока не будет эксклюзивной блокировки успешно получен.
Этот метод кажется сложным, но на самом деле он делает одну вещь:Получите эксклюзивную блокировку сегмента, и, кстати, создайте экземпляр узла, если это необходимо.
Расширение: перефразировать
Повторяю, массив сегментов не может быть расширен, расширение представляет собой массив HashEntry\ в определенной позиции массива сегментов [] Расширить вместимость.После расширения вместимость будет удвоена.
Прежде всего, мы должны просмотреть место, где запускается расширение.При размещении, если судят, что вставка значения приведет к тому, что количество элементов в сегменте превысит пороговое значение, то сначала выполняется расширение, а затем выполняется интерполяция.В это время читатель может вернуться к методу put, чтобы посмотреть.
Этот метод не требует учета параллелизма, поскольку в этот момент удерживается монопольная блокировка сегмента.
// 方法参数上的 node 是这次扩容后,需要添加到新的数组中的数据。
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
// 2 倍
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
// 创建新数组
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
// 新的掩码,如从 16 扩容到 32,那么 sizeMask 为 31,对应二进制 ‘000...00011111’
int sizeMask = newCapacity - 1;
// 遍历原数组,老套路,将原数组位置 i 处的链表拆分到 新数组位置 i 和 i+oldCap 两个位置
for (int i = 0; i < oldCapacity ; i++) {
// e 是链表的第一个元素
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
// 计算应该放置在新数组中的位置,
// 假设原数组长度为 16,e 在 oldTable[3] 处,那么 idx 只可能是 3 或者是 3 + 16 = 19
int idx = e.hash & sizeMask;
if (next == null) // 该位置处只有一个元素,那比较好办
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
// e 是链表表头
HashEntry<K,V> lastRun = e;
// idx 是当前链表的头结点 e 的新位置
int lastIdx = idx;
// 下面这个 for 循环会找到一个 lastRun 节点,这个节点之后的所有元素是将要放到一起的
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
// 将 lastRun 及其之后的所有节点组成的这个链表放到 lastIdx 这个位置
newTable[lastIdx] = lastRun;
// 下面的操作是处理 lastRun 之前的节点,
// 这些节点可能分配在另一个链表中,也可能分配到上面的那个链表中
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
// 将新来的 node 放到新数组中刚刚的 两个链表之一 的 头部
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
Расширение здесь немного сложнее, чем в предыдущем HashMap, и код немного сложнее для понимания. Есть два цикла for рядом друг с другом, для чего нужен первый?
При ближайшем рассмотрении видно, что если нет первого цикла for, он может работать, но после этого цикла for, если за lastRun есть еще узлы, то это время того стоит. Поскольку нам нужно только клонировать узел перед lastRun, а следующая строка узлов просто следует за lastRun, ничего делать не нужно.
Я думаю, что идея Дуга Ли также очень интересна, но в худшем случае каждый раз, когда lastRun является последним элементом связанного списка или очень поздним элементом, этот обход становится немного расточительным.Однако Дуг Ли также сказал, что согласно статистике, если используется порог по умолчанию, необходимо клонировать только около 1/6 узлов..
получить анализ процесса
По сравнению с put, get действительно не так уж и прост.
- Вычислите хеш-значение и найдите конкретную позицию в массиве сегментов или «слот», который мы использовали ранее.
- Слот также является массивом, найдите конкретную позицию в массиве по хэшу
- Вот связанный список, вы можете искать по связанному списку
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
// 1. hash 值
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// 2. 根据 hash 找到对应的 segment
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
// 3. 找到segment 内部数组相应位置的链表,遍历
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
Анализ проблем параллелизма
Теперь, когда мы закончили процесс размещения и процесс получения, мы видим, что в процессе получения нет блокировки, поэтому, естественно, нам необходимо рассмотреть вопросы параллелизма.
Операция добавления узла и операция удаления узла должны добавить эксклюзивную блокировку на сегмент, поэтому между ними не будет проблем.
-
Потокобезопасность операций put.
- Инициализация слота, о которой мы говорили ранее, использует CAS для инициализации массива в сегменте.
- Операция добавления узла в связанный список вставляется в заголовок, поэтому, если операция get находится в середине процесса обхода связанного списка в это время, она не повлияет на него. Конечно, еще одна проблема параллелизма заключается в том, что операция получения должна гарантировать, что узел, только что вставленный в заголовок, будет прочитан после операции размещения, что зависит от UNSAFE.putOrderedObject, используемого в методе setEntryAt.
- Расширение. Расширение состоит в том, чтобы заново создать массив, затем перенести данные и, наконец, установить newTable в таблицу атрибутов. Поэтому, если в это время также выполняется операция get, это не имеет значения.Если сначала выполняется операция get, выполняется операция запроса к старой таблице, а сначала выполняется операция put, гарантия видимости put заключается в том, что в таблице используется ключевое слово volatile.
-
Потокобезопасность операций удаления.
Мы не анализировали исходный код операции удаления, поэтому, если упомянутые здесь читатели заинтересованы, им все равно нужно быть реалистом в исходном коде.
Операция get должна пройти по списку, но операция удаления «уничтожает» список.
Если операция получения узла, сломанная удалением, прошла, то здесь нет проблемы.
Если remove сначала уничтожает узел, следует рассмотреть два случая. 1. Если этот узел является головным узлом, то необходимо установить следующим из головного узла элемент в этой позиции массива.Хотя таблица оформлена volatile, volatile не дает гарантий видимости для внутренних операций массив, поэтому исходный код использует Для управления массивами без UNSAFE см. метод setEntryAt. 2. Если удаляемый узел не является головным узлом, он соединит узел-преемник удаляемого узла с узлом-предшественником.Гарантия параллелизма здесь заключается в том, что следующий атрибут является изменчивым.
Java8 HashMap
Java8 внесла некоторые изменения в HashMap, самое большое отличие заключается в использовании красно-черных деревьев, поэтому он состоит изМассив + связанный список + красное черное деревосочинение.
Согласно представлению Java7 HashMap, мы знаем, что при поиске мы можем быстро найти конкретный индекс массива в соответствии с хеш-значением, но после этого нам нужно сравнить связанный список один за другим, чтобы найти то, что нам нужно. Временная сложность зависит от длины связанного списка.O(n).
Чтобы уменьшить накладные расходы на эту часть, в Java8, когда количество элементов в связанном списке превышает 8, связанный список будет преобразован в красно-черное дерево, при поиске в этих местах можно уменьшить временную сложность кO(logN).
Вот простая схема для иллюстрации:
Обратите внимание, что приведенное выше изображение представляет собой схематическую диаграмму, в основном для описания структуры, и не достигнет этого состояния, потому что когда данных так много, они уже были расширены.
Теперь давайте познакомимся с кодом.Лично исходный код Java8 менее удобочитаем, но он более оптимизирован.
Entry используется в Java7 для представления узла данных в каждом HashMap, а в Java8Node, разницы в принципе нет, это все четыре атрибута key, value, hash и next.Однако Node можно использовать только в случае связанных списков, а в случае красно-черных деревьев нужно использоватьTreeNode.
Мы оцениваем, является ли позиция связанным списком или красно-черным деревом, в зависимости от того, является ли тип данных первого узла в элементе массива Node или TreeNode.
поставить анализ процесса
public V put(K key, V value) {
return putVal(hash(key), key, value, false, true);
}
// 第三个参数 onlyIfAbsent 如果是 true,那么只有在不存在该 key 时才会进行 put 操作
// 第四个参数 evict 我们这里不关心
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K,V>[] tab; Node<K,V> p; int n, i;
// 第一次 put 值的时候,会触发下面的 resize(),类似 java7 的第一次 put 也要初始化数组长度
// 第一次 resize 和后续的扩容有些不一样,因为这次是数组从 null 初始化到默认的 16 或自定义的初始容量
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
// 找到具体的数组下标,如果此位置没有值,那么直接初始化一下 Node 并放置在这个位置就可以了
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);
else {// 数组该位置有数据
Node<K,V> e; K k;
// 首先,判断该位置的第一个数据和我们要插入的数据,key 是不是"相等",如果是,取出这个节点
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;
// 如果该节点是代表红黑树的节点,调用红黑树的插值方法,本文不展开说红黑树
else if (p instanceof TreeNode)
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
else {
// 到这里,说明数组该位置上是一个链表
for (int binCount = 0; ; ++binCount) {
// 插入到链表的最后面(Java7 是插入到链表的最前面)
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null);
// TREEIFY_THRESHOLD 为 8,所以,如果新插入的值是链表中的第 9 个
// 会触发下面的 treeifyBin,也就是将链表转换为红黑树
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
treeifyBin(tab, hash);
break;
}
// 如果在该链表中找到了"相等"的 key(== 或 equals)
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
// 此时 break,那么 e 为链表中[与要插入的新值的 key "相等"]的 node
break;
p = e;
}
}
// e!=null 说明存在旧值的key与要插入的key"相等"
// 对于我们分析的put操作,下面这个 if 其实就是进行 "值覆盖",然后返回旧值
if (e != null) {
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
return oldValue;
}
}
++modCount;
// 如果 HashMap 由于新插入这个值导致 size 已经超过了阈值,需要进行扩容
if (++size > threshold)
resize();
afterNodeInsertion(evict);
return null;
}
Немного отличается от Java7 то, что Java7 вставляет новые значения после расширения, Java8 сначала интерполирует, а потом расширяет, но это не важно.
Расширение массива
Метод resize() используется дляинициализировать массивилиРасширение массива, после каждого расширения емкость удваивается, и выполняется миграция данных.
final Node<K,V>[] resize() {
Node<K,V>[] oldTab = table;
int oldCap = (oldTab == null) ? 0 : oldTab.length;
int oldThr = threshold;
int newCap, newThr = 0;
if (oldCap > 0) { // 对应数组扩容
if (oldCap >= MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return oldTab;
}
// 将数组大小扩大一倍
else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
oldCap >= DEFAULT_INITIAL_CAPACITY)
// 将阈值扩大一倍
newThr = oldThr << 1; // double threshold
}
else if (oldThr > 0) // 对应使用 new HashMap(int initialCapacity) 初始化后,第一次 put 的时候
newCap = oldThr;
else {// 对应使用 new HashMap() 初始化后,第一次 put 的时候
newCap = DEFAULT_INITIAL_CAPACITY;
newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
}
if (newThr == 0) {
float ft = (float)newCap * loadFactor;
newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
(int)ft : Integer.MAX_VALUE);
}
threshold = newThr;
// 用新的数组大小初始化新的数组
Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
table = newTab; // 如果是初始化数组,到这里就结束了,返回 newTab 即可
if (oldTab != null) {
// 开始遍历原数组,进行数据迁移。
for (int j = 0; j < oldCap; ++j) {
Node<K,V> e;
if ((e = oldTab[j]) != null) {
oldTab[j] = null;
// 如果该数组位置上只有单个元素,那就简单了,简单迁移这个元素就可以了
if (e.next == null)
newTab[e.hash & (newCap - 1)] = e;
// 如果是红黑树,具体我们就不展开了
else if (e instanceof TreeNode)
((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
else {
// 这块是处理链表的情况,
// 需要将此链表拆成两个链表,放到新的数组中,并且保留原来的先后顺序
// loHead、loTail 对应一条链表,hiHead、hiTail 对应另一条链表,代码还是比较简单的
Node<K,V> loHead = null, loTail = null;
Node<K,V> hiHead = null, hiTail = null;
Node<K,V> next;
do {
next = e.next;
if ((e.hash & oldCap) == 0) {
if (loTail == null)
loHead = e;
else
loTail.next = e;
loTail = e;
}
else {
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);
if (loTail != null) {
loTail.next = null;
// 第一条链表
newTab[j] = loHead;
}
if (hiTail != null) {
hiTail.next = null;
// 第二条链表的新的位置是 j + oldCap,这个很好理解
newTab[j + oldCap] = hiHead;
}
}
}
}
}
return newTab;
}
получить анализ процесса
По сравнению с put, get действительно слишком прост.
- Вычислите хеш-значение ключа и найдите соответствующий индекс массива в соответствии с хеш-значением: хэш & (длина-1)
- Определяем, является ли элемент на позиции массива именно тем, что мы ищем, если нет, переходим к третьему шагу
- Определяем, является ли тип элемента TreeNode, если да, то используем метод красно-черного дерева для получения данных, если нет, переходим к четвертому шагу
- Перемещайтесь по связанному списку, пока не найдете ключ, равный (== или равный)
public V get(Object key) {
Node<K,V> e;
return (e = getNode(hash(key), key)) == null ? null : e.value;
}
final Node<K,V> getNode(int hash, Object key) {
Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
if ((tab = table) != null && (n = tab.length) > 0 &&
(first = tab[(n - 1) & hash]) != null) {
// 判断第一个节点是不是就是需要的
if (first.hash == hash && // always check first node
((k = first.key) == key || (key != null && key.equals(k))))
return first;
if ((e = first.next) != null) {
// 判断是否是红黑树
if (first instanceof TreeNode)
return ((TreeNode<K,V>)first).getTreeNode(hash, key);
// 链表遍历
do {
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
return e;
} while ((e = e.next) != null);
}
}
return null;
}
Java8 ConcurrentHashMap
Если честно, ConcurrentHashMap, реализованный в Java7, довольно сложен, а в Java8 в ConcurrentHashMap были внесены относительно большие изменения. Читателям рекомендуется обратиться к изменениям HashMap в Java 8 по сравнению с HashMap в Java 7. Для ConcurrentHashMap в Java 8 также представлены красно-черные деревья.
Честно говоря, исходный код Java8 ConcurrentHashMap действительно не простой, самое сложное — расширение, да и операция переноса данных непроста для понимания.
Сначала мы используем схематическую диаграмму, чтобы описать ее структуру:
Структура в основном такая же, как у HashMap в Java8, но она должна обеспечивать безопасность потоков, поэтому исходный код действительно немного сложнее.
инициализация
// 这构造函数里,什么都不干
public ConcurrentHashMap() {
}
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
Этот метод инициализации немного интересен.Задав начальную емкость, вычисляется sizeCtl, sizeCtl = [(1,5 * initialCapacity + 1), а затем берется вверх ближайшее число 2 в n-й степени]. Если initialCapacity равно 10, то sizeCtl равно 16, если initialCapacity равно 11, то sizeCtl равно 32.
Атрибут sizeCtl используется во многих сценариях, но если вы будете следовать идеям статьи, он вас не смутит.
Если вам нравится бросание, вы также можете посмотреть на другой метод построения с тремя параметрами.Я не буду здесь говорить о нем.Большую часть времени мы будем использовать конструктор без аргументов для инстанцирования, и мы также будем следовать этой идее для анализа исходный код. .
поставить анализ процесса
Внимательно посмотрите на код построчно:
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 得到 hash 值
int hash = spread(key.hashCode());
// 用于记录相应链表的长度
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果数组"空",进行数组初始化
if (tab == null || (n = tab.length) == 0)
// 初始化数组,后面会详细介绍
tab = initTable();
// 找该 hash 值对应的数组下标,得到第一个节点 f
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 如果数组该位置为空,
// 用一次 CAS 操作将这个新值放入其中即可,这个 put 操作差不多就结束了,可以拉到最后面了
// 如果 CAS 失败,那就是有并发操作,进到下一个循环就好了
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// hash 居然可以等于 MOVED,这个需要到后面才能看明白,不过从名字上也能猜到,肯定是因为在扩容
else if ((fh = f.hash) == MOVED)
// 帮助数据迁移,这个等到看完数据迁移部分的介绍后,再理解这个就很简单了
tab = helpTransfer(tab, f);
else { // 到这里就是说,f 是该位置的头结点,而且不为空
V oldVal = null;
// 获取数组该位置的头结点的监视器锁
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) { // 头结点的 hash 值大于 0,说明是链表
// 用于累加,记录链表的长度
binCount = 1;
// 遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果发现了"相等"的 key,判断是否要进行值覆盖,然后也就可以 break 了
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 到了链表的最末端,将这个新值放到链表的最后面
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) { // 红黑树
Node<K,V> p;
binCount = 2;
// 调用红黑树的插值方法插入新节点
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// binCount != 0 说明上面在做链表操作
if (binCount != 0) {
// 判断是否要将链表转换为红黑树,临界值和 HashMap 一样,也是 8
if (binCount >= TREEIFY_THRESHOLD)
// 这个方法和 HashMap 中稍微有一点点不同,那就是它不是一定会进行红黑树转换,
// 如果当前数组的长度小于 64,那么会选择进行数组扩容,而不是转换为红黑树
// 具体源码我们就不看了,扩容部分后面说
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//
addCount(1L, binCount);
return null;
}
Основной процесс установки прочитан, но осталось как минимум несколько вопросов: первый — инициализация, второй — расширение, третий — помощь в переносе данных, их мы представим по одному позже.
Инициализировать массив: initTable
Это относительно просто, главное инициализироватьподходящий размер, затем установите sizeCtl.
Проблемы параллелизма в методе инициализации контролируются выполнением операции CAS над sizeCtl.
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 初始化的"功劳"被其他线程"抢去"了
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// CAS 一下,将 sizeCtl 设置为 -1,代表抢到了锁
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
// DEFAULT_CAPACITY 默认初始容量是 16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
// 初始化数组,长度为 16 或初始化时提供的长度
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
// 将这个数组赋值给 table,table 是 volatile 的
table = tab = nt;
// 如果 n 为 16 的话,那么这里 sc = 12
// 其实就是 0.75 * n
sc = n - (n >>> 2);
}
} finally {
// 设置 sizeCtl 为 sc,我们就当是 12 吧
sizeCtl = sc;
}
break;
}
}
return tab;
}
Связанный список с красно-черным деревом: treeifyBin
Мы также говорили ранее в анализе исходного кода, что treeifyBin может не обязательно выполнять преобразование красно-черного дерева или может выполнять только расширение массива. Проведем анализ исходного кода.
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
// MIN_TREEIFY_CAPACITY 为 64
// 所以,如果数组长度小于 64 的时候,其实也就是 32 或者 16 或者更小的时候,会进行数组扩容
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
// 后面我们再详细分析这个方法
tryPresize(n << 1);
// b 是头结点
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
// 加锁
synchronized (b) {
if (tabAt(tab, index) == b) {
// 下面就是遍历链表,建立一颗红黑树
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
// 将红黑树设置到数组相应位置中
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
Расширение: tryPresize
Если исходный код Java8 ConcurrentHashMap непрост, то это операция расширения и операция миграции.
Чтобы полностью понять этот метод, вам нужно прочитать метод передачи позже, и читатель должен знать это заранее.
Расширение здесь тоже происходит за счет удвоения емкости, после расширения емкость массива удваивается.
// 首先要说明的是,方法参数 size 传进来的时候就已经翻了倍了
private final void tryPresize(int size) {
// c:size 的 1.5 倍,再加 1,再往上取最近的 2 的 n 次方。
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
// 这个 if 分支和之前说的初始化数组的代码基本上是一样的,在这里,我们可以不用管这块代码
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2); // 0.75 * n
}
} finally {
sizeCtl = sc;
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
// 我没看懂 rs 的真正含义是什么,不过也关系不大
int rs = resizeStamp(n);
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 2. 用 CAS 将 sizeCtl 加 1,然后执行 transfer 方法
// 此时 nextTab 不为 null
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 1. 将 sizeCtl 设置为 (rs << RESIZE_STAMP_SHIFT) + 2)
// 我是没看懂这个值真正的意义是什么?不过可以计算出来的是,结果是一个比较大的负数
// 调用 transfer 方法,此时 nextTab 参数为 null
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
Суть этого метода заключается в работе со значением sizeCtl, сначала установить его в отрицательное число, затем выполнить перенос (tab, null), а затем добавить 1 к sizeCtl в следующем цикле и выполнить перенос (tab, nt) , а затем продолжите sizeCtl, добавьте 1 и выполните transfer(tab, nt).
Таким образом, возможным действием является выполнение1 передача (вкладка, null) + несколько передач (вкладка, nt), как закончить цикл здесь нужно прочитать исходный код передачи, чтобы понять.
Миграция данных: передача
Следующий метод немного длинный и переносит элементы исходного массива вкладок в новый массив nextTab.
Хотя многократный вызов метода передачи в методе tryPresize, о котором мы упоминали ранее, не включает несколько потоков, этот метод передачи можно вызывать в других местах. Обычно мы говорили об этом, когда говорили о методе put. См. метод put выше. есть место для вызова метода helpTransfer, метод helpTransfer вызовет метод передачи.
Этот метод поддерживает многопоточное выполнение. Когда этот метод вызывается извне, он гарантирует, что в первом потоке, инициирующем перенос данных, параметр nextTab имеет значение null, а при последующем вызове этого метода параметр nextTab не будет иметь значение null.
Прежде чем читать исходный код, необходимо понять механизм параллельной работы. Длина исходного массива равна n, поэтому у нас есть n задач миграции.Проще всего каждому потоку одновременно отвечать за небольшую задачу.После выполнения задачи проверьте, есть ли другие незавершенные задачи и помогите с миграцией , и Дуг Ли использовал шаг, простое пониманиеразмер шага, каждый поток каждый раз отвечает за перенос его части, например 16 небольших задач каждый раз. Следовательно, нам нужен глобальный планировщик, чтобы определить, какие потоки выполняют какие задачи.Это роль атрибута transferIndex.
Первый поток, который инициирует миграцию данных, укажет TransferIndex на последнюю позицию исходного массива, а затемзадом напередШаговые задачи относятся к первому потоку, затем TransferIndex указывает на новое местоположение, шаговые задачи впереди принадлежат второму потоку и так далее. Конечно, второй поток, упомянутый здесь, не обязательно относится ко второму потоку, это также может быть один и тот же поток, этот читатель должен быть в состоянии это понять. По сути, это разделение большой задачи миграции на пакеты задач.
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// stride 在单核下直接等于 n,多核模式下为 (n>>>3)/NCPU,最小值是 16
// stride 可以理解为”步长“,有 n 个位置是需要进行迁移的,
// 将这 n 个任务分为多个任务包,每个任务包有 stride 个任务
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 如果 nextTab 为 null,先进行一次初始化
// 前面我们说了,外围会保证第一个发起迁移的线程调用此方法时,参数 nextTab 为 null
// 之后参与迁移的线程调用此方法时,nextTab 不会为 null
if (nextTab == null) {
try {
// 容量翻倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
// nextTable 是 ConcurrentHashMap 中的属性
nextTable = nextTab;
// transferIndex 也是 ConcurrentHashMap 的属性,用于控制迁移的位置
transferIndex = n;
}
int nextn = nextTab.length;
// ForwardingNode 翻译过来就是正在被迁移的 Node
// 这个构造方法会生成一个Node,key、value 和 next 都为 null,关键是 hash 为 MOVED
// 后面我们会看到,原数组中位置 i 处的节点完成迁移工作后,
// 就会将位置 i 处设置为这个 ForwardingNode,用来告诉其他线程该位置已经处理过了
// 所以它其实相当于是一个标志。
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// advance 指的是做完了一个位置的迁移工作,可以准备做下一个位置的了
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
/*
* 下面这个 for 循环,最难理解的在前面,而要看懂它们,应该先看懂后面的,然后再倒回来看
*
*/
// i 是位置索引,bound 是边界,注意是从后往前
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 下面这个 while 真的是不好理解
// advance 为 true 表示可以进行下一个位置的迁移了
// 简单理解结局:i 指向了 transferIndex,bound 指向了 transferIndex-stride
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
// 将 transferIndex 值赋给 nextIndex
// 这里 transferIndex 一旦小于等于 0,说明原数组的所有位置都有相应的线程去处理了
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// 看括号中的代码,nextBound 是这次迁移任务的边界,注意,是从后往前
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
// 所有的迁移操作已经完成
nextTable = null;
// 将新的 nextTab 赋值给 table 属性,完成迁移
table = nextTab;
// 重新计算 sizeCtl:n 是原数组长度,所以 sizeCtl 得出的值将是新数组长度的 0.75 倍
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 之前我们说过,sizeCtl 在迁移前会设置为 (rs << RESIZE_STAMP_SHIFT) + 2
// 然后,每有一个线程参与迁移就会将 sizeCtl 加 1,
// 这里使用 CAS 操作对 sizeCtl 进行减 1,代表做完了属于自己的任务
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 任务结束,方法退出
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 到这里,说明 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT,
// 也就是说,所有的迁移任务都做完了,也就会进入到上面的 if(finishing){} 分支了
finishing = advance = true;
i = n; // recheck before commit
}
}
// 如果位置 i 处是空的,没有任何节点,那么放入刚刚初始化的 ForwardingNode ”空节点“
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 该位置处是一个 ForwardingNode,代表该位置已经迁移过了
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// 对数组该位置处的结点加锁,开始处理数组该位置处的迁移工作
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 头结点的 hash 大于 0,说明是链表的 Node 节点
if (fh >= 0) {
// 下面这一块和 Java7 中的 ConcurrentHashMap 迁移是差不多的,
// 需要将链表一分为二,
// 找到原链表中的 lastRun,然后 lastRun 及其之后的节点是一起进行迁移的
// lastRun 之前的节点需要进行克隆,然后分到两个链表中
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 其中的一个链表放在新数组的位置 i
setTabAt(nextTab, i, ln);
// 另一个链表放在新数组的位置 i+n
setTabAt(nextTab, i + n, hn);
// 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,
// 其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了
setTabAt(tab, i, fwd);
// advance 设置为 true,代表该位置已经迁移完毕
advance = true;
}
else if (f instanceof TreeBin) {
// 红黑树的迁移
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 如果一分为二后,节点数少于 8,那么将红黑树转换回链表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
// 将 ln 放置在新数组的位置 i
setTabAt(nextTab, i, ln);
// 将 hn 放置在新数组的位置 i+n
setTabAt(nextTab, i + n, hn);
// 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,
// 其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了
setTabAt(tab, i, fwd);
// advance 设置为 true,代表该位置已经迁移完毕
advance = true;
}
}
}
}
}
}
Ведь метод transfer не реализует все задачи миграции, каждый раз при вызове этого метода он реализует только миграцию transferIndex в положение шага, а всем остальным нужно управлять на периферии.
Сейчас, возможно, будет понятнее вернуться назад и внимательно изучить метод tryPresize.
получить анализ процесса
Метод get всегда самый простой, и это не исключение:
- Рассчитать хэш-значение
- Найдите соответствующую позицию массива в соответствии с хеш-значением: (n - 1) & h
- В соответствии с характером узла в локации выполняется соответствующий поиск
- Если позиция равна нулю, просто верните значение null напрямую
- Если узел в этой позиции именно то, что нам нужно, просто верните значение узла
- Если значение хеш-функции узла в этом месте меньше 0, это означает, что он расширяется, или это красно-черное дерево, метод find мы представим позже.
- Если вышеуказанные 3 пункта не удовлетворены, это связанный список, и вы можете просмотреть и сравнить
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 判断头结点是否就是我们需要的节点
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 如果头结点的 hash 小于 0,说明 正在扩容,或者该位置是红黑树
else if (eh < 0)
// 参考 ForwardingNode.find(int h, Object k) 和 TreeBin.find(int h, Object k)
return (p = e.find(h, key)) != null ? p.val : null;
// 遍历链表
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
Проще говоря, большая часть содержимого этого метода очень проста, только в случае расширения ForwardingNode.find(int h, Object k) немного сложнее, но после понимания процесса переноса данных это не сложно, поэтому я не буду его здесь расширять из-за нехватки места.
Суммировать
После того, как я написал эту статью, я задумался о том, в чем ценность этой статьи?
С точки зрения анализа исходного кода, я считаю, что моя статья действительно качественная, хотя она и не анализирует исходный код построчно, как предыдущие AQS и пулы потоков, но все же углубляется в те места, где все новички могут запутаться. Пока вы не умеете читать, вы сможете легко понять исходный код HashMap и ConcurrentHashMap.
Однако, с точки зрения учащегося, насколько ценно глубокое понимание исходного кода HashMap и ConcurrentHashMap?
Я не думаю, что это многого стоит. Если это исходный код AQS или исходный код пула потоков, я думаю, читатель может многое узнать, разобравшись с ним. Однако исходный код HashMap и ConcurrentHashMap, представленный в этой статье, может иметь наибольшую ценность для проведения интервью. Юнби~~~
Однако взгляды на это у всех могут быть разные, и это как-то связано с запасом знаний каждого. Я все написал, и бесполезно так много говорить. Если это нужно для интервью, то оно просто необходимо, пожалуйста, примите это с улыбкой. Если вы считаете, что что-то не так с текстом, пожалуйста, поправьте меня~~~
(Конец полного текста)