1. Введение
Что такое пул потоков
Пул потоков — это форма многопоточности, при которой задачи добавляются в очередь во время обработки, а затем автоматически запускаются после создания потоков.
Зачем использовать пул потоков
Если количество одновременных запросов велико, но время выполнения каждого потока очень короткое, потоки будут создаваться и уничтожаться часто. В результате эффективность системы будет значительно снижена, а затраты времени и ресурсов на частое создание и уничтожение потоков могут превысить фактическую работу.
Это из-за этой проблемы, необходимо ввести пул резьбы. использоватьПреимущества пулов потоковЕсть следующие моменты:
- Сокращение потребления ресурсов- Уменьшите стоимость создания и уничтожения потоков за счет повторного использования уже созданных потоков.
- Улучшить отзывчивость- При поступлении задачи задачу можно выполнить немедленно, не дожидаясь создания потока.
- Улучшить управляемость потоками- Потоки являются дефицитными ресурсами. Если они создаются без ограничений, это не только потребляет системные ресурсы, но и снижает стабильность системы. Использование пулов потоков можно использовать для унифицированного распределения, настройки и мониторинга. Но чтобы разумно использовать пул потоков, вы должны хорошо знать его принципы.
2. Структура исполнителя
Платформа Executor — это платформа для вызова, планирования, выполнения и управления асинхронными задачами в соответствии с набором политик выполнения с целью предоставления механизма для отделения «отправки задачи» от «как задачи выполняются».
Обзор основного API
Основной API платформы Executor выглядит следующим образом:
-
Executor
- Простой интерфейс для запуска задач. -
ExecutorService
- расширенныйExecutor
интерфейс. Расширяемость:- Поддержка потоков с возвращаемыми значениями;
- Поддержка управления жизненным циклом потоков.
-
ScheduledExecutorService
- расширенныйExecutorService
интерфейс. Расширяемость: поддерживает периодическое выполнение задач. -
AbstractExecutorService
-ExecutorService
Реализация интерфейса по умолчанию. -
ThreadPoolExecutor
- Основной класс фреймворка Executor, который наследуетAbstractExecutorService
своего рода. -
ScheduledThreadPoolExecutor
-ScheduledExecutorService
Реализация интерфейса, пул потоков, который может регулярно планировать задачи. -
Executors
- можно вызватьExecutors
Статический фабричный метод для создания пула потоков и возвратаExecutorService
объект.
Executor
Executor
В интерфейсе определен только одинexecute
способ получитьRunnable
объект.
public interface Executor {
void execute(Runnable command);
}
ExecutorService
ExecutorService
интерфейс наследуетExecutor
интерфейс, который также обеспечиваетinvokeAll
,invokeAny
,shutdown
,submit
и другие методы.
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Из определения методов он поддерживает, не сложно видеть, что: по сравнению сExecutor
интерфейс,ExecutorService
Основные расширения интерфейса:
- Поддерживаются потоки с возвращаемыми значениями -
sumbit
,invokeAll
,invokeAny
Все методы поддерживают входящиеCallable
объект. - Поддержка управления жизненным циклом потока —
shutdown
,shutdownNow
,isShutdown
и другие методы.
ScheduledExecutorService
ScheduledExecutorService
расширенный интерфейсExecutorService
интерфейс.
Помимо поддержки всех возможностей предыдущих двух интерфейсов, он также поддерживает потоки планирования времени.
public interface ScheduledExecutorService extends ExecutorService {
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
Его интерфейс расширения предоставляет следующие возможности:
-
schedule
метод может выполнятьRunnable
илиCallable
Задача. -
scheduleAtFixedRate
Методы иscheduleWithFixedDelay
Метод периодически выполняет задачу через заданный интервал времени.
3. ThreadPoolExecutor
java.uitl.concurrent.ThreadPoolExecutor
классExecutor
Основной класс в фреймворке. Поэтому данная статья будет посвящена этому классу.
Важные поля
ThreadPoolExecutor
Имеются следующие важные поля:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
Описание параметра:
-
ctl
- Используется для управления рабочим состоянием пула потоков и количеством эффективных потоков в пуле потоков.. Он содержит две части информации:- Текущее состояние пула потоков (
runState
) - Количество допустимых потоков в пуле потоков (
workerCount
) - можно увидеть,
ctl
использовалInteger
тип для сохранения, старшие 3 бита сохраняютсяrunState
, младшие 29 бит сохраняютсяworkerCount
.COUNT_BITS
29,CAPACITY
Это 1, сдвинутая влево на 29 бит минус 1 (29 1 с), эта константа представляетworkerCount
Верхний предел составляет около 500 миллионов.
- Текущее состояние пула потоков (
- Статус выполнения — существует пять статусов выполнения пула потоков:
-
RUNNING
- Рабочий статус. Принимает новые задачи, а также может обрабатывать задачи в очередях блокировки. -
SHUTDOWN
- Неполноценный. Не принимает новые задачи, но может обрабатывать задачи в очереди блокировки.- в пуле потоков
RUNNING
государство, звонитеshutdown
метод приведет пул потоков в это состояние. -
finalize
Метод также вызывается во время выполненияshutdown
метод входит в это состояние.
- в пуле потоков
-
STOP
- состояние остановки. Не принимает новые задачи и не обрабатывает задачи в очереди. Прерывает поток, обрабатывающий задачу. в пуле потоковRUNNING
илиSHUTDOWN
государство, звонитеshutdownNow
метод приведет пул потоков в это состояние. -
TIDYING
- Организовать статус. Если все задачи завершены,workerCount
(Количество допустимых потоков) равно 0, пул потоков будет вызываться после перехода в это состояние.terminated
способ входаTERMINATED
государство. -
TERMINATED
- Прекращено. существуетterminated
Войдите в это состояние после выполнения метода. дефолтterminated
Метод ничего не делает. ВходитьTERMINATED
Условия следующие:- Пул потоков не
RUNNING
государство; - состояние пула потоков не
TIDYING
статус илиTERMINATED
государство; - Если состояние пула потоков
SHUTDOWN
иworkerQueue
Пусто; -
workerCount
0; - настраивать
TIDYING
Статус успешно.
- Пул потоков не
-
Метод строительства
ThreadPoolExecutor
Конструкторов четыре, первые три основаны на четвертой реализации. Четвертый конструктор определяется следующим образом:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
Описание параметра:
-
corePoolSize
- количество основных потоков. Когда новая задача проходитexecute
Когда метод отправлен, пул потоков выполнит следующие суждения:- Если число запущенных потоков меньше
corePoolSize
, создается новый поток для обработки задачи, даже если другие потоки в пуле потоков простаивают. - Если количество потоков в пуле потоков больше или равно
corePoolSize
и меньше чемmaximumPoolSize
, то только когдаworkQueue
При заполнении создается новый поток для обработки задачи; - Если установлено
corePoolSize
иmaximumPoolSize
То же самое, размер создаваемого пула потоков фиксирован. В это время, если будет отправлена новая задача, еслиworkQueue
не заполнен, поместите запрос вworkQueue
, ожидая перехода незанятого потока изworkQueue
Подбирать задачи и обрабатывать их; - Если количество запущенных потоков больше или равно
maximumPoolSize
, то еслиworkQueue
заполнен, используйтеhandler
заданная стратегия обработки задачи; - Поэтому при сдаче задания порядок оценивания
corePoolSize
=>workQueue
=>maximumPoolSize
.
- Если число запущенных потоков меньше
-
maximumPoolSize
- максимальное количество потоков.- Если очередь заполнена, а количество уже созданных потоков меньше максимального количества потоков, пул потоков создаст новые потоки для выполнения задач.
- Стоит отметить, что этот параметр не действует, если используется неограниченная очередь задач.
-
keepAliveTime
:Тема поддерживает время жизни.- Когда количество потоков в пуле потоков превышает
corePoolSize
Когда в это время нет новой отправки задачи, потоки вне основного потока не будут немедленно уничтожены, а будут ждать, пока время ожидания не превыситkeepAliveTime
. - Следовательно, если есть много задач, и время выполнения каждой задачи относительно короткое, на этот раз может быть увеличено для улучшения использования потоков.
- Когда количество потоков в пуле потоков превышает
-
unit
-keepAliveTime
единица времени. Есть 7 значений. Необязательными единицами измерения являются дни (DAYS), часы (HOURS), минуты (MINUTES), миллисекунды (MILLISECONDS), микросекунды (MICROSECONDS, тысячные доли миллисекунды) и наносекунды (NANOSECONDS, тысячные доли микросекунды). -
workQueue
- очередь задач, ожидающих выполнения. Блокирующая очередь для хранения задач, ожидающих выполнения. Можно выбрать следующие очереди блокировки.-
ArrayBlockingQueue
- Ограниченная очередь блокировки.- Эта очередьОчередь в порядке очереди на основе массива (FIFO).
- Эта очередь должна быть создана с указанным размером.
-
LinkedBlockingQueue
- неограниченная очередь блокировки.- Эта очередьОчередь в порядке очереди (FIFO) на основе связанного списка.
- Если этот размер очереди не указан при создании, по умолчанию он равен
Integer.MAX_VALUE
. - пропускная способность обычно выше
ArrayBlockingQueue
. - использовать
LinkedBlockingQueue
значит:maximumPoolSize
не будет работать, максимальное количество потоков, которое может создать пул потоков, равноcorePoolSize
, так как очередь ожидания задач является неограниченной очередью. -
Executors.newFixedThreadPool
использовал эту очередь.
-
SynchronousQueue
- Отправленная задача не будет сохранена, но будет создан новый поток непосредственно для выполнения новой задачи..- Каждая операция вставки должна ждать, пока другой поток не вызовет операцию удаления, иначе операция вставки останется заблокированной.
- пропускная способность обычно выше
LinkedBlockingQueue
. -
Executors.newCachedThreadPool
использовал эту очередь.
-
PriorityBlockingQueue
- Неограниченная очередь блокировки с приоритетом.
-
-
threadFactory
- фабрика нитей. Каждому созданному потоку можно дать более осмысленное имя через фабрику потоков. -
handler
- стратегия насыщения. этоRejectedExecutionHandler
переменная типа. Когда очередь и пул потоков заполнены, что указывает на насыщение пула потоков, необходимо принять стратегию для обработки новых отправленных задач. Пул потоков поддерживает следующие стратегии:-
AbortPolicy
- Отменить задачу и создать исключение. Это также политика по умолчанию. -
DiscardPolicy
- Отменить задачу без создания исключения. -
DiscardOldestPolicy
- Отменить задачу в начале очереди и повторить задачу (повторить процесс). -
CallerRunsPolicy
- Используйте только поток вызывающего абонента для запуска задачи. - Если ни одна из вышеперечисленных стратегий не может удовлетворить ваши потребности, вы также можете реализовать
RejectedExecutionHandler
интерфейс для настройки стратегии обработки. Например, регистрация или сохранение задач, которые невозможно обработать.
-
выполнить метод
По умолчанию после создания пула потоков в пуле потоков нет, а потоки создаются только после отправки задач.
Для отправки задачи вы можете использоватьexecute
метод, этоThreadPoolExecutor
Основной метод, с помощью которого можноОтправить задачу в пул потоков для выполнения пулом потоков.
execute
Рабочий процесс метода выглядит следующим образом:
- если
workerCount < corePoolSize
, затем создайте и запустите поток для выполнения только что отправленной задачи; - если
workerCount >= corePoolSize
, а очередь блокировки в пуле потоков не заполнена, добавьте задачу в очередь блокировки; - если
workerCount >= corePoolSize && workerCount < maximumPoolSize
, и очередь блокировки в пуле потоков заполнена, создайте и запустите поток для выполнения только что отправленной задачи; - если
workerCount >= maximumPoolSize
, а очередь блокировки в пуле потоков заполнена, задача обрабатывается в соответствии с политикой отклонения.Метод обработки по умолчанию – прямой вызов исключения.
Другие важные методы
существуетThreadPoolExecutor
В классе также есть несколько важных методов:
-
submit
- похожий наexecute
, но для потоков с возвращаемыми значениями.submit
метод находится вExecutorService
методы, объявленные в , вAbstractExecutorService
Уже есть конкретная реализация.ThreadPoolExecutor
Прямое повторное использованиеAbstractExecutorService
изsubmit
метод. -
shutdown
- Пул потоков не будет остановлен немедленно, но будет остановлен после того, как все задачи в очереди кэша задач будут выполнены, но новые задачи приниматься не будут.- Переключите пул потоков на
SHUTDOWN
государство; - и позвони
interruptIdleWorkers
Метод запрашивает прерывание всех бездействующих рабочих процессов; - последний звонок
tryTerminate
Попытка завершить пул потоков.
- Переключите пул потоков на
-
shutdownNow
- Немедленно прервите пул потоков и попытайтесь прервать выполнение задачи, очистите очередь кэша задач и верните задачу, которая не была выполнена. иshutdown
Метод похож, разница:- установить состояние на
STOP
; - Прервать все рабочие потоки, независимо от того, простаивают они или нет;
- Удалить невыполненную задачу из очереди блокировки и вернуться.
- установить состояние на
-
isShutdown
- называетсяshutdown
илиshutdownNow
После метода,isShutdown
метод вернет true. -
isTerminaed
- Когда все задачи закрыты, пул потоков успешно закрыт, затем вызовитеisTerminaed
метод вернет true. -
setCorePoolSize
- Установить размер количества основных потоков. -
setMaximumPoolSize
- Установить максимальное количество потоков размера. -
getTaskCount
- общее количество задач, выполняемых и не выполняемых пулом потока; -
getCompletedTaskCount
- Количество задач, выполненных пулом потоков, значение меньше или равноtaskCount
; -
getLargestPoolSize
- Максимальное количество потоков, когда-либо созданных пулом потоков. С помощью этих данных можно узнать, заполнен ли пул потоков, то есть достиг ли онmaximumPoolSize
; -
getPoolSize
- текущее количество потоков в пуле потоков; -
getActiveCount
- Количество потоков, выполняющих задачи в текущем пуле потоков.
Пример использования
public class ThreadPoolExecutorDemo {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 500, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 100; i++) {
threadPoolExecutor.execute(new MyThread());
String info = String.format("线程池中线程数目:%s,队列中等待执行的任务数目:%s,已执行玩别的任务数目:%s",
threadPoolExecutor.getPoolSize(),
threadPoolExecutor.getQueue().size(),
threadPoolExecutor.getCompletedTaskCount());
System.out.println(info);
}
threadPoolExecutor.shutdown();
}
static class MyThread implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 执行");
}
}
}
Четыре, Исполнители
JDKExecutors
В классе предусмотрено несколько репрезентативных пулов потоков, эти пулы потоковоснованы наThreadPoolExecutor
индивидуальная реализация.
При фактическом использовании пулов потоков мы часто не используем напрямуюThreadPoolExecutor
вместо этого используйте репрезентативный экземпляр пула потоков, предоставленный в JDK.
newSingleThreadExecutor
Создать однопоточный пул резьбы.
Для выполнения задач будет создан только один рабочий поток, гарантирующий выполнение всех задач в указанном порядке (FIFO, LIFO, приоритет).Если единственный поток завершается аварийно, новый поток заменит его..
Самые большие особенности одного рабочего потока:Гарантированное последовательное выполнение задач.
Пример:
public class SingleThreadExecutorDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 100; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 执行");
}
});
}
executorService.shutdown();
}
}
newFixedThreadPool
Создайте пул потоков фиксированного размера.
Каждый раз, когда задача отправляется, создается новый рабочий поток.Если количество рабочих потоков достигает максимального количества потоков в пуле потоков, отправленная задача будет сохранена в очереди блокировки..
FixedThreadPool
Это типичный и превосходный пул потоков, обладающий преимуществами пула потоков, повышающими эффективность программы и снижающими накладные расходы при создании потоков. Однако, когда пул потоков простаивает, то есть когда в пуле потоков нет исполняемых задач, он не будет освобождать рабочие потоки и будет занимать определенные системные ресурсы.
Пример:
public class FixedThreadPoolDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 100; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 执行");
}
});
}
executorService.shutdown();
}
}
newCachedThreadPool
Создайте кешируемый пул потоков.
- Если длина пула потоков превышает количество потоков, необходимых для обработки задачи, некоторые простаивающие потоки будут перезапущены;
- Если задача не отправляется в пул потоков в течение длительного времени, то есть если рабочий поток простаивает в течение заданного времени (по умолчанию 1 минута), рабочий поток будет автоматически завершен. После завершения, если вы отправляете новую задачу, пул потоков воссоздает рабочий поток.
- Этот пул потоков не ограничивает размер пула потоков, размер пула потоков полностью зависит от максимального размера потока, который может создать операционная система (или JVM). Поэтому используйте
CachedThreadPool
При обязательно обратите внимание на контроль количества задач, иначе из-за одновременного выполнения большого количества потоков система будет парализована.
Пример:
public class CachedThreadPoolDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 执行");
}
});
}
executorService.shutdown();
}
}
newScheduleThreadPool
Создайте пул потоков бесконечного размера. Этот пул потоков поддерживает потребности в синхронизированном и периодическом выполнении задач.
public class ScheduledThreadPoolDemo {
public static void main(String[] args) {
schedule();
scheduleAtFixedRate();
}
private static void schedule() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
for (int i = 0; i < 100; i++) {
executorService.schedule(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 执行");
}
}, 1, TimeUnit.SECONDS);
}
executorService.shutdown();
}
private static void scheduleAtFixedRate() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
for (int i = 0; i < 100; i++) {
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 执行");
}
}, 1, 1, TimeUnit.SECONDS);
}
executorService.shutdown();
}
}