Основы: атомарные компоненты JAVA и синхронные компоненты

Java задняя часть
Основы: атомарные компоненты JAVA и синхронные компоненты

предисловие

При использовании многопоточного параллельного программирования часто приходится изменять общие переменные. На этом этапе мы можем выбрать ConcurrentHashMap, ConcurrentLinkedQueue для безопасного хранения данных. Но если это связано только с изменением состояния и последовательности выполнения потоков, будет лучше использовать атомарные компоненты, начиная с атомарных или компонентов синхронизации, таких как ReentrantLock и CyclicBarrier, Ниже будут представлены их принципы и способы использования один за другим.

  • Принцип реализации атомарных компонентов CAS
  • Использование атомарных компонентов, таких как AtomicBoolean и AtomicIntegerArray,
  • Принцип реализации компонентов синхронизации
  • Использование компонентов синхронизации, таких как ReentrantLock и CyclicBarrier.

Следите за официальной учетной записью, общайтесь друг с другом и ищите в WeChat: Sneak forward

Принцип реализации атомарных компонентов CAS

Сценарии применения

  • Его можно использовать для реализации атомарной операции с переменными и состояниями в многопоточности.
  • Может использоваться для реализации блокировок синхронизации (ReentrantLock)

Атомные компоненты

  • Атомарная работа атомарных компонентов достигается за счет использования cas для вращения изменчивых переменных.
  • Переменная типа volatile гарантирует, что при изменении переменной другие потоки смогут увидеть последнее значение.
  • cas гарантирует, что операция изменения значения является атомарной и не будет прервана

атомарный класс базового типа

AtomicBoolean //布尔类型
AtomicInteger //正整型数类型
AtomicLong	  //长整型类型
  • Пример использования
public static void main(String[] args) throws Exception {
    AtomicBoolean atomicBoolean = new AtomicBoolean(false);
    //异步线程修改atomicBoolean
    CompletableFuture<Void> future = CompletableFuture.runAsync(() ->{
        try {
            Thread.sleep(1000); //保证异步线程是在主线程之后修改atomicBoolean为false
            atomicBoolean.set(false);
        }catch (Exception e){
            throw new RuntimeException(e);
        }
    });
    atomicBoolean.set(true);
    future.join();
    System.out.println("boolean value is:"+atomicBoolean.get());
}
---------------输出结果------------------
boolean value is:false

атомарный класс ссылочного класса

AtomicReference
//加时间戳版本的引用类原子类
AtomicStampedReference
//相当于AtomicStampedReference,AtomicMarkableReference关心的是
//变量是否还是原来变量,中间被修改过也无所谓
AtomicMarkableReference
  • Исходный код AtomicReference выглядит следующим образом, который определяетvolatile V value, и используйте VarHandle (конкретный подкласс — FieldInstanceReadWrite) для выполнения атомарных операций, MethodHandles поможет вычислить позицию смещения значения в классе и, наконец, вызовет Unsafe для VarHandle.public final native boolean compareAndSetReference(Object o, long offset, Object expected, Object x)Метод атомарно изменяет свойство объекта
