Требования к собеседованию: анализ принципа реализации Java AQS (графика) [длинная статья в бутике]

Java
Требования к собеседованию: анализ принципа реализации Java AQS (графика) [длинная статья в бутике]

Пусть люди, которых я встречу, вещи, которые я испытаю, даже если я стану немного лучше, я буду удовлетворен.

AQS: абстрактный синхронизатор в очереди

1. Введение в дизайн AQS

  • Реализация AQS основана на очереди ожидания FIFO.
  • Использование одной атомарной переменной для представления получения и освобождения состояния блокировки (final int) изменяет значение int с помощью CAS. (Думаю: почему значение int гарантирует видимость памяти?)
  • Подклассы должны определять закрытый внутренний класс, который наследует AQS и реализует его методы.
  • AQS поддерживает эксклюзивный и общий режимы.
  • Внутренний класс ConditionObject используется для поддержки подклассов для реализации монопольного режима.
  • Подклассы должны переопределить:
    • tryAcquire
    • tryRelease
    • tryReleaseShared
    • методы, такие как isHeldExclusively, и гарантированно являются потокобезопасными.

Цифры по всему тексту (основные):

Шаблон проектирования шаблонного метода: определите скелет алгоритма в операции и отложите реализацию некоторых шагов до подклассов.

2. Структура класса

  • Класс ConditionObject
  • Класс узла
  • N несколько методов

3. FIFO-очередь

Очередь ожидания — это очередь блокировки CLH (Craig, Landin и Hagersten).

Должен ли поток блокироваться, определяется полем «статус» в узле. Когда предыдущий узел узла снимает блокировку, узел будет разбужен.

private transient volatile Node head;
private transient volatile Node tail;
//The synchronization state.
//在互斥锁中它表示着线程是否已经获取了锁,0未获取,1已经获取了,大于1表示重入数。
private volatile int state;

AQS поддерживает изменчивое состояние int (представляющее общие ресурсы) и очередь ожидания потока FIFO (в которую входят, когда многопоточные конкурирующие ресурсы блокируются).

Существует три способа доступа к состоянию:

  • getState()
  • setState()
  • compareAndSetState()

AQS определяет два метода совместного использования ресурсов: эксклюзивный (эксклюзивный, может выполняться только один поток, например ReentrantLock) и общий (общий, несколько потоков могут выполняться одновременно, например Semaphore/CountDownLatch).

Различные пользовательские синхронизаторы конкурируют за общие ресурсы по-разному. При реализации пользовательского синхронизатора вам нужно только реализовать методы получения и освобождения состояния общего ресурса.Что касается обслуживания определенных потоков, ожидающих очереди (например, сбой получения ресурсов для входа в очередь/пробуждение и удаление из очереди и т. д.), AQS реализована на высшем уровне.

При реализации пользовательского синхронизатора в основном реализуются следующие методы:

  • isHeldExclusively(): использует ли поток исключительно ресурсы. Вам нужно реализовать его только в том случае, если вы используете условие.
  • tryAcquire(int): Эксклюзивный режим. Попытка получить ресурс, возвращает true в случае успеха, false в случае неудачи.
  • tryRelease(int): Эксклюзивный режим. Попытаться освободить ресурс, вернуть true в случае успеха, false в случае неудачи.
  • tryAcquireShared(int): общий метод. Попробуйте получить ресурс. Отрицательное число означает сбой, 0 — успех, но доступных ресурсов нет, положительное число — успех и оставшиеся ресурсы.
  • tryReleaseShared(int): общий метод. Попытайтесь освободить ресурс, верните true, если последующим ожидающим узлам разрешено просыпаться после освобождения, в противном случае — false.

Взяв в качестве примера ReentrantLock, состояние инициализируется равным 0, что указывает на разблокированное состояние. Когда поток A блокирует(), он вызывает функцию tryAcquire(), чтобы монополизировать блокировку и состояние+1. После этого другие потоки будут терпеть неудачу, когда они снова попытаются выполнить попытку Acquire().Пока поток A не разблокирует () до состояния=0 (то есть не снимет блокировку), у других потоков нет шансов получить блокировку. Конечно, прежде чем снять блокировку, поток A может повторно ее получить (состояние будет накапливаться), что является концепцией повторного входа. Но обратите внимание на то, сколько раз вам нужно его отпустить, чтобы убедиться, что состояние может вернуться в нулевое состояние.

