Интервью обязательно: подробное описание Java JUC LongAdder [бутиковая длинная статья]

Java

На основе OpenJDK 12

Прежде чем читать эту статью, рекомендуется прочитать следующие две статьи для лучшего сравнения и понимания:

LongAdder — это новый атомарный класс в JDK 1.8, основанный на реализации Striped64. Согласно официальной документации, LongAdder будет иметь лучшую производительность, чем AtomicLong, в сценариях с высоким параллелизмом за счет использования большего объема памяти:

This class is usually preferable to AtomicLong when multiple threads update a common sum that is used for purposes such as collecting statistics, not for fine-grained synchronization control. Under low update contention, the two classes have similar characteristics. But under high contention, expected throughput of this class is significantly higher, at the expense of higher space consumption.

Так как же реализован LongAdder?

Сначала взгляните на диаграмму классов LongAdder:

Базовый класс Number, Number — это абстрактный класс без какой-либо логики.Этот класс является базовым классом для byte, double, float, int, long и short.

Striped64

дизайн-мышление

Эта часть переведена из комментариев к исходному коду Striped64 и может быть пропущена.

Рассеять горячие точки, рассредоточить значения значений в массив, разные потоки будут попадать в разные слоты в массиве, и каждый поток выполняет операцию CAS только над значением в своем слоте, так что горячие точки рассредоточены, а вероятность конфликтов гораздо меньше. Если вы хотите получить реальное длинное значение, просто накапливайте значения переменных в каждом слоте и возвращайтесь.


Как видно из аннотации класса Striped64:

Striped64 используется в пакете для обеспечения унифицированной реализации динамического сегментирования 64-битных элементов (это немного похоже на: AbstractQueuedSynchronizer). Striped64 наследует класс Number, а это означает, что подкласс конкретной реализации также должен реализовывать соответствующий контент.

Класс поддерживает таблицу атомарно обновляемых переменных с ленивой инициализацией и дополнительное «базовое» поле. Размер таблицы равен степени числа 2. Индекс использует хэш-код каждого потока под маской. Почти все объявления в этом классе являются частными для пакета и напрямую доступны для подклассов.

Элементы внутри таблицы относятся к классу Cell, который представляет собой вариант AtomicLong, заполненный для уменьшения конкуренции за кэш. Заполнение является избыточным для большинства атомов, поскольку они обычно неравномерно разбросаны в памяти, чтобы не слишком сильно мешать друг другу. Однако атомарные объекты, находящиеся в массивах, как правило, располагаются рядом друг с другом, поэтому без этой меры предосторожности наиболее распространенным случаем является совместное использование строк кэша (что оказывает большое негативное влияние на производительность).

Отчасти из-за того, что классы Cell относительно велики, мы создаем их только тогда, когда они действительно нужны. Если конкуренции нет, то все операции по обновлению будут выполняться на базовом поле. При возникновении первого конфликта (т. е. при сбое первой операции CAS над базовым полем) инициализируется таблица размера 2. Когда возникает дальнейшая конкуренция, размер таблицы удваивается, пока не станет равным или большим, чем количество ЦП. Таблица имеет значение null, пока не будет использована.

Используйте спин-блокировку (cellsBusy) для инициализации и изменения размера таблицы, а также для заполнения слотов новыми ячейками. В этом месте нет необходимости использовать блокирующие блокировки, если блокировка недоступна, поток может попробовать другие слоты или базовое поле. Во время этих повторных попыток конкуренция возрастает, но снижается локальность, что все же лучше, чем блокировка блокировок.

Поле Thread.probe, поддерживаемое через ThreadLocalRandom, используется как хэш-код для каждого потока. Мы оставляем их неинициализированными (нулевыми) (если они выглядят так), пока они не будут конкурировать в слоте 0. Инициализируется значением, которое обычно не конфликтует с другими значениями, такими как хэш-код потока, после гонки. Сбой операции CAS при выполнении обновления означает, что имеется конфликт или конфликт таблиц, или и то, и другое. В случае конфликта, если размер таблицы меньше емкости, она удвоится, если только другой поток не удерживает блокировку. Если хешированный слот пуст и замок доступен, создается новая ячейка. Если присутствует, то будет предпринята попытка CAS. Повторите попытку двойным хешированием, используя вторичный хэш (алгоритм случайных чисел Marsaglia XorShift), чтобы попытаться найти свободный слот.

