Анализ AQS и реальный бой

Java

предисловие

В интервью некоторое время назад я обнаружил, что интервьюеры задавали вопросы о синхронизаторе AQS. AQS предоставляет базовую структуру почти для всех блокировок и синхронизаторов в Java и является производным сегментов семейства AQS, таких как ReentrantLock, Semaphore и CountDownLatch. Основываясь на нескольких основных моментах принципа AQS, в этой статье рассказывается о понимании AbstractQueuedSynchronizer и реализации пользовательского синхронизатора.

Основные ответы на основные вопросы интервью AQS

  1. поддержание гос.
  2. очередь CLH
  3. уведомление ConditionObject
  4. Шаблон проектирования метода шаблона
  5. Эксклюзивные и общие режимы.
  6. Пользовательский синхронизатор.
  7. Некоторые расширения корзины семейства AQS, такие как: ReentrantLock и т. д.

Структура диаграммы классов AQS

Полное имя AQS — AbstractQueuedSynchronizer, которое представляет собой абстрактную очередь синхронизации. Давайте взглянем на структуру диаграммы классов AQS:

Чтобы облегчить понимание следующих ключевых моментов, мы сначалаОзнакомьтесь со структурой диаграммы классов AQS..

поддержание состояния

在AQS中维持了一个单一的共享状态state,来实现同步器同步。看一下state的相关代码如下:

исходный код состояния

  
    /**
   * The synchronization state.
   */
  private volatile int state;

  /**
   * Returns the current value of synchronization state.
   * This operation has memory semantics of a {@code volatile} read.
   * @return current state value
   */
  protected final int getState() {
      return state;
  }

  /**
   * Sets the value of synchronization state.
   * This operation has memory semantics of a {@code volatile} write.
   * @param newState the new state value
   */
  protected final void setState(int newState) {
      state = newState;
  }

  /**
   * Atomically sets synchronization state to the given updated
   * value if the current state value equals the expected value.
   * This operation has memory semantics of a {@code volatile} read
   * and write.
   *
   * @param expect the expected value
   * @param update the new value
   * @return {@code true} if successful. False return indicates that the actual
   *         value was not equal to the expected value.
   */
  protected final boolean compareAndSetState(int expect, int update) {
      // See below for intrinsics setup to support this
      return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
  }

Несколько ключевых моментов для ответа на дизайн исходного кода состояния:

  • Состояние украшено volatile для обеспечения видимости в нескольких потоках.
  • Методы getState() и setState() доработаны, чтобы запретить подклассам AQS переопределять их.
  • Метод compareAndSetState() использует алгоритм CAS с оптимистичной блокировкой, а также дополнен final, а перезапись подкласса не допускается.

очередь CLH

Когда дело доходит до очереди CLH, давайте взглянем на приведенное выше состояние.Схема AQS:

CLH (замки Крейга, Лэндина и Хагерстена) очередь синхронизациипредставляет собой двунаправленную очередь FIFO,Внутренне через узлы head и tailЗапишите элементы начала и конца очереди, а тип элемента очереди — Node. AQS полагается на это для завершениясостояние синхронизацииЕсли текущему потоку не удается получить состояние синхронизации, AQS создаст состояние ожидания текущего потока и другую информацию в узле (Node), добавит его в очередь синхронизации CLH и одновременно заблокирует текущий поток. состояние синхронизации освобождается, он разбудит первый узел (справедливая блокировка) и заставит его снова попытаться получить состояние синхронизации.

Узел узел

В очереди синхронизации CLH узел представляет поток, который сохраняет ссылку на поток (поток), состояние (waitStatus), предшествующий узел (предыдущий), последующий узел (следующий) и последующий узел (nextWaiter) очереди условий. следующее:

waitStatus несколько состояний состояния:

Давайте взглянем на код для постановки и удаления очереди CLH:

зачислен

Запись в очереди CLHtail указывает на новый узел, prev нового узла указывает на текущий последний узел, а следующий из текущего последнего узла указывает на текущий узел. Метод addWaiter выглядит следующим образом:

//构造Node
private Node addWaiter(Node mode) {
  Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure(快速尝试添加尾节点)
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            //CAS设置尾节点
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //多次尝试
        enq(node);
        return node;
        }

