В первых двух статьях были представлены некоторые концепции многопоточности и пулов потоков, а в этой статье я, наконец, познакомлю вас с AQS, очень важным базовым компонентом JUC!
Введение в AQS
AQS — это аббревиатура от AbstractQueuedSynchronizer, которая является базовой структурой для создания блокировок и других компонентов синхронизации очередей (ReentrantLock, CountDownLatch, Semaphore и т. д.). Он использует изменчивую переменную-член с измененным типом int для представления состояния синхронизации и использует очередь (двухсвязный список), состоящую из статического внутреннего класса Node, для выполнения работы в очереди потоков, которые получают ресурсы синхронизации. Шаблон метода шаблона используется в AQS, а подклассы управляют состоянием синхронизации, наследуя AQS и переписывая некоторые из его методов.Как правило, подклассы используются как статические внутренние классы пользовательских компонентов синхронизации.
Анализ реализации AQS
состояние синхронизации
/**
* 同步状态
*/
private volatile int state;
/**
* 返回同步状态的当前值
*/
protected final int getState() {
return state;
}
/**
* 设置同步状态的值
*/
protected final void setState(int newState) {
state = newState;
}
/**
* 利用CAS操作更新当前的status值
*/
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
Поле состояния изменяется с помощью volatile для обеспечения его видимости, а операция CAS используется для обеспечения атомарности изменения операции состояния. Следовательно, в AQS он является потокобезопасным (это не означает, что изменяемая volatile переменная является потокобезопасной, и только чтение/запись одной переменной является атомарной, поэтому операция обновления здесь должна быть гарантирована CAS). Значение состояния может использоваться для управления тем, принадлежит ли синхронизатор монопольному режиму (меньше или равно 1) или разделяемому режиму (больше 1).
Эксклюзивный режим: одновременно состояние синхронизации может получить не более одного потока;
Общий режим: несколько потоков получат состояние синхронизации одновременно;
AQS предоставляет несколько шаблонных методов для управления состоянием синхронизации. Компоненты синхронизации должны использовать шаблонные методы, предоставляемые AQS, для достижения синхронизации. Ключевые методы перезаписи:
метод | инструкция |
---|---|
tryAcquire(int arg) | Эксклюзивный доступ к статусу синхронизации |
tryAcquireShared(int arg) | Общий доступ к статусу синхронизации |
tryRelease(int arg) | Эксклюзивное состояние синхронизации выпуска |
tryReleaseShared(int arg) | Общее состояние синхронизации выпуска |
очередь синхронизации
AQS полагается на очередь синхронизации для завершения управления состоянием синхронизации. Если текущему потоку не удается получить состояние синхронизации, он будет создан как узел узла и добавлен в очередь синхронизации (безопасность потоков гарантируется CAS). текущий поток будет заблокирован в то же время. Когда состояние синхронизации освобождается, поток в первом узле очереди проснется и снова попытается получить состояние синхронизации;
private transient volatile Node head;
private transient volatile Node tail;
/** CAS设置头节点.只被enq()方法调用.后续很多节点入队出队操作都是使用enq()方法*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/** CAS设置尾节点.只被enq()方法调用.*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
static final class Node {
/** 标记表示节点正在共享模式中等待 */
static final Node SHARED = new Node();
/** 标记表示节点正在独占模式中等待 */
static final Node EXCLUSIVE = null;
/**
* 在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待,
* 节点进入该状态将不会变化
*/
static final int CANCELLED = 1;
/**
* 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消
* 会通知后继节点,使后继节点的线程得以运行
*/
static final int SIGNAL = -1;
/**
* 节点在等待队列中,当其他线程对condition调用了signal()后,
* 该节点会从等待队列转移到同步队列中
*/
static final int CONDITION = -2;
/** 表示下一次共享式同步状态获取将会被传播 */
static final int PROPAGATE = -3;
/** 等待状态*/
volatile int waitStatus;
/** 前驱节点*/
volatile Node prev;
/** 后继节点*/
volatile Node next;
/** 获取同步状态的线程*/
volatile Thread thread;
/** 等待队列Condition中的后继节点,如果式共享模式则是SHARED常量*/
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
}
Процесс получения состояния синхронизации (эксклюзивный режим)
Здесь кратко проанализирована часть исходного кода, посвященная получению состояния синхронизации. Картинка стоит тысячи слов, поэтому я дам вам блок-схему, чтобы вы могли понять общий процесс.
Анализ исходного кода
Метод Acquis(int arg) используется для получения состояния синхронизации.
/**
* 以独占模式获取同步状态,忽略中断.通过至少调用tryAcquire实现,成功返回.
* 否则线程排队,可能重复阻塞和解除阻塞,调用tryAcquire直到成功。
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
Метод tryAcquire() имеет разные реализации для разных компонентов синхронизации, например, в ReentrantLock. Справедливые и нечестные блокировки реализуются немного по-разному. В этой статье подробно не разбирается, основная функция — получить статус синхронизации, вернуть true в случае успеха, иначе вернуть false.
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 尝试enq方法的快速路径.失败后备份到完整enq方法
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 如果队列为空,初始化
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
Метод addWaiter(Node.EXCLUSIVE) используется для создания узла Node в монопольном режиме и попытки использовать операцию CAS для добавления его в очередь ожидания в качестве хвостового узла; если добавление не удается, введите метод enq() для установки это как хвост CAS в узле бесконечного цикла до тех пор, пока он не будет успешным.
/**
* 对于已经在同步队列中的线程,以独占不间断模式获取。由条件等待方法使用以及获取。
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// 如果前驱节点是头节点则尝试获取同步状态
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
// 如果前驱节点不是头节点则阻塞当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
В методеAcquireQueued() текущий поток пытается получить состояние синхронизации через бесконечный цикл и может попытаться получить состояние синхронизации только тогда, когда текущий узел драйвера является головным узлом. Поток головного узла разбудит свой узел-преемник после выхода из состояния синхронизации.Здесь проверяется, является ли предшественник головным узлом перед получением состояния синхронизации, чтобы предотвратить преждевременное уведомление (ожидание пробуждения потока из-за прерывание)
Синхронизированный процесс выпуска состояния (эксклюзивный режим)
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 如果队列中头节点不为null并且该节点不是处于初始状态(waitStatus为0)
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
/*
* 如果状态为负,请尝试以预期信号清
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 如果后继节点为null或者waitStatus>0(为1,从同步队列中取消了)
* 则继续向后寻找节点
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
Этот метод освобождает состояние синхронизации, вызывая tryRelease(arg), который реализуется в соответствии с различными компонентами, а затем пробуждает узел-преемник, чтобы заставить узел-преемник попытаться получить состояние синхронизации.
Введение
Интерфейс Condition определяет два типа методов ожидания/уведомления.Предпосылкой вызова этих методов является получение блокировки, связанной с объектом Condition (аналогично вызову ожидания и уведомления объекта, вам необходимо сначала получить блокировку Synchronized). ConditionObject — это внутренний класс в AQS, который реализует интерфейс Condition.Есть две переменные-члены типа Node, firstWaiter и lastWaiter, которые на самом деле представляют собой двусторонние очереди.
ждать
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 加入等待队列
Node node = addConditionWaiter();
// 释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 进入等待状态
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 被唤醒,重新尝试进入同步队列获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 如果被中断则抛出异常
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
Вызов метода await() приведет к тому, что текущий поток присоединится к очереди ожидания и освободит состояние синхронизации, разбудит узлы-преемники в очереди синхронизации, а затем текущий поток войдет в состояние ожидания. Когда узел в очереди ожидания пробуждается (signal()), он снова пытается получить состояние синхронизации, и если он пробуждается из-за прерывания, генерируется исключение.
уведомлять
public final void signal() {
// 判断当前线程是否获取了锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
// 唤醒等待队列中的第一个节点
if (first != null)
doSignal(first);
}
Вызов этого метода должен сначала получить состояние синхронизации. Этот метод переместит первый узел в очереди ожидания в очередь синхронизации, а затем разбудит его. Пробужденный узел повторит попытку получить состояние синхронизации.
Суммировать
Выше я кратко проанализировал важный инструмент AQS в JUC, Фактически, получение и выход из состояния синхронизации AQS эквивалентны входу и выходу из синхронизированной синхронизации. Очередь ожидания AQS эквивалентна пулу ожидания блокировки Synchronized. Условие в AQS эквивалентно пулу ожидания синхронизированных объектов. Функции await() и signal() эквивалентны функциям wait() и notify() объекта.
Ссылка: "Искусство параллельного программирования на Java"