Размер таблицы ограничен, потому что, когда потоков больше, чем ЦП, при условии, что каждый поток привязан к ЦП, существует идеальная хеш-функция, которая отображает потоки в слоты, устраняя коллизии. Когда мы достигаем емкости, мы ищем эту карту, случайным образом меняя хэш-коды конфликтующих потоков. Поскольку поиск является случайным, коллизии известны только из-за сбоев CAS, конвергенция может быть медленной, а поскольку потоки обычно не привязаны к ЦП навсегда, они вообще не произойдут. Однако, несмотря на эти ограничения, наблюдаемые показатели конкуренции в этих случаях, как правило, низкие.

Ячейки могут стать недоступными, включая завершение потока, выполняющего хеширование, или поток, выполняющий хеширование неправильно из-за расширения таблицы. Мы не пытаемся обнаруживать или удалять такие ячейки, предполагая, что для долго выполняющихся экземпляров конкуренция будет возникать повторно, поэтому они со временем понадобятся снова; для кратковременных экземпляров нет необходимости тратить время на их уничтожение.


Класс ячейки

Вариант Atomiclong, который поддерживает только необработанный доступ и CAS.

Класс Cell изменен аннотацией @jdk.internal.vm.annotation.Contended. Роль Contended (подробнее см.:JEP 142): Определите способ указать, что одно или несколько полей в объекте, скорее всего, будут сильно конкурировать между ядрами процессора, чтобы виртуальная машина могла организовать, чтобы они не делили строки кэша с другими полями или другими объектами, к которым, вероятно, будет осуществляться независимый доступ. .

Схема

Реализация

Основные методы Striped64 — это longAccumulate и doubleAccumulate. Они похожи. Давайте взглянем на longAccumulate.Что касается кода такого типа, мое личное предложение состоит в том, чтобы понять идею. В конце концов, мы здесь не для того, чтобы изменять JDK. Если вы действительно хотите изменить его или у вас есть аналогичные потребности, вы можете вернуться и рассмотреть его поближе.

    //x  元素
    //fn  更新函数,如果是add可以为null(这个约定避免了longadder中定义额外的变量或者函数)
    //wasUncontended 如果CAS在调用之前失败了,这个值为false
    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
        //获取当前线程的probe值,如果为0,则需要初始化该线程的probe值
        if ((h = getProbe()) == 0) {
        	ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false;  // True if last slot nonempty
        done: for (;;) {
            Cell[] cs; Cell c; int n; long v;
            //Cells不为空,进行操作
            if ((cs = cells) != null && (n = cs.length) > 0) {
                //通过(hashCode & (length - 1))这种算法来实现取模 有种看到HashMap代码的感觉
                //如果当前位置为null说明需要初始化
                if ((c = cs[(n - 1) & h]) == null) {
                    //判断锁状态
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        //再次判断锁状态,同时获取锁
                        if (cellsBusy == 0 && casCellsBusy()) {
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    //创建成功跳出
                                    break done;
                                }
                            } finally {
                                //释放锁
                                cellsBusy = 0;
                            }
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                //运行到此说明cell的对应位置上已经有相应的Cell了,
                //不需要初始化了
                //CAS操作已经失败了,出现了竞争
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                //这里尝试将x值加到a的value上 
                else if (c.cas(v = c.value,
                               (fn == null) ? v + x : fn.applyAsLong(v, x)))
                    //如果尝试成功,跳出循环,方法退出
                    break;
                //cell数组最大为cpu的数量,
                //cells != as表明cells数组已经被更新了 
                //标记为最大状态或者说是过期状态
                else if (n >= NCPU || cells != cs)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                //扩容 当前容量 * 2
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == cs)        // Expand table unless stale
                            cells = Arrays.copyOf(cs, n << 1);
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);
            }
            //尝试获取锁之后扩大Cells
            else if (cellsBusy == 0 && cells == cs && casCellsBusy()) {
                try {                           // Initialize table
                    if (cells == cs) {
                        //初始化cell表,初始容量为2。 
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        break done;
                    }
                } finally {
                    //释放cellsBusy锁
                    cellsBusy = 0;
                }
            }
            //如果创建cell表由于竞争导致失败,尝试将x累加到base上
            // Fall back on using base
            else if (casBase(v = base,
                             (fn == null) ? v + x : fn.applyAsLong(v, x)))
                break done;
        }
    }
    /**
     * CASes the cellsBusy field from 0 to 1 to acquire lock.
     */
    final boolean casCellsBusy() {
        return CELLSBUSY.compareAndSet(this, 0, 1);
    }
    /**
     * CASes the base field.
     */
    final boolean casBase(long cmp, long val) {
        return BASE.compareAndSet(this, cmp, val);
    }

