| Красиво, пожалуйста, лайкните, выработайте привычку
У тебя одна мысль, у меня одна мысль, после того как мы обменяемся, у одного человека две мысли
If you can NOT explain it simply, you do NOT understand it well enough
Теперь демо-код и технические статьи отсортированы вместе.Практика на Github, всем удобно читать и просматривать, эта статья тоже попала сюда, я думаю это хорошо, пожалуйста тоже поставьте звездочку
написать впереди
Выйдя на стадию исходного кода, я написал более десятка статейПараллельная серияПредчувствие знания, наконец, пригодится. Я считаю, что многие люди забыли некоторые теоретические знания. Не волнуйтесь, я приведу соответствующие теоретические знания в ссылку на исходный код, чтобы помочь вам вспомнить, чтобы объединить теорию и практику. Кроме того, это супер длинная картинка и текст.Рекомендую собрать.Если она вам полезна ставьте лайк и пусть ее увидит больше людей
Почему Java SDK должен проектировать Lock?
Я когда-то представлял себе, что если управление параллелизмом в Java только синхронизировано, то есть только три способа его использования, которые просты и удобны
public class ThreeSync {
private static final Object object = new Object();
public synchronized void normalSyncMethod(){
//临界区
}
public static synchronized void staticSyncMethod(){
//临界区
}
public void syncBlockMethod(){
synchronized (object){
//临界区
}
}
}
Если это было правдой до Java 1.5, мастер Дуг Ли переделал блокировку колеса, начиная с версии 1.5.
Мы часто говорим: «Избегайте повторного создания колес».Если у вас есть колесо, вам все равно приходится настаивать на воссоздании колеса, тогда очевидно, что традиционное колесо не может хорошо решить проблему в некоторых сценариях применения.
Я не знаю, помнишь ли ты ещеКоффман резюмирует четыре ситуации, в которых может возникнуть взаимоблокировка., где [неделимое условие] означает:
Поток получил ресурс и не может быть лишен, пока он не будет израсходован, и может быть освобожден только тогда, когда он будет израсходован.
Чтобы нарушить это условие,Для этого требуется возможность высвобождать существующие ресурсы, не запрашивая дополнительные ресурсы.
Очевидно, что эта возможность недоступна в синхронизированном. При использовании синхронизированного, если поток не может подать заявку на ресурсы, он войдет в состояние блокировки, и мы не можем ничего сделать, чтобы изменить его состояние. Это ахиллесова пята синхронизированного колеса, которое сильно дает причину для повторного изобретения блокировки колес
Явная блокировка Блокировка
У старых колес есть недостатки, а новые колеса должны решить эти проблемы, поэтому они должны иметь функции, которые не будут блокироваться.Следующие три решения являются хорошими способами решения этой проблемы (см. таблицу ниже, и вы поймете смысл три решения)
характеристика | описывать | API |
---|---|---|
Может реагировать на прерывания | Также приятно иметь возможность реагировать на прерывания, если он не может освободиться.Механизм многопоточных прерываний JavaКонкретно описывает процесс прерывания, цель которого состоит в том, чтобы выйти из определенного состояния с помощью сигнала прерывания, такого как блокировка. | lockInterruptbly() |
получение неблокирующей блокировки | Попробуйте получить его, если вы не можете его получить, он не будет блокироваться и возвращаться напрямую | tryLock() |
время ожидания поддержки | Учитывая ограничение по времени, если оно не будет получено в течение определенного периода времени, оно не перейдет в состояние блокировки, а также вернется напрямую. | tryLock(long time, timeUnit) |
Есть хорошее решение, но вы не можете иметь оба. У Lock больше возможностей, которых нет у synchronized. Естественно, он не будет путешествовать по миру с одним ключевым словом и тремя геймплеями, как синхронизированный, и его относительно сложно использовать.
Парадигма использования блокировки
Существует стандартное использование синхронизированного, и у нас должна быть такая прекрасная традиция Lock.Я думаю, что многие люди знают, как использовать Lock.Парадигма
Lock lock = new ReentrantLock();
lock.lock();
try{
...
}finally{
lock.unlock();
}
Поскольку это парадигма (не оспаривайте ту, которая меняет способ письма), должна быть причина, давайте посмотрим
Критерий 1 — окончательное освобождение замков
Это должен понимать каждый.наконец-томожет быть освобожден
Критерий 2 — получение блокировки вне try{}
Не знаю, задумывались ли вы когда-нибудь о том, почему существует стандарт 2. Обычно мы «как» пытаемся жить во всем, опасаясь, что исключение не удастся поймать.
существуетtry{}
Внешнее приобретение замков в основном учитывает два аспекта:
- Если исключение генерируется без получения блокировки, должна возникнуть проблема с окончательным освобождением блокировки, потому что как вы можете снять блокировку до того, как получите блокировку?
- Если при получении блокировки возникает исключение, то есть текущий поток не получает блокировку, но когда выполняется код finally, если другой поток получает блокировку, она будет освобождена (освобождена без причины).
Разные блокировки реализованы немного по-разному.Существование парадигмы заключается в том, чтобы избежать всех проблем, поэтому все стараются соблюдать парадигму.
Как Lock работает как замок?
Если вы знакомы с синхронизацией, то знаете, что после компиляции программы в инструкции процессора в критической секции будетmoniterenter
а такжеmoniterexit
Внешний вид инструкции можно понимать как идентификацию входа и выхода из критической секции.
С точки зрения парадигмы:
-
lock.lock()
Получите блокировку, «эквивалентную» инструкции синхронизированного мониторинга. -
lock.unlock()
Снимите блокировку, «эквивалентную» инструкции синхронизированного мониторинга выхода.
Так как же Лок это делает?
Вот краткое объяснение, чтобы когда дело доходит до анализа исходного кода, вы могли посмотреть на схему дизайна издалека и детали реализации с близкого расстояния, это станет проще.
На самом деле все очень просто: например, внутри ReentrantLock поддерживается volatile измененное состояние переменной, которое читается и записывается через CAS (нижний слой все равно передается аппаратному обеспечению для обеспечения атомарности и видимости). успешно, блокировка получена, и поток входит в блок кода попытки, чтобы продолжить выполнение; если изменение не будет успешным, поток будет [приостановлен] и не будет выполняться вниз
Но Lock — это интерфейс, и в нём вообще нет переменной состояния:
Как он справляется с этим состоянием? Очевидно, что требуется небольшое дополнение к дизайну: интерфейс определяет поведение, и необходимо реализовать конкретный класс.
Классы реализации интерфейса Lock в основном дополняют управление доступом к потоку, [агрегируя] подкласс [Queue Synchronizer].
Так что же такое синхронизатор очереди? (Это должна быть самая сильная вечеринка с заголовками, которую вы когда-либо видели. Потребовалось полвека, чтобы добраться до темы, и я оставил сообщение в области комментариев, чтобы отругать меня)
Синхронизатор очереди AQS
Queue Synchronizer (AbstractQueuedSynchronizer), также известный как Synchronizer или AQS, — наш сегодняшний герой.
В: Почему вы начинаете с AQS при анализе исходного кода JUC?
Ответ: смотрите картинку ниже
Я думаю, вы кое-что поймете, когда увидите этот скриншот, вы его слышали, что часто спрашивают на собеседованиях и что обычно используют в работе.
ReentrantLock
ReentrantReadWriteLock
Semaphore(信号量)
CountDownLatch
公平锁
非公平锁
-
ThreadPoolExecutor
(Для понимания пулов потоков вы можете просмотретьЗачем использовать пулы потоков? )
Все они имеют непосредственное отношение к AQS, поэтому разберитесь в абстрактной реализации AQS, и на этой основе немного проверьте детали реализации вышеперечисленных типов, и вы сможете быстро все это сделать, чтобы не запутаться при просмотре исходный код и потерять основную строку
Как упоминалось выше, синхронизатор будет агрегирован в классе реализации блокировки, а затем синхронизатор будет использоваться для реализации семантики блокировки, тогда проблема:
Зачем использовать режим агрегации и как лучше понять взаимосвязь между блокировками и синхронизаторами?
Большинство из нас используют блокировки.После внедрения блокировок ядро должно быть простым в использовании.
С точки зрения имени класса и оформления AQS, это абстрактный класс, поэтому с точки зрения режима разработки синхронизатор должен быть разработан на основе [режим шаблона].Пользователи должны наследовать синхронизатор и реализовать пользовательский синхронизатор. И переопределить указанный метод, затем объединить синхронизатор в настраиваемый компонент синхронизации и вызвать методы шаблона синхронизатора, и эти методы шаблона вызывают методы, переопределенные пользователем
Я не хочу делать приведенное выше объяснение таким абстрактным, на самом деле, чтобы понять приведенное выше предложение, нам достаточно знать следующие два вопроса.
- Каковы переопределяемые методы пользовательского синхронизатора?
- Какие шаблонные методы предоставляет абстрактный синхронизатор?
Переопределяемые методы синхронизатора
Синхронизатор предоставляет всего 5 переопределяемых методов, что значительно облегчает пользователю блокировки:
Само собой разумеется, что методы, которые необходимо переопределить, также должны быть украшены абстрактными, почему бы и нет? Причина на самом деле очень проста.Вышеуказанные методы были разделены на две категории по цвету:
独占式
共享式
Невозможно, чтобы пользовательский компонент синхронизации или блокировка были одновременно эксклюзивными и общими.Во избежание принудительного переписывания нерелевантных методов нет абстракции для декорирования, но должно быть выброшено исключение, информирующее о том, что метод нельзя использовать напрямую:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
Теплый и интимный (если у вас похожие потребности, вы можете следовать этому дизайну)
что сказано в описании метода таблицы同步状态
Это состояние с изменчивой модификацией, упомянутое выше, поэтому мы находимся в重写
При использовании вышеуказанных методов состояние синхронизации должно быть получено или изменено с помощью следующих трех методов, предоставляемых синхронизатором (предоставляемых AQS):
И разница между эксклюзивными и общими переменными состояния операции очень проста
так ты видишьReentrantLock
ReentrantReadWriteLock
Semaphore(信号量)
CountDownLatch
На самом деле эти классы лишь немного отличаются реализацией вышеуказанных методов.Другие реализации реализованы через шаблонный метод синхронизатора.Вы чувствуете себя здесь намного спокойнее? Давайте посмотрим на метод шаблона:
Метод шаблона, предоставляемый синхронизатором
Выше мы разделили методы реализации синхронизаторов на два типа: эксклюзивные и разделяемые.Шаблонный метод фактически предоставляет два вышеуказанных типа шаблонных методов, но есть и другие.响应中断
а также超时限制
Используется шаблонный метод для Lock, давайте посмотрим
Не запоминайте сначала функции вышеуказанных методов, вам нужно понять только общие функции в настоящее время. Кроме того, я думаю, вы также заметили:
Вышеупомянутые методы украшены ключевым словом final, указывающим, что подклассы не могут переопределить этот метод.
Увидев это, вы можете немного растеряться, давайте немного подытожим:
Программисты по-прежнему смотрят на код и чувствуют себя более непринужденно.Давайте воспользуемся кодом, чтобы объяснить приведенную выше взаимосвязь (обратите внимание на комментарии в коде, следующий код не очень строг, просто чтобы кратко объяснить реализацию кода на рисунке выше):
package top.dayarch.myjuc;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* 自定义互斥锁
*
* @author tanrgyb
* @date 2020/5/23 9:33 PM
*/
public class MyMutex implements Lock {
// 静态内部类-自定义同步器
private static class MySync extends AbstractQueuedSynchronizer{
@Override
protected boolean tryAcquire(int arg) {
// 调用AQS提供的方法,通过CAS保证原子性
if (compareAndSetState(0, arg)){
// 我们实现的是互斥锁,所以标记获取到同步状态(更新state成功)的线程,
// 主要为了判断是否可重入(一会儿会说明)
setExclusiveOwnerThread(Thread.currentThread());
//获取同步状态成功,返回 true
return true;
}
// 获取同步状态失败,返回 false
return false;
}
@Override
protected boolean tryRelease(int arg) {
// 未拥有锁却让释放,会抛出IMSE
if (getState() == 0){
throw new IllegalMonitorStateException();
}
// 可以释放,清空排它线程标记
setExclusiveOwnerThread(null);
// 设置同步状态为0,表示释放锁
setState(0);
return true;
}
// 是否独占式持有
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 后续会用到,主要用于等待/通知机制,每个condition都有一个与之对应的条件等待队列,在锁模型中说明过
Condition newCondition() {
return new ConditionObject();
}
}
// 聚合自定义同步器
private final MySync sync = new MySync();
@Override
public void lock() {
// 阻塞式的获取锁,调用同步器模版方法独占式,获取同步状态
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
// 调用同步器模版方法可中断式获取同步状态
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
// 调用自己重写的方法,非阻塞式的获取同步状态
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
// 调用同步器模版方法,可响应中断和超时时间限制
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
// 释放锁
sync.release(1);
}
@Override
public Condition newCondition() {
// 使用自定义的条件
return sync.newCondition();
}
}
Если вы откроете IDE сейчас, вы найдете вышеупомянутыйReentrantLock
ReentrantReadWriteLock
Semaphore(信号量)
CountDownLatch
Все они реализованы в соответствии с этой структурой, поэтому давайте посмотрим, как шаблонный метод AQS реализует блокировку.
Анализ реализации AQS
Из приведенного выше кода вы должны понятьlock.tryLock()
Получение неблокирующей блокировки переписано путем вызова пользовательского синхронизатора.tryAcquire()
метод, установите состояние состояния через CAS, и он немедленно вернется, независимо от того, успешно это или нет, тогда как реализована блокирующая блокировка, такая как lock.lock()?
Если есть блокировка, вам нужно стоять в очереди, а чтобы добиться очереди, вы должны стоять в очереди.
CLH: Очередь Крейга, Лэндина и Хагерстена — это односвязный список Очередь в AQS — это виртуальная двусторонняя очередь (FIFO) варианта CLH — просто поймите концепцию, не запоминайте
Каждый поставленный в очередь человек в очереди является узлом, поэтому давайте посмотрим на структуру узла.
Узел узла
AQS поддерживает внутреннюю очередь синхронизации для управления состоянием синхронизации.
- Когда потоку не удается получить состояние синхронизации, такая информация, как текущий поток и состояние ожидания, будет встроена в узел узла и добавлена в конец очереди синхронизации, чтобы заблокировать поток.
- Когда состояние синхронизации освобождается, оно пробуждает поток «первого узла» в очереди синхронизации, чтобы получить состояние синхронизации.
Чтобы прояснить описанные выше шаги, нам нужно взглянуть на структуру узла (было бы здорово, если бы вы могли открыть IDE и вместе посмотреть на нее).
На первый взгляд это выглядит немного беспорядочно, поэтому давайте классифицируем его и объясним:
Приведенных выше описаний состояний достаточно, чтобы составить впечатление.С описанием структуры Node вы также можете представить себе структуру подключения очереди синхронизации:
Предварительные знания в основном завершены, давайте посмотрим на весь процесс эксклюзивного получения статуса синхронизации.
Эксклюзивный доступ к статусу синхронизации
История начинается с парадигмы lock.lock().
public void lock() {
// 阻塞式的获取锁,调用同步器模版方法,获取同步状态
sync.acquire(1);
}
Введите метод шаблона Acquis() AQS.
public final void acquire(int arg) {
// 调用自定义同步器重写的 tryAcquire 方法
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
Во-первых, также предпринимается попытка неблокирующего состояния синхронизации выборки, и если выборка не удалась (tryAcquire возвращает false), она вызоветaddWaiter
Метод создает узел Node (исключая Node.EXCLUSIVE) и безопасно добавляет его в очередь синхронизации (CAS) [tail]
private Node addWaiter(Node mode) {
// 构造Node节点,包含当前线程信息以及节点模式【独占/共享】
Node node = new Node(Thread.currentThread(), mode);
// 新建变量 pred 将指针指向tail指向的节点
Node pred = tail;
// 如果尾节点不为空
if (pred != null) {
// 新加入的节点前驱节点指向尾节点
node.prev = pred;
// 因为如果多个线程同时获取同步状态失败都会执行这段代码
// 所以,通过 CAS 方式确保安全的设置当前节点为最新的尾节点
if (compareAndSetTail(pred, node)) {
// 曾经的尾节点的后继节点指向当前节点
pred.next = node;
// 返回新构建的节点
return node;
}
}
// 尾节点为空,说明当前节点是第一个被加入到同步队列中的节点
// 需要一个入队操作
enq(node);
return node;
}
private Node enq(final Node node) {
// 通过“死循环”确保节点被正确添加,最终将其设置为尾节点之后才会返回,这里使用 CAS 的理由和上面一样
for (;;) {
Node t = tail;
// 第一次循环,如果尾节点为 null
if (t == null) { // Must initialize
// 构建一个哨兵节点,并将头部指针指向它
if (compareAndSetHead(new Node()))
// 尾部指针同样指向哨兵节点
tail = head;
} else {
// 第二次循环,将新节点的前驱节点指向t
node.prev = t;
// 将新节点加入到队列尾节点
if (compareAndSetTail(t, node)) {
// 前驱节点的后继节点指向当前新节点,完成双向队列
t.next = node;
return t;
}
}
}
}
Вы можете быть сбиты с толку методом обработки enq(), вход в этот метод представляет собой «бесконечный цикл», мы будем использовать диаграмму, чтобы описать, как он выходит из цикла.
У некоторых учащихся могут возникнуть вопросы, почему существует дозорный узел?
Дозорные, как следует из названия, используются для решения пограничных вопросов между странами и непосредственно в производственной деятельности не участвуют. Точно так же упомянутый в информатике часовой используется и для решения краевой задачи.Если границы нет, если границы нет, то на границе может возникнуть исключение по тому же алгоритму.
acquireQueued()
метод
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// "死循环",尝试获取锁,或者挂起
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 只有当前节点的前驱节点是头节点,才会尝试获取锁
// 看到这你应该理解添加哨兵节点的含义了吧
if (p == head && tryAcquire(arg)) {
// 获取同步状态成功,将自己设置为头
setHead(node);
// 将哨兵节点的后继节点置为空,方便GC
p.next = null; // help GC
failed = false;
// 返回中断标识
return interrupted;
}
// 当前节点的前驱节点不是头节点
//【或者】当前节点的前驱节点是头节点但获取同步状态失败
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
Понятно, что статус синхронизации будет возвращен успешно, но в случае неудачи она всегда попадет в «бесконечный цикл» и потратит ресурсы? Очевидно нет,shouldParkAfterFailedAcquire(p, node)
а такжеparkAndCheckInterrupt()
Поток, которому не удастся получить состояние синхронизации, будет приостановлен, и мы продолжим поиск.
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱节点的状态
int ws = pred.waitStatus;
// 如果是 SIGNAL 状态,即等待被占用的资源释放,直接返回 true
// 准备继续调用 parkAndCheckInterrupt 方法
if (ws == Node.SIGNAL)
return true;
// ws 大于0说明是CANCELLED状态,
if (ws > 0) {
// 循环判断前驱节点的前驱节点是否也为CANCELLED状态,忽略该状态的节点,重新连接队列
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 将当前节点的前驱节点设置为设置为 SIGNAL 状态,用于后续唤醒操作
// 程序第一次执行到这返回为false,还会进行外层第二次循环,最终从代码第7行返回
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
В этот момент у вас может возникнуть вопрос:
Каков эффект установки узла-предшественника в состояние SIGNAL в этом месте?
Держите этот вопрос, мы раскроем его один за другим
Если waitStatus узла-предшественника находится в состоянии SIGNAL, то есть метод shouldParkAfterFailedAcquire вернет true, и программа продолжит выполнение внизparkAndCheckInterrupt
способ приостановить текущий поток
private final boolean parkAndCheckInterrupt() {
// 线程挂起,程序不会继续向下执行
LockSupport.park(this);
// 根据 park 方法 API描述,程序在下述三种情况会继续向下执行
// 1. 被 unpark
// 2. 被中断(interrupt)
// 3. 其他不合逻辑的返回才会继续向下执行
// 因上述三种情况程序执行至此,返回当前线程的中断状态,并清空中断状态
// 如果由于被中断,该方法会返回 true
return Thread.interrupted();
}
Пробужденная программа будет продолжать выполнятьсяacquireQueued
Цикл в методе, если статус синхронизации получен успешно, он вернетinterrupted = true
результат
Программа продолжает возвращаться к верхнему уровню стека вызовов и, наконец, возвращается к шаблонному методу AQS.acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
У вас могут возникнуть сомнения:
Программа успешно приобрела состояние синхронизации и вернулась, как может быть самопрерывание?
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
Если вы не можете понять прерывание, настоятельно рекомендуется оглянуться назадМеханизм многопоточных прерываний Java
Здесь у нас пропущена строчка про получение состояния синхронизации, блок finally из acceptQueued, если присмотреться, сразу могут возникнуть сомнения:
При каких обстоятельствах будет выполняться код if(failed)?
if (failed)
cancelAcquire(node);
Условием для выполнения этого кода является то, что failed равно true. При нормальных обстоятельствах, если код выходит из цикла, значение failed равно false. Если код не может выйти из цикла, кажется, что здесь он не может быть выполнен То есть произойдет исключение, и оно будет выполнено здесь
Глядя на блок try, только два метода генерируют исключения:
-
node.processor()
метод -
переписал сам
tryAcquire()
метод
Посмотрите на прежнее сначала:
Очевидно, что выброшенное здесь исключение не имеет значения, поэтому в качестве примера возьмем метод tryAcquire(), переопределяемый ReentrantLock.
Кроме того, приведенный выше анализshouldParkAfterFailedAcquire
Метод также оценивает состояние ОТМЕНЕНО, затем
Когда будет сгенерирован узел с отмененным состоянием?
ответcancelAcquire
В методе давайте посмотрим, как cancelAcquire устанавливает/обрабатывает CANNELLED
private void cancelAcquire(Node node) {
// 忽略无效节点
if (node == null)
return;
// 将关联的线程信息清空
node.thread = null;
// 跳过同样是取消状态的前驱节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 跳出上面循环后找到前驱有效节点,并获取该有效节点的后继节点
Node predNext = pred.next;
// 将当前节点的状态置为 CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果当前节点处在尾节点,直接从队列中删除自己就好
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 1. 如果当前节点的有效前驱节点不是头节点,也就是说当前节点不是头节点的后继节点
if (pred != head &&
// 2. 判断当前节点有效前驱节点的状态是否为 SIGNAL
((ws = pred.waitStatus) == Node.SIGNAL ||
// 3. 如果不是,尝试将前驱节点的状态置为 SIGNAL
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
// 判断当前节点有效前驱节点的线程信息是否为空
pred.thread != null) {
// 上述条件满足
Node next = node.next;
// 将当前节点有效前驱节点的后继节点指针指向当前节点的后继节点
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 如果当前节点的前驱节点是头节点,或者上述其他条件不满足,就唤醒当前节点的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
Вы можете быть немного сбиты с толку, когда увидите этот комментарий. Основная цель — удалить узел CANCELED из очереди ожидания и повторно сплайсировать всю очередь. Таким образом, есть только три случая для установки состояния узла CANCELED. Проанализируем его, нарисовав картинку. :
На этом процесс получения статуса синхронизации закончен, мы просто используем блок-схему, чтобы объяснить весь процесс.
Это конец процесса получения замка. Сделайте паузу на несколько минут, чтобы разобраться в своих мыслях. Мы не объяснили функцию СИГНАЛА выше.Какова цель сигнала состояния СИГНАЛА? Это включает в себя выпуск замков.Давайте продолжим понимать, что общая идея такая же, как и в приобретении замков, но процесс выпуска относительно прост.
Эксклюзивное состояние синхронизации выпуска
История начинается с метода unlock()
public void unlock() {
// 释放锁
sync.release(1);
}
Вызовите выпуск метода шаблона AQS, чтобы ввести метод
public final boolean release(int arg) {
// 调用自定义同步器重写的 tryRelease 方法尝试释放同步状态
if (tryRelease(arg)) {
// 释放成功,获取头节点
Node h = head;
// 存在头节点,并且waitStatus不是初始状态
// 通过获取的过程我们已经分析了,在获取的过程中会将 waitStatus的值从初始状态更新成 SIGNAL 状态
if (h != null && h.waitStatus != 0)
// 解除线程挂起状态
unparkSuccessor(h);
return true;
}
return false;
}
Посмотрите на метод unparkSuccessor, который на самом деле должен разбудить узел-преемник головного узла.
private void unparkSuccessor(Node node) {
// 获取头节点的waitStatus
int ws = node.waitStatus;
if (ws < 0)
// 清空头节点的waitStatus值,即置为0
compareAndSetWaitStatus(node, ws, 0);
// 获取头节点的后继节点
Node s = node.next;
// 判断当前节点的后继节点是否是取消状态,如果是,需要移除,重新连接队列
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾节点向前查找,找到队列第一个waitStatus状态小于0的节点
for (Node t = tail; t != null && t != node; t = t.prev)
// 如果是独占式,这里小于0,其实就是 SIGNAL
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 解除线程挂起状态
LockSupport.unpark(s.thread);
}
У некоторых учащихся могут возникнуть вопросы:
Почему это место ожидает с конца очереди узлы, которые не ОТМЕНЕНЫ?
Есть две причины:
Во-первых, давайте вернемся к ситуации, когда узел присоединяется к очереди:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
Постановка узла в очередь не является атомарной операцией, строки 6 и 7 кода
node.prev = pred;
compareAndSetTail(pred, node)
Эти два места можно рассматривать как атомарную операцию хвостового узла, входящего в очередь.Если код не был выполнен до pred.next = node, в это время происходит выполнение метода unparkSuccessor, и нет возможности найдите его спереди назад, потому что преемник указатели еще не подключены, поэтому вам нужно смотреть назад
Вторая причина заключается в том, что когда на приведенной выше диаграмме создается узел состояния CANCELED, указатель Next отключается первым, а указатель Prev не отключается.Поэтому также необходимо пройти от конца к началу, чтобы иметь возможность пройти все узлы.
На данный момент состояние синхронизации было успешно освобождено, и поток, который ранее был приостановлен от получения состояния синхронизации, будет разбужен и продолжит возвращаться к выполнению со строки 3 следующего кода:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
Продолжайте возвращаться к верхнему стеку вызовов, начните выполнение со строки 15 следующего кода, повторно выполните цикл и снова попытайтесь получить состояние синхронизации.
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
На данный момент процесс эксклюзивного получения/снятия блокировок закрыт, но два других шаблонных метода AQS не введены.
响应中断
超时限制
Эксклюзивное подтверждение прерываний для статуса синхронизации
История начинается с метода lock.lockInterruptably().
public void lockInterruptibly() throws InterruptedException {
// 调用同步器模版方法可中断式获取同步状态
sync.acquireInterruptibly(1);
}
При предыдущем понимании с первого взгляда действительно ясно понять, как получить состояние синхронизации эксклюзивного прерывания:
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试非阻塞式获取同步状态失败,如果没有获取到同步状态,执行代码7行
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
продолжить просмотрdoAcquireInterruptibly
метод:
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 获取中断信号后,不再返回 interrupted = true 的值,而是直接抛出 InterruptedException
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
Неожиданно внутри JDK есть такой похожий код, нет ничего эзотерического в получении блокировки в ответ на прерывание, то есть при прерывании прерывания выбрасывается исключение InterruptedException (строка 17 кода), поэтому он возвращает к верхнему стеку вызовов слой за слоем, чтобы перехватить исключение и перейти к следующему шагу.
Куй железо, пока горячо, давайте рассмотрим еще один шаблонный метод:
Эксклюзивное ограничение времени ожидания для получения статуса синхронизации
Это легко понять, то есть, учитывая ограничение по времени, если состояние синхронизации получено в течение этого периода времени, оно вернет true, иначе — false. Например, поток поставил себе будильник, когда прозвенит будильник, поток вернется сам, что не сделает его заблокированным.
Поскольку задействован предел времени ожидания, основная логика должна заключаться в вычислении интервала времени, потому что в течение периода ожидания должно быть несколько попыток получить блокировку, и каждое получение блокировки должно занимать время, поэтому логика вычисления интервал времени, как будто мы находимся в программе.
nanosTimeout = deadline - System.nanoTime()
история изlock.tryLock(time, unit)
метод
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
// 调用同步器模版方法,可响应中断和超时时间限制
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
Посмотрите на метод tryAcquireNanos
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
Это то же самое, что и вышеacquireInterruptibly
Метод выглядит очень подробно, продолжайте смотреть на метод doAcquireNanos, посмотрите на программу, этот метод также выдает InterruptedException, мы говорили в статье о прерывании, тег метода имеетthrows InterruptedException
Это показывает, что этот метод также может реагировать на прерывания, поэтому вы можете понять, что ограничение времени ожидания равноacquireInterruptibly
Усовершенствованная версия метода с двойной страховкой тайм-аута и неблокирующим контролем
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 超时时间内,为获取到同步状态,直接返回false
if (nanosTimeout <= 0L)
return false;
// 计算超时截止时间
final long deadline = System.nanoTime() + nanosTimeout;
// 以独占方式加入到同步队列中
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 计算新的超时时间
nanosTimeout = deadline - System.nanoTime();
// 如果超时,直接返回 false
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
// 判断是最新超时时间是否大于阈值 1000
nanosTimeout > spinForTimeoutThreshold)
// 挂起线程 nanosTimeout 长时间,时间到,自动返回
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
Приведенный выше метод не должен быть сложным для понимания, но учащиеся могут запутаться в строке 27.
Почему nanosTimeout и порог тайм-аута вращения 1000 сравниваются?
/**
* The number of nanoseconds for which it is faster to spin
* rather than to use timed park. A rough estimate suffices
* to improve responsiveness with very short timeouts.
*/
static final long spinForTimeoutThreshold = 1000L;
На самом деле в доке это очень четко сказано.Грубо говоря, время в 1000 наносекунд очень и очень мало.Не нужно выполнять операции приостановки и пробуждения.Лучше напрямую входить в следующий цикл с текущий поток.
На данный момент нашему пользовательскому MyMutex не хватает только Condition, и я не знаю, устали ли вы? я все еще держусь
Condition
Если вы читаете то, что было написано ранееОжидание механизма уведомления о параллельном программировании, вы должны быть впечатлены следующей картиной:
Если бы вы разобрались с моделью в то время, а потом посмотрели на реализацию Condition, то это вообще не было бы проблемой.Во-первых, Condition все-таки интерфейс, и у него должен быть класс реализации.
История начинается сlock.newnewCondition
давай поговорим
public Condition newCondition() {
// 使用自定义的条件
return sync.newCondition();
}
Пользовательский синхронизатор переупаковывает этот метод:
Condition newCondition() {
return new ConditionObject();
}
ConditionObject — это класс реализации Condition, который определен в AQS и имеет только две переменные-члены:
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
Итак, нам просто нужно взглянуть на метод await/signal, реализованный ConditionObject, чтобы использовать эти две переменные-члены.
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 同样构建 Node 节点,并加入到等待队列中
Node node = addConditionWaiter();
// 释放同步状态
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 挂起当前线程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
Обратите внимание на формулировку здесь: при введении статуса синхронизации addWaiter добавляется в [очередь синхронизации], которая представляет собой очередь ожидания входа, упомянутую на рисунке выше, здесь это [очередь ожидания], поэтому addConditionWaiter должен был построить очередь собственного:
private Node addConditionWaiter() {
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 新构建的节点的 waitStatus 是 CONDITION,注意不是 0 或 SIGNAL 了
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 构建单向同步队列
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
У некоторых друзей здесь могут быть вопросы:
Почему здесь односторонняя очередь, а CAS не используется для обеспечения безопасности присоединения к очереди?
Поскольку в парадигме Lock используется await, это означает, что блокировка получена, поэтому нет необходимости использовать CAS, а односторонний, потому что здесь нет конкуренции за блокировки, это просто условное ожидание. очередь.
В блокировке можно определить несколько условий, и каждое условие будет соответствовать очереди ожидания условий, поэтому подробное объяснение приведенного выше рисунка выглядит следующим образом:
Поток был добавлен в условную очередь ожидания в соответствии с соответствующими условиями, так как же попытаться снова получить блокировку? Методы signal / signalAll уже используются
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
Метод Signal пробуждает только первый узел в очереди ожидания условия, вызывая метод doSignal.
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 调用该方法,将条件等待队列的线程节点移动到同步队列中
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
Продолжай читатьtransferForSignal
метод
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 重新进行入队操作
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 唤醒同步队列中该线程
LockSupport.unpark(node.thread);
return true;
}
Итак, давайте проиллюстрируем весь процесс пробуждения снова
На данный момент очень просто понять signalAll, просто цикл, чтобы определить, есть ли nextWaiter, если есть операция сигнала, переместите его из очереди ожидания условия в очередь синхронизации
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
Я не знаю, помнишь ли ты еще, я былОжидание механизма уведомления о параллельном программированиитакже сказал слово
Попробуйте использовать метод signalAll без особых причин.
Когда вы можете использовать метод сигнала, также объясняется, пожалуйста, проверьте это самостоятельно.
Здесь я хочу сказать еще одну деталь: существует разница во времени перехода из условной очереди ожидания в синхронную очередь, поэтому использование метода await() — это тоже парадигма, которая также объясняется в этой статье.
Если есть разница во времени, будут проблемы справедливости и несправедливости.Чтобы полностью понять эту проблему, мы должны подойти к ReentrantLock, чтобы увидеть.Помимо понимания проблемы справедливости/несправедливости, просмотр приложения ReentrantLock должен быть обратным, чтобы проверить его использование AQS, давайте продолжим
Как ReentrantLock работает с AQS
Типичным применением эксклюзива является ReentrantLock, давайте посмотрим, как он переписывает этот метод.
На первый взгляд кажется странным, почему в нем настроены три синхронизатора: На самом деле NonfairSync, FairSync просто еще больше делит Sync:
Вы должны знать и по имени, это то, что вы слышали公平锁/非公平锁
охватывать
Что такое честная блокировка/нечестная блокировка?
В жизни очередь считается справедливой. Честность в программе также соответствует абсолютному времени блокировки запроса, которое фактически является FIFO, иначе это будет считаться несправедливым.
Давайте сравним, как ReentrantLock реализует честные и нечестные блокировки.
На самом деле это не имеет большого значения.Честная блокировка предназначена для определения того, есть ли еще узел-первопроходец в очереди на синхронизацию.Блокировка может быть получена только в том случае, если узла-первопроходца нет;несправедливая блокировка не заботится об этом, и он может получить состояние синхронизации Это так просто, вот в чем проблема.
Почему существует схема честной/несправедливой блокировки?
Учитывая эту проблему, нужно вспомнить приведенную выше схему реализации захвата блокировки, по сути, я уже раскрыл один момент выше.
Есть две основные причины:
Причина первая:
Между возобновлением приостановленного потока и получением реальной блокировки по-прежнему существует разница во времени.С точки зрения человека это время очень мало, но с точки зрения процессора эта разница во времени все еще очевидна. Следовательно, несправедливые блокировки могут полностью использовать кванты времени ЦП и минимизировать время простоя ЦП.
Причина вторая:
Я не знаю, помнишь ли ты меня ещеВ интервью спросили, сколько потоков уместно создать?Как неоднократно упоминалось в статье, важным фактором при использовании многопоточности являются накладные расходы на переключение потоков.Представьте себе, что используется несправедливая блокировка, когда поток запрашивает блокировку для получения состояния синхронизации, а затем освобождает состояние синхронизации, потому что существует нет необходимости учитывать, есть ли еще узлы-предшественники, поэтому вероятность того, что поток, который только что снял блокировку, снова получит состояние синхронизации в этот момент, становится очень большим, поэтому накладные расходы потока уменьшаются
Думаю, вы поймете, почему конструктор ReentrantLock по умолчанию использует синхронизатор недобросовестной блокировки.
public ReentrantLock() {
sync = new NonfairSync();
}
Увидев это, я чувствую, что замок несправедлив
Что плохого в использовании честных замков?
Справедливые блокировки обеспечивают справедливость постановки в очередь, недобросовестные блокировки властно игнорируют это правило, поэтому это может привести к долгому стоянию в очереди и невозможности получить блокировку."голод"
Как выбрать честный замок/нечестный замок?
Я полагаю, что к этому моменту ответ уже у вас в голове.Если речь идет о более высокой пропускной способности, очевидно, что несправедливые блокировки более уместны, потому что это экономит много времени на переключение потоков, и пропускная способность, естественно, будет расти.В противном случае , используйте честные блокировки, честные для всех
Мы все еще в одном последнем шаге, действительно держись
повторная блокировка
До сих пор мы не анализировали название ReentrantLock, название JDK такое особое, и оно должно иметь свое значение.
Зачем поддерживать повторный вход в блокировку?
Только представьте, если это метод рекурсивного вызова с синхронизированной модификацией, разве не будет большой шуткой, что программа блокируется сама по себе при повторном входе, так что синхронизированный поддерживает повторный вход в блокировку
Блокировка - это новое колесо. Естественно, она также поддерживает эту функцию. Ее реализация также очень проста. Пожалуйста, проверьте сравнительную таблицу честной блокировки и нечестной блокировки. Там есть кусок кода:
// 判断当前线程是否和已占用锁的线程是同一个
else if (current == getExclusiveOwnerThread())
Посмотрите внимательно на код, вы можете обнаружить, что одна из моих предыдущих инструкций неверна, я должен объяснить это заново
Реентерабельный поток всегда будет добавлять состояние +1, а снятие блокировки будет состоянием -1 до тех пор, пока оно не станет равным 0. Выше написанное также поможет вам быстро отличить
Суммировать
Эта статья представляет собой длинную статью, объясняющую, почему нам нужно создать новое колесо блокировки, как использовать блокировку стандартным способом, что такое AQS и как реализовать блокировку.В сочетании с ReentrantLock некоторые приложения в AQS и некоторые его уникальные черты перевернуты.
Эксклюзивное приобретение блокировок было введено таким образом, нам все еще не хватает общих AQSxxxShared
Без разбора в сочетании с шарингом давайте читать Semaphore, ReentrantReadWriteLock и CountLatch и т.д.
Наконец, комментарии всех также приветствуются.Если есть какие-либо ошибки, пожалуйста, укажите. Руки болят и глаза сухие, иду готовиться к следующей статье...
вопрос души
-
Почему есть два способа изменить состояние, setState() и compareAndSetState() , последний безопаснее, но setState() используется во многих местах в поле зрения замка, безопасно ли это?
-
Следующий код представляет собой программу переноса. Есть ли взаимоблокировка или другие проблемы с блокировкой?
class Account { private int balance; private final Lock lock = new ReentrantLock(); // 转账 void transfer(Account tar, int amt){ while (true) { if(this.lock.tryLock()) { try { if (tar.lock.tryLock()) { try { this.balance -= amt; tar.balance += amt; } finally { tar.lock.unlock(); } }//if } finally { this.lock.unlock(); } }//if }//while }//transfer }
Ссылаться на
- Параллелизм Java в действии
- Искусство параллельного программирования на Java
- https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html
Личный блог: https://dayarch.top
добавь меня в чат друга, Войдите в группу для развлечения, обучения и обмена и отметьте «войти в группу».
Добро пожаловать, чтобы продолжать обращать внимание на общественный номер: «Сун Гун И Бин».
- Передовая технология Java для обмена галантереей
- Резюме эффективных инструментов | Ответ на «Инструменты»
- Анализ вопроса интервью и ответ
- Сбор технических данных | Ответ на «данные»
Узнайте о стеке технологий Java легко и весело, думая о чтении детективных романов, и постепенно разлагайте технические проблемы, основываясь на принципах упрощения сложных проблем, конкретизации абстрактных проблем и графики.Технология постоянно обновляется, пожалуйста, продолжайте платить внимание...