Атомный класс Action Longaddr был добавлен в атомный jdk1.8. По сравнению с Atomiclong, эффективность была значительно улучшена.
Принцип реализации
Атомарная операция AtomicLong обновляется через CAS.При возникновении конфликта он будет ждать атомарной операции, вращаясь, а именно:
public final long updateAndGet(LongUnaryOperator updateFunction) {
long prev, next;
do {
prev = get();
next = updateFunction.applyAsLong(prev);
} while (!compareAndSet(prev, next));
return next;
}
Как видно из приведенного выше исходного кода, когда конфликтов атомарного обновления мало, эффективность обновления по-прежнему очень высока, и он обновляется непосредственно через CAS, но поскольку обновляется один и тот же элемент, когда операция обновления выполняется относительно часто, это происходит Вероятность конфликта возрастет, эффективность уменьшится, и теперь проблема становится:
Как снизить вероятность конфликтов?
В многопоточной среде поведение потока непредсказуемо, поэтому есть возможность подсуетиться над обновляемым элементом, в AtomicLong вызовом переменной-членаprivate volatile long value
Операция атомной обновления выполняется, поэтому конфликт также вокруг этого элемента. Давайте посмотрим, как Doug Lead делает это:
- инициализировать
volatile long base
с однимint [] cell
, элементы массива инициализируются 0. - Когда поток выполняет атомарную операцию, приоритет отдается базовой переменной CAS. Если нет конфликта, обновление выполняется успешно. Если есть конфликт, схема спина отменяется и выполняется шаг 3.
- Сопоставьте поток с элементом в массиве ячеек с помощью хэш-кода, а затем обновите атомарную операцию для элемента массива.
- Наконец, путем наложения каждого элемента в базе и ячейке можно получить значение атомарной операции.
В LongAddr операция атомарного обновления рассредоточена на базовую переменную base и массив, что снижает вероятность конфликтов и повышает эффективность обновления.
Реализация
Longaddr унаследовал от Striped64, специфическая операция достигается Plipeded64, Plipeded64-члена изменчивости следующим образом:
public class LongAdder extends Striped64 implements Serializable {
//对应上面的数组
transient volatile Cell[] cells;
//基础更新变量
transient volatile long base;
//同步对cells进行操作的
transient volatile int cellsBusy;
Определение ячейки выглядит следующим образом:
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
Аннотация @sun.misc.Contended предназначена для решения проблемы ложного обмена.Подробности см. здесь.Говоря о псевдосовместном использовании.
Cell содержит только поведение атомарной операции cas, в основном операцию атомарного обновления элементов в массиве.
операция обновления
public void add(long x) {
Cell[] as;
long b, v;
int m;
Cell a;
//如果cell不为空,或者cell为空但是caseBase失败
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
//1. cells 为空
//2. cells不为空,但是cells中对应本线程位置为null
//3. cells中本线程对应的位置不为空,但是对本位置操作case失败
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
- Если массив ячеек пуст, непосредственно выполните операцию cas casBase для базовой переменной base.Если выполнение casBase завершается неудачно, возникает конфликт, и перейдите к шагу 3.
- Если массив ячеек не пуст, в это время, чтобы уменьшить вероятность конфликта, сначала обновите массив ячеек и непосредственно выполните шаг 3.
- Выполнение подлогического решения: 3.1 Если ячейки пусты, сразу переходите к шагу 4. 3.2.Если ячейки не пустые, но элементы в массиве ячеек, соответствующем этому потоку, равны нулю, перейти к шагу 4. 3.3.Если ячейки не пусты и элемент в массиве, соответствующем этому потоку, не равен нулю, выполнить операцию cas над элементом, если выполнение завершается успешно, а если выполнение завершается неудачей, перейти к шагу 4.
- Выполните подлогику обновления, подробности см. в комментариях к исходному коду.
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
// 初始化Thread的种子和随机数
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
//步骤6
for (;;) {
Cell[] as; Cell a; int n; long v;
//如果cells数据不为空,即满足条件3.2和3.3时
if ((as = cells) != null && (n = as.length) > 0) {
//如果被hash到的cells数组位置为null(对应条件3.2)
if ((a = as[(n - 1) & h]) == null) {
//如果此时cells数组中同步锁为空
if (cellsBusy == 0) {
//初始化Cell元素,初试化为x
Cell r = new Cell(x);
//设置cellsBusy锁,避免其他线程同时修改cells数组
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try {
//初始化cells数组中本线程对应的元素
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
//恢复cellsBusy锁
cellsBusy = 0;
}
//如果创建成功则退出,否则continue
if (created)
break;
/*如果created为false,说明上面指定的cells数组的位置
cells[m%cells.length]已经有其它线程设置了cell了,
继续重新开始循环。*/
continue;
}
}
//如果cellsBusy=1,说明有线程正在更改cells数组,
将collide设置为false
collide = false;
}
/*如果被hash到的cells数组位置不为null(对应3.3),
说明已经发生竞争,将wasUncontended设置为true,
最后重新计算一个新的probe,然后重新执行循环。
*/
else if (!wasUncontended)
wasUncontended = true;
/*如果当前线程第一次参与cells争用的cas失败,
这里会尝试将x值加到cells[m%cells.length]的value ,
如果成功直接退出*/
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
//如果cells数组的长度已经到了最大值(大于等于cup数量),
或者是当前cells已经做了扩容,则将collide设置为false,
后面重新计算prob的值.*/
else if (n >= NCPU || cells != as)
collide = false;
/*如果发生了冲突collide=false,则设置其为true;
会在最后重新计算hash值后,进入下一次for循环*/
else if (!collide)
collide = true;
/*扩容cells数组,新参与cell争用的线程两次均失败,
且符合库容条件,会执行该分支*/
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) {
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue;
}
//重新计算线程的hash值
h = advanceProbe(h);
}
//如果满足3.1 和 3.2 则初始化cells数组
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try {
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//如果以上操作都失败了,则尝试将值累加到base上
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
}
}
Приведенный выше код относительно прост в написании, но логика относительно запутана, главное синхронно работать между обновлением данных базы и ячеек, чтобы уменьшить вероятность конфликта и повысить эффективность.
получить результаты
Так как в многопоточной среде операция обновления данных разбросана по базовой переменной и каждому элементу в массиве ячеек, поэтому все значения должны быть наложены друг на друга для каждого результата расчета.
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
Вычисление хеш-значения потока
Основная логика реализации:
- В классе потока Thread хранятся две переменные:
@sun.misc.Contended("tlr")
int threadLocalRandomProbe; //此线程的随机数
@sun.misc.Contended("tlr")
int threadLocalRandomSecondarySeed; //此线程的随机种子
- Класс ThreadLocalRandom будет обновлять начальное число каждого класса потока и вычислять случайное число потока в соответствии с начальным числом, чтобы не было затрат на синхронизацию случайных чисел между потоками и повышалась эффективность.
//判断该线程的随机数是否已经初始化,如果没有则执行localInit初始化
public static ThreadLocalRandom current() {
if (UNSAFE.getInt(Thread.currentThread(), PROBE) == 0)
localInit();
return instance;
}
//更新线程的种子数和随机数
static final void localInit() {
int p = probeGenerator.addAndGet(PROBE_INCREMENT);
int probe = (p == 0) ? 1 : p; // skip 0
long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT));
Thread t = Thread.currentThread();
UNSAFE.putLong(t, SEED, seed);
UNSAFE.putInt(t, PROBE, probe);
}
- Если есть конфликт, вы можете перефразировать поток следующим образом:
static final int advanceProbe(int probe) {
probe ^= probe << 13; // xorshift
probe ^= probe >>> 17;
probe ^= probe << 5;
UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
return probe;
}
над.