Подробное объяснение AQS (AbstractQueuedSynchronizer) одновременной блокировки

Java

1. Структура пакета блокировки J.U.C.

В предыдущей статье говорилось оМеханизм блокировки для параллельного программирования: синхронизированный и блокирующий, в основном представляет механизм блокировки, обычно используемый в параллельном программировании на Java. Lock — это интерфейс, а synchronized — это ключевое слово в Java, а synchronized реализован на базе jvm. Блокировки блокировки могут быть прерваны, поддерживаются временные блокировки и т. д. Класс реализации Lock, реентерабельная блокировка ReentrantLock, мы говорили о ее конкретном использовании. Когда дело доходит до ReentrantLock, мы должны говорить об абстрактном классе AbstractQueuedSynchronizer (AQS). Синхронизатор абстрактного типа очереди, AQS определяет структуру синхронизатора для многопоточного доступа к общим ресурсам, и многие реализации классов синхронизации полагаются на него, такие как часто используемые ReentrantLock и ThreadPoolExecutor.

lock
Заблокировать структуру пакета

2. Введение в AQS

AQS — это абстрактный класс, который в основном используется наследуемым образом. Сам AQS не реализует никакого интерфейса синхронизации, он просто определяет методы получения и освобождения состояния синхронизации для использования пользовательских компонентов синхронизации. Абстрактный класс AQS содержит следующие методы:

AQS определяет два метода совместного использования ресурсов: эксклюзивный (эксклюзивный, может выполняться только один поток, например ReentrantLock) и общий (общий, несколько потоков могут выполняться одновременно, например Semaphore/CountDownLatch). В общем режиме используется только очередь синхронизации, а иногда только очередь синхронизации используется в эксклюзивном режиме, но если задействовано условие, существует также очередь условия. Реализуйте справедливое и несправедливое различие в tryAcquire, tryAcquireShared в подклассах.

Различные пользовательские синхронизаторы конкурируют за общие ресурсы по-разному. При реализации пользовательского синхронизатора вам нужно только реализовать методы получения и освобождения состояния общего ресурса.Что касается обслуживания определенных потоков, ожидающих очереди (например, сбой получения ресурсов для входа в очередь/пробуждение и удаление из очереди и т. д.), AQS реализована на высшем уровне.

Весь AQS разделен на следующие разделы:

  • Узел Node, используемый для хранения узла, который получает поток, существует в Sync Queue, Condition Queue, основным отличием между этими узлами является значение waitStatus (подробно описано ниже).
  • Очередь условий, эта очередь используется в эксклюзивном режиме, узел будет добавлен в хвост только при использовании Condition.awaitXX (PS: предварительным условием для использования условия является получение блокировки)
  • Очередь синхронизации, очередь CLH, в которой хранятся узлы, будет использоваться в режиме эксклюзивного совместного использования (основная особенность заключается в том, что в очереди всегда есть фиктивный узел, условие получения блокировки узлом-преемником определяется узлом-предшественником, и узел-предшественник освобождает блокировку, когда блокировка снимается. Разбудит узел-преемник в спящем режиме)
  • ConditionObject, используемый в эксклюзивном режиме, в основном предназначен для снятия блокировки потоком, присоединения к очереди условий и выполнения соответствующей операции с сигналом.
  • Эксклюзивная блокировка получения (получение, освобождение), например ReentrantLock.
  • Общая блокировка получения (acquireShared, releaseShared), например ReeantrantReadWriteLock, Semaphore, CountDownLatch.

Давайте подробно разберем исходный код реализации AQS.

3. Узел внутреннего класса

Узел Node представляет собой поток, который получает блокировку, существующую в очереди условий и очереди синхронизации, и его основная функция — nextWaiter (отмечающая общий или исключительный доступ), а waitStatus отмечает состояние узла.

node
внутренний класс Node

static final class Node {
    /** 标识节点是否是 共享的节点(这样的节点只存在于 Sync Queue 里面) */
    static final Node SHARED = new Node();
    //独占模式
    static final Node EXCLUSIVE = null;
    /**
     *  CANCELLED 说明节点已经 取消获取 lock 了(一般是由于 interrupt 或 timeout 导致的)
     *  很多时候是在 cancelAcquire 里面进行设置这个标识
     */
    static final int CANCELLED = 1;

    /**
     * SIGNAL 标识当前节点的后继节点需要唤醒(PS: 这个通常是在 独占模式下使用, 在共享模式下有时用 PROPAGATE)
     */
    static final int SIGNAL = -1;
    
    //当前节点在 Condition Queue 里面
    static final int CONDITION = -2;
    
    /**
     * 当前节点获取到 lock 或进行 release lock 时, 共享模式的最终状态是 PROPAGATE(PS: 有可能共享模式的节点变成 PROPAGATE 之前就被其后继节点抢占 head 节点, 而从Sync Queue中被踢出掉)
     */
    static final int PROPAGATE = -3;

    volatile int waitStatus;

    /**
     * 节点在 Sync Queue 里面时的前继节点(主要来进行 skip CANCELLED 的节点)
     * 注意: 根据 addWaiter方法:
     *  1. prev节点在队列里面, 则 prev != null 肯定成立
     *  2. prev != null 成立, 不一定 node 就在 Sync Queue 里面
     */
    volatile Node prev;

    /**
     * Node 在 Sync Queue 里面的后继节点, 主要是在release lock 时进行后继节点的唤醒
     * 而后继节点在前继节点上打上 SIGNAL 标识, 来提醒他 release lock 时需要唤醒
     */
    volatile Node next;

