Пакет jdk JUC (java.util.concurrent) предоставляет большое количество параллельных инструментов Java для использования. Он в основном написан Дугом Ли. Многие места заслуживают изучения и изучения, и это единственный способ для расширенных обновлений.
Эта статья начинается с часто используемых блокировок объектов в пакете JUC, использования инструментов параллелизма и их функциональных характеристик, с проблем, от мелких до глубоких, и шаг за шагом анализирует конкретную реализацию базового абстрактного класса AQS.
Глоссарий
1 AQS
AQS — это абстрактный класс, полный путь к классу java.util.concurrent.locks.AbstractQueuedSynchronizer, абстрактный синхронизатор очереди, — это абстрактный класс инструментов параллелизма, разработанных на основе шаблонного режима.На основе AQS реализованы следующие параллельные классы:
2 CAS
CAS — это аббревиатура от Compare And Swap (сравнение и обмен), которая представляет собой инструкцию атомарной операции.
Механизм CAS использует три основных операнда: адрес памяти addr, ожидаемое старое значение oldVal и новое значение newVal, которое необходимо изменить. При обновлении переменной, только когда ожидаемое значение переменной oldVal и фактическое значение в адресе памяти совпадают, значение, соответствующее адресу памяти в адресе, будет изменено на newVal
Основываясь на идее оптимистической блокировки, значение переменной можно потокобезопасно обновлять через CAS, а затем продолжать пробовать и сравнивать.
3 прерывание потока
Прерывание потока — это механизм взаимодействия потоков для взаимодействия с другими потоками для прерывания выполнения задач.
Когда поток находится в состоянии ожидания блокировки, например, после вызова методов wait(), join(), sleep() и после вызова метода прерывания() потока, поток немедленно выйдет из состояния блокировки и получит прерываемое исключение;
Когда поток запущен и вызывается метод прерывания() потока, поток не прерывает выполнение немедленно.Необходимо обнаружить флаг прерывания потока, вызвав метод isInterrupted() в конкретной логике выполнения задачи потока. поток, а затем активно реагировать на прерывание.Обычно выбрасывает InterruptedException
Функция блокировки объекта
Далее сначала вводятся основные функции блокировок объектов и инструментов параллелизма, а затем постепенно расширяется, как реализовать эти функции.
1 Получите это явно
Взяв в качестве примера блокировку ReentrantLock, в основном поддерживаются следующие четыре метода для явного получения блокировки.
- (1) Блокировка и ожидание получения
ReentrantLock lock = new ReentrantLock();
// 一直阻塞等待,直到获取成功
lock.lock();
- (2) Неблокирующая попытка получения
ReentrantLock lock = new ReentrantLock();
// 尝试获取锁,如果锁已被其他线程占用,则不阻塞等待直接返回false
// 返回true - 锁是空闲的且被本线程获取,或者已经被本线程持有
// 返回false - 获取锁失败
boolean isGetLock = lock.tryLock();
- (3) Блокировка и ожидание получения в течение заданного времени
ReentrantLock lock = new ReentrantLock();
try {
// 尝试在指定时间内获取锁
// 返回true - 锁是空闲的且被本线程获取,或者已经被本线程持有
// 返回false - 指定时间内未获取到锁
lock.tryLock(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// 内部调用isInterrupted() 方法检测线程中断标志位,主动响应中断
e.printStackTrace();
}
- (4) Реакция на получение прерывания
ReentrantLock lock = new ReentrantLock();
try {
// 响应中断获取锁
// 如果调用线程的thread.interrupt()方法设置线程中断,线程退出阻塞等待并抛出中断异常
lock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
}
2 Явный выпуск
ReentrantLock lock = new ReentrantLock();
lock.lock();
// ... 各种业务操作
// 显式释放锁
lock.unlock();
3 повторный вход
Поток, получивший блокировку, может напрямую получить блокировку, запросив ее снова.
4
Относится к одному и тому же ресурсу, который позволяет совместно использовать несколько потоков, например, блокировка чтения блокировки чтения-записи позволяет совместно использовать несколько потоков, а общая блокировка позволяет нескольким потокам одновременно и безопасно получать доступ к данным, повышая эффективность выполнения программы.
5 Справедливо и несправедливо
Справедливые блокировки: несколько потоков конкурируют за блокировки в порядке очереди. Перед добавлением каждой блокировки он проверяет, есть ли потоки в очереди ожидания, и если потока нет, он пытается получить блокировку. Несправедливые блокировки: когда поток получает блокировку несправедливым образом, поток сначала попытается получить блокировку, а не будет ждать. Если приобретение не удалось, оно войдет в очередь ожидания
Поскольку метод недобросовестной блокировки может привести к тому, что более поздний поток будет иметь определенную вероятность напрямую получить блокировку, уменьшая вероятность зависания и ожидания потока, производительность выше, чем у справедливой блокировки.
Принцип реализации СКС
1 Основные понятия
(1) Состояние интерфейса
Подобно методам wait(), wait(long timeout), notify() и notifyAll(), в сочетании с синхронизированной встроенной блокировкой можно реализовать режим ожидания/уведомления. Интерфейс блокировки также имеет аналогичные функции:
Интерфейс Condition определяет await(), awaitNanos(long), signal(), signalAll() и другие методы и взаимодействует с экземпляром блокировки объекта для реализации функции ожидания/уведомления Принцип заключается в реализации интерфейса Condition на основе Внутренний класс AQS ConditionObject, а поток блокируется и входит в очередь ожидания CLH (упомянутую ниже), ожидая пробуждения других потоков после вызова метода сигнала
(2) очередь CLH
Очередь CLH, CLH — это аббревиатура имени создателя алгоритма Craig, Landin, Hagersten.
AQS внутренне поддерживает двунаправленную очередь FIFO CLH. AQS использует ее для управления ожидающими потоками. Если потоку не удается получить ресурс соревнования синхронизации, он блокирует поток и добавляет его в очередь синхронизации CLH; когда ресурс соревнования простаивает , он будет основан на CLH.Queue блокирует потоки и распределяет ресурсы
Головной узел CLH сохраняет поток, занимающий в данный момент ресурсы, или информация о потоке отсутствует, другие узлы сохраняют информацию о потоке очереди.
Статус (waitStatus) каждого узла в CLH следующий:
- CANCELLED(1): Указывает, что текущий узел не был запланирован. При тайм-ауте или прерывании (в случае реагирования на прерывание) будет срабатывать переход в это состояние, и узел после входа в это состояние больше не будет меняться
- SIGNAL(-1): Указывает, что узел-преемник ожидает пробуждения текущего узла. Прежде чем узел-преемник перейдет в состояние сна после входа в очередь, состояние узла-предшественника будет обновлено до SIGNAL.
- CONDITION(-2): Указывает, что узел ожидает выполнения условия. Когда другие потоки вызывают метод signal() условия, узел в состоянии УСЛОВИЯ будет переведен из очереди ожидания в очередь синхронизации, ожидая получения блокировки синхронизации.
- PROPAGATE(-3): в совместно используемом режиме узел-предшественник не только разбудит свои узлы-преемники, но также может разбудить узлы-преемники.
- 0: состояние по умолчанию, когда новый узел ставится в очередь.
(3) Метод совместного использования ресурсов
AQS определяет два метода совместного использования ресурсов: Эксклюзивный Эксклюзивный, может выполняться только один поток, например ReentrantLock Общий доступ, несколько потоков могут выполняться одновременно, например Semaphore/CountDownLatch
(4) Способы блокировки/пробуждения потоков
AQS блокирует поток на основе метода парковки, предоставляемого классом sun.misc.Unsafe, метод unpark пробуждает поток, а поток, заблокированный методом park, может ответить на запрос прерывания (interrupt()) для выхода из режима блокировки.
2 Базовый дизайн
Основная идея дизайна: AQS предоставляет платформу для реализации блокировок блокировки и связанных параллельных синхронизаторов, которые полагаются на очереди CLH.Подкласс реализует метод защиты, который определяет, могут ли ресурсы быть получены/освобождены.AQS реализует стратегию планирования потоков для постановки в очередь и пробуждения потоков на основе этих методов защиты.
AQS также предоставляет переменную типа int, которая поддерживает потокобезопасное атомарное обновление в качестве значения состояния синхронизации (состояния).Подклассы могут гибко определять значение переменной для обновления в соответствии с фактическими потребностями.
Ряд методов защиты, переопределяемых подклассами, выглядит следующим образом:
- boolean tryAcquire(int) Исключительно попытаться получить ресурсы, вернуть true в случае успеха, false в случае неудачи
- boolean tryRelease(int) Исключительно попытаться освободить ресурсы, вернуть true в случае успеха, false в случае неудачи
- int tryAcquireShared(int) Общий метод пытается получить ресурс. Отрицательное число означает сбой; 0 означает успех, но доступных ресурсов нет; положительное число означает успех и оставшиеся ресурсы.
- boolean tryReleaseShared(int) Общий метод пытается освободить ресурс.Если выпуску разрешено разбудить следующий ожидающий узел, он возвращает true, в противном случае он возвращает false
Эти методы всегда вызываются потоками, которые требуют координации планирования, и подклассы должны переопределять эти методы неблокирующим образом.
Основываясь на приведенных выше методах tryXXX, AQS предоставляет следующие методы для получения/высвобождения ресурсов:
- void Acquire(int) Ресурс получен исключительно, и поток возвращается напрямую, в противном случае он входит в очередь ожидания до тех пор, пока ресурс не будет получен, и весь процесс игнорирует влияние прерывания.
- boolean release(int) В эксклюзивном режиме поток освобождает ресурсы, сначала освобождает указанное количество ресурсов, если он полностью освобожден (т. е. состояние = 0), он разбудит другие потоки в очереди ожидания для получения ресурсов.
- voidacquireShared(int) Получить ресурсы в общем режиме
- boolean releaseShared(int) Общий способ освобождения ресурсов
Возьмем эксклюзивный режим в качестве примера: основная реализация получения/высвобождения ресурсов выглядит следующим образом:
Acquire:
while (!tryAcquire(arg)) {
如果线程尚未排队,则将其加入队列;
}
Release:
if (tryRelease(arg))
唤醒CLH中第一个排队线程
На данный момент это немного сбивает с толку.Следующая картинка перерисовывает идеи дизайна, представленные выше:
Реализация функции
Далее представлен принцип реализации ряда функциональных возможностей блокировок объектов и инструментов параллелизма на основе AQS.
1 Получите это явно
Эта функция по-прежнему использует блокировку ReentrantLock в качестве примера. ReentrantLock — это блокировка объекта с повторным входом. Каждый раз, когда поток запрашивает успешную блокировку, состояние значения состояния синхронизации увеличивается на 1, состояние освобождения блокировки уменьшается на 1, а состояние 0 означает, что ни один поток не удерживает блокировку.
Блокировки ReentrantLock поддерживают честные/несправедливые функции. Следующие явные функции получения берут честные блокировки в качестве примера.
(1) Блокировка и ожидание получения
Базовая реализация выглядит следующим образом:
- 1. ReentrantLock реализует метод tryAcquire(int) AQS. Сначала судите: если ни один поток не удерживает блокировку или текущий поток уже не удерживает блокировку, вернуть true, иначе вернуть false
- 2. Метод Acquire(int) AQS определяет, является ли текущий узел головным и может ли быть получен ресурс на основе tryAcquire(int).Если он не может быть получен, присоединяйте очередь CLH к очереди и блокируйте ожидание.
- 3. Метод lock() ReentrantLock основан на методе Acquire(int) AQS и блоках, ожидающих получения блокировки.
Реализован метод tryAcquire(int) в ReentrantLock:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 没有任何线程持有锁
if (c == 0) {
// 通过CLH队列的head判断没有别的线程在比当前更早acquires
// 且基于CAS设置state成功(期望的state旧值为0)
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 设置持有锁的线程为当前线程
setExclusiveOwnerThread(current);
return true;
}
}
// 持有锁的线程为当前线程
else if (current == getExclusiveOwnerThread()) {
// 仅仅在当前线程,单线程,不用基于CAS更新
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 其他线程已经持有锁
return false;
}
Реализация метода Acquisition (int) AQS
public final void acquire(int arg) {
// tryAcquire检查释放能获取成功
// addWaiter 构建CLH的节点对象并入队
// acquireQueued线程阻塞等待
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// acquireQueued返回true,代表线程在获取资源的过程中被中断
// 则调用该方法将线程中断标志位设置为true
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
// 标记是否成功拿到资源
boolean failed = true;
try {
// 标记等待过程中是否被中断过
boolean interrupted = false;
// 循环直到资源释放
for (;;) {
// 拿到前驱节点
final Node p = node.predecessor();
// 如果前驱是head,即本节点是第二个节点,才有资格去尝试获取资源
// 可能是head释放完资源唤醒本节点,也可能被interrupt()
if (p == head && tryAcquire(arg)) {
// 成功获取资源
setHead(node);
// help GC
p.next = null;
failed = false;
return interrupted;
}
// 需要排队阻塞等待
// 如果在过程中线程中断,不响应中断
// 且继续排队获取资源,设置interrupted变量为true
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
(2) Неблокирующая попытка получения
Реализация tryLock() в ReentrantLock — это всего лишь реализация недобросовестной блокировки.Логика реализации в основном такая же, как и в tryAcquire, за исключением того, что hasQueuedPredecessors() не используется для проверки того, есть ли другие потоки, ожидающие в голове очереди CLH, поэтому что когда ресурс освобождается, есть запрос потока. Ресурс может перейти в очередь, чтобы получить приоритет
Конкретная реализация tryLock() в ReentrantLock выглядит следующим образом:
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 没有任何线程持有锁
if (c == 0) {
// 基于CAS设置state成功(期望的state旧值为0)
// 没有检查CLH队列中是否有线程在等待
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 持有锁的线程为当前线程
else if (current == getExclusiveOwnerThread()) {
// 仅仅在当前线程,单线程,不用基于CAS更新
int nextc = c + acquires;
if (nextc < 0) // overflow,整数溢出
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 其他线程已经持有锁
return false;
}
(3) Блокировка и ожидание получения в течение заданного времени
Базовая реализация выглядит следующим образом:
- 1. TryLock(long, TimeUnit) ReentrantLock вызывает метод tryAcquireNanos(int, long) AQS.
- 2. Функция tryAcquireNanos AQS сначала вызывает функцию tryAcquire(int), чтобы попытаться получить ее, а затем вызывает метод doAcquireNanos(int, long), если она не может быть получена.
- 3. DoAcquireNanos AQS определяет, является ли текущий узел головным и можно ли получить ресурс на основе tryAcquire(int).Если он не может быть получен и время ожидания превышает 1 микросекунду, попытайтесь получить его после ожидания в течение периода времени.
Реализация в ReentrantLock выглядит следующим образом:
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 如果线程已经被interrupt()方法设置中断
if (Thread.interrupted())
throw new InterruptedException();
// 先tryAcquire尝试获取锁
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
Реализация в AQS выглядит следующим образом:
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 获取到资源的截止时间
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
// 标记是否成功拿到资源
boolean failed = true;
try {
for (;;) {
// 拿到前驱节点
final Node p = node.predecessor();
// 如果前驱是head,即本节点是第二个节点,才有资格去尝试获取资源
// 可能是head释放完资源唤醒本节点,也可能被interrupt()
if (p == head && tryAcquire(arg)) {
// 成功获取资源
setHead(node);
// help GC
p.next = null;
failed = false;
return true;
}
// 更新剩余超时时间
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
// 排队是否需要排队阻塞等待
// 且超时时间大于1微秒,则线程休眠到超时时间到了再尝试获取
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 如果线程已经被interrupt()方法设置中断
// 则不再排队,直接退出
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
(4) Реакция на получение прерывания
ReentrantLock реагирует на прерывание и получает блокировку следующим образом: когда поток просыпается в ответ на прерывание метода thead.interrupt() в режиме сна, он проверяет, что флаг прерывания потока равен true, и активно генерирует исключение. Основной реализацией является метод doAcquireInterruptably(int) в AQS.
Базовая реализация аналогична блокировке в ожидании получения, за исключением того, что метод Acquire(int) AQS вызывается вместо метода doAcquireInterruptably(int) AQS.
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
// 标记是否成功拿到资源
boolean failed = true;
try {
for (;;) {
// 拿到前驱节点
final Node p = node.predecessor();
// 如果前驱是head,即本节点是第二个节点,才有资格去尝试获取资源
// 可能是head释放完资源唤醒本节点,也可能被interrupt()
if (p == head && tryAcquire(arg)) {
// 成功获取资源
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
// 需要排队阻塞等待
if (shouldParkAfterFailedAcquire(p, node) &&
// 从排队阻塞中唤醒,如果检查到中断标志位为true
parkAndCheckInterrupt())
// 主动响应中断
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2 Явный выпуск
Методы совместного использования ресурсов AQS делятся на эксклюзивный тип и общий тип. Здесь мы возьмем ReentrantLock в качестве примера, чтобы представить явное освобождение эксклюзивных ресурсов. Общий тип будет представлен позже.
Подобно явному захвату, базовая реализация явного освобождения ReentrantLock выглядит следующим образом:
- 1. ReentrantLock реализует метод tryRelease(int) AQS. Метод уменьшает переменную состояния на 1. Если состояние становится равным 0, это означает, что ни один поток не удерживает блокировку, и возвращает значение true, в противном случае возвращает значение false.
- 2. Метод release(int) AQS основан на tryRelease(int) для проверки наличия каких-либо потоков, удерживающих ресурсы в очереди, и, если нет, пробуждения потока головного узла в очереди CLH.
- 3. Пробуждённый поток продолжает выполнять логику for(;;) вAcquireQueued(Node, int) или doAcquireNanos(int, long) или doAcquireInterruptably(int) и продолжает попытки получить ресурсы.
Метод tryRelease(int) в ReentrantLock реализован следующим образом:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 只有持有锁的线程才有资格释放锁
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 标识是否没有任何线程持有锁
boolean free = false;
// 没有任何线程持有锁
// 可重入锁每lock一次都需要对应一次unlock
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
Метод release(int) в AQS реализован следующим образом:
public final boolean release(int arg) {
// 尝试释放资源
if (tryRelease(arg)) {
Node h = head;
// 头节点不为空
// 后继节点入队后进入休眠状态之前,会将前驱节点的状态更新为SIGNAL(-1)
// 头节点状态为0,代表没有后继的等待节点
if (h != null && h.waitStatus != 0)
// 唤醒第二个节点
// 头节点是占用资源的线程,第二个节点才是首个等待资源的线程
unparkSuccessor(h);
return true;
}
return false;
}
3 повторный вход
Реализация повторного входа относительно проста.В качестве примера возьмем ReentrantLock, он в основном реализован в методе tryAcquire(int).Является ли поток, удерживающий блокировку, текущим потоком, если это так, обновите состояние значения состояния синхронизации и верните true, что означает, что он может получить Lock
4
Общий ресурс использует ReentrantReadWriteLock в качестве примера. Основное отличие от монопольной блокировки ReentrantLock заключается в том, что нескольким потокам разрешено совместно использовать блокировку чтения при ее получении. Когда блокировка записи снята, несколько потоков, заблокированных в ожидании блокировки чтения, могут ее получить. в то же время.
В классе ReentrantReadWriteLock значение состояния синхронизации состояния AQS определяется как: старшие 16 бит — это количество блокировок чтения, а нижние 16 бит — блокировки записи.
Логика, реализованная с помощью tryAcquireShared(int) и tryReleaseShared(int) в ReentrantReadWriteLock, относительно длинная, в основном включает взаимное исключение чтения и записи, реентерабельное суждение и уступку блокировки чтения блокировке записи. Из-за нехватки места я не буду ее здесь раскрывать. .
Основная реализация получения блокировки чтения (ReadLock.lock()) выглядит следующим образом.:
- 1. ReentrantReadWriteLock реализует метод tryAcquireShared(int) AQS, чтобы определить, может ли текущий поток получить блокировку чтения.
- 2. AcquireShared(int) AQS сначала пытается получить ресурс на основе tryAcquireShared(int).Если получение не удается, он присоединяется к очереди CLH в очередь и блокирует ожидание.
- 3. Метод ReadLock.lock() ReentrantReadWriteLock основан на методе AcquireShared(int) AQS и блокирует ожидание получения блокировки.
Конкретная реализация получения ресурсов в совместно используемом режиме в AQS выглядит следующим образом:
public final void acquireShared(int arg) {
// tryAcquireShared返回负数代表获取共享资源失败
// 则通过进入等待队列,直到获取到资源为止才返回
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// 与前面介绍到的acquireQueued逻辑基本一致
// 不同的是将tryAcquire改为tryAcquireShared
// 还有资源获取成功后将传播给CLH队列上等待该资源的节点
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
// 标记是否成功拿到资源
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
// 资源获取成功
if (r >= 0) {
// 传播给CLH队列上等待该资源的节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 需要排队阻塞等待
// 如果在过程中线程中断,不响应中断
// 且继续排队获取资源,设置interrupted变量为true
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 资源传播给CLH队列上等待该资源的节点
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
// 释放共享资源
doReleaseShared();
}
}
Основная реализация снятия блокировки чтения (ReadLock.unlock()) выглядит следующим образом.: Основная реализация освобождения общих ресурсов в ReentrantReadWriteLock выглядит следующим образом:
- 1. ReentrantReadWriteLock реализует метод tryReleaseShared(int) AQS, чтобы определить, есть ли еще потоки, удерживающие блокировку чтения после снятия блокировки чтения.
- 2. ReleaseShared(int) AQS определяет, нужен ли спящий поток в очереди CLH, на основе tryReleaseShared(int) и при необходимости выполняет doReleaseShared().
- 3. Метод ReadLock.unlock() ReentrantReadWriteLock снимает блокировку на основе метода releaseShared(int) AQS.
Конкретная реализация освобождения ресурсов в совместно используемом режиме в AQS выглядит следующим образом:
public final boolean releaseShared(int arg) {
// 允许唤醒CLH中的休眠线程
if (tryReleaseShared(arg)) {
// 执行资源释放
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 当前节点正在等待资源
if (ws == Node.SIGNAL) {
// 当前节点被其他线程唤醒了
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
// 进入else的条件是,当前节点刚刚成为头节点
// 尾节点刚刚加入CLH队列,还没在休眠前将前驱节点状态改为SIGNAL
// CAS失败是尾节点已经在休眠前将前驱节点状态改为SIGNAL
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// 每次唤醒后驱节点后,线程进入doAcquireShared方法,然后更新head
// 如果h变量在本轮循环中没有被改变,说明head == tail,队列中节点全部被唤醒
if (h == head)
break;
}
}
5 Справедливо и несправедливо
Эта функция относительно проста в реализации.В качестве примера возьмем блокировку ReentrantLock, честная блокировка получает ресурсы непосредственно на основе Acquire(int) AQS, в то время как нечестная блокировка сначала пытается сократить очередь: на основе CAS, ожидаемого состояния значение переменной синхронизации равно 0 (ни один поток не удерживает блокировку) ), обновляется до 1 и ставится в очередь, если все обновления CAS завершаются неудачно.
// 公平锁实现
final void lock() {
acquire(1);
}
// 非公平锁实现
final void lock() {
// 第1次CAS
// state值为0代表没有任何线程持有锁,直接插队获得锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// 在nonfairTryAcquire方法中再次CAS尝试获取锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 第2次CAS尝试获取锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 已经获得锁
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
Суммировать
Значение переменной состояния AQS не обязательно представляет ресурсы.Разные классы наследования AQS могут иметь разные определения значения переменной состояния.
Например, в классе countDownLatch значение переменной состояния представляет количество защелок, которые необходимо открыть (что можно понимать как количество защелок, которые необходимо открыть).Каждая защелка должна быть открыта до того, как дверь может быть открыт, и все ожидающие потоки начнут выполняться.Каждый раз countDown() будет уменьшать переменную состояния на 1, и если переменная состояния уменьшается до 0, пробуждает спящий поток в очереди CLH.
Для изучения аналогичного базового исходного кода рекомендуется задать несколько вопросов и изучить их; перед общим изучением рекомендуется понять общую конструкцию и общий принцип (сначала можно прочитать соответствующие документы), а затем изучить детали исходного кода, чтобы не попасть в исходный код в начале. , легко ошибиться