Статья для понимания механизма AQS JUC

Java
Статья для понимания механизма AQS JUC

Чтобы решить проблему атомарности, в Java добавлен механизм блокировки, обеспечивающий видимость и порядок. Интерфейс блокировки и соответствующие классы реализации добавлены в параллельный пакет JDK1.5 для реализации функции блокировки, которая является более гибкой, чем синхронизированная.Разработчики могут выбрать соответствующий класс реализации в соответствии с фактической сценой. В этой статье основное внимание уделяется объяснению сценариев использования его различных производных классов и принципов его внутреннего AQS. Для введения знаний, связанных с параллелизмом и синхронизацией, см. предыдущую статью.Статья для понимания механизма блокировки Java.

Функция блокировки

возвращающийся

Подобно синхронизированным и ReentrantLock являются реентерабельными блокировками, реентерабельность указывает на то, что механизм выделения блокировки основан на выделении потоков, а не на выделении на основе вызовов методов.

В качестве простого примера, когда поток получил блокировку, а затем получает ту же блокировку позже, получение выполняется успешно. Но получение замков и освобождение замков должны быть сопряжены.

Отзывчивое прерывание

Когда поток переходит в состояние блокировки из-за получения блокировки, внешняя сторона может прервать поток, а вызывающая сторона может перехватить прерывание, перехватив InterruptedException.

Время ожидания может быть установлено

При получении блокировки вы можете указать период ожидания, и вы можете судить об успешном получении блокировки по возвращаемому значению.

справедливость

Обеспечивает выбор справедливого блокировки и недобросовестного блокировки (по умолчанию).

  • Справедливые блокировки, потоки будут получать блокировки в том порядке, в котором они делают запросы, не допускается переход в очередь;
  • Несправедливые блокировки допускают вставку в очередь: когда поток получает запрос на блокировку, если блокировка доступна, поток пропускает ожидающий поток в очереди и получает блокировку.

Рассмотрим такую ​​ситуацию: поток A удерживает блокировку, поток B запрашивает блокировку, поэтому поток B приостанавливается; когда поток A снимает блокировку, поток B пробуждается, поэтому он снова пытается получить блокировку; в то же время , поток C также запрашивает получение блокировки, то поток C, скорее всего, получит, использует и освободит блокировку до того, как поток B будет полностью пробужден. Это беспроигрышная ситуация, момент, когда B получает блокировку (которая может быть получена только после пробуждения B), не задерживается, C получает блокировку раньше, и пропускная способность также улучшается. В большинстве случаев производительность нечестных блокировок выше, чем у честных.

Кроме того, эта справедливость предназначена для потоков, на которые нельзя полагаться для достижения бизнес-справедливости, она должна контролироваться самими разработчиками, например, обеспечивать публикацию через очереди FIFO.

Блокировка чтения-записи

Блокировки чтения и блокировки записи могут быть разделены, а блокировки чтения и блокировки записи являются взаимоисключающими, но несколько блокировок чтения могут сосуществовать, что подходит для сценариев, когда частота чтения намного превышает частоту записи.

Богатый API

Предоставляет несколько методов для получения информации, связанной с блокировкой, что может помочь разработчикам отслеживать и устранять проблемы.

isFair() //Определяем, является ли блокировка честной блокировкой isLocked() //Определяем, была ли блокировка получена каким-либо потоком isHeldByCurrentThread() //Определяем, получена ли блокировка текущим потоком hasQueuedThreads() //Определяем, есть ли потоки, ожидающие блокировки getHoldCount() // Запрашиваем, сколько раз текущий поток удерживал блокировки getQueueLength() // Получить количество потоков, ожидающих этой блокировки

использование замков

ReentrantLock

Реализация эксклюзивной блокировки имеет все перечисленные выше функции, кроме блокировки чтения-записи, и ее относительно просто использовать.

class X {
   // 创建独占锁实例
   private final ReentrantLock lock = new ReentrantLock();
   // ...

   public void m() {
     lock.lock();  // block until condition holds
     try {
       // ... method body
     } finally {
       // 必须要释放锁,unlock与lock成对出现
       lock.unlock()
     }
   }
 }

ReentrantReadWriteLock

Реализация блокировки чтения-записи со всеми перечисленными выше функциями. И блокировки записи могут быть понижены до блокировки чтения, но не наоборот.

