Параллелизм Java (6) — CountDownLatch, семафор и AQS

Node.js Java задняя часть исходный код

введение

В предыдущей статье был подробно проанализирован принцип ReentrantLock на основе AQS ReentrantLock представляет собой монопольную блокировку посредством преобразования между переменными состояния 0 и 1 в AQS. Так что подумайте об этом, что это представляет, когда переменная состояния больше 1? Есть ли такая реализация на базе AQS в J.U.C? Если да, то как они этого добились? Ответы на эти вопросы будут даны в подробном анализе классов Semaphore и CountDownLatch в J.U.C.

  1. Общая логика между Semaphore и CountDownLatch
  2. Пример использования Semaphore и CountDownLatch
  • 2.1 Использование семафора
  • 2.2 Использование CountDownLatch
  1. Анализ исходного кода
  • 3.1 Реализация общих блокировок в AQS
  • 3.2 Анализ исходного кода семафора
  • 3.3 Анализ исходного кода CountDownLatch
  1. Суммировать

1. Как совместно используются Semaphore и CountDownLatch

Эксклюзивная блокировка означает, что может быть только одна блокировка получения потока, и другие потоки должны ждать снятия блокировки в случае, когда блокировка занята, прежде чем продолжить. Означает ли совместная блокировка, что совместная блокировка означает, что вы можете использовать эту блокировку одновременно, не подождите? Если да, то смысл этой блокировки не существует. Совместное использование в J.U.C означает, что несколько потоков могут получать блокировки одновременно, но это ограничено, а не безгранично, J.U.C через семафор и CountDownLatch, соответственно, две ограниченные общие блокировки соответственно.

Семафор также называется семафором. Он назначает сигналы каждому потоку, который его использует, через общий «сигнальный пакет». Когда сигнала в сигнальном пакете достаточно, поток может получить блокировку. Напротив, сигнал в сигнальном пакете недостаточно, тогда блокировка не может быть получена, и необходимо дождаться освобождения достаточного количества сигналов, прежде чем она может быть получена.

CountDownLatch также называется счетчиком. Он управляет получением блокировок потока с помощью общего общего счетчика. Когда общий счетчик счетчика больше 0, поток блокируется и не может получить блокировку. Только когда общий счетчик счетчик равен 0, все заблокированы, потоки освобождаются одновременно.

Видно, что и Semaphore, и CountDownLatch имеют общую общую сумму, которая достигается через состояние.

2. Пример использования Semaphore и CountDownLatch

Прежде чем подробно анализировать принципы Semaphore и CountDownLatch, давайте посмотрим, как они используются, чтобы мы могли понять их принципы в будущем. Знаешь, какой он первый? Тогда спросите, почему? Следующие два примера подробно иллюстрируют использование Semaphore и CountDownLatch.

2.1 Использование семафора

//初始化10个信号量在信号包中,让ABCD4个线程分别去获取
public static void main(String[] args) throws InterruptedException {
    Semaphore semaphore = new Semaphore(10);
	SemaphoreTest(semaphore);
}