    //获取 lock 的引用
    volatile Thread thread;

    /**
     * 作用分成两种:
     *  1. 在 Sync Queue 里面, nextWaiter用来判断节点是 共享模式, 还是独占模式
     *  2. 在 Condition queue 里面, 节点主要是链接且后继节点 (Condition queue是一个单向的, 不支持并发的 list)
     */
    Node nextWaiter;

    // 当前节点是否是共享模式
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    // 获取 node 的前继节点
    final Node predecessor() throws NullPointerException{
        Node p = prev;
        if(p == null){
            throw new NullPointerException();
        }else{
            return p;
        }
    }

    Node(){
        // Used to establish initial head or SHARED marker
    }

    // 初始化 Node 用于 Sync Queue 里面
    Node(Thread thread, Node mode){     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    //初始化 Node 用于 Condition Queue 里面
    Node(Thread thread, int waitStatus){ // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

Изменения статуса waitStatus:

  1. Когда поток только входит в очередь синхронизации и обнаруживает, что монопольная блокировка была получена другими, он помечает предшествующий узел как SIGNAL, а затем пытается получить блокировку (вызовите метод tryAcquire).
  2. Если при вызове метода tryAcquire не удается получить результат, оцените, помечен ли предшествующий узел как СИГНАЛ, если да, непосредственно заблокируйте (перед блокировкой он гарантирует, что предшествующий узел помечен как СИГНАЛ, поскольку предшествующий узел помечен как СИГНАЛ в соответствии с к тому, помечен ли он как SIGNAL при освобождении блокировки или нет, чтобы решить, разбудить ли узел-преемник или нет
  3. Узел-предшественник использует блокировку и освобождает ее.Поскольку он помечен как SIGNAL, он пробуждает узел-преемник.

Процесс изменения состояния ожидания:

  1. В эксклюзивном режиме: 0 (начальный) -> сигнал (узел-преемник, помеченный как освобожденный, должен разбудить узел-преемник) -> 0 (когда блокировка снимается, он возвращается к 0)
  2. Эксклюзивный режим + при использовании Условие: 0 (начальное) -> сигнал (отмечено как освобождение узлом-преемником и необходимо разбудить узел-преемник) -> 0 (когда блокировка будет снята, она будет восстановлена ​​до 0) Это может включают прерывания и тайм-ауты, просто есть еще один ОТМЕНЕННЫЙ, когда узел становится ОТМЕНЕННЫМ, он ожидает очистки.
  3. В общем режиме: 0 (начальный) -> PROPAGATE (при получении или снятии блокировки) (при получении блокировки будет вызываться setHeadAndPropagate для транзитивного пробуждения узла-преемника, пока он не встретит узел в эксклюзивном режиме)
  4. В совместно используемом режиме + эксклюзивный режим: 0 (начальный) -> сигнал (помечается как освобождение узлом-преемником и должен разбудить узел-преемник) -> 0 (когда блокировка снимается, она возвращается к 0)

Эти изменения состояния на нем в основном находятся в: doReleaseShared, shouldParkAfterFailedAcquire Inside.

4. Condition Queue

Очередь условий — это очередь, которая не является одновременно безопасной и используется только в эксклюзивном режиме (PS: Почему параллелизм небезопасен? В основном при работе с условием поток должен получить эксклюзивную блокировку, поэтому нет необходимости учитывать проблему безопасности параллелизм); Когда Node существует в очереди условий, только waitStatus, thread и nextWaiter имеют значения, а остальные имеют значение null (waitStatus может быть только CONDITION, 0 (0 означает, что узел передан в очередь синхронизации или прерван/тайм-аут). )); Здесь следует отметить один момент: когда поток прерывается или блокировка истекает по тайм-ауту, узел некоторое время будет существовать в очереди условий и очереди синхронизации.

ConditionQueue
Condition Queue
Узлы Node4, Node5, Node6 и Node7 вызывают метод Condition.awaitXX для присоединения к очереди условий (PS: после присоединения исходная блокировка будет снята).

4.1 Метод постановки в очередь addConditionWaiter

Инкапсулируйте текущий поток как узел Node и поместите его в очередь условий. Вы можете заметить, что следующие операции в очереди условий не учитывают параллелизм (очередь очереди синхронизации поддерживает параллельные операции), почему это так? Потому что в Условием операции является то, что текущий поток получил монопольную блокировку AQS, поэтому нет необходимости учитывать параллелизм.

private Node addConditionWaiter(){
    Node t = lastWaiter;                                
    // Condition queue 的尾节点           
	// 尾节点已经Cancel, 直接进行清除,
    /** 
    * 当Condition进行 awiat 超时或被中断时, Condition里面的节点是没有被删除掉的, 需要其	 * 他await 在将线程加入 Condition Queue 时调用addConditionWaiter而进而删除, 或 await 操作差不多结束时, 调用 "node.nextWaiter != null" 进行判断而删除 (PS: 通过 signal 进行唤
    * 醒时 node.nextWaiter 会被置空, 而中断和超时时不会)
    */
    if(t != null && t.waitStatus != Node.CONDITION){
    	/** 
    	* 调用 unlinkCancelledWaiters 对 "waitStatus != Node.CONDITION" 的节点进行		* 删除(在Condition里面的Node的waitStatus 要么是CONDITION(正常), 要么就是 0 
    	* (signal/timeout/interrupt))
    	*/
        unlinkCancelledWaiters();                     
        t = lastWaiter;                     
    }
    //将线程封装成 node 准备放入 Condition Queue 里面
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if(t == null){
    	//Condition Queue 是空的
        firstWaiter = node;                           
    } else {
    	// 追加到 queue 尾部
        t.nextWaiter = node;                          
    }
    lastWaiter = node;                               
    return node;
}

4.2 Способ удаления узла Canceled unlinkCancelledWaiters

Когда узел находится в очереди условий, если состояние не СОСТОЯНИЕ, оно должно быть прервано или истекло по тайм-ауту. При вызове addConditionWaiter для помещения потока в очередь условий или после завершения метода aviat очистите узлы в очереди условий, которые все еще существуют из-за тайм-аута/прерывания. Эта операция удаления гениальна, в ней вводится следовой узел, который можно понимать как последний действительный узел, обнаруженный при обходе всей очереди условий.

private void unlinkCancelledWaiters(){
    Node t = firstWaiter;
    Node trail = null;
    while(t != null){
        Node next = t.nextWaiter;               // 1. 先初始化 next 节点
        if(t.waitStatus != Node.CONDITION){   // 2. 节点不有效, 在Condition Queue 里面 Node.waitStatus 只有可能是 CONDITION 或是 0(timeout/interrupt引起的)
            t.nextWaiter = null;               // 3. Node.nextWaiter 置空
            if(trail == null){                  // 4. 一次都没有遇到有效的节点
                firstWaiter = next;            // 5. 将 next 赋值给 firstWaiter(此时 next 可能也是无效的, 这只是一个临时处理)
            } else {
                trail.nextWaiter = next;       // 6. next 赋值给 trail.nextWaiter, 这一步其实就是删除节点 t
            }
            if(next == null){                  // 7. next == null 说明 已经 traverse 完了 Condition Queue
                lastWaiter = trail;
            }
        }else{
            trail = t;                         // 8. 将有效节点赋值给 trail
        }
        t = next;
    }
}

4.3 Способ переноса узлов transferForSignal

TransferForSignal — это обычный метод передачи, вызываемый только при нормальном пробуждении узла.
При переводе Node из Condition Queue в Sync Queue, перед вызовом transferForSignal, first.nextWaiter = null, и мы обнаружили, что если узел будет передан из-за таймаута/прерывания, то этот шаг не будет выполнен, перевод в обоих случаях будет принимать wautStatus установлен на 0

final boolean transferForSignal(Node node){
    /**
     * If cannot change waitStatus, the node has been cancelled
     */
    if(!compareAndSetWaitStatus(node, Node.CONDITION, 0)){ // 1. 若 node 已经 cancelled 则失败
        return false;
    }

    Node p = enq(node);                                 // 2. 加入 Sync Queue
    int ws = p.waitStatus;
    if(ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)){ // 3. 这里的 ws > 0 指Sync Queue 中node 的前继节点cancelled 了, 所以, 唤醒一下 node ; compareAndSetWaitStatus(p, ws, Node.SIGNAL)失败, 则说明 前继节点已经变成 SIGNAL 或 cancelled, 所以也要 唤醒
        LockSupport.unpark(node.thread);
    }
    return true;
}

4.4 Метод переноса узлов transferAfterCancelledWait

transferAfterCancelledWait Метод передачи, который вызывается только тогда, когда узел получает блокировку, прерывается или время ожидания истекает. Перенесите узел, который просыпается из-за тайм-аута/прерывания, в очередь условий

final boolean transferAfterCancelledWait(Node node){
    if(compareAndSetWaitStatus(node, Node.CONDITION, 0)){ // 1. 没有 node 没有 cancelled , 直接进行转移 (转移后, Sync Queue , Condition Queue 都会存在 node)
        enq(node);
        return true;
    }
    
    while(!isOnSyncQueue(node)){                // 2.这时是其他的线程发送signal,将本线程转移到 Sync Queue 里面的工程中(转移的过程中 waitStatus = 0了, 所以上面的 CAS 操作失败)
        Thread.yield();                         // 这里调用 isOnSyncQueue判断是否已经 入Sync Queue 了
    }
    return false;
}

5. Sync Queue

AQS внутренне поддерживает очередь FIFO CLH, поэтому AQS не поддерживает стратегии синхронизации на основе приоритетов. Что касается того, почему выбрана очередь CLH, основная причина заключается в том, что блокировка CLH легче справляется с отменой и тайм-аутом, чем блокировка MSC, в то же время она имеет возможность входить и выходить из очереди быстро, без каких-либо препятствий, и также очень легко проверить, есть ли ожидающие потоки (голова != хвост, указатели головы и хвоста разные). Конечно, по сравнению с оригинальной блокировкой очереди CLH, ASQ использует вариант блокировки очереди CLH:

  1. Заблокированный спин, используемый исходным CLH, в то время как CLH AQS использует поле состояния в каждом узле для управления блокировкой вместо спина.
  2. Для обработки операций таймаута и отмены каждый узел поддерживает указатель на предшественника. Если предшественник узла отменен, узел может двигаться вперед, используя поле состояния предшественника.
  3. Головной узел использует фиктивный узел.

SyncQueue
Sync Queue

На этом рисунке показано, что поток получает блокировку, в то время как Node1, Node2 и Node3 ожидают в очереди синхронизации, чтобы получить блокировку (PS: обратите внимание, что SINGNAL фиктивного узла является признаком того, что поток, который получает блокировку, уведомляет узел-преемник при снятии блокировки)

5.1 Метод очереди входа узла очереди синхронизации

Здесь следует отметить одну вещь: узлы, которые инициализируют head и tail, не обязательно являются head.next, потому что они могут быть вытеснены другими потоками в течение периода. Инкапсулируйте текущий поток как узел и добавьте его в очередь синхронизации.

private Node addWaiter(Node mode){
    Node node = new Node(Thread.currentThread(), mode);      // 1. 封装 Node
    Node pred = tail;
    if(pred != null){                           // 2. pred != null -> 队列中已经有节点, 直接 CAS 到尾节点
        node.prev = pred;                       // 3. 先设置 Node.pre = pred (PS: 则当一个 node在Sync Queue里面时  node.prev 一定 != null(除 dummy node), 但是 node.prev != null 不能说明其在 Sync Queue 里面, 因为现在的CAS可能失败 )
        if(compareAndSetTail(pred, node)){      // 4. CAS node 到 tail
            pred.next = node;                  // 5. CAS 成功, 将 pred.next = node (PS: 说明 node.next != null -> 则 node 一定在 Sync Queue, 但若 node 在Sync Queue 里面不一定 node.next != null)
            return node;
        }
    }
    enq(node);                                 // 6. 队列为空, 调用 enq 入队列
    return node;
}


/**
 * 这个插入会检测head tail 的初始化, 必要的话会初始化一个 dummy 节点, 这个和 ConcurrentLinkedQueue 一样的
 * 将节点 node 加入队列
 * 这里有个注意点
 * 情况:
 *      1. 首先 queue是空的
 *      2. 初始化一个 dummy 节点
 *      3. 这时再在tail后面添加节点(这一步可能失败, 可能发生竞争被其他的线程抢占)
 *  这里为什么要加入一个 dummy 节点呢?
 *      这里的 Sync Queue 是CLH lock的一个变种, 线程节点 node 能否获取lock的判断通过其前继节点
 *      而且这里在当前节点想获取lock时通常给前继节点 打上 signal 的标识(表示前继节点释放lock需要通知我来获取lock)
 *      若这里不清楚的同学, 请先看看 CLH lock的资料 (这是理解 AQS 的基础)
 */
private Node enq(final Node node){
    for(;;){
        Node t = tail;
        if(t == null){ // Must initialize       // 1. 队列为空 初始化一个 dummy 节点 其实和 ConcurrentLinkedQueue 一样
            if(compareAndSetHead(new Node())){  // 2. 初始化 head 与 tail (这个CAS成功后, head 就有值了, 详情将 Unsafe 操作)
                tail = head;
            }
        }else{
            node.prev = t;                      // 3. 先设置 Node.pre = pred (PS: 则当一个 node在Sync Queue里面时  node.prev 一定 != null, 但是 node.prev != null 不能说明其在 Sync Queue 里面, 因为现在的CAS可能失败 )
            if(compareAndSetTail(t, node)){     // 4. CAS node 到 tail
                t.next = node;                  // 5. CAS 成功, 将 pred.next = node (PS: 说明 node.next != null -> 则 node 一定在 Sync Queue, 但若 node 在Sync Queue 里面不一定 node.next != null)
                return t;
            }
        }
    }
}

5.2 Метод очереди Sync Queue Node Out Queue

На самом деле есть два способа выйти из очереди: Новый узел получает блокировку, вызывает setHead для вытеснения заголовка и удаляет первоначальный заголовок; узел отменяется из-за прерывания или истечения времени захвата и, наконец, удаляется.

/**
 * 设置 head 节点(在独占模式没有并发的可能, 当共享的模式有可能)
 */
private void setHead(Node node){
    head = node;
    node.thread = null; // 清除线程引用
    node.prev = null; // 清除原来 head 的引用 <- 都是 help GC
}

// 清除因中断/超时而放弃获取lock的线程节点(此时节点在 Sync Queue 里面)
private void cancelAcquire(Node node) {
    if (node == null)
        return;

    node.thread = null;                 // 1. 线程引用清空

    Node pred = node.prev;
    while (pred.waitStatus > 0)       // 2.  若前继节点是 CANCELLED 的, 则也一并清除
        node.prev = pred = pred.prev;
        
    Node predNext = pred.next;         // 3. 这里的 predNext也是需要清除的(只不过在清除时的 CAS 操作需要 它)

    node.waitStatus = Node.CANCELLED; // 4. 标识节点需要清除

    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) { // 5. 若需要清除额节点是尾节点, 则直接 CAS pred为尾节点
        compareAndSetNext(pred, predNext, null);    // 6. 删除节点predNext
    } else {
        int ws;
        if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL || // 7. 后继节点需要唤醒(但这里的后继节点predNext已经 CANCELLED 了)
                        (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && // 8. 将 pred 标识为 SIGNAL
                pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0) // 8. next.waitStatus <= 0 表示 next 是个一个想要获取lock的节点
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node); // 若 pred 是头节点, 则此刻可能有节点刚刚进入 queue ,所以进行一下唤醒
        }

