java.util.concurrency.atomic.LongAdder
Это новый класс в Java8, предоставляющий метод атомарного накопления значений. Согласно документации его производительность лучше, чемAtomicLong
, на следующем рисунке показано простое сравнение тестов (платформа: MBP):
Тест здесь основан на JDK1.8.AtomicLong был оптимизирован для платформы x86, начиная с Java8, и заменил операцию CAS на XADD.Мы знаем, что атомарные классы, предоставляемые JUC, основаны на классе Unsafe, а Unsafe предоставляет CAS. Способность. CAS (compare-and-swap) — это, по сути, атомарная инструкция, реализованная на аппаратном уровне современными ЦП, позволяющая выполнять неблокирующие многопоточные операции с данными, принимая во внимание как безопасность, так и эффективность. В большинстве случаев CAS может обеспечить хорошую производительность, но накладные расходы могут увеличиваться в геометрической прогрессии в случае высокой конкуренции. Конкретные исследования см. в этой статье.статья,
Давайте посмотрим непосредственно на код:
public class AtomicLong {
public final long incrementAndGet() {
return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
}
}
public final class Unsafe {
public final long getAndAddLong(Object var1, long var2, long var4) {
long var6;
do {
var6 = this.getLongVolatile(var1, var2);
} while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));
return var6;
}
}
скопировать код
getAndAddLong
Метод будет использовать семантику volatile для чтения последнего значения поля, которое необходимо увеличить, а затем попытается обновить его через CAS.Обычно он возвращается сразу после успеха, но при высоком уровне параллелизма может быть много потоков. Это означает, что срок действия последнего значения, прочитанного потоком А, фактически истек, поэтому его необходимо постоянно повторять в цикле while, что вызывает много ненужных накладных расходов, а xadd будет относительно более длительным. эффективный, псевдокод следующий, самое главное Дело в том, что следующий код является атомарным, а это значит, что другие потоки не могут прервать его выполнение или увидеть промежуточные значения.Данная инструкция напрямую поддерживается на аппаратном уровне:
function FetchAndAdd(address location, int inc) {
int value := *location
*location := value + inc
return value
}
Производительность LongAdder намного лучше вышеперечисленного, поэтому я его изучил. Во-первых, у него есть базовая ценностная база, в случае конкуренции будет массив Ячеек для разделения операций разных потоков на разные узлы (он будет расширяться по мере необходимости, вплоть до количества ядер ЦП) ,sum()
Значение и база во всех массивах ячеек будут накапливаться как возвращаемое значение. Основная идея состоит в том, чтобы распространить давление обновления одного значения AtomicLong на несколько значений, тем самым понизив точку доступа обновления.
public class LongAdder extends Striped64 implements Serializable {
//...
}
LongAdder наследуется отStriped64
,Striped64
Внутренне поддерживает отложенный загружаемый массив и дополнительныйbase
Поле прочности, размер массива равен 2 в N-й степени, используется каждый потокThread
Доступ к внутреннему хешу.
abstract class Striped64 extends Number {
/** Number of CPUS, to place bound on table size */
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* Table of cells. When non-null, size is a power of 2.
*/
transient volatile Cell[] cells;
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
}
Элементы массиваCell
класс, вы можете видеть, что класс Cell украшен аннотацией Contended, здесь в основном для решения ложного разделения (проблемы ложного разделения), но я лично думаю, что перевод ложного разделения не очень хорош
, или это должно быть неправильное совместное использование, например, две изменчивые переменные выделены в одну и ту же строку кэша, но два обновления будут конкурировать в условиях высокой параллелизма, например, поток A для обновления переменной a, поток B для обновления переменной b, но эти два переменные размещаются в одной и той же строке кеша, поэтому каждый поток будет конкурировать за владение строкой кеша. Например, A получает право собственности, а затем выполняет обновление. В это время из-за семантики volatile он будет очищен в основную память, но поскольку переменная b также кэшируется в той же строке кэша, это приведет к кэшированию
пропустить, что приведет к большой потере производительности, поэтому некоторые авторы библиотек классов, такие как JUC, Disruptor и т. д., использовали метод вставки фиктивных переменных, чтобы сделать строку кэша эксклюзивной, например следующий код:
static final class Cell {
volatile long p0, p1, p2, p3, p4, p5, p6;
volatile long value;
volatile long q0, q1, q2, q3, q4, q5, q6;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
Но этот метод все-таки не универсален, например, размер строки кэша 32-битной и 64-битной операционных систем разный, поэтому в JAVA8 добавлено примечание.@sun.misc.Contended
Решение используется для решения этой проблемы, и JVM будет вставлять эти переменные.Подробности см. на openjdk.java.net/jeps/142, но в целом объекты обычно размещаются в памяти неравномерно, а массивы - это непрерывная память. . , поэтому строки кэша могут быть общими, поэтому добавьте сюда аннотацию Contended, чтобы предотвратить ложное совместное использование массива ячеек.
/**
* 底竞争下直接更新base,类似AtomicLong
* 高并发下,会将每个线程的操作hash到不同的
* cells数组中,从而将AtomicLong中更新
* 一个value的行为优化之后,分散到多个value中
* 从而降低更新热点,而需要得到当前值的时候,直接
* 将所有cell中的value与base相加即可,但是跟
* AtomicLong(compare and change -> xadd)的CAS不同,
* incrementAndGet操作及其变种
* 可以返回更新后的值,而LongAdder返回的是void
*/
public class LongAdder {
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
/**
* 如果是第一次执行,则直接case操作base
*/
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
/**
* as数组为空(null或者size为0)
* 或者当前线程取模as数组大小为空
* 或者cas更新Cell失败
*/
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
public long sum() {
//通过累加base与cells数组中的value从而获得sum
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
}
/**
* openjdk.java.net/jeps/142
*/
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
abstract class Striped64 extends Number {
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
/**
* 若getProbe为0,说明需要初始化
*/
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
/**
* 失败重试
*/
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
/**
* 若as数组已经初始化,(n-1) & h 即为取模操作,相对 % 效率要更高
*/
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {//这里casCellsBusy的作用其实就是一个spin lock
//可能会有多个线程执行了`Cell r = new Cell(x);`,
//因此这里进行cas操作,避免线程安全的问题,同时前面在判断一次
//避免正在初始化的时其他线程再进行额外的cas操作
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
//重新检查一下是否已经创建成功了
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot 现在是非空了,continue到下次循环重试
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;//若cas更新成功则跳出循环,否则继续重试
else if (n >= NCPU || cells != as) // 最大只能扩容到CPU数目, 或者是已经扩容成功,这里只有的本地引用as已经过期了
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // 扩容
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
//重新计算hash(异或)从而尝试找到下一个空的slot
h = advanceProbe(h);
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
/**
* 默认size为2
*/
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x : // 若已经有另一个线程在初始化,那么尝试直接更新base
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
final boolean casCellsBusy() {
return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}
static final int getProbe() {
/**
* 通过Unsafe获取Thread中threadLocalRandomProbe的值
*/
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long BASE;
private static final long CELLSBUSY;
private static final long PROBE;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> sk = Striped64.class;
BASE = UNSAFE.objectFieldOffset
(sk.getDeclaredField("base"));
CELLSBUSY = UNSAFE.objectFieldOffset
(sk.getDeclaredField("cellsBusy"));
Class<?> tk = Thread.class;
//返回Field在内存中相对于对象内存地址的偏移量
PROBE = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomProbe"));
} catch (Exception e) {
throw new Error(e);
}
}
}
Поскольку ячейка занимает относительно большой объем памяти, здесь принят метод ленивой загрузки, и базовое поле обновляется напрямую без конкуренции.При возникновении первой конкуренции (сбой CAS) будет создан массив ячеек размером 2 ., каждое расширение удваивается, только чтобы достичь количества ядер ЦП. В то же время мы знаем, что такие действия, как расширение массива, должны выполняться только одним потоком одновременно, поэтому требуется блокировка.Здесь простая спин-блокировка реализуется путем обновления CellBusy через CAS.
Индекс доступа к массиву реализован по модулю поля threadLocalRandomProbe в Thread.Это поле обновляется с помощью ThreadLocalRandom.Размер ячеек массива ограничен количеством ядер ЦП, потому что даже если потоков больше, чем количество ядер для обновления, каждый поток будет привязан только к одному ЦП, и при обновлении будет не более нескольких потоков ядер ЦП, поэтому нам нужно только дискретизировать поведение обновления разных потоков в разных слотах через хэш.
Мы знаем, что потоки и пулы потоков будут закрыты или уничтожены.В это время слот, занимаемый этим потоком, может стать неиспользуемым, но мы не можем его очистить, потому что общие веб-приложения долго работают, и потоки обычно также будут динамически создается и уничтожается, и, вероятно, через некоторое время он будет занят другими потоками.Для краткосрочного запуска, такого как модульные тесты, какой смысл его очищать?
Суммировать
В общем, LongAdder намного лучше, чем AtomicLong с точки зрения производительности.В общем, его можно использовать напрямую вместо AtomicLong.Netty также инкапсулирует эти два класса через интерфейс и использует LongAdder непосредственно под Java8, но ряд методов AtomicLong может не только самоинкрементироваться, но и получать обновленное значение, было бы удобнее, например, получить глобально уникальный ID или использовать AtomicLong.