Суть этого абзаца такова:

  • longAccumulate вычислит хеш-значение в соответствии с текущим потоком, а затем возьмет модуль в соответствии с (hashCode & (length - 1)) для определения позиции в массиве ячеек, где поток рассеян.
  • Если массив Cell не был создан, то приобретите блокировку cellBusy (эквивалентную блокировке, но более легкую).Если получение прошло успешно, инициализируйте массив Cell с начальной емкостью 2. После завершения инициализации упакуйте x в ячейку, после того как вычисление хэша распределяется по соответствующему индексу. Если приобретение cellBusy не удастся, он попытается накопить x в базу, а если обновление не удастся, он попытается снова, пока не добьется успеха.
  • Если массив Cell был инициализирован, то он раскидывается по элементу массива Cell по хеш-значению потока, а Cell в этой позиции получается и присваивается переменной a.Если a равно null, это означает, что position не инициализировался.Тогда инициализировать,конечно,нужно конкурировать за переменную cellBusy перед инициализацией.
  • Если размер массива Cell уже самый большой (больше или равен количеству процессоров), то хэш нужно пересчитать, чтобы перераспределить текущий поток на другую позицию Cell и пройти логику метода заново, иначе Массив ячеек необходимо обработать Расширьте емкость, а затем перенесите исходное содержание подсчета. Так как значение счетчика хранится в ячейке, то никакой другой обработки после расширения не требуется.Скопируйте содержимое старого массива ячеек в новый массив ячеек непосредственно по индексу.

LongAdder

Основная идея LongAdder состоит в том, чтобы рассредоточить точки доступа, рассредоточить значения значений в массив, разные потоки будут обращаться к разным слотам в массиве, и каждый поток выполняет CAS-операции только со значением в своем собственном слоте, чтобы горячие точки разбросаны и конфликты гораздо менее вероятны. Если вы хотите получить реальное длинное значение, просто накапливайте значения переменных в каждом слоте и возвращайтесь.

иллюстрировать

Удерживает одну или несколько переменных с начальными значениями, установленными на ноль для суммирования. Когда несколько потоков утверждают для обновления, переменная коллекция динамически растет, чтобы уменьшить конфликт. Наконец, когда требуется сумма или требуется значение длинного типа, окончательная сумма может быть получена путем суммирования текущих переменных и объединения их.

LongAdder работает лучше, чем AtomicLong, в некоторых сценариях с высокой степенью параллелизма, например, когда несколько потоков одновременно обновляют суммирующую переменную, например, при подсчете количества наборов, но его нельзя использовать для точного управления синхронизацией, другими словами, это может есть ошибки (поскольку обновления и чтения выполняются параллельно). Нет никакой разницы в производительности между LongAdder и AtomicLong в сценариях с низким параллелизмом, но когда возникает конкуренция с высоким параллелизмом, этот класс будет иметь лучшую пропускную способность, но соответственно будет занимать значительное место.

