1. Давай сначала поговорим о другом
Честно говоря, я планировал написать техническую статью о концепции дизайна, реализации и использовании AQS, но после написания первого черновика обнаружил, что то, что я освоил, все еще расплывчато и неоднозначно.
Тяжело снова спуститься на землю. Исходный код для интерпретации Java 8 основан.
2, профиль AQS
существуетjava.util.concurrent.locks
В пакете есть два таких класса:
- AbstractQueuedSynchronizer
- AbstractQueuedLongSynchronizer
Единственная разница между двумя классами заключается в следующем:
- Внутренне поддерживается AbstractQueuedSynchronizer
state
переменнаяint
тип - Внутренне поддерживается абстрактным AudiedLongsynchronizer
state
переменнаяlong
тип
что мы часто говоримAQS
Фактически, он относится к этим двум категориям, а именно抽象队列同步器
.
Синхронизатор абстрактной очереди AbstractQueuedSynchronizer (далее AQS) — это каркасный класс, используемый для построения блокировок или других компонентов синхронизации, который уменьшает объем кода, реализуемого каждым функциональным компонентом, а также решает множество деталей, связанных с реализацией синхронизаторов. , такие как ожидающие потоки, принимают порядок операций очереди FIFO. Некоторые гибкие критерии также могут быть определены в разных синхронизаторах, чтобы судить, должен ли поток пройти или ожидать.
AQS принимает режим метода шаблона.Основываясь на внутренней поддержке более чем n шаблонных методов, подклассы должны реализовывать только несколько конкретных методов (не абстрактные методы! Не абстрактные методы! Не абстрактные методы!), и подклассы могут быть реализованы. потребности.
Компоненты, реализованные на базе AQS, такие как:
- ReentrantLock реентерабельная блокировка (поддерживает честные и нечестные способы получения блокировок)
- Семафор подсчета семафоров
- ReentrantReadWriteLock Блокировка чтения-записи
- …
AQS - один из шедевров Дуга Ли, в Википедии, чтобы проверить его информацию, наткнулся на отца, похожего на красную или бледно-розовую рубашку?
3. Идеи дизайна AQS
AQS внутренне поддерживает переменную-член int для представления состояния синхронизации и управляет потоком, который получает общие ресурсы через встроенную очередь синхронизации FIFO (первым поступил — первым обслужен).
Мы можем предположить, что AQS на самом деле делает несколько вещей:
- Управление обслуживанием состояния синхронизации (состояния)
- Обслуживание и управление очередями ожидания
- Синхронизация и пробуждение
PS: конечно, он также поддерживает
ConditionObject
Внутренний класс в основном используется для кооперации и связи потоков.Мы пока не будем говорить об этом красавце.
Состояние типа int, поддерживаемое внутри AQS, может использоваться для представления любого состояния!
- ReentrantLock использует его, чтобы указать количество раз, когда поток-держатель блокировки неоднократно получал блокировку, а для потоков без блокировки, если состояние больше 0, это означает, что блокировка не может быть получена, и поток упакован как Node, добавьте в синхронную очередь ожидания.
- Semaphore использует его для представления количества оставшихся лицензий.Когда количество лицензий равно 0, потоки, которые не получили лицензию, но пытаются получить лицензию, войдут в синхронную очередь ожидания и заблокируются, пока некоторые потоки не освободит лицензию, которую они имеют. (состояние+1), а затем бороться за освобожденную лицензию.
- Он используется FutureTask для представления статуса задачи (не запущено, выполняется, завершено, отменено).
- Когда используется ReentrantReadWriteLock, ситуация немного отличается: состояние типа int представлено 32 битами в двоичном виде, первые 16 бит (старшие биты) представлены как блокировки чтения, а последние 16 бит (младшие биты) представлены как блокировки записи. .
- CountDownLatch использует состояние для указания количества счетчиков. Если состояние больше 0, это означает, что ему необходимо присоединиться к синхронной очереди ожидания и заблокировать. Пока состояние не равно 0, потоки в очереди ожидания будут разбужены одним.
3.1 Псевдокод для получения блокировки:
boolean acquire() throws InterruptedException {
while(当前状态不允许获取操作) {
if(需要阻塞获取请求) {
如果当前线程不在队列中,则将其插入队列
阻塞当前线程
}
else
返回失败
}
可能更新同步器的状态
如果线程位于队列中,则将其移出队列
返回成功
}
3.2 Замок с псевдокодом:
void release() {
更新同步器的状态
if (新的状态允许某个被阻塞的线程获取成功)
解除队列中一个或多个线程的阻塞状态
}
Наверное, чтобы объяснить эту мысль.
3.3 Предоставленные методы
3.3.1 Твердый метод
Следующие три методаprotected final
Измененный, каждый класс, наследующий AQS, может вызывать эти три метода.
- protected final int getState() Получить состояние синхронизации
- protected final void setState(int newState) Установить состояние синхронизации
- protected final boolean compareAndSetState(int expect, int update) Атомарно устанавливает состояние синхронизации в заданное значение обновления и возвращает true, если текущее значение состояния равно ожидаемому значению; в противном случае возвращает false
3.3.2 Необходимо внедрить субкарпоны
Следующие пять методов не реализованы в AQS, но реализуются подклассами, а затем AQS вызывает методы реализации подклассов для завершения логической обработки.
- protected boolean tryAcquire(int) Попытка получить операцию в эксклюзивном режиме, должна запросить состояние объекта, чтобы разрешить его получение в эксклюзивном режиме, и получить его, если это так.
- protected boolean tryRelease(int) попытаться освободить состояние синхронизации
- protected int tryAcquireShared(int) Общий метод пытается получить операцию
- protected boolean tryReleaseShared(int) Общий метод пытается освободить
- protected boolean isHeldExclusively() Является ли поток, вызывающий этот метод, держателем эксклюзивной блокировки
Подклассы не обязаны реализовывать все вышеперечисленные методы и могут выбирать некоторые из них для переопределения, но логика реализации должна оставаться неизменной и не может быть перемешана. Согласно различным реализациям, он делится на реализацию стратегии эксклюзивной блокировки и реализацию стратегии общей блокировки.
Это также является причиной того, что вышеуказанные методы не определены как абстрактные методы. Если подклассы определены как абстрактные методы, они должны реализовывать все пять методов, даже если вы их вообще не используете.
Эксклюзивный замок:
- ReentrantLock
- ReentrantReadWriteLock.WriteLock
Стратегия реализации: - tryAcquire(int)
- tryRelease(int)
- isHeldExclusively()
Общий замок:
- CountDownLatch
- ReentrantReadWriteLock.ReadLock
- Semaphore
Стратегия реализации: - tryAcquireShared(int)
- tryReleaseShared(int)
AQS также имеет множество внутренних шаблонных методов, поэтому я не буду приводить примеры по одному, последующая интерпретация исходного кода покажет некоторые из них и будет сопровождаться гневными комментариями.
4. Внутренние свойства AQS
4.1 Очередь CLH
AQS управляет потоками, которые получают общие ресурсы через встроенную очередь синхронизации FIFO (первым поступил — первым обслужен). Очередь CLH представляет собой двухстороннюю двустороннюю очередь FIFO, и механизм синхронизации AQS полагается на эту очередь CLH для завершения. Каждый узел очереди имеет указатель узла-предшественника и указатель узла-последователя.
Головной узел не находится в очереди на блокировку!
Исходный код узла:
static final class Node {
// 共享模式下等待标记
static final Node SHARED = new Node();
// 独占模式下等待标记
static final Node EXCLUSIVE = null;
// 表示当前的线程被取消
static final int CANCELLED = 1;
// 表示当前节点的后继节点包含的线程需要运行,也就是unpark
static final int SIGNAL = -1;
// 表示当前节点在等待condition,也就是在condition队列中
static final int CONDITION = -2;
// 表示当前场景下后续的acquireShared能够得以执行
static final int PROPAGATE = -3;
/**
* CANCELLED = 1 // 当前线程因为超时或者中断被取消。这是一个终结态,也就是状态到此为止。
* SIGNAL = -1 // 表示当前线程的后继线程被阻塞或即将被阻塞,当前线程释放锁或者取消后需要唤醒后继线程。这个状态一般都是后继节点设置前驱节点的
* CONDITION = -2 // 表示当前线程在Condition队列中
* PROPAGATE = -3 // 用于将唤醒后继线程传递下去,这个状态的引入是为了完善和增强共享锁的唤醒机制
* 0 // 表示无状态或者终结状态!
*/
volatile int waitStatus;
// 前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
// 当前节点的线程,初始化使用,在使用后失效
volatile Thread thread;
// 存储condition队列中的后继节点
Node nextWaiter;
// 如果该节点处于共享模式下等待,返回true
final boolean isShared() {
return nextWaiter == SHARED;
}
// 返回当前节点的前驱节点,如果为空,直接抛出空指针异常
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
// 指定线程和模式的构造方法
Node(Thread thread, Node mode) { // Used by addWaiter
// SHARED和EXCLUSIVE 用于表示当前节点是共享还是独占
this.nextWaiter = mode;
this.thread = thread;
}
// 指定线程和节点状态的构造方法
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
4.2 volatile state
Наиболее важное свойство, это целое число можно использовать для представления любого состояния! Сказал выше.
4.2 volatile head & volatile tail
Заголовок заголовка, но этот головной узел является просто виртуальным узлом, просто логически представляет узел потока, удерживающий блокировку, и узел HEAD не хранится в информации о потоке THREAD и информации об узле перед приводом.
хвост Хвостовой узел, каждый новый узел попадет в хвост очереди. Информация об узле-преемнике не сохраняется.
- Эти два свойства инициализируются лениво. Когда первый поток удерживает блокировку в первый раз, второй поток инициализирует начало и конец при входе в очередь синхронизации, поскольку получение не удается. Когда блокировка может быть получена, его внутренние начало и конец оба являются нулевыми.После инициализации головы и хвоста, даже если ни один поток не удерживает блокировку позже, внутренние голова и хвост все еще сохраняют последний узел потока, удерживающий блокировку! (и голова, и хвост указывают на адрес памяти)
- Когда потоку не удается получить блокировку и он добавляется в очередь синхронизации, CAS используется для установки хвостового узла в качестве узла узла, соответствующего текущему потоку.
- Все операции cas внутри AQS зависят от класса Unsafe.Начиная с Java 9 и более поздних версий, класс Unsafe был удален и заменен классом VarHandle.
Эти два свойства изменяются с помощью volatile (гарантируя порядок и видимость переменных)
4.3 spinForTimeoutThreshold
Порог тайм-аута вращения используется в таких методах, как doAcquireSharedNanos().
- Если заданное пользователем время ожидания превышает этот порог, поток блокируется.В течение периода блокировки, если он может дождаться возможности проснуться и попытка tryAcquireShared завершается успешно, он возвращает true, в противном случае он возвращает false, и тайм-аут также возвращает false .
- Если определяемое пользователем время ожидания меньше или равно этому порогу, он будет зацикливаться бесконечно, и поток не будет блокироваться до тех пор, пока поток не выйдет из состояния синхронизации или не истечет время ожидания, а затем вернет соответствующий результат.
4.4 exclusiveOwnerThread
Это AQS через наследованиеAbstractOwnableSynchronizer
Класс, полученное свойство, представляющее держатель синхронизатора в монопольном режиме.
5. Специальная реализация AQS
5.1 Идея реализации монопольной блокировки
5.1.1 Получение блокировки ReentrantLock.lock()
/**
* 获取独占锁,忽略中断。
* 首先尝试获取锁,如果成功,则返回true;否则会把当前线程包装成Node插入到队尾,在队列中会检测是否为head的直接后继,并尝试获取锁,
* 如果获取失败,则会通过LockSupport阻塞当前线程,直至被释放锁的线程唤醒或者被中断,随后再次尝试获取锁,如此反复。被唤醒后继续之前的代码执行
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
---------------------------------------------------------------------------------------
其中tryAcquire()方法需要由子类实现,ReentrantLock通过覆写这个方法实现了公平锁和非公平锁
---------------------------------------------------------------------------------------
/**
* 在同步等待队列中插入节点
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 判断尾节点是否为null
if (pred != null) {
node.prev = pred;
// 通过CAS在队尾插入当前节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// tail节点为null,则将新节点插入队尾,必要时进行初始化
enq(node);
return node;
}
/**
* 通过无限循环和CAS操作在队列中插入一个节点成功后返回。
* 将节点插入队列,必要时进行初始化
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 初始化head和tail
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
/*
CAS设置tail为node
表面上看是把老tail的next连接到node。
如果同步队列head节点和tail节点刚刚被这个线程初始化,实际上也把head的next也连接到了node,而老tail节点被node取缔。
反之则是,把老tail的next连接到node,head并没有与node产生连接,这样就形成了链表 head <-> old_tail <-> tail
*/
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* 在队列中的节点通过此方法获取锁,忽略中断。
* 这个方法很重要,如果上述没有获取到锁,将线程包装成Node节点加入到同步队列的尾节点,然后看代码里的注释
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
/*
* 检测当前节点前驱是否head,这是试获取锁。
* 如果是的话,则调用tryAcquire尝试获取锁,
* 成功,则将head置为当前节点。原head节点的next被置为null等待GC垃圾回收
*/
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
/*
* 如果未成功获取锁则根据前驱节点判断是否要阻塞。
* 如果阻塞过程中被中断,则置interrupted标志位为true。
* shouldParkAfterFailedAcquire方法在前驱状态不为SIGNAL的情况下都会循环重试获取锁。
* 如果shouldParkAfterFailedAcquire返回true,则会将当前线程阻塞并检查是否被中断
*/
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* 根据前驱节点中的waitStatus来判断是否需要阻塞当前线程。
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 前驱节点设置为SIGNAL状态,在释放锁的时候会唤醒后继节点,
* 所以后继节点(也就是当前节点)现在可以阻塞自己。
*/
return true;
if (ws > 0) {
/*
* 前驱节点状态为取消,向前遍历,更新当前节点的前驱为往前第一个非取消节点。
* 当前线程会之后会再次回到循环并尝试获取锁。
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/**
* 等待状态为0或者PROPAGATE(-3),设置前驱的等待状态为SIGNAL,
* 并且之后会回到循环再次重试获取锁。
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* 该方法实现某个node取消获取锁。
*/
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
// 遍历并更新节点前驱,把node的prev指向前部第一个非取消节点。
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 记录pred节点的后继为predNext,后续CAS会用到。
Node predNext = pred.next;
// 直接把当前节点的等待状态置为取消,后继节点调用cancelAcquire方法时,也可以跨过该节点
node.waitStatus = Node.CANCELLED;
// 如果当前节点是尾节点,则将尾节点置为当前节点的前驱节点
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// 如果node还有后继节点,这种情况要做的是把pred和后继非取消节点拼起来。
int ws;
if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
Node next = node.next;
/*
* 如果node的后继节点next非取消状态的话,则用CAS尝试把pred的后继置为node的后继节点
* 这里if条件为false或者CAS失败都没关系,这说明可能有多个线程在取消,总归会有一个能成功的。
*/
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
/*
* 在GC层面,和设置为null具有相同的效果
*/
node.next = node;
}
}
Процесс выполнения получения эксклюзивной блокировки примерно следующий:
Если предположить, что текущая блокировка удерживается потоком A, и блокировка удерживается в течение достаточно долгого времени (для облегчения нашего объяснения и предотвращения поднятия планки), потоки B и C не могут получить блокировку.
Тема Б:
- 1. Упакуйте поток B в узел Node (сокращенно BN) и добавьте его в синхронную очередь ожидания.В это время состояние ожидания BN=0
- 2. Установите хвостовой узел в BN и соедините его с головным узлом, чтобы сформировать связанный список.
- 3. Головной узел — виртуальный узел, то есть поток, удерживающий блокировку (но не содержащий информации о потоке), а хвостовой узел — BN.
- 4. Поток B входит в «бесконечный цикл», определяет, является ли узел-предшественник головным узлом (истина), и пытается снова получить блокировку (ложь, не удается получить блокировку).
- 5. Поток B войдет в метод shouldParkAfterFailedAcquire.Внутри метода установите для состояния ожидания узла-предшественника (то есть головного узла) BN значение -1, и этот метод вернет false
- 6. Поскольку это бесконечный цикл, поток B снова входит в метод shouldParkAfterFailedAcquire.Поскольку состояние ожидания узла-предшественника (то есть головного узла) BN равно -1, он напрямую возвращает значение true.
- 7. Вызовите parkAndCheckInterrupt, текущий поток B заблокирован, ожидая пробуждения.
нить С:
- 1. Упакуйте поток C в узел Node (сокращенно CN) и добавьте его в синхронную очередь ожидания.В это время CN waitStatus=0
- 2. Установите хвостовой узел на CN и соедините его с исходным хвостовым узлом (узлом BN).
- 3. Поток C входит в «бесконечный цикл», чтобы определить, является ли предшествующий узел головным узлом (ложь).
- 4. Поток C войдет в метод shouldParkAfterFailedAcquire.Внутри метода установите для waitStatus узла-предшественника CN (то есть узла BN) значение -1, и этот метод вернет false
- 5. Поскольку это бесконечный цикл, поток C снова входит в метод shouldParkAfterFailedAcquire.Поскольку состояние ожидания узла-предшественника CN (то есть узла BN) равно -1, он напрямую возвращает значение true.
- 6. Вызовите parkAndCheckInterrupt, поток C заблокирован, ожидая пробуждения.
Окончательная очередь выглядит следующим образом:
+------+ +------+ +------+
| | <--- | | <--- | |
| head | | BN | | tail |
| AN | ---> | | ---> | (CN) |
+------+ +------+ +------+
5.1.2 Снятие блокировки ReentrantLock.unlock ()
Для выпуска эксклюзивного блокировки требуется метод Tryrelaes (INT), который реализован подклассом. После того, как блокировка полностью освобождена, нить, которая выделяет блокировку, будет просыпаться на теме преемника, а нить преемника будет соревноваться Для блокировки (недобросовестный замок)
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 头结点不为null且后继节点是需要被唤醒的
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
Процесс выполнения снятия монопольной блокировки примерно следующий (при условии, что есть узлы-преемники, которым необходимо проснуться):
- головной узел
waitStatus
установить на 0 - Разбудить узел-преемник
- После пробуждения потока узла-преемника узлу-преемнику будет присвоено значение head, а свойствам prev и thread в узле-преемнике будет присвоено значение null.
- Установите для следующего указателя исходного головного узла значение null и подождите, пока сборщик мусора вернет исходный головной узел.
+------+ +------+ +------+
| old | <-X- | new | <--- | |
| head | | head | | tail |
| AN | -X-> | BN | ---> | (CN) |
+------+ +------+ +------+
Как показано выше, узел AN (исходный головной узел) ожидает сбора мусора сборщиком мусора.
5.2 Идеи реализации общей блокировки
5.2.1 Получение замков
В отличие от монопольной блокировки, общая блокировка может удерживаться несколькими потоками.
Если вам нужен AQS для реализации общих блокировок, при реализации метода tryAcquireShared():
- Возвращается отрицательное число, указывающее, что получение не удалось
- Возвращает 0, указывая, что получение успешно, но последующий конкурирующий поток не будет успешным.
- Возвращает положительное число, указывающее, что получение данных прошло успешно, что указывает на то, что последующий конкурирующий поток также может быть успешным.
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
// 一旦共享获取成功,设置新的头结点,并且唤醒后继线程
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* 这个函数做的事情有两件:
* 1. 在获取共享锁成功后,设置head节点
* 2. 根据调用tryAcquireShared返回的状态以及节点本身的等待状态来判断是否要需要唤醒后继线程
*/
private void setHeadAndPropagate(Node node, int propagate) {
// 把当前的head封闭在方法栈上,用以下面的条件检查
Node h = head;
setHead(node);
/*
* propagate是tryAcquireShared的返回值,这是决定是否传播唤醒的依据之一
* h.waitStatus为SIGNAL或者PROPAGATE时也根据node的下一个节点共享来决定是否传播唤醒
*/
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
/**
* 这是共享锁中的核心唤醒函数,主要做的事情就是唤醒下一个线程或者设置传播状态。
* 后继线程被唤醒后,会尝试获取共享锁,如果成功之后,则又会调用setHeadAndPropagate,将唤醒传播下去。
* 这个函数的作用是保障在acquire和release存在竞争的情况下,保证队列中处于等待状态的节点能够有办法被唤醒。
*/
private void doReleaseShared() {
/*
* 以下的循环做的事情就是,在队列存在后继线程的情况下,唤醒后继线程;
* 或者由于多线程同时释放共享锁由于处在中间过程,读到head节点等待状态为0的情况下,
* 虽然不能unparkSuccessor,但为了保证唤醒能够正确稳固传递下去,设置节点状态为PROPAGATE。
* 这样的话获取锁的线程在执行setHeadAndPropagate时可以读到PROPAGATE,从而由获取锁的线程去释放后继等待线程。
*/
for (;;) {
Node h = head;
// 如果队列中存在后继线程。
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
// 如果h节点的状态为0,需要设置为PROPAGATE用以保证唤醒的传播。
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// 检查h是否仍然是head,如果不是的话需要再进行循环。
if (h == head)
break;
}
}
5.2.1 Снять блокировку
Код для снятия общих блокировок и получения общих блокировок использует doReleaseShared(int)
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// doReleaseShared的实现上面获取共享锁已经介绍
doReleaseShared();
return true;
}
return false;
}
Думаю, всем должно быть понятно, скажем кратко (ручная собачья голова~):
В очереди синхронного ожидания, после пробуждения потока узла-преемника, заблокированного из-за невозможности получить общую блокировку, поток узла-преемника, в свою очередь, разбудит свой узел-преемник! И так далее.
Другой способ сказать это?
Эта ситуация может быть: блокировка записи вызывает некоторую блокировку потоков для чтения, и после того, как замок записи будет выпущен, он будет проснуться после следующего узла. Если последующий узел, это происходит потому, что поток, который заблокирован чтением Отказ блокировки блокировки, затем преемник узла потока пробудит следующие узлы ... пока все не получится успешно, или узел получает замок записи.
6. Расширение
6.1 РАСПРОСТРАНЕНИЕ Я должен сказать
В AQS есть баг, на него действительно стоит обратить внимание
В операции получения и освобождения разделяемой блокировки, я думаю, есть особенно важное значение статуса waitStatus, о котором я хочу вам рассказать, т.е.PROPAGATE
, значение этого атрибута означает, что он используется для передачи потока-преемника пробуждения.Это состояние введено для улучшения и улучшения механизма пробуждения общих блокировок.
Я прочитал много статей об AQS, в которых говорилось, что это значение статуса немного меньше, даже если это «фактическое сражение с параллельным программированием на Java» в этой книге, оно не упоминается, и, наконец, я увидел блог-сад очень подробно. Уточните этоPEOPAGATE
Статус также дал мне много вдохновения.
Правильно, когда я впервые посмотрел исходный код AQS, я даже прямо это поставилPROPAGATE
Значения состояния игнорируются. На самом деле, люди, не только читающие исходный код, легко вкладывают в этоPROPAGATE
Значение статуса игнорируется.Даже сам старик Дуг Леа не осознавал, какие последствия будут вызваны, если бы не было такого значения статуса.И только после появления ошибки, указанной выше, старик добавил этот статус и полностью починил это. есть эта ошибка.
Код для воспроизведения ошибки:
import java.util.concurrent.Semaphore;
public class TestSemaphore {
private static Semaphore sem = new Semaphore(0);
private static class Thread1 extends Thread {
@Override
public void run() {
sem.acquireUninterruptibly();
}
}
private static class Thread2 extends Thread {
@Override
public void run() {
sem.release();
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10000000; i++) {
Thread t1 = new Thread1();
Thread t2 = new Thread1();
Thread t3 = new Thread2();
Thread t4 = new Thread2();
t1.start();
t2.start();
t3.start();
t4.start();
t1.join();
t2.join();
t3.join();
t4.join();
System.out.println(i);
}
}
}
Когда программа выполняется, поток иногда зависает.
Давайте посмотрим на предыдущийsetHeadAndPropagate
На что похож метод.
private void setHeadAndPropagate(Node node, int propagate) {
setHead(node);
if (propagate > 0 && node.waitStatus != 0) {
Node s = node.next;
if (s == null || s.isShared())
unparkSuccessor(node);
}
}
Затем Semaphore.release() вызывает AQSreleaseShared
, посмотрите на времяreleaseShared
На что это похоже:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
Взгляните на Node в то время:
static final class Node {
// 忽略掉无关的代码,只展示waitStatus的状态值
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
}
setHeadAndPropagate
Методы иreleaseShared
метод, дизайн также очень прост.
В исходном коде того времени состояние ожидания Node не былоPROPAGATE=-3
ценность этого состояния.
Для удобства сравнения я поставилunparkSuccessor
Исходный код метода также отображается вместе:
private void unparkSuccessor(Node node) {
// 将node的waitStatus设置为0
compareAndSetWaitStatus(node, Node.SIGNAL, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
Далее давайте поговорим медленно ~
PS: Действительно, теперь должность начальника недалеко от меня, хоть моя работа и выполнена на много дней вперед, но все равно немного паника ~ Приходится продолжать писать с риском!
В процессе получения AQS общей блокировки есть два способа разбудить поток, который входит в режим ожидания синхронизации (заблокирован):
- После того, как другие потоки освободит семафор, вызовите unparkSuccessor(
releaseShared
метод) - После того, как другие потоки успешно получат общую блокировку, они разбудят узлы-преемники с помощью механизма распространения (то есть в
setHeadAndPropagate
метод).
Примеры воспроизводят ошибку, она проста, то есть постоянно повторяется в цикле из четырех потоков экземпляра, два потока до получения семафора, семафор освобождает два потока, основной поток ждет, пока все четыре потока запустят печать завершена.
Когда последние два потока не освобождают семафор, очередь синхронного ожидания внутри AQS выглядит следующим образом:
+------+ +------+ +------+
| | <--- | | <--- | |
| head | | t1 | | t2 |
| | ---> | | ---> | |
+------+ +------+ +------+
- 1. t3 освобождает семафор и вызывает
releaseShared
, разбудить поток t1 в узле-преемнике, и в то же время состояние ожидания головы становится равным 0 - 2. t1 просыпается, вызывает метод tryAcquireShared Semaphore.NonfairSync и возвращает 0
- 3. t4 освобождает семафор и вызывает
releaseShared
,существуетreleaseShared
Голова, прочитанная в методе, по-прежнему является исходной головкой, но в это время состояние ожидания головы стало равным 0, поэтому она не будет вызываться.unparkSuccessor
метод - 4. t1 пробуждается.Поскольку на шаге 2 вызывается метод tryAcquireShared Semaphore.NonfairSync, и возвращается 0, поэтому он не будет вызывать
unparkSuccessor
метод
Пока что два подхода все запечатаны, нить будить t2 нет, нить висит вживую...
PS: Дуг Ли с черным вопросительным знаком, ха-ха~
Для того, чтобы исправить эту ошибку, старичок сделал следующие улучшения:
- 1. Добавьте статус ожидания, т.е.
PROPAGATE
- 2. В
releaseShared
извлечено из методаdoReleaseShared()
(показано выше) вdoReleaseShared
В методе, если состояние головного узла равно 0, для него необходимо установить значение PROPAGATE, чтобы обеспечить распространение пробуждения. - 3. В
setHeadAndPropagate
Также в методе есть еще несколько суждений, среди которых, если waitStatus головного узла меньше 0, он разбудит узел-преемник (PROPAGATE = -3).
Через улучшенный код давайте рассмотрим его снова:
- 1. t3 освобождает семафор и вызывает
releaseShared
, разбудить поток t1 в узле-преемнике, и в то же время состояние ожидания головы становится равным 0 - 2. t1 просыпается, вызывает метод tryAcquireShared Semaphore.NonfairSync и возвращает 0
- 3. Этот шаг происходит одновременно с шагами 2 и 2, t4 освобождает семафор и вызывает
releaseShared
,существуетdoReleaseShared
Голова, прочитанная в методе, по-прежнему является исходной головкой, но в это время состояние ожидания головы стало равным 0, а состояние ожидания головы установлено в PROPAGATE (-3). - 4. t1 просыпается, колл
setHeadAndPropagate
метод, установите t1 в качестве заголовка, выполните условия, введите оператор ветвления и вызовитеdoReleaseShared
метод, а затем разбудить поток узла t2.
6.2 Некоторые мысли об unparkSuccessor
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 通常情况下,要唤醒的线程都是当前节点的后继线程
* 但是,如果当前节点的后继节点被取消了,则从队列尾部向前遍历,直到找到未被取消的后继节点
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
В методе unparkSuccessor, если узел-преемник текущего узла отменяется, он будет проходить вперед из хвоста очереди, пока не будет найден узел-преемник, который не был отменен.
Этот вопрос, вы также можете подумать об этом сами, почему вы хотите траверсировать вперед от хвостового узла?
Предположим, очередь CLH показана на следующем рисунке:
+------+ +------+ +------+
| | <--- | | <--- | |
| head | | t1 | | tail |
| | ---> | | ---> | |
+------+ +------+ +------+
t1.waitStatus = 1 и tail.waitStatus = 1
head пытается разбудить узел-преемник t1 и обнаруживает, что t1 находится в отмененном состоянии, затем обнаруживает хвост узла-преемника t1 и обнаруживает, что tail также находится в отмененном состоянии, но tail.next == null.
При этом в хвост очереди добавляется новый узел, но исходный tail.next еще не указан на новый узел.
Другими словами, хвост. Никтор происходит, если шаги 1 и 2 в середине, затем перерываются.
Выдержка из части кода addWaiter:
node.prev = pred;
// 通过CAS在队尾插入当前节点
if (compareAndSetTail(pred, node)) { // 步骤1
pred.next = node; // 步骤2
return node;
}
6.3 В методе AcquireQueued зачем еще раз пытаться получить?
В эксклюзивном режиме вот что я думаю об этой проблеме:
Момент 1: поток B пытается получить блокировку, но поскольку блокировка удерживается потоком A, поток B готов к вызову.addWaiter
, поставил себя в очередь (но еще не установил соединение указателя с головным узлом)
Время 1: в то же время поток A пытается снять блокировку, вводит метод освобождения, вызывает функцию tryRelease() подкласса, устанавливает состояние, представляющее количество удерживаний блокировки, равным 0 (что означает, что блокировка не удерживается любым потоком) и входитunparkSuccessor
метод, обнаруживается, что узла-преемника нет (поскольку новый узел еще не вошел в очередь), поэтому ни один поток не будет разбужен. В этот момент операция снятия блокировки потока A завершена.
Момент 2: вызов потока BaddWaiter
Метод завершен, он поставлен в очередь, и создано соединение указателя с головным узлом.
Момент 3: вызов потока BacquireQueued
метод (показан в следующем коде), если он не вызывается в этом методеtryAcquire
, Такая ситуация будет иметь место:Очевидно, что блокировку можно получить, но поток приостанавливается, что делает всю очередь синхронизации недоступной.
так,TryAcquire вызывается снова, чтобы предотвратить попадание нового узла в очередь, но головной узел снял блокировку, что привело к параличу всей очереди синхронизации.
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 防止新节点还未入队,但是头结点已经释放了锁,导致整个同步队列中断瘫痪的情况发生
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
Заканчивать
Чтение исходного кода AQS очень полезно для нас, чтобы изучить и освоить компоненты, реализованные на основе AQS.
Особенно его дизайнерские концепции и идеи находятся в центре нашего исследования!
Бумага AQS Doug Lea, друзья с лучшим английским, может пожелать прочитать его