ReentrantLock в Meituan Backstage

Java

В предыдущей статье краткое введениеReentrantLockЗамок. Итак, здесь я представлю методы и атрибуты внутри (эта статья основана на несправедливой блокировке внутри)! ! !

ReentrantLock

Обзор возможностей ReentrantLock

ReentrantLockЭто означает повторную блокировку, что означает, что поток может многократно блокировать критический ресурс. Прямо здесьReentrantLockс часто используемымSynchronizedСравнивать.

ReentrantLock Synchronized
механизм реализации блокировки Зависит от AQS режим монитора
гибкость Поддерживает реагирование на прерывания, тайм-ауты и попытки получения блокировок не гибкий
форма выпуска Должен явно вызывать unlock() для разблокировки Монитор автоматического выпуска
тип замка Должен явно вызывать unlock() для разблокировки Монитор автоматического выпуска
условная очередь Можно связать несколько очередей условий Связать условную очередь
повторный вход возвращающийся возвращающийся

Ассоциация ReentrantLock с AQS

final void lock() {
          if (compareAndSetState(0, 1)) // 设置同步状态
              setExclusiveOwnerThread(Thread.currentThread());//当前线程设置为独占线程。
          else
              acquire(1);// 设置失败,进入acquire 方法进行后续处理。
      }

Приведенный выше код является методом недобросовестной блокировки. Делайте в основном две вещи:

  • Если переменная State (состояние синхронизации) успешно установлена ​​через CAS, то есть блокировка успешно получена, текущая Поток устанавливается как эксклюзивный поток.
  • Если установка переменной State (состояние синхронизации) через CAS не удалась, то есть получение блокировки не удалось, введите Получить метод для последующей обработки.

Если установка состояния синхронизации не удалась, он войдет в соответствующийacquire()способ блокировки. иacquire()Будь то несправедливая блокировка или справедливая блокировка, последним вызовом является метод в родительском классе.

AQS (AbstractQueuedSynchronizer)

Во-первых, давайте посмотрим на общую структуру через следующую диаграмму архитектурыAQSРамка:

  • Цвет на картинке такойMethod, бесцветный этоAttribution.
  • При подключении кастомного синхронизатора нужно только переписать некоторые методы, необходимые первому слою, и закрывать не нужно Обратите внимание на лежащий в основе конкретный процесс реализации. Когда пользовательский синхронизатор выполняет операцию блокировки или разблокировки, первый API первого уровня входит во внутренний метод AQS, затем блокировка получается через второй уровень, а затем блокировка получается через второй уровень. Процесс сбоя получения блокировки входит в очередь ожидания третьего и четвертого уровней для обработки, и эти методы обработки основаны на Это зависит от основного уровня предоставления данных пятого уровня.

AQSОбзор принципа:

Если запрошенный общий ресурс бездействует, то поток, в данный момент запрашивающий ресурс, устанавливается как действительный рабочий поток, а общий ресурс устанавливается в заблокированное состояние; если общий ресурс занят, активируется определенный блокирующий механизм ожидания ожидания. требуется для обеспечения выделения блокировки. Этот механизм в основном реализуется вариантом очереди CLH, который добавляет в очередь потоки, которые не могут временно получить блокировки.

CLH: Очереди Крейга, Лэндина и Хагерстена представляют собой односвязные списки. Очереди в AQS представляют собой виртуальные двусторонние очереди (FIFO) вариантов CLH. AQS реализует выделение блокировок путем инкапсуляции каждого потока, запрашивающего общие ресурсы, в узел.

AQS используетVolatileизintПеременная-член типа для представления состояния синхронизации через встроенный Очередь FIFO для завершения работы в очереди по получению ресурсов, завершение сопряжения через CASStateизменение значения.

AQSструктура данных:

AQSСамая основная структура данных в нем — node. Включенные методы следующие:

Объясните значение нескольких методов и значений свойств:

Методы и значения свойств значение
waitStatus Состояние текущего узла в очереди
thread Указывает поток в этом узле
prev указатель вперед
predecessor Вернитесь к узлу-предшественнику, если нет, бросьте NPE
nextWaiter Указывает на следующий узел в состоянии CONDITION (поскольку в этой статье не рассматривается очередь ConditionQueue, этот указатель не будет подробно описан)
next указатель преемника