        node.next = node; // help GC
    }
}

6. Эксклюзивный замок

6.1 Основной процесс получения блокировки эксклюзивным способом

  1. Вызовите tryAcquire, чтобы попытаться получить блокировку (обычно реализуемую подклассами), и вернитесь напрямую в случае успеха.
  2. Вызов tryAcquire не может быть получен, инкапсулируйте текущий поток как узел и добавьте его в очередь синхронизации (вызовите addWaiter) и дождитесь получения сигнала.
  3. Позвоните в командуAcquireQueued, чтобы получить блокировки (возможно, многократно блокируя и разблокируя)
  4. По возвращаемому значению acceptQueued судят о том, прерван ли процесс получения блокировки, если он прерван, то он прервет его снова (selfInterrupt), а если отреагирует на прерывание, то будет выброшено исключение напрямую.

6.2 Эксклюзивный способ получения блокировки в основном делится на 3 категории.

  1. Acquire не отвечает на прерывания для получения блокировки. Неотвечающие прерывания здесь означают, что поток проснется после того, как его прервали, и продолжит получение блокировки. процесс только что прерван (метод selfInterrupt)
  2. doAcquireInterruptably отвечает на прерванное получение блокировки.Ответ на прерывание здесь означает, что если поток будет прерван в процессе получения блокировки, будет выброшено исключение напрямую
  3. doAcquireNanos получает блокировку в ответ на прерывание и тайм-аут.Когда поток прерывается или время ожидания истекает, сразу создается исключение, и получение завершается неудачно.