class CachedData {
   Object data;
   volatile boolean cacheValid;
   final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

   void processCachedData() {
     rwl.readLock().lock();
     if (!cacheValid) {
       // Must release read lock before acquiring write lock
       rwl.readLock().unlock();
       rwl.writeLock().lock();
       try {
         // Recheck state because another thread might have
         // acquired write lock and changed state before we did.
         if (!cacheValid) {
           data = ...
           cacheValid = true;
         }
         // Downgrade by acquiring read lock before releasing write lock
         rwl.readLock().lock();
       } finally {
         rwl.writeLock().unlock(); // Unlock write, still hold read
       }
     }

     try {
       use(data);
     } finally {
       rwl.readLock().unlock();
     }
   }
 }

StampedLock

StampedLock также является блокировкой чтения-записи, которая обеспечивает два режима чтения: оптимистическое чтение и пессимистическое чтение. Оптимистическое чтение позволяет получить блокировку записи в процессе чтения! Таким образом, данные, которые мы читаем, могут быть несогласованными, поэтому требуется небольшой дополнительный код, чтобы определить, есть ли запись во время процесса чтения.

Оптимистическая блокировка означает оптимистическую оценку того, что существует высокая вероятность того, что в процессе чтения не будет производиться запись, поэтому она называется оптимистической блокировкой. С другой стороны, пессимистичные блокировки отказываются от записи в процессе чтения, то есть запись должна ждать. Очевидно, что эффективность параллелизма оптимистической блокировки выше, но как только есть небольшая вероятность записи, которая приводит к несогласованному чтению данных, ее необходимо обнаружить и прочитать снова.

public class Point {
    private final StampedLock stampedLock = new StampedLock();

    private double x;
    private double y;

    public void move(double deltaX, double deltaY) {
        long stamp = stampedLock.writeLock(); // 获取写锁
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            stampedLock.unlockWrite(stamp); // 释放写锁
        }
    }

    public double distanceFromOrigin() {
        long stamp = stampedLock.tryOptimisticRead(); // 获得一个乐观读锁
        // 注意下面两行代码不是原子操作
        // 假设x,y = (100,200)
        double currentX = x;
        // 此处已读取到x=100,但x,y可能被写线程修改为(300,400)
        double currentY = y;
        // 此处已读取到y,如果没有写入,读取是正确的(100,200)
        // 如果有写入,读取是错误的(100,400)
        if (!stampedLock.validate(stamp)) { // 检查乐观读锁后是否有其他写锁发生
            stamp = stampedLock.readLock(); // 获取一个悲观读锁
            try {
                currentX = x;
                currentY = y;
            } finally {
                stampedLock.unlockRead(stamp); // 释放悲观读锁
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }
}

Condition

Условие становится очередью условий или переменной условия, предоставляя одному потоку возможность приостановить выполнение (подождать), пока другой поток не уведомит, что некоторое условие состояния теперь может быть истинным. Поскольку доступ к этой общей информации о состоянии происходит в разных потоках, он должен быть защищен мьютексом.

Метод ожидания: он должен вызываться после получения блокировки, указывая на то, что текущая блокировка освобождена и текущий поток заблокирован; ожидая, пока другие потоки вызовут метод signal или signalAll блокировки, поток просыпается, чтобы снова получить блокировку .

Блокировка работает с условием для достижения того же эффекта, что и синхронизация, и объекты (ожидание, уведомление) для связи между потоками на основе общих переменных. Но преимущество в том, что одна и та же блокировка может иметь несколько очередей условий.Когда определенное условие выполняется, нужно только разбудить соответствующую очередь условий, чтобы избежать недопустимой конкуренции.

// 此类实现类似阻塞队列(ArrayBlockingQueue)
class BoundedBuffer {
 final Lock lock = new ReentrantLock();
 final Condition notFull  = lock.newCondition(); 
 final Condition notEmpty = lock.newCondition(); 

 final Object[] items = new Object[100];
 int putptr, takeptr, count;

 public void put(Object x) throws InterruptedException {
   lock.lock();
   try {
     while (count == items.length)
       notFull.await();
     items[putptr] = x;
     if (++putptr == items.length) putptr = 0;
     ++count;
     notEmpty.signal();
   } finally {
     lock.unlock();
   }
 }

 public Object take() throws InterruptedException {
   lock.lock();
   try {
     while (count == 0)
       notEmpty.await();
     Object x = items[takeptr];
     if (++takeptr == items.length) takeptr = 0;
     --count;
     notFull.signal();
     return x;
   } finally {
     lock.unlock();
   }
 }
}

BlockingQueue

Очередь блокировки BlockingQueue на самом деле является моделью производитель/потребитель: когда длина очереди больше заданного максимального значения, производственный поток будет заблокирован, в противном случае, когда элемент очереди пуст, поток-потребитель будет заблокирован, в то же время время, когда потребление будет успешным, он разбудит заблокированный поток производителя, если производство будет успешным, он разбудит поток-потребитель;

Внутреннее использование реализовано с помощью ReentrantLock + Condition, вы можете обратиться к приведенному выше примеру.

CountDownLatch

Назовите это блокировкой таймера обратного отсчета, инициализируйте указанное значение и вызовите countDown, чтобы уменьшить значение на 1. Когда значение уменьшится до 0, он разбудит все потоки, заблокированные вызовом метода await.

Это может привести к тому, что одна группа потоков ожидает завершения задачи другой группой потоков.

class Driver { // ...
   void main() throws InterruptedException {
     CountDownLatch startSignal = new CountDownLatch(1);
     CountDownLatch doneSignal = new CountDownLatch(N);

     for (int i = 0; i < N; ++i) // create and start threads
       new Thread(new Worker(startSignal, doneSignal)).start();

     doSomethingElse();            // don't let run yet
     startSignal.countDown();      // let all threads proceed
     doSomethingElse();
     doneSignal.await();           // wait for all to finish
   }
}

class Worker implements Runnable {
   private final CountDownLatch startSignal;
   private final CountDownLatch doneSignal;
   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
     this.startSignal = startSignal;
     this.doneSignal = doneSignal;
   }
   public void run() {
     try {
       startSignal.await();
       doWork();
       doneSignal.countDown();
     } catch (InterruptedException ex) {} // return;
   }

   void doWork() { ... }
}

