Многопоточный параллелизм — очень важная часть языка Java, и в то же время это также сложность в его основе. Это важно, потому что многопоточность является часто используемым знанием в повседневной разработке, и это сложно, потому что существует так много знаний, связанных с многопоточным параллелизмом, и нелегко полностью понять знания, связанные с параллелизмом Java. По этой причине параллелизм в Java стал одним из самых частых вопросов на собеседованиях по Java. В этой серии статей систематизировано понимание параллелизма в Java с точки зрения модели памяти Java, ключевого слова volatile, ключевого слова synchronized, ReetrantLock, класса параллелизма Atomic и пула потоков. Изучив эту серию статей, вы получите глубокое понимание роли ключевого слова volatile, принципа реализации синхронизированных блокировок, блокировок очереди AQS и CLH, четкого понимания спин-блокировок, предвзятых блокировок, оптимистичных блокировок, пессимистичных блокировок. и т. д. Потрясающие знания в области параллелизма.
Серия статей о многопоточном параллелизме:
На этот раз тщательно изучите модель памяти Java и ключевое слово volatile.
На этот раз досконально изучите ключевое слово synchronized в Java.
На этот раз я досконально разобрался с принципом реализации ReentranLock в Java.
На этот раз полностью изучите атомарный класс Atomic в параллельных пакетах Java.
Глубокое понимание механизма ожидания и пробуждения потоков Java (1)
Глубокое понимание механизма ожидания и пробуждения потоков Java (2)
Конец серии статей о параллелизме Java: полное понимание того, как работает пул потоков Java.
Серия Java Concurrency: принцип ThreadLocal на самом деле очень прост
Эта статья является пятой в серии статей о параллелизме Java, в которой подробно анализируется механизм пробуждения и ожидания Java.
Все должны быть знакомы с ожиданием и пробуждением потоков, ведь это ключевой учебный контент при изучении основ Java. В первых двух статьях, анализирующих синхронизацию и ReentranLock, мы пропустили содержимое, связанное с ожиданием и пробуждением потока, главным образом потому, что нелегко глубоко понять механизм ожидания и пробуждения потока, поэтому эта статья будет написана отдельно. анализировать. Затем в этой статье мы подробно разделим ожидание и пробуждение потоков с точки зрения синхронизированного и ReentranLock.
Прежде чем мы начнем, позвольте мне порекомендоватьAndroidNoteЭто репозиторий GitHub, здесь мои учебные заметки, а также источник моего первого черновика статьи. В этом репозитории собраны обширные знания о Java и Android. Это относительно систематизированная и всеобъемлющая база знаний Android. Это также редкая книга для интервью для студентов, которые готовятся к собеседованию Добро пожаловать на домашнюю страницу репозитория GitHub.
1. См. ожидание потока и пробуждение от синхронизированных блокировок.
Когда вы впервые изучали Java, все, должно быть, использовали synchronized для реализации кода модели «производитель-потребитель», которая использовала несколько методов в Object, таких как wait(), notify(), notifyAll(). если вы были в то время.Несколько запутался, почему методы, связанные с ожиданием и пробуждением потока, определены в классе Object?
Какие? Вы уже забыли, что такое модель «производитель-потребитель»? Итак, давайте сначала рассмотрим модель «производитель-потребитель».
1. Модель «производитель-потребитель».
Модель «производитель-потребитель» является типичным примером совместной коммуникации потоков. В этой модели есть два типа ролей, а именно несколько потоков-производителей и несколько потоков-потребителей. Поток-производитель отвечает за отправку пользовательских запросов, а поток-потребитель отвечает за обработку запросов, отправленных производителем. Во многих случаях производители и потребители не могут достичь определенного баланса, то есть иногда производители производят слишком быстро, а потребление слишком поздно, а иногда потребители могут быть слишком сильны, а производители слишком поздно начинают производить. В этом случае буфер памяти, совместно используемый производителями и потребителями, необходим для балансировки взаимодействия между ними. Производители и потребители взаимодействуют через разделяемые буферы памяти, тем самым уравновешивая потоки производителей и потребителей и разделяя производителей и потребителей. Как показано ниже:
Когда в контейнере очереди нет продукта, потребитель должен находиться в состоянии ожидания, а когда контейнер заполнен, производитель должен находиться в состоянии ожидания. И каждый раз, когда потребитель потребляет товар, он уведомляет ожидающего производителя, что его можно произвести; когда товар производится, он также уведомляет ожидающего потребителя, что его можно потреблять.
2. Используйте synchronized для реализации модели «производитель-потребитель».
После понимания модели «производитель-потребитель» мы пытаемся использовать ключевое слово synchronized в сочетании с методами wait() и notifyAll() для реализации примера модели «производитель-потребитель».
Возьмем более классический пример производства хлеба. Во-первых, нам нужен класс контейнера для хлеба. В классе контейнера есть две операции: положить хлеб и вынуть хлеб. Код выглядит следующим образом:
public class BreadContainer {
LinkedList<Bread> list = new LinkedList<>();
// 容器容量
private final static int CAPACITY = 10;
/**
* 放入面包
*/
public synchronized void put(Bread bread) {
while (list.size() == CAPACITY) {
try {
// 如果容器已满,则阻塞生产者线程
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.add(bread);
// 面包生产成功后通知消费者线程
notifyAll();
System.out.println(Thread.currentThread().getName() + " product a bread" + bread.toString() + " size = " + list.size());
}
/**
* 取出面包
*/
public synchronized void take() {
while (list.isEmpty()) {
try {
// 如果容器为空,则阻塞消费者线程
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Bread bread = list.removeFirst();
// 消费后通知生产者生产面包
notifyAll();
System.out.println("Consumer " + Thread.currentThread().getName() + " consume a bread" + bread.toString() + " size = " + list.size());
}
}
Метод put в приведенном выше коде поместит произведенный хлеб в контейнер. Если контейнер заполнен, поток производителя должен быть заблокирован, чтобы остановить производство.Когда производитель успешно помещает хлеб в контейнер, он должен попытаться разбудить ожидающий поток потребителя для потребления.
Метод взятия — это операция извлечения хлеба. Когда контейнер пуст, поток-потребитель блокируется и может ждать. Если хлеб успешно потреблен, производитель уведомляется о начале производства.
Также обратите внимание, что оба метода используют ключевое слово synchronized, если вы посмотрите наНа этот раз досконально изучите ключевое слово synchronized в Java.В этой статье вы должны знать, что синхронизированный заблокированный объект — это объект-экземпляр, в котором расположены эти два метода, то есть объект BreadContainer, а методы wait() и notifyAll(), вызываемые в этих двух методах, также принадлежат объекту BreadContainer. объект. . Запомните этот отрывок, оставьте здесь флаг, мы разберем его позже.
Далее, реализация производителей и потребителей относительно проста, код выглядит следующим образом:
// 生产者
public class Producer implements Runnable {
private final BreadContainer container;
public Producer(BreadContainer container) {
this.container = container;
}
@Override
public void run() {
// 生产者生产面包
container.put(new Bread());
}
}
// 消费者
public class Consumer implements Runnable {
private final BreadContainer container;
public Consumer(BreadContainer container) {
this.container = container;
}
@Override
public void run() {
// 消费者消费面包
container.take();
}
}
Затем в тестовом коде одновременно откройте несколько потоков-производителей и несколько потоков-потребителей.
public static void main(String[] args) {
BreadContainer container = new BreadContainer();
new Thread(() -> {
for (int i = 0; i < 100; i++) {
new Thread(new Producer(container)).start();
}
}).start();
new Thread(() -> {
for (int i = 0; i < 100; i++) {
new Thread(new Consumer(container)).start();
}
}).start();
}
В этот момент запускается основной метод, и потоки производителя и потребителя могут хорошо работать вместе.
Обратите внимание, что в основном методе мы создаем экземпляр объекта BreadContainer.Объект синхронизированной блокировки, упомянутый во флаге выше, является контейнером, а вызываемые методы wait и notifyAll также являются методами экземпляра контейнера. Я не знаю, есть ли у вас какие-либо сомнения, что именно делают объекты метода ожидания и уведомления контейнера, чтобы заблокировать и разбудить поток? Куда попадают заблокированные темы? Зачем вызывать методы wait и notifyAll в объекте-контейнере? Возможно ли перейти к вызову ожидания и уведомлению всех других объектов?
2. Основной принцип реализации wait() и notify
существуетНа этот раз досконально изучите ключевое слово synchronized в Java.Мы уже знаем, что после использования синхронизированного ключевого слова синхронизированный заблокированный объект будет связан с объектом монитора.Когда поток получает синхронизированную блокировку, счетчик в объекте монитора будет увеличиваться на 1, а идентификатор потока будет хранится в файле _ower монитора. В этот момент, если другие потоки попытаются получить блокировку, они будут помещены в_EntryList
Заблокирован в очереди.
Помните Флаг, который мы установили в предыдущем разделе? Синхронизированная блокировка — это объект-контейнер, а wait и notify также являются методами объекта-контейнера, поэтому рассмотрение проблем, которые мы оставили в предыдущем разделе, немного сбивает с толку. Поток также добавляется в очередь ожидания при вызове метода ожидания, а затем пробуждает поток из очереди ожидания при уведомлении или уведомлении? по этому вопросу вНа этот раз досконально изучите ключевое слово synchronized в Java.На самом деле эта статья уже интерпретирована, то есть поток, вызывающий метод ожидания, будет добавлен в_WaitSet
collection и приостановит поток. Однако здесь вновь подчеркивается_WaitSet
а также_EntryList
эти два набора._EntryList
Коллекция хранит заблокированные потоки без захвата блокировки, а коллекция _WaitSet сохраняет потоки в состоянии ожидания после вызова метода ожидания. **
Чтобы доказать приведенный выше вывод, нам нужно посмотреть, что делают wait и notify/notifyAll.
Давайте взглянем на реализацию трех методов ожидания, уведомления и уведомления All в Object.
public class Object {
public final native void notify();
public final native void notifyAll();
public final void wait() throws InterruptedException {
wait(0L);
}
public final native void wait(long timeoutMillis) throws InterruptedException;
}
К сожалению, все эти методы являются собственными методами, что означает, что эти методы реализованы на C/C++ в виртуальной машине. В этом случае вы могли бы также взглянуть на код виртуальной машины, чтобы узнать, ведь доказательств нет.
1. Реализация ожидания виртуальной машиной
Продолжите код модели «производитель-потребитель» из предыдущего раздела для анализа.Когда поток-производитель кладет хлеб в контейнер и обнаруживает, что контейнер полон, он вызывает метод ожидания, после чего поток снимает блокировку и переходит в блокирующее состояние.
Реализация метода ожидания в Object находится вobjectMonitor.cppсерединаObjectMonitor::wait(jlong millis, bool interruptible, TRAPS)
В этой функции основной код, связанный с ObjectMonitor::wait, выглядит следующим образом:
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
// ...省略其他代码
// 当前线程
Thread * const Self = THREAD ;
// 将线程封装成ObjectWaiter
ObjectWaiter node(Self);
// 标记为Wait状态
node.TState = ObjectWaiter::TS_WAIT ;
Self->_ParkEvent->reset() ;
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
// 调用 AddWaiter 方法将线程加入到等待队列中
AddWaiter (&node) ;
Thread::SpinRelease (&_WaitSetLock) ;
// ...
// 释放 monitor 锁,并将自己挂起
exit (true, Self) ;
}
Видно, что после вызова функции ожидания поток инкапсулируется в объект ObjectWaiter, и поток добавляется в очередь ожидания через функцию AddWaiter.Для начала рассмотрим код функции AddWaiter:
inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
// 如果 _WaitSet 还没有初始化,先初始化 _WaitSet
if (_WaitSet == NULL) {
// 初始化 _WaitSet 的头结点,此时只有一个node元素
_WaitSet = node;
// 可以看出ObjectWaiter是一个双向链表,这里将node的首尾相连,说明_WaitSet是一个循环链表
node->_prev = node;
node->_next = node;
} else {
// _WaitSet 的头结点
ObjectWaiter* head = _WaitSet ;
// 环形链表头结点的prev就是尾结点
ObjectWaiter* tail = head->_prev;
assert(tail->_next == head, "invariant check");
// 将node插入到_WaitSet的尾结点中
tail->_next = node;
head->_prev = node;
node->_next = head;
node->_prev = tail;
}
}
Реализация функции AddWaiter на самом деле относительно проста, она инициализирует_WaitSet
связанный список и вставить узел в_WaitSet
Конец очереди, это тоже видно из кода_WaitSet
Связный список — это круговой двусвязный список.
После завершения операции очереди вставки потока продолжайте вызывать функцию выхода, чтобы снять блокировку монитора и приостановить себя. Об этом методе он будет задействован позже, а исходный код будет виден позже.
2. Реализация уведомления виртуальной машиной
После того, как производитель заканчивает производство хлеба, вызывается notifyAll, чтобы разбудить поток-потребитель. Метод notifyAll разбудит все потоки, а метод notify разбудит только один поток. Здесь мы берем уведомление в качестве примераobjectMonitor.cppКак функция уведомления пробуждает поток.
void ObjectMonitor::notify(TRAPS) {
int Policy = Knob_MoveNotifyee ;
// DequeueWaiter是一个函数,会返回 _WaitSet 的头结点
ObjectWaiter * iterator = DequeueWaiter() ;
if (iterator != NULL) {
// 将阻塞队列赋值给 List
ObjectWaiter * List = _EntryList ;
// 根据策略执行不同的逻辑,Policy默认值为2
if (Policy == 0) { // prepend to EntryList
// ...
} else if (Policy == 1) { // append to EntryList
// ...
} else if (Policy == 2) { // prepend to cxq
// prepend to cxq
if (List == NULL) {
// iterator 的前驱与后继节点置空
iterator->_next = iterator->_prev = NULL ;
// _EntryList指向这个节点,说明节点已被加入阻塞队列,等待获取锁
_EntryList = iterator ;
} else {
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) { // 通过CAS将iterator插入到 _cxq 队列
ObjectWaiter * Front = _cxq ;
iterator->_next = Front ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
break ;
}
}
}
} else if (Policy == 3) { // append to cxq
// ...
}
}
// ...
}
В функции уведомления сначала вызывается функция DequeueWaiter, а функция функции DequeueWaiter состоит в том, чтобы вывести_WaitSet
Головной узел связанного списка, код выглядит следующим образом:
inline ObjectWaiter* ObjectMonitor::DequeueWaiter() {
// dequeue the very first waiter
ObjectWaiter* waiter = _WaitSet;
if (waiter) {
DequeueSpecificWaiter(waiter);
}
return waiter;
}
// 将头结点 从队列中断开
inline void ObjectMonitor::DequeueSpecificWaiter(ObjectWaiter* node) {
// ...
ObjectWaiter* next = node->_next;
if (next == node) {
// 此时,队列中只有一个元素,因此取出后,队列就是NULL了
_WaitSet = NULL;
} else {
ObjectWaiter* prev = node->_prev;
// 这一操作就是将 node 从队列移除,并重新连接队列
next->_prev = prev;
prev->_next = next;
if (_WaitSet == node) {
_WaitSet = next;
}
}
// 将 node 的前驱节点与后继节点置空
node->_next = NULL;
node->_prev = NULL;
}
Видно, что в функции DequeueWaiter вызывается функция DequeueSpecificWaiter, в которой, если очередь имеет только один узел,_WaitSet
Пустой, то есть после извлечения головного узла в очереди нет элементов. Если есть несколько узлов, головной узел будет удален из очереди и повторно сплайсирован._WaitSet
очередь. Тогда узел-предшественник и узел-преемник извлеченного узла пусты.
Следующий код функции уведомления оценивает, что если итератор не равен NULL, это означает, что поток находится в состоянии ожидания, и ожидающий поток необходимо перевести в очередь блокирующего потока. Далее выполняется другая логика в соответствии с Политикой.Значение Политики по умолчанию равно 2, поэтому здесь рассматривается только ситуация по умолчанию. То есть, когда Политика равна 2, то_EntryList
Присваивается списку, если список равен NULL, это означает, что в данный момент нет ни одного потока в состоянии блокировки. тогда будет_EntryList
указывает на итератор. Он отмечает, что ожидающий поток перешел в состояние блокировки и может получить блокировку, но в это время поток не был пробужден. Если List равен NULL, то поток в состоянии ожидания перемещается в CAS через CAS._cxq
очередь,_cxq
Очередь - это просто временная очередь, и в конечном итоге она будет перемещена в функцию выхода позже._EntryList
середина. Здесь мы должны обратить внимание на различиесостояние блокировкиа такжесостояние ожидания,так же какочередь ожиданияа такжеочередь блокировки.
3. Существующая функция виртуальной машины
Видно, что функция уведомления только передает очередь потоку, а фактически не пробуждается. Операция фактического пробуждения потока реализована в уже упомянутом в разделе 1 этой главы. Просто время вызова существующей функции в это время наступает после того, как виртуальная машина читает инструкцию monitorexist. Взгляните на упрощенный код функции выхода:
void ObjectMonitor::exit(bool not_suspended, TRAPS) {
// ...
// 这里是一个死循环
for (;;) {
ObjectWaiter * w = NULL ;
// QMode默认值为0
int QMode = Knob_QMode ;
if (QMode == 2 && _cxq != NULL) {
// ... 这里从_cxq队列取头结点并唤醒,无关省略。
return ;
}
// ...
w = _EntryList ;
// 先查看_EntryList是否为空
if (w != NULL) {
// _EntryList不为空,通过ExitEpilog函数唤醒_EntryList队列的头结点
ExitEpilog (Self, w) ;
return ;
}
// 到这里说明_EntryList为空,将则将 w 指向 _cxq
w = _cxq ;
// _cxq 是 NULL 说明没有等待状态的线程需要唤醒,则继续执行循环
if (w == NULL) continue ;
// ...
// 走到这说明有处于等待状态,需要唤醒的线程
if (QMode == 1) {
// ...
} else {
// 如果走到此处说明_cxq队列不为空
// QMode == 0 or QMode == 2
// 此时_EntryList队列是空,将_EntryList指向_cxq队列
_EntryList = w ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
// 将单向链表变成双向环链表
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
}
if (_succ != NULL) continue;
w = _EntryList ;
if (w != NULL) {
guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
// 唤醒_EntryList的头结点
ExitEpilog (Self, w) ;
return ;
}
}
// 释放锁并唤醒线程
void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) {
assert (_owner == Self, "invariant") ;
// Exit protocol:
// 1. ST _succ = wakee
// 2. membar #loadstore|#storestore;
// 2. ST _owner = NULL
// 3. unpark(wakee)
_succ = Knob_SuccEnabled ? Wakee->_thread : NULL ;
ParkEvent * Trigger = Wakee->_event ;
Wakee = NULL ;
// 释放锁
OrderAccess::release_store_ptr (&_owner, NULL) ;
OrderAccess::fence() ; // ST _owner vs LD in unpark()
// 唤醒线程
Trigger->unpark() ;
//...
}
Код существующей функции сложен, а здесь он упрощен, так как значение QMode по умолчанию равно 0, обсуждается только этот случай.
-
Во-первых, если
_EntryList
не равно NULL, затем напрямую вызовите функцию ExitEpilog из_EntryList
Выньте головной узел и разбудите поток; -
если
_EntryList
равно NULL, но_cxq
Очередь не равна NULL, что указывает на то, что поток в состоянии ожидания был уведомлен, но на самом деле не был пробужден._cxq
Переместите все элементы в очередь в_EntryList
Поставьте в очередь и преобразуйте его в двусвязный список. Затем проснитесь через ExitEpilog_EntryList
головной узел.
3. Резюме
Эта статья начинается с простой модели «производитель-потребитель», распознает методы ожидания и уведомления/уведомления All в Object и глубоко анализирует реализацию этих двух методов в нижней части виртуальной машины. Синхронизированное ключевое слово в коде Java компилируется компилятором в инструкцию monitorenter/monitorexist байт-кода.Когда виртуальная машина выполняет соответствующую инструкцию, она вызывает соответствующие функции в нижней части виртуальной машины, чтобы получить и снять блокировку . Поскольку объект блокировки Object связан с объектом монитора, вы можете вызвать методы ожидания и notify/notifyAll в объекте Object, чтобы заблокировать и разбудить поток. Эти два метода также вызывают связанные функции в нижней части виртуальной машины.Функция ожидания инкапсулирует поток как объект ожидания и вставляет его в очередь ожидания, а функция notify/notifyAll извлекает поток из очереди ожидания и передает его. к_EntryList
очередь или пересесть на_cxq
Поставьте в очередь, подождите, пока поток, удерживающий блокировку, завершит выполнение, прочитает инструкцию monitorexist и вызовет существующую функцию виртуальной машины, чтобы снять блокировку и проснуться._EntryList
очередь или_cxq
Потоки в очереди.
Этот механизм ожидания и пробуждения синхронизированных блокировок, очевидно, имеет недостаток. По-прежнему взяв в качестве примера модель «производитель-потребитель», поскольку и поток-производитель, и поток-потребитель будут добавлены в одну и ту же очередь WaitSet, метод notifyAll не может точно контролировать, какой тип потока пробуждается. пока вНа этот раз я досконально разобрался с принципом реализации ReentranLock в Java.В этой статье мы узнали о ReentranLock, который похож на синхронизированный и имеет аналогичный механизм ожидания и пробуждения, а также может точно контролировать пробуждение указанных потоков. Итак, как реализован ReentranLock? Мы обсудим это в следующий раз.