6.3 Эксклюзивное получение метода блокировки

Acquire(int arg): получить объект в эксклюзивном режиме, игнорируя прерывания.

public final void acquire(int arg){
    if(!tryAcquire(arg)&&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
        selfInterrupt();
    }
}
  1. Вызовите tryAcquire, чтобы попытаться получить блокировку (обычно реализуемую подклассами), и вернитесь напрямую в случае успеха.
  2. Вызов tryAcquire не может быть получен, инкапсулируйте текущий поток как узел и добавьте его в очередь синхронизации (вызовите addWaiter) и дождитесь получения сигнала.
  3. Позвоните в командуAcquireQueued, чтобы получить блокировки (возможно, многократно блокируя и разблокируя)
  4. По возвращаемому значению AcquireQueued судят о том, прерван ли процесс получения блокировки, если прерван, то будет прерван снова (selfInterrupt).

6.4 Метод блокировки получения цикла

final boolean acquireQueued(final Node node, int arg){
        boolean failed = true;
        try {
            boolean interrupted = false;
            for(;;){
                final Node p = node.predecessor();      // 1. 获取当前节点的前继节点 (当一个n在 Sync Queue 里面, 并且没有获取 lock 的 node 的前继节点不可能是 null)
                if(p == head && tryAcquire(arg)){       // 2. 判断前继节点是否是head节点(前继节点是head, 存在两种情况 (1) 前继节点现在占用 lock (2)前继节点是个空节点, 已经释放 lock, node 现在有机会获取 lock); 则再次调用 tryAcquire尝试获取一下
                    setHead(node);                       // 3. 获取 lock 成功, 直接设置 新head(原来的head可能就直接被回收)
                    p.next = null; // help GC          // help gc
                    failed = false;
                    return interrupted;                // 4. 返回在整个获取的过程中是否被中断过 ; 但这又有什么用呢? 若整个过程中被中断过, 则最后我在 自我中断一下 (selfInterrupt), 因为外面的函数可能需要知道整个过程是否被中断过
                }
                if(shouldParkAfterFailedAcquire(p, node) && // 5. 调用 shouldParkAfterFailedAcquire 判断是否需要中断(这里可能会一开始 返回 false, 但在此进去后直接返回 true(主要和前继节点的状态是否是 signal))
                        parkAndCheckInterrupt()){      // 6. 现在lock还是被其他线程占用 那就睡一会, 返回值判断是否这次线程的唤醒是被中断唤醒
                    interrupted = true;
                }
            }
        }finally {
            if(failed){                             // 7. 在整个获取中出错
                cancelAcquire(node);                // 8. 清除 node 节点(清除的过程是先给 node 打上 CANCELLED标志, 然后再删除)
            }
        }
    }

