Синхронизатор очередей AQS (AbstractQueuedSynchronizer) является очень важным компонентом JUC.На его основе можно легко и эффективно построить некоторые распространенные блокировки и синхронизаторы, такие как ReentrantLock, Semaphore и т. д. (Учебное содержание этой статьи основано на JDK1. 8) Эта статья в основном посвящена реализации исходного кода AQS и некоторым часто используемым компонентам синхронизации на основе AQS.
Основное содержание
Используя компоненты синхронизации в JUC, параллельное программирование может быть выполнено более лаконично, и это появилось при реализации многих компонентов синхронизации.Sync extends AbstractQueuedSynchronizer
На рисунке, переписывая некоторые методы СКОС, реализуются функции соответствующих компонентов. AQS — это ключ к реализации блокировок. Блокировки предназначены для пользователей блокировок и определяют, как используются блокировки. AQS предназначен для разработчиков блокировок, что упрощает реализацию блокировок и экранирует управление состоянием синхронизации и очередями потоков. операции уровня.
AQS использует шаблон проектирования метода шаблона, который поддерживает реализацию различных синхронизаторов путем перезаписи соответствующих методов через подклассы. В AQS есть переменная состояния, представляющая состояние синхронизации (состояние синхронизации здесь можно рассматривать как ресурс, а получение состояния синхронизации можно рассматривать как соревнование за ресурсы синхронизации), AQS предоставляет различные способы синхронизации получения. состояния, включая монопольное получение, совместное получение, получение тайм-аута и т. д., которые будут подробно описаны ниже.
Принципиальный анализ
Далее будет объединен исходный код для анализа принципа и реализации AQS с точки зрения метода шаблона, управления состоянием синхронизации, очереди блокировки CLH, монопольного метода получения, совместного метода получения и метода получения тайм-аута.
шаблонный метод
Список методов, которые могут быть переопределены подклассами, выглядит следующим образом.
имя метода | использовать |
---|---|
tryAcquire(int arg) | В основном он используется для достижения эксклюзивного доступа к состоянию синхронизации.Для реализации этого метода вам необходимо запросить, соответствует ли текущее состояние ожиданиям, а затем выполнить соответствующее обновление состояния для достижения контроля (возвращает true, если получение данных прошло успешно). , в противном случае возвращает false. Успех обычно означает, что состояние синхронизации может быть обновлено, а если не удается, то оно не соответствует условиям для обновления состояния синхронизации), где arg представляет количество состояний синхронизации, которые необходимо получить |
tryRelease(int arg) | Он в основном используется для исключительного освобождения состояния синхронизации и одновременного обновления состояния синхронизации (обычно он возвращает true, когда состояние состояния синхронизации обновляется до 0, что указывает на то, что ресурсы синхронизации были полностью освобождены), где arg указывает количество состояний синхронизации, которые необходимо освободить. |
tryAcquireShared(int arg) | В основном используется для одновременного достижения общего доступа к статусу синхронизации и обновления статуса синхронизации. |
tryReleaseShared(int arg) | В основном используется для одновременного достижения состояния синхронизации общего выпуска и состояния синхронизации обновления. |
isHeldExclusively() | Обычно используется для определения того, является ли синхронизатор исключительным для текущего потока. |
Управление состоянием синхронизации
Блокировка потока реализована в AQS как операция над состоянием синхронизации.Благодаря управлению состоянием синхронизации могут быть реализованы различные задачи синхронизации и состояния синхронизации.state
Это ключевой домен AQS
// 因为state是volatile的,所以get、set方法均为原子操作,而compareAndSetState方法
// 使用了Unsafe类的CAS操作,所以也是原子的
// 同步状态
private volatile int state;
// 同步状态的操作包括
// 获取同步状态
protected final int getState() { return state;}
// 设置同步状态
protected final void setState(int newState) { state = newState;}
// CAS操作更新同步状态
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
очередь блокировки CLH
Блокировка CLH (Craig, Landin, and Hagersten) представляет собой разновидность спиновой блокировки. Вариант блокировки CLH используется в AQS для реализации двусторонней очереди и использования ее для реализации функции блокировки Блокировка выделяется путем инкапсуляции потока, запрашивающего общие ресурсы, в качестве узла в очереди.
Головной узел двусторонней очереди записывает поток в рабочем состоянии.Если последующий узел не может получить состояние синхронизации, он перейдет в состояние блокировки, и новый узел присоединится к очереди с конца очереди, чтобы конкурировать для состояния синхронизации.
// 队列的数据结构如下
// 结点的数据结构
static final class Node {
// 表示该节点等待模式为共享式,通常记录于nextWaiter,
// 通过判断nextWaiter的值可以判断当前结点是否处于共享模式
static final Node SHARED = new Node();
// 表示节点处于独占式模式,与SHARED相对
static final Node EXCLUSIVE = null;
// waitStatus的不同状态,具体内容见下文的表格
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
// 记录前置结点
volatile Node prev;
// 记录后置结点
volatile Node next;
// 记录当前的线程
volatile Thread thread;
// 用于记录共享模式(SHARED), 也可以用来记录CONDITION队列(见扩展分析)
Node nextWaiter;
// 通过nextWaiter的记录值判断当前结点的模式是否为共享模式
final boolean isShared() { return nextWaiter == SHARED;}
// 获取当前结点的前置结点
final Node predecessor() throws NullPointerException { ... }
// 用于初始化时创建head结点或者创建SHARED结点
Node() {}
// 在addWaiter方法中使用,用于创建一个新的结点
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
// 在CONDITION队列中使用该构造函数新建结点
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
// 记录头结点
private transient volatile Node head;
// 记录尾结点
private transient volatile Node tail;
Таблица состояния узлов (waitStatus, по умолчанию 0 при инициализации)
название штата | значение состояния | описание статуса |
---|---|---|
CANCELLED | 1 | Указывает, что текущий узел (то есть соответствующий поток) отменен из-за тайм-аута или прерывания и не может быть восстановлен после перехода в это состояние. |
SIGNAL | -1 | Указывает, что узел-преемник текущего узла заблокирован (или будет) при парковке. Когда узел освобождается или отменяется, ему необходимо разбудить узел-преемник с помощью отмены парковки (выражается методом unparkSuccessor()). |
CONDITION | -2 | Это состояние используется для узла очереди условий, указывая, что узел находится в очереди ожидания, а поток узла ожидает условия.Когда другие потоки вызывают метод signal() для условия, они будут добавлены к синхронизации. Содержание этой части будет упомянуто в расширении. |
PROPAGATE | -3 | Указывает, что получение следующего общего состояния синхронизации будет безоговорочно распространяться на последующие узлы. |
На следующем рисунке показана базовая структура очереди.
Эксклюзивный доступ
Эксклюзивное (ЭКСКЛЮЗИВНОЕ) приобретение необходимо переписатьtryAcquire
,tryRelease
метод и доступacquire
,release
метод для реализации соответствующей функции.
Схема приобретения выглядит следующим образом:
Приведенная выше блок-схема более сложная, вот краткий обзор процесса.
- Поток пытается получить состояние синхронизации, в случае успеха продолжает выполнение, а в случае неудачи добавляется в очередь синхронизации и раскручивается
- Если состояние синхронизации получено непосредственно в процессе вращения (передний узел является головным и попытка получить состояние синхронизации успешна), оно может быть выполнено напрямую.
- Если состояние синхронизации не может быть получено немедленно, передний узел будет установлен в состояние SIGNAL и перейдет в состояние блокировки с помощью метода park(), ожидая пробуждения метода unpark().
- Если поток пробуждается методом unpark() (в данном случае пре-узел выполняет операцию освобождения), пре-узел является головным узлом и пробудившийся поток получает состояние синхронизации, то возобновляем работу
Основной код выглядит следующим образом:
// 这里不去看tryAcquire、tryRelease方法的具体实现,只知道它们的作用分别为尝试获取同步状态、
// 尝试释放同步状态
public final void acquire(int arg) {
// 如果线程直接获取成功,或者再尝试获取成功后都是直接工作,
// 如果是从阻塞状态中唤醒开始工作的线程,将当前的线程中断
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 包装线程,新建结点并加入到同步队列中
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 尝试入队, 成功返回
if (pred != null) {
node.prev = pred;
// CAS操作设置队尾
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 通过CAS操作自旋完成node入队操作
enq(node);
return node;
}
// 在同步队列中等待获取同步状态
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋
for (;;) {
final Node p = node.predecessor();
// 检查是否符合开始工作的条件
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
// 获取不到同步状态,将前置结点标为SIGNAL状态并且通过park操作将node包装的线程阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果获取失败,将node标记为CANCELLED
if (failed)
cancelAcquire(node);
}
}
Схема выпуска выглядит следующим образом
Процесс релиза относительно прост, главное — обновить состояние синхронизации через tryRelease, а затем при необходимости разбудить заблокированный поток в почтовом узле.
Основной код выглядит следующим образом
// release
public final boolean release(int arg) {
// 首先尝试释放并更新同步状态
if (tryRelease(arg)) {
Node h = head;
// 检查是否需要唤醒后置结点
if (h != null && h.waitStatus != 0)
// 唤醒后置结点
unparkSuccessor(h);
return true;
}
return false;
}
// 唤醒后置结点
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 通过CAS操作将waitStatus更新为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 检查后置结点,若为空或者状态为CANCELLED,找到后置非CANCELLED结点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒后置结点
if (s != null)
LockSupport.unpark(s.thread);
}
общий доступ
Совместное (SHARED) приобретение необходимо переписатьtryAcquireShared
,tryReleaseShared
метод и доступacquireShared
,releaseShared
метод для реализации соответствующей функции. В отличие от эксклюзивного типа, общий тип поддерживает несколько потоков для получения состояния синхронизации и одновременной работы.
acquireShared
ПроцессAcquireShared очень похож на процесс приобретения, и процесс примерно такой же.Ниже приводится краткий обзор
- Поток получает состояние синхронизации. Если его можно получить, он будет выполнен напрямую. Если его невозможно получить, новый общий узел войдет в очередь синхронизации.
- Поскольку состояние синхронизации не может быть получено, поток будет заблокирован методом парковки, ожидая пробуждения.
- После пробуждения, если соблюдены условия для получения состояния синхронизации, он будет распространяться в обратном направлении и пробуждать последующие узлы.
//
public final void acquireShared(int arg) {
// 尝试共享式获取同步状态,如果成功获取则可以继续执行,否则执行doAcquireShared
if (tryAcquireShared(arg) < 0)
// 以共享式不停得尝试获取同步状态
doAcquireShared(arg);
}
// Acquires in shared uninterruptible mode.
private void doAcquireShared(int arg) {
// 向同步队列中新增一个共享式的结点
final Node node = addWaiter(Node.SHARED);
// 标记获取失败状态
boolean failed = true;
try {
// 标记中断状态(若在该过程中被中断是不会响应的,需要手动中断)
boolean interrupted = false;
// 自旋
for (;;) {
// 获取前置结点
final Node p = node.predecessor();
// 若前置结点为头结点
if (p == head) {
// 尝试获取同步状态
int r = tryAcquireShared(arg);
// 若获取到同步状态。
if (r >= 0) {
// 此时,当前结点存储的线程恢复执行,需要将当前结点设置为头结点并且向后传播,
// 通知符合唤醒条件的结点一起恢复执行
setHeadAndPropagate(node, r);
p.next = null; // help GC
// 需要中断,中断当前线程
if (interrupted)
selfInterrupt();
// 获取成功
failed = false;
return;
}
}
// 获取同步状态失败,需要进入阻塞状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 获取失败,CANCELL node
if (failed)
cancelAcquire(node);
}
}
// 将node设置为同步队列的头结点,并且向后通知当前结点的后置结点,完成传播
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
// 向后传播
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if(s == null || s.isShared())
doReleaseShared();
}
}
releasShared
После того, как releaseShared попытается успешно освободить состояние синхронизации, он разбудит задний узел и обеспечит распространение.
public final boolean releaseShared(int arg) {
// 尝试释放同步状态
if (tryReleaseShared(arg)) {
// 成功后唤醒后置结点
doReleaseShared();
return true;
}
return false;
}
// 唤醒后置结点
private void doReleaseShared() {
// 循环的目的是为了防止新结点在该过程中进入同步队列产生的影响,同时要保证CAS操作的完成
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
Как получить сверхурочную работу
Получение тайм-аута позволяет блокировке, реализованной AQS, поддерживать получение блокировок по тайм-ауту, что недоступно с ключевым словом synchronized.Что касается конкретной реализации, она аналогична вышеприведенной реализации, за исключением того, что ограничение времени на основе эксклюзивного и совместного приобретения.Ограничения, блокируя время с помощью метода parkNanos(), здесь больше нет расширения.
Анализ случая
Ниже перечислены несколько часто используемых параллельных компонентов.
ReetrantLock
ReentrantLock, повторная блокировка. Операции блокировки и разблокировки реализуются исключительно через AQS, и один и тот же поток поддерживает многократное получение блокировок. Основными операциями являются блокировка и разблокировка, а их реализация зависит от получения и освобождения соответственно.
private final Sync sync;
// 继承AQS,重写相应方法
abstract static class Sync extends AbstractQueuedSynchronizer {
abstract void lock();
final boolean nonfairTryAcquire(int acquires) { ... }
protected final boolean tryRelease(int releases) { ... }
// ...略
}
static final class NonfairSync extends Sync {
final void lock() { ... }
protected final boolean tryAcquire(int acquires) { ... }
}
static final class FairSync extends Sync {
final void lock() { ... }
protected final boolean tryAcquire(int acquires) { ... }
}
Связанное резюме
- Блокировка с повторным входом поддерживает два метода: добросовестное получение и недобросовестное (по умолчанию) получение. Конструктор честности определяет, следует ли использовать NonfairSync или FairSync для завершения создания экземпляра синхронизации. Разница между этими двумя методами заключается в том, что справедливость требует, чтобы порядок получения блокировки был быть совместимым с приложением.Хронологический порядок , то есть строго в соответствии с синхронной очередью FIFO, и нечестный тип не рассматривается (справедливый тип обеспечивает справедливость, оценивая узлы-предшественники текущего узла)
- Логика повторной блокировки заключается в том, что если блокировка не была получена (состояние = 0), это означает, что блокировка может быть получена напрямую, а состояние синхронизации может быть обновлено (в этом случае требуется обновление CAS для обеспечения атомарности). ). Если блокировка была получена, оценивается получение блокировки. Является ли поток текущим потоком, и если да, то обновляется состояние синхронизации (состояние + получение, которое может быть обновлено непосредственно в это время, поскольку только этот поток может получить доступ к коду), указывая на то, что замок также может быть получен. В противном случае блокировка не может быть получена в данный момент, и поток будет заблокирован.
- Логика разблокировки реентерабельной блокировки заключается в обновлении состояния синхронизации (state - releases), если состояние равно 0, то это означает, что поток полностью снимает блокировку и возвращает true, иначе возвращает false
CountDownLatch
CountDownLatch, инструмент синхронизации, может заставить один или несколько потоков ждать, пока не будет выполнено указанное количество потоков, прежде чем выполняться. Совместная реализация через AQS. Основными операциями являются await (предоставить текущему потоку ожидание, вызвать метод AcquiSharedInterruptably AQS, который можно просто рассматривать как метод AcquireShared, реализация в основном такая же), countDown (выполнить операцию выхода из состояния синхронизации), в на самом деле, общая идея заключается в том, что инициализация CountDownLatch должна быть. Количество состояний синхронизации, выполняющих операцию ожидания, может быть выполнено только тогда, когда количество состояний синхронизации полностью освобождено (до 0), а задачи, которые необходимо выполнить в первую очередь выпустит определенное количество состояний синхронизации через countDown после завершения.
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
// 检查同步状态数是否已经为0,不为0则同步状态获取失败
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 释放一定的同步状态数
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
ReentrantReadWriteLock
ReentrantReadWriteLock, реентерабельная блокировка чтения-записи, использует как эксклюзивный, так и общий типы AQS.При выполнении операции записи блокировка принадлежит исключительно потоку записи, а другие потоки записи и чтения блокируются. Когда выполняется операция чтения, поток записи блокируется, и все потоки чтения могут совместно использовать блокировку. Реализация блокировки чтения-записи относительно сложна, я не буду выкладывать здесь слишком много кода, а кратко подытожу его реализацию:
- Блокировка чтения-записи реализуется внутри блокировкой чтения и блокировкой записи. Блокировка чтения и блокировка записи имеют общее состояние состояния синхронизации, поэтому состояние чтения и записи определяется состоянием синхронизации. Блокировка чтения-записи использует метод побитовой сегментации для достижения состояния синхронизации, которое представляет два разных типа состояния (чтение и запись). Блокировка чтения-записи делит переменную на две части, старшие 16 бит представляют чтение, а младшие 16 бит — запись. Тогда значением состояния записи будет state&0x0000ffff, и операция модификации может быть выполнена непосредственно над состоянием. Значение состояния чтения — это состояние >> 16, а операция модификации (например, операция добавления) — (состояние + 0x00010000).
- Логика получения блокировки записи: если текущий поток получил блокировку записи, состояние записи увеличивается; если была получена блокировка чтения или поток, получивший блокировку записи, не является текущим потоком, текущий поток переходит в состояние записи. очередь синхронизации и ожидания. Если потока получения блокировки нет, он запрашивается напрямую.
- Логика снятия блокировки записи уменьшает состояние записи до тех пор, пока состояние записи не станет равным 0, указывая на то, что блокировка записи полностью снята.
- Логика получения блокировки чтения, когда блокировка записи не получена, всегда можно получить блокировку чтения. Если текущий поток получил блокировку чтения, добавляется состояние чтения (это сумма состояний чтения каждого потока чтения, и состояние чтения каждого потока записывается в ThreadLocal). Если блокировка записи уже была получена, блокировка чтения не может быть получена.
- Логика снятия блокировки чтения, каждое освобождение будет уменьшать состояние чтения
- ReentrantReadWriteLock поддерживает понижение уровня блокировки, что означает, что после получения блокировки записи сначала необходимо получить блокировку чтения, а затем снять блокировку записи, чтобы завершить понижение уровня блокировки записи до блокировки чтения. Это может обеспечить видимость данных и предотвратить получение другими потоками блокировки записи после того, как блокировка записи будет снята напрямую, и текущий поток не сможет получить модификацию потока блокировки записи. Но блокировки чтения-записи не поддерживают укрупнение блокировок по той же причине.
расширять
Condition
При синхронизированной блокировке можно дождаться уведомления через методы wait() и notify() метода класса Object, тогда в процессе блокировки Lock есть аналогичные операции, а именно интерфейс Condition, который предоставляет await() ), signal() с той же функцией.
В AQS есть класс ConditionObject, реализующий интерфейс Condition. Он также использует структуру данных Node для формирования очереди (FIFO), которая отличается от синхронной очереди и может быть названа очередью ожидания. Для получения условия необходимо передать метод newCondition интерфейса блокировки, что означает, что блокировка может иметь несколько очередей ожидания, а объект, предоставляемый моделью монитора объектов, имеет только одну очередь ожидания.
// Condition的数据结构
static final class Node {
// next 指针
Node nextWaiter;
// ...
}
public class ConditionObject implements Condition, java.io.Serializable {
// head
private transient Node firstWaiter;
// tail
private transient Node lastWaiter;
// ...
}
Давайте подробно рассмотрим операции ожидания и сигнала.
await()
// 涉及中断的操作,暂时忽略
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 向等待队列的队尾新建一个CONDITION结点
Node node = addConditionWaiter();
// 因为要进入等待状态,所以需要释放同步状态(即释放锁),如果失败,该结点会被CANCELLED
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判读该结点是否在同步队列上,如果不在就通过park操作将其阻塞,进入等待状态
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 从等待状态恢复,进入同步队列竞争同步状态
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
// 向等待队列的队尾新建一个CONDITION结点
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果最后一个结点的waitStatus并非CONDITION,说明该结点被CANCELLED了,需要
// 从队列中清除掉
if (t != null && t.waitStatus != Node.CONDITION) {
// 将CANCELLED结点从等待队列中清除出去
unlinkCancelledWaiters();
t = lastWaiter;
}
// 新建CONDITION结点并且将其加入队尾
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
О работе ожидания
- После выполнения операции ожидания поток будет обернут как узел CONDITION и войдет в очередь ожидания.
- Блокировать нить через парк
- После пробуждения поток входит в очередь синхронизации из очереди ожидания, чтобы конкурировать за состояние синхронизации.
signal()
//
public final void signal() {
// 校验
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
// 唤醒等待队列的头结点
if (first != null)
doSignal(first);
}
// 执行唤醒操作
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 唤醒结点并且将其加入同步队列
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
// 将唤醒的结点加入到同步队列中竞争同步状态,恢复执行
final boolean transferForSignal(Node node) {
// 将node的状态从CONDITION恢复到默认状态,该CAS操作由外层doSignal的循环保证成功操作
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 将node加入到同步队列中
Node p = enq(node);
int ws = p.waitStatus;
// 如果前置结点已经被取消或者将前置结点设置为SIGNAL失败,就通过unpark唤醒node包装的线程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
О работе сигнала
Разбудите головной узел очереди ожидания, удалите его из очереди ожидания, добавьте в очередь синхронизации, чтобы конкурировать за состояние синхронизации, и возобновите выполнение.
Существуют также некоторые операции, такие как signalAll(), которые удаляют все узлы в очереди ожидания из очереди ожидания и добавляют их в очередь синхронизации, чтобы конкурировать за состояние синхронизации.
StampedLock
StampedLock — это новая блокировка в Java8, которая представляет собой улучшение блокировок чтения-записи. Хотя блокировка чтения-записи разделяет функции чтения и записи, она использует пессимистическую стратегию в отношении параллелизма чтения и записи, что приводит к тому, что имеется много случаев чтения и мало случаев записи. , поток записи может быть не в состоянии конкурировать за блокировку и быть заблокированным, сталкиваясь с проблемой голодания.
StampedLock предоставляет 3 режима управления блокировками: запись, чтение и оптимистическое чтение. При блокировке можно получить штамп в качестве сертификата проверки.При снятии блокировки необходимо проверить этот сертификат.Если сертификат недействителен (например, в процессе чтения поток записи модифицировал его), необходимо повторно получить сертификат и обновить данные. Это очень подходит для сценариев, где мало операций записи и много операций чтения.Можно оптимистично предположить, что операция записи не будет происходить в процессе чтения данных, а проверка учетных данных выполняется до разблокировки потока чтения , При необходимости переключитесь на пессимистическую блокировку чтения, чтобы завершить сбор данных. Это может значительно улучшить производительность программы.
StampedLock не использует AQS в своей реализации, но многие из его дизайнерских идей и методов основаны на AQS с некоторыми модификациями. Очередь CLH также поддерживается внутри StampedLock для выполнения связанных функций.
По сравнению с ReentrantReadWriteLock вызов API StamptedLock относительно сложен, поэтому ReentrantReadWriteLock по-прежнему используется во многих случаях.
Подробнее о StampedLock будет добавлено позже.
использованная литература
- Искусство параллельного программирования на Java
- Основы собеседования по параллельному программированию: принцип AQS и краткое изложение компонентов синхронизации AQS
- AQS подробное объяснение технологии Java
- Изучение новых возможностей Java 8 (10) StampedLock станет новым фаворитом для решения проблем с синхронизацией
Если есть проблема, пожалуйста, также укажите