private static void SemaphoreTest(final Semaphore semaphore) throws InterruptedException {
    //线程A初始获取了4个信号量,然后分3次释放了这4个信号量
    Thread threadA = new Thread(new Runnable() {

        @Override
        public void run() {
            try {
                semaphore.acquire(4);
                System.out.println(Thread.currentThread().getName() + " get 4 semaphore");
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName() + " release 1 semaphore");
                semaphore.release(1);
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName() + " release 1 semaphore");
                semaphore.release(1);
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName() + " release 2 semaphore");
                semaphore.release(2);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    });
    threadA.setName("threadA");

    //线程B初始获取了5个信号量,然后分2次释放了这5个信号量
    Thread threadB = new Thread(new Runnable() {

        @Override
        public void run() {
            try {
                semaphore.acquire(5);
                System.out.println(Thread.currentThread().getName() + " get 5 semaphore");
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName() + " release 2 semaphore");
                semaphore.release(2);
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName() + " release 3 semaphore");
                semaphore.release(3);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    });
    threadB.setName("threadB");

    //线程C初始获取了4个信号量,然后分1次释放了这4个信号量
    Thread threadC = new Thread(new Runnable() {

        @Override
        public void run() {
            try {
                semaphore.acquire(4);
                System.out.println(Thread.currentThread().getName() + " get 4 semaphore");
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + " release 4 semaphore");
                semaphore.release(4);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    });
    threadC.setName("threadC");
    
    //线程D初始获取了10个信号量,然后分1次释放了这10个信号量
    Thread threadD = new Thread(new Runnable() {

        @Override
        public void run() {
            try {
                semaphore.acquire(10);
                System.out.println(Thread.currentThread().getName() + " get 10 semaphore");
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + " release 10 semaphore");
                semaphore.release(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    });
    threadD.setName("threadD");
    
    //线程A和线程B首先分别获取了4个和5个信号量,总信号量变为了1个
    threadA.start();
    threadB.start();
    Thread.sleep(1);
    //线程C尝试获取4个发现不够则等待
    threadC.start();
    Thread.sleep(1);
    //线程D尝试获取10个发现不够则等待
    threadD.start();
}

Результат выполнения следующий:

threadB get 5 semaphore
threadA get 4 semaphore
threadA release 1 semaphore
threadB release 2 semaphore
threadC get 4 semaphore
threadA release 1 semaphore
threadC release 4 semaphore
threadB release 3 semaphore
threadA release 2 semaphore
threadD get 10 semaphore
threadD release 10 semaphore

Можно видеть, что threadA и threadB могут продолжать выполняться после того, как threadC и threadD дождутся достаточного количества семафоров после получения 9 семафоров. А threadA и threadB могут выполняться одновременно, когда достаточно семафора.

Возникает вопрос, когда threadD ставится в очередь перед threadC, если освобождаются 4 семафора, будет ли threadC выполняться раньше threadD? Или нужно стоять в очереди? На этот вопрос будет дан ответ после детального анализа исходного кода Semaphore.

2.2 Использование CountDownLatch

//初始化计数器总量为2
public static void main(String[] args) throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(2);
    CountDownLatchTest(countDownLatch);
}

private static void CountDownLatchTest(final CountDownLatch countDownLatch) throws InterruptedException {
    //threadA尝试执行,计数器为2被阻塞
    Thread threadA = new Thread(new Runnable() {

        @Override
        public void run() {
            try {
                countDownLatch.await();
                System.out.println(Thread.currentThread().getName() + " await");

            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    });
    threadA.setName("threadA");

    //threadB尝试执行,计数器为2被阻塞
    Thread threadB = new Thread(new Runnable() {

        @Override
        public void run() {
            try {
                countDownLatch.await();
                System.out.println(Thread.currentThread().getName() + " await");
                
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    });
    threadB.setName("threadB");
    
    //threadC在1秒后将计数器数量减1
    Thread threadC = new Thread(new Runnable() {

        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                countDownLatch.countDown();
                
                System.out.println(Thread.currentThread().getName() + " countDown");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    });
    threadC.setName("threadC");
    
    //threadD在5秒后将计数器数量减1
    Thread threadD = new Thread(new Runnable() {

        @Override
        public void run() {
            try {
                Thread.sleep(5000);
                countDownLatch.countDown();
                
                System.out.println(Thread.currentThread().getName() + " countDown");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    });
    threadD.setName("threadD");
    
    threadA.start();
    threadB.start();
    threadC.start();
    threadD.start();
}

Результат выполнения следующий:

threadC countDown
threadD countDown
threadA await
threadB await

Когда threadA и threadB пытаются выполниться, они блокируются, потому что общий счетчик равен 2. Когда threadC и threadD уменьшают общий счетчик до 0, threadA и threadB начинают выполняться одновременно.

Чтобы суммировать его: семафор - это как вращающийся ресторан суши, с 10 сиденьями, а когда сидения становятся доступными, официанты могут занять свои места. Если есть только 2 вакансии, и семья из 3 приходит, то есть только ожидание. Если вы приедете как пару, вы можете просто сидеть и есть. Конечно, если одновременно освобождаются 5 мест, то семейство из 3 и пару может пойти в то же время. CountDownlatch похож на временную игровую площадку в большом торговом центре. После времени каждого развлечения люди ждут, чтобы войти в проведение, чтобы играть в то же время, а в середине игры будут люди, которые не любят Играйте в любое время, но они не могут войти, как только все люди, которые вступают в все вышел, и новая группа людей может вступить в поле одновременно.

3. Анализ исходного кода

Поймите, что делают Semaphore и CountDownLatch и как их использовать. Далее давайте посмотрим, как Semaphore и CountDownLatch реализуют эти функции внизу.

3.1 Реализация общих блокировок в AQS

В предыдущей статье путем анализа ReentrantLock было найдено несколько ключевых методов реализации монопольных блокировок в AQS:

//状态量,独占锁在0和1之间切换
private volatile int state;

//调用tryAcquire获取锁,获取失败后加入队列中挂起等操作,AQS中实现
public final void acquire(int arg);

//独占模式下尝试获取锁,ReentrantLock中实现
protected boolean tryAcquire(int arg);

//调用tryRelease释放锁以及恢复线程等操作,AQS中实现
public final boolean release(int arg);

//独占模式下尝试释放锁,ReentrantLock中实现
protected boolean tryRelease(int arg);

Конкретная логика получения и снятия эксклюзивных блокировок реализована в ReentrantLock, а AQS отвечает за управление логикой, которая должна быть обработана после успешного или неудачного получения или снятия эксклюзивных блокировок. Так следует ли реализации разделяемых блокировок этому правилу? Из этого мы обнаружили следующие аналогичные методы в AQS:

//调用tryAcquireShared获取锁,获取失败后加入队列中挂起等操作,AQS中实现
public final void acquireShared(int arg);

//共享模式下尝试获取锁
protected int tryAcquireShared(int arg);

//调用tryReleaseShared释放锁以及恢复线程等操作,AQS中实现
public final boolean releaseShared(int arg);

//共享模式下尝试释放锁
protected boolean tryReleaseShared(int arg);

Общий замок и ядро ​​находятся в четырех клавишах выше, давайте посмотрим на то, как семафор вызывает вышеуказанный метод для достижения общего блокировки.

3.2 Анализ исходного кода семафора

Первый - это метод построения Semaphore. Как и ReentrantLock, он имеет два метода построения. Это также для достижения справедливых общих блокировок и несправедливых общих блокировок. У вас могут возникнуть вопросы. Поскольку это общая блокировка, почему она делится на справедливые и несправедливо? ? Это возвращает нас к вопросу, лежащему в основе приведенного выше примера: когда впереди находятся ожидающие потоки, могут ли более поздние потоки напрямую получить семафор или они должны стоять в очереди. Конечно, ждать справедливо, но несправедливо вставать в очередь.

Возьмем пример чередующихся суши: сейчас свободных мест всего 2, а семья из 3 человек уже ждет.В это время прибывает пара. Настала их очередь, и реализация нечестной общей блокировки позволяет паре есть напрямую, потому что свободных мест всего 2, поэтому семья из 3 человек продолжает ждать (казается очень несправедливой...), это Преимущество несправедливо разделенных блокировок в этом случае состоит в том, что это может максимизировать прибыль суши-ресторана (кажется, что это одновременно оскорбляет ожидающих клиентов...), что также является реализацией Semaphore по умолчанию.

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

В примере с семафором используются два основных метода: «acquire» и «release», которые вызывают методы «acquireSharedInterruptably» и «releaseShared» в AQS соответственно:

//获取permits个信号量
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

//释放permits个信号量
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0) //尝试获取arg个信号量
        doAcquireSharedInterruptibly(arg); //获取信号量失败时排队挂起
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) { //尝试释放arg个信号量
        doReleaseShared();
        return true;
    }
    return false;
}

Процесс получения и освобождения семафоров Semaphore реализуется через AQS, а то, как вычислить успех или успешное освобождение, реализовано самим Semaphore.

//公平共享锁尝试获取acquires个信号量
protected int tryAcquireShared(int acquires) {
    for (;;) {
        if (hasQueuedPredecessors()) //前面是否有排队,有则返回获取失败
            return -1;
        int available = getState(); //剩余的信号量(旋转寿司店剩余的座位)
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining)) // 剩余信号量不够,够的情况下尝试获取(旋转寿司店座位不够,或者同时来两对情况抢座位)
            return remaining;
    }
}
//非公平共享锁尝试获取acquires个信号量
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState(); //剩余的信号量(旋转寿司店剩余的座位)
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining)) // 剩余信号量不够,够的情况下尝试获取(旋转寿司店座位不够,或者同时来两对情侣抢座位)
            return remaining;
    }
}