Основная логика:

  1. Когда узел-предшественник текущего узла является головным узлом, сначала попробуйте получить блокировку, в случае успеха установите новый головной узел и верните
  2. Первый шаг не удался, проверьте, требуется ли сон, при необходимости спите и подождите, пока узел-предшественник проснется при снятии блокировки или проснется через прерывание.
  3. Весь процесс может потребовать блокировки-неблокировки несколько раз

6.5 Поддержка метода блокировки получения прерывания doAcquireInterruptably

private void doAcquireInterruptibly(int arg) throws InterruptedException{
    final Node node = addWaiter(Node.EXCLUSIVE);  // 1. 将当前的线程封装成 Node 加入到 Sync Queue 里面
    boolean failed = true;
    try {
        for(;;){
            final Node p = node.predecessor(); // 2. 获取当前节点的前继节点 (当一个n在 Sync Queue 里面, 并且没有获取 lock 的 node 的前继节点不可能是 null)
            if(p == head && tryAcquire(arg)){  // 3. 判断前继节点是否是head节点(前继节点是head, 存在两种情况 (1) 前继节点现在占用 lock (2)前继节点是个空节点, 已经释放 lock, node 现在有机会获取 lock); 则再次调用 tryAcquire尝试获取一下
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }

            if(shouldParkAfterFailedAcquire(p, node) && // 4. 调用 shouldParkAfterFailedAcquire 判断是否需要中断(这里可能会一开始 返回 false, 但在此进去后直接返回 true(主要和前继节点的状态是否是 signal))
                    parkAndCheckInterrupt()){           // 5. 现在lock还是被其他线程占用 那就睡一会, 返回值判断是否这次线程的唤醒是被中断唤醒
                throw new InterruptedException();       // 6. 线程此时唤醒是通过线程中断, 则直接抛异常
            }
        }
    }finally {
        if(failed){                 // 7. 在整个获取中出错(比如线程中断)
            cancelAcquire(node);    // 8. 清除 node 节点(清除的过程是先给 node 打上 CANCELLED标志, 然后再删除)
        }
    }
}

