Глубокое понимание механизма ожидания и пробуждения потоков Java (1)

Java задняя часть
Глубокое понимание механизма ожидания и пробуждения потоков Java (1)

Многопоточный параллелизм — очень важная часть языка Java, и в то же время это также сложность в его основе. Это важно, потому что многопоточность является часто используемым знанием в повседневной разработке, и это сложно, потому что существует так много знаний, связанных с многопоточным параллелизмом, и нелегко полностью понять знания, связанные с параллелизмом Java. По этой причине параллелизм в Java стал одним из самых частых вопросов на собеседованиях по Java. В этой серии статей систематизировано понимание параллелизма в Java с точки зрения модели памяти Java, ключевого слова volatile, ключевого слова synchronized, ReetrantLock, класса параллелизма Atomic и пула потоков. Изучив эту серию статей, вы получите глубокое понимание роли ключевого слова volatile, принципа реализации синхронизированных блокировок, блокировок очереди AQS и CLH, четкого понимания спин-блокировок, предвзятых блокировок, оптимистичных блокировок, пессимистичных блокировок. и т. д. Потрясающие знания в области параллелизма.

Серия статей о многопоточном параллелизме:

На этот раз тщательно изучите модель памяти Java и ключевое слово volatile.

На этот раз досконально изучите ключевое слово synchronized в Java.

На этот раз я досконально разобрался с принципом реализации ReentranLock в Java.

На этот раз полностью изучите атомарный класс Atomic в параллельных пакетах Java.

Глубокое понимание механизма ожидания и пробуждения потоков Java (1)

Глубокое понимание механизма ожидания и пробуждения потоков Java (2)

Конец серии статей о параллелизме Java: полное понимание того, как работает пул потоков Java.

Серия Java Concurrency: принцип ThreadLocal на самом деле очень прост

Эта статья является пятой в серии статей о параллелизме Java, в которой подробно анализируется механизм пробуждения и ожидания Java.

Все должны быть знакомы с ожиданием и пробуждением потоков, ведь это ключевой учебный контент при изучении основ Java. В первых двух статьях, анализирующих синхронизацию и ReentranLock, мы пропустили содержимое, связанное с ожиданием и пробуждением потока, главным образом потому, что нелегко глубоко понять механизм ожидания и пробуждения потока, поэтому эта статья будет написана отдельно. анализировать. Затем в этой статье мы подробно разделим ожидание и пробуждение потоков с точки зрения синхронизированного и ReentranLock.

Прежде чем мы начнем, позвольте мне порекомендоватьAndroidNoteЭто репозиторий GitHub, здесь мои учебные заметки, а также источник моего первого черновика статьи. В этом репозитории собраны обширные знания о Java и Android. Это относительно систематизированная и всеобъемлющая база знаний Android. Это также редкая книга для интервью для студентов, которые готовятся к собеседованию Добро пожаловать на домашнюю страницу репозитория GitHub.

1. См. ожидание потока и пробуждение от синхронизированных блокировок.

Когда вы впервые изучали Java, все, должно быть, использовали synchronized для реализации кода модели «производитель-потребитель», которая использовала несколько методов в Object, таких как wait(), notify(), notifyAll(). если вы были в то время.Несколько запутался, почему методы, связанные с ожиданием и пробуждением потока, определены в классе Object?

Какие? Вы уже забыли, что такое модель «производитель-потребитель»? Итак, давайте сначала рассмотрим модель «производитель-потребитель».

1. Модель «производитель-потребитель».

