CountDownLatch
это инструмент для многопоточного управления, он называется门阀
,计数器
или闭锁
. Этот инструмент часто используется для координации синхронизации между несколькими потоками или для связи между потоками (а не для взаимного исключения). Давайте познакомимся с CountDownLatch
Познакомьтесь с CountDownLatch
CountDownLatch позволяет потоку ждать, пока другие потоки завершат свою работу, прежде чем продолжить. Он эквивалентен счетчику. Начальное значение счетчика — это количество потоков. Всякий раз, когда задача завершается, значение счетчика уменьшается на единицу. Когда значение счетчика равно 0, это означает, что все потоки завершены задачи. Поток, ожидающий CountDownLatch, может возобновить выполнение следующей задачи.
Использование CountDownLatch
CountDownLatch предоставляет конструктор, необходимо указать его начальное значение, а также указатьcountDown
метод, функция этого метода в основном используется для уменьшения значения счетчика, когда счетчик становится 0, на CountDownLatchawait
Поток будет разбужен и продолжит выполнение других задач. Конечно, пробуждение также можно отложить, чего можно добиться, добавив время задержки в CountDownLatch.
Его основные методы заключаются в следующем.
CountDownLatch в основном имеет следующие сценарии применения
Сценарии применения CountDownLatch
Типичный сценарий приложения заключается в том, что при запуске службы одновременно загружаются многие компоненты и службы, а основной поток будет ожидать загрузки компонентов и служб. Когда все компоненты и службы загружены, основной поток и другие потоки выполняют задачу вместе.
CountDownLatch также может реализовать программу, в которой учащиеся вместе участвуют в гонке. CountDownLatch инициализируется как поток с количеством учеников. После выстрела каждый ученик становится потоком для выполнения своих соответствующих задач. Когда первый ученик запускает На всем протяжении CountDownLatch будет уменьшаться. Во-первых, CountDownLatch станет равным 0, пока все ученики не закончат, а затем объявят результаты выполнения вместе.
Следуя этому сценарию, вы можете самостоятельно расширять и расширять многие другие сценарии задач.
Использование CountDownLatch
Давайте продемонстрируем использование CountDownLatch на простом счетчике.
public class TCountDownLatch {
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(5);
Increment increment = new Increment(latch);
Decrement decrement = new Decrement(latch);
new Thread(increment).start();
new Thread(decrement).start();
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Decrement implements Runnable {
CountDownLatch countDownLatch;
public Decrement(CountDownLatch countDownLatch){
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
for(long i = countDownLatch.getCount();i > 0;i--){
Thread.sleep(1000);
System.out.println("countdown");
this.countDownLatch.countDown();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Increment implements Runnable {
CountDownLatch countDownLatch;
public Increment(CountDownLatch countDownLatch){
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
System.out.println("await");
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Waiter Released");
}
}
В основном методе мы инициализируем CountDownLatch со счетчиком 5, в методе Decrement мы используемcountDown
Выполните операцию уменьшения, затем приостановите работу на некоторое время и подождите в классе Increment, пока поток в классе Decrement не завершит операцию уменьшения счетчика на единицу, разбудите метод run в классе Increment, чтобы продолжить выполнение.
Далее давайте продемонстрируем конкретное использование CountDownLatch на примере бега учащихся.
public class StudentRunRace {
CountDownLatch stopLatch = new CountDownLatch(1);
CountDownLatch runLatch = new CountDownLatch(10);
public void waitSignal() throws Exception{
System.out.println("选手" + Thread.currentThread().getName() + "正在等待裁判发布口令");
stopLatch.await();
System.out.println("选手" + Thread.currentThread().getName() + "已接受裁判口令");
Thread.sleep((long) (Math.random() * 10000));
System.out.println("选手" + Thread.currentThread().getName() + "到达终点");
runLatch.countDown();
}
public void waitStop() throws Exception{
Thread.sleep((long) (Math.random() * 10000));
System.out.println("裁判"+Thread.currentThread().getName()+"即将发布口令");
stopLatch.countDown();
System.out.println("裁判"+Thread.currentThread().getName()+"已发送口令,正在等待所有选手到达终点");
runLatch.await();
System.out.println("所有选手都到达终点");
System.out.println("裁判"+Thread.currentThread().getName()+"汇总成绩排名");
}
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
StudentRunRace studentRunRace = new StudentRunRace();
for (int i = 0; i < 10; i++) {
Runnable runnable = () -> {
try {
studentRunRace.waitSignal();
} catch (Exception e) {
e.printStackTrace();
}
};
service.execute(runnable);
}
try {
studentRunRace.waitStop();
} catch (Exception e) {
e.printStackTrace();
}
service.shutdown();
}
}
Давайте проанализируем это вместеCountDownLatch
исходный код
Анализ исходного кода CountDownLatch
CountDownLatch относительно прост в использовании, но очень полезен Теперь вы можете добавить CountDownLatch в свой набор инструментов. Давайте подробнее рассмотрим CountDownLatch.
Нижний слой CountDownLatch определяетсяAbstractQueuedSynchronizer
Поддержка, а ядром структуры данных AQS являются две очереди, одна同步队列(sync queue)
,один条件队列(condition queue)
.
Синхронизировать внутренний класс
CountDownLatch — это Sync внутри него, который наследует абстрактный класс AQS.
private static final class Sync extends AbstractQueuedSynchronizer {...}
CountDownLatch на самом деле имеет только один внутриsync
свойство и является окончательным
private final Sync sync;
CountDownLatch имеет только один конструктор с параметрами
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
То есть количество счетчиков должно быть указано при инициализации, если число отрицательное, то сразу будет выброшено исключение.
Затем инициализируйте count для подсчета внутри Sync, который
Sync(int count) {
setState(count);
}
Обратите внимание, что здесь есть setState(count), что это значит? Это просто операция по установке состояния, но на самом деле это не только, есть еще и слой значения, что значение состояния представляет собой количество потоков для достижения условия. Мы обсудим это, когда будем говорить о методе countDown.
getCount()
Возвращаемое значение методаgetState()
метод, который является методом в AbstractQueuedSynchronizer, который возвращает текущее количество потоков с семантикой памяти изменчивых операций чтения.
// ---- CountDownLatch ----
int getCount() {
return getState();
}
// ---- AbstractQueuedSynchronizer ----
protected final int getState() {
return state;
}
tryAcquireShared()
Метод используется для получения состояния объекта в разделяемом состоянии, для определения является ли объект 0, если он 0, то возвращает 1, а это значит, что он может попытаться его получить, а если не 0, он возвращает -1, что означает, что его нельзя получить.
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// ---- getState() 方法和上面的方法相同 ----
это共享状态
Он относится к концепции AQS и разделен на два режима в AQS, один из которых独占模式
, один共享模式
.
- Эксклюзивный режим tryAcquire, попытка получить ресурс, возврат true в случае успеха, false в случае неудачи.
- Попробуйте поделиться методом tryAcquireShared, попробуйте получить ресурсы. Отрицательное число означает сбой, 0 — успех, но доступных ресурсов нет, положительное число — успех и оставшиеся ресурсы.
tryReleaseShared()
способ выпуска в общем режиме
protected boolean tryReleaseShared(int releases) {
// 减小数量,变为 0 的时候进行通知。
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
Этот метод представляет собой бесконечный цикл для получения состояния потока. Если состояние потока равно 0, это означает, что он не занят потоком. Если он не занят, он напрямую возвращает false, указывая на то, что он был освобожден; тогда следующее состояние - 1, используя метод CAS compareAndSetState для выполнения и сравнения значений памяти, если значение памяти также равно 1, значение памяти будет обновлено до 0, и будет оцениваться, равно ли nextc 0. Если сравнение CAS неудачно, оценка цикла будет выполнена снова.
Если использование CAS неясно, читатели и друзья могут обратиться к этой статье.Расскажу вам великий секрет AtomicInteger!
метод ожидания
await()
Этот метод является очень важным методом CountDownLatch. По сути, только методы countDown и await являются сущностью CountDownLatch. Этот метод заставит текущий поток ждать, пока счетчик CountDownLatch не уменьшится до нуля, если только поток не будет прерван.
В CountDownLatch есть два метода ожидания, один без каких-либо параметров.await()
, можно немного подождатьawait(long timeout, TimeUnit unit)
. Давайте сначала посмотрим на метод await().
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
Внутри метода await вызывается методAcquireSharedInterruptily, который является методом AQS, который прерывается в общем режиме.
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
Как вы можете видеть, внутренняя часть методаAcquireSharedInterruptably сначала определит, является ли поток中断
, если поток прерывается, сразу создается исключение прерывания потока. Если нет прерывания, оно будет приобретено общим способом. Если блокировка не может быть получена в общем режиме, то общий режим будет отключен.
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
Этот способ немного длинный, рассмотрим его отдельно
- Сначала будет создан узел в общем режиме для присоединения к очереди.
- Затем используйте бесконечный цикл, чтобы судить о узле-предшественнике вновь созданного узла. Если узел-предшественник узла узла является головным узлом, то будет оцениваться состояние потока. Здесь вызывается setHeadAndPropagate, и его исходный код следующее
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
Сначала будет установлен головной узел, а затем будет сделан ряд суждений, чтобы получить преемника узла, чтобы получить узел и освободить его в общем режиме, будет вызван метод doReleaseShared. метод doReleaseShared.
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
Этот метод сначала определит, равен ли головной узел хвостовому узлу в бесконечном цикле.Если головной узел равен хвостовому узлу, он выйдет напрямую. Если головной узел не равен хвостовому узлу, он определит, является ли состояние SIGNAL, если нет, продолжит цикл compareAndSetWaitStatus, а затем отключит узел-преемник. Если состояние не SIGNAL, будет также вызвана функция compareAndSetWaitStatus для установки статуса PROPAGATE, а если статус равен 0 и неудачный, цикл продолжится.
Другими словами, setHeadAndPropagate представляет собой серию процессов установки головного узла и освобождения узлов-преемников.
- Давайте посмотрим на следующее, если суждение, которое
shouldParkAfterFailedAcquire(p, node)
здесь
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
Если вышеприведенный узел p = node.predecessor() получает, что предшествующий узел не является головным узлом, будет выполнена операция отключения парковки, чтобы определить, может ли он быть отключен в это время.Критерии оценки следующие:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
Этот метод будет оценивать узел-предшественник узла p.结点状态(waitStatus)
, существует пять типов состояний узла, которые
-
CANCELLED(1)
: Указывает, что текущий узел был отменен. Когда он истечет или будет прерван (в случае ответа на прерывание), он вызовет изменение этого состояния, и узел после входа в это состояние не изменится. -
SIGNAL(-1)
: Указывает, что узел-преемник ожидает пробуждения текущего узла. Когда узел-преемник ставится в очередь, состояние узла-предшественника будет обновлено до SIGNAL. -
CONDITION(-2)
: указывает, что узел ожидает выполнения условия. Когда другие потоки вызывают метод signal() условия, узел в состоянии УСЛОВИЯ будетПеревод из очереди ожидания в очередь синхронизации, ожидая получения блокировки синхронизации. -
PROPAGATE(-3)
: в совместно используемом режиме узел-предшественник не только разбудит свой узел-преемник, но также может разбудить узел-преемник. -
0
: состояние по умолчанию, когда новый узел ставится в очередь.
Если предшествующий узел SIGNAL, он вернет true, чтобы указать, что он может быть отключен.Если состояние предшествующего узла больше 0 (почему бы не использовать ws == Node.CANCEELLED в это время)? Потому что условие для ws больше 0 - это только состояние CANCELED. Затем выполняется ряд операций обхода поиска до тех пор, пока состояние ожидания узла-предшественника не станет > 0. Если ws
Если проверка будет прервана, она вернет false.
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
Этот метод используетLockSupport.park
Отключите, затем верните флаг, если поток прерван.
-
cancelAcquire()
Он используется для отмены очереди ожидания.Если ресурс не был успешно получен во время процесса ожидания (например, тайм-аут, или прерывается, если его можно прервать), то отменяет ожидание узла в очереди.
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
Следовательно, вызов await для CountDownLatch будет примерно иметь следующий вызывающий процесс.
Перегруженный метод с ожиданиемawait(long timeout, TimeUnit unit)
, основное различие между этим методом и ожиданием заключается в том, что этот метод может ожидать счетчика в течение определенного периода времени, прежде чем выполнять последующие операции.
метод обратного отсчета
countDown является таким же важным методом, как и await.countDown используется для уменьшения количества счетчиков.Если счетчик уменьшится до 0, все потоки будут освобождены.
public void countDown() {
sync.releaseShared(1);
}
Этот метод вызовет метод releaseShared, который используется для операции освобождения в общем режиме.Во-первых, он определит, может ли быть выполнено освобождение.Метод оценки — это метод tryReleaseShared внутреннего класса Sync CountDownLatch.
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// ---- CountDownLatch ----
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
tryReleaseShared выполнит цикл for, чтобы определить значение состояния потока, и использует CAS, чтобы постоянно пытаться заменить его.
Если он может быть выпущен, будет вызван метод doReleaseShared.
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
Как видите, doReleaseShared на самом деле представляет собой бесконечный цикл, который постоянно использует CAS для попытки замены.
Суммировать
Эта статья представляет собой базовое использование и анализ исходного кода CountDownLatch. CountDownLatch — это счетчик на основе AQS. Все его внутренние методы связаны с инфраструктурой AQS. реализации, поэтому изучение параллелизма неотделимо от обсуждения AQS. Исходный код CountDownLatch выглядит небольшим и простым, но его внутренняя цепочка вызовов, такая как метод await, очень длинная, и стоит потратить время на его подробное изучение.
Я cxuan, программист технического творчества. Если вы считаете, что эта статья хороша, прошу читателей поставить лайк, прочитать и поделиться!