1 Обзор
AQS предоставляетРеализация блокировок и синхронизаторов на основе очередей FIFOбазовый каркас.
Классы, которые используют AQS в j.c.u:
- CountDownLatch.Sync
- ThreadPoolExecutor.Worker
- ReentrantLock.FairSync/NonfairSync
- ReentrantReadWriteLock.FairSync/NonfairSync
- Semaphore.Sync
Официальная документация:docs.Oracle.com/java-color/8/do…
2 Как использовать AQS
2.1 Связанные концепции
state: синхронизированное значение состояния, используемое для реализации записи состояния в подклассе;
acquire: изменить состояние, которое можно понимать как получение блокировки;
release: изменение состояния, противоположного получению, можно понимать как снятие блокировки.
predecessor: предшественник,successor: преемник
Node.EXCLUSIVE: эксклюзивный режим,Node.SHARED: режим обмена
2.2 Реализация конвенций
Чтобы реализовать блокировку или синхронизатор с помощью AQS, его необходимо реализовать в соответствии со следующими пунктами:
-
1 Синхронизатору требуется одно число для обозначения состояния.
-
2 Синхронизатор должен определитьВнутренний класс, наследующий AQSреализовать свойства синхронизации.
-
3 После того, как внутренний класс наследует AQS, его необходимо реализовать по мере необходимости.
tryAcquire*
/tryRelease*
изменить состояние. -
4 Существует два режима: эксклюзивный режим (по умолчанию) и общий:
- монопольный режим: монопольный режим, попытки захвата других потоков не увенчаются успехом;
- общий режим: общий режим, в котором несколько потоков пытаются получить данные успешно.
-
5 Обычно поддерживается только один из режимов, но есть особые случаи: ReadWriteLock поддерживает оба режима одновременно.
-
6 AbstractQueuedSynchronizer.ConditionObject может использоваться в подклассах как реализация Condition.
2.3 Этапы реализации
2.3.1 Эксклюзивный замок
Основные шаги заключаются в следующем:
- 1 Создайте внутренний класс, наследующий AQS;
- 2 Реализация
tryAcquire
/tryRelease
; - 3 вызывается при блокировке
acquire
, вызывается при разблокировкеrelease
; - 4 Оценка того, получает ли какая-либо нить в настоящее время блокировку
getState() != 0
.
кjava.util.concurrent.locks.ReentrantLock
Возьмите FairSync в качестве примера:
1 Создайте внутренний класс, наследующий AQS
abstract static class Sync extends AbstractQueuedSynchronizer{....}
static final class FairSync extends Sync{....}
2 Реализация FairSync
tryAcquire
/tryRelease
tryAcquire:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
tryRelease:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
3 вызывается при блокировке
acquire
, вызывается при разблокировкеrelease
// in ReentrantLock.FairSync
final void lock() {
acquire(1);
}
// in ReentrantLock
public void unlock() {
sync.release(1);
}
4 Оценка того, получает ли какая-либо нить в настоящее время блокировку
getState() != 0
// in ReentrantLock.Sync
final boolean isLocked() {
return getState() != 0;
}
2.3.2 Общий замок
Основные шаги заключаются в следующем:
- 1 Создайте внутренний класс, наследующий AQS;
- 2 Реализация
tryAcquireShared
/tryReleaseShared
; - 3 Вызывается при блокировке
acquireShared
, вызывается при снятии блокировкиreleaseShared
.
кjava.util.concurrent.CountDownLatch
Например:
1 Создайте внутренний класс, наследующий AQS
private static final class Sync extends AbstractQueuedSynchronizer{....}
2 Реализация
tryAcquireShared
/tryReleaseShared
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
3 Вызывается при блокировке
acquireShared
, вызывается при снятии блокировкиreleaseShared
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void countDown() {
sync.releaseShared(1);
}
3 основных атрибута
Очередь FIFO, связанный список, состоящий из узлов Node:
+------+ prev +-----+ +-----+ head | | <---- | | <---- | | tail +------+ +-----+ +-----+
3.1 head
private transient volatile Node head;
Функции: Головной узел очереди.
иллюстрировать:
- ленивая инициализация (в
enq
инициализируется в). - За исключением инициализации, в других случаях только вызов
setHead(Node)
будет изменен. - head.waitStatus не будет в состоянии CANCELED.
Связанные методы:
setHead(Node node)
-
setHeadAndPropagate(Node node, int propagate)
: называетсяsetHead
,setHeadAndPropagate
существуетdoAcquireShared*
вызов метода; -
compareAndSetHead(Node update)
только приenq
был вызван метод.
3.2 tail
private transient volatile Node tail;
Функции: хвостовой узел очереди.
иллюстрировать:
- ленивая инициализация (в
enq
используется вtail = head
инициализация). - только при вызове метода
enq(Node)
Он будет изменен только при добавлении нового ожидающего узла.
Связанные методы:
-
enq(final Node node)
: добавить узел в конец очереди -
findNodeFromTail(Node node)
: найти узлы, начиная с хвоста compareAndSetTail(Node expect, Node update)
3.3 state
private volatile int state;
Функции: значение состояния, используемое для определения состояния синхронизации.
Связанные методы:
getState()
setState(int newState)
compareAndSetState(int expect, int update)
4 Node
4.1 Свойства
4.1.1 waitStatus
volatile int waitStatus
:условие
- CANCELED = 1: Указывает, что текущий узел отменен.
Если текущий узел отменен, например, по тайм-ауту или прерыванию после парковки потока, статус изменится на ОТМЕНЕН.
Когда статус узла меняется на ОТМЕНЕН,не изменится ни на какое другое состояние, его поток больше не будет заблокирован.
- SIGNAL = -1 : Указывает текущий узелПотоки последующих узловНужно разбудить, чтобы запустить (распарковать).
Описание текущего узлапоследующий узелЯвляется блочным состоянием (или собирается стать блочным), то есть его поток припаркован.
Поэтому, когда текущий узел освобождается или отменяется, его последующие узлы должны быть разблокированы.
Чтобы избежать гонок, метод получения должен указывать, что требуется сигнал перед попыткой получения, в противном случае блокируется.
- CONDITION = -2 :
Указывает, что поток текущего узла находится в очереди условий (устанавливается в классе ConditionObject), и узел не станет узлом в очереди синхронизации, пока его состояние не будет установлено на 0.
Это состояние только
ConditionObject
используется в.
- PROPAGATE = -3 : последующее приобретениеShared в текущем сценарии будет распространяться безоговорочно.
releaseShared (состояние синхронизации общего выпуска) необходимо распространять на другие узлы; это состояние задается в методе doReleaseShared (только для головных узлов), чтобы гарантировать распространение, даже если вмешиваются другие операции.
- 0: Текущий узел находится в очереди синхронизации, ожидая получения.
Значение по умолчанию для состояния ожидания:
- очередь синхронизации: 0;
- очередь условий: УСЛОВИЕ (-2).
4.1.2 prev
volatile Node prev
: предшествующий узел, только дляне-ConditionObjectОпределение узла включено.
Связанные методы:
-
Node predecessor()
: возвращает узел-предшественник текущего узла; -
boolean hasQueuedPredecessors()
: Запросите очередь синхронизации, чтобы узнать, не ожидает ли поток с более высоким приоритетом, чем он сам.
4.1.3 next
volatile Node next
: узел-преемник, только дляне-ConditionObjectОпределение узла включено.
Связанные методы:
-
void unparkSuccessor(Node node)
: разблокировать узел узлаузел-преемник.
4.1.4 thread
volatile Thread thread
: поток текущего узла, инициализированный при построении и обнуленный после использования.
4.1.5 nextWaiter
Node nextWaiter
: Узел-последователь в очереди условий доступен только для определения узла в ConditionObject, то есть он имеет значение, когда waitStatus==CONDITION.
4.2 Разница между узлом в очереди синхронизации и очередью условий
Сравнивать | очередь синхронизации | очередь условий |
---|---|---|
используемые поля | waitStatus/thread/prev/next/nextWaiter | waitStatus/thread/nextWaiter |
следующийОфициант значение | Режим представления, зафиксированный на Node.EXCLUSIVE или Node.SHARED | следующий узел |
начальное значение состояния ожидания | 0 | CONDITION |
необязательное значение waitStatus | CANCELLED/SIGNAL/PROPAGATE/0 | CANCELLED/CONDITION |
5 Соглашения о реализации подклассов AQS:
- Реализовать по мере необходимости
tryAcquire*/tryRelease*
метод; - использовать
getState/setState(int)/compareAndSetState(int,int)
для отслеживания или изменения состояния синхронизации.
5.1 tryAcquire(int)
Попробуйте приобрести в эксклюзивном режиме.
Когда метод реализован, вам нужно запросить, позволяет ли текущее состояние получать, а затем использовать compareAndSetState для получения.
tryAcquire
существуетacquire(int arg)
вызывается.
Если вызов завершился неудачно и вызывающий поток не находится в очереди, метод получения поместит поток в очередь до тех пор, пока он не будет освобожден другим потоком. Обычно используется для реализации Lock.tryLock().
Возвращаемое значение: true указывает, что выполнение выполнено успешно (обновление состояния выполнено успешно, получение выполнено успешно).
5.2 tryRelease(int)
В эксклюзивном режиме попробуйте изменить состояние для выпуска.
Возвращаемое значение: true указывает, что текущий объект находится в полностью освобожденном состоянии, и любые другие ожидающие потоки могут попытаться его получить.
5.3 tryAcquireShared(int)
Попробуйте получить в общем режиме.
возвращаемое значение:
- Отрицательное число: провалиться;
- 0: получение выполнено успешно, но следующий запрос в совместно используемом режиме не будет успешным, то есть оставшиеся ресурсы недоступны;
- Положительное число: получение успешно, и получение в совместно используемом режиме все еще может быть выполнено успешно, то есть все еще доступны оставшиеся ресурсы. Оставшиеся ожидающие потоки должны проверить, позволяет ли состояние объекта получить данные.
5.4 tryReleaseShared(int)
В общем режиме попробуйте изменить состояние для выпуска.
Возвращаемое значение: true указывает, что выпуск в режиме совместного использования прошел успешно, то есть ожидающее получение (совместное или монопольное) может быть успешно выполнено.
5.5 isHeldExclusively()
Является ли текущий поток исключительно синхронизированным (то есть состояние занято), то естьМонополизирует ли текущий поток ресурс.
Этот метод вызывается в методе ConditionObject, поэтому его необходимо реализовать только при использовании условия.
Вышеупомянутые 5 методов, реализация по умолчанию - броситьUnsupportedOperationException.
6 внутренних методов
Некоторые внутренние методы украшены приватными, а некоторые нет. Но все эти методы используются внутри AQS.
6.1 addWaiter
проиллюстрировать:
- 1 использованиетекущий потока такжезаданный режимсоздать узел;
- 2 Вновь созданный узел добавляется в хвост очереди, чтобы стать новым хвостом;
вернуть: создан новый узел
Режим параметра:Node.EXCLUSIVEилиNode.SHARED
private Node addWaiter(Node mode) {
// 创建节点
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
// tail不为null,处理流程和enq(Node)中tail!=null时的处理流程一样。
node.prev = pred; //当前节点prev指向原来的tail
if (compareAndSetTail(pred, node)) { //将tail对象由原来的tail修改为当前新节点
pred.next = node; //原来的tail的next指向当前节点
return node;
}
}
// tail为null则调用enq方法。
enq(node);
return node;
}
6.2 enq
проиллюстрировать:
- 1 Сначала определите, является ли хвост нулевым;
- 2 если
tail==null
: инициализацияhead
,а такжеtail = head
, а затем переработать; - 3 Добавьте новый узел (узел параметров) в конец очереди;
вернуть: исходный хвост (то есть узел-предшественник нового узла)
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// tail为null,初始化head/tail。
if (compareAndSetHead(new Node()))
tail = head; //tail指向head
} else {
node.prev = t; //当前节点prev指向原来的tail
if (compareAndSetTail(t, node)) { // 将tail对象由原来的tail修改为当前新节点
t.next = node; //原来的tail的next指向当前节点
return t;
}
}
}
}
Здесь возникает вопрос: если хвост равен нулю, а голова не равна нулю, разве хвост не всегда инициализируется?
Способ инициализации головыcompareAndSetHead
, и согласноcompareAndSetHead
в примечаниях, только вenq
вызывать:
/**
* CAS head field. Used only by enq.
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
так:голова и хвост находятся вenq
метод инициализируется одновременно.
addWaiter
а такжеenq
существуетtail!=null
Поток обработки тот же, все сначала указывают текущий узел prev на исходный хвост, а затем вызываютcompareAndSetTail
Установите для нового узла значение tail, затем укажите исходный узел tail.next на новый узел.
6.3 shouldParkAfterFailedAcquire
проиллюстрировать:
- 1 Функция этого методаприобрести не удалосьПосле этого судите узелдолжен блокировать;
- 2 Параметр pred должен быть префиксом узла параметра..
Логика суждения:
Если pred узла является prewaitStatus == Node.SIGNAL, то узел должен заблокироваться;
В противном случае сделайте следующее:
- 1 если
pred.waitStatus == Node.CANCELLED
: Указывает, что узел-предшественник был отменен. Продолжайте смотреть вперед, пока не найдетеwaitStatus <= 0
Узел , как новый фронт узла, и отменить точки CANCELED из очереди; - 2 Если pred.waitStatus равен 0 или PROPAGATE (не CONDITION): поскольку 0 — это начальное состояние, а PROPAGATE — состояние распространения, поэтому, если пред.
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前驱节点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 当前节点(即pred的下一个节点)已经是等待状态,需要一个release去唤醒它。所以node可以安全的park.
return true;
if (ws > 0) { // CANCELLED
/*
waitStatus > 0 说明是canceled状态。
前驱是canceled状态,则表明前驱节点已经超时或者被中断,需要从同步队列中取消。
持续向前找,直到找到waitStatus<=0的节点,作为node的前驱.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
前驱节点waitStatus为 0 或 PROPAGATE,置换为SIGNAL.
不会是CONDITION,因为CONDITION用在ConditionObject中。
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
6.4 unparkSuccessor
разбудить текущий узелпоследующий узел.
Технологический поток:
- 1 Если узел не ОТМЕНЕН, установите для состояния ожидания значение 0;
- 2 Если следующий узел является нулевым или находится в состоянии CANCELED, тоПервыйЧтобы выполнить условия(waitStatus <= 0) узла unpark.
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); // waitStatus设置为0
Node s = node.next;
if (s == null || s.waitStatus > 0) {
/*
node的后继节点是null或是CANCELLED状态,释放该后继节点;
并从tail开始向前找,找到离node最近的一个非CANCELLED(waitStatus <= 0)的节点,作为要unpark的节点。
*/
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// unpark后继节点。
if (s != null)
LockSupport.unpark(s.thread);
}
6.5 acquireQueued
иллюстрировать:
- 1 В монопольном режиме захватываются потоки, уже находящиеся в очереди;
- 2 дюйма
acquire
метод илиConditionObject.await*
используется в методе.
Технологический поток:
- 1 Только когда предшественник узла является головным, наступает очередь узла попытаться получить блокировку., а затем позвоните
setHead
Установите текущий узел в головной; - 2 В противном случае проверьте, должен ли узел быть припаркован (равно ли состояние ожидания перед узлом SIGNAL), и если да, припаркуйте его, а затем подождите, пока другие потоки не отпаркуют текущий поток.
вернуть: Было ли ожидание прервано.
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
/*
只有前驱节点是head时,才轮到该节点争夺锁。
当前节点的前驱是head 并且 tryAcquire成功:
把当前要获取的节点设置为head,
*/
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
/*
如果当前节点不是head 或 tryAcquire失败,则检验当前节点的线程(此时即当前线程)是否应park。
parkAndCheckInterrupt会park当前线程,因此在这里block;将来有其他线程对当前线程unpark时,将继续此循环。
否则继续循环尝试acquire。
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
6.6 doAcquireShared
иллюстрировать:
- 1 В совместно используемом режиме потоки, уже находящиеся в очереди, захватываются;
- 2 дюйма
acquireShared
метод для использования.
Технологический поток:
- 1 по телефону
addWaiter(Node.SHARED)
Хотите добавить новый узел в конец очереди; - 3 Только когда предшественник узла является головным, наступает очередь узла попытаться получить блокировку., а затем позвоните
setHeadAndPropagate
Установить текущий узел в начало и распространить назадtryAcquireShared
возвращаемое значение; - 3 В противном случае проверьте, должен ли узел быть припаркован (равно ли состояние ожидания перед узлом SIGNAL), и если да, припаркуйте его, а затем подождите, пока другие потоки не отпаркуют текущий поток.
private void doAcquireShared(int arg) {
// 创建Node.SHARED模式的新节点(添加在队尾)
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
// 循环
for (;;) {
/*
1 获取当前节点的前置节点;
2 前置节点是head,则tryAcquireShared,如果成功,则将当前节点设置为head,并将结果向后传播。
*/
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//设置head值,并将tryAcquireShared结果向后传播
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 前置节点不是head,当前线程是否park;
// 如果park当前线程,将来有其他线程对当前线程unpark时,将继续此循环。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquireQueued
а такжеdoAcquireShared
При оценке узлов обработки процесс такой же, отличия заключаются в следующем:
-
acquireQueued
:использоватьsetHead
установить головной узел; -
doAcquireShared
:использоватьsetHeadAndPropagate
Настройте головной узел и распространитеtryAcquireShared
Возвращаемое значение.
6.7 isOnSyncQueue
иллюстрировать:
- В основном используетсяConditionОценка в процессе возвращает, находится ли узел в очереди на синхронизацию.
// Internal support methods for Conditions
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
// waitStatus是CONDITION,或prev是null
return false;
if (node.next != null)
// 根据文章前面对Node的描述,next只会在同步队列中有值。
return true;
/*
因为CAS操作替换值的时候可能会失败,所以有可能出现:
node.prev不为null,但是没在同步队列中。
所以需要从队尾向前再遍历一遍。
*/
return findNodeFromTail(node);
}
/**
从同步队列的tail开始向前遍历,查找是否有节点node。
*/
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
7 публичных методов
В публичном методе синхронизацию нужно вызыватьacquire*/release*
Связанные методы, кроме того, может потребоваться использование некоторых других общедоступных методов.
Ниже перечислены исключенияacquire*/acquire*
вне общедоступного метода.
7.1 hasQueuedPredecessors
Запросите, есть ли поток с более высоким приоритетом, чем он сам, ожидающий в очереди синхронизации.
Расставьте приоритеты: на самом деле этоВход в очередь перед собой, в нить перед собой(Поскольку он входит в очередь раньше, это занимает больше времени, чем ожидание само по себе).
Если очередь синхронизации одновременно удовлетворяет следующим трем условиям, она возвращает значение true:
- 1 голова/хвост не равны нулю, и голова != хвост;
- 2 head.next != null;
- 3 head.next.thread != currentThread.
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
7.2 hasQueuedThreads
Есть ли еще ожидающие потоки. Суждение простое.head != tail
, считается, что все еще есть ожидающие потоки.
public final boolean hasQueuedThreads() {
return head != tail;
}
7.3 setExclusiveOwnerThread
Установите поток, который в настоящее время владеет монопольной блокировкой, например Thread.currentThread().
обычно в
tryAcquire
серединаcompareAndSetState
установить после успеха или послеtryRelease
установить вsetExclusiveOwnerThread(null)
.
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
7.4 getExclusiveOwnerThread
вернуть
setExclusiveOwnerThread
Поток, который в настоящее время владеет монопольной блокировкой, установленной в .Обычно используется для определения того, удерживает ли поток (например, текущий поток) текущую монопольную блокировку, например
if (current == getExclusiveOwnerThread()){....}
.
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
7.5 isQueued
Определить, находится ли поток потока в очереди синхронизации
public final boolean isQueued(Thread thread) {
if (thread == null)
throw new NullPointerException();
// 从tail向前查找
for (Node p = tail; p != null; p = p.prev)
if (p.thread == thread)
return true;
return false;
}
7.6 getExclusiveQueuedThreads
вернутьэксклюзивный режимПотоки, ожидающие в очереди
public final Collection<Thread> getExclusiveQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
if (!p.isShared()) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
}
return list;
}
7.7 getSharedQueuedThreads
вернутьобщий режимПотоки, ожидающие в очереди
public final Collection<Thread> getSharedQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
if (p.isShared()) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
}
return list;
}
8 Эксклюзивная синхронизация режима
Эксклюзивный режим, для реализации которого требуется внутренний класс подкласса.tryAcquire
/tryRelease
, и вызывается при блокировке и разблокировкеacquire/release
,какReentrantLock.FairSync
.
8.1 acquire
- 1 успешно ли выполнено действие tryAcquire(arg)
- 2. В случае неудачи вызовите addWaiter, чтобы создать узел, и вызовите acceptQueued, чтобы присоединиться к узлу.
- 3acquireQueued возвращает true, указывая на то, что он был прерван, затем вызывает selfInterrupt.
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
8.2 release
- 1 выполнить
tryRelease
, в случае успеха он продолжит выполнение, в случае неудачи вернет false; - 2 голова не пуста и
nextWaiter!=0
(0 — начальное значение nextWaiter узла в очереди синхронизации), затем выполнитьunparkSuccessor
Разбудите преемника, затем верните true.
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//unpark节点head的后继者
return true;
}
return false;
}
9 Синхронизация общего режима
Общий режим, внутренний класс, который требует подклассов, реализуетtryAcquireShared
/tryReleaseShared
, и вызывается при блокировке и разблокировкеacquireShared/tryReleaseShared
,какCountDownLatch.Sync
.
9.1 acquireShared
Если tryAcquireShared терпит неудачу, выполняется doAcquireShared.
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
9.2 releaseShared
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) { // head != tail
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // head.waitStatus == Node.SIGNAL
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // 如果将head的waitStatus由SIGNAL置换为0失败,则继续循环。
unparkSuccessor(h); // unpark头节点的后续节点
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // 如果head的waitStatus由0置换为PROPAGATE失败,则继续循环。
}
if (h == head) // 如果head没有改变,则退出;否则继续循环。
break;
}
}
10 Резюме
- 1 AQS используетСвязанный список узловРеализовать очередь выполнения потоков, то есть реализовать функцию синхронизации;
- 2 Управление потоком заключается в использовании
LockSupport.park*/LockSupport.unpark*
выполнить; - 3 использовать
shouldParkAfterFailedAcquire
Чтобы определить, стоит ли парковаться, используйтеLockSupport.park*
для достижения цели вращения, напримерacquireQueued
; - 4 использовать
unparkSuccessor
Разбудите узел-преемник, чтобы узел-преемник мог выполняться, напримерrelease
серединаunparkSuccessor(head)
; - 5 Только узел, чей узел-предшественник является головным, может конкурировать за блокировку, в противном случае припарковаться;
- 6 Общий процесс заключается в добавлении узла потока в конец очереди, затем приостановке потока и пробуждении, когда узел становится преемником головы для достижения цели синхронизатора.