Модель «производитель-потребитель» является типичным примером совместной коммуникации потоков. В этой модели есть два типа ролей, а именно несколько потоков-производителей и несколько потоков-потребителей. Поток-производитель отвечает за отправку пользовательских запросов, а поток-потребитель отвечает за обработку запросов, отправленных производителем. Во многих случаях производители и потребители не могут достичь определенного баланса, то есть иногда производители производят слишком быстро, а потребление слишком поздно, а иногда потребители могут быть слишком сильны, а производители слишком поздно начинают производить. В этом случае буфер памяти, совместно используемый производителями и потребителями, необходим для балансировки взаимодействия между ними. Производители и потребители взаимодействуют через разделяемые буферы памяти, тем самым уравновешивая потоки производителей и потребителей и разделяя производителей и потребителей. Как показано ниже:

1C2478F7-48B7-4ACA-A575-ABF8B71F40B9.png

Когда в контейнере очереди нет продукта, потребитель должен находиться в состоянии ожидания, а когда контейнер заполнен, производитель должен находиться в состоянии ожидания. И каждый раз, когда потребитель потребляет товар, он уведомляет ожидающего производителя, что его можно произвести; когда товар производится, он также уведомляет ожидающего потребителя, что его можно потреблять.

2. Используйте synchronized для реализации модели «производитель-потребитель».

После понимания модели «производитель-потребитель» мы пытаемся использовать ключевое слово synchronized в сочетании с методами wait() и notifyAll() для реализации примера модели «производитель-потребитель».

Возьмем более классический пример производства хлеба. Во-первых, нам нужен класс контейнера для хлеба. В классе контейнера есть две операции: положить хлеб и вынуть хлеб. Код выглядит следующим образом:

public class BreadContainer {

    LinkedList<Bread> list = new LinkedList<>();
    // 容器容量
    private final static int CAPACITY = 10;
    /**
     * 放入面包
     */
    public synchronized void put(Bread bread) {
        while (list.size() == CAPACITY) {
            try {
                // 如果容器已满,则阻塞生产者线程
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        list.add(bread);
        // 面包生产成功后通知消费者线程
        notifyAll();
        System.out.println(Thread.currentThread().getName() + " product a bread" + bread.toString() + " size = " + list.size());
    }

    /**
     * 取出面包
     */
    public synchronized void take() {
        while (list.isEmpty()) {
            try {
                // 如果容器为空,则阻塞消费者线程
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        Bread bread = list.removeFirst();
        // 消费后通知生产者生产面包
        notifyAll();
        System.out.println("Consumer " + Thread.currentThread().getName() + " consume a bread" + bread.toString() + " size = " + list.size());
    }
}

Метод put в приведенном выше коде поместит произведенный хлеб в контейнер. Если контейнер заполнен, поток производителя должен быть заблокирован, чтобы остановить производство.Когда производитель успешно помещает хлеб в контейнер, он должен попытаться разбудить ожидающий поток потребителя для потребления.

Метод взятия — это операция извлечения хлеба. Когда контейнер пуст, поток-потребитель блокируется и может ждать. Если хлеб успешно потреблен, производитель уведомляется о начале производства.

Также обратите внимание, что оба метода используют ключевое слово synchronized, если вы посмотрите наНа этот раз досконально изучите ключевое слово synchronized в Java.В этой статье вы должны знать, что синхронизированный заблокированный объект — это объект-экземпляр, в котором расположены эти два метода, то есть объект BreadContainer, а методы wait() и notifyAll(), вызываемые в этих двух методах, также принадлежат объекту BreadContainer. объект. . Запомните этот отрывок, оставьте здесь флаг, мы разберем его позже.

Далее, реализация производителей и потребителей относительно проста, код выглядит следующим образом:

// 生产者
public class Producer implements Runnable {
    private final BreadContainer container;

    public Producer(BreadContainer container) {
        this.container = container;
    }

    @Override
    public void run() {
        // 生产者生产面包
        container.put(new Bread());
    }
}

// 消费者
public class Consumer implements Runnable {

    private final BreadContainer container;

    public Consumer(BreadContainer container) {
        this.container = container;
    }

    @Override
    public void run() {
        // 消费者消费面包
        container.take();
    }
}

Затем в тестовом коде одновременно откройте несколько потоков-производителей и несколько потоков-потребителей.

    public static void main(String[] args) {
        BreadContainer container = new BreadContainer();

        new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                new Thread(new Producer(container)).start();
            }
        }).start();

        new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                new Thread(new Consumer(container)).start();
            }
        }).start();

    }