Можно видеть, что разница между справедливой общей блокировкой и несправедливой общей блокировкой заключается в том, необходимо ли оценивать наличие потоков, ожидающих в очереди. Справедливая общая блокировка должна оцениваться в первую очередь, а несправедливая общая блокировка напрямую вставляет очередь, хотя впереди уже есть потоки, ожидающие.

Чтобы проверить этот вывод, слегка измените вышеприведенный пример:

threadA.start();
threadB.start();
Thread.sleep(1);
threadD.start(); //threadD已经在排队
Thread.sleep(3500);
threadC.start(); //3500毫秒后threadC来插队

Вывод результата:

threadB get 5 semaphore
threadA get 4 semaphore
threadB release 2 semaphore
threadA release 1 semaphore
threadC get 4 semaphore //threadC先与threadD获取到信号量
threadA release 1 semaphore
threadB release 3 semaphore
threadC release 4 semaphore
threadA release 2 semaphore
threadD get 10 semaphore
threadD release 10 semaphore

Этот пример является хорошим примером попытки получить общую блокировку перед постановкой в ​​очередь, когда это нечестная блокировка.

При неудачном получении семафора он будет поставлен в очередь.Эта операция реализована через метод doAcquireSharedInterruptably в AQS:

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED); //加入等待队列
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor(); //获取当前节点的前置节点
            if (p == head) {
                int r = tryAcquireShared(arg); //前置节点是头节点时,说明当前节点是第一个挂起的线程节点,再次尝试获取共享锁
                if (r >= 0) {
                    setHeadAndPropagate(node, r); //与ReentrantLock不同的地方:获取共享锁成功设置头节点,同时通知下一个节点
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) && //非头节点或者获取锁失败,检查节点状态,查看是否需要挂起线程
                parkAndCheckInterrupt()) //挂起线程,当前线程阻塞在这里!
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

Этот фрагмент кода в основном такой же, как метод AcquireQueued(addWaiter(Node.EXCLUSIVE), arg) в ReentrantLock, с двумя отличиями. Во-первых, при вступлении в очередь ожидания сюда добавляется узел типа Node.SHARED. Второй — уведомить следующий узел после успешного получения блокировки, то есть разбудить следующий поток. Возьмем в качестве примера вращающийся суши-ресторан: впереди одновременно идут 5 гостей, и осталось 5 мест.После того, как семья из 3 сядет, они скажут паре сзади, чтобы они разрешили сидеть в также, таким образом достигая цели обмена. Методы shouldParkAfterFailedAcquire и parkAndCheckInterrupt подробно описаны в предыдущей статье и объясняются здесь.

Давайте посмотрим, как освободить семафор в методе releaseShared. Сначала вызовите tryReleaseShared, чтобы попытаться освободить семафор. После успешного освобождения вызовите doReleaseShared, чтобы определить, нужно ли разбудить последующий поток:

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow //释放信号量过多
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next)) //cas操作设置新的信号量
            return true;
    }
}

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) { //SIGNAL状态下唤醒后继节点
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h); //唤醒后继节点
            }
            else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

Логика релиза хорошо понятна, по сравнению с ReentrantLock он лишь немного отличается количеством состояний.

3.3 Анализ исходного кода CountDownLatch

По сравнению с Semaphore, CountDownLatch гораздо проще по логике реализации, и нет разницы между справедливостью и несправедливостью, потому что когда счетчик достигает 0, все ожидающие потоки будут освобождены, а когда он не равен 0, все ожидающие потоки будут освобождены. блокировать. Давайте посмотрим непосредственно на два основных метода CountDownLatch, await и countDown.

public void await() throws InterruptedException {
    //和Semaphore的不同在于参数为1,其实这个参数对CountDownLatch来说没什么意义,因为后面CountDownLatch的tryAcquireShared实现是通过getState() == 0来判断的
    sync.acquireSharedInterruptibly(1); 
}

public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    //这里加入了一个等待超时控制,超过时间后直接返回false执行后面的代码,不会长时间阻塞
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); 
}

public void countDown() {
    sync.releaseShared(1); //每次释放1个计数
}

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0) //尝试获取arg个信号量
        doAcquireSharedInterruptibly(arg); //获取信号量失败时排队挂起
}

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1; //奠定了同时获取锁的基础,无论State初始为多少,只能计数等于0时触发
}

Есть два отличия от семафора: во-первых, состояние каждый раз уменьшается только на 1, а все ожидающие потоки освобождаются только тогда, когда оно равно 0. Во-вторых, предоставить метод ожидания тайм-аута. МетодAcquireSharedInterruptily такой же, как и Semaphore, поэтому я не буду вдаваться в подробности, здесь мы сосредоточимся на методе tryAcquireSharedNanos.

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}

//最小自旋时间
static final long spinForTimeoutThreshold = 1000L;

private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout; //计算了一个deadline
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime(); 
            if (nanosTimeout <= 0L) //超时后直接返回false,继续执行
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold) //大于最小cas操作时间则挂起线程
                LockSupport.parkNanos(this, nanosTimeout); //挂起线程也有超时限制
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

Сосредоточьтесь на нескольких строках кода, помеченных комментариями. Сначала вычисляется период ожидания. Когда время ожидания превышено, ожидание сразу завершается и выполнение продолжается. Если время ожидания не истекло и оно превышает минимальное время работы cas, определенное здесь и равное 1000 нс, оно будет приостановлено, и операция приостановки также имеет ограничение по времени ожидания. Это реализует ожидание тайм-аута.

4. Резюме

До сих пор были проанализированы две реализации общих блокировок AQS, Semaphore и CountDownLatch.Самое большое различие между ними и неразделяемыми состоит в том, могут ли несколько потоков получать блокировки одновременно. Я надеюсь, что после прочтения каждый сможет получить глубокое представление о Semaphore и CountDownLatch. Если вы не понимаете, подумайте о примере с вращающимся суши-рестораном и парком развлечений. Если это полезно для всех, если вы думаете, что Написано хорошо, можно поставить лайк.Конечно, я надеюсь, что Каждый может активно указывать на ошибки в тексте и выдвигать положительные предложения по улучшению.