Поток два режима блокировки:

модель значение
SHARED Указывает, что потоки ожидают блокировки в общем режиме.
EXCLUSIVE Указывает, что поток ожидает исключительно блокировки

waitStatus имеет следующие значения перечисления:

перечислить значение
0 Значение по умолчанию при инициализации узла
CANCELLED равно 1, что указывает на то, что запрос потока на получение блокировки был отменен.
CONDITION Он равен -2, что указывает на то, что узел находится в очереди ожидания, а поток узла ожидает пробуждения.
PROPAGATE Для -3 это поле будет использоваться только тогда, когда текущий поток находится в ситуации SHARED.
SIGNAL Он равен -1, что указывает на то, что поток готов и просто ждет освобождения ресурса.

AQSСтатус синхронизации в:

AQSподдерживаетstateПоле, означающее статус синхронизации, определяетсяVolatileУкрашено, чтобы показать текущую критичность Статус блокировки ресурса.

  /**
   * The synchronization state.
   */
  private volatile int state;

В эксклюзивном режиме:

В общем режиме:

 Ассоциация важных методов AQS с ReentrantLock

Ниже перечислены методы, которые должны быть реализованы в пользовательских синхронизаторах.tryAcquire-tryRelease,tryAcquireShared-tryReleaseSharedПодойдет один из них.AQSОн также поддерживает пользовательские синхронизаторы для одновременной реализации как эксклюзивных, так и общих методов, таких какReentrantReadWriteLock.ReentrantLockЭто эксклюзивная блокировка, поэтому она реализованаtryAcquire-tryRelease.

имя метода описывать
protected boolean isHeldExclusively() Монополизирует ли поток ресурс. Вам нужно реализовать его только в том случае, если вы используете Condition.
protected boolean tryAcquire(int arg) Эксклюзивный способ. arg — это количество попыток получить блокировку, попытаться получить ресурс, вернуть True в случае успеха и False в случае неудачи.
protected boolean tryRelease(int arg) Эксклюзивный способ. arg — это количество попыток снять блокировку, попытаться освободить ресурс, вернуть True в случае успеха и False в случае сбоя.
protected int tryAcquireShared(int arg) способ обмена. arg — количество попыток получения блокировок и ресурсов. Отрицательное число означает сбой, 0 — успех, но доступных ресурсов нет, положительное число — успех и оставшиеся ресурсы.
protected boolean tryReleaseShared(int arg) способ обмена. arg — это количество попыток снять блокировку, попытаться освободить ресурс и вернуть True, если последующим ожидающим узлам разрешено проснуться после освобождения, в противном случае вернуть False.

На следующем рисунке показана связь методов между недобросовестными блокировками и AQS:

Интерактивный процесс блокировки и разблокировки:

Замок:

  1. Операция блокировки выполняется методом блокировки Lock из ReentrantLock.
  2. Будет вызван метод Lock внутреннего класса Sync.Поскольку Sync#lock является абстрактным методом, согласно ReentrantLock инициализирует выбранную справедливую и нечестную блокировку и выполняет метод Lock соответствующего внутреннего класса. метод, который по существу выполняет метод Acquire AQS.
  3. Метод Acquire AQS выполнит метод tryAcquire, но поскольку tryAcquire должен Определите реализацию синхронизатора, чтобы метод tryAcquire в ReentrantLock выполнялся, потому что ReentrantLock — это метод tryAcquire, реализованный внутренними классами справедливой блокировки и недобросовестной блокировки, поэтому разные tryAcquire будут выполняться в соответствии с разными типами блокировки.
  4. tryAcquire — логика получения блокировки.После сбоя получения будет выполнена последующая логика AQS платформы, за которой следует ReentrantLock не имеет ничего общего с кастомными синхронизаторами.

Разблокировать:

  1. Разблокируйте с помощью метода разблокировки ReentrantLock Unlock.
  2. Unlock вызовет метод Release внутреннего класса Sync, унаследованного от AQS.
  3. Метод tryRelease будет вызываться в Release, для tryRelease требуется собственная реализация синхронизатора, tryRelease реализован только в Sync в ReentrantLock, поэтому видно, что снятие блокировки процесс, и не различает, является ли это справедливой блокировкой или нет.
  4. После успешного выпуска вся обработка выполняется платформой AQS и не имеет ничего общего с пользовательскими синхронизаторами.