AcquireInterruptably(int arg): получить объект в монопольном режиме, прервав его в случае прерывания.

public final void acquireInterruptibly(int arg) throws InterruptedException {    
        if (Thread.interrupted())    
            throw new InterruptedException();    
        if (!tryAcquire(arg))       
            doAcquireInterruptibly(arg);     
    }

Верните успех, сначала проверив состояние прерывания, а затем вызвав tryAcquire хотя бы один раз. В противном случае поток ставится в очередь, блокируется и постоянно пробуждается, вызывая tryAcquire до тех пор, пока он не завершится успешно или не будет прерван.

6.6 Тайм-аут и метод прерывания получения блокировки

tryAcquireNanos(int arg, long nanosTimeout): эксклюзивный и поддерживает получение режима тайм-аута: с тайм-аутом, если тайм-аут истекает, он выйдет.

private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException{
    if(nanosTimeout <= 0L){
        return false;
    }

    final long deadline = System.nanoTime() + nanosTimeout; // 0. 计算截至时间
    final Node node = addWaiter(Node.EXCLUSIVE);  // 1. 将当前的线程封装成 Node 加入到 Sync Queue 里面
    boolean failed = true;

    try {
        for(;;){
            final Node p = node.predecessor(); // 2. 获取当前节点的前继节点 (当一个n在 Sync Queue 里面, 并且没有获取 lock 的 node 的前继节点不可能是 null)
            if(p == head && tryAcquire(arg)){  // 3. 判断前继节点是否是head节点(前继节点是head, 存在两种情况 (1) 前继节点现在占用 lock (2)前继节点是个空节点, 已经释放 lock, node 现在有机会获取 lock); 则再次调用 tryAcquire尝试获取一下
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }

            nanosTimeout = deadline - System.nanoTime(); // 4. 计算还剩余的时间
            if(nanosTimeout <= 0L){                      // 5. 时间超时, 直接返回
                return false;
            }
            if(shouldParkAfterFailedAcquire(p, node) && // 6. 调用 shouldParkAfterFailedAcquire 判断是否需要中断(这里可能会一开始 返回 false, 但在此进去后直接返回 true(主要和前继节点的状态是否是 signal))
                    nanosTimeout > spinForTimeoutThreshold){ // 7. 若没超时, 并且大于spinForTimeoutThreshold, 则线程 sleep(小于spinForTimeoutThreshold, 则直接自旋, 因为效率更高 调用 LockSupport 是需要开销的)
                LockSupport.parkNanos(this, nanosTimeout);
            }
            if(Thread.interrupted()){                           // 8. 线程此时唤醒是通过线程中断, 则直接抛异常
                throw new InterruptedException();
            }
        }
    }finally {
        if(failed){                 // 9. 在整个获取中出错(比如线程中断/超时)
            cancelAcquire(node);    // 10. 清除 node 节点(清除的过程是先给 node 打上 CANCELLED标志, 然后再删除)
        }
    }
}

Попытка получить в эксклюзивном режиме, сдаться, если прервется и тайм-аут. Реализация сначала проверяет состояние прерывания, а затем хотя бы один раз вызывает функцию tryAcquire.

public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {    
     if (Thread.interrupted())    
         throw new InterruptedException();    
     return tryAcquire(arg)|| doAcquireNanos(arg, nanosTimeout);    
}

6.7 Метод снятия блокировки

Процесс снятия блокировки:

  • Вызовите метод tryRelease подкласса, чтобы освободить полученные ресурсы.
  • Определить, снята ли блокировка полностью (существуют случаи, когда блокировка повторяется)
  • Определите, есть ли узел-преемник для пробуждения, при необходимости вызовите unparkSuccessor для пробуждения
public final boolean release(int arg){
    if(tryRelease(arg)){   // 1. 调用子类, 若完全释放好, 则返回true(这里有lock重复获取)
        Node h = head;
        if(h != null && h.waitStatus != 0){ // 2. h.waitStatus !=0 其实就是 h.waitStatus < 0 后继节点需要唤醒
            unparkSuccessor(h);   // 3. 唤醒后继节点
        }
        return true;
    }
    return false;
}

/**
 * 唤醒 node 的后继节点
 * 这里有个注意点: 唤醒时会将当前node的标识归位为 0
 * 等于当前节点标识位 的流转过程: 0(刚加入queue) -> signal (被后继节点要求在释放时需要唤醒) -> 0 (进行唤醒后继节点)
 */
private void unparkSuccessor(Node node) {
    logger.info("unparkSuccessor node:" + node + Thread.currentThread().getName());
    
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);       // 1. 清除前继节点的标识
    Node s = node.next;
    logger.info("unparkSuccessor s:" + node + Thread.currentThread().getName());
    if (s == null || s.waitStatus > 0) {         // 2. 这里若在 Sync Queue 里面存在想要获取 lock 的节点,则一定需要唤醒一下(跳过取消的节点)&emsp;(PS: s == null发生在共享模式的竞争释放资源)
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)              // 3. 找到 queue 里面最前面想要获取 Lock 的节点
                s = t;
    }
    logger.info("unparkSuccessor s:"+s);
    if (s != null)
        LockSupport.unpark(s.thread);
}

