Углубленный анализ того, как работает LongAdder

Java
Углубленный анализ того, как работает LongAdder

LongAdder был добавлен в JUC JDK8. Это потокобезопасный «счетчик» с более высокой производительностью, чем инструменты серии Atomic*.

Методы, которые он предоставляет, в основном включают следующее:

имя метода инструкция
void add(long x) Добавьте x к текущему значению.
void increment() Увеличивает текущее значение на 1.
void decrement() Уменьшает текущее значение на 1.
long sum() Возвращает текущее значение. В частности, обратите внимание, что sum возвращает точное значение при отсутствии одновременных обновлений значения, а sum не гарантирует возврат точного значения при наличии параллелизма. Причина объясняется ниже.
void reset() Сброс значения до 0 можно использовать вместо обновления LongAdder, но этот метод можно использовать только без одновременных обновлений.
long sumThenReset() Получить текущее значение и сбросить значение до 0.

0 Диаграмма классов для LongAdder

LongAdder类图

Сам LongAdder не имеет переменных-членов, и изменением его значения фактически управляет родительский класс Striped64.

Striped64 управляет значением через две переменные-члены, а именно base и ячейки.cells — это массив, элементы которого являются реализацией внутреннего класса Striped64 Cell.Cell очень прост и записывает только одно значение.

Когда нет параллельного доступа к LongAdder, он будет напрямую обновлять значение базы через cas.Когда есть параллельный доступ, он найдет определенную ячейку и изменит значение ячейки.

В конечном итоге значение = база + сумма (ячейки).

1 Покопайтесь в методе add

Функция метода add состоит в том, чтобы добавить текущее значение value к x. На самом деле, после того, как вы поймете, что такое метод сложения, вы, естественно, поймете метод увеличения и метод уменьшения, которые на самом деле являются методом сложения.

/**
 * Equivalent to {@code add(1)}.
 */
public void increment() {
    add(1L);
}

/**
 * Equivalent to {@code add(-1)}.
 */
public void decrement() {
    add(-1L);
}

1.1 Реализация метода добавления

Исходный код выглядит следующим образом:

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    // cells和base即Striped64维护的实例变量
    if ((as = cells) != null || !casBase(b = base, b + x)) { // #1
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

В начальном состоянии ячейки==null, поэтому в коде №1 первое условие должно быть ложным.В это время базовое значение будет обновляться CAS через метод casBase, и только при сбое cas будет идти если средний.

Когда это не удастся? когда есть конкуренция. Из этого видно, что при отсутствии конкуренции LongAdder реализует обновление значения путем накопления базового значения.

При наличии конкуренции LongAdder попытается найти одну из ячеек и сохранить общее значение, обновив значение этой ячейки. Процесс выглядит следующим образом:

LongAdder.add

Метод longAccumulate определен в Striped64. Ниже приведена обработка, когда ячейки==null (другой нерелевантный код временно опущен, он может объяснить проблему):

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {
            // ... 省略 ...
        }
        // cells == null 时的处理
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2];
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

Обработка при Cells==null была отмечена в комментариях. CellsBusy — это переменная, используемая для управления одновременным изменением ячеек.Первоначальное значение равно 0, а значение CellsBusy обновляется с 0 до 1 с помощью casCellsBusy. Затем создайте новый массив ячеек с начальным размером 2, создайте новую ячейку и используйте значение add в качестве начального значения, затем вычислите индекс ячеек и назначьте ей ячейку. Наконец, обновите ячейки переменной экземпляра и сбросьте CellBusy на 0.

Когда ячейки!=null, обработка выглядит следующим образом:

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {
            // 从cells中定位一个Cell,如果是null,就new一个Cell,并将x作为初始值
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {       // Try to attach new Cell
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
                
            // 如果定位到的Cell!=null,尝试通过cas的方式更新这个cell维护的value。
            // 如果更新成功,退出循环
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            // 如果依然更新失败,扩容cells为原来的两倍,并将原来cells中的元素复制到新的cells中。
            // PS: 通过cellsBusy控制并发
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h);
        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            // ... 省略 ...
        }
        // 上述过程都失败,尝试更新base的值。
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