В этот момент запускается основной метод, и потоки производителя и потребителя могут хорошо работать вместе.

Обратите внимание, что в основном методе мы создаем экземпляр объекта BreadContainer.Объект синхронизированной блокировки, упомянутый во флаге выше, является контейнером, а вызываемые методы wait и notifyAll также являются методами экземпляра контейнера. Я не знаю, есть ли у вас какие-либо сомнения, что именно делают объекты метода ожидания и уведомления контейнера, чтобы заблокировать и разбудить поток? Куда попадают заблокированные темы? Зачем вызывать методы wait и notifyAll в объекте-контейнере? Возможно ли перейти к вызову ожидания и уведомлению всех других объектов?

2. Основной принцип реализации wait() и notify

существуетНа этот раз досконально изучите ключевое слово synchronized в Java.Мы уже знаем, что после использования синхронизированного ключевого слова синхронизированный заблокированный объект будет связан с объектом монитора.Когда поток получает синхронизированную блокировку, счетчик в объекте монитора будет увеличиваться на 1, а идентификатор потока будет хранится в файле _ower монитора. В этот момент, если другие потоки попытаются получить блокировку, они будут помещены в_EntryListЗаблокирован в очереди.

Помните Флаг, который мы установили в предыдущем разделе? Синхронизированная блокировка — это объект-контейнер, а wait и notify также являются методами объекта-контейнера, поэтому рассмотрение проблем, которые мы оставили в предыдущем разделе, немного сбивает с толку. Поток также добавляется в очередь ожидания при вызове метода ожидания, а затем пробуждает поток из очереди ожидания при уведомлении или уведомлении? по этому вопросу вНа этот раз досконально изучите ключевое слово synchronized в Java.На самом деле эта статья уже интерпретирована, то есть поток, вызывающий метод ожидания, будет добавлен в_WaitSetcollection и приостановит поток. Однако здесь вновь подчеркивается_WaitSetа также_EntryListэти два набора._EntryListКоллекция хранит заблокированные потоки без захвата блокировки, а коллекция _WaitSet сохраняет потоки в состоянии ожидания после вызова метода ожидания. **

Чтобы доказать приведенный выше вывод, нам нужно посмотреть, что делают wait и notify/notifyAll.

Давайте взглянем на реализацию трех методов ожидания, уведомления и уведомления All в Object.

public class Object {

    public final native void notify();

    public final native void notifyAll();

    public final void wait() throws InterruptedException {
        wait(0L);
    } 
    public final native void wait(long timeoutMillis) throws InterruptedException;    

}

К сожалению, все эти методы являются собственными методами, что означает, что эти методы реализованы на C/C++ в виртуальной машине. В этом случае вы могли бы также взглянуть на код виртуальной машины, чтобы узнать, ведь доказательств нет.

1. Реализация ожидания виртуальной машиной

Продолжите код модели «производитель-потребитель» из предыдущего раздела для анализа.Когда поток-производитель кладет хлеб в контейнер и обнаруживает, что контейнер полон, он вызывает метод ожидания, после чего поток снимает блокировку и переходит в блокирующее состояние.

Реализация метода ожидания в Object находится вobjectMonitor.cppсерединаObjectMonitor::wait(jlong millis, bool interruptible, TRAPS)В этой функции основной код, связанный с ObjectMonitor::wait, выглядит следующим образом:

void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
    // ...省略其他代码
    
    // 当前线程
    Thread * const Self = THREAD ;
    // 将线程封装成ObjectWaiter
    ObjectWaiter node(Self);
    // 标记为Wait状态
    node.TState = ObjectWaiter::TS_WAIT ;
    Self->_ParkEvent->reset() ;

    Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
    // 调用 AddWaiter 方法将线程加入到等待队列中
    AddWaiter (&node) ;
    Thread::SpinRelease (&_WaitSetLock) ;
    
    // ...
    
    // 释放 monitor 锁,并将自己挂起
    exit (true, Self) ; 
}

Видно, что после вызова функции ожидания поток инкапсулируется в объект ObjectWaiter, и поток добавляется в очередь ожидания через функцию AddWaiter.Для начала рассмотрим код функции AddWaiter:

inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
    // 如果 _WaitSet 还没有初始化,先初始化 _WaitSet
    if (_WaitSet == NULL) {
        // 初始化 _WaitSet 的头结点,此时只有一个node元素
        _WaitSet = node;
        // 可以看出ObjectWaiter是一个双向链表,这里将node的首尾相连,说明_WaitSet是一个循环链表
        node->_prev = node;
        node->_next = node;
    } else {
        // _WaitSet 的头结点
        ObjectWaiter* head = _WaitSet ;
        // 环形链表头结点的prev就是尾结点
        ObjectWaiter* tail = head->_prev;
        assert(tail->_next == head, "invariant check");
        // 将node插入到_WaitSet的尾结点中
        tail->_next = node;
        head->_prev = node;
        node->_next = head;
        node->_prev = tail;
    }
}

Реализация функции AddWaiter на самом деле относительно проста, она инициализирует_WaitSetсвязанный список и вставить узел в_WaitSetКонец очереди, это тоже видно из кода_WaitSetСвязный список — это круговой двусвязный список.

После завершения операции очереди вставки потока продолжайте вызывать функцию выхода, чтобы снять блокировку монитора и приостановить себя. Об этом методе он будет задействован позже, а исходный код будет виден позже.

2. Реализация уведомления виртуальной машиной

После того, как производитель заканчивает производство хлеба, вызывается notifyAll, чтобы разбудить поток-потребитель. Метод notifyAll разбудит все потоки, а метод notify разбудит только один поток. Здесь мы берем уведомление в качестве примераobjectMonitor.cppКак функция уведомления пробуждает поток.

void ObjectMonitor::notify(TRAPS) {
 
    int Policy = Knob_MoveNotifyee ;
    // DequeueWaiter是一个函数,会返回 _WaitSet 的头结点
    ObjectWaiter * iterator = DequeueWaiter() ;
    if (iterator != NULL) {
        // 将阻塞队列赋值给 List
        ObjectWaiter * List = _EntryList ;

        // 根据策略执行不同的逻辑,Policy默认值为2
        if (Policy == 0) {       // prepend to EntryList
            // ...
        } else if (Policy == 1) {      // append to EntryList
            // ...
        } else if (Policy == 2) {      // prepend to cxq
            // prepend to cxq
            if (List == NULL) {
                // iterator 的前驱与后继节点置空
                iterator->_next = iterator->_prev = NULL ;
                // _EntryList指向这个节点,说明节点已被加入阻塞队列,等待获取锁
                _EntryList = iterator ;
            } else {
                iterator->TState = ObjectWaiter::TS_CXQ ;
                for (;;) { // 通过CAS将iterator插入到 _cxq 队列
                    ObjectWaiter * Front = _cxq ;
                    iterator->_next = Front ;
                    if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
                        break ;
                    }
                }
            }
        } else if (Policy == 3) {      // append to cxq
            // ...
        }
    }
    // ...
}

В функции уведомления сначала вызывается функция DequeueWaiter, а функция функции DequeueWaiter состоит в том, чтобы вывести_WaitSetГоловной узел связанного списка, код выглядит следующим образом:

inline ObjectWaiter* ObjectMonitor::DequeueWaiter() {
    // dequeue the very first waiter
    ObjectWaiter* waiter = _WaitSet;
    if (waiter) {
        DequeueSpecificWaiter(waiter);
    }
    return waiter;
}
// 将头结点 从队列中断开
inline void ObjectMonitor::DequeueSpecificWaiter(ObjectWaiter* node) {
  // ...
  
  ObjectWaiter* next = node->_next;
  if (next == node) {
    // 此时,队列中只有一个元素,因此取出后,队列就是NULL了
    _WaitSet = NULL;
  } else {
    ObjectWaiter* prev = node->_prev;
    // 这一操作就是将 node 从队列移除,并重新连接队列
    next->_prev = prev;
    prev->_next = next;
    if (_WaitSet == node) {
      _WaitSet = next;
    }
  }
  // 将 node 的前驱节点与后继节点置空
  node->_next = NULL;
  node->_prev = NULL;
}

Видно, что в функции DequeueWaiter вызывается функция DequeueSpecificWaiter, в которой, если очередь имеет только один узел,_WaitSetПустой, то есть после извлечения головного узла в очереди нет элементов. Если есть несколько узлов, головной узел будет удален из очереди и повторно сплайсирован._WaitSetочередь. Тогда узел-предшественник и узел-преемник извлеченного узла пусты.

Следующий код функции уведомления оценивает, что если итератор не равен NULL, это означает, что поток находится в состоянии ожидания, и ожидающий поток необходимо перевести в очередь блокирующего потока. Далее выполняется другая логика в соответствии с Политикой.Значение Политики по умолчанию равно 2, поэтому здесь рассматривается только ситуация по умолчанию. То есть, когда Политика равна 2, то_EntryListПрисваивается списку, если список равен NULL, это означает, что в данный момент нет ни одного потока в состоянии блокировки. тогда будет_EntryList указывает на итератор. Он отмечает, что ожидающий поток перешел в состояние блокировки и может получить блокировку, но в это время поток не был пробужден. Если List равен NULL, то поток в состоянии ожидания перемещается в CAS через CAS._cxqочередь,_cxqОчередь - это просто временная очередь, и в конечном итоге она будет перемещена в функцию выхода позже._EntryListсередина. Здесь мы должны обратить внимание на различиесостояние блокировкиа такжесостояние ожидания,так же какочередь ожиданияа такжеочередь блокировки.

3. Существующая функция виртуальной машины

Видно, что функция уведомления только передает очередь потоку, а фактически не пробуждается. Операция фактического пробуждения потока реализована в уже упомянутом в разделе 1 этой главы. Просто время вызова существующей функции в это время наступает после того, как виртуальная машина читает инструкцию monitorexist. Взгляните на упрощенный код функции выхода:

void ObjectMonitor::exit(bool not_suspended, TRAPS) {
  // ...
  // 这里是一个死循环
  for (;;) {

        ObjectWaiter * w = NULL ;
        // QMode默认值为0
        int QMode = Knob_QMode ;
        
        if (QMode == 2 && _cxq != NULL) {
            // ... 这里从_cxq队列取头结点并唤醒,无关省略。
            return ;
        }

        // ...
        
        w = _EntryList  ;
        // 先查看_EntryList是否为空
        if (w != NULL) {
            // _EntryList不为空,通过ExitEpilog函数唤醒_EntryList队列的头结点
            ExitEpilog (Self, w) ;
            return ;
        }
        // 到这里说明_EntryList为空,将则将 w 指向 _cxq
        w = _cxq ;
        // _cxq 是 NULL 说明没有等待状态的线程需要唤醒,则继续执行循环
        if (w == NULL) continue ;

        // ...

        // 走到这说明有处于等待状态,需要唤醒的线程
    
        if (QMode == 1) {
            // ...
        } else {
            // 如果走到此处说明_cxq队列不为空
            // QMode == 0 or QMode == 2
            // 此时_EntryList队列是空,将_EntryList指向_cxq队列
            _EntryList = w ;
            ObjectWaiter * q = NULL ;
            ObjectWaiter * p ;
            // 将单向链表变成双向环链表
            for (p = w ; p != NULL ; p = p->_next) {
                guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
                p->TState = ObjectWaiter::TS_ENTER ;
                p->_prev = q ;
                q = p ;
            }
        }

        if (_succ != NULL) continue;

        w = _EntryList  ;
        if (w != NULL) {
            guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
            // 唤醒_EntryList的头结点
            ExitEpilog (Self, w) ;
            return ;
        }
    }
    
// 释放锁并唤醒线程    
void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) {
    assert (_owner == Self, "invariant") ;

    // Exit protocol:
    // 1. ST _succ = wakee
    // 2. membar #loadstore|#storestore;
    // 2. ST _owner = NULL
    // 3. unpark(wakee)

    _succ = Knob_SuccEnabled ? Wakee->_thread : NULL ;
    ParkEvent * Trigger = Wakee->_event ;

    Wakee  = NULL ;

    // 释放锁
    OrderAccess::release_store_ptr (&_owner, NULL) ;
    OrderAccess::fence() ;                               // ST _owner vs LD in unpark()
    // 唤醒线程
    Trigger->unpark() ;

    //...
}    

Код существующей функции сложен, а здесь он упрощен, так как значение QMode по умолчанию равно 0, обсуждается только этот случай.

  • Во-первых, если_EntryListне равно NULL, затем напрямую вызовите функцию ExitEpilog из_EntryListВыньте головной узел и разбудите поток;

  • если_EntryListравно NULL, но_cxqОчередь не равна NULL, что указывает на то, что поток в состоянии ожидания был уведомлен, но на самом деле не был пробужден._cxqПереместите все элементы в очередь в_EntryListПоставьте в очередь и преобразуйте его в двусвязный список. Затем проснитесь через ExitEpilog_EntryListголовной узел.

3. Резюме

Эта статья начинается с простой модели «производитель-потребитель», распознает методы ожидания и уведомления/уведомления All в Object и глубоко анализирует реализацию этих двух методов в нижней части виртуальной машины. Синхронизированное ключевое слово в коде Java компилируется компилятором в инструкцию monitorenter/monitorexist байт-кода.Когда виртуальная машина выполняет соответствующую инструкцию, она вызывает соответствующие функции в нижней части виртуальной машины, чтобы получить и снять блокировку . Поскольку объект блокировки Object связан с объектом монитора, вы можете вызвать методы ожидания и notify/notifyAll в объекте Object, чтобы заблокировать и разбудить поток. Эти два метода также вызывают связанные функции в нижней части виртуальной машины.Функция ожидания инкапсулирует поток как объект ожидания и вставляет его в очередь ожидания, а функция notify/notifyAll извлекает поток из очереди ожидания и передает его. к_EntryListочередь или пересесть на_cxqПоставьте в очередь, подождите, пока поток, удерживающий блокировку, завершит выполнение, прочитает инструкцию monitorexist и вызовет существующую функцию виртуальной машины, чтобы снять блокировку и проснуться._EntryListочередь или_cxqПотоки в очереди.

Этот механизм ожидания и пробуждения синхронизированных блокировок, очевидно, имеет недостаток. По-прежнему взяв в качестве примера модель «производитель-потребитель», поскольку и поток-производитель, и поток-потребитель будут добавлены в одну и ту же очередь WaitSet, метод notifyAll не может точно контролировать, какой тип потока пробуждается. пока вНа этот раз я досконально разобрался с принципом реализации ReentranLock в Java.В этой статье мы узнали о ReentranLock, который похож на синхронизированный и имеет аналогичный механизм ожидания и пробуждения, а также может точно контролировать пробуждение указанных потоков. Итак, как реализован ReentranLock? Мы обсудим это в следующий раз.

Справочник и рекомендуемая литература

Поговорим о блокировке (3): cxq, EntryList и WaitSet