public class AtomicReference<V> implements java.io.Serializable {
    private static final long serialVersionUID = -1848883965231344442L;
    private static final VarHandle VALUE;
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            VALUE = l.findVarHandle(AtomicReference.class, "value", Object.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
    private volatile V value;
    ....

АВА-проблема

  • Поток X собирается изменить значение переменной с A на B, но в этот период поток Y меняет значение переменной с A на C, а затем на A; наконец, поток X обнаруживает, что значение переменной равно A и заменяет его на Б. Но на самом деле А больше не является первоначальным А.
  • Решение состоит в том, чтобы сделать переменную уникальным типом. К значению можно добавить номер версии или отметку времени. Если номер версии добавляется, модификация потока Y становится A1-> B2-> A3. В это время, если поток X снова обновляется, можно судить, что A1 не равен A3.
  • Реализация AtomicStampedReference и AtomicReference аналогичны, но изменены их атомарные переменные.volatile Pair<V> pair;, а Pair — его внутренний класс. AtomicStampedReference можно использовать для решения задач ABA
public class AtomicStampedReference<V> {
    private static class Pair<T> {
        final T reference;
        final int stamp;
        private Pair(T reference, int stamp) {
            this.reference = reference;
            this.stamp = stamp;
        }
        static <T> Pair<T> of(T reference, int stamp) {
            return new Pair<T>(reference, stamp);
        }
    }
    private volatile Pair<V> pair;
  • Если нам все равно, была ли переменная изменена в промежуточном процессе, а важно только, является ли текущая переменная исходной, мы можем использовать AtomicMarkableReference
  • Пример использования AtomicStampedReference
public class Main {
    public static void main(String[] args) throws Exception {
        Test old = new Test("hello"), newTest = new Test("world");
        AtomicStampedReference<Test> reference = new AtomicStampedReference<>(old, 1);
        reference.compareAndSet(old, newTest,1,2);
        System.out.println("对象:"+reference.getReference().name+";版本号:"+reference.getStamp());
    }
}
class Test{
    Test(String name){ this.name = name; }
    public String name;
}
---------------输出结果------------------
对象:world;版本号:2

Массив атомарного класса

AtomicIntegerArray	 //整型数组
AtomicLongArray		 //长整型数组
AtomicReferenceArray	//引用类型数组
  • Атомарный класс массива инициализирует окончательный массив, который обрабатывает весь массив как объект, затем вычисляет смещение элемента в соответствии с индексом нижнего индекса, а затем вызывает UNSAFE.compareAndSetReference для выполнения атомарных операций. Массив не модифицируется volatile, чтобы обеспечить видимость типов элементов в разных потоках, для получения элементов используется UNSAFE.public native Object getReferenceVolatile(Object o, long offset)метод для получения значения элемента в реальном времени
  • Пример использования
//元素默认初始化为0
AtomicIntegerArray array = new AtomicIntegerArray(2);
// 下标为0的元素,期待值是0,更新值是1
array.compareAndSet(0,0,1);
System.out.println(array.get(0));
---------------输出结果------------------
1

Класс атомарного обновления атрибутов

AtomicIntegerFieldUpdater 
AtomicLongFieldUpdater
AtomicReferenceFieldUpdater
  • Если объект операции является атрибутом определенного типа, его можно обновить атомарно с помощью AtomicIntegerFieldUpdater, но атрибут класса нужно определить как volatile модифицированную переменную, чтобы обеспечить видимость атрибута в каждом потоке, иначе будет ошибка. сообщать
  • Пример использования
public class Main {
    public static void main(String[] args) {
        AtomicReferenceFieldUpdater<Test,String> fieldUpdater = AtomicReferenceFieldUpdater.newUpdater(Test.class,String.class,"name");
        Test test = new Test("hello world");
        fieldUpdater.compareAndSet(test,"hello world","siting");
        System.out.println(fieldUpdater.get(test));
        System.out.println(test.name);
    }
}
class Test{
    Test(String name){ this.name = name; }
    public volatile String name;
}
---------------输出结果------------------
siting
siting

аккумулятор

Striped64
LongAccumulator
LongAdder
//accumulatorFunction:运算规则,identity:初始值
public LongAccumulator(LongBinaryOperator accumulatorFunction,long identity)
  • И LongAccumulator, и LongAdder унаследованы от Striped64.Основная идея Striped64 заключается в том, что он похож на ConcurrentHashMap, сегментированный расчет, когда параллельная производительность вычисления одной переменной низкая, мы можем распределить математическую операцию по нескольким переменным, а когда общее значение необходимо рассчитать, а затем сложить
  • LongAdder эквивалентен специальной реализации LongAccumulator.
  • Пример LongAccumulator
public static void main(String[] args) throws Exception {
    LongAccumulator accumulator = new LongAccumulator(Long::sum, 0);
    for(int i=0;i<100000;i++){
        CompletableFuture.runAsync(() -> accumulator.accumulate(1));
    }
    Thread.sleep(1000); //等待全部CompletableFuture线程执行完成,再获取
    System.out.println(accumulator.get());
}
---------------输出结果------------------
100000

Принцип реализации компонентов синхронизации

  • Большинство компонентов синхронизации в java поддерживают внутреннее значение состояния.Как и атомарные компоненты, изменение значения состояния обычно реализуется через cas. Работа по обслуживанию модификации состояния осуществляется Дугом Ли, абстрагирующим AbstractQueuedSynchronizer (AQS)
  • Принцип AQS можно увидеть в статье, написанной ранее:Подробный принцип разблокировки, синхронизированная, базовая реализация volatile+cas

Компонент синхронизации

ReentrantLock, ReentrantReadWriteLock

  • ReentrantLock и ReentrantReadWriteLock реализованы на основе AQS (AbstractQueuedSynchronizer). Поскольку они имеют различие между справедливыми и нечестными блокировками, они не наследуют AQS напрямую, а используют для наследования внутренние классы.Добросовестные и нечестные блокировки реализуют AQS соответственно, а ReentrantLock и ReentrantReadWriteLock используют внутренние классы для достижения синхронизации.
  • Пример использования ReentrantLock
ReentrantLock lock = new ReentrantLock();
if(lock.tryLock()){
    //业务逻辑
    lock.unlock();
}
  • Пример использования ReentrantReadWriteLock
public static void main(String[] args) throws Exception {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    if(lock.readLock().tryLock()){ //读锁
        //业务逻辑
        lock.readLock().unlock();
    }
    if(lock.writeLock().tryLock()){ //写锁
        //业务逻辑
        lock.writeLock().unlock();
    }
}

Принцип реализации семафора и сценарии использования

  • Semaphore, как и ReentrantLock, также имеет стратегию честной и нечестной конкуренции за блокировки, а также реализует синхронизацию, наследуя AQS от внутренних классов.
  • Популярное объяснение: предположим, что есть колодец и есть не более трех мест для забора воды. Каждый раз, когда человек приносит воду, ему нужно занять позицию. Когда все три позиции заняты и четвертому человеку нужно набрать воду, он должен дождаться, пока одна из первых трех покинет уровень для забора воды, прежде чем продолжить занимать позицию для забора воды.
  • Пример использования
public static void main(String[] args) throws Exception {
    Semaphore semaphore = new Semaphore(2);
    for (int i = 0; i < 3; i++)
        CompletableFuture.runAsync(() -> {
            try {
                System.out.println(Thread.currentThread().toString() + " start ");
                if(semaphore.tryAcquire(1)){
                    Thread.sleep(1000);
                    semaphore.release(1);
                    System.out.println(Thread.currentThread().toString() + " 无阻塞结束 ");
                }else {
                    System.out.println(Thread.currentThread().toString() + " 被阻塞结束 ");
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    //保证CompletableFuture 线程被执行,主线程再结束
    Thread.sleep(2000);
}
---------------输出结果------------------
Thread[ForkJoinPool.commonPool-worker-19,5,main] start 
Thread[ForkJoinPool.commonPool-worker-5,5,main] start 
Thread[ForkJoinPool.commonPool-worker-23,5,main] start 
Thread[ForkJoinPool.commonPool-worker-23,5,main] 被阻塞结束 
Thread[ForkJoinPool.commonPool-worker-5,5,main] 无阻塞结束 
Thread[ForkJoinPool.commonPool-worker-19,5,main] 无阻塞结束 
  • Видно, что три потока, поскольку семафор установлен на 2, третий поток не может успешно получить информацию и напечатает конец блокировки

Принцип реализации CountDownLatch и сценарии использования

  • CountDownLatch также является синхронной операцией, реализованной AQS.
  • Популярное объяснение: во время игры, если основная задача требует выполнения пяти небольших задач, основная задача может быть продолжена. В это время вы можете использовать CountDownLatch, задача основной линии блокируется и ждет, каждый раз, когда выполняется небольшая задача, выполняется подсчет выполненных работ, и основная линия запускается до тех пор, пока не будут выполнены все пять небольших задач.
  • Пример использования
public static void main(String[] args) throws Exception {
    CountDownLatch count = new CountDownLatch(2);
    for (int i = 0; i < 2; i++)
        CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(1000);
                System.out.println(" CompletableFuture over ");
                count.countDown();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    //等待CompletableFuture线程的完成
    count.await();
    System.out.println(" main over ");
}
---------------输出结果------------------
 CompletableFuture over 
 CompletableFuture over 
 main over 

Принцип реализации CyclicBarrier и сценарии использования

  • CyclicBarrier опирается наReentrantLock lockиCondition tripсвойства для синхронизации
  • Популярное объяснение: CyclicBarrier необходимо заблокировать все потоки в состоянии ожидания, а затем все потоки пробуждаются для выполнения. Представьте себе, что есть перила, которые останавливают пять овец.Когда пять овец встанут вместе на перила, перила поднимутся.В это время все овцы могут вылететь из овчарни.
  • Пример использования
public static void main(String[] args) throws Exception {
    CyclicBarrier barrier = new CyclicBarrier(2);
    CompletableFuture.runAsync(()->{
        try {
            System.out.println("CompletableFuture run start-"+ Clock.systemUTC().millis());
            barrier.await(); //需要等待main线程也执行到await状态才能继续执行
            System.out.println("CompletableFuture run over-"+ Clock.systemUTC().millis());
        }catch (Exception e){
            throw new RuntimeException(e);
        }
    });
    Thread.sleep(1000);
    //和CompletableFuture线程相互等待
    barrier.await();
    System.out.println("main run over!");
}
---------------输出结果------------------
CompletableFuture run start-1609822588881
main run over!
CompletableFuture run over-1609822589880

StampedLock

  • StampedLock не полагается на AQS, но поддерживает несколько внутренних значений состояния и реализует их с помощью cas.
  • StampedLock имеет три режима: режим записи, режим чтения, режим оптимистичного чтения.
  • Блокировки чтения-записи StampedLock могут быть преобразованы друг в друга.
//获取读锁,自旋获取,返回一个戳值
public long readLock()
//尝试加读锁,不成功返回0
public long tryReadLock()
//解锁
public void unlockRead(long stamp) 
//获取写锁,自旋获取,返回一个戳值
public long writeLock()
//尝试加写锁,不成功返回0
public long tryWriteLock()
//解锁
public void unlockWrite(long stamp)
//尝试乐观读读取一个时间戳,并配合validate方法校验时间戳的有效性
public long tryOptimisticRead()
//验证stamp是否有效
public boolean validate(long stamp)
  • Пример использования
public static void main(String[] args) throws Exception {
    StampedLock stampedLock = new StampedLock();
    long stamp = stampedLock.tryOptimisticRead();
    //判断版本号是否生效
    if (!stampedLock.validate(stamp)) {
        //获取读锁,会空转
        stamp = stampedLock.readLock();
        long writeStamp = stampedLock.tryConvertToWriteLock(stamp);
        if (writeStamp != 0) { //成功转为写锁
            //fixme 业务操作
            stampedLock.unlockWrite(writeStamp);
        } else {
            stampedLock.unlockRead(stamp);
            //尝试获取写读
            stamp = stampedLock.tryWriteLock();
            if (stamp != 0) {
                //fixme 业务操作
                stampedLock.unlockWrite(writeStamp);
            }
        }
    }
}    

Добро пожаловать на ошибку в тексте

Справочная статья