Из приведенного выше кода, если addWaiter не может установить хвостовой узел, вызовите метод enq(Node node), чтобы установить хвостовой узел.Метод enq выглядит следующим образом:

   private Node enq(final Node node) {
        //死循环尝试,知道成功为止
        for (;;) {
            Node t = tail;
            //tail 不存在,设置为首节点
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

исключать из очереди

После того, как поток первого узла освободит состояние синхронизации, он разбудит свой узел-преемник (следующий), и узел-преемник установит себя в качестве первого узла, когда состояние синхронизации будет успешно получено. Вы можете посмотреть на следующие два исходных кода:

  Node h = head;
  if (h != null && h.waitStatus != 0)
  unparkSuccessor(h);
 private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

CLH core несколько пунктов ответа

  • двусвязный список
  • Алгоритм CAS устанавливает хвостовой узел + бесконечное вращение цикла.

Алгоритм CAS, вы можете взглянуть на реализацию имитационного алгоритма CAS для решения проблем параллелизма в моей работе. nuggets.capable/post/684490…

ConditionObject

Введение в ConditionObject

Мы все знаем, что когда синхронизация управляет синхронизацией, она может взаимодействовать сОжидание объекта(), уведомление(), уведомлениеВсе()Ряд методов может реализовать шаблон ожидания/уведомления. А как же Лок? Он предоставляет интерфейс Condition сawait(),signal(),signalAll()и т. д. также могут реализовывать механизм ожидания/уведомления.ConditionObject реализует интерфейс Condition., чтобы обеспечить AQSПоддержка переменной условия.

Эти вещи об очереди условий и очереди CLH

Давайте сначала посмотрим на картинку:

Отношения любви и ненависти между очередью ConditionObject и очередью CLH:

  • Поток, вызывающий метод await(), будет добавлен в очередь ожидания conditionObject и разбудит следующий узел головного узла в очереди CLH.
  • После того как поток вызовет метод singnal() для объекта ConditionObject, firstWaiter в очереди ожидания будет добавлен в очередь CLH AQS, ожидая пробуждения.
  • Когда поток вызывает метод unLock() для снятия блокировки, следующий узел головного узла в очереди CLH (в данном случае firtWaiter) будет разбужен.

разница:

  • Объекты ConditionObject поддерживают отдельныйочередь ожидания, очередь CLH, поддерживаемая AQS,очередь синхронизации, они имеют один и тот же тип узла, оба являются узлами.

Эксклюзивные и общие режимы.

AQS поддерживает два режима синхронизации: эксклюзивный и общий.

Эксклюзивный

Только один поток одновременно поддерживает состояние синхронизации, напримерReentrantLock. Его можно разделить на честный замок и несправедливый замок.

Честный замок:В соответствии с порядком размещения потоков в очереди блокировка по принципу «первым пришел — первым пришел — первым обслужен» является вежливой.

Несправедливая блокировка:Когда поток хочет получить блокировку, он захватывает ее напрямую, независимо от порядка очереди.Это неразумно, и кто бы ни захватил ее, тот и захватил ее.

Acquire(int arg) — это метод исключительного получения состояния синхронизации., давайте посмотрим на исходный код:

  • метод получения (длинный аргумент)
  public final void acquire(long arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
  • метод addWaiter
//构造Node
private Node addWaiter(Node mode) {
  Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure(快速尝试添加尾节点)
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            //CAS设置尾节点
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //多次尝试
        enq(node);
        return node;
        }
  • методAcquireQueued(final Node node, long arg)
 final boolean acquireQueued(final Node node, long arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
  • метод selfInterrupt()
   static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

В сочетании с исходным кодом можно получить блок-схему метода collect(int arg) следующим образом:

общий

Несколько потоков могут выполняться одновременно, например, Semaphore/CountDownLatch и т. д. являются общими продуктами.

AcquireShared(long arg) — это общий метод для получения состояния синхронизации., вы можете посмотреть исходный код:

  public final void acquireShared(long arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

Из приведенного выше сначала вызовите метод tryAcquireShared(int arg), чтобы попытаться получить состояние синхронизации.DOACKQUIRESHARED (INT ARG) Спин, чтобы получить состояние синхронизации, исходный код метода выглядит следующим образом:

 private void doAcquireShared(long arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    long r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

Шаблон проектирования метода шаблона AQS

Шаблон метода шаблона

Шаблон метода шаблона:Определите скелет алгоритма в методе и отложите некоторые шаги до подклассов. Шаблонные методы позволяют подклассам переопределять определенные шаги в алгоритме без изменения структуры алгоритма.

Пример шаблонного метода в жизни:Предположим, мы собираемся путешествовать в Пекин, тогда мы можем воспользоваться высокоскоростной железной дорогой, самолетом или поездом, тогда абстрактный класс, определяющий вид транспорта, может иметь следующий шаблон: купить билеты -> проверка безопасности -> взять xx транспорт -> прибыть в Пекин. Пусть подкласс наследует абстрактный класс и реализует соответствующий метод шаблона.

Вот некоторые из методов шаблона, определенных AQS:

isHeldExclusively()//Монополизирует ли поток ресурсы. Вам нужно реализовать его только в том случае, если вы используете условие.
tryAcquire(int)//Эксклюзивный режим. Попытаться получить ресурс, вернуть true в случае успеха, false в случае неудачи.
tryRelease(int)//Эксклюзивный способ. Попытаться освободить ресурс, вернуть true в случае успеха, false в случае неудачи. tryAcquireShared(int)//Поделиться методом. Попробуйте получить ресурс. Отрицательное число означает сбой, 0 — успех, но доступных ресурсов нет, положительное число — успех и оставшиеся ресурсы.
tryReleaseShared(int)//Поделиться методом. Попытаться освободить ресурс, вернуть true в случае успеха, false в случае неудачи.

Короче говоря, этоAQS предоставляет шаблонные методы, такие как tryAcquire и tryAcquireShared, для реализации настраиваемых синхронизаторов для подклассов..

Пользовательский синхронизатор.

Основываясь на приведенном выше анализе, мы все знаем, чтосостояние, очередь CLH, очередь ConditionObjectВ ожидании этих ключевых моментов, если вы хотите реализовать собственный замок, вам сначала нужно определить, что вы хотите реализоватьЭксклюзивная блокировка или общая блокировка, определение значения состояния атомарной переменной, определение внутреннего класса для наследования AQS и переписывание соответствующего метода шаблона..

Давайте посмотрим на демонстрацию монопольной блокировки без повторного входа на основе AQS из «The Beauty of Java Concurrent Programming»:

public class NonReentrantLock implements Lock,Serializable{

    //内部类,自定义同步器
    static class Sync extends AbstractQueuedSynchronizer {
        //是否锁已经被持有
        public boolean isHeldExclusively() {
            return getState() == 1;
        }
        //如果state为0 则尝试获取锁
        public boolean tryAcquire(int arg) {
            assert arg== 1 ;
            //CAS设置状态,能保证操作的原子性,当前为状态为0,操作成功状态改为1
            if(compareAndSetState(0, 1)){
                //设置当前独占的线程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        //尝试释放锁,设置state为0
        public boolean tryRelease(int arg) {
            assert arg ==1;
            //如果同步器同步器状态等于0,则抛出监视器非法状态异常
            if(getState() == 0)
                throw new IllegalMonitorStateException();
            //设置独占锁的线程为null
            setExclusiveOwnerThread(null);
            //设置同步状态为0
            setState(0);
            return true;
        }
        //返回Condition,每个Condition都包含了一个Condition队列
        Condition newCondition(){
            return new ConditionObject();
        }
    }
    //创建一个Sync来做具体的工作
    private final Sync sync= new Sync ();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }
        @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }


    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
    }

NonReentrantLockDemoTest:

public class NonReentrantLockDemoTest {

    private static NonReentrantLock nonReentrantLock = new NonReentrantLock();

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(() -> {
                nonReentrantLock.lock();
                try {
                    System.out.println(Thread.currentThread().getName());
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    nonReentrantLock.unlock();
                }
            });
            thread.start();
        }
    }
}

результат операции:

Реальный боевой ковш семейства AQS

AQS является производным сегментом семейства AQS, таким как ReentrantLock, Semaphore и т. д. Далее вы можете рассмотреть варианты их использования.

ReentrantLock

Описание ReentrantLock

  • ReentrantLock — это реентерабельная блокировка, которая может многократно блокировать общие ресурсы, это класс, реализующий интерфейс Lock.
  • ReentrantLock поддерживает как честные, так и нечестные блокировки.

Дело ReentrantLock

Используйте ReentrantLock, чтобы реализовать простой потокобезопасный список, как показано ниже:

public class ReentrantLockList {
    // 线程不安全的list
    private ArrayList<String> array = new ArrayList<>();
    //独占锁
    private volatile ReentrantLock lock = new ReentrantLock();

    //添加元素
    public  void add(String e){
        lock.lock();
        try {
            array.add(e);
        }finally {
            lock.unlock();
        }
    }

    //删除元素
    public void remove(String e){
        lock.lock();
        try {
            array.remove(e);
        }finally {
            lock.unlock();
        }
    }
    //获取元素
    public String get(int index){
        lock.lock();
        try {
            return array.get(index);
        }finally {
            lock.unlock();
        }
    }
}

Semaphore

Введение в семафор

  • Семафор, также называемый семафором, может использоваться для управления количеством потоков, одновременно обращающихся к ресурсам, и для обеспечения рационального использования ресурсов путем координации каждого потока.

Регистр семафора

Многопоточность в Java имеет от одного до нескольких классических вопросов для собеседования: три потока ABC последовательно выводятся, зацикливаясь 10 раз.

public class ABCSemaphore {

    private static Semaphore A = new Semaphore(1);
    private static Semaphore B = new Semaphore(1);
    private static Semaphore C = new Semaphore(1);


    static class ThreadA extends Thread {

        @Override
        public void run() {
            try {
                for (int i = 0; i < 10; i++) {
                    A.acquire();
                    System.out.print("A");
                    B.release();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

    static class ThreadB extends Thread {

        @Override
        public void run() {
            try {
                for (int i = 0; i < 10; i++) {
                    B.acquire();
                    System.out.print("B");
                    C.release();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

    static class ThreadC extends Thread {

        @Override
        public void run() {
            try {
                for (int i = 0; i < 10; i++) {
                    C.acquire();
                    System.out.print("C");
                    A.release();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

    public static void main(String[] args) throws InterruptedException {
        // 开始只有A可以获取, BC都不可以获取, 保证了A最先执行
        B.acquire();
        C.acquire();
        new ThreadA().start();
        new ThreadB().start();
        new ThreadC().start();
    }

Ссылаться на

Личный публичный аккаунт

Приглашаем всех обратить внимание, учиться и обсуждать вместе.