Из приведенного выше описания мы, вероятно, можем обобщить отношения сопоставления основных методов уровня API, когда ReentrantLock заблокирован и разблокирован:

Понимание AQS с ReentrantLock

Из приведенного выше простого анализа мы знаем, что если текущий поток не получит блокировку, он войдет в очередь ожидания.Давайте посмотрим, когда и как поток добавляется в очередь ожидания.

Поток присоединяется к очереди ожидания

Когда Acquire(1) выполняется, блокировка получается через tryAcquire. В этом случае, если блокировка приобретена В случае сбоя будет вызван addWaiter для присоединения к очереди ожидания.

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

Основной процесс выглядит следующим образом:

  1. Создайте новый узел с текущим потоком и режимом блокировки.
  2. Указатель Pred указывает на хвостовой узел Tail.
  3. Наведите указатель Prev узла Node в New на Pred.
  4. Завершите настройку хвостового узла с помощью метода compareAndSetTail. Этот метод в основном tailOffset и Expect сравниваются, если узлы tailOffset и Expect's Node Адрес тот же, затем установите значение Tail равным значению Update (используя кнопкуCAS).
  5. Если указатель PRE - NULL (указывает на то, что в очереди ожидания нет элементов), либо текущий указатель ПРЕД и Расположение, указанное на хвостом, отличается (указывает на то, что он был изменен другими потоками), поэтому вам нужно посмотреть на метод ENQ.
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

Если он не инициализирован, необходимо инициализировать головной узел. Но обратите внимание, что инициализированный заголовок Узел является не текущим узлом потока, а узлом, вызвавшим конструктор без аргументов. Если после инициализации или Если в очереди есть элементы из-за параллелизма, это то же самое, что и предыдущий метод. фактически,addWaiterдвойной Операция добавления хвостового узла в конечный связанный список, следует отметить, что головной узел двуконечного связанного списка является конструктором без аргументов. головной узел.

Суммируйте шаги для получения блокировки потоком:

  1. Когда ни один поток не получает блокировку, поток 1 успешно получает блокировку.
  2. Поток 2 применяется для блокировки, но блокировка удерживается потоком 1.
  3. Если есть больше потоков для получения блокировок, они могут быть поставлены в очередь в очереди по очереди.

Возвращаясь к коду выше,hasQueuedPredecessorsКогда это справедливая блокировка, считается, что она находится в очереди ожидания. Есть ли метод для действительных узлов. если вернутьсяFalse, указывающее, что текущий поток может стремиться к общим ресурсам; если возвращениеTrue, указывающее, что в очереди есть действительные узлы и текущий поток должен присоединиться к ожидающей очереди.

    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

Увидев это, давайте поймем h != t && ((s = h.next) == null || s.thread != Thread. текущий поток());

Зачем судить о следующем узле головного узла? Данные, хранящиеся в первом узле, Какие?

Фактически в двусвязном списке первый узел является виртуальным узлом, который фактически не хранит никакой информации, а только занимает место. настоящий Положительный первый узел с данными начинается со второго узла. Когда h != t : if (s =h.next) == null, очередь ожидания инициализируется потоком, но только до тех пор, покаTailСсылаться на В направленииHead, безHeadнаправлениеTail, в это время в очереди есть элементы и их нужно вернутьTrue(эта часть Подробности см. в анализе кода ниже). Если (s = h.next) != null, это означает, что существует хотя бы один действительный узел. Если s.thread == Thread.currentThread() в это время, это означает ожидающую очередь Поток в первом допустимом узле столбца совпадает с текущим потоком, тогда текущий поток может получать ресурсы ; если s.thread != Thread.currentThread(), действителен первый в очереди ожидания Поток узла отличается от текущего потока, текущий поток должен быть добавлен в очередь ожидания.

 1 if (t == null) { // Must initialize
 2               if (compareAndSetHead(new Node()))
 3                   tail = head;
 4           } else {
 5               node.prev = t;
 6               if (compareAndSetTail(t, node)) {
 7                   t.next = node;
 8                   return t;
 9               }
 10           }

