Серия новых функций Java8 (атомарная операция)

Java

题图:by pixel2013 From pixabay

В последнем выпуске мы представили новое время и дату API в Java 8. В этом вопросе мы введем атомные операции в Java 8.LongAdder.

атомарная операция

Согласно определению Википедии:

«Атомарная операция (атомарная операция) не нуждается в синхронизации», — это старая поговорка о многопоточном программировании на Java. Так называемая атомарная операция относится к операции, которая не прерывается механизмом планирования потока; как только такая операция начинается, она выполняется до конца без какого-либо переключения контекста (переключение на другой поток).

AtomicLong

В однопоточной среде используйте Long, если для многопоточной среды, если используете Long, нужно добавитьsynchronizedКлючевые слова, начиная с Java5, JDK предоставляетAtomicLongКласс AtomicLong — это класс Long, который обеспечивает атомарные операции. Сложение и вычитание выполняются потокобезопасным способом. AtomicLong предоставляет атомарные операции для использования Long, поэтому он очень подходит для использования в ситуациях с высоким уровнем параллелизма.

public class AtomicLongFeature {
	private static final int NUM_INC = 1_000_000;

	private static AtomicLong atomicLong = new AtomicLong(0);

	private static void update() {
		atomicLong.set(0);
		ExecutorService executorService = Executors.newFixedThreadPool(5);
		IntStream.range(0, NUM_INC).forEach(i -> {
			Runnable task = () -> atomicLong.updateAndGet(n -> n + 2);
			executorService.submit(task);
		});
		stop(executorService);
		System.out.println(atomicLong.get());
	}

	private static void stop(ExecutorService executorService) {
		try {
			executorService.shutdown();
			executorService.awaitTermination(60, TimeUnit.SECONDS);
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			if (!executorService.isTerminated()) {
				System.out.println("kill tasks");
			}
			executorService.shutdownNow();
		}
	}

	public static void main(String[] args) {
		update();
	}
}

вывод: 2000000

ПочемуAtomicIntegerМожет ли он поддерживать высокий параллелизм? посмотриAtomicLongизupdateAndGetметод:

public final int updateAndGet(IntUnaryOperator updateFunction) {
    int prev, next;
    do {
        prev = get();
        next = updateFunction.applyAsInt(prev);
    } while (!compareAndSet(prev, next));
    return next;
}

public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

Причина в том, что каждый разupdateAndGetбудет вызван, когдаcompareAndSetметод.

AtomicLong — это неблокирующий алгоритм управления параллелизмом, в некоторых случаях очень высокий для параллельных программ, но не подходящий для каждой сцены, для использования в разных сценариях с использованием разных значений класса.

LongAdder

Принцип AtomicLong заключается в том, чтобы полагаться на базовый cas для обеспечения атомарного обновления данных.При добавлении или уменьшении он будет использовать бесконечный цикл для постоянного приведения cas к определенному значению, чтобы достичь цели обновления данных. Так по какому принципу работает LongAdder, есть ли более быстрый способ, чем cas?

public class LongAdderFeature {
	private static final int NUM_INC = 1_000_000;

	private static LongAdder longAdder = new LongAdder();

	private static void update() {
		ExecutorService executorService = Executors.newFixedThreadPool(5);
		IntStream.range(0, NUM_INC).forEach(i -> {
			Runnable task = () -> longAdder.add(2);
			executorService.submit(task);
		});
		stop(executorService);
		System.out.println(longAdder.sum());
	}

	private static void stop(ExecutorService executorService) {
		try {
			executorService.shutdown();
			executorService.awaitTermination(60, TimeUnit.SECONDS);
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			if (!executorService.isTerminated()) {
				System.out.println("kill tasks");
			}
			executorService.shutdownNow();
		}
	}

	public static void main(String[] args) {
		update();
	}
}

вывод: 2000000

Давайте взглянем на метод добавления LongAdder:

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

Мы видим класс Cell, для чего этот класс используется?

@sun.misc.Contended static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

Мы видим, что внутри класса Cell находится переменная volatile, и единственный способ изменить эту переменную — через cas. Мы можем предположить, что хитрость LongAdder может заключаться в том, чтобы распределять параллелизм одного узла на каждый узел, тем самым повышая эффективность при высоком параллелизме.

На основе AtomicLong LongAdder распределяет давление обновления одной точки на каждый узел.В случае низкого параллелизма прямое обновление базы может гарантировать, что производительность AtomicLong в основном непротиворечива, а в случае высокого параллелизма , его можно улучшить за счет децентрализации производительности.

public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

При подсчете основание и значения в каждом элементе ячейки накладываются друг на друга для получения цели вычисления общего числа. Проблема здесь в том, что если элемент ячейки изменяется во время подсчета, результат подсчета может быть неточным, поэтому недостатком является то, что если LongAdder обновляется одновременно во время статистики, это может привести к ошибкам в статистических данных.

Публичный аккаунт WeChat:На мечеПожалуйста, обратите внимание на мою личную технологию общедоступного аккаунта WeChat, подпишитесь на дополнительный контент