7. Общий замок

7.1 Процесс получения блокировки путем совместного использования

  1. Вызовите tryAcquireShared, чтобы попытаться получить блокировку (обычно реализуемую подклассами) и вернуться напрямую в случае успеха.
  2. Вызов tryAcquireShared не может получить, инкапсулировать текущий поток как Node и добавить его в очередь синхронизации (вызов addWaiter) и дождаться получения сигнала.
  3. Приобретайте блокировки, вращаясь в очереди синхронизации (возможно, многократно блокируя и разблокируя)
  4. Когда получение не удается, оцените, можно ли его заблокировать (предпосылка блокировки заключается в том, что предшествующий узел помечен СИГНАЛОМ)
  5. Основное различие между получением разделяемой и монопольной блокировки заключается в том, что если блокировка успешно получена в совместно используемом режиме, будет определено, необходимо ли продолжать пробуждение следующих узлов (и метод doReleaseShared), которые продолжают получать разделяемую блокировку. .

7.2 Существует три основных типа замков, получаемых при совместном использовании

  1. AcquireShared не отвечает на прерванные блокировки получения. Неотвечающие прерывания здесь означают, что поток проснется после того, как его прервали, и продолжит получать блокировку. только сейчас прерывается (метод selfInterrupt)
  2. doAcquireSharedInterruptably отвечает на прерванное получение блокировки.Отклик на прерывание здесь означает, что если поток будет прерван в процессе получения блокировки, сразу будет выброшено исключение
  3. doAcquireSharedNanos получает блокировку в ответ на прерывание и тайм-аут.Когда поток прерывается или время ожидания истекает, сразу создается исключение, и получение завершается неудачно.

7.3 Метод получения общей блокировки

public final void acquireShared(int arg){
    if(tryAcquireShared(arg) < 0){  // 1. 调用子类, 获取共享 lock  返回 < 0, 表示失败
        doAcquireShared(arg);       // 2. 调用 doAcquireShared 当前 线程加入 Sync Queue 里面, 等待获取 lock
    }
}

7.4 Метод получения общей блокировки doAcquireShared

private void doAcquireShared(int arg){
    final Node node = addWaiter(Node.SHARED);       // 1. 将当前的线程封装成 Node 加入到 Sync Queue 里面
    boolean failed = true;

    try {
        boolean interrupted = false;
        for(;;){
            final Node p = node.predecessor();      // 2. 获取当前节点的前继节点 (当一个n在 Sync Queue 里面, 并且没有获取 lock 的 node 的前继节点不可能是 null)
            if(p == head){
                int r = tryAcquireShared(arg);      // 3. 判断前继节点是否是head节点(前继节点是head, 存在两种情况 (1) 前继节点现在占用 lock (2)前继节点是个空节点, 已经释放 lock, node 现在有机会获取 lock); 则再次调用 tryAcquireShared 尝试获取一下
                if(r >= 0){
                    setHeadAndPropagate(node, r);   // 4. 获取 lock 成功, 设置新的 head, 并唤醒后继获取  readLock 的节点
                    p.next = null; // help GC
                    if(interrupted){               // 5. 在获取 lock 时, 被中断过, 则自己再自我中断一下(外面的函数可能需要这个参数)
                        selfInterrupt();
                    }
                    failed = false;
                    return;
                }
            }

            if(shouldParkAfterFailedAcquire(p, node) && // 6. 调用 shouldParkAfterFailedAcquire 判断是否需要中断(这里可能会一开始 返回 false, 但在此进去后直接返回 true(主要和前继节点的状态是否是 signal))
                    parkAndCheckInterrupt()){           // 7. 现在lock还是被其他线程占用 那就睡一会, 返回值判断是否这次线程的唤醒是被中断唤醒
                interrupted = true;
            }
        }
    }finally {
        if(failed){             // 8. 在整个获取中出错(比如线程中断/超时)
            cancelAcquire(node);  // 9. 清除 node 节点(清除的过程是先给 node 打上 CANCELLED标志, 然后再删除)
        }
    }
}