Постановка узла в очередь не является атомарной операцией, поэтому будет короткая голова != хвост, в это время хвост указывает на конец Узел, а Хвост указывает на Голову. Если Голова не указывает на Хвост (см. строки 5, 6, 7), В этом случае соответствующий поток также необходимо добавить в очередь. Итак, этот фрагмент кода предназначен для угловых случаев Проблемы параллелизма.

Ожидание удаления потока из очереди

public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

объяснено вышеaddWaiterметод, этот метод на самом деле состоит в том, чтобы поместить соответствующий поток вNodeДанные Структура добавляется в двустороннюю очередь и возвращаетNode. и этоNodeбудет использоваться как параметр для вводаacquireQueuedметод.acquireQueuedметод может использоваться для строки в очереди Процесс выполняет «заблокированную» операцию. Как правило, потоку не удается получить блокировку, и он помещается в очередь ожидания.acquireQueuedположит Потоки в очереди продолжают запрашивать блокировку до тех пор, пока получение не будет успешным или больше не будет требоваться (прервано). Давайте проанализируем с двух сторон: «когда удалять из очереди?» и «как удалять из очереди?»acquireQueuedИсходный код:

final boolean acquireQueued(final Node node, int arg) {
        // 标记是否成功拿到资源
        boolean failed = true;
        try {
             // 标记等待过程中是否中断过
            boolean interrupted = false;
            // 开始自旋,要么获取锁,要么中断
            for (;;) {
                // 获取当前节点的前驱节点
                final Node p = node.predecessor();
                // 如果 p 是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点)
                if (p == head && tryAcquire(arg)) {
                    // 获取锁成功,头指针移动到当前 node
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 说明 p 为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者是 p不为头结点,这个时候就要判断当前 node 是否要被阻塞(被阻塞条件:前驱节点的waitStatus 为 -1),防止无限循环浪费资源。具体两个方法下面细细分析
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

Примечание. Метод setHead устанавливает текущий узел как виртуальный узел, но не изменяет его.waitStatus,так как Это данные, которые нужны всегда.

private void setHead(Node node) {
         head = node;
         node.thread = null;
         node.prev = null; 
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer
// 靠前驱节点判断当前线程是否应该被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 获取头结点的节点状态
        int ws = pred.waitStatus;
        // 说明头结点处于唤醒状态
        if (ws == Node.SIGNAL)
            return true; 
        // 通过枚举值我们知道 waitStatus>0 是取消状态
        if (ws > 0) {
            do {
            // 循环向前查找取消节点,把取消节点从队列中剔除
             node.prev = pred = pred.prev;
             } while (pred.waitStatus > 0);
            pred.next = node;
         } else {
        // 设置前任节点等待状态为 SIGNAL
         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
         }
        return false; 
    
}

parkAndCheckInterrupt в основном используется для приостановки текущего потока, блокировки стека вызовов и возврата статуса прерывания текущего потока.

private final boolean parkAndCheckInterrupt() {
         LockSupport.park(this);
         return Thread.interrupted();
}

Блок-схема вышеуказанного метода выглядит следующим образом:

Как видно из приведенного выше рисунка, условие выхода из текущего цикла — это когда «предыдущий узел является головным узлом, а текущий поток получает Блокировка была успешно получена». Чтобы предотвратить трату ресурсов ЦП из-за бесконечного цикла, мы будем судить о состоянии переднего узла. Чтобы принять решение о приостановке текущего потока, конкретный приостанавливающий процесс представлен в виде следующей блок-схемы (процессshouldParkAfterFailedAcquire):

Сомнения по поводу освобождения узлов из очереди ушли, поэтому появилась новая проблема:

  1. Как создается узел отмены в shouldParkAfterFailedAcquire? Когда для состояния ожидания узла будет установлено значение -1?
  2. Когда уведомление об освобождении узла отправляется приостановленному потоку?

Генерация узла статуса CANCELED

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

С помощью метода cancelAcquire пометьте состояние узла какCANCELLED. Следующий, Разберем принцип работы этого метода построчно:

private void cancelAcquire(Node node) {
        // 将无效节点过滤
        if (node == null)
            return;
        // 设置该节点不关联任何线程,也就是虚节点
        node.thread = null;
        Node pred = node.prev;
        // 通过前驱节点,跳过取消状态的 node
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
        // 获取过滤后的前驱节点的后继节点
        Node predNext = pred.next;
        // 把当前 node 的状态设置为 CANCELLED
        node.waitStatus = Node.CANCELLED;
        // 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点
        // 更新失败的话,则进入 else,如果更新成功,将 tail 的后继节点设置为 null
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // 如果当前节点不是 head 的后继节点,
            // 1: 判断当前节点前驱节点的是否为 SIGNAL,
            // 2: 如果不是,则把前驱节点设置为 SINGAL 看是否成功
            // 如果 1 和 2 中有一个为 true,再判断当前节点的线程是否为 null
            // 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点
            int ws;
            if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                // 如果当前节点是 head 的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点
                unparkSuccessor(node);
            }
            node.next = node; // help GC
        }
    }

Текущий процесс:

  1. Получить узел-предшественник текущего узла. Если состояние узла-предшественника ОТМЕНЕНО, он всегда будет Перейдите вперед, найдите первый узел с waitStatus

В зависимости от положения текущего узла рассмотрим следующие три случая:

1. 当前节点是尾节点。
2. 当前节点是 Head 的后继节点。
3. 当前节点不是 Head 的后继节点,也不是尾节点。

Согласно второму пункту выше, мы анализируем течение каждой ситуации.

Текущий узел является хвостовым узлом:

Текущий узел является преемником Head:
Текущий узел не является ни преемником Головы, ни хвостом:

Благодаря описанному выше процессу мы имеемCANCELLEDГенерация и изменение состояния узла примерно понятно, но почему все изменения правильныеNextУказатель был изменен без указателя Prev управлять им? при каких обстоятельствах будетPrevуказатель для работы?

  1. При выполнении cancelAcquire узел-предшественник текущего узла может быть удален из очереди. (метод shouldParkAfterFailedAcquire в блоке кода Try был выполнен), если вы измените его в это времяPrevуказатель, можно вызватьPrevуказывает на другой, который был удален из очередиNode, Итак, это изменениеPrevУказатели не безопасны. В методе shouldParkAfterFailedAcquire Будет выполнен следующий код, который фактически обрабатываетPrevуказатель. shouldParkAfterFailedAcquire выполняется только в случае сбоя получения блокировки.После ввода этого метода он указывает, что общий ресурс был приобретен, узлы перед текущим узлом не изменятся, поэтому на этот раз изменитсяPrevуказатель безопаснее.
do {
 node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);

Как открыть

Так как ReentrantLock при разблокировке не делает различий между честными и нечестными блокировками, то мы напрямую смотрим на разблокированный исходный код:

public void unlock() {
    sync.release(1);
}
public final boolean release(int arg) {
        if (tryRelease(arg)) {
             Node h = head;
        // 头结点不为空并且头结点的 waitStatus 不是初始化节点情况,解除线程挂起状态
        if (h != null && h.waitStatus != 0)
             unparkSuccessor(h);
            return true;
         }
        return false; 
}
// 方法返回当前锁是不是没有被线程持有
protected final boolean tryRelease(int releases) {
    // 减少可重入次数
    int c = getState() - releases;
    // 当前线程不是持有锁的线程,抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 如果持有线程全部释放,将当前独占锁所有线程设置为 null,并更新 state
    if (c == 0) {
         free = true;
         setExclusiveOwnerThread(null);
     }
    setState(c);
    return free;
}

Почему здесь условие оценки h != null && h.waitStatus != 0 ? h == ноль означаетHeadЕще не инициализирован. В исходном случае head == null, первый узел ставится в очередь,HeadВиртуальный узел будет инициализирован. Поэтому, если нет времени влиться в команду, она выбывает. Теперь голова == нулевой регистр. h != null && waitStatus == 0 указывает, что поток, соответствующий узлу-преемнику, все еще работает и его не нужно пробуждать. h != null && waitStatus

private void unparkSuccessor(Node node) {
    // 获取头结点 waitStatus
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 获取当前节点的下一个节点
     Node s = node.next;
    // 如果下个节点是 null 或者下个节点被 cancelled,就找到队列最开始的非cancelled 的节点
    if (s == null || s.waitStatus > 0) {
         s = null;
    // 就从尾部节点开始找,到队首,找到队列第一个 waitStatus<0 的节点。
    for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
             }
    // 如果当前节点的下个节点不为空,而且状态 <=0,就把当前节点 unpark
    if (s != null)
        LockSupport.unpark(s.thread);
}

Зачем искать первую не-Cancelledузел? Причина в следующем. Предыдущий метод addWaiter:

private Node addWaiter(Node mode) {
     Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
     Node pred = tail;
    if (pred != null) {
         node.prev = pred;
    if (compareAndSetTail(pred, node)) {
             pred.next = node;
             return node;
         }
     }
     enq(node);
    return node;
}

Отсюда видно, что node enqueue не является атомарной операцией, то есть node.prev = pred;compareAndSetTail(pred, node) Эти два места можно рассматривать какTailпоставленные в очередь атомарные операции, Но в это время pred.next=node не был выполнен, если в это время выполняется unparkSuccessor метод, нет возможности найти его спереди назад, поэтому вам нужно найти его сзади наперед. Есть еще одна причина дляCANCELLEDКогда узел состояния, первое отключениеNextуказатель,PrevУказатель не сломан, поэтому Также необходимо пройти от задней части к передней, чтобы иметь возможность пройти через всеNode. Подводя итог, если вы ищете спереди назад, из-за неатомарных операций иCANCELLEDОтключение во время генерации узлаNextРабота с указателями может привести к невозможности обойти все некоторые узлы. Следовательно, после пробуждения соответствующего потока соответствующий поток будет продолжать выполняться. Как обработать прерывание после продолжения выполнения методаAcquireQueued?

Поток выполнения после восстановления прерывания

После пробуждения будет выполнена функция return Thread.interrupted(); Эта функция возвращает прерванный статус текущего потока выполнения и очищает его.

private final boolean parkAndCheckInterrupt() {
     LockSupport.park(this);
    return Thread.interrupted();
}

Снова вернитесь к коду AcquireQueued, когда функция parkAndCheckInterrupt вернется.TrueилиFalseкогда,interruptedЗначения разные, но будет выполняться следующий цикл. Если блокировка успешно получена в это время, текущаяinterruptedвозвращение.

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                     setHead(node);
                     p.next = null; // help GC
                     failed = false;
                    return interrupted;
                 }
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
             }
         } finally {
            if (failed)
              cancelAcquire(node);
         }
}

