Введение
Подобно методу монитора базового класса Object, интерфейс Condition также предоставляет аналогичные методы, которые можно использовать с Lock для реализации режима ожидания/уведомления.
Метод монитора объекта сравнивается с интерфейсом Condition:
Контраст | Методы мониторинга объектов | Condition |
Предварительные условия | получить блокировку объекта | Вызовите Lock.lock(), чтобы получить блокировку Вызовите Lock.newCondition(), чтобы получить объект Condition |
метод вызова | позвонить напрямую Например: object.wait() |
позвонить напрямую Например: условие. ожидание () |
количество ожидающих очередей | Один | несколько |
Текущий поток снимает блокировку и переходит в состояние ожидания. | служба поддержки | служба поддержки |
Текущий поток снимает блокировку и переходит в состояние ожидания и не отвечает на прерывания в состоянии ожидания | не поддерживается | служба поддержки |
Текущий поток снимает блокировку и переходит в состояние ожидания тайм-аута. | служба поддержки | служба поддержки |
Текущий поток освобождает блокировку и переходит в состояние ожидания до определенного момента в будущем. | не поддерживается | служба поддержки |
Разбудить поток в очереди ожидания | служба поддержки | служба поддержки |
Разбудить все потоки в очереди ожидания | служба поддержки | служба поддержки |
Реализация условия
ПредставляемAQSСинхронизатор знает, что он поддерживаетсяОдинОчередь синхронизации фактически поддерживаетсянесколькоочередь ожидания,Обе очереди являются очередями FIFO., F4 смотрит на класс реализации Condition, и вы можете обнаружить, что он имеет только эти два класса (разница между этими двумя классами в том, что состояние синхронизации состояния — это int и long)
очередь ожидания
В частности, давайте взглянем на внутренний класс ConditionObject в классе реализации Condition AQS.
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
// 头节点
private transient Node firstWaiter;
// 尾结点
private transient Node lastWaiter;
public ConditionObject() { }
...
}
Структура показана на рисунке (очередь в один конец):
await()
Процесс метода await() эквивалентенГоловной узел очереди синхронизации (узел, получивший блокировку) перемещается в очередь ожидания Условия.
public final void await() throws InterruptedException {
// 判断当前线程是否中断
if (Thread.interrupted())
throw new InterruptedException();
// 当前线程加入等待队列
Node node = addConditionWaiter();
// 释放同步状态
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判断此节点是否在同步队列中,若不在直至在同步队列为止
while (!isOnSyncQueue(node)) {
// 阻塞当前线程
LockSupport.park(this);
// 若线程已经中断则退出
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 竞争同步状态
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
Во-первых, определите, прерван ли поток. Если прерывание прервано, сразу генерируется исключение. В противном случае вызывается addConditionWaiter(), чтобы обернуть поток в узел и присоединиться к очереди ожидания.
private Node addConditionWaiter() {
// 获取等待队列的尾节点
Node t = lastWaiter;
// 若尾节点状态不为CONDITION,清除节点
if (t != null && t.waitStatus != Node.CONDITION) {
// 清除等待队列中所有状态不为CONDITION的节点
unlinkCancelledWaiters();
t = lastWaiter;
}
// 将当前线程包装成Node节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 尾插节点
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
// 将节点置为尾节点
lastWaiter = node;
return node;
}
Из исходного кода добавьте узел в очередь ожиданияне использует CAS, потому что поток, вызывающий метод await(), должен быть потоком, который получил блокировку, то есть этот процесс гарантируется блокировкой для обеспечения безопасности потока.После успешного присоединения к очереди ожидания вызовите fullRelease(), чтобы освободить состояние синхронизации
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 获取同步状态
int savedState = getState();
// 释放锁
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
Вызовите метод шаблона release() AQS, чтобы отменить состояние синхронизации и разбудить поток, на который ссылается узел-преемник головного узла в очереди синхронизации., возвращается в обычном режиме, если выпуск выполнен успешно, в противном случае генерируется исключение. Затем вызовите isOnSyncQueue(), чтобы определить, находится ли узел в очереди синхронизации.
final boolean isOnSyncQueue(Node node) {
// 若状态为CONDITION或者前驱节点为null
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 若后继节点不为null,表明节点肯定在同步队列中
if (node.next != null) // If has successor, it must be on queue
return true;
// 从同步队列尾节点找节点
return findNodeFromTail(node);
}
Если узел не находится в очереди на синхронизацию, он всегда будет находиться в теле цикла while, когдаЭтот поток прерывается или узел, связанный с потоком, перемещается в очередь синхронизации (то есть метод signal или signalAll условия, вызываемого другим потоком), завершит цикл и вызовет методAcquireQueued() для его получения, в противном случае он будет получен через LockSupport.park() в потоке блоков метода тела цикла
Метод await() показывает:
signal()/signalAll()
public final void signal() {
// 判断当前线程是否为获取锁的线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 唤醒条件队列中的第一个节点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
Метод isHeldExclusively() должен быть переписан подклассами, его цель — определить, является ли текущий поток тем потоком, который получает блокировку.
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
doSignal
private void doSignal(Node first) {
do {
// 将头节点从等待队列中移除
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
doSignal() удаляет головной узел из очереди ожидания, а затем вызывает метод transferForSignal() для добавления узла в очередь синхронизации.
final boolean transferForSignal(Node node) {
// 将node节点状态由CONDITION CAS置为初始状态0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 将node节点尾插同步队列,返回插入后同步队列中node节点的前驱节点
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
Метод signal() показывает:
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
// 将等待队列中节点从头节点开始逐个移出等待队列,添加到同步队列
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
Метод signalAll() эквивалентен однократному выполнению метода signal() для каждого узла в очереди ожидания, перемещению всех узлов в очереди ожидания в очередь синхронизации и пробуждению потока каждого узла.благодарный
Искусство параллельного программирования на Java