предисловие
Я полагаю, что большинство разработчиков в той или иной степени видели или писали код параллельного программирования. Параллельные ключевые слова, кроме синхронизированных (если вы не понимаете, перейдите на портал,[Статья длиной 10 000 символов, рекомендуемая коллекция] Об обновлении синхронизированной блокировки, вы должны знать это), и еще одна большая ветвь, Atomic. Если не слышали, сначала прочтите基础篇
, если вы слышали об этом, пожалуйста, прокрутите вниз, чтобы увидеть进阶篇
, глубокий анализ исходного кода.
Задавая вопрос: является ли int потокобезопасным?
Друзья, которые читали статьи по теме Synchronized, должны знать, что это небезопасно, и использовать код, чтобы снова проверить его небезопасность:
public class testInt {
static int number = 0;
public static void main(String[] args) throws Exception {
Runnable runnable = new Runnable() {
@Override
public void run() {
for (int i = 0; i < 100000; i++) {
number = number+1;
}
}
};
Thread t1 = new Thread(runnable);
t1.start();
Thread t2 = new Thread(runnable);
t2.start();
t1.join();
t2.join();
System.out.println("number:" + number);
}
}
результат операции:
В приведенном выше примере мы определяем номер статической переменной с начальным значением 0, а затем создаем и запускаем два потока, каждый из которых выполняет 100 000 операций автоинкремента. Результат 200 000, но мы обнаружили, что окончательный результат меньше 200 000, а значит он не безопасен.
Как упоминалось в предыдущей статье о Synchronized, вы можете добавить ключевое слово Synchronized в код number=number+1 для обеспечения безопасности потоков. Однако он имеет большие накладные расходы на ресурсы, поэтому сегодня мы рассмотрим еще один метод достижения потокобезопасности — Atomic.
Границы атомных основ
Атомарное целое (базовый тип)
Общее введение
Atomic — это общий термин для серии пакетов, предоставляемых jdk.Это большое семейство включает атомарные целые числа (AtomicInteger, AtomicLong, AtomicBoolean), атомарные ссылки (AtomicReference, AtomicStampedReference, AtomicMarkableReference), атомарные массивы (AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray), средства обновления ( AtomicIntegerFieldUpdater, AtomicLongFieldUpdater, AtomicReferenceFieldUpdater).
AtomicInteger
Аналогичные функции есть у AtomicInteger, AtomicBoolean и AtomicLong, мы будем анализировать атомарные классы с AtomicInteger в качестве основного метода.
Давайте взглянем на API и их конкретные функции:
public class testInt {
public static void main(String[] args) {
//定义AtomicInteger类型的变量,值为1
AtomicInteger i = new AtomicInteger(1);
//incrementAndGet方法先新增1再返回,所以打印2,此时i为2
System.out.println(i.incrementAndGet());
//getAndIncrement方法先返回值再新增1,所以打印2,此时i为3
System.out.println(i.getAndIncrement());
//get方法返回当前i值,所以打印3,此时i为3
System.out.println(i.get());
//参数为正数即新增,getAndAdd方法先返回值再新增666,所以打印3,此时i为669
System.out.println(i.getAndAdd(666));
//参数为负数即减去,getAndAdd方法先返回值再减去1,所以打印669,此时i为668
System.out.println(i.getAndAdd(-1));
//参数为正数即新增,addAndGet方法先新增666再返回值,所以打印1334,此时i为1334
System.out.println(i.addAndGet(666));
//参数为负数即减去,addAndGet方法先减去-1再返回值,所以打印1333,此时i为1333
System.out.println(i.addAndGet(-1));
//getAndUpdate方法IntUnaryOperator参数是一个箭头函数,后面可以写任何操作,所以打印1333,此时i为13331
System.out.println(i.getAndUpdate(x -> (x * 10 + 1)));
//最终打印i为13331
System.out.println(i.get());
}
}
Результаты:
Улучшения приведенного выше примера типа int
public class testInt {
//1.定义初始值为0的AtomicInteger类型变量number
static AtomicInteger number = new AtomicInteger(0);
public static void main(String[] args) throws Exception {
Runnable runnable = new Runnable() {
@Override
public void run() {
for (int i = 0; i < 100000; i++) {
//2.调用incrementAndGet方法,实现加1操作
number.incrementAndGet();
}
}
};
Thread t1 = new Thread(runnable);
t1.start();
Thread t2 = new Thread(runnable);
t2.start();
t1.join();
t2.join();
System.out.println("number:" + number.get());
}
}
Мы видим, что рабочий результат правильный 200 000, что указывает на то, что AtomicInteger действительно гарантирует потокобезопасность, то есть в процессе многопоточности рабочий результат все еще правильный.但是这存在一个ABA问题,下面将原子引用的时候再说,先立个flag。
Анализ исходного кода
Давайте возьмем метод incrementAndGet в качестве примера, чтобы увидеть, как реализован нижний слой.Метод incrementAndGet в классе AtomicInteger вызывает метод getAndAddInt класса Unsafe.
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
Давайте посмотрим на метод getAndAddInt, в нем есть цикл, прямое значение compareAndSwapInt возвращает true, и цикл завершается. Здесь мы должны упомянуть CAS, который является решением проблемы безопасности многопоточности.
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
CAS
Коллеги потока 1 и потока 2 получили значение переменной основной памяти 0, поток 1 добавляет 1 и записывает в основную память, теперь значение переменной основной памяти равно 1, а поток 2 также добавляет 2 в основную память и пытается для записи в основную память.В настоящее время основная память не может быть записана в.Память, поскольку она перезапишет операцию потока 1, конкретный процесс выглядит следующим образом.
CAS — это когда поток 2 пытается записать в память, он путем сравнения и установки (CompareAndSet) обнаруживает, что текущее значение основной памяти равно 1, которое отличается от значения 0, которое он только что прочитал, поэтому он откажется от этой модификации и re-read Возьмите последнее значение основной памяти, затем повторите конкретную логическую операцию потока 2 и повторите попытку записи в основную память. Если в это время поток 1 снова изменит основную память, поток 2 обнаружит, что значение основной памяти не совпадает с ожидаемым, поэтому он откажется от модификации, снова прочитает последнее значение основной памяти, повторит попытку и повторит попытку. для записи основной памяти. Мы видим, что это процесс повторных сравнений, то есть пока ожидаемое начальное значение не будет одинаковым, он не будет писать в основную память, иначе он будет продолжать читать и повторять цикл. В этом суть цикла for выше.
CAS的实现实际上利用了CPU指令来实现的,如果操作系统不支持CAS,还是会加锁的,如果操作系统支持CAS,则使用原子性的CPU指令。
атомарная ссылка
В повседневном использовании мы не только выполняем атомарные операции над перечисленными выше базовыми типами, но также должны выполнять атомарные операции над некоторыми сложными типами, поэтому требуется AtomicReference.
небезопасная реализация
Сначала взгляните на небезопасный тип BigDecimal:
public class testReference {
static BigDecimal number = BigDecimal.ZERO;
public static void main(String[] args) throws Exception {
Runnable runnable = new Runnable() {
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
number=number.add(BigDecimal.ONE);
}
}
};
Thread t1 = new Thread(runnable);
t1.start();
Thread t2 = new Thread(runnable);
t2.start();
t1.join();
t2.join();
System.out.println(number);
}
}
Текущий результат показан на рисунке ниже.Мы видим, что два потока, самозацикливание 1000 раз плюс 1 операция, конечный результат должен быть 2000, но результат меньше 2000.
Реализация безопасности — использование CAS
public class testReference {
//定义AtomicReference类型BigDecimal变量
static AtomicReference<BigDecimal> number = new AtomicReference<BigDecimal>(BigDecimal.ZERO);
public static void main(String[] args) throws Exception {
Runnable runnable = new Runnable() {
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
//手动写循环+CAS判断
while(true){
BigDecimal pre=number.get();
BigDecimal next=number.get().add(BigDecimal.ONE);
if(number.compareAndSet(pre,next)) {
break;
}
}
}
}
};
Thread t1 = new Thread(runnable);
t1.start();
Thread t2 = new Thread(runnable);
t2.start();
t1.join();
t2.join();
System.out.println(number.get());
}
}
Результаты приведены ниже:
Проблемы АБА и решения
В приведенном выше процессе CAS сравнение значений используется, чтобы узнать, может ли обновление быть успешным.Если поток 1 сначала добавляет 1, а затем вычитает 1, основная память по-прежнему остается исходным значением, то есть поток 2 все еще может быть успешно обновлен. . Но эта логика错了
, поток 1 был изменен, а поток 2 не может успешно обновиться напрямую.
Код:
public class testInt {
static AtomicInteger number = new AtomicInteger(0);
public static void main(String[] args) throws Exception {
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
int a = number.get();
System.out.println("开始number:" + a);
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(number.compareAndSet(a, a++));
}
});
t1.start();
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("开始增加操作");
int a = number.incrementAndGet();
System.out.println("当前number:" + a);
int b = number.decrementAndGet();
System.out.println("当前number:" + b);
}
});
t2.start();
t1.join();
t2.join();
}
}
Мы видим, что поток 2 выполнил над ним ряд операций, но окончательный вывод по-прежнему верен, что указывает на то, что обновление может быть успешным. Это явно неправильно.
Затем мы можем использовать AtomicStampedReference, чтобы добавить к нему номер версии. Когда поток 1 впервые начинает читать основную память, он получает значение 0, а версия равна 1. Поток 2 также получает эти два значения.Когда поток 1 выполняет операцию добавления 1 и вычитания 1, версия увеличивается на 1. , Теперь значение основной памяти равно 0, версия равна 2, а поток 2 все еще пытается записать в основную память данные с ожидаемым значением 0 и версией 1. В настоящее время обновление не выполняется из-за разных версий. . В частности, давайте попробуем код:
public class testInt {
static AtomicStampedReference<Integer> number = new AtomicStampedReference<Integer>(0, 0);
public static void main(String[] args) throws Exception {
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
int a = number.getReference();
int s = number.getStamp();
System.out.println("开始number:" + a + ",stamp:" + s);
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(number.compareAndSet(a, a + 1, s, s + 1));
}
});
t1.start();
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("开始增加操作");
int a = number.getReference();
int s = number.getStamp();
number.compareAndSet(a, a + 1, s, s + 1);
System.out.println("当前number:" + a + ",stamp:" + (s + 1));
a = number.getReference();
s = number.getStamp();
number.compareAndSet(a, a - 1, s, s + 1);
System.out.println("当前number:" + a + ",stamp:" + (s+1));
}
});
t2.start();
t1.join();
t2.join();
}
}
Мы видим, что штамп (номер версии) обновляется для каждой операции, при финальном сравнении сравнивается не только значение, но и номер версии, поэтому обновление не может быть успешным, false.
атомный массив
AtomicIntegerArray, AtomicLongArray и AtomicReferenceArray похожи, поэтому на примере AtomicIntegerArray следующий AtomicIntegerArray можно рассматривать как массив типа AtomicInteger.Нижний слой очень похож, поэтому я не буду его подробно описывать.
AtomicIntegerArray array = new AtomicIntegerArray(10);
array.getAndIncrement(0); // 将第0个元素原子地增加1
AtomicInteger[] array = new AtomicInteger[10];
array[0].getAndIncrement(); // 将第0个元素原子地增加1
Полевые апдейтеры и атомарные аккумуляторы относительно просты, поэтому я не буду здесь о них говорить.
Атомная расширенная разделительная линия главы
Анализ исходного кода LongAdder
Использование LongAdder
LongAdder был добавлен после jdk1.8, так зачем его добавлять? На этот вопрос будет дан ответ ниже, давайте сначала посмотрим, как его использовать.
public class testLongAdder {
public static void main(String[] args) throws Exception {
LongAdder number = new LongAdder();
Runnable runnable = new Runnable() {
@Override
public void run() {
for (int j = 0; j < 10000; j++) {
number.add(1L);
}
}
};
Thread t1 = new Thread(runnable);
Thread t2 = new Thread(runnable);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("number:" + number);
}
}
Мы видим, что LongAdder используется примерно так же, как и AtomicLong: два потока Thread1 и Thread2 используются для увеличения числового значения в 10 000 раз каждый, и окончательное число равно 20 000.
Сравнительное преимущество с Atomic
Вопрос в том, поскольку AtomicLong может выполнять потокобезопасные операции над многопоточными числами, зачем вам LongAdder? Давайте сначала сравним код. Разница в производительности между ними заключается в предположении, что результаты правильные.
public class testLongAdder {
public static void main(String[] args) {
/ //1个线程,进行100万次自增操作
test1(1,1000000);
/ //10个线程,进行100万次自增操作
test1(10,1000000);
/ //100个线程,进行100万次自增操作
test1(100,1000000);
}
static void test1(int threadCount,int times){
long startTime=System.currentTimeMillis();
AtomicLong number1=new AtomicLong();
List<Thread> threads1=new ArrayList<>();
for(int i=0;i<threadCount;i++) {
threads1.add(new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < times; j++) {
number1.incrementAndGet();
}
}
}));
}
threads1.forEach(thread -> thread.start());
threads1.forEach(thread ->{
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
} );
long endTime=System.currentTimeMillis();
System.out.println("AtomicLong:"+number1+",time:"+(endTime-startTime));
LongAdder number2=new LongAdder();
List<Thread> threads2=new ArrayList<>();
for(int i=0;i<threadCount;i++) {
threads2.add(new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < times; j++) {
number2.add(1);
}
}
}));
}
threads2.forEach(thread -> thread.start());
threads2.forEach(thread ->{
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
} );
System.out.println("LongAdder:"+number2+",time:"+(System.currentTimeMillis()-endTime));
}
}
Приведенный выше код сравнивает время, затрачиваемое AtomicLong и LongAdder после 1 потока, 10 потоков и 100 потоков после 100 операций автоинкремента. С помощью оператора печати мы обнаружили, что LongAdder занимает на порядок меньше времени, чем AtomicLong, на основании того, что и число1, и число2 в конечном итоге верны.
Анализ исходного кода
Тогда почему это происходит, мы должны проанализировать это на уровне исходного кода. Почему AtomicLong неэффективен? Потому что, если количество потоков слишком велико, особенно в случае высокого параллелизма, например, есть 100 потоков, которые хотят работать с объектом одновременно, только один поток получит блокировку, а остальные 99 потоков может простаивать, зацикливаясь до тех пор, пока поток не освободит блокировку. Если поток завершает операцию и освобождает блокировку, другие 99 потоков снова конкурируют, только один поток получает блокировку, а остальные 98 потоков все еще бездействуют, пока блокировка не будет снята. Таким образом, операция CAS будет тратить много ресурсов на бездействие, что будет делать AtomicLong все медленнее и медленнее по мере увеличения количества потоков.
AtomicLong заключается в том, что несколько потоков работают с одним и тем же значением, что приводит к слишком большому количеству вращений нескольких потоков и снижению производительности. В случае отсутствия конкуренции LongAdder, как и AtomicLong, работает на той же основе, а при наличии конкуренции принимает метод деления на нули, перехода от пространства к времени, использования ячеек массива и разбиения значения на Ячейки массива. Когда несколько потоков должны работать со значением одновременно, идентификатор потока может быть хеширован для получения хеш-значения, а затем сопоставлен с нижним индексом ячеек массива в соответствии с хэш-значением, а затем выполнять операцию автоинкремента. на значение, соответствующее индексу. Когда все потоки будут выполнены, добавьте все значения ячеек массива и базу неконкурентных значений в качестве конечного результата.
Давайте сначала посмотрим на поля в LongAdder и обнаружим, что в нем нет полей, в основном в классе Stripped64, который он наследует, и есть следующие четыре основные переменные.
/** CPU数量,即cells数组的最大长度*/
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
*cells数组,为2的幂,2,4,8,16.....,方便以后位运算
*/
transient volatile Cell[] cells;
/**
* 基值,主要用于没有竞争的情况,通过CAS更新。
*/
transient volatile long base;
/**
* 调整单元格大小(扩容),创建单元格时使用的锁。
*/
transient volatile int cellsBusy;
Ниже показано начало метода добавления.
public void add(long x) {
//as:cells数组的引用
//b:base的基础值
//v:期望值
//m:cells数组大小
//a:当前数组命中的单元
Cell[] as; long b, v; int m; Cell a;
//as不为空(cells已经初始化过,说明之前有其他线程对初始化)或者CAS操作不成功(线程间出现竞争)
if ((as = cells) != null || !casBase(b = base, b + x)) {
//初始化uncontented,true表示未竞争(因为有两个情况,这里先初始化,后面对其修改,就能区分这两种情况)
boolean uncontended = true;
//as等于null(cells未初始化)
//或者线程id哈希出来的下标所对应的值为空(cell等于空),getProbe() & m功能是获取下标,底层逻辑是位运算
//或者更新失败为false,即发生竞争,取非就为ture
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
//进入到if里面,说明更新case失败,或者更新某个cell也失败了,或者cell为空,或者cells为空
longAccumulate(x, null, uncontended);
}
}
Вызовите метод longAccumulate Stripped64 из LongAdder, в основном初始化cells
,cells的扩容
,多个线程同时命中一个cell的竞争
работать.
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
//x:add的值,fn:为null,wasUncontended:是否发生竞争,true为发生竞争,false为不发生竞争
int h;//线程的hash值
//如果该线程为0,即第一次进来,所以ThreadLocalRandom强制初始化线程id,再对其hash
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current();
h = getProbe();
wasUncontended = true;
}
//扩容意向,为false肯定不扩容,为true可能扩容
boolean collide = false;
//死循环
for (;;) {
//as:cells数组的引用
//a:当前线程命中的cell
//n:cells的长度
//v:当前线程命中的cell所拥有的value值
Cell[] as; Cell a; int n; long v;
//cells不为空
if ((as = cells) != null && (n = as.length) > 0) {
//当前线程命中的cell为空,下面逻辑是新增cell
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
//发生竞争
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//没有竞争,尝试修改当前线程对应的cell值,成功跳出循环
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
//如果n大于CPU最大数量,不可扩容
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
//获取到了锁,进行扩容,为2的幂,
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];//左移一位运算符,数量加倍
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
//cells等于空,并且获取到锁,开始初始化工作,创建结束释放锁,继续循环
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
Эпилог
Все кончено, посыпать цветами. В этой статье в основном рассказывается о некоторых вариантах использования Atomic, включая атомарные классы Atomic (AtomicInteger, AtomicLong, AtomicBoolean), атомарные ссылки (AtomicReference, AtomicStampedReference) и преимуществах LongAdder после версии 1.8, анализ исходного кода. Процесс также перемежается с введением некоторых проблем CAS, ABA и методами решения.
Если вы считаете, что сочинение в порядке, пожалуйста, поставьте лайк 👍, ваше одобрение является движущей силой для моего письма!
Если вы считаете, что что-то не так, пожалуйста, прокомментируйте.
Хорошо, пока.
просить внимания
использованная литература
Java Multithreading Advanced (17) — атомарная структура JUC: LongAdder
Java 8 Performance Improvements: LongAdder vs AtomicLong
AtomicInteger глубокое понимание
Подробное объяснение класса атомарных операций AtomicInteger