если AcquireQueuedTrue, выполняется метод selfInterrupt.

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

Этот метод на самом деле предназначен для прерывания потока. Но зачем прерывать поток после получения блокировки? Эта часть принадлежит Для получения информации о совместном прерывании, предоставляемой Java, заинтересованные студенты могут ознакомиться с ней. Вот краткое введение:

  1. Когда прерванный поток пробуждается, ему неизвестна причина пробуждения.Возможно, текущий поток прерывается во время ожидания или он может быть разбужен после снятия блокировки. Итак, мы передаем Thread.interrupted() Метод проверяет флаг прерывания (этот метод возвращает статус прерывания текущего потока и установлен в False) и записать его, и если обнаружит, что поток был прерван, снова прервет его.
  2. Поток пробуждается во время ожидания ресурса, и после пробуждения он будет продолжать попытки получить блокировку, пока не захватит блокировку. То есть в течение всего процесса он не реагирует на прерывание, а только записывает запись о прерывании. Наконец захватите блокировку и вернитесь, затем, если она была прервана, вам нужно добавить прерывание. Метод обработки здесь в основном заключается в использовании базового операционного модуля Worder в пуле потоков.runWorker, Дополнительная обработка суждения выполняется с помощью Thread.interrupted().Заинтересованные студенты могут посмотреть исходный код ThreadPoolExecutor.

