читать с вопросами
1. Что такое AQS, какова его роль и какова основная идея?
2. Каковы принципы эксклюзивных и общих блокировок в AQS? Является ли механизм блокировки, предоставляемый AQS, справедливым или несправедливым?
3. Какие есть реализации AQS на Java и как реализовать собственное управление блокировками на основе AQS
4. Какие еще возможности предоставляет AQS в дополнение к платформе блокировки?
Введение в AQS
AbstractQueuedSynchronizer(AQS)
Предоставляет платформу, которую можно использовать для реализации механизмов синхронизации блокировок. Не будет преувеличением сказать, чтоAQS
даJUC
Краеугольный камень структуры синхронизации.AQS
черезFIFO
Очередь поддерживает состояние синхронизации потока. Классу реализации нужно только наследовать этот класс и переопределить указанный метод, чтобы реализовать набор механизмов синхронизации потока.
AQS
Предоставляется в соответствии с уровнем взаимного исключения ресурсовэксклюзивный и общийДва режима доступа к ресурсам и их определенияCondition
структура обеспечиваетwait/signal
Дождитесь срабатывания механизма пробуждения. существуетJUC
в, напримерReentrantLock
,CountDownLatch
и т.д. основаны наAQS
выполнить.
Платформа AQS
Принцип AQS
AQS
Принцип не сложныйAQS
поддерживалvolatile int state
переменная иCLH(三个人名缩写)双向队列
, узлы в очереди содержат ссылки на потоки, и каждый узел может передаватьgetState()
,setState()
иcompareAndSetState()
правильноstate
для изменения и доступа. ·
Когда поток получает блокировку, он пытаетсяstate
Если переменная модифицируется, блокировка устанавливается, если модификация прошла успешно; если модификация не удалась, она упаковывается как узел и монтируется в очередь, ожидая, пока поток, удерживающий блокировку, освободит блокировку и пробудит узел в очередь.
Метод шаблона AQS
AQS
Логика обслуживания очереди инкапсулирована внутри, а метод шаблона используется для предоставления следующих методов класса реализации:
tryAcquire(int); // 尝试获取独占锁,可获取返回true,否则false
tryRelease(int); // 尝试释放独占锁,可释放返回true,否则false
tryAcquireShared(int); // 尝试以共享方式获取锁,失败返回负数,只能获取一次返回0,否则返回个数
tryReleaseShared(int); // 尝试释放共享锁,可获取返回true,否则false
isHeldExclusively(); // 判断线程是否独占资源
Если класс реализации должен реализовать только функцию монопольной/разделяемой блокировки, он может реализовать толькоtryAcquire/tryRelease
илиtryAcquireShared/tryReleaseShared
. Хотя понялtryAcquire/tryRelease
Вы можете задать свою логику, но рекомендуется использоватьstate
пара методовstate
переменные для работы для реализации синхронизированных классов.
Ниже приведен пример простой реализации блокировки синхронизации:
public class Mutex extends AbstractQueuedSynchronizer {
@Override
public boolean tryAcquire(int arg) {
return compareAndSetState(0, 1);
}
@Override
public boolean tryRelease(int arg) {
return compareAndSetState(1, 0);
}
public static void main(String[] args) {
final Mutex mutex = new Mutex();
new Thread(() -> {
System.out.println("thread1 acquire mutex");
mutex.acquire(1);
// 获取资源后sleep保持
try {
TimeUnit.SECONDS.sleep(5);
} catch(InterruptedException ignore) {
}
mutex.release(1);
System.out.println("thread1 release mutex");
}).start();
new Thread(() -> {
// 保证线程2在线程1启动后执行
try {
TimeUnit.SECONDS.sleep(1);
} catch(InterruptedException ignore) {
}
// 等待线程1 sleep结束释放资源
mutex.acquire(1);
System.out.println("thread2 acquire mutex");
mutex.release(1);
}).start()
}
}
Пример кода легко передатьAQS
Реализовать операцию взаимного исключения, поток 1 получаетmutex
После этого нить 2-хacquire
Застрял в блокировке, пока поток 1 не освободится. вtryAcquire/acquire/tryRelease/release
изarg
Параметры можно настраивать в соответствии с логикой реализации, особых требований нет.
@param arg the acquire argument. This value is conveyed to {@link #tryAcquire} but is otherwise uninterpreted and can represent anyting you like.
Основная структура AQS
Node
Как упоминалось выше, вAQS
Если потоку не удастся получить ресурсы, он будет упакован как узел и смонтирован вCLH
в очереди,AQS
определено вNode
Классы используются для переноса потоков.
Node
В основном он содержит 5 основных полей:
-
waitStatus
: текущий статус узла, это поле имеет 5 значений:-
CANCELLED = 1
. Состояние потока ссылки на узел, когда время ожидания истекло или было прервано. -
SIGNAL = -1
. Текущее состояние узла, когда необходимо разбудить поток узла-преемника. Когда узел-преемник добавляется в очередь, он приостанавливается.(block)
, его предшествующий узел будет установлен вSIGNAL
Состояние, указывающее, что узел необходимо разбудить. -
CONDITION = -2
. Когда поток узла входитcondition
Состояние очереди. (ВидетьConditionObject
) -
PROPAGATE = -3
. снятие только общих блокировокreleaseShared
При использовании для головного узла. (См. анализ общей блокировки) -
0
. Состояние узла при его инициализации.
-
-
prev
: предшествующий узел. -
next
: Узел-преемник. -
thread
: относится к потоку, головной узел не содержит потока. -
nextWaiter
:condition
Очередь условий. (ВидетьConditionObject
)
Эксклюзивный анализ блокировки
acquire
public final void acquire(int arg) {
// tryAcquire需实现类处理
// 如获取资源成功,直接返回
if (!tryAcquire(arg) &&
// 如获取资源失败,将线程包装为Node添加到队列中阻塞等待
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 如阻塞线程被打断
selfInterrupt();
}
acquire
ядро дляtryAcquire
,addWaiter
иacquireQueued
три функции, гдеtryAcquire
Требуется конкретная реализация класса. всякий раз, когда поток вызываетacquire
позвонит первымtryAcquire
, он будет подключен к очереди только после сбоя, поэтомуacquire
выполнитьПо умолчанию это несправедливая блокировка.
addWaiter
Оберните поток как эксклюзивный узел и добавьте его в очередь вставкой хвоста.Если очередь пуста, будет добавлен пустой головной узел. Стоит отметить, чтоaddWaiter
серединаenq
метод, черезCAS+自旋
способ обработки конфликтов добавления хвостовых узлов.
acquireQueue
После того, как узел потока добавлен в очередь, определяется, может ли он снова попытаться получить ресурс.Если он не может быть получен, его предшествующий узел помечается какSIGNAL
статус (с указанием того, что он должен бытьunpark
проснуться), затем пройтиpark
в состояние блокировки.
Ссылаясь на блок-схему,acquireQueued
Основная логика метода заключается в следующем.for(;;)
иshouldParkAfterFailedAcquire
.tail
Начальное состояние узла по умолчанию — 0. Когда новый узел монтируется в очередь, его предшественником является исходный узел.tail
Состояние узла установлено наSIGNAL
, указывая на то, что узел нужно разбудить, вернутьtrue
был позжеpark
застрял в блоке.for
цикл до тех пор, пока узел-предшественник не будетhead
Затем попытайтесь получить ресурс.
release
release
Процесс относительно прост. После успешного выпуска узел-преемник будет разбужен от головного узла. Если узел-преемник будет отменен, он обратится к хвосту, чтобы найти заблокированный узел, чтобы разбудить его. После пробуждения блокирующего узла он входит вacquireQueued
серединаfor(;;)
Цикл начинает новый виток конкуренции за ресурсы.
Анализ общей блокировки
acquireShared & releaseShared
public final void acquireShared(int arg) {
// 负数表示获取共享锁失败,不同于tryAcquire的bool返回
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
acquireShared
иreleaseShared
Общий процесс аналогичен монопольной блокировке,tryAcquireShared
После неудачиNode.SHARED
Подключение к хвосту блоков очереди до тех пор, пока голова очереди не разбудит его. существуетdoAcquireShared
В отличие от эксклюзивных блокировок, поскольку общие блокировки могут быть получены несколькими потоками, после пробуждения первого блокирующего узла он пройдет.setHeadAndPropagate
Прохождение пробуждает последующие блокирующие узлы.
// doAcquireShared核心代码
final Node node = addWaiter(Node.SHARED);
...
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// r>=0 表示获取锁成功,调整头结点并传递唤醒
setHeadAndPropagate(node, r);
}
}
...
}
setHeadAndPropagate
иdoReleaseShared
Составить основную логику пробуждения общей блокировки.
Логика этих двух методов относительно проста и не будет расширяться, в основном дляsetheadAndPropagate
Логика оценки пробуждения нескольких узлов выполняет анализ.
ВходитьsetHeadAndPropagate
, первое, что нужно уяснить, это то, что входящие параметры функцииpropagate
Это должно быть неотрицательное число, и тогда его пробуждение в основном состоит из двух логических суждений:
-
если
propagate > 0
, что указывает на то, что можно получить несколько общих блокировок, которые можно выполнить напрямую.doReleaseShared
Разбудите заблокированные узлы. -
если
propagate = 0
, указывающий, что можно разбудить только текущий узел, возможны два случая:-
h == null || h.waitStatus < 0
,как правилоh != null
, теперь даноh.waitStatus < 0
место действия.
-
(h = head) == null || h.waitStatus < 0
Последовательность выполнения сценария следующая:
-
Эксклюзивная блокировка, общая блокировка, сводка
1. По умолчанию эксклюзивные блокировки и общие блокировки являются недобросовестными стратегиями получения и могут быть поставлены в очередь.
2. Только один поток может получить эксклюзивную блокировку, а остальные потоки в очереди заблокированы, общая блокировка может быть получена несколькими потоками.
3. Снятие эксклюзивной блокировки пробуждает только один заблокированный узел, а общая блокировка может разбудить несколько заблокированных узлов одновременно в зависимости от доступного количества.
ConditionObject
AQS
серединаNode
Помимо формирования очереди на блокировку, такжеConditionObject
применяется в,ConditionObject
Основное определение:
public class ConditionObject implements Condition, java.io.Serializable {
...
private transient Node firstWaiter;
private transient Node lastWaiter;
...
}
ConditionObject
пройти черезNode
также представляет собойFIFO
очередь, тоConditionObject
заAQS
Какую функцию он обеспечивает?
public interface Condition {
...
void await() throws InterruptedException;
void signal();
void signalAll();
...
}
ПроверятьCondition
Определение интерфейса, вы можете видеть, что методы, которые он определяет, такие же, как иObject
Категорияwait/notify/notifyAll
Функционал такой же.
существуетСинхронизировано в деталяхУ автора естьObjectMonitor
сделал краткое введение, в том числеObjectMonitor
Включают_WaitSet
и_EntryList
Две очереди, одна для храненияwait调用
иsychronized锁竞争
приостанавливает поток, покаAQS
пройти черезConditionObject
также обеспечиваетwait/notify
механизм блокировки очередей.
ConditionObject
Механизм показан выше, в условной очередиNode
использоватьnextWaiter
Сформируйте односвязный список, когда поток, удерживающий блокировку, инициируетcondition.await
После вызова он будет обернут какNode
подняться наОчередь блокировки условий условия; при соответствииcondition.signal
После срабатывания узлы в очереди условной блокировки будут разбужены и подключены кблокировка очереди блокировкисередина.ConditionObject
Логика очереди такая же, как и вышеacquire/release
Есть сходства и незначительные различия, поэтому я не буду их повторять.