LongAdder наследует абстрактный класс Number, но не реализует некоторые методы, такие как equals, hashCode, compareTo, поскольку ожидаемое использование экземпляров LongAdder заключается в внесении некоторых частых изменений, поэтому он не подходит для ключей коллекции.

Реализация

Взгляните, какие методы есть у LongAdder:

Далее в основном анализируются методы приращения и суммирования LongAdder, сначала взгляните на исходный код:

/**
 * Equivalent to {@code add(1)}.
 */
public void increment() {
	add(1L);
}

/**
* Adds the given value.
*
* @param x the value to add
*/
public void add(long x) {
    Cell[] cs; long b, v; int m; Cell c;
    if ((cs = cells) != null || !casBase(b = base, b + x)) {
    	//到了这里 表明cs不为null or 线程有并发冲突,导致caseBase失败
        boolean uncontended = true;
        if (cs == null || // cells 为null
            (m = cs.length - 1) < 0 || // cells 不为null 但只有一个元素
            (c = cs[getProbe() & m]) == null || //哈希取模 对应位置元素为null
            !(uncontended = c.cas(v = c.value, v + x))) //cas 替换失败(并发竞争)
            longAccumulate(x, null, uncontended);
    }
}
/**
* CASes the base field (Striped64类中的方法)
*/
final boolean casBase(long cmp, long val) {
    return BASE.compareAndSet(this, cmp, val);
}

//当在sum的过程中,有可能别的线程正在操作cells(因为没有加锁)
//sum取的值,不一定准确
public long sum() {
     Cell[] cs = cells;
     long sum = base;
     if (cs != null) {
        for (Cell c : cs)
            if (c != null)
                sum += c.value;
    }
    return sum;
}

LongAdder vs AtomicLong Performance

Java 8 Performance Improvements: LongAdder vs AtomicLong

Сравните LongAccumulator

Класс LongAdder можно рассматривать как частный случай LongAccumulator, LongAccumulator предоставляет более мощные и гибкие функции, чем LongAdder.

/**
 * Creates a new instance using the given accumulator function
 * and identity element.
 * @param accumulatorFunction a side-effect-free function of two arguments
 * @param identity identity (initial value) for the accumulator function
*/
public LongAccumulator(LongBinaryOperator accumulatorFunction,
                           long identity) {
    this.function = accumulatorFunction;
    base = this.identity = identity;
}
@FunctionalInterface
public interface LongBinaryOperator {

/**
 * Applies this operator to the given operands.
 *
 * @param left the first operand
 * @param right the second operand
 * @return the operator result
*/
long applyAsLong(long left, long right);
}

В конструкторе accumulatorFunction — это интерфейс бинокулярной операции, который возвращает вычисленное значение в соответствии с двумя входными параметрами, а identity — это начальное значение аккумулятора LongAccumulator.

accumulatorFunction в основном используется в Striped64 longAccumulate.Если fn==null, по умолчанию добавляется, иначе будет вызываться fn.applyAsLong(v, x)

По сравнению с LongAdder, LongAccumulator может предоставить ненулевое начальное значение для аккумулятора, в то время как LongAdder может предоставить только значение по умолчанию, равное 0. Кроме того, LongAccumulator также может указывать правила накопления, такие как накопление или умножение, просто нужно передать пользовательский бинокулярный оператор при построении LongAccumulator, который имеет встроенные правила накопления.

Reference

GitHub.com/jiankun король…

GitHub.com/jiankun король…

wooooooo.brief.com/afraid/9ah7's 5644…

openjdk.java.net/jeps/142

Купил .open JDK.java.net/Piper mail/ тоже…

Личный публичный аккаунт WeChat:

Личный блог CSDN:

blog.csdn.net/jiankunking

Личный блог самородков:

Талант /user/255931…

Персональный гитхаб:

github.com/jiankunking

личный блог:

jiankunking.com