предисловие
Я написал статью некоторое время назад«Пул потоков не так прост, как вы думаете», и мы создали с вами базовый пул потоков, в котором:
- Базовая функция планирования пула потоков.
- Пул потоков автоматически расширяется и сжимается.
- Поток кэша очереди.
- Закройте пул потоков.
Этих функций, в конце концов, тоже осталось реализовать триfeatures
.
- Выполнить поток с возвращаемым значением.
- Как насчет обработки исключений?
- Как я буду уведомлен, когда все задачи будут выполнены?
На этот раз давайте реализуем эти три функции, чтобы увидетьj.u.c
Как пул потоков в .
Перед прочтением этой статьи настоятельно рекомендуется ознакомиться с вышеизложенным«Пул потоков не так прост, как вы думаете»
Уведомления о завершении задач
Когда вы используете пулы потоков, у вас будут более или менее следующие требования:
После того, как задачи в пуле потоков выполнены, основной поток уведомляется о других действиях, таких как выполнение следующей волны задач после выполнения пакета задач.
В качестве примера возьмем наш предыдущий код:
Всего в пул потоков отправляется 13 задач, и журнал «выполнение задачи завершено» печатается до тех пор, пока они все не будут выполнены.
Результат выполнения следующий:
Чтобы просто добиться этого эффекта, мы можем передать реализацию интерфейса при инициализации пула потоков, Этот интерфейс используется для обратного вызова после завершения задачи.
public interface Notify {
/**
* 回调
*/
void notifyListen() ;
}
Выше приведен конструктор пула потоков и определение интерфейса.
Таким образом, ключ к реализации этой функции заключается в том, когда вызывать этот интерфейс?
На самом деле просто подумать об этом внимательно: пока мы записываем количество задач, отправленных в пул потоков, и количество завершений, когда разница между ними равна 0, считается, что задачи в пуле потоков были выполняется; тогда этот интерфейс может быть вызван обратно.
Итак, при записи задач в пул потоков нам нужно записать количество задач:
Ради безопасности параллелизма счетчик здесь принимает атомарныйAtomicInteger
.
После выполнения задачи счетчик будет равен -1.Как только он станет равным 0, все задачи и задачи будут выполнены.В это время мы можем отозвать уведомление о завершении нашего пользовательского интерфейса.
реализация JDK
Такие требования есть в jdkThreadPoolExecutor
также связано вAPI
, но использование не то же самое, но основные принципы схожи.
Мы используемThreadPoolExecutor
Общий процесс отключения выглядит следующим образом:
executorService.shutdown();
while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) {
logger.info("thread running");
}
Выполнить после отправки потокаshutdown()
Закройте пул потоков, а затем вызовите его в циклеawaitTermination()
метод, который вернется после выполнения всех задачtrue
тем самым выходя из цикла.
Цель и принцип этих двух методов заключаются в следующем:
- воплощать в жизнь
shutdown()
После этого состояние пула потоков будет установлено в закрытое состояние.В это время он перестанет получать новые задачи и будет ждать выполнения всех задач в очереди, прежде чем закрыть пул потоков. -
awaitTermination
Он будет заблокирован до тех пор, пока не будут выполнены все задачи в пуле потоков или не истечет время ожидания.
почему дваapi
Как насчет комбинированного использования?
Основная цель — сделать что-то после выполнения всех потоков, то есть основной поток нужно выполнить до того, как будет выполнен поток.заблокированиз.
shutdown()
После выполнения он не будет блокироваться и немедленно вернется, все нужно вызывать непрерывно с помощью циклов.awaitTermination()
, потому что этот API заблокирует поток.
На самом деле, когда мы посмотрим на исходный код, мы обнаружим, чтоThreadPoolExecutor
Блокировка — это по-прежнему использование механизма ожидающих уведомлений, ноLockSupport
изAPI
Вот и все.
поток с возвращаемым значением
Далее следует поток с возвращаемым значением, и это требование также очень распространено; например, поток требуется для асинхронного вычисления некоторых данных, а затем получения результата для использования в итоговой сводке.
Давайте посмотрим, как его использовать (аналогично jdk):
Первая задача – не достичьRunnable
интерфейс, ведь егоrun()
Функции не имеют возвращаемого значения, поэтому мы реализуемCallable
Интерфейс:
Этот интерфейс имеет возвращаемое значение.
При этом было внесено небольшое изменение при подаче задания:
Во-первых, функция, выполняющая задачу, задается выражениемexecute()
заменяетсяsubmit()
, и он вернет возвращаемое значениеFuture
, через который можно получить результат выполнения потока.
Наконец, распечатайте все результаты выполнения на втором шаге:
Принцип реализации
Прежде чем рассматривать конкретную реализацию, давайте подумаем, как реализовать такую функцию?
- сначала ограничивается
jdk
разгромapi
Спецификация для выполнения потока, независимо от того, реализует ли он интерфейс или наследует класс, в конечном итоге выполняетсяrun()
функция. - Итак, мы хотим, чтобы поток имел возвращаемое значение, но он может быть выполнен только
run()
функцию для вызова метода с возвращаемым значением, а затем сохранить возвращаемое значение для последующего использования.
Например, мы создали новыйCallable<T>
Интерфейс:
public interface Callable<T> {
/**
* 执行任务
* @return 执行结果
*/
T call() ;
}
этоcall
Функция — это только что упомянутый метод с возвращаемым значением, поэтому мы должны вызывать его в функции run() потока.
Потом будет другойFuture
Интерфейс, его основная функция заключается в получении возвращаемого значения потока, т.е.再将这个返回值存放起来用于后续使用
упоминается здесьпоследующее использование.
Раз есть интерфейс, то должна быть и его реализация.FutureTask
, он достигаетFuture
Интерфейс используется для последующего получения возвращаемого значения.
В то же время понялRunnable
Интерфейс превращается в поток.
так что в своемrun()
Функция с только что упомянутым возвращаемым значением будет вызываться в функцииcall()
функция.
рекомбинироватьsubmit()
отправлять задачи иget()
Получите исходный код возвращаемого значения, чтобы лучше понять дорвей.
/**
* 有返回值
*
* @param callable
* @param <T>
* @return
*/
public <T> Future<T> submit(Callable<T> callable) {
FutureTask<T> future = new FutureTask(callable);
execute(future);
return future;
}
submit()
Очень просто, брось насCallable
объект превращается вFutureTask
объект, а затем вызвать предыдущийexecute()
чтобы бросить его в пул потоков (последующий процесс аналогичен процессу входа обычного потока в пул потоков).
Сам FutureTask также является потоком, поэтому его можно использовать напрямую.
execute()
функция.
иfuture.get()
в функцииfuture
объект из-заsubmit()
Реальный объект, возвращенный вFutureTask
, поэтому мы просто смотрим исходный код напрямую.
так какget()
Это блокирующая функция до тех пор, пока поток не вернется и, наконец, не пройдетnotify.wait()
Это реализуется путем перевода потока в состояние блокировки.
сделать это изwait()
Условие, возвращаемое в потоке, должно быть активировано, когда поток завершит выполнение и получит возвращаемое значение.
Это вторая часть диаграммы; как только поток завершит выполнение (callable.call()
) проснетсяnotify
объект, поэтомуget
Метод также может вернуться.
Та же причина,ThreadPoolExecutor
Принцип аналогичен, но учитывает больше деталей, поэтому выглядит сложно, но ядро после упрощения кода таково.
Даже окончательный используемый API выглядит похоже:
Обработка исключений
Последнее — это место, где некоторые новички могут легко наступить на яму при использовании пулов потоков: это обработка исключений.
Например такой сценарий:
создан тольконитьПул потоков, этот поток делает только одну вещь, то есть безостановочный цикл while.
Но в процессе зацикливания случайно было выброшено исключение, которое по совпадению не было поймано. Как вы думаете, что будет дальше?
Поток продолжает работать? Или пул потоков выйдет?
Из явления на самом деле ни то, ни другое, поток не продолжает работать и пул потоков не выходит, он всегда будет здесь зависать.
когда мыdump
Снимок потока найдет:
В это время в пуле потоков все еще выполняется поток, и по имени потока будет найдено, что это вновь созданный поток (ранее он былThread-0
,ЭтоThread-1
).
Его состояние потокаWAITING
, найденный через стек застрял вCustomThreadPool.java:272
место.
Он застревает в том месте, где задача получена из очереди.Поскольку очередь задач в это время пуста, он всегда будет блокироваться здесь.
Увидев это, друзья, которые следили за ним раньше, испытывают дежа вю.
Правильно, я написал два раньше:
В то время также обсуждались вопросы, связанные с пулами потоков.“激烈”
, собственно, конечная причина точно такая же, как и здесь.
Итак, давайте посмотрим на проблему с этой короткой версией кода:
Сейчас я упростил еще одну версию кода, думаю, друзья, у которых раньше были сомнения, на этот раз должны понять его лучше.
По сути, пул потоков будет отлавливать исключения из работы потока, но он не будет обрабатываться, а будет использоваться только для того, чтобы отметить успешность выполнения;
После сбоя выполнения текущий поток исключений будет перезапущен, а новый будет создан заново.Worker
Тема продолжается сПолучить задачу из очереди и выполнить ее.
Так что в итоге застревает从队列中取任务
место.
фактическиThreadPoolExecutor
Обработка исключений также схожа, и конкретный исходный код сильно анализироваться не будет, о чем несколько раз говорилось в двух вышеприведенных статьях.
Поэтому, когда мы используем пул потоков, задачи в нем должны выполнять обработку исключений.
Суммировать
После этой волны, я думаю, с пулом потоков разобраться не составит труда, в целом он использует множество многопоточных решений, таких как:
-
ReentrantLock
Повторно входящие блокировки используются для обеспечения параллельной безопасности записи потока. - Используйте механизм ожидающих уведомлений для обеспечения связи между потоками (результаты выполнения потоков, ожидание завершения выполнения пула потоков и т. д.).
Наконец узнал:
- Стандартный процесс закрытия пула потоков.
- Как использовать поток с возвращаемым значением.
- Важность перехвата исключений потока.
Наконец, весь исходный код этой статьи (используется вместе с тестовым кодом):
Ваши лайки и репост - лучшая поддержка для меня