0 Введение
Синхронизатор очередей AbstractQueuedSynchronizer (далее именуемый AQS) — это базовая платформа для реализации блокировок и связанных с ними синхронизаторов.
В JDK5 Дуг Ли добавил большое количество инструментов синхронизации в параллельные пакеты, такие как ReentrantLock, ReentrantReadWriteLock, Semaphore, CountDownLatch и т. д., все они основаны на AQS.
Внутри он управляет состоянием синхронизации между несколькими потоками через переменную с именем state, которая помечена как volatile. AQS можно использовать для вытеснения ресурсов исключительно или совместно с несколькими потоками.
На базе AQS очень удобно реализовывать функции, которых нет в Java.
Например, в отношении блокировок Java предоставляет ключевое слово synchronized, которое можно использовать для простой синхронизации между несколькими потоками. Но у этого ключевого слова также есть много недостатков, таких как:
- Он не поддерживает получение блокировок с течением времени.Как только поток не получит блокировку от synchronized, он застрянет здесь и не будет иметь шансов на побег. Поэтому обычно взаимоблокировка, вызванная синхронизацией, неразрешима.
- Не реагирует на прерывания.
- Невозможно попытаться получить блокировку. Если вы не получите его, когда попытаетесь получить его, он вернется немедленно. Синхронизация не имеет этой функции.
И ReentrantLock сделал все вышеперечисленные пункты на основе AQS.
1 Основная структура
Как видно из названия AbstractQueuedSynchronizer, AQS должен быть реализован на основе очередей (Queue). Внутри AQS это очередь, реализованная через связанный список. Каждый элемент связанного списка является реализацией своего внутреннего класса Node. Затем AQS указывает на начало очереди через переменную экземпляра head и указывает на конец очереди через переменную экземпляра tail.
Его исходный код определяется следующим образом:
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
/**
* The synchronization state.
*/
private volatile int state;
static final class Node {
/** 标识为共享式 */
static final Node SHARED = new Node();
/** 标识为独占式 */
static final Node EXCLUSIVE = null;
/** 同步队列中等待的线程等待超时或被中断,需要从等待队列中取消等待,进入该状态的节点状态将不再变化 */
static final int CANCELLED = 1;
/** 当前节点的后继节点处于等待状态,且当前节点释放了同步状态,需要通过unpark唤醒后继节点,让其继续运行 */
static final int SIGNAL = -1;
/** 当前节点等待在某一Condition上,当其他线程调用这个Conditino的signal方法后,该节点将从等待队列恢复到同步队列中,使其有机会获取同步状态 */
static final int CONDITION = -2;
/** 表示下一次共享式同步状态获取状态将无条件的传播下去 */
static final int PROPAGATE = -3;
/* 当前节点的等待状态,取值为上述几个常量之一,另外,值为0表示初始状态 */
volatile int waitStatus;
/* 前驱节点 */
volatile Node prev;
/* 后继节点 */
volatile Node next;
/* 等待获取同步状态的线程 */
volatile Thread thread;
/* 等待队列中的后继节点 */
Node nextWaiter;
// ...
}
Когда поток получает состояние синхронизации через AQS, AQS инкапсулирует текущий поток в Node и присоединяется к очереди. Таким образом, когда несколько потоков получают состояние синхронизации одновременно, AQS будет содержать очередь со следующей структурой:
На основе этой модели очереди ниже будет объяснен принцип получения состояния синхронизации потока в AQS.
2 Принцип реализации
Как видно из названия AQS, автор надеется, что AQS послужит базовым классом для предоставления услуг (обозначенных как Abstract). Поэтому обычно AQS используется по наследству.
AQS предоставляет несколько шаблонных методов для реализации классов для реализации пользовательских функций.
Эти методы:
- boolean tryAcquire(int arg): исключительно получает состояние синхронизации, обычно путем изменения значения состояния способом CAS для выполнения определенных функций.
- boolean tryRelease(int arg): Эксклюзивное состояние синхронизации выпуска, обычно также для изменения значения состояния.
- int tryAcquireShared(int arg): состояние синхронизации общего приобретения, возвращаемое значение >= 0 указывает на успех, в противном случае происходит сбой.
- boolean tryReleaseShared(int arg): Общее состояние синхронизации выпуска, также достигаемое путем изменения значения состояния.
- boolean isHeldExclusively(): указывает, принадлежит ли AQS исключительно текущему потоку.
Реализация этих методов по умолчанию вызовет исключение UnsupportedOperationException.
В настоящее время нам не нужно заботиться об этих методах, если мы понимаем, что состояние синхронизации управляется внутренне, контролируя значение состояния.
2.1 Получение состояния синхронизации
Обычно реализующий класс сначала пытается изменить значение состояния, чтобы получить синхронизированное состояние. Например, если поток успешно изменяет значение состояния с 0 на 1, это означает, что состояние синхронизации было успешно получено. Этот процесс модификации выполняется через CAS, поэтому безопасность потоков может быть гарантирована.
И наоборот, если изменить состояние не удается, текущий поток будет добавлен в очередь AQS, а поток будет заблокирован.
AQS предоставляет три метода для изменения состояния состояния.Исходный код выглядит следующим образом:
/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState() {
return state;
}
/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
2.2 Синхронная очередь
Как упоминалось выше, AQS на самом деле представляет собой двустороннюю очередь FIFO.Когда потоку не удается получить состояние синхронизации, он создает узел и добавляет его в конец очереди (этот процесс является потокобезопасным, реализация CAS). и блокирует текущий поток (через метод LockSupport.park()); При освобождении состояния синхронизации AQS сначала определит, является ли головной узел нулевым. Если это не пустое значение, это означает, что есть потоки, ожидающие состояния синхронизации, и он попытается разбудить головной узел, чтобы повторно конкурировать за него. состояние синхронизации.
2.3 Приобретение эксклюзивного статуса синхронизации
Эксклюзивный означает, что одновременно может быть синхронизирован только один поток.
AQS сначала попытается вызвать метод tryAcquire класса реализации, чтобы получить статус синхронизации.Если получение не удалось, он попытается инкапсулировать его как узел Node и добавить в конец очереди синхронизации.
Приобретение состояния исключительной синхронизации реализуется методом получения AQS. Его исходный код выглядит следующим образом:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
Этот метод сначала попытается получить состояние синхронизации (tryAcquire), если получение не удастся, текущий поток будет добавлен в очередь синхронизации с помощью метода addWaiter. И заблокируйте текущий поток (LockSupport.park()) в методе AcquireQueued и войдите в состояние вращения, чтобы получить состояние синхронизации.
Давайте посмотрим, как он создает Node и добавляет его в конец очереди. Во-первых, это addWaiter:
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
// mode = Node.EXCLUSIVE,表示是独占模式
Node node = new Node(Thread.currentThread(), mode);
// 先快速的通过CAS的方式将Node添加到队尾,如果失败,再进入enq方法通过无限循环添加
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
// 无限循环的将node添加到队尾,保证能添加成功
/*
注意:如果是首次向队列中添加Node,那么调addWaiter方法时,tail还是null,所以addWaiter方法不会设置成功,会直接进入这个方法
进入这个方法后,由于tail仍然是null,所以会走到第一个if里面,这是会创建一个空的Node出来作为头结点
然后再次循环,此时tail不是null了,会进入else的代码中,这时,才会将需要add的Node添加到队列尾部。
也就是说,首次创建队列时,会默认加一个空的头结点。
*/
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
Посмотрите еще раз на методAcquireQueued:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 进入自旋,不断的获取同步状态
for (;;) {
// 获取node在队列中的前驱节点
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 如果成功进入到这块代码,说明成功的获取了同步状态
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 获取不成功,调用LockSupport.park()方法将当前线程阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
В методе shouldParkAfterFailedAcquire пользователь определяет, нужно ли блокировать текущий поток.Метод будет оперировать состоянием ожидания предшествующего узла хвостового узла текущей очереди и оценивать необходимость парковки в соответствии с состоянием ожидания.
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // Node.SIGNAL == -1
/*
* 表明当前节点需要其他线程的唤醒才能继续执行,此时可以安全的park。
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 如果一个节点是初始状态,即waitStatus=0时,
* 将前驱节点的waitStatus设置为-1,表明其需要别的线程唤醒才能继续执行
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
Когда метод shouldParkAfterFailedAcquire определяет, что текущий узел необходимо припарковать, он вызывает функцию parkAndCheckInterrupt, чтобы заблокировать его:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
2.4 Выпуск эксклюзивного состояния синхронизации
Эксклюзивное освобождение состояния синхронизации реализовано в AQS с помощью метода release(). Исходный код этого метода выглядит следующим образом:
public final boolean release(int arg) {
// 尝试调用实现类的tryRelease方法来修改同步状态(state)
if (tryRelease(arg)) {
Node h = head;
/*
1.如果head节点是null,表示没有其他线程竞争同步状态,直接返回释放成功
2.如果head节点不是null,表明有竞争。通过unparkSuccessor方法,通过unpark方法唤醒head节点的next节点。使其重新尝试竞争同步状态。
*/
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
Метод unparkSuccessor разбудит следующий узел головного узла, чтобы он мог повторно конкурировать за состояние синхронизации:
private void unparkSuccessor(Node node) {
/*
* 如果waitStatus的值是负数,例如:-1(等待signal)
* 则将其值还原为0
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 获取头结点的next节点,如果非空,则unpark他
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
2.5 Получение и освобождение состояния монопольной синхронизации — схема
Далее будет показан процесс в исходном коде на рисунке.Во-первых, мы предполагаем, что реализация tryAcquire выглядит следующим образом:
boolean tryAcquire(int acquires) {
return compareAndSetState(0, acquires);
}
Параметр приобретает фиксированное значение 1, что означает: через CAS, если значение состояния успешно изменено с 0 на 1, это означает, что состояние синхронизации получено успешно, в противном случае происходит сбой и его необходимо добавить в очередь на синхронизацию.
Предположим, что tryRelease реализован следующим образом:
boolean tryRelease(int releases) {
int c = getState() - releases;
if (c == 0) {
setState(c);
return true;
}
return false;
}
Параметр выпусков фиксируется равным 1, что означает: если текущее состояние-1=0, выпуск считается успешным, и другие потоки могут конкурировать за состояние синхронизации.
Предположим, что три потока одновременно получают состояние синхронизации, обозначенное как t1, t2 и t3, и три потока одновременно изменяют значение состояния с помощью метода получения.
Предполагается, что модификация t1 прошла успешно, а модификация t2 и t3 не удалась.
После успешного изменения t1 измените значение состояния на 1 и вернитесь напрямую. В этот момент и голова, и хвост пусты, поэтому очередь синхронизации также пуста. На данный момент состояние очереди синхронизации следующее:
Поток t2 не может конкурировать за состояние синхронизации и присоединяется к очереди синхронизации:
Поток t3 не может конкурировать за состояние синхронизации и присоединяется к очереди синхронизации:
После выполнения потока t1 ресурсы освобождаются. Сначала восстановите состояние до 0, а затем разблокируйте следующий узел (узел t2) головного узла, чтобы восстановить квалификацию соревнования для состояния синхронизации.
Предполагая, что t2 успешно получает состояние синхронизации после пробуждения (то есть, вызвав метод tryAcquire и успешно установив состояние в 1), t2 установит свой собственный Node в качестве головного узла и установит следующий из исходного головного узла в null (помогает в GC)
Выполнение t2 завершено, состояние синхронизации освобождается, состояние устанавливается на 0, а t3 пробуждается, чтобы снова сделать его пригодным для участия в соревновании.
Предполагая, что t3 успешно получает состояние синхронизации, t3 устанавливает свой собственный узел как головной узел и устанавливает следующий из предыдущих головных узлов в нуль (то есть устанавливает следующий из t2 в нуль)
После выполнения t3 сбросьте состояние синхронизации и установите состояние на 0. Поскольку в это время его состояние ожидания равно 0, это означает, что нет узла-преемника, который нужно разпарковать, и релиз возвращается напрямую.
Последний узел t3 не освобождается, поскольку его можно использовать в качестве головного узла для следующего соревнования состояния синхронизации.
2.6 Тайм-аут для получения статуса синхронизации
Метод tryAcquireNanos реализует эту функциональность. Это примерно то же самое, что описанный выше процесс получения состояния синхронизации, но с добавлением оценки времени. Иными словами, каждый раз, когда спин получает состояние синхронизации, он сначала определяет, превышает ли текущее время указанный период тайм-аута, и, если тайм-аут превышен, сразу возвращается ошибка получения.
Давайте взглянем на исходный код Исходный код метода tryAcquireNanos выглядит следующим образом:
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 先尝试获取同步状态,如果失败,尝试超时获取
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
Можно обнаружить, что функция тайм-аута окончательно реализована методом doAcquireNanos, в этом методе большая часть логики согласуется с описанным выше процессом. Различия отмечены в примечаниях.
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 计算出超时那个时间点的时间戳
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 判断,如果超时,直接返回获取失败
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
// 没超时的话,判断剩余时间是否大于1000纳秒,如果大于才park当前线程
// 否则,不park,直接进入下一次自旋获取,因为这个时间足够小了,可能已经超出了一次系统调用的时间
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold) // spinForTimeoutThreshold = 1000
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3 ссылки
- Искусство параллельного программирования на Java Фан Тэнфэй, Вэй Пэн, Ченг Сяомин
Добро пожаловать в мой публичный аккаунт WeChat