Пулы потоков не так просты, как вы думаете (продолжение)

Java
Пулы потоков не так просты, как вы думаете (продолжение)

предисловие

Я написал статью некоторое время назад«Пул потоков не так прост, как вы думаете», и мы создали с вами базовый пул потоков, в котором:

  • Базовая функция планирования пула потоков.
  • Пул потоков автоматически расширяется и сжимается.
  • Поток кэша очереди.
  • Закройте пул потоков.

Этих функций, в конце концов, тоже осталось реализовать триfeatures.

  • Выполнить поток с возвращаемым значением.
  • Как насчет обработки исключений?
  • Как я буду уведомлен, когда все задачи будут выполнены?

На этот раз давайте реализуем эти три функции, чтобы увидетьj.u.cКак пул потоков в .

Перед прочтением этой статьи настоятельно рекомендуется ознакомиться с вышеизложенным«Пул потоков не так прост, как вы думаете»

Уведомления о завершении задач

Когда вы используете пулы потоков, у вас будут более или менее следующие требования:

После того, как задачи в пуле потоков выполнены, основной поток уведомляется о других действиях, таких как выполнение следующей волны задач после выполнения пакета задач.

В качестве примера возьмем наш предыдущий код:

Всего в пул потоков отправляется 13 задач, и журнал «выполнение задачи завершено» печатается до тех пор, пока они все не будут выполнены.

Результат выполнения следующий:

Чтобы просто добиться этого эффекта, мы можем передать реализацию интерфейса при инициализации пула потоков, Этот интерфейс используется для обратного вызова после завершения задачи.

public interface Notify {

    /**
     * 回调
     */
    void notifyListen() ;
}

Выше приведен конструктор пула потоков и определение интерфейса.

Таким образом, ключ к реализации этой функции заключается в том, когда вызывать этот интерфейс?

На самом деле просто подумать об этом внимательно: пока мы записываем количество задач, отправленных в пул потоков, и количество завершений, когда разница между ними равна 0, считается, что задачи в пуле потоков были выполняется; тогда этот интерфейс может быть вызван обратно.

Итак, при записи задач в пул потоков нам нужно записать количество задач:

Ради безопасности параллелизма счетчик здесь принимает атомарныйAtomicInteger.


После выполнения задачи счетчик будет равен -1.Как только он станет равным 0, все задачи и задачи будут выполнены.В это время мы можем отозвать уведомление о завершении нашего пользовательского интерфейса.


реализация JDK

Такие требования есть в jdkThreadPoolExecutorтакже связано вAPI, но использование не то же самое, но основные принципы схожи.

Мы используемThreadPoolExecutorОбщий процесс отключения выглядит следующим образом:

    executorService.shutdown();
    while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) {
        logger.info("thread running");
    }

Выполнить после отправки потокаshutdown()Закройте пул потоков, а затем вызовите его в циклеawaitTermination()метод, который вернется после выполнения всех задачtrueтем самым выходя из цикла.

Цель и принцип этих двух методов заключаются в следующем:

  • воплощать в жизньshutdown()После этого состояние пула потоков будет установлено в закрытое состояние.В это время он перестанет получать новые задачи и будет ждать выполнения всех задач в очереди, прежде чем закрыть пул потоков.
  • awaitTerminationОн будет заблокирован до тех пор, пока не будут выполнены все задачи в пуле потоков или не истечет время ожидания.

почему дваapiКак насчет комбинированного использования?

Основная цель — сделать что-то после выполнения всех потоков, то есть основной поток нужно выполнить до того, как будет выполнен поток.заблокированиз.

shutdown()После выполнения он не будет блокироваться и немедленно вернется, все нужно вызывать непрерывно с помощью циклов.awaitTermination(), потому что этот API заблокирует поток.

На самом деле, когда мы посмотрим на исходный код, мы обнаружим, чтоThreadPoolExecutorБлокировка — это по-прежнему использование механизма ожидающих уведомлений, ноLockSupportизAPIВот и все.

поток с возвращаемым значением

Далее следует поток с возвращаемым значением, и это требование также очень распространено; например, поток требуется для асинхронного вычисления некоторых данных, а затем получения результата для использования в итоговой сводке.

Давайте посмотрим, как его использовать (аналогично jdk):

Первая задача – не достичьRunnableинтерфейс, ведь егоrun()Функции не имеют возвращаемого значения, поэтому мы реализуемCallableИнтерфейс:

Этот интерфейс имеет возвращаемое значение.

При этом было внесено небольшое изменение при подаче задания:

Во-первых, функция, выполняющая задачу, задается выражениемexecute()заменяетсяsubmit(), и он вернет возвращаемое значениеFuture, через который можно получить результат выполнения потока.

Наконец, распечатайте все результаты выполнения на втором шаге:

Принцип реализации

Прежде чем рассматривать конкретную реализацию, давайте подумаем, как реализовать такую ​​функцию?

  • сначала ограничиваетсяjdkразгромapiСпецификация для выполнения потока, независимо от того, реализует ли он интерфейс или наследует класс, в конечном итоге выполняетсяrun()функция.
  • Итак, мы хотим, чтобы поток имел возвращаемое значение, но он может быть выполнен толькоrun()функцию для вызова метода с возвращаемым значением, а затем сохранить возвращаемое значение для последующего использования.

