Подробно объясните механизм ожидания/уведомления ожидания/уведомления в Condition.

Java исходный код
Подробно объясните механизм ожидания/уведомления ожидания/уведомления в Condition.

Оригинальная статья, краткое изложение опыта и жизненные перипетии на всем пути от набора в школу до фабрики А

Нажмите, чтобы узнать подробностиwww.codercc.com

1. Введение в условия

Любой Java-объект естественным образом наследуется от класса Object.Связь между потоками часто применяется к нескольким методам Object, таким как wait(), wait(long timeout), wait(long timeout, int nanos) и notify(), notifyAll( ) Несколько методов реализуют механизм ожидания/уведомления, и, аналогичным образом, все еще будут те же методы для реализации механизма ожидания/уведомления в системе java Lock. В целомОжидание объекта и уведомление/уведомление взаимодействуют с монитором объекта для завершения механизма ожидания/уведомления между потоками, а условие взаимодействует с блокировкой для завершения механизма уведомления об ожидании.Первый находится на нижнем уровне java, а второй - на языке уровень и имеет более высокую управляемость и расширяемость. Помимо разницы в способах использования, эти дваФункцииОтличий еще много:

  1. Условие может поддерживать отсутствие ответа на прерывания, но не с помощью объекта;
  2. Условие может поддерживать несколько очередей ожидания (новые несколько объектов условия), в то время как режим объекта может поддерживать только одну;
  3. Условие может поддерживать настройку тайм-аута, а Object — нет.

Ссылаясь на методы wait и notify/notifyAll объекта, Condition также предоставляет тот же метод:

метод ожидания для объекта

  1. void await() выдает InterruptedException: Текущий поток переходит в состояние ожидания.Если другие потоки вызывают метод signal или signalAll условия, а текущий поток получает блокировку и возвращается из метода ожидания, если он прерывается в состоянии ожидания, прерывается будет выброшено исключение;
  2. long awaitNanos(long nanosTimeout): текущий поток переходит в состояние ожидания, пока не будет уведомлен, прерван илитайм-аут;
  3. boolean await(long time, TimeUnit unit) выдает InterruptedException: то же самое, что и второе, поддерживает пользовательские единицы времени
  4. логическое значение awaitUntil(дата крайнего срока) выдает InterruptedException: текущий поток переходит в состояние ожидания до тех пор, пока не будет уведомлен, прерван илив определенное время

метод notify/notifyAll для объекта

  1. void signal(): пробуждает поток, ожидающий выполнения условия, удаляет поток изочередь ожиданиятрансфер вочередь синхронизации, если вы можете конкурировать за Lock в очереди синхронизации, вы можете вернуться из метода ожидания.
  2. void signalAll(): отличие от 1 в том, что он может разбудить все потоки, ожидающие выполнения условия

2. Анализ реализации принципа состояния

2.1 Очередь ожидания

Если вы хотите глубоко освоить условие, вы должны знать принцип его реализации.Теперь давайте взглянем на исходный код условия. Создание объекта условия выполняетсяlock.newCondition(), и этот метод фактически создаст новыйConditionObjectобъект, класс AQS (Статья о принципе реализации AQS) внутреннего класса, вы можете посмотреть, если вам интересно. Как мы говорили ранее, условие должно использоваться вместе с блокировкой, то есть условие и блокировка связаны друг с другом, а принцип реализации блокировки зависит от AQS Естественно, ConditionObject является внутренним классом AQS. Мы знаем, что при реализации механизма блокировки AQS поддерживает внутреннюю очередь синхронизации.Если это монопольная блокировка, хвосты всех потоков, которым не удалось получить блокировку, вставляются вочередь синхронизации, точно так же используется внутри условия, аочередь ожидания, все потоки, вызывающие метод condition.await, будут добавлены в очередь ожидания, а состояние потока будет преобразовано в состояние ожидания. Также обратите внимание, что в ConditionObject есть две переменные-члены:

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

Итак, мы видим, что ConditionObject управляет очередью ожидания, удерживая указатели начала и конца очереди ожидания. Главное, что следует отметить, это то, что класс Node повторно использует класс Node в AQS, и его статус узла и связанные свойства можно просмотреть.Статья о принципе реализации AQS, если внимательно прочитать эту статью, то легко понять условие, и реализация системы блокировки также будет иметь качественное улучшение. У класса Node есть такое свойство:

//后继节点
Node nextWaiter;