CyclicBarrier

Называемый барьером синхронизации, он заставляет группу потоков ожидать друг друга, пока не будет достигнута некоторая общая точка барьера.

Инициализируйте указанное значение и вызовите метод await, чтобы заблокировать поток. Когда указанное количество потоков вызовет метод await, все заблокированные потоки будут разбужены и продолжат выполняться.

Разница с CountDownLatch заключается в том, что CountDownLatch — это группа потоков, ожидающих другую группу потоков, а CyclicBarrier — это группа потоков, ожидающих друг друга.

Semaphore

Называемый семафором, он похож на использование блокировки мьютекса ReentrantLock, за исключением того, что семафор совместно использует несколько ресурсов, позволяя нескольким потокам успешно конкурировать в одно и то же время.

Принцип AQS

AQS этоAbstractQueuedSynchronizerАббревиатура , китайский абстрактный синхронизатор очереди, является базовой реализацией для создания различных блокировок и синхронизаторов. Внутренне поддерживаемое состояние общих переменных (тип int) и двусторонняя очередь (включая указатель начала и указатель конца)

Решение проблем параллелизма

атомарность

Unsafe.compareAndSwapXXX реализует CAS для изменения состояния и указателя очереди Внутренне полагается на атомарные инструкции, предоставляемые ЦП

Видимость и порядок

volatile изменяет указатели состояния и очереди (prev/next/head/tail)

Блокировка потока и пробуждение

Unsafe.park Unsafe.parkNanos Unsafe.unpark

Класс Unsafe находится в пакете sun.misc и не является частью стандарта Java. Он предоставляет такие функции, как управление памятью, создание экземпляров объектов, операции с массивами, операции CAS, приостановка и восстановление потоков и т. д. Класс Unsafe повышает эффективность операций Java и расширяет базовые операционные возможности языка Java. Многие библиотеки базовых классов Java, в том числе некоторые широко используемые высокопроизводительные библиотеки разработки, разработаны на основе классов Unsafe, таких как Netty, Cassandra, Hadoop, Kafka и т. д.

Внутри AQS есть два режима: эксклюзивный режим и общий режим.

Дизайн AQS основан на шаблонных методах, и пользователям необходимо наследовать AQS и переопределять указанные методы. Различные пользовательские синхронизаторы конкурируют за общие ресурсы по-разному, например, повторный вход, справедливость и т. д. Все они реализуются подклассами.

