структура
Реализация JDK1.8 отбрасываетSegment
Концепция структуры данных напрямую связанного списка Массив узлов + + красно-черное дерево, реализованное с использованием управления параллелизмомSynchronized
иCAS
работать,
sizeCtl
Он используется для управления инициализацией и расширением таблицы.Разные значения имеют разное значение.
- Когда таблица не инициализирована: sizeCtl = 0 или sizeCtl = емкость;
- таблица инициализируется: sizeCtl = -1;
- Инициализация таблицы завершена: sizeCtl = порог;
- Когда sizeCtl = - (1 + N), это указывает на то, что имеется N потоков, выполняющих операцию изменения размера;
ПОЛОЖИТЬ () метод
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
//hash操作
int hash = spread(key.hashCode());
int binCount = 0;
//bitCount表示i处的节点数量,用来判断是否转换成红黑树
for (Node<K,V>[] tab = table;;) {//CAS插入
Node<K,V> f; int n, i, fh;
//除非构造时指定初始化集合,否则默认构造不初始化table
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//当前节点为NULL,CAS设置新头节点
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
//当前节点正在扩容,让当前线程帮助扩容,扩容完指向新的table
else {
//正常插入到链表或者红黑树当中
V oldVal = null;
//对当前节点加锁
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {//表明是链表结点类型
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
//onlyIfAbsent表示是新元素才加入,旧值不替换,默认为fase。
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
//加入链表尾部
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {//是红黑树
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
//默认table的一个链表结点数超过8个数据结构会转为红黑树
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);//更新size,检测扩容
return null;
}
хеш-алгоритм
// &HASH_BITS是为了保证hash为正数
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
инициализация
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
// 多线程同时初始化(即sizeCtl = -1 )时,竞争失败的线程yield
Thread.yield();
// CAS设置 sizeCtl = -1
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);//扩容阈值为新容量的0.75倍
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
функция treeifyBin
Функция treeifyBin обрабатывает слишком много узлов, что примерно соответствует HashMap.
- Таблица относительно небольшая (менее MIN_TREEIFY_CAPACITY=64), сначала измените размер;
- Когда таблица достигает определенного числа, преобразуйте красно-черное дерево (длина связанного списка> 8)
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
// 如果tab.length < MIN_TREEIFY_CAPACITY=64,先resize
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0)
//转换成红黑树
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
Расширение
Если количество элементов в связанном списке больше или равно 8 после добавления нового узла, будет вызвана команда treeifyBin для преобразования связанного списка в красно-черное дерево. При преобразовании структуры, если длина вкладки меньше MIN_TREEIFY_CAPACITY, а значение по умолчанию равно 64, длина массива будет удвоена, а передача будет инициирована для перенастройки позиции узла. (Только если tab.length >= 64, ConcurrentHashMap будет использовать красно-черное дерево.)
// tab是旧table,nextTab是新table
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//stride表示每个线程处理桶的最小数目。
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//table两倍扩容
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;//扩容总进度,>=transferIndex的桶都已分配出去。
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//advance表示是否迁移完成
boolean advance = true;
//finish表示扩容是否结束
boolean finishing = false;
for (int i = 0, bound = 0;;) {
//处理一个 stride 长度的任务,i 会被赋值为该 stride 内最大的下标,而 bound 会被赋值为该 stride 内最小的下标
//通过循环不断减小 i 的值,从右往左依次迁移桶上面的数据,直到 i 小于 bound 时结束该次长度为 stride 的迁移任务
//结束这次的任务后会通过外层 addCount、helpTransfer、tryPresize 方法的 while 循环达到继续领取其他任务的效果
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
//transferIndex <= 0 说明数组的hash桶已被线程分配完毕,没有了待分配的hash桶,将 i 设置为 -1 ,后面的代码根据这个数值退出当前线的扩容操作
i = -1;
advance = false;//false表示没处理完当前桶
}
//只有首次进入for循环才会进入这个判断里面去,设置 bound 和 i 的值,也就是领取到的迁移任务的数组区间
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
//确定当前线程每次分配的待迁移桶的范围为[bound, nextIndex)
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// i<0表示当前线程的任务已经做完,后两个判断是边界检查
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
//扩容结束后做后续工作,将 nextTable 设置为 null,表示扩容已结束,将 table 指向新数组,sizeCtl 设置为扩容阈值
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//每当一条线程扩容结束就会更新一次 sizeCtl 的值,进行减 1 操作
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//如果下面为false,说明该线程不是扩容大军里面的最后一条线程,直接return
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
//如果为true,修改标志位,并且重新检查有没有遗漏的
finishing = advance = true;
i = n; // recheck before commit
//重新检查一遍
//正常情况下,tab应该全都是ForwardingNode
//如果出现问题,多个线程同时申请到了一个transfer,此时当前线程领取的任务作废
//重新检查时候要处理作废而没被迁移的桶
}
}
else if ((f = tabAt(tab, i)) == null)
//桶为null,直接放一个ForwardingNode
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
//数组上遇到hash值为MOVED,也就是 -1 的位置,说明该位置已经被其他线程迁移过了,
//将 advance 设置为 true ,以便继续往下一个桶检查并进行迁移操作
advance = true; // already processed
else {
//数据迁移
synchronized (f) {
//桶内元素迁移加锁
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
//fh >= 0说明是链表
int runBit = fh & n;//n是2的幂次,所以runBit只能为1或者0
Node<K,V> lastRun = f;//lastRun指向最后一个相邻runBit不同的节点
for (Node<K,V> p = f.next; p != null; p = p.next) {
//遍历整条链表,找出 lastRun 节点
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
//头插,以lastRun分界拆成hn和ln
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
//四个参数分别是hash值,key,value,next节点,所以是头插法
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
//ln 链设置到新数组下标为 i 的位置上
setTabAt(nextTab, i + n, hn);
//hn 链设置到新数组下标为 i + n(n为原数组长度) 的位置上
setTabAt(tab, i, fwd);
//在原table中设置ForwardingNode节点以提示该桶扩容完成。
advance = true;
//advance 设置为 true 表示当前 hash 桶已处理完,可以继续处理下一个 hash 桶
}
else if (f instanceof TreeBin) {
//红黑树处理与链表类似
//同样也是使用高位和低位两条链表进行迁移
//在迁移过程中,判断是否需要转换成红黑树
//1、如果符合条件则直接将 TreeNode 链表转为红黑树,再设置到新数组中去
//2、如果不符合条件则将 TreeNode 转换为普通的 Node 节点,再将该普通链表设置到新数组中去
......
}
}
}
}
}
}
Роль ForwardingNode:
- Функция-заполнитель, которая используется для идентификации корзины в этой позиции массива, была перенесена и находится в состоянии расширения.
- В качестве функции пересылки, если операция запроса встречается в течение периода расширения, если она сталкивается с пересылающим узлом, операция запроса будет перенаправлена в новый массив, и операция запроса не будет заблокирована.
Распределение расширения при многопоточности показано на рисунке:
статистика размера
Количество элементов ConcurrentHashMap равно сумме baseCounter и значения каждой ячейки CounterCell в массиве. Причина этого в том, что когда несколько потоков одновременно выполняют CAS для изменения значения baseCount, сбойный поток помещает значение в CounterCell. При вычислении размера значения элементов в массивах baseCount и CounterCell суммируются для получения общего размера, но это число все равно может быть неточным.
private transient volatile long baseCount;
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 如果cas修改 baseCount 失败
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
// 如果计数盒子是空(尚未出现并发)
// 如果随机取余一个数组位置为空 或者
// 修改这个槽位的变量失败(出现并发了)
// 执行 fullAddCount 方法。并结束
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
// 如果需要检查,检查是否需要扩容,在 putVal 方法调用时,默认就是要检查的。
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 如果map.size() 大于 sizeCtl(达到扩容阈值需要扩容) 且
// table 不是空;且 table 的长度小于 1 << 30。(可以扩容)
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// 根据 length 得到一个标识
int rs = resizeStamp(n);
// 如果正在扩容
if (sc < 0) {
// 如果 sc 的低 16 位不等于 标识符(校验异常 sizeCtl 变化了)
// 如果 sc == 标识符 + 1 (扩容结束了,不再有线程进行扩容)(默认第一个线程设置 sc ==rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1)
// 如果 sc == 标识符 + 65535(帮助线程数已经达到最大)
// 如果 nextTable == null(结束扩容了)
// 如果 transferIndex <= 0 (转移状态变化了)
// 结束循环
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 如果可以帮助扩容,那么将 sc 加 1. 表示多了一个线程在帮助扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
// 扩容
transfer(tab, nt);
}
// 如果不在扩容,将 sc 更新:标识符左移 16 位 然后 + 2. 也就是变成一个负数。高 16 位是标识符,低 16 位初始是 2.
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
// 更新 sizeCtl 为负数后,开始扩容。
transfer(tab, null);
s = sumCount();
}
}
}
Контркретл: используется@sun.misc.Contended
Помеченный класс, который внутри содержитvolatile
Переменная.@sun.misc.Contended
Эта аннотация идентифицирует класс, чтобы предотвратить необходимость предотвращения «ложного совместного использования».
Кэш-система хранится в единицах строк кеша. Строка кэша представляет собой целую степень двух последовательных байтов, обычно 32–256 байтов. Наиболее распространенный размер строки кэша составляет 64 байта. Когда несколько потоков изменяют переменные, которые независимы друг от друга, если эти переменные совместно используют одну и ту же строку кэша, это непреднамеренно повлияет на производительность друг друга, что является ложным разделением.