предисловие
Условие является компаньоном Lock.Что касается того, как его использовать, мы уже писали несколько статей, таких какРеализуйте очередь блокировки, используя ReentrantLock и Condition,Три блокировки Java в параллельном программировании, в обеих статьях мы подробно описываем их использование. Сегодня мы подробно рассмотрим реализацию исходного кода.
Метод строительства
Condition
Интерфейс имеет 2 класса реализации, одинAbstractQueuedSynchronizer.ConditionObject
, а другой естьAbstractQueuedLongSynchronizer.ConditionObject
, являются внутренними классами AQS, структура классов следующая:
Несколько общедоступных методов:
- await()
- await(long time, TimeUnit unit)
- awaitNanos(long nanosTimeout)
- awaitUninterruptibly()
- awaitUntil(Date deadline)
- signal()
- signalAll()
Сегодня мы сосредоточимся на 2 методах и 2 наиболее часто используемых методах: ожидание и сигнал.
метод ожидания
Сначала вставьте волну кода и добавьте комментарии:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 创建一个新的节点,追加到 Condition 队列中最后一个节点.
Node node = addConditionWaiter();
// 释放这个锁,并唤醒 AQS 队列中一个线程.
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判断这个节点是否在 AQS 队列上,第一次判断总是返回 false
while (!isOnSyncQueue(node)) {
// 第一次总是 park 自己,开始阻塞等待
LockSupport.park(this);
// 线程判断自己在等待过程中是否被中断了,如果没有中断,则再次循环,会在 isOnSyncQueue 中判断自己是否在队列上.
// isOnSyncQueue 判断当前 node 状态,如果是 CONDITION 状态,或者不在队列上了(JDK 注释说,由于 CAS 操作队列上的节点可能会失败),就继续阻塞.
// isOnSyncQueue 判断当前 node 还在队列上且不是 CONDITION 状态了,就结束循环和阻塞.
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
// 如果被中断了,就跳出循环
break;
}
// 当这个线程醒来,会尝试拿锁, 当 acquireQueued 返回 false 就是拿到锁了.
// interruptMode != THROW_IE >>> 表示这个线程没有成功将 node 入队,但 signal 执行了 enq 方法让其入队了.
// 将这个变量设置成 REINTERRUPT.
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 如果 node 的下一个等待者不是 null, 则进行清理,清理 Condition 队列上的节点.
// 如果是 null ,就没有什么好清理的了.
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 如果线程被中断了,需要抛出异常.或者什么都不做
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
Подводя итог логике этого метода:
- В Condition поддерживается очередь, и всякий раз, когда выполняется метод await, узел создается на основе текущего потока и добавляется в хвост.
- Затем блокировка снимается, и пробуждается поток, заблокированный в очереди AQS блокировки.
- Затем заблокируйте себя.
- После того, как его разбудил другой поток, поместите узел только что в очередь AQS.Следующее действие — этот узел, например захват блокировок.
- Затем он попытается захватить блокировку.Дальше логика такая же, как и в обычном замке.Если его не удается захватить, он блокируется, а если захвачен, то продолжает выполняться.
Взгляните на подробную реализацию исходного кода, как Condition добавляет узлы? Метод addConditionWaiter выглядит следующим образом:
// 该方法就是创建一个当前线程的节点,追加到最后一个节点中.
private Node addConditionWaiter() {
// 找到最后一个节点,放在局部变量中,速度更快
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
// 如果最后一个节点失效了,就清除链表中所有失效节点,并重新赋值 t
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 创建一个当前线程的 node 节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 如果最后一个节点是 null
if (t == null)
// 将当前节点设置成第一个节点
firstWaiter = node;
else
// 如果不是 null, 将当前节点追加到最后一个节点
t.nextWaiter = node;
// 将当前节点设置成最后一个节点
lastWaiter = node;
// 返回
return node;
}
Создайте узел текущего потока и присоедините его к последнему узлу.Конечно, также есть вызов метода unlinkCancelledWaiters.При сбое последнего узла необходимо очистить недопустимые узлы в очереди Condition.Код следующим образом:
// 清除链表中所有失效的节点.
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
// 当 next 正常的时候,需要保存这个 next, 方便下次循环是链接到下一个节点上.
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
// 如果这个节点被取消了
if (t.waitStatus != Node.CONDITION) {
// 先将他的 next 节点设置为 null
t.nextWaiter = null;
// 如果这是第一次判断 trail 变量
if (trail == null)
// 将 next 变量设置为 first, 也就是去除之前的 first(由于是第一次,肯定去除的是 first)
firstWaiter = next;
else
// 如果不是 null,说明上个节点正常,将上个节点的 next 设置为无效节点的 next, 让 t 失效
trail.nextWaiter = next;
// 如果 next 是 null, 说明没有节点了,那么就可以将 trail 设置成最后一个节点
if (next == null)
lastWaiter = trail;
}
// 如果该节点正常,那么就保存这个节点,在下次链接下个节点时使用
else
trail = t;
// 换下一个节点继续循环
t = next;
}
}
Так как же снять блокировку и разбудить поток на узле в очереди AQS? Метод fullRelease выглядит следующим образом:
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 获取 state 变量
int savedState = getState();
// 如果释放成功,则返回 state 的大小,也就是之前持有锁的线程的数量
if (release(savedState)) {
failed = false;
return savedState;
} else {
// 如果释放失败,抛出异常
throw new IllegalMonitorStateException();
}
} finally {
//释放失败
if (failed)
// 将这个节点是指成取消状态.随后将从队列中移除.
node.waitStatus = Node.CANCELLED;
}
}
А в методе выпуска, как это работает? код показывает, как показано ниже:
// 主要功能,就是释放锁,并唤醒阻塞在锁上的线程.
public final boolean release(int arg) {
// 如果释放锁成功,返回 true, 可能会抛出监视器异常,即当前线程不是持有锁的线程.
// 也可能是释放失败,但 fullyRelease 基本能够释放成功.
if (tryRelease(arg)) {
// 释放成功后, 唤醒 head 的下一个节点上的线程.
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
// 释放失败
return false;
}
Метод release в основном вызывает метод tryRelease, который снимает блокировку. Код tryRelease выглядит следующим образом:
// 主要功能就是对 state 变量做减法, 如果 state 变成0,则将持有锁的线程设置成 null.
protected final boolean tryRelease(int releases) {
// 计算 state
int c = getState() - releases;
// 如果当前线程不是持有该锁的线程,则抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果结果是 0,说明成功释放了锁.
if (c == 0) {
free = true;
// 将持有当前锁的线程设置成 null.
setExclusiveOwnerThread(null);
}
// 设置变量
setState(c);
return free;
}
Что ж, мы, наверное, знаем, как Condition снимает блокировку, так как же оно блокирует себя? Прежде чем заблокировать себя, вам нужно вызвать метод isOnSyncQueue, чтобы судить, код выглядит следующим образом:
final boolean isOnSyncQueue(Node node) {
// 如果他的状态不是等地啊,且他的上一个节点是 null, 便不在队列中了
// 这里判断 == CONDITION,实际上是第一次判断,而后面的判断则是线程醒来后的判断.
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 如果他的 next 不是 null, 说明他还在队列上.
if (node.next != null) // If has successor, it must be on queue
return true;
// 如果从 tail 开始找上一个节点,找到了给定的节点,说明也在队列上.返回 true.
return findNodeFromTail(node);
}
На самом деле, он всегда возвращает fasle в первый раз, таким образом входя в блок while для вызова метода park и блокируя себя.В этот момент Condition успешно освобождает блокировку, в которой оно находится, и блокирует себя.
Несмотря на то, что он заблокирован, кто-то всегда будет вызывать метод signal, чтобы разбудить его.После пробуждения перейдите к следующей логике if, то есть к методу checkInterruptWhileWaiting.Название — проверка состояния прерывания во время ожидания.Код такой следует:
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
// transferAfterCancelledWait >>>> 如果将 node 放入 AQS 队列失败,就返回 REINTERRUPT, 成功则返回 THROW_IE
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}
Когда поток прерывается, он должен возвращать разные константы в соответствии с возвращаемым значением вызова метода TransferAfterCancelledWait.Какова внутренняя логика этого метода?
final boolean transferAfterCancelledWait(Node node) {
// 将 node 的状态设置成 0 成功后,将这个 node 放进 AQS 队列中.
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
// 如果 CAS 失败, 返回 false ,
// 当 node 不在 AQS 节点上, 就自旋. 直到 enq 方法完成.
// JDK 认为, 在这个过程中, node 不在 AQS 队列上是少见的,也是暂时的.所以自旋.
// 如果不自旋的话,后面的逻辑是无法继续执行的. 实际上,自旋是在等待在 signal 中执行 enq 方法让 node 入队.
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
На самом деле это попытка поставить себя в очередь. Если его нельзя поставить, он крутится и ждет, пока сигнальный метод его поставит.
Вернитесь к методу await, продолжайте идти вниз, выполните 3 блока if, первый блок if, попытайтесь получить блокировку, почему? Т.к. в это время поток пробудился, и он стоит в очереди AQS, то ему нужно получить блокировку, когда он проснется. Логика взятия блокировки — это методAcquireQueued.Код выглядит следующим образом:
// 返回结果:是否被中断了, 当返回 false 就是拿到锁了,反之没有拿到.
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 返回他的上一个节点
final Node p = node.predecessor();
// 如果这个节点的上个节点是 head, 且成功获取了锁.
if (p == head && tryAcquire(arg)) {
// 将当前节点设置成 head
setHead(node);
// 他的上一个节点(head)设置成 null.
p.next = null; // help GC
failed = false;
// 返回 false,没有中断
return interrupted;
}
// shouldParkAfterFailedAcquire >>> 如果没有获取到锁,就尝试阻塞自己等待(上个节点的状态是 -1 SIGNAL).
// parkAndCheckInterrupt >>>> 返回自己是否被中断了.
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
Обратите внимание, что если блокировка не получена здесь, она будет заблокирована в методе parkAndCheckInterrupt. Это точно такой же узел очереди в обычном AQS, ничего особенного.
Существует метод tryAcquire, на который стоит обратить внимание: это логика попытки получить блокировку. код показывает, как показано ниже:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 如果锁的状态是空闲的.
if (c == 0) {
// !hasQueuedPredecessors() >>> 是否含有比自己的等待的时间长的线程, false >> 没有
// compareAndSetState >>> CAS 设置 state 变量成功
// 设置当前线程为锁的持有线程成功
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
// 上面 3 个条件都满足, 抢锁成功.
return true;
}
}
// 如果 state 状态不是0, 且当前线程和锁的持有线程相同,则认为是重入.
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
Основная логика состоит в том, чтобы установить переменную состояния, чтобы превратить блокирующий поток в себя. Это происходит без потоков, ожидающих дольше, чем они сами. Это означает, что потоки, ожидающие в течение длительного времени, получают приоритет для получения блокировки. Конечно, здесь есть некоторая реентерабельная логика.
Следующие два блока if просты.Если в Condition есть узлы, то попробуйте подчистить неверные узлы и вызвать метод unlinkCancelledWaiters.Этот метод мы разобрали выше, поэтому повторять анализ не будем.
Наконец, чтобы определить, прерван ли он, выполните метод reportInterruptAfterWait, который может вызвать исключение или пометить текущий поток прерыванием.
код показывает, как показано ниже:
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
Ну вот и закончен разбор метода await.Видно, что арендодатель оставил много комментариев.На самом деле это все для лучшего обзора в будущем, а так же заинтересованным студентам удобно анализировать исходный код вместе с моими комментариями.
Краткое изложение этого метода было сказано в начале, поэтому я не буду повторяться. Позже мы подведем итог вместе с сигнальным методом.
Давайте посмотрим на реализацию метода сигнала.
сигнальный метод
код показывает, как показано ниже:
public final void signal() {
// 如果当前线程不是持有该锁的线程.抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 拿到 Condition 队列上第一个节点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
Ясно, что стратегия пробуждения начинается с головы.
Взгляните на реализацию метода doSignal(first):
private void doSignal(Node first) {
do {
// 如果第一个节点的下一个节点是 null, 那么, 最后一个节点也是 null.
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 将 next 节点设置成 null.
first.nextWaiter = null;
// 如果修改这个 node 状态为0失败了(也就是唤醒失败), 并且 firstWaiter 不是 null, 就重新循环.
// 通过从 First 向后找节点,直到唤醒或者没有节点为止.
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
Основное внимание уделяется методу transferForSignal, который определенно выполняет операцию пробуждения.
final boolean transferForSignal(Node node) {
// 如果不能改变状态,就取消这个 node.
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 将这个 node 放进 AQS 的队列,然后返回他的上一个节点.
Node p = enq(node);
int ws = p.waitStatus;
// 如果上一个节点的状态被取消了, 或者尝试设置上一个节点的状态为 SIGNAL 失败了(SIGNAL 表示: 他的 next 节点需要停止阻塞),
// 唤醒输入节点上的线程.
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
// 如果成功修改了 node 的状态成0,就返回 true.
return true;
}
Конечно же, я видел операцию unpark.Этот метод сначала изменяет состояние узла с помощью CAS.В случае успеха он помещает узел в очередь AQS, а затем пробуждает поток на этом узле. В этот момент этот узел просыпается в методе await и начинает попытки получить блокировку после выполнения метода checkInterruptWhileWaiting.
Суммировать
Кратко опишите процесс выполнения условия await и signal.
-
Прежде всего, если поток хочет выполнить метод await, он должен получить блокировку.В AQS головка, которая захватывает блокировку, обычно является головкой, а затем эта головка терпит неудачу и удаляется из очереди.
-
После того, как текущий поток (т. head) узел в очереди AQS, а затем заблокируйте себя, ожидая пробуждения по сигналу.
-
Когда другой поток вызывает метод signal, он пробуждает первый узел в очереди Condition, а затем помещает этот узел в конец очереди AQS.
-
После пробуждения потока, заблокированного в методе await, он был переведен из очереди Condition в очередь AQS, в это время он является обычным узлом AQS и попытается захватить блокировку. И удалите недопустимые узлы из очереди условий.
На рисунке ниже показаны эти шаги.
Ну вот и все, что касается Condition, в общем случае это реализуется путем добавления очереди сна в Condition. Пока вызывается метод await, он будет находиться в спящем режиме, войдет в очередь условий, вызовет сигнальный метод, и поток будет взят из очереди условий и вставлен в очередь AQS, а затем проснется, позволяя потоку схватить замок самостоятельно.