При реализации пользовательского синхронизатора вам нужно только реализовать методы получения и освобождения состояния общего ресурса.Что касается обслуживания конкретного потока, ожидающего очереди (например, невозможность получения ресурсов для входа в очередь/пробуждение и удаление из очереди и т. д.), он обрабатывается внутри AQS.

эксклюзивный режим

  • Только один поток может получить блокировку
  • После снятия блокировки необходимо разбудить узел-преемник.

Эксклюзивные методы, связанные с режимом, предоставляемые AQS

// 获取独占锁(线程阻塞直至获取成功)
public final void acquire(int)
// 获取独占锁,可被中断
public final void acquireInterruptibly(int) 
// 获取独占锁,可被中断 和 指定超时时间
public final boolean tryAcquireNanos(int, long) 
// 释放独占锁(释放锁后,将等待队列中第一个等待节点唤醒 )
public final boolean release(int) 

Методы, связанные с монопольным режимом, которые должны быть реализованы в подклассах AQS.

// 尝试获取独占锁
protected boolean tryAcquire(int)
// 尝试释放独占锁
protected boolean tryRelease(int)

Процесс получения эксклюзивного замка

  • Вызовите подкласс tryAcquire, чтобы попытаться получить блокировку, если получение будет успешным, он вернется напрямую.
  • Инкапсулировать текущий поток как узел в конец очереди с помощью спинового CAS.
  • Круговое ожидание или попытка получить блокировку с помощью tryAcquire.
    • Если установлено, что передний узел является головным, попытайтесь получить блокировку.
    • В зависимости от состояния узлов в очереди решить, блокировать ли текущий поток
    • После того, как tryAcquire успешно захватит блокировку, установите для текущего узла значение head и верните
  • Если текущий поток прерван или истекло время ожидания, выполнитеcancelAcquire
    • Установите текущее состояние узла на CANCELED и удалите его из очереди.
    • Если передний узел — это Head, разбудите задний узел.

Процесс снятия эксклюзивной блокировки

общий режим

  • Несколько потоков могут получить блокировку
  • После снятия блокировки необходимо разбудить узел-преемник.
  • После получения блокировки, если еще есть ресурсы для пробуждения последующих общих узлов

Методы, связанные с общим режимом, предоставляемые AQS

// 获取共享锁(线程阻塞直至获取成功)
public final void acquireShared(int) 
// 获取共享锁,可被中断
public final acquireSharedInterruptibly(int) 
// 获取共享锁,可被中断 和 指定超时时间
public final tryAcquireSharedNanos(int, long)  
// 获取共享锁
public final boolean releaseShared(int) 

Общие методы, связанные со схемой, которые необходимо реализовать в подклассах AQS.

// 尝试获取共享锁
protected int tryAcquireShared(int)
// 尝试释放共享锁
protected boolean tryReleaseShared(int) 

Процесс получения общего замка

  • Вызовите подкласс tryAcquireShared, чтобы попытаться получить блокировку, успешно получить ее и вернуться напрямую.
  • Инкапсулировать текущий поток как узел в конец очереди с помощью спинового CAS.
  • Круговое ожидание или попытка получить блокировку с помощью tryAcquireShared
    • Если установлено, что передний узел является головным, попытайтесь получить блокировку.
    • В зависимости от состояния узлов в очереди решить, блокировать ли текущий поток
    • После того, как tryAcquireShared успешно получит блокировку, установите для текущего узла значение head
      • Если есть оставшиеся ресурсы или исходное состояние головного узла — СИГНАЛ/РАСПРОСТРАНЕНИЕ, вызовитеdoReleaseShared
      • Если текущее состояние головного узла — СИГНАЛ, разбудить узел-преемник.
      • Если текущее состояние головного узла равно НУЛЮ, установите состояние головного узла в PROPAGATE.
  • Если текущий поток прерван или истекло время ожидания, выполнитеcancelAcquire
    • Установите текущее состояние узла на CANCELED и удалите его из очереди.
    • Если передний узел — это Head, разбудите задний узел.

Процесс снятия общих блокировок

Ожидание изменения статуса узлов в очереди

Пример ReentrantLock

логика tryAcquire

логика tryRelease