CountDownLatch
Фиксация CountDownLatch эквивалентна двери. Прежде чем фиксация достигнет конечного состояния, эта дверьвсегда закрыто, и ни один поток не может пройти, когда достигается конечное состояние, дверь открывается и позволяет пройти всем потокам. Когда блокировка достигает конечного состояния, она не изменит состояние,Дверь остается открытой навсегда
Принцип реализации CountDownLatch
CountDownLatch реализует метод через внутренний класс Sync, а sync наследует метод в шаблоне перезаписи AQS. Внутреннее определение синхронизации:
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
/**
* 获取同步状态
*/
int getCount() {
return getState();
}
/**
* 获取同步状态
*/
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
/**
* 释放同步状态
*/
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
Как видно из переписанного метода в исходном коде, синхронизация в CountDownLatch принимаетобщий режим. Пример обратного отсчета:
public class TestHarness {
public static long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);
for (int i = 0; i < nThreads; i++) {
Thread t = new Thread() {
@Override
public void run() {
try {
startGate.await();
try {
System.out.println(Thread.currentThread().getName() + "开始执行");
task.run();
} finally {
endGate.countDown();
System.out.println(Thread.currentThread().getName() + "执行结束");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
t.start();
}
long start = System.nanoTime();
startGate.countDown();
endGate.await();
long end = System.nanoTime();
System.out.println("所有线程执行完毕,耗时:" + (end-start));
return end - start;
}
public static void main(String[] args) throws InterruptedException {
System.out.println(timeTasks(10, new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "————————work");
}
}));
}
}
результат операции:
Thread-0开始执行
Thread-3开始执行
Thread-0————————work
Thread-0执行结束
Thread-1开始执行
Thread-2开始执行
Thread-7开始执行
Thread-7————————work
Thread-7执行结束
Thread-9开始执行
Thread-9————————work
Thread-9执行结束
Thread-8开始执行
Thread-8————————work
Thread-8执行结束
Thread-2————————work
Thread-2执行结束
Thread-6开始执行
Thread-1————————work
Thread-6————————work
Thread-6执行结束
Thread-5开始执行
Thread-5————————work
Thread-5执行结束
Thread-3————————work
Thread-3执行结束
Thread-4开始执行
Thread-1执行结束
Thread-4————————work
Thread-4执行结束
所有线程执行完毕,耗时:2794976
2794976
CyclicBarrier
Относительно CountDownLatchОдноразовый объект, как только он входит в терминальное состояние, его нельзя сбросить, CyclicBarrier можно использовать повторно. CyclicBarrier похож на защелку, ноКлючевое различие между защелками заключается в том, что защелки используются для ожидания событий, а ограничения используются для ожидания других потоков., роль которого состоит в том, чтобы блокировать группу потоков, когда они достигают барьера (также называемого точкой синхронизации), пока последний поток не достигнет барьера, барьер откроет дверь, и все потоки, перехваченные барьером, продолжат работу.
Принцип реализации CyclicBarrier
Циклический способ построения барьера
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
Параметр partys относится к количеству потоков, перехваченных забором.
Параметр барьерное действие относится к потоку, который будет выполняться первым, когда все эти потоки достигнут барьера.
метод ожидания()
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 获取锁
lock.lock();
try {
final Generation g = generation;
// 若栅栏处于断开状态,抛出异常
if (g.broken)
throw new BrokenBarrierException();
// 若线程中断,断开CyclicBarrier
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
// count为0表明所有线程到达栅栏位置
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 若初始化时指定了所有线程到达栅栏时的任务,执行它
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 唤醒所有等待线程,开始新的generation
nextGeneration();
return 0;
} finally {
// 若任务执行异常,断开CyclicBarrier
if (!ranAction)
breakBarrier();
}
}
// 循环所有线程到达栅栏或栅栏断开或线程中断或超时
for (;;) {
try {
// 一直等待
if (!timed)
trip.await();
// 限时等待
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 若线程中断且栅栏没有断开,断开CyclicBarrier
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
// 若等待超时,断开CyclicBarrier
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 释放锁
lock.unlock();
}
}
Его основная логика: если поток не достигает позиции ограждения, поток, достигший положения ограждения, будет ждать, пока не произойдет следующий сценарий:
① Все нити достигают положения упора
② Поток прерван
③ Тайм-аут ожидания потока
④ Поток вызывает метод reset() для отключения текущего забора и сброса забора в начальное состояние.
метод сброса:
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 断开当前栅栏
breakBarrier(); // break the current generation
// 开始新的generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
Пример циклического барьера
public class CyclicBarrierTest {
private static CyclicBarrier cyclicBarrier;
static class CyclicBarrierThread extends Thread{
public void run() {
System.out.println("运动员:" + Thread.currentThread().getName() + "到场");
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args){
cyclicBarrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("运动员全部到齐,比赛开始");
}
});
for(int i = 0 ; i < 5 ; i++){
new CyclicBarrierThread().start();
}
}
}
результат операции:
运动员:Thread-0到场
运动员:Thread-1到场
运动员:Thread-2到场
运动员:Thread-3到场
运动员:Thread-4到场
运动员全部到齐,比赛开始
Разница между CountDownLatch и CyclicBarrier
① Счетчик .CountDownLatch можно использовать только один раз, а счетчик CyclicBarrier можно сбросить с помощью метода reset(). Таким образом, CyclicBarrier может обрабатывать более сложные бизнес-сценарии. Например, если при вычислении возникает ошибка, вы можете сбросить счетчик и позволить потоку сделать это снова.
②.CyclicBarrier также предоставляет другие полезные методы, такие как метод getNumberWaiting для получения количества потоков, заблокированных Cyclic-Barrier. Метод isBroken() используется, чтобы узнать, прерван ли заблокированный поток.
③.CountDownLatch имеет тенденцию ожидать один поток и другие потоки, а CyclicBarrier имеет тенденцию ждать, пока несколько потоков будут ждать друг друга.