Общий принцип приведенного выше кода следующий:

LongAdder.add原理

1.2 Почему он более эффективен, чем AtomicLong

Сравните с методом getAndAdd класса AtomicLong. Вот исходный код этого метода:

/**
 * Atomically adds the given value to the current value.
 *
 * @param delta the value to add
 * @return the previous value
 */
public final long getAndAdd(long delta) {
    return unsafe.getAndAddLong(this, valueOffset, delta);
}

public final long getAndAddLong(Object var1, long var2, long var4) {
    long var6;
    do {
        var6 = this.getLongVolatile(var1, var2);
    } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));

    return var6;
}

Реализация AtomicLong заключается в обновлении текущего поддерживаемого значения с помощью бесконечного цикла cas. Вполне возможно, что при достаточно большом параллелизме частота отказов cas будет очень высокой. Количество циклов здесь резко возрастает, в результате чего при высокой загрузке процессора.

Анализируя принцип LongAdder.add(int x) выше, LongAdder сначала пытается обновить cas.Если это не удается, он вместо этого обновляет значение через Cell[].Если метод вычисления индекса достаточно хеширован, то в случай большого параллелизма В этом случае вероятность того, что несколько потоков обнаружат одну и ту же ячейку, ниже, что несколько похоже на смысл блокировки сегмента.

Исходя из этого, можно проанализировать еще один момент: в случае небольшого количества параллелизма производительность обоих не сильно отличается.

2 Метод глубокой суммы копаний

Метод суммы используется для возврата текущего значения LongAdder.

2.1 Реализация метода суммы

Реализация метода суммы очень проста, по сути это база + сумма(ячейки). Исходный код выглядит следующим образом:

public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

2.2 Почему значение суммы неточно в случае параллелизма

Из приведенного выше исходного кода видно, что обновление базы и ячеек не ограничено при выполнении суммы. То есть последнее обновление LongAdder происходит не раньше самого последнего чтения.

Во-первых, окончательная возвращенная локальная переменная sum изначально копируется как базовая, и когда она наконец возвращается, вполне вероятно, что база была обновлена, и локальная переменная sum не будет обновлена ​​в это время, что приведет к несогласованности.

Во-вторых, здесь также не гарантируется, что чтение в ячейку будет последним записанным значением.

Следовательно, метод суммы может получить правильные результаты без параллелизма.

3 Углубитесь в метод сброса

Используется для инициализации массивов базы и ячеек, сброс значения до 0.

3.1 Реализация метода сброса

Исходный код выглядит следующим образом:

public void reset() {
    Cell[] as = cells; Cell a;
    base = 0L;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                a.value = 0L;
        }
    }
}

На самом деле массивы base и Cells устанавливаются в 0 один за другим.

3.2 Почему его нельзя использовать в параллельных ситуациях

Поскольку метод сброса не является атомарной операцией, он сначала сбрасывает базу на 0. Если предположить, что ЦП вырезает и выполняет сумму в это время, сумма в это время становится значением после вычитания базы.

Другими словами, в случае большого количества параллелизма выполнение этого метода не гарантирует, что другие потоки увидят результат сброса. Поэтому используйте его с осторожностью.

4 Резюме

В реальной работе эти два инструмента можно использовать по характеристикам LongAdder и AtomicLong. LongAdder можно использовать, когда требуется более высокая производительность при высокой степени параллелизма, а точность значения невысока (например, количество посетителей веб-сайта). AtomicLong можно использовать, когда требуется безопасность потоков, допустима некоторая потеря производительности и требуется высокая точность.


Добро пожаловать в мой публичный аккаунт WeChat

公众号