Взяв в качестве примера CountDownLatch, задача делится на N подпотоков для выполнения, и состояние также инициализируется равным N (обратите внимание, что N должно соответствовать количеству потоков). N подпотоков выполняются параллельно.После выполнения каждого подпотока countDown() выполняется один раз, и состояние уменьшает CAS на 1. После выполнения всех дочерних потоков (т. е. состояние = 0) основным вызывающим потоком будет unpark(), а затем основной вызывающий поток вернется из функции await() для продолжения последующих действий.

Как правило, пользовательские синхронизаторы являются либо эксклюзивными, либо общими, и им нужно только реализовать: попробуйПопробуйПопробуйВыпустить попробуй аккуйршаред - попробуй релизешаред Можно использовать один из них.

Конечно, AQS также поддерживает пользовательские синхронизаторы для реализации как эксклюзивных, так и общих методов, таких как ReentrantReadWriteLock.

Следующие части взяты из комментариев к исходному коду:

Каждый раз, когда вы входите в очередь CLH, вам нужно ввести процесс очереди для хвостового узла, который является атомарной операцией. При исключении из очереди нам нужно только обновить головной узел. Когда узел определяет своего преемника, требуются некоторые усилия для обработки потоков, которые отменяют ожидание блокировок из-за тайм-аутов или прерываний.

Указатель предшественника узла, который в основном используется для обработки и отмены потока, ожидающего блокировки. Если узел отменяет блокировку ожидания, указатель преемника узла-предшественника этого узла должен указывать на поток, который не отменяет блокировку ожидания (узел потока фактически ожидает блокировку) в узле-преемнике этого узла.

Мы используем следующее соединение указателя для реализации механизма блокировки. Каждый узел содержит свой собственный поток, и узел пробуждает свой узел-преемник через соединение-преемник узла.

Для очередей CLH в качестве начального узла требуется фиктивный узел. Мы не создаем его в конструкторе, потому что если потоки не конкурируют за блокировку, усилия будут потрачены впустую. Вместо этого мы создаем указатели головы и хвоста только тогда, когда есть первый претендент.

Поток ожидает выполнения условия через тот же узел, но с другим соединением. Условие нужно связать только с узлом в очереди непараллельных соединений, потому что доступ к условию возможен только тогда, когда поток монопольно удерживает блокировку. Когда поток ожидает условия, узел вставляется в очередь условий. При срабатывании условия узел будет переведен в основную очередь. Используйте значение состояния, чтобы описать, в какой очереди находится узел.

4. Узел

static final class Node {
    //该等待节点处于共享模式
    static final Node SHARED = new Node();
    //该等待节点处于独占模式
    static final Node EXCLUSIVE = null;
    
    //表示节点的线程是已被取消的
    static final int CANCELLED =  1;
    //表示当前节点的后继节点的线程需要被唤醒
    static final int SIGNAL    = -1;
    //表示线程正在等待某个条件
    static final int CONDITION = -2;
    //表示下一个共享模式的节点应该无条件的传播下去
    static final int PROPAGATE = -3;

    //状态位 ,分别可以使CANCELLED、SINGNAL、CONDITION、PROPAGATE、0 
    volatile int waitStatus;

    volatile Node prev;//前驱节点
    volatile Node next;//后继节点
    volatile Thread thread;//等待锁的线程

    //ConditionObject链表的后继节点或者代表共享模式的节点。
    //因为Condition队列只能在独占模式下被能被访问,我们只需要简单的使用链表队列来链接正在等待条件的节点。
    //然后它们会被转移到同步队列(AQS队列)再次重新获取。
    //由于条件队列只能在独占模式下使用,所以我们要表示共享模式的节点的话只要使用特殊值SHARED来标明即可。
    Node nextWaiter;
    //Returns true if node is waiting in shared mode
    final boolean isShared() {
            return nextWaiter == SHARED;
    }
    .......
}

Значение разных значений waitStatus:

  • СИГНАЛ (-1): Узел-преемник текущего узла был (или будет) заблокирован (при парковке), поэтому, когда текущий узел освобождается или отменяется, обязательно разблокируйте его узел-преемник. Чтобы избежать конкуренции, метод получения должен сначала установить узел для передачи сигнала, затем снова вызвать метод получения и заблокировать его в случае сбоя.
  • ОТМЕНА(1): Текущий узел был отменен из-за тайм-аута или прерывания. Как только узел будет отменен, его значение состояния больше не будет изменяться, и поток текущего узла больше не будет заблокирован.
  • УСЛОВИЕ (-2): поток этого узла находится в состоянии ожидания и не будет рассматриваться как узел в очереди синхронизации, пока он не будет разбужен (сигнал), установит его значение на 0 и повторно войдет в состояние блокировки. .
  • PROPAGATE(-3:) Операции освобождения в общем режиме должны быть распространены на другие узлы. Значение состояния задается в методе doReleaseShared.
  • 0: Ничего из вышеперечисленного