7.5 Метод получения общей блокировки doAcquireSharedInterruptably

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException{
    final Node node = addWaiter(Node.SHARED);            // 1. 将当前的线程封装成 Node 加入到 Sync Queue 里面
    boolean failed = true;

    try {
        for(;;){
            final Node p = node.predecessor();          // 2. 获取当前节点的前继节点 (当一个n在 Sync Queue 里面, 并且没有获取 lock 的 node 的前继节点不可能是 null)
            if(p == head){
                int r = tryAcquireShared(arg);          // 3. 判断前继节点是否是head节点(前继节点是head, 存在两种情况 (1) 前继节点现在占用 lock (2)前继节点是个空节点, 已经释放 lock, node 现在有机会获取 lock); 则再次调用 tryAcquireShared 尝试获取一下
                if(r >= 0){
                    setHeadAndPropagate(node, r);       // 4. 获取 lock 成功, 设置新的 head, 并唤醒后继获取  readLock 的节点
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }

            if(shouldParkAfterFailedAcquire(p, node) && // 5. 调用 shouldParkAfterFailedAcquire 判断是否需要中断(这里可能会一开始 返回 false, 但在此进去后直接返回 true(主要和前继节点的状态是否是 signal))
                    parkAndCheckInterrupt()){           // 6. 现在lock还是被其他线程占用 那就睡一会, 返回值判断是否这次线程的唤醒是被中断唤醒
                throw new InterruptedException();     // 7. 若此次唤醒是 通过线程中断, 则直接抛出异常
            }
        }
    }finally {
        if(failed){              // 8. 在整个获取中出错(比如线程中断/超时)
            cancelAcquire(node); // 9. 清除 node 节点(清除的过程是先给 node 打上 CANCELLED标志, 然后再删除)
        }
    }
}

7.6 Метод получения общей блокировки doAcquireSharedNanos

private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException{
    if (nanosTimeout <= 0L){
        return false;
    }

    final long deadline = System.nanoTime() + nanosTimeout;  // 0. 计算超时的时间
    final Node node = addWaiter(Node.SHARED);               // 1. 将当前的线程封装成 Node 加入到 Sync Queue 里面
    boolean failed = true;

    try {
        for(;;){
            final Node p = node.predecessor();          // 2. 获取当前节点的前继节点 (当一个n在 Sync Queue 里面, 并且没有获取 lock 的 node 的前继节点不可能是 null)
            if(p == head){
                int r = tryAcquireShared(arg);          // 3. 判断前继节点是否是head节点(前继节点是head, 存在两种情况 (1) 前继节点现在占用 lock (2)前继节点是个空节点, 已经释放 lock, node 现在有机会获取 lock); 则再次调用 tryAcquireShared 尝试获取一下
                if(r >= 0){
                    setHeadAndPropagate(node, r);       // 4. 获取 lock 成功, 设置新的 head, 并唤醒后继获取  readLock 的节点
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
            }

            nanosTimeout = deadline - System.nanoTime(); // 5. 计算还剩余的 timeout , 若小于0 则直接return
            if(nanosTimeout <= 0L){
                return false;
            }
            if(shouldParkAfterFailedAcquire(p, node) &&         // 6. 调用 shouldParkAfterFailedAcquire 判断是否需要中断(这里可能会一开始 返回 false, 但在此进去后直接返回 true(主要和前继节点的状态是否是 signal))
                    nanosTimeout > spinForTimeoutThreshold){// 7. 在timeout 小于  spinForTimeoutThreshold 时 spin 的效率, 比 LockSupport 更高
                LockSupport.parkNanos(this, nanosTimeout);
            }
            if(Thread.interrupted()){                           // 7. 若此次唤醒是 通过线程中断, 则直接抛出异常
                throw new InterruptedException();
            }
        }
    }finally {
        if (failed){                // 8. 在整个获取中出错(比如线程中断/超时)
            cancelAcquire(node);    // 10. 清除 node 节点(清除的过程是先给 node 打上 CANCELLED标志, 然后再删除)
        }
    }
}

7.7 Снятие общей блокировки

Когда в очереди синхронизации есть несколько последовательных узлов, которые получают общую блокировку, будет происходить одновременное пробуждение узлов-преемников (поскольку в совместном режиме после получения блокировки соседние узлы-преемники будут активированы для получения блокировки). замок). Сначала вызовите tryReleaseShared подкласса, чтобы снять блокировку, а затем определите, необходимо ли разбудить узел-преемник, чтобы получить блокировку.

private void doReleaseShared(){
    for(;;){
        Node h = head;                      // 1. 获取 head 节点, 准备 release
        if(h != null && h != tail){        // 2. Sync Queue 里面不为 空
            int ws = h.waitStatus;
            if(ws == Node.SIGNAL){         // 3. h节点后面可能是 独占的节点, 也可能是 共享的, 并且请求了唤醒(就是给前继节点打标记 SIGNAL)
                if(!compareAndSetWaitStatus(h, Node.SIGNAL, 0)){ // 4. h 恢复  waitStatus 值置0 (为啥这里要用 CAS 呢, 因为这里的调用可能是在 节点刚刚获取 lock, 而其他线程又对其进行中断, 所用cas就出现失败)
                    continue; // loop to recheck cases
                }
                unparkSuccessor(h);         // 5. 唤醒后继节点
            }
            else if(ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)){ //6. h后面没有节点需要唤醒, 则标识为 PROPAGATE 表示需要继续传递唤醒(主要是区别 独占节点最终状态0 (独占的节点在没有后继节点, 并且release lock 时最终 waitStatus 保存为 0))
                continue; // loop on failed CAS // 7. 同样这里可能存在竞争
            }
        }

        if(h == head){ // 8. head 节点没变化, 直接 return(从这里也看出, 一个共享模式的 节点在其唤醒后继节点时, 只唤醒一个, 但是它会在获取 lock 时唤醒, 释放 lock 时也进行, 所以或导致竞争的操作)
            break;           // head 变化了, 说明其他节点获取 lock 了, 自己的任务完成, 直接退出
        }

    }
}

8. Резюме

В этой статье в основном рассказывается об основных методах и принципах реализации абстрактного синхронизатора типа очереди AQS. Представлены конкретные реализации исходного кода Node, Condition Queue, Sync Queue, эксклюзивной блокировки получения и освобождения, а также совместно используемой блокировки получения и освобождения. AQS определяет структуру синхронизатора для многопоточного доступа к общим ресурсам, и многие реализации классов синхронизации полагаются на нее.

Подписывайтесь на свежие статьи, приглашаю обратить внимание на мой публичный номер

微信公众号

Ссылаться на

  1. AQS подробное объяснение параллелизма Java
  2. Анализ исходного кода AbstractQueuedSynchronizer (на основе Java 8)