Например, мы создали новыйCallable<T>Интерфейс:

public interface Callable<T> {

    /**
     * 执行任务
     * @return 执行结果
     */
    T call() ;
}

этоcallФункция — это только что упомянутый метод с возвращаемым значением, поэтому мы должны вызывать его в функции run() потока.

Потом будет другойFutureИнтерфейс, его основная функция заключается в получении возвращаемого значения потока, т.е.再将这个返回值存放起来用于后续使用упоминается здесьпоследующее использование.

Раз есть интерфейс, то должна быть и его реализация.FutureTask, он достигаетFutureИнтерфейс используется для последующего получения возвращаемого значения.

В то же время понялRunnableИнтерфейс превращается в поток.

так что в своемrun()Функция с только что упомянутым возвращаемым значением будет вызываться в функцииcall()функция.


рекомбинироватьsubmit()отправлять задачи иget()Получите исходный код возвращаемого значения, чтобы лучше понять дорвей.

    /**
     * 有返回值
     *
     * @param callable
     * @param <T>
     * @return
     */
    public <T> Future<T> submit(Callable<T> callable) {
        FutureTask<T> future = new FutureTask(callable);
        execute(future);
        return future;
    }

submit()Очень просто, брось насCallableобъект превращается вFutureTaskобъект, а затем вызвать предыдущийexecute()чтобы бросить его в пул потоков (последующий процесс аналогичен процессу входа обычного потока в пул потоков).

Сам FutureTask также является потоком, поэтому его можно использовать напрямую.execute()функция.


иfuture.get()в функцииfutureобъект из-заsubmit()Реальный объект, возвращенный вFutureTask, поэтому мы просто смотрим исходный код напрямую.

так какget()Это блокирующая функция до тех пор, пока поток не вернется и, наконец, не пройдетnotify.wait()Это реализуется путем перевода потока в состояние блокировки.

сделать это изwait()Условие, возвращаемое в потоке, должно быть активировано, когда поток завершит выполнение и получит возвращаемое значение.

Это вторая часть диаграммы; как только поток завершит выполнение (callable.call()) проснетсяnotifyобъект, поэтомуgetМетод также может вернуться.


Та же причина,ThreadPoolExecutorПринцип аналогичен, но учитывает больше деталей, поэтому выглядит сложно, но ядро ​​после упрощения кода таково.

Даже окончательный используемый API выглядит похоже:

Обработка исключений

Последнее — это место, где некоторые новички могут легко наступить на яму при использовании пулов потоков: это обработка исключений.

Например такой сценарий:

создан тольконитьПул потоков, этот поток делает только одну вещь, то есть безостановочный цикл while.

Но в процессе зацикливания случайно было выброшено исключение, которое по совпадению не было поймано. Как вы думаете, что будет дальше?

Поток продолжает работать? Или пул потоков выйдет?

Из явления на самом деле ни то, ни другое, поток не продолжает работать и пул потоков не выходит, он всегда будет здесь зависать.

когда мыdumpСнимок потока найдет:

В это время в пуле потоков все еще выполняется поток, и по имени потока будет найдено, что это вновь созданный поток (ранее он былThread-0,ЭтоThread-1).

Его состояние потокаWAITING, найденный через стек застрял вCustomThreadPool.java:272место.

Он застревает в том месте, где задача получена из очереди.Поскольку очередь задач в это время пуста, он всегда будет блокироваться здесь.

Увидев это, друзья, которые следили за ним раньше, испытывают дежа вю.

Правильно, я написал два раньше:

В то время также обсуждались вопросы, связанные с пулами потоков.“激烈”, собственно, конечная причина точно такая же, как и здесь.

Итак, давайте посмотрим на проблему с этой короткой версией кода:

Сейчас я упростил еще одну версию кода, думаю, друзья, у которых раньше были сомнения, на этот раз должны понять его лучше.

По сути, пул потоков будет отлавливать исключения из работы потока, но он не будет обрабатываться, а будет использоваться только для того, чтобы отметить успешность выполнения;

После сбоя выполнения текущий поток исключений будет перезапущен, а новый будет создан заново.WorkerТема продолжается сПолучить задачу из очереди и выполнить ее.

Так что в итоге застревает从队列中取任务место.

фактическиThreadPoolExecutorОбработка исключений также схожа, и конкретный исходный код сильно анализироваться не будет, о чем несколько раз говорилось в двух вышеприведенных статьях.

Поэтому, когда мы используем пул потоков, задачи в нем должны выполнять обработку исключений.

Суммировать

После этой волны, я думаю, с пулом потоков разобраться не составит труда, в целом он использует множество многопоточных решений, таких как:

  • ReentrantLockПовторно входящие блокировки используются для обеспечения параллельной безопасности записи потока.
  • Используйте механизм ожидающих уведомлений для обеспечения связи между потоками (результаты выполнения потоков, ожидание завершения выполнения пула потоков и т. д.).

Наконец узнал:

  • Стандартный процесс закрытия пула потоков.
  • Как использовать поток с возвращаемым значением.
  • Важность перехвата исключений потока.

Наконец, весь исходный код этой статьи (используется вместе с тестовым кодом):

GitHub.com/crossover J я…

Ваши лайки и репост - лучшая поддержка для меня