Параллелизм Java — CountDownLatch и CyclicBarrier

интервью Java задняя часть Java EE

CountDownLatch

Фиксация CountDownLatch эквивалентна двери. Прежде чем фиксация достигнет конечного состояния, эта дверьвсегда закрыто, и ни один поток не может пройти, когда достигается конечное состояние, дверь открывается и позволяет пройти всем потокам. Когда блокировка достигает конечного состояния, она не изменит состояние,Дверь остается открытой навсегда

Принцип реализации CountDownLatch

CountDownLatch реализует метод через внутренний класс Sync, а sync наследует метод в шаблоне перезаписи AQS. Внутреннее определение синхронизации:


    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
         
        Sync(int count) {
            setState(count);
        }
        /**
         * 获取同步状态
         */
        int getCount() {
            return getState();
        }
        /**
         * 获取同步状态
         */
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        /**
         * 释放同步状态
         */
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

Как видно из переписанного метода в исходном коде, синхронизация в CountDownLatch принимаетобщий режим. Пример обратного отсчета:


public class TestHarness {
     
    public static long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);
         
        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread() {
                @Override
                public void run() {
                    try {
                        startGate.await();
                        try {
                            System.out.println(Thread.currentThread().getName() + "开始执行");
                            task.run();
                        } finally {
                            endGate.countDown();
                            System.out.println(Thread.currentThread().getName() + "执行结束");
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            t.start();
        }
        long start = System.nanoTime();
        startGate.countDown();
        endGate.await();
        long end = System.nanoTime();
        System.out.println("所有线程执行完毕,耗时:" + (end-start));
        return end - start;
    }
         
    public static void main(String[] args) throws InterruptedException {
        System.out.println(timeTasks(10, new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + "————————work");
            }
        }));
    }
}

результат операции:


Thread-0开始执行
Thread-3开始执行
Thread-0————————work
Thread-0执行结束
Thread-1开始执行
Thread-2开始执行
Thread-7开始执行
Thread-7————————work
Thread-7执行结束
Thread-9开始执行
Thread-9————————work
Thread-9执行结束
Thread-8开始执行
Thread-8————————work
Thread-8执行结束
Thread-2————————work
Thread-2执行结束
Thread-6开始执行
Thread-1————————work
Thread-6————————work
Thread-6执行结束
Thread-5开始执行
Thread-5————————work
Thread-5执行结束
Thread-3————————work
Thread-3执行结束
Thread-4开始执行
Thread-1执行结束
Thread-4————————work
Thread-4执行结束
所有线程执行完毕,耗时:2794976
2794976

CyclicBarrier

Относительно CountDownLatchОдноразовый объект, как только он входит в терминальное состояние, его нельзя сбросить, CyclicBarrier можно использовать повторно. CyclicBarrier похож на защелку, ноКлючевое различие между защелками заключается в том, что защелки используются для ожидания событий, а ограничения используются для ожидания других потоков., роль которого состоит в том, чтобы блокировать группу потоков, когда они достигают барьера (также называемого точкой синхронизации), пока последний поток не достигнет барьера, барьер откроет дверь, и все потоки, перехваченные барьером, продолжат работу.

Принцип реализации CyclicBarrier

Циклический способ построения барьера


    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

Параметр partys относится к количеству потоков, перехваченных забором.
Параметр барьерное действие относится к потоку, который будет выполняться первым, когда все эти потоки достигнут барьера.

метод ожидания()


    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        // 获取锁
        lock.lock();
        try {
            final Generation g = generation;
            // 若栅栏处于断开状态,抛出异常
            if (g.broken)
                throw new BrokenBarrierException();
            // 若线程中断,断开CyclicBarrier
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            int index = --count;
            // count为0表明所有线程到达栅栏位置
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    // 若初始化时指定了所有线程到达栅栏时的任务,执行它 
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // 唤醒所有等待线程,开始新的generation
                    nextGeneration();
                    return 0;
                } finally {
                    // 若任务执行异常,断开CyclicBarrier
                    if (!ranAction)
                        breakBarrier();
                }
            }
            // 循环所有线程到达栅栏或栅栏断开或线程中断或超时
            for (;;) {
                try {
                    // 一直等待
                    if (!timed)
                        trip.await();
                    // 限时等待    
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    // 若线程中断且栅栏没有断开,断开CyclicBarrier
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
                 
                if (g.broken)
                    throw new BrokenBarrierException();
                
                if (g != generation)
                    return index;
                // 若等待超时,断开CyclicBarrier
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            // 释放锁
            lock.unlock();
        }
    }

Его основная логика: если поток не достигает позиции ограждения, поток, достигший положения ограждения, будет ждать, пока не произойдет следующий сценарий:
① Все нити достигают положения упора
② Поток прерван
③ Тайм-аут ожидания потока
④ Поток вызывает метод reset() для отключения текущего забора и сброса забора в начальное состояние.

метод сброса:


    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 断开当前栅栏
            breakBarrier();   // break the current generation
            // 开始新的generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

Пример циклического барьера


public class CyclicBarrierTest {
     
    private static CyclicBarrier cyclicBarrier;
     
    static class CyclicBarrierThread extends Thread{
        public void run() {
            System.out.println("运动员:" + Thread.currentThread().getName() + "到场");
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
     
    public static void main(String[] args){
        cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("运动员全部到齐,比赛开始");
            }
        });
         
        for(int i = 0 ; i < 5 ; i++){
            new CyclicBarrierThread().start();
        }
    }

}

результат операции:


运动员:Thread-0到场
运动员:Thread-1到场
运动员:Thread-2到场
运动员:Thread-3到场
运动员:Thread-4到场
运动员全部到齐,比赛开始

Разница между CountDownLatch и CyclicBarrier

① Счетчик .CountDownLatch можно использовать только один раз, а счетчик CyclicBarrier можно сбросить с помощью метода reset(). Таким образом, CyclicBarrier может обрабатывать более сложные бизнес-сценарии. Например, если при вычислении возникает ошибка, вы можете сбросить счетчик и позволить потоку сделать это снова.

②.CyclicBarrier также предоставляет другие полезные методы, такие как метод getNumberWaiting для получения количества потоков, заблокированных Cyclic-Barrier. Метод isBroken() используется, чтобы узнать, прерван ли заблокированный поток.

③.CountDownLatch имеет тенденцию ожидать один поток и другие потоки, а CyclicBarrier имеет тенденцию ждать, пока несколько потоков будут ждать друг друга.