Значение состояния представляет собой числовой тип для простоты использования. Неотрицательное значение означает, что узел не нужно будить. Поэтому в большей части кода нет необходимости проверять определенное значение этого значения состояния.

Обычный узел, его начальное значение waitStatus равно 0. Если вы хотите изменить это значение, вы можете использовать AQS, чтобы предоставить CAS для модификации.

5. Эксклюзивный режим и общий режим

Когда блокировка получена, не только один поток может удерживать блокировку (или называется состоянием синхронизации), поэтому в это время существует разница между эксклюзивным режимом и общим режимом, то есть он идентифицируется nextWaiter в Node. узел. Например, ReentrantLock является эксклюзивной блокировкой, и только один поток может получить блокировку, в то время как блокировка чтения WriteAndReadLock может быть получена несколькими потоками одновременно, но ее блокировка записи может удерживаться только одним потоком.

5.1 Эксклюзивный режим

5.1.1 Получение статуса синхронизации монопольного режима

//忽略中断的(即不手动抛出InterruptedException异常)独占模式下的获取方法。
//该方法在成功返回前至少会调用一次tryAcquire()方法(该方法是子类重写的方法,如果返回true则代表能成功获取).
//否则当前线程会进入队列排队,重复的阻塞和唤醒等待再次成功获取后返回, 
//该方法可以用来实现Lock.lock
public final void acquire(int arg) {
       if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

Этот метод сначала пытается получить блокировку (конкретная реализация tryAcquire(arg) определена в подклассе), если она получена, выполнение завершается, в противном случае текущий узел добавляется в конец очереди ожидания через addWaiter(Node .EXCLUSIVE), arg) и установите монопольный режим.

private Node addWaiter(Node mode) {
        //把当前线程包装为node,设为独占模式
        Node node = new Node(Thread.currentThread(), mode);
        // 尝试快速入队,即无竞争条件下肯定成功。如果失败,则进入enq自旋重试入队
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            //CAS替换当前尾部。成功则返回
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
//插入节点到队列中,如果队列未初始化则初始化,然后再插入。
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

Если хвостовой узел пуст, выполните enq(node); повторите попытку и, наконец, вставьте узел.После вставки узла в конец очереди он не сразу приостанавливает поток в узле, потому что в процессе вставки Это предыдущий поток Возможно, он уже был выполнен, поэтому он сначала выполнит операцию вращенияAcquireQueued(node, arg) и попытается позволить потоку повторно получить блокировку! можно выйти из процесса отжима, в противном случае продолжить.

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //如果它的前继节点为头结点,尝试获取锁,获取成功则返回           
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //判断当前节点的线程是否应该被挂起,如果应该被挂起则挂起。
                //等待release唤醒释放
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                //在队列中取消当前节点
                cancelAcquire(node);
        }
    }

Если блокировка не получена, решается, следует ли ее приостановить, и это суждение должно определяться состоянием ожидания предшествующего узла:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //该节点如果状态如果为SIGNAL。则返回true,然后park挂起线程
        if (ws == Node.SIGNAL)
            return true;
       //表明该节点已经被取消,向前循环重新调整链表节点
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //执行到这里代表节点是0或者PROPAGATE,然后标记他们为SIGNAL,但是
            //还不能park挂起线程。需要重试是否能获取,如果不能,则挂起。
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
   
//挂起当前线程,且返回线程的中断状态
private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

Наконец, мы суммируем процесс получения эксклюзивной блокировки:

Шаблонный метод получения AQS не может получить статус синхронизации, вызвав tryAcquire, который является пользовательской реализацией подкласса. -> Конструировать поток в узел Node (addWaiter) -> Добавить узел Node в хвост очереди синхронизации (addWaiter) -> Метод вращения узла для получения статуса синхронизации (acquirQueued). Когда узел вращается, чтобы получить состояние синхронизации, он попытается получить состояние синхронизации, только если его предшествующий узел является головным узлом.Если предшествующий узел узла не является головным узлом или предшествующий узел узла является головным узел, ему не удается получить состояние синхронизации.Текущий поток должен быть заблокирован, а если он должен быть заблокирован, его необходимо разбудить перед возвратом.

Процесс получения замка:

  • Когда поток вызывает методAcquire() для запроса ресурса блокировки, в случае успеха он входит в критическую секцию.
  • Когда получение блокировки не удается, она попадает в очередь ожидания FIFO, а затем приостанавливается в ожидании пробуждения.
  • Когда ожидающий поток в очереди пробуждается, он снова попытается получить ресурс блокировки, в случае успеха он войдет в критическую секцию, в противном случае он продолжит приостановку и ожидание.

