Весь исходный код этого блога взят из JDK 1.8.
Автор: Brother Daming Оригинальный адрес: http://cmsblogs.com
Чем больше основной предмет, тем больше вам придется читать его повторно.Эта статья длинная.Я надеюсь, что вы прочитаете ее внимательно и несколько раз перечитаете туда-сюда, чтобы понять ее.
Введение в AQS
Встроенная блокировка Java всегда вызывала споры. До JDK 1.6 производительность синхронизированной, тяжеловесной блокировки, всегда была относительно низкой. Некоторые недостатки: хотя синхронизация обеспечивает удобный неявный механизм получения и освобождения блокировки (на основе механизма JVM), ему не хватает работоспособности для получения и снятия блокировок. этот метод значительно сокращается в сценариях с высоким параллелизмом.
Перед введением Lock нам нужно ознакомиться с очень важным компонентом, после освоения которого многие проблемы с пакетом JUC перестают быть проблемой. Этот компонент — AQS.
AQS: AbstractQueuedSynchronizer, синхронизатор очереди. Это базовая структура для создания блокировок или других компонентов синхронизации (таких как ReentrantLock, ReentrantReadWriteLock, Semaphore и т. д.), а также автор параллельного пакета JUC (Doug Lea) ожидают, что это будет основой для большинства потребностей синхронизации. Это основной базовый компонент параллельного пакета JUC.
AQS решает множество деталей, связанных с реализацией синхронизаторов, таких как получение состояния синхронизации и очереди синхронизации FIFO. Создание синхронизатора на основе AQS может принести много преимуществ. Это не только значительно сокращает усилия по внедрению, но и избавляет от конфликтов, происходящих в нескольких местах.
В синхронизаторе, построенном на основе AQS, блокировка может происходить только в один момент, тем самым снижая накладные расходы на переключение контекста и улучшая пропускную способность. В то же время масштабируемая строка полностью учитывается при проектировании AQS, поэтому все синхронизаторы, построенные на AQS в J.U.C, могут получить это преимущество.
Основное применение AQS — наследование, а подклассы управляют состоянием синхронизации, наследуя синхронизатор и реализуя его абстрактные методы.
AQS использует переменную-член типа int для представления состояния синхронизации. Когда состояние > 0, это означает, что блокировка была получена, а когда состояние = 0, это означает, что блокировка снята. Он предоставляет три метода (getState(), setState(int newState), compareAndSetState(int expect, int update)) для работы с состоянием состояния синхронизации. Разумеется, AQS может гарантировать, что работа с состоянием безопасна.
AQS завершает работу по размещению в очереди потоков получения ресурсов через встроенную очередь синхронизации FIFO. Если текущему потоку не удается получить состояние синхронизации (блокировку), AQS создаст текущий поток, состояние ожидания и другую информацию в узле (узле). и присоединиться к нему Очередь синхронизации одновременно заблокирует текущий поток.Когда состояние синхронизации освобождается, поток в узле будет разбужен, чтобы попытаться снова получить состояние синхронизации.
AQS в основном предоставляет следующие методы:
-
getState(): возвращает текущее значение состояния синхронизации;
-
setState(int newState): установить текущее состояние синхронизации;
-
compareAndSetState(int expect, int update): используйте CAS для установки текущего состояния, этот метод может обеспечить атомарность настроек состояния;
-
tryAcquire(int arg): получить исключительно состояние синхронизации.После успешного получения состояния синхронизации другим потокам необходимо дождаться, пока поток освободит состояние синхронизации, чтобы получить состояние синхронизации.
-
tryRelease(int arg): состояние синхронизации эксклюзивного выпуска;
-
tryAcquireShared(int arg): общий статус синхронизации сбора данных, возвращаемое значение больше или равно 0, это означает, что сбор данных прошел успешно, в противном случае получение данных завершается неудачно;
-
tryReleaseShared(int arg): Общее состояние синхронизации выпуска;
-
isHeldExclusively(): занят ли текущий синхронизатор потоком в эксклюзивном режиме, обычно этот метод указывает, занят ли он исключительно текущим потоком;
-
Acquire(int arg): исключительно получает состояние синхронизации. Если текущий поток успешно получает состояние синхронизации, он будет возвращен этим методом. В противном случае он войдет в очередь синхронизации и будет ждать. Этот метод вызовет переопределяемый метод tryAcquire(int arg) метод. ;
-
AcquireInterruptily(int arg): то же, что иAcquire(int arg), но этот метод реагирует на прерывание, и текущий поток входит в очередь синхронизации, чтобы получить состояние синхронизации.Если текущий поток прерывается, этот метод выдает InterruptedException и возвращает значение ;
-
tryAcquireNanos(int arg, long nanos): состояние синхронизации приобретается с течением времени.Если текущий поток не получает состояние синхронизации в течение наносекунд, он возвращает false, а если оно было получено, возвращает true;
-
AcquireShared(int arg): Общее состояние синхронизации получения. Если текущий поток не получает состояние синхронизации, он войдет в очередь синхронизации для ожидания. Основное отличие от монопольного типа заключается в том, что несколько потоков могут получить состояние синхронизации в то же время;
-
AcquireSharedInterruptably(int arg): общее состояние синхронизации сбора данных, отвечающее на прерывание;
-
tryAcquireSharedNanos(int arg, long nanosTimeout): общее состояние синхронизации сбора данных, увеличение лимита времени ожидания;
-
release(int arg): эксклюзивно освобождает состояние синхронизации, этот метод разбудит поток, содержащийся в первом узле в очереди синхронизации, после освобождения состояния синхронизации;
-
releaseShared(int arg): общее состояние синхронизации выпуска;
очередь синхронизации CLH
Очередь синхронизации CLH представляет собой двустороннюю очередь FIFO. AQS полагается на нее для завершения управления состоянием синхронизации. Если текущему потоку не удается получить состояние синхронизации, AQS создаст узел (узел) и добавит его к информации Например, состояние ожидания текущего потока. В очереди синхронизации CLH текущий поток будет одновременно заблокирован. снова получить состояние синхронизации.
В очереди синхронизации CLH узел представляет поток, который сохраняет ссылку на поток (поток), состояние (waitStatus), предшествующий узел (предыдущий) и последующий узел (следующий), которые определяются следующим образом:
static final class Node { /** 共享 */ static final Node SHARED = new Node(); /** 独占 */ static final Node EXCLUSIVE = null; /** * 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态; */ static final int CANCELLED = 1; /** * 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行 */ static final int SIGNAL = -1; /** * 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,改节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中 */ static final int CONDITION = -2; /** * 表示下一次共享式同步状态获取将会无条件地传播下去 */ static final int PROPAGATE = -3; /** 等待状态 */ volatile int waitStatus; /** 前驱节点 */ volatile Node prev; /** 后继节点 */ volatile Node next; /** 获取同步状态的线程 */ volatile Thread thread; Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { } Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; }}
скопировать код
Структурная схема очереди синхронизации CLH выглядит следующим образом:
зачислен
Для тех из нас, кто изучил структуру данных, вход в очередь CLH не может быть проще, Это не что иное, как то, что хвост указывает на новый узел, предыдущий узел нового узла указывает на текущий последний узел, а следующий из текущий последний узел указывает на текущий узел. Код мы можем посмотреть на метод addWaiter(Node node):
private Node addWaiter(Node mode) { //新建Node Node node = new Node(Thread.currentThread(), mode); //快速尝试添加尾节点 Node pred = tail; if (pred != null) { node.prev = pred; //CAS设置尾节点 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //多次尝试 enq(node); return node; }
скопировать код
addWaiter(узел узла) сначала пытается быстро установить хвостовой узел, если это не удается, вызовите метод enq(узел узла), чтобы установить хвостовой узел
private Node enq(final Node node) { //多次尝试,直到成功为止 for (;;) { Node t = tail; //tail不存在,设置为首节点 if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else { //设置为尾节点 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
скопировать код
В приведенном выше коде оба метода устанавливают хвостовой узел с помощью метода CAS compareAndSetTail(Node expect, Node update), который гарантирует, что узел добавляется потокобезопасно. В методе enq(Node node) AQS обеспечивает корректное добавление узлов с помощью «бесконечного цикла». Только после успешного добавления текущий поток вернется из этого метода, в противном случае он продолжит выполнение.
Схема процесса выглядит следующим образом:
исключать из очереди
Очередь синхронизации CLH следует FIFO.После того, как поток первого узла освободит состояние синхронизации, он разбудит свой узел-преемник (следующий), и узел-преемник установит себя в качестве первого узла, когда состояние синхронизации будет успешно получено.Это процесс очень прост, голова выполняет Узел может быть отключен от следующего исходного узла и предыдущего текущего узла.Обратите внимание, что CAS не требуется в этом процессе, потому что только один поток может успешно получить состояние синхронизации.
Схема процесса выглядит следующим образом:
Получение и освобождение состояния синхронизации
Как упоминалось ранее, AQS — это основа для создания компонентов синхронизации Java, и мы ожидаем, что она станет основой для большинства нужд синхронизации. Шаблон метода шаблона, принятый шаблоном проектирования AQS, подкласс реализует свой абстрактный метод для управления состоянием синхронизации посредством наследования. Для подкласса у него не так много работы. AQS предоставляет большое количество методов шаблона для синхронизации. в основном делятся на три категории: монопольное получение и освобождение состояния синхронизации, совместное получение и освобождение состояния синхронизации и запрос ожидающих потоков в очереди синхронизации. Пользовательские подклассы могут реализовывать собственную семантику синхронизации с помощью шаблонных методов, предоставляемых AQS.
Эксклюзивный
Эксклюзивный, только один поток одновременно поддерживает состояние синхронизации.
Эксклюзивное получение состояния синхронизацииМетод Acquire(int arg) — это шаблонный метод, предоставляемый AQS. Этот метод получает исключительно состояние синхронизации, но этот метод не чувствителен к прерываниям. То есть, поскольку потоку не удается получить состояние синхронизации, он добавляется в очередь синхронизации CLH.При прерывании операции поток не удаляется из очереди синхронизации. код показывает, как показано ниже:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
скопировать код
Каждый метод определяется следующим образом:
-
tryAcquire: чтобы попытаться получить блокировку, если она прошла успешно, установите состояние блокировки и верните значение true, в противном случае верните значение false. Этот метод реализуется самим настраиваемым компонентом синхронизации и должен обеспечивать потокобезопасный доступ к состоянию синхронизации.
-
addWaiter: если tryAcquire возвращает FALSE (не удалось получить статус синхронизации), вызовите этот метод, чтобы добавить текущий поток в конец очереди синхронизации CLH.
-
AcquireQueued: Текущий поток будет блокироваться и ожидать (прокручиваться) в соответствии с принципом справедливости, пока не будет получена блокировка, и возвращать информацию о том, был ли текущий поток прерван во время процесса ожидания.
-
selfInterrupt: Генерирует прерывание.
МетодAcquireQueued — это процесс вращения, то есть после того, как текущий поток (узел) входит в очередь синхронизации, он войдет в процесс вращения.Каждый узел будет наблюдать интроспективно.Когда условия выполнены и состояние синхронизации получено, Вы можете выйти из этого процесса вращения, в противном случае он будет продолжать выполняться.
следующее:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { //中断标志 boolean interrupted = false; /* * 自旋过程,其实就是一个死循环而已 */ for (;;) { //当前线程的前驱节点 final Node p = node.predecessor(); //当前线程的前驱节点是头结点,且同步状态成功 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //获取失败,线程等待--具体后面介绍 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
скопировать код
Как видно из приведенного выше кода, текущий поток всегда будет пытаться получить состояние синхронизации.Конечно, предполагается, что только его предшествующий узел является головным узлом, который может попытаться получить состояние синхронизации.Причины:
-
Сохраняйте принцип синхронной очереди FIFO.
-
После того, как головной узел выйдет из состояния синхронизации, он разбудит свой узел-преемник.После пробуждения узла-преемника ему необходимо проверить, является ли он головным узлом.
Блок-схема метода Acquisition(int arg) выглядит следующим образом:
Эксклюзивное прерывание получения ответаAQS предоставляет метод access(int arg) для эксклюзивного получения состояния синхронизации, но этот метод не реагирует на прерывания.После того как поток будет прерван, поток останется в очереди синхронизации CLH, ожидая получения состояния синхронизации. Чтобы отреагировать на прерывание, AQS предоставляет методAcquireInterruptably(int arg).Этот метод немедленно отреагирует на прерывание и создаст InterruptedException, если текущий поток будет прерван во время ожидания получения состояния синхронизации.
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
скопировать код
Сначала проверьте, был ли поток прерван, если да, сгенерируйте InterruptedException, в противном случае выполните метод tryAcquire(int arg) для получения состояния синхронизации, если получение прошло успешно, вернитесь напрямую, в противном случае выполните doAcquireInterruptently(int arg). doAcquireInterruptably(int arg) определяется следующим образом:
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
скопировать код
Между методом doAcquireInterruptably(int arg) и методом Acquire(int arg) есть только два различия.
1. Объявление метода выдает InterruptedException.
2. Флаг прерывания больше не используется в методе прерывания, а создается InterruptedException напрямую.
Эксклюзивное получение тайм-аутаВ дополнение к двум указанным выше методам AQS также предоставляет расширенный метод: tryAcquireNanos(int arg, long nanos). Этот метод является дальнейшим усовершенствованием методаAcquireInterruptably.В дополнение к реагированию на прерывания он также имеет контроль тайм-аута. То есть, если текущий поток не получит состояние синхронизации в течение указанного времени, он вернет false, в противном случае он вернет true. следующее:
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
скопировать код
Получение тайм-аута метода tryAcquireNanos(int arg, long nanosTimeout) окончательно реализовано в doAcquireNanos(int arg, long nanosTimeout) следующим образом:
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { //nanosTimeout <= 0 if (nanosTimeout <= 0L) return false; //超时时间 final long deadline = System.nanoTime() + nanosTimeout; //新增Node节点 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { //自旋 for (;;) { final Node p = node.predecessor(); //获取同步状态成功 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } /* * 获取失败,做超时、中断判断 */ //重新计算需要休眠的时间 nanosTimeout = deadline - System.nanoTime(); //已经超时,返回false if (nanosTimeout <= 0L) return false; //如果没有超时,则等待nanosTimeout纳秒 //注:该线程会直接从LockSupport.parkNanos中返回, //LockSupport为JUC提供的一个阻塞和唤醒的工具类,后面做详细介绍 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); //线程是否已经中断了 if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
скопировать код
Для контроля времени ожидания программа сначала записывает крайний срок времени пробуждения, крайний срок = System.nanoTime() + nanosTimeout (интервал времени). Если получить состояние синхронизации не удается, вам необходимо рассчитать интервал времени nanosTimeout (= крайний срок - System.nanoTime()), который должен спать.Если nanosTimeout
Весь процесс выглядит следующим образом:
Эксклюзивный выпуск состояния синхронизацииПосле того, как поток получает состояние синхронизации, ему необходимо освободить состояние синхронизации после выполнения соответствующей логики. AQS предоставляет метод release(int arg) для сброса состояния синхронизации:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
скопировать код
Этот метод также вызывает пользовательский метод tryRelease(int arg) пользовательского синхронизатора для сброса состояния синхронизации. После успешного выпуска будет вызван метод unparkSuccessor(Node node) для пробуждения узла-преемника описано позже).
Вот небольшое резюме:
В AQS поддерживается очередь синхронизации FIFO.Если потоку не удается получить состояние синхронизации, он присоединяется к хвосту очереди синхронизации CLH и продолжает вращаться. Поток в очереди синхронизации CLH будет определять, является ли его узел-предшественник первым узлом при вращении, и если это первый узел, он будет постоянно пытаться получить состояние синхронизации, и если получение будет успешным, он выйдет из CLH. очередь синхронизации. Когда поток завершает выполнение логики, он освобождает состояние синхронизации и пробуждает узлы-преемники после освобождения.
общий
Основное различие между общим типом и эксклюзивным типом заключается в том, что только один поток эксклюзивного типа может одновременно получать состояние синхронизации, в то время как общий тип может иметь несколько потоков для одновременного получения состояния синхронизации. Например, операция чтения может выполняться несколькими потоками одновременно, в то время как операция записи может выполняться только одним потоком одновременно, а другие операции будут заблокированы.
Получение общего состояния синхронизацииAQS предоставляет метод acceShared(int arg) для обмена статусом синхронизации:
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) //获取失败,自旋获取同步状态 doAcquireShared(arg); }
скопировать код
Как видно из приведенной выше программы, первый метод заключается в вызове метода tryAcquireShared(int arg), чтобы попытаться получить состояние синхронизации. общим способом получения состояния синхронизации является возврат >= Значение 0 указывает на успешное получение. Необязательный статус синхронизации получается следующим образом:
private void doAcquireShared(int arg) { /共享式节点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { //前驱节点 final Node p = node.predecessor(); //如果其前驱节点,获取同步状态 if (p == head) { //尝试获取同步 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
скопировать код
Метод tryAcquireShared(int arg) пытается получить состояние синхронизации, и возвращаемое значение — int.Когда оно >= 0, это означает, что состояние синхронизации может быть получено, и в это время он может выйти из процесса вращения. .
МетодAcquireShared(int arg) не реагирует на прерывания.Подобно эксклюзивному типу, AQS также предоставляет методы для реагирования на прерывания и тайм-ауты, а именно: объясниться здесь. .
Общий выпуск состояния синхронизацииПосле получения состояния синхронизации необходимо вызвать метод release(int arg) для сброса состояния синхронизации.Метод выглядит следующим образом:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
скопировать код
Поскольку может существовать несколько потоков, которые одновременно освобождают ресурсы состояния синхронизации, необходимо обеспечить безопасное и успешное освобождение состояния синхронизации, что обычно выполняется с помощью CAS и циклов.
Блокировать и пробуждать темы
Если потоку не удается получить состояние синхронизации, он присоединяется к очереди синхронизации CLH и постоянно получает состояние синхронизации путем вращения, однако в процессе вращения необходимо определить, нужно ли блокировать текущий поток. приобретаетQueued() :
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true;
скопировать код
С помощью этого кода мы можем видеть, что после сбоя в получении статуса синхронизации поток не блокируется сразу, и необходимо проверить статус потока.Метод проверки статуса — shouldParkAfterFailedAcquire(Node pred, Node node ) метод, который в основном находится впереди.Узел драйвера определяет, должен ли текущий поток быть заблокирован.Код выглядит следующим образом:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //前驱节点 int ws = pred.waitStatus; //状态为signal,表示当前线程处于等待状态,直接放回true if (ws == Node.SIGNAL) return true; //前驱节点状态 > 0 ,则为Cancelled,表明该节点已经超时或者被中断了,需要从同步队列中取消 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } //前驱节点状态为Condition、propagate else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
скопировать код
Этот код в основном проверяет, нужно ли блокировать текущий поток.Конкретные правила таковы:
-
Если состояние узла-предшественника текущего потока равно SINNAL, это указывает на то, что текущий поток необходимо заблокировать, вызвать метод unpark() для пробуждения и напрямую вернуть значение true, а текущий поток заблокирован.
-
Если состояние узла-предшественника текущего потока ОТМЕНЕНО (ws > 0), это указывает на то, что узел-предшественник потока ожидал тайм-аута или был прерван, и узел-предшественник необходимо удалить из очереди CLH до тех пор, пока не будет выполнен возврат. к узлу-предшественнику state
-
Если предшествующий узел не является SINNAL или ОТМЕНЕН, установите для его предшествующего узла значение SINNAL с помощью CAS и верните false.
Если метод shouldParkAfterFailedAcquire(Node pred, Node node) возвращает значение true, вызовите метод parkAndCheckInterrupt(), чтобы заблокировать текущий поток:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
скопировать код
Метод parkAndCheckInterrupt() в основном приостанавливает текущий поток, тем самым блокируя стек вызовов потока, и возвращает состояние прерывания текущего потока. Внутри метод park() класса инструментов LockSupport вызывается для блокировки метода.
Когда поток освобождает состояние синхронизации, ему необходимо разбудить узел-преемник потока:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //唤醒后继节点 unparkSuccessor(h); return true; } return false; }
скопировать код
Вызовите unparkSuccessor(узел Node), чтобы разбудить узел-преемник:
private void unparkSuccessor(Node node) { //当前节点状态 int ws = node.waitStatus; //当前状态 < 0 则设置为 0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //当前节点的后继节点 Node s = node.next; //后继节点为null或者其状态 > 0 (超时或者被中断了) if (s == null || s.waitStatus > 0) { s = null; //从tail节点来找可用节点 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //唤醒后继节点 if (s != null) LockSupport.unpark(s.thread); }
скопировать код
Могут быть случаи, когда узел-преемник текущего потока является нулевым, истекло время ожидания или прервано.Если это произойдет, вам нужно пропустить узел, но зачем начинать с хвостового узла вместо node.next? Причина в том, что node.next все еще может быть нулевым или отмененным, поэтому для поиска первого доступного потока используется хвостовой метод возврата. Наконец, вызовите метод unpark(Thread thread) LockSupport, чтобы разбудить поток.
LockSupport
Из приведенного выше я вижу, что когда поток необходимо заблокировать или разбудить, AQS выполняется с использованием класса инструментов LockSupport.
LockSupport — это базовый примитив блокировки потока, используемый для создания блокировок и других классов синхронизации.
Каждый поток, использующий LockSupport, связан с лицензией, и если лицензия доступна и доступна в процессе, вызов park() немедленно вернется, иначе он может заблокироваться. Если лицензия еще недоступна, вы можете вызвать unpark, чтобы сделать ее доступной. Но учтите, что лицензия не является реентерабельной, то есть метод park() можно вызвать только один раз, иначе он заблокируется навсегда.
LockSupport определяет серию методов, начиная с парковки для блокировки текущего потока и метода unpark(Thread thread) для пробуждения заблокированного потока. следующее:
Параметр blocker метода park(Object blocker) в основном используется для идентификации объекта, который ожидает текущий поток.Этот объект в основном используется для устранения неполадок и мониторинга системы.
И метод park, и unpark (Thread thread) появляются парами, при этом unpark должен выполняться после выполнения park, конечно, это не означает, что unpark thread будет заблокирован без вызова unpark thread. У Park есть метод, который принимает метку времени ( parkNanos(long nanos): отключает текущий поток для планирования потока, ожидает не более указанного времени ожидания, если только разрешение не доступно).
Исходный код метода park() выглядит следующим образом:
public static void park() { UNSAFE.park(false, 0L); }
скопировать код
Исходный код метода unpark(Thread thread) выглядит следующим образом:
public static void unpark(Thread thread) { if (thread != null) UNSAFE.unpark(thread); }
скопировать код
Как видно из вышеизложенного, его внутренняя реализация достигается через UNSAFE (sun.misc.Unsafe UNSAFE), который определяется следующим образом:
public native void park(boolean var1, long var2);public native void unpark(Object var1);
скопировать код
Оба являются нативными нативными методами. Unsafe — более опасный класс, в основном используемый для выполнения низкоуровневых небезопасных коллекций методов. Хотя этот класс и все методы являются общедоступными, использование этого класса по-прежнему ограничено, и вы не можете использовать этот класс напрямую в своей собственной Java-программе, потому что только доверенный код может получить экземпляр этого класса.
использованная литература
Дуг Ли: "Практика параллельного программирования на Java" Фанг Тэнфэй: "Искусство параллельного программирования на Java"