резюме

Вопрос. Каковы последующие действия для потока, которому не удалось получить блокировку?

О: Существует какой-то механизм ожидания в очереди, поток продолжает ждать, возможность получения блокировки по-прежнему зарезервирована, и процесс получения блокировки продолжается.

В: Раз упоминается механизм очередей, то должно быть какое-то формирование очереди, какова структура данных такой очереди?

A: представляет собой дек FIFO варианта CLH.

Вопрос. Когда поток в механизме организации очередей сможет получить блокировку?

A: Вы можете увидеть выше в деталях ==>Ожидание удаления потока из очереди

Вопрос. Если поток в механизме очередей не смог получить блокировку, нужно ли мне все время ждать? Или есть другие стратегии решения этой проблемы?

О: Состояние узла, на котором находится поток, станет отмененным, а узел в отмененном состоянии будет освобожден из очереди, подробнее см. выше ==>Генерация узла статуса CANCELED.

В: Функция блокировки заблокирована с помощью метода получения, но как она заблокирована?

A: Acquire AQS вызовет метод tryAcquire. TryAcquire реализуется каждым настраиваемым синхронизатором, а процесс блокировки завершается через tryAcquire.

Приложение AQS

Реентерабельное применение ReentrantLock

Реентерабельность ReentrantLockAQSОдно из хороших приложений.После понимания вышеуказанных знаний мы узнали, что ReentrantLock реализует реентерабельные методы. В ReentrantLock, будь то справедливая блокировка или нечестная блокировка, есть логика.