5.1.2 Освобождение состояния синхронизации монопольного режима

Поскольку это освобождение, для выполнения операции освобождения должен быть поток, удерживающий блокировку, то есть поток в головном узле снимает блокировку.

Состояние синхронизации выпуска в AQS совпадает с состоянием синхронизации получения. Это шаблонный метод. Конкретные операции выпуска tryRelease реализуются подклассами. Родительский класс AQS предоставляет только скелет алгоритма.

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
//如果node的后继节点不为空且不是作废状态,则唤醒这个后继节点,
//否则从末尾开始寻找合适的节点,如果找到,则唤醒
private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

Процесс: сначала вызовите метод tryRelease() подкласса, чтобы снять блокировку, а затем разбудите узел-преемник.Во время процесса пробуждения необходимо оценить, удовлетворяет ли узел-преемник ситуации.Если узел-преемник не пуст и не находится в недопустимом состоянии, затем разбудите узел-преемник. В противном случае ищите подходящий узел вперед от хвостового узла и просыпайтесь, если он найден.

Процесс снятия блокировки:

  • Когда поток вызывает release() для освобождения ресурса блокировки, если никакие другие потоки не ждут ресурса блокировки, освобождение завершено.
  • Если в очереди есть другие потоки, ожидающие ресурсов блокировки, которые необходимо пробудить, пробуждается первый ожидающий узел в очереди (первым пришел, первым вышел).

5.2 Режим обмена

5.2.1 Получение состояния синхронизации общего режима

  • Когда поток вызывает методAcquireShared() для запроса ресурса блокировки, в случае успеха он входит в критическую секцию.
  • Когда получение блокировки не удается, узел общего типа создается и помещается в очередь ожидания FIFO, а затем приостанавливается в ожидании пробуждения.
  • Когда ожидающий поток в очереди пробуждается, он снова попытается получить ресурс блокировки.В случае успеха он разбудит общий узел, который все еще ожидает позади, и передаст событие пробуждения, то есть проснется. все общие узлы за узлом по очереди, а затем войти в критическую секцию, в противном случае продолжать приостанавливать ожидание.

5.2.2 Освобождение состояния синхронизации общего режима

  • Когда поток вызывает releaseShared() для освобождения ресурса блокировки, если освобождение прошло успешно, он пробуждает ожидающий узел в очереди, если таковой имеется.

6. Резюме СКО

Многие блокирующие классы (например, ReentrantLock) в java.util.concurrent реализованы на основе AQS. AQS — это платформа синхронизации, предоставляющая общие механизмы для атомарного управления состояниями синхронизации, блокировки и пробуждения потоков, а также поддержки очередей заблокированных потоков.

AQS широко используется в JDK.Синхронизаторы на основе AQS включают:

  • ReentrantLock
  • Semaphore
  • ReentrantReadWriteLock (статья будет опубликована позже)
  • CountDownLatch
  • FutureTask

Каждый синхронизатор на основе AQS будет содержать два типа операций, а именно:

  • По крайней мере, одна операция получения. Эта операция блокирует вызывающий поток до тех пор, пока состояние AQS не позволит потоку продолжить выполнение.
  • По крайней мере, одна операция освобождения. Эта операция изменяет состояние AQS, что позволяет разблокировать один или несколько заблокированных потоков.

Основываясь на принципе «композиция имеет приоритет над наследованием», синхронизаторы на основе AQS обычно: объявляют внутренний частный подкласс Sync, который наследуется от AQS, и вызовы всех общедоступных методов синхронизатора будут делегированы этому внутреннему подклассу.

7. Последующие действия

Следующие статьи об AQS будут опубликованы позже, что углубит понимание AQS.

  • Разрешение объекта AQS ConditionObject
  • Пример применения AQS Анализ ReentrantReadWriteLock
  • Семантика энергозависимой памяти Java и видимость блокировки памяти AQS

8. Спасибо

Большая часть содержания этой статьи взята из Интернета.сегмент fault.com/ah/119000001… сегмент fault.com/ah/119000001… zhuanlan.zhihu.com/p/27134110 blog.CSDN.net/Меня зовут Лин... У блога Wooooooo.cn на.com/water есть камень…

ФИФО-очередь:У блога Wooooooo.cn на.com/water есть камень…

9. Информация о блогере

Личный публичный аккаунт WeChat:

личный блог

личный гитхаб

Личный блог самородков

Личный блог CSDN