Дальнейшее объяснение,Очередь ожидания является односторонней очередью, и когда я раньше говорил AQS, я знал, что очередь синхронизации — это двусторонняя очередь. Затем мы используем демонстрацию и выполняем отладку, чтобы увидеть, соответствует ли она нашей гипотезе:

public static void main(String[] args) {
    for (int i = 0; i < 10; i++) {
        Thread thread = new Thread(() -> {
            lock.lock();
            try {
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        });
        thread.start();
    }
}

Этот код не имеет никакого смысла, он даже воняет, просто чтобы проиллюстрировать то, что мы только что подумали. Создано 10 новых потоков, ни один поток не получает блокировку первым, а затем вызывает метод condition.await, чтобы снять блокировку, добавить текущий поток в очередь ожидания и проверить, когда будет достигнут 10-й поток, с помощью управления отладкой.firstWaiterТо есть ожидание головного узла в очереди.Схема сцены в режиме отладки выглядит следующим образом:

debug模式下情景图

Из этого рисунка хорошо видны следующие моменты: 1. После вызова метода condition.await потоки вставляются в очередь ожидания по очереди, как показано на рисунке, ссылки на потоки в очереди — Thread-0, Поток-1, Поток-2....Поток-8 2. Очередь ожидания является односторонней очередью. Благодаря нашему предположению и экспериментальной проверке мы можем нарисовать схематическую диаграмму очереди ожидания, как показано ниже:

等待队列的示意图

Еще одна вещь, которую следует отметить, это то, что мы можем вызывать метод lock.newCondition() несколько раз для создания нескольких объектов условий, то есть блокировка может содержать несколько ожидающих очередей. В прошлом способ использования Object на самом деле относился кНа мониторе объекта объекта может быть только одна очередь синхронизации и одна очередь ожидания, в то время как блокировка в параллельном пакете имеет одну очередь синхронизации и несколько очередей ожидания.. Схематическая диаграмма выглядит следующим образом:

AQS持有多个Condition.png

Как показано на рисунке, ConditionObject — это внутренний класс AQS, поэтому каждый ConditionObject может получить доступ к методам, предоставляемым AQS, что эквивалентно тому, что каждое условие имеет ссылку на собственный синхронизатор.

2.2 Принцип реализации ожидания

Когда вызывается метод condition.await(), поток, который в данный момент получает блокировку, попадает в очередь ожидания.Если поток может вернуться из метода await(), он должен получить блокировку, связанную с условием.. Далее мы по-прежнему смотрим на это с точки зрения исходного кода, только когда мы знакомы с логикой исходного кода, наше понимание становится наиболее глубоким. Исходный код метода await():

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
	// 1. 将当前线程包装成Node,尾插入到等待队列中
    Node node = addConditionWaiter();
	// 2. 释放当前线程所占用的lock,在释放的过程中会唤醒同步队列中的下一个节点
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
		// 3. 当前线程进入到等待状态
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
	// 4. 自旋等待获取到同步状态(即获取到lock)
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
	// 5. 处理被中断的情况
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

Основная логика кодасмотрите примечание,мы все знаемКогда текущий поток вызывает метод condition.await(), он заставит текущий поток снять блокировку, а затем присоединиться к очереди ожидания. он получает возврат из метода await только после блокировки, или он будет прерван, когда он прерван во время ожидания.. Тогда у нас возникнут следующие вопросы по этому процессу реализации: 1. Как добавить текущий поток в очередь ожидания? 2. Процесс снятия блокировки? 3. Как я могу выйти из метода ожидания? И логика этого кода состоит в том, чтобы сообщить нам ответы на эти три вопроса. конкретныйсмотрите примечание, на шаге 1 вызовите addConditionWaiter, чтобы добавить текущий поток в очередь ожидания.Исходный код этого метода:

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
	//将当前线程包装成Node
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
		//尾插入
        t.nextWaiter = node;
	//更新lastWaiter
    lastWaiter = node;
    return node;
}

Этот код легко понять, оберните текущий узел в узел, если firstWaiter очереди ожидания имеет значение null (очередь ожидания является пустой очередью), то укажите firstWaiter на текущий узел, в противном случае обновите lastWaiter (хвостовой узел ). этоВставьте узел, инкапсулированный текущим потоком, в очередь ожидания путем вставки хвоста, и видно, что очередь ожидания представляет собойСцепленная очередь без головного узла, прежде чем мы узнали о синхронных очередях, когда узнали об AQS.представляет собой связанную очередь с головным узлом, что является разницей между ними. После вставки текущего узла в очередь ожидания текущий поток снимет блокировку, которая реализуется методом fullRelease.Исходный код fullRelease:

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
			//成功释放同步状态
            failed = false;
            return savedState;
        } else {
			//不成功释放同步状态抛出异常
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

Этот код легко понять,Вызовите метод освобождения метода шаблона AQS, чтобы отменить состояние синхронизации AQS и разбудить поток, на который ссылается узел-преемник головного узла в очереди синхронизации., возвращается в обычном режиме, если выпуск выполнен успешно, и выдает исключение в случае сбоя. Пока что эти два фрагмента кода решили ответы на два предыдущих вопроса, оставив третий вопрос, как выйти из метода await? Теперь оглянитесь назад и убедитесь, что метод await имеет следующую логику:

while (!isOnSyncQueue(node)) {
	// 3. 当前线程进入到等待状态
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}

Очевидно, что когда поток вызывает метод condition.await() в первый раз, он войдет в цикл while(), а затем текущий поток перейдет в состояние ожидания через метод LockSupport.park(this), тогда, если вы хотите выйти из метода await Первое предварительное условие, естественно, состоит в том, чтобы сначала выйти из цикла while, оставив только два места для выхода:1. Логика прерывается и происходит выход из цикла while 2. Логика цикла while неверна. Глядя на код, первое условие состоит в том, что код перейдет в режим останова и завершится после того, как ожидающий в данный момент поток будет прерван.Второе условие заключается в том, что текущий узел перемещается в очередь синхронизации (то есть метод signal или signalAll метода условие, вызванное другим потоком)), цикл while заканчивается после того, как логика в то время как ложь. Подводя итог, этоПосле прерывания текущего потока или вызова метода condition.signal/condition.signalAll текущий узел перемещается в очередь синхронизации, что является предварительным условием выхода текущего потока из метода await. Вызывается после выхода из цикла whileacquireQueued(node, savedState), этот метод упоминался при представлении базовой реализации AQS, если вам интересно, вы можете перейти ксм. эту статью, эффект от этого метода вВо время процесса вращения поток продолжает пытаться получить состояние синхронизации, пока не добьется успеха (поток не получит блокировку).. Это также показываетВыходом из метода await должна быть блокировка, получившая ссылку на условие (ассоциацию).. Пока что мы полностью нашли ответы на первые три вопроса, прочитав исходный код, и углубили наше понимание метода await. Схематическая диаграмма метода await выглядит следующим образом:

await方法示意图

Как показано на рисунке, поток, вызывающий метод condition.await, должен получить блокировку, то есть текущий поток является головным узлом в очереди синхронизации. После вызова этого метода хвост узла, инкапсулированный текущим потоком, будет вставлен в очередь ожидания.

Поддержка механизма тайм-аута

Условие также поддерживает механизм тайм-аута, и пользователь может вызывать методы awaitNanos и awaitUtil. Принципы реализации этих двух методов в основном такие же, как у метода tryAcquire в AQS.Вы можете внимательно прочитать о tryAcquire.Раздел 3.4 этого поста.

Поддержка отсутствия реакции на прерывания

Если вы хотите не реагировать на прерывания, вы можете вызвать метод condition.awaitUninterruptably() Исходный код этого метода:

public final void awaitUninterruptibly() {
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if (Thread.interrupted())
            interrupted = true;
    }
    if (acquireQueued(node, savedState) || interrupted)
        selfInterrupt();
}

Этот метод в основном аналогичен описанному выше методу await, но он сокращает обработку прерываний и пропускает исключение прерванного исключения, выдаваемого методом reportInterruptAfterWait.

2.3 Принцип реализации signal/signalAll

Вызовите метод signal или signalAll условия, чтобы переместить узел с наибольшим временем ожидания в очереди ожидания в очередь синхронизации., чтобы у узла была возможность получить блокировку. В соответствии с очередью ожидания первый поступил – первым обслужен (FIFO), поэтому головной узел очереди ожидания должен быть узлом с наибольшим временем ожидания, то есть каждый раз, когда вызывается сигнальный метод условия, головной узел перемещен в очередь на синхронизацию. Давайте посмотрим, верна ли эта догадка, взглянув на исходный код.Исходный код метода сигнала:

public final void signal() {
    //1. 先检测当前线程是否已经获取lock
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //2. 获取等待队列中第一个节点,之后的操作都是针对这个节点
	Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

Метод signal сначала обнаружит, получил ли текущий поток блокировку. Если блокировка не получена, будет выдано исключение напрямую. Если она получена, будет получен узел, на который ссылается головной указатель очереди ожидания. Метод последующих операций doSignal также основан на этом узле. Давайте посмотрим, что делает метод doSignal Исходный код метода doSignal:

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
		//1. 将头结点从等待队列中移除
        first.nextWaiter = null;
		//2. while中transferForSignal方法对头结点做真正的处理
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

Конкретную логику смотрите в комментариях.Настоящая логика обработки головного узла находится вtransferForSignalПоложите, исходный код этого метода:

final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
	//1. 更新状态为0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
	//2.将该节点移入到同步队列中去
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

Ключевую логику см. в комментариях. Этот код в основном делает две вещи: 1. Изменяет статус головного узла на СОСТОЯНИЕ 2. Вызывает метод enq, чтобы вставить хвост узла в очередь синхронизации.Для метода enq , см. AQS Базовая реализация этой статьи. Теперь мы можем сделать вывод:Предпосылкой для вызова сигнала условия является то, что текущий поток получил блокировку.Этот метод переместит головной узел в очереди ожидания, то есть узел с наибольшим временем ожидания, в очередь синхронизации, и только после перемещения в очередь синхронизации есть возможность заставить ожидающий поток проснуться, то есть вернуться из метода LockSupport.park(this) в метод await, чтобы был шанс сделать поток, вызывающий метод await, успешным выходом. Принципиальная схема исполнения сигнала выглядит следующим образом:

signal执行示意图

signalAll

Разница между методами sigllAll и sigal отражена в методе doSignalAll, который нам уже известен.Метод oSignal будет работать только с головным узлом очереди ожидания,, а исходный код doSignalAll:

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

Этот метод просто ожидает, пока каждый узел в очереди будет перемещен в очередь синхронизации, то есть «уведомляет» каждый поток, вызывающий в данный момент метод condition.await().

3. Комбинация await и signal/signalAll

В начале статьи упоминается механизм ожидания/уведомления.Этот механизм может быть реализован с помощью методов await и signal/signalAll, предоставляемых условием.Самая классическая проблема, которую может решить этот механизм, это «Проблема производителя и потребителя». . Вопросы потребителей» будет объяснено в отдельной статье позже, которая также является высокочастотным тестовым сайтом для интервью. Методы await и signal и signalAll подобны переключателю, который управляет потоком A (сторона ожидания) и потоком B (сторона уведомления). Взаимосвязь между ними может быть более точно представлена ​​следующей диаграммой:

condition下的等待通知机制.png

Как показано на рисунке,Поток awaitThread сначала получает блокировку с помощью метода lock.lock(), а затем вызывает метод condition.await для входа в очередь ожидания, в то время как другой поток signalThread успешно получает блокировку с помощью метода lock.lock() и затем вызывает условие .signal или signalAll, благодаря чему поток awaitThread может иметь возможность перейти в очередь синхронизации.Когда другие потоки освобождают блокировку, поток awaitThread может иметь возможность получить блокировку, чтобы поток awaitThread мог выйти из очереди ожидания. способ выполнения последующих операций. Если awaitThread не сможет получить блокировку, он напрямую попадет в очередь синхронизации..

3. Пример

Давайте поговорим об использовании условия на очень простом примере:

public class AwaitSignal {
    private static ReentrantLock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();
    private static volatile boolean flag = false;

    public static void main(String[] args) {
        Thread waiter = new Thread(new waiter());
        waiter.start();
        Thread signaler = new Thread(new signaler());
        signaler.start();
    }

    static class waiter implements Runnable {

        @Override
        public void run() {
            lock.lock();
            try {
                while (!flag) {
                    System.out.println(Thread.currentThread().getName() + "当前条件不满足等待");
                    try {
                        condition.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println(Thread.currentThread().getName() + "接收到通知条件满足");
            } finally {
                lock.unlock();
            }
        }
    }

    static class signaler implements Runnable {

        @Override
        public void run() {
            lock.lock();
            try {
                flag = true;
                condition.signalAll();
            } finally {
                lock.unlock();
            }
        }
    }
}

Результат:

Thread-0当前条件不满足等待
Thread-0接收到通知,条件满足

Открываются два потока, ожидающий и сигнализатор. Когда поток ожидания начинает выполняться, поскольку условие не выполняется, выполняется метод condition.await, чтобы заставить поток войти в состояние ожидания и снять блокировку. После того, как поток сигнализатора получает блокировка, условие изменяется и все ожидающие потоки уведомляются, а затем снимается блокировка. В это время ожидающий поток получает блокировку, и, поскольку сигнальный поток изменяет условие, условие удовлетворяется относительно ожидающего, и выполнение продолжается.

использованная литература

Искусство параллельного программирования на Java