Честный замок:

if (c == 0) {
    if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(current);
        return true;
     }
}else if (current == getExclusiveOwnerThread()) {
     int nextc = c + acquires;
     if (nextc < 0)
        throw new Error("Maximum lock count exceeded");
     setState(nextc);
     return true; 
}

Несправедливая блокировка:

if (c == 0) {
    if (compareAndSetState(0, acquires)){
        setExclusiveOwnerThread(current);
        return true;
     }
}else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true; 
}

Как видно из двух предыдущих абзацев, есть состояние синхронизацииStateконтролировать общую ситуацию повторного входа.StateдаVolatileМодифицировано, используется для обеспечения некоторой наглядности и порядка.

Далее рассмотрим основной процесс поля State:

  1. Состояние инициализируется равным 0, что указывает на то, что ни один поток не удерживает блокировку.
  2. Когда поток удерживает блокировку, исходное значение будет +1. Если один и тот же поток получает блокировку несколько раз, он будет несколько раз +1. Это концепция повторного входа.
  3. Разблокировка также равна -1 для этого поля, пока не будет равно 0, поток снимает блокировку.

Сценарии приложений в JUC

В дополнение к реентерабельному приложению ReentrantLock выше,AQSВ качестве основы для параллельного программирования он обеспечивает хорошее решение для многих других инструментов синхронизации. Несколько инструментов синхронизации в JUC перечислены ниже, а общее введениеAQSСценарии применения:

Инструмент синхронизации Привязка инструментов синхронизации к AQS
ReentrantLock Количество повторных удерживаний блокировки с помощью AQS. Когда поток получает блокировку, ReentrantLock записывает идентификатор потока, который в данный момент получает блокировку, что используется для определения того, происходит ли повторное получение блокировки, и для обработки исключений, когда неверный поток пытается разблокировать операцию.
Semaphore Используйте состояние синхронизации AQS, чтобы сохранить текущий счетчик семафора. tryRelease увеличивает счетчик, а AcquireShared уменьшает его.
CountDownLatch Используйте состояние синхронизации AQS для представления счетчиков. Когда счетчик равен 0, все операции Acquire (метод await CountDownLatch) могут быть пройдены.
ReentrantReadWriteLock 16 бит в состоянии синхронизации AQS используются для хранения количества удерживаемых блокировок записи, а оставшиеся 16 бит используются для хранения количества удерживаемых блокировок чтения.
ThreadPoolExecutor Рабочий процесс использует состояние синхронизации AQS для реализации настройки эксклюзивных переменных потока (tryAcquire и tryRelease).

Суммировать

Существует слишком много сценариев, в которых мы используем параллелизм в нашей повседневной разработке, но не многие люди понимают основные принципы фреймворка внутри параллелизма. А в случае многопоточности найти проблему тоже большая проблема. Только заложив прочный фундамент, мы сможем двигаться дальше.

Справочная статья:Мейтуан за кулисами