Что такое пул потоков?
Во избежание повторного создания и уничтожения потоков, мы можем повторно использовать эти потоки, В пуле потоков всегда будут заняты активные потоки, но в пуле потоков также будут незанятые потоки, и эти потоки находятся в состоянии ожидания , при наличии задачи нить будет взята из пула в использование.По окончании работы нить не уничтожается, а помещается обратно в пул.
Пул потоков в основном решает две проблемы:
Во-первых, пулы потоков могут обеспечить хорошую производительность при выполнении большого количества асинхронных задач.
Во-вторых, пул потоков предоставляет средства ограничения ресурсов и управления ими, такие как ограничение количества существующих и динамическое добавление новых потоков.
- "Красота параллельного программирования на Java"
Приведенное выше содержание взято из книги "Красота параллельного программирования на Java". Первый вопрос был упомянут выше. Частое создание и уничтожение потоков очень требовательно к производительности, но потоки в пуле потоков могут быть повторно использованы и могут быть лучше. Для повышения производительности пул потоков использует очередь блокировки для обслуживания объектов, допускающих выполнение.
Принципиальный анализ
JDK инкапсулирует для нас набор многопоточных исполнителей фреймворка, чтобы помочь нам лучше контролировать пул потоков.Исполнители предоставляют некоторые фабричные методы пула потоков:
- newFixedThreadPool: возвращает пул потоков фиксированной длины, и количество потоков в пуле потоков фиксировано.
- newCacheThreadPool: этот метод возвращает пул потоков, который регулирует количество потоков в соответствии с реальной ситуацией.Время выживания бездействующих потоков составляет 60 с.
- newSingleThreadExecutor: этот метод возвращает пул потоков только с одним потоком.
- newSingleThreadScheduledExecutor: этот метод возвращает
SchemeExecutorService
объект, размер пула потоков равен 1,SchemeExecutorService
интерфейс вThreadPoolExecutor
класс иExecutorService
Расширение поверх интерфейса для выполнения задачи в заданное время. - newSchemeThreadPool: этот метод возвращает
SchemeExecutorService
Объект, указывающий количество потоков в пуле потоков.
Для основного пула потоков он используется внутриThreadPoolExecutor
Объект реализован, но информация о внутренних параметрах отличается.Для начала рассмотрим два примера:nexFixedThreadPool
иnewSingleThreadExecutor
Следующее:
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
Из процесса создания пула потоков выше видно, что все ониThreadPoolExecutor
пакет, давайте посмотримThreadPoolExecutor
Описание параметра:
имя параметра | Параметр Описание |
---|---|
corePoolSize | Указывает количество потоков пула потоков |
maximumPoolSize | Указывает максимальное количество потоков в пуле потоков. |
keepAliveTime | Когда количество потоков в пуле потоков превышает corePoolSize, оставшиеся бездействующие потоки будут существовать дольше.Если количество потоков в пуле потоков превышает corePoolSize, потоки будут уничтожены по истечении времени keepAliveTime. |
unit | Единица keepAliveTime |
workQueue | Рабочая очередь, которая кэширует задачи, которые были отправлены, но еще не выполнены. |
threadFactory | Фабрика потоков, используемая для создания потоков, не указанная в качестве фабрики потоков по умолчанию DefaultThreadFactory |
handler | политика отказа |
WorkQueue представляет собой очередь, которая отправлена, но не выполнена.Это объект интерфейса BlockingQueue и используется для хранения объектов Runable.В основном он делится на следующие типы:
-
Очередь на прямую подачу:
SynchronousQueue
Очередь, это очередь без емкости. Я объяснил это ранее. Когда пул потоков выполняет операцию предложения постановки в очередь, у него нет емкости, поэтому он напрямую возвращает false и не сохраняет его, а отправляет его непосредственно в потоки выполняются, и если свободных потоков нет, выполняется политика отклонения. -
Ограниченная очередь задач: вы можете использовать
ArrayBlockingQueue
Очередь, поскольку она внутренне реализована на основе массивов, параметр емкости должен быть указан при инициализации.При использовании ограниченной очереди задач, когда есть задачи для отправки, количество потоков в пуле потоков меньше, чем corePoolSize, и новые потоки создаются для выполнения задач, когда количество потоков в пуле потоков больше, чем corePoolSize, отправленные задачи помещаются в очередь.Когда отправленные задачи заполняют очередь, если количество потоков в пуле потоков не превышает maxPoolSize, для выполнения задачи создается новый поток.Если maxPoolSize превышен, выполняется политика отклонения. -
Неограниченная очередь задач: можно использовать
LinkedBlockingQueue
Очередь, основанная на внутреннем связанном списке, длина очереди по умолчаниюInteger.MAX_VALUE
, вы также можете указать длину очереди.Когда очередь заполнена, выполняется операция блокировки.Конечно, пул потоков используетoffer
Метод не блокирует поток, при заполнении очереди возвращает false, при успешном выполнении возвращает true.LinkedBlockingQueue
Когда задача отправляется в пул потоков в очереди, если количество пулов потоков меньше, чем corePoolSize, пул потоков будет генерировать новые потоки для выполнения задачи.Когда количество потоков в пуле потоков больше, чем corePoolSize, отправленные задачи будут помещены в очередь, подождите, пока поток, выполняющий задачу, потребляет задачи в очереди после выполнения задачи.Если есть еще новые задачи для отправки в будущем, но нет бездействующих потоков, он будет продолжать ставить отправленные задачи в очередь до тех пор, пока ресурсы не будут исчерпаны. -
Очередь задач с приоритетом: t Ограниченная очередь задач — это очередь с приоритетом выполнения, которую можно использовать
PriorityBlockingQueue
Очередь может управлять порядком выполнения задач. Это неограниченная очередь. Очередь может выполняться в соответствии с порядком приоритета самих задач. Обеспечивая производительность, она также может иметь хорошую гарантию качества.
Выше объяснено о внутреннем пуле потоков черезThreadPoolExecutor
Для реализации я буду использовать пример для анализа исходного кода:
public class ThreadPoolDemo1 {
public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(5,
10,
60L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(5), new CustomThreadFactory());
for (int i = 0; i < 15; i++) {
executorService.execute(() -> {
try {
Thread.sleep(50000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("由线程:" + Thread.currentThread().getName() + "执行任务完成");
});
}
}
}
Пул потоков определен выше.CorePoolSize, инициализированный пулом потоков, равен 5, то есть количество потоков в пуле потоков равно 5, максимальное значение maxThreadPoolSize потока равно 10, а время выживания резервных потоков равно 60 с. используется в качестве очереди блокировки. Я обнаружил, что настроилThreadFactory
Фабрика пулов потоков, здесь я действительно вывожу имя пула потоков при создании потока, исходный код такой:
/**
* 自定义的线程池构造工厂
*/
public class CustomThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
public CustomThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
@Override
public Thread newThread(Runnable r) {
String name = namePrefix + threadNumber.getAndIncrement();
Thread t = new Thread(group, r,
name,
0);
System.out.println("线程池创建,线程名称为:" + name);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
код иDefaultThreadFactory
то же, только вnewThread
При создании нового потока выводится название пула потоков, что удобно для просмотра времени создания потока.main
В метод отправлено 15 задач, вызывающихexecute
метод выполнения задачи отправки, в анализеexecute
Перед методом давайте сначала разберемся в состоянии потока:
//假设Integer类型是32位的二进制表示。
//高3位代表线程池的状态,低29位代表的是线程池的数量
//默认是RUNNING状态,线程池的数量为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//线程个数位数,表示的Integer中除去最高的3位之后剩下的位数表示线程池的个数
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程池的线程的最大数量
//这里举例是32为机器,表示为00011111111111111111111111111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//线程池的状态
// runState is stored in the high-order bits
//11100000000000000000000000000000
//接受新任务并且处理阻塞队列里面任务
private static final int RUNNING = -1 << COUNT_BITS;
//00000000000000000000000000000000
//拒绝新任务但是处理阻塞队列的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
//00100000000000000000000000000000
//拒接新任务并且抛弃阻塞队列里面的任务,同时会中断正在处理的任务
private static final int STOP = 1 << COUNT_BITS;
//01000000000000000000000000000000
//所有任务都执行完(包括阻塞队列中的任务)后当线程池活动线程数为0,将要调用terminated方法。
private static final int TIDYING = 2 << COUNT_BITS;
//01100000000000000000000000000000
//终止状态,terminated方法调用完成以后的状态
private static final int TERMINATED = 3 << COUNT_BITS;
Из приведенного выше содержимого видно, что ctl фактически хранит состояние пула потоков и переменные количества потоков.RUNNING
, это,11100000000000000000000000000000
, здесь мы предполагаем, что Integer на работающей машине 32-битный, потому что некоторые машины могут быть не 32-битными Integer, следующие COUNT_BITS для управления количеством битов, то есть сначала получаем количество битов Integer на платформа, например Это 32 бита, тогда 32 бита - 3 бита = 29 бит, то есть младшие 29 бит представляют собой готовое количество, а верхние 3 бита представляют собой состояние потока.Это хорошо видно что состояние пула потоков ниже осуществляется через младшие биты.Для операции сдвига влево, помимо вышеперечисленных переменных, также предусмотрен метод манипулирования состоянием пула потоков:
// 操作ctl变量,主要是进行分解或组合线程数量和线程池状态。
// 获取高3位,获取线程池状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取低29位,获取线程池中线程的数量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 组合ctl变量,rs=runStatue代表的是线程池的状态,wc=workCount代表的是线程池线程的数量
private static int ctlOf(int rs, int wc) { return rs | wc; }
/*
* Bit field accessors that don't require unpacking ctl.
* These depend on the bit layout and on workerCount being never negative.
*/
//指定的线程池状态c小于状态s
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
//指定的线程池状态c至少是状态s
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// 判断线程池是否运行状态
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* CAS增加线程池线程数量.
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* CAS减少线程池线程数量
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
* 将线程池的线程数量进行较少操作,如果竞争失败直到竞争成功为止。
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
Давайте взглянемThreadPoolExecutor
под объектомexecute
метод:
public void execute(Runnable command) {
// 判断提交的任务是不是为空,如果为空则抛出NullPointException异常
if (command == null)
throw new NullPointerException();
// 获取线程池的状态和线程池的数量
int c = ctl.get();
// 如果线程池的数量小于corePoolSize,则进行添加线程执行任务
if (workerCountOf(c) < corePoolSize) {
//添加线程修改线程数量并且将command作为第一个任务进行处理
if (addWorker(command, true))
return;
// 获取最新的状态
c = ctl.get();
}
// 如果线程池的状态是RUNNING,将命令添加到队列中
if (isRunning(c) && workQueue.offer(command)) {
//二次检查线程池状态和线程数量
int recheck = ctl.get();
//线程不是RUNNING状态,从队列中移除当前任务,并且执行拒绝策略。
//这里说明一点,只有RUNNING状态的线程池才会接受新的任务,其余状态全部拒绝。
if (! isRunning(recheck) && remove(command))
reject(command);
//如果线程池的线程数量为空时,代表线程池是空的,添加一个新的线程。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果队列是满的,或者是SynchronousQueue队列时,则直接添加新的线程执行任务,如果添加失败则进行拒绝
//可能线程池的线程数量大于maximumPoolSize则采取拒绝策略。
else if (!addWorker(command, false))
reject(command);
}
Следующие пункты резюмируются путем анализа метода execute:
- Когда количество потоков в пуле потоков меньше
corePoolSize
, напрямую добавьте поток в пул потоков и выполните текущую задачу в качестве первой задачи. - Если состояние пула потоков
RUNNING
, вы можете принять задачу, поместить задачу в очередь блокировки и выполнить вторичную внутреннюю проверку. Возможно, состояние пула потоков изменилось при запуске следующего содержимого. В это время, если состояние пула потоков становится неRUNNING
, текущая задача удаляется из очереди и применяется политика отклонения. - Если очередь блокировки заполнена или
SynchronousQueue
Когда в этой специальной очереди нет места, новый поток добавляется непосредственно для выполнения задачи.Когда количество потоков в пуле потоков превышаетmaximumPoolSize
соответствующая политика отказа. - Операция постановки в очередь использует
offer
Метод, этот метод не будет блокировать очередь, если очередь заполнена или тайм-аут вызывает сбой очереди, он возвращает false, а если очередь успешна, он возвращает true.
Давайте проанализируем исходный код приведенного выше примера.Очередь блокировки в нашем исходном кодеArrayBlockingQueue
Queue, а длина указанной очереди равна 5, мы видим, что задача представленного ниже пула потоков равна 15, а параметр corePoolSize установлен равным 5 потокам ядра, а максимальное количество потоков (maximumPoolSzie) равно 10 (включая число основных потоков), предполагая, что все задачи отправляются в пул потоков одновременно, 5 задач будут отправлены в поток в качестве первой задачи для выполнения, 5 задач будут добавлены в очередь блокировки, а 5 задач будут передана в поток Когда в пуле будет обнаружено, что блокирующая очередь заполнена, задача будет отправлена напрямую.Обнаружено, что текущее количество потоков на 5 меньше, чем максимальное количество потоков, и новый поток может быть создан для выполнения задачи.Здесь мы просто предполагаем, что все задачи отправлены, потому что мы добавляем Thread.sleep к задаче для сна на некоторое время.После завершения цикла for и отправки задачи, сон задачи может быть завершен, а содержимое задачи выполняется, поэтому его можно рассматривать как все отправленные задачи , но ни одна задача не завершена. Если есть завершенная задача, она может не запускать максимальное количество потоков. Может случиться так, что задача будет удалена из очереди после ее выполнения. завершена, а затем еще одна задача может быть добавлена в очередь, когда она придет.Картинка вышеИз приведенного выше видно, что есть 5 основных потоков ядра, выполняющих задачи, 5 задач в очереди задач ожидают выполнения простаивающих потоков, и есть еще 5 исполняемых потоков. Основные потоки относятся к потокам в диапазоне corePoolSize, а не к основным потокам относятся к потокам, которые больше, чем corePoolSize, но меньше или равны MaximumPoolSize. Эти неосновные потоки не всегда являются активными потоками. Они будут быть уничтожены в соответствии с параметрами, заданными пулом потоков. Если задача не будет отправлена после 60 с, будет выполнена операция уничтожения. Конечно, рабочий поток не указывает, какие потоки должны быть переработаны, а эти потоки должны быть сохранены. определяется путем получения задач из очереди Если поток получает задачи и обнаруживает, что количество потоков в пуле потоков больше, чем corePoolSize, и очередь блокируется Когда она пуста, блокирующая очередь блокируется на 60 секунд, а затем возвращается false, если задачи нет. В это время поток будет освобожден и вызовет
processWorkerExit
Чтобы обработать выход из потока, давайте проанализируем его далееaddWorker
Что они сделали:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//获取线程池的状态和线程池线程的数量
int c = ctl.get();
//单独获取线程池的状态
int rs = runStateOf(c);
//检查队列是否只在必要时为空
if (rs >= SHUTDOWN && //线程池的状态是SHUTDOWN、STOP、TIDYING、TERMINATED
! (rs == SHUTDOWN && //可以看做是rs!=SHUTDOWN,线程池状态为STOP、TIDYING、TERMINATED
firstTask == null && //可以看做firstTask!=null,并且rs=SHUTDOWN
! workQueue.isEmpty())) //可以看做rs=SHUTDOWN,并且workQueue.isEmpty()队列为空
return false;
//循环CAS增加线程池中线程的个数
for (;;) {
//获取线程池中线程个数
int wc = workerCountOf(c);
//如果线程池线程数量超过最大线程池数量,则直接返回
if (wc >= CAPACITY ||
//如果指定使用corePoolSize作为限制则使用corePoolSize,反之使用maximumPoolSize,最为工作线程最大线程线程数量,如果工作线程大于相应的线程数量则直接返回。
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS增加线程池中线程的数量
if (compareAndIncrementWorkerCount(c))
//跳出增加线程池数量。
break retry;
//如果修改失败,则重新获取线程池的状态和线程数量
c = ctl.get(); // Re-read ctl
//如果最新的线程池状态和原有县城出状态不一样时,则跳转到外层retry中,否则在内层循环重新进行CAS
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//工作线程是否开始启动标志
boolean workerStarted = false;
//工作线程添加到线程池成功与否标志
boolean workerAdded = false;
Worker w = null;
try {
//创建一个Worker对象
w = new Worker(firstTask);
//获取worker中的线程,这里线程是通过ThreadFactory线程工厂创建出来的,详细看下面源码信息。
final Thread t = w.thread;
//判断线程是否为空
if (t != null) {
//添加独占锁,为添加worker进行同步操作,防止其他线程同时进行execute方法。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//获取线程池的状态
int rs = runStateOf(ctl.get());
//如果线程池状态为RUNNING或者是线程池状态为SHUTDOWN并且第一个任务为空时,当线程池状态为SHUTDOWN时,是不允许添加新任务的,所以他会从队列中获取任务。
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//添加worker到集合中
workers.add(w);
int s = workers.size();
//跟踪最大的线程池数量
if (s > largestPoolSize)
largestPoolSize = s;
//添加worker成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//如果添加worker成功就启动任务
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//如果没有启动,w不为空就已出worker,并且线程池数量进行减少。
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
через вышеуказанноеaddWorker
Этот метод можно объяснить в двух частях. Первая часть заключается в увеличении количества потоков в пуле потоков через CAS. В первой части есть оператор if. Это место посвящено анализу:
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
Это можно рассматривать как следующее, т.!
Поместите это в скобки, и это будет выглядеть так:
if (rs >= SHUTDOWN &&
(rs != SHUTDOWN ||
firstTask != null ||
workQueue.isEmpty()))
return false;
- Статус пула потоков SHUTDOWN, STOP, TIDYING, TERMINATED
- Когда пул потоков находится в состоянии STOP, TIDYING, TERMINATED, операции добавления и запуска потока в этих состояниях не требуются, потому что в указанном выше состоянии поток пула потоков фактически уничтожает поток, а это означает, что поток вызывает shutdownNow, и т.п. метод.
- Если состояние пула потоков SHUTDOWN и первая задача не пуста, он не будет принимать новые задачи и возвращать false напрямую, то есть в состоянии SHUTDOWN новые задачи приниматься не будут, а только незавершенные задачи в очередь будет работать.
- Когда статус пула потоков SHUTDOWN и очередь пуста, возвращайтесь напрямую, не добавляя задачи.
Верхняя часть разделена на два цикла: внутренний и внешний цикл. Внешний цикл оценивает состояние пула потоков, чтобы решить, необходимо ли добавлять потоки рабочих задач. Он оценивается по содержимому, упомянутому выше. Последний внутренний цикл чтобы увеличить количество потоков в операции CAS. Если указаноcore
Параметр имеет значение true, что означает, что количество потоков в пуле потоков не превышаетcorePoolSize
, когда указано false, это означает, что количество потоков в пуле потоков достиглоcorePoolSize
, и очередь заполнена, илиSynchronousQueue
Такая безместная очередь, но еще не достигла максимального пула потоковmaximumPoolSize
, так что это будет внутреннеcore
Параметры для определения превышения максимального предела. Если он превышается, поток не может быть добавлен, и реализуется стратегия отклонения. Если он не превышается, количество потоков будет увеличено.
Вторая часть предназначена в основном для добавления задач в рабочий процесс и запуска потока.Здесь мы сначала рассмотрим объект Worker.
// 这里发现它是实现了AQS,是一个不可重入的独占锁模式
// 并且它还集成了Runable接口,实现了run方法。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
/** 执行任务的线程,通过ThreadFactory创建 */
final Thread thread;
/** 初始化第一个任务*/
Runnable firstTask;
/** 每个线程完成任务的数量 */
volatile long completedTasks;
/**
* 首先现将state值设置为-1,因为在AQS中state=0代表的是锁没有被占用,而且在线程池中shutdown方法会判断能否争抢到锁,如果可以获得锁则对线程进行中断操作,如果调用了shutdownNow它会判断state>=0会被中断。
* firstTask第一个任务,如果为空则会从队列中获取任务,后面runWorker中。
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** 委托调用外部的runWorker方法 */
public void run() {
runWorker(this);
}
//是否独占锁
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
//这里就是上面shutdownNow中调用的线程中断的方法,getState()>=0
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Видно, что Worker — это блокировка, реализующая AQS, это нереентерабельная монопольная блокировка, и он же реализуетRunnable
интерфейс, реализованныйrun
метод, поместите AQS в конструкторstate
Установить как-1
, чтобы избежать еще не введенных темrunWorker
метод перед вызовомshutdown
илиshutdownNow
метод, будет прерван, установленный в -1 не будет прерван. Позже мы видимrun
метод, который вызываетThreadPoolExecutor
изrunWorker
метод, напомним здесь, вaddWorker
метод, добавитьworker
прибытьHashSet<Worker>
После середины он будетworkerAdded
Установите значение true, чтобы добавитьworker
Успех, следующий код вызывается позже:
if (workerAdded) {
t.start();
workerStarted = true;
}
Этот t представляет использование в конструкторе WorkerThreadFactory
Созданный поток и передавший себя (сам Worker) текущему потоку, созданный поток является потоком задачи, который будет вызываться при запуске потока задачиWorker
внизrun
метод,run
Метод внутри и делегирование внешнему методуrunWorker
Для работы его параметры передаются самой вызывающей стороной,Worker
серединаrun
Метод выглядит следующим образом:
public void run() {
runWorker(this); //this指Worker对象本身
}
Вот простая картинка для представления логики следующего вызова.
Общая логика заключается в том, чтобы сначала создать поток, а потокWorker
Установите исполнителя и заполните поток вWorker
, а затем вынуть поток в Worker в addWorker и начать операцию.После запуска он вызовет метод run в Worker, а затем метод run вызовет runWorker ThreadPoolExecutor, а затем runWorker вызовет метод задача firstTask в Worker.Этот fistTask является реальной задачей, которая должна быть выполнена, а также логикой кода, реализованной пользователем.
Далее давайте взглянем на конкретное содержимое метода runWorker:
final void runWorker(Worker w) {
//调用者也就是Worker中的线程
Thread wt = Thread.currentThread();
//获取Worker中的第一个任务
Runnable task = w.firstTask;
//将Worker中的任务清除代表执行了第一个任务了,后面如果再有任务就从队列中获取。
w.firstTask = null;
//这里还记的我们在new Worker的时候将AQS的state状态设置为-1,这里先进行解锁操作,将state设置为0
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//循环进行获取任务,如果第一个任务不为空,或者是如果第一个任务为空,从任务队列中获取任务,如果有任务则返回获取的任务信息,如果没有任务可以获取则进行阻塞,阻塞也分两种第一种是阻塞直到任务队列中有内容,第二种是阻塞队列一定时间之后还是没有任务就直接返回null。
while (task != null || (task = getTask()) != null) {
//先获取worker的独占锁,防止其他线程调用了shutdown方法。
w.lock();
// 如果线程池正在停止,确保线程是被中断的,如果没有则确保线程不被中断操作。
if ((runStateAtLeast(ctl.get(), STOP) || //如果线程池状态为STOP、TIDYING、TERMINATED直接拒绝任务中断当前线程
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//执行任务之前做一些操作,可进行自定义
beforeExecute(wt, task);
Throwable thrown = null;
try {
//运行任务在这里喽。
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//执行任务之后做一些操作,可进行自定义
afterExecute(task, thrown);
}
} finally {
//将任务清空为了下次任务获取
task = null;
//统计当前Worker完成了多少任务
w.completedTasks++;
//独占锁释放
w.unlock();
}
}
completedAbruptly = false;
} finally {
//处理Worker的退出操作,执行清理工作。
processWorkerExit(w, completedAbruptly);
}
}
Мы видим, что если Worker запущен в первый раз, он получит задачу firstTask от Worker для выполнения, а затем, после успешного выполнения, вызовет getTask() для получения задачи из очереди, это место более Интересно, это осуществляется по ситуации Чтобы получить задачи, у нас есть пока BlockingQueue предоставляет несколько методов для получения из очереди.В этом getTask используются два метода.Первый - использовать опрос для получения информации в очереди. Когда в очереди еще нет задачи, возвращается null напрямую, а затем есть еще один метод take.Метод take заключается в том, чтобы заблокировать текущий поток, если в очереди нет задачи.После ожидания задачи в очереди, он уведомит ожидающий поток очереди об использовании задачи.Давайте взглянем на метод getTask:
private Runnable getTask() {
boolean timedOut = false; //poll获取超时
for (;;) {
//获取线程池的状态和线程数量
int c = ctl.get();
//获取线程池的状态
int rs = runStateOf(c);
//线程池状态大于等于SHUTDOWN
//1.线程池如果是大于STOP的话减少工作线程池数量
//2.如果线程池状态为SHUTDOW并且队列为空时,代表队列任务已经执行完,返回null,线程数量减少1
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//获取线程池数量。
int wc = workerCountOf(c);
//如果allowCoreThreadTimeOut为true,则空闲线程在一定时间未获得任务会清除
//或者如果线程数量大于corePoolSize的时候会进行清除空闲线程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//1.如果线程池数量大于最大的线程池数量或者对(空余线程进行清除操作并且poll超时了,意思是队列中没有内容了,导致poll间隔一段时间后没有获取内容超时了。
//2.如果线程池的数量大于1或者是队列已经是空的
//总之意思就是当线程池的线程池数量大于corePoolSize,或指定了allowCoreThreadTimeOut为true,当队列中没有数据或者线程池数量大于1的情况下,尝试对线程池的数量进行减少操作,然后返回null,用于上一个方法进行清除操作。
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//如果timed代表的是清除空闲线程的意思
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //等待一段时间如果没有获取到返回null。
workQueue.take(); //阻塞当前线程
//如果队列中获取到内容则返回
if (r != null)
return r;
//如果没有获取到超时了则设置timeOut状态
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
- Рабочий поток вызывает getTask, чтобы получить задачу из очереди.
- Если указано значение allowCoreThreadTimeOut или количество потоков в пуле потоков больше, чем corePoolSize, то лишние простаивающие потоки будут очищены, будет вызван метод poll очереди блокировки, и если за указанное время не будет получено ни одной задачи, будет установлено значение false возвращаться напрямую.
- Если количество пулов потоков в пуле потоков меньше, чем corePoolSize, или значение по умолчанию для allowCoreThreadTimeOut равно false, блокирующий поток будет получать задачи из очереди до тех пор, пока в очереди не появится задача для пробуждения потока.
Мы еще помним, что на первой картинке отмечены core thread и normal thread.На самом деле маркировка не очень точная.Это означает, что если количество пулов потоков превышает corePoolSize и не указано значение allowCoreThreadTimeOut, то он будет очищаться количество потоков больше, чем corePoolSize.И некоторые потоки меньше или равно максимальномуPoolSize, пометка основного потока означает, что corePoolSize не будет очищен, но будет очищен поток больше, чем corePoolSize, то есть поток в пул потоков оценивает, когда задача получена, то есть выполняется в getTask.Суждение, если количество потоков в текущем пуле потоков больше, чем corePoolSize, используйте метод опроса для получения задач в очереди. нет задачи в течение определенного периода времени, он вернет значение null. После возврата к значению null установите timeOut = true, и get getTask также вернет значение null. Это перейдет к методу runWorker вызывающей стороны, всегда вwhile (task != null || (task = getTask()) != null)
В это время getTask возвращает null и выпрыгивает из оператора цикла while.Установите completeAbruptly = false, что означает, что он не завершается внезапно, а завершается нормально.После выхода он будет выполняться, наконецprocessWorkerExit(w, completedAbruptly)
, чтобы выполнить очистку. Давайте посмотрим на исходный код:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 如果突然完成则调整线程数量
decrementWorkerCount(); // 减少线程数量1
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); //获取锁,同时只有一个线程获得锁
try {
completedTaskCount += w.completedTasks; //统计整个线程池完成的数量
workers.remove(w); //将完成任务的worker从HashSet中移除
} finally {
mainLock.unlock(); //释放锁
}
//尝试设置线程池状态为TERMINATED
//1.如果线程池状态为SHUTDOWN并且线程池线程数量与工作队列为空时,修改状态。
//2.如果线程池状态为STOP并且线程池线程数量为空时,修改状态。
tryTerminate();
// 获取线程池的状态和线程池的数量
int c = ctl.get();
// 如果线程池的状态小于STOP,也就是SHUTDOWN或RUNNING状态
if (runStateLessThan(c, STOP)) {
//如果不是突然完成,也就是正常结束
if (!completedAbruptly) {
//如果指定allowCoreThreadTimeOut=true(默认false)则代表线程池中有空余线程时需要进行清理操作,否则线程池中的线程应该保持corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//这里判断如果线程池中队列为空并且线程数量最小为0时,将最小值调整为1,因为队列中还有任务没有完成需要增加队列,所以这里增加了一个线程。
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//如果当前线程数效益核心个数,就增加一个Worker
addWorker(null, false);
}
Из приведенного выше исходного кода можно сделать вывод, что если количество потоков превышает количество основных потоков,runWorker
Он не будет ждать сообщений в очереди, а выполнит операцию очистки.Приведенный выше код очистки сначала выполняет меньше операций над количеством пулов потоков, а затем подсчитывает количество выполненных задач во всем пуле потоков, а затем пытается изменить Состояние пула потоков задаетсяSHUTDOWN->TIDYING->TERMINATED
или поSTOP->TIDYING->TERMINATED
, измените состояние пула потоков наTERMINATED
, необходимы два условия:
-
Когда количество потоков в пуле потоков и рабочей очереди пусты, а состояние пула потоков равно
SHUTDOWN
Когда состояние изменяется, процесс модификацииSHUTDOWN->TIDYING->TERMINATED
-
Когда состояние пула потоков
STOP
И когда количество пулов потоков пусто, он попытается изменить состояние.Процесс модификацииSTOP->TIDYING->TERMINATED
Если установленоTERMINATED
состояние, также необходимо вызвать переменную условияtermination
изsignalAll()
способ разбудить всех с момента вызоваawaitTermination
метод, пока поток заблокирован, другими словами, при вызовеawaitTermination
После того, как состояние пула потоков станет TERMINATED, он будет разбужен.
Далее мы проанализируем этоtryTerminate
метод и посмотрите, соответствует ли он тому, что мы сказали выше:
final void tryTerminate() {
for (;;) {
// 获取线程池的状态和线程池的数量组合状态
int c = ctl.get();
//这里单独下面进行分析,这里说明两个问题,需要反向来想这个问题。
//1.如果线程池状态STOP则不进入if语句
//2.如果线程池状态为SHUTDOWN并且工作队列为空时,不进入if语句
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//如果线程池数量不为空时,进行中断操作。
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//修改状态为TIDYING,并且将线程池的数量进行清空
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//执行一些逻辑,默认是空的
terminated();
} finally {
//修改状态为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
//唤醒调用awaitTermination方法的线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
Мы извлекаем приведенный выше оператор if отдельно для анализа и изменяем первое суждение if выше следующим образом. Вы можете видеть, что возврат находится в else. В это время внутреннее суждение if преобразуется в следующее:
if (!isRunning(c) &&
!runStateAtLeast(c, TIDYING) && //只能是SHUTDOWN和STOP
(runStateOf(c) != SHUTDOWN || workQueue.isEmpty())){
//这里执行逻辑
}else {
return;
}
Содержание анализа по отдельности выглядит следующим образом:
-
!isRunning(c)
Представитель НЕ БЕГАЕТ, значит возможноSHUTDOWN
,STOP
,TIDYING
,TERMINATED
Эти четыре состояния -
Средний разъем - это и означает, за которым следует
runStateAtLeast(c, TIDYING)
Это означает, по крайней мереTIDYING
,TERMINATED
Эти два, в свою очередь, могут бытьRUNNING
,SHUTDOWN
,STOP
, но было решено прежде, что это не может бытьRUNINNG
состоянии, поэтому первые два соединенных вместе могут находиться только в состоянииSHUTDOWN
,STOP
-
runStateOf(c) != SHUTDOWN || workQueue.isEmpty()
когда предыдущее состояниеSHUTDOWN
, отправитсяworkQueue.isEmpty()
, вместе, государствоSHUTDOWN
и рабочая очередь пуста, когда состояние пула потоковSTOP
, он войдет вrunStateOf(c) != SHUTDOWN
, вернуть true напрямую, это означает, что состояние пула потоковSTOP
Также есть оператор и оператор if для его преобразования.Логика следующая:
if (workerCountOf(c) == 0) {
//执行下面的逻辑
}else{
interruptIdleWorkers(ONLY_ONE);
return;
}
Здесь мы также конвертируем, мы видим, что когда количество пулов потоков пусто, будет выполняться следующая логика.Следующая логика заключается в изменении состояния пула потоков наTERMINATED
, два соединенных вместе означают, что модифицированное состояние, проанализированное выше, являетсяTERMINATED
условия, вот изображение, представляющее информацию о состоянии пула потоков:
Фактически, на приведенном выше рисунке мы представили оSHUTDOWN
илиSTOP
прибытьTERMINATED
изменения, не объясняя, как перейти отRUNNING
состояние меняется наSHUTDOWN
илиSTOP
статус, на самом деле называетсяshutdown()
илиshutdownNow
Метод меняет свое состояние, давайте посмотримshutdown
Исходный код метода:
public void shutdown() {
//获取全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//权限检查
checkShutdownAccess();
//设置线程池状态为SHUTDOWN,如果状态已经是大于等于SHUTDOWN则直接返回
advanceRunState(SHUTDOWN);
//如果线程没有设置中断标识并且线程没有运行则设置中断标识
interruptIdleWorkers();
//空的可以实现的内容
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//尝试修改线程池状态为TERMINATED
tryTerminate();
}
- Во-первых, выполните обнаружение разрешений в текущем потоке, чтобы увидеть, установлен ли диспетчер безопасности.Если он установлен, это зависит от того, есть ли у текущего потока, вызывающего завершение работы, разрешение на закрытие потока.Если у него есть готовое разрешение на прерывание работы.Выбрасывает, если нет разрешения
SecurityException
илиNullPointException
аномальный. - Установите состояние пула потоков в SHUTDOWN и вернитесь напрямую, если состояние уже больше или равно SHUTDOWN.
- Если поток не имеет установленного флага прерывания и поток не выполняется, то устанавливается флаг прерывания.
- Попробуйте изменить состояние пула потоков на TERMINATED
Далее давайте посмотримadvanceRunState
Содержание следующее:
private void advanceRunState(int targetState) {
for (;;) {
//获取线程池状态和线程池的线程数量
int c = ctl.get();
if (runStateAtLeast(c, targetState) || //如果线程池的状态>=SHUTDOWN
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) //设置线程池状态为SHUTDOWN
//返回
break;
}
}
- Когда статус пула потоков >= SHUTDOWN, возвращайтесь напрямую
- Если состояние пула потоков RUNNING, установите состояние пула потоков в SHUTDOWN и вернитесь, если настройка выполнена успешно.
interruptIdleWorkers
Код выглядит следующим образом:
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
//获取全局锁,同时只能有一个线程能够调用shutdown方法
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍历工作线程
for (Worker w : workers) {
Thread t = w.thread;
//如果当前线程没有设置中断标志并且可以获取Worker自己的锁
if (!t.isInterrupted() && w.tryLock()) {
try {
//设置中断标志
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
//执行一次,清理空闲线程。
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
Мы видим, что при вызове метода shutdown мы устанавливаем флаг прерывания только для неработающего потока, то есть поток, активно выполняющий задачу, не устанавливает флаг прерывания, и работа потока будет постепенно очищаться до тех пор, пока все задачи выполняются.Также помним, что в методе getTask есть такой кусок кода:
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
Определите, находится ли статус >=SHUTDOWN, и когда очередь пуста, уменьшите количество пулов потоков и выполняйте внутренние операции CAS до тех пор, пока операция CAS не будет успешной и не вернет значение null, после возврата значения null он вызоветprocessWorkerExit(w, false);
Очистите информацию о потоке рабочих и попытайтесь установить поток наTERMINATED
статус, выше для всехshutdown
Анализ метода, давайте посмотрим на следующееshutdownNow
метод и сравните разницу между ними:
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//权限检查
checkShutdownAccess();
//设置线程池状态为STOP,如果状态已经是大于等于STOP则直接返回
advanceRunState(STOP);
//这里是和SHUTDOWN区别的地方,这里是强制进行中断操作
interruptWorkers();
//将为完成任务复制到list集合中
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//尝试修改线程池状态为TERMINATED
tryTerminate();
return tasks;
}
shutdownNow
Метод возвращает список информации о незавершенной задаче.tasks = drainQueue();
, по сути метод иshutdown
Основные отличия методов заключаются в следующем:
-
shutdownNow
метод устанавливает состояние пула потоков вSTOP
,иshutdown
затем измените состояние наSHUTDOWN
-
shutdownNow
Метод прерывает рабочую задачу, то есть если рабочий поток работает, он также будет прерван, иshutdown
Это сначала попытаться получить блокировку.Если блокировка успешно получена, устанавливается флаг прерывания, то есть операция прерывания.Если блокировка не получена, она автоматически выйдет после ожидания завершения. -
shutdownNow
Метод возвращает список незавершенных задач.
Код нижеshutDownNow
изinterruptWorkers
метод:
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
//直接进行中断操作。
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
звонил внутреннеWorker
изinterruptIfStarted
Метод, метод внутренне прерывается для потоков, но предварительным условием для прерывания является то, что состояние состояния AQS должно быть больше или равно 0. Если состояние равно -1, оно не будет прервано, но если задача выполняется вrunWorker
не будет выполнять задачу, потому что состояние пула потоковSTOP
, если состояние пула потоков STOP, поток будет прерван.Следующий код находится в WorkerinterruptIfStarted
:
void interruptIfStarted() {
Thread t;
//当前Worker锁状态大于等于0并且线程没有被中断
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
политика отказа
Встроенная политика отклонения JDK выглядит следующим образом:
- Политика AbortPolicy: эта политика напрямую вызывает исключение, препятствуя правильной работе системы.
- Политика CallerRunsPolicy: пока пул потоков не закрыт, состояние пула потоков RUNNING, необходимо напрямую вызвать поток для запуска текущей отброшенной задачи.
- Стратегия DiscardOledestPolicy: эта стратегия отбрасывает самый старый запрос, который является первой задачей, которая должна быть выполнена, и пытается снова отправить задачу.
- Политика DiscardPolicy: эта политика автоматически отбрасывает задачи, которые не могут быть обработаны без какой-либо обработки.
Суммировать
Прежде всего, давайте подытожим картину сначала:
- Основной поток вызывает пул потоков, а пул потоков выполняет метод execute.
- пул потоков через
addWorker
Создайте поток и поместите поток в пул потоков. Здесь мы видим, что вторым шагом является добавление потока в основной поток. На самом деле в пуле потоков нет основного и неосновного потока. устанавливается в соответствии с corePoolSize и maxPoolSize Размер используется для различения, потому что потоки, превышающие размер corePoolSize, будут переработаны. больше, чем corePoolSize, или указанное количество потоков.allowCoreThreadTimeOut
Если это правда, он вернется после ожидания в течение определенного периода времени, а не будет ждать все время - Когда количество пулов потоков достигает corePoolSize, пул потоков сначала добавляет задачи в очередь.
- Когда задача в очереди также достигает максимального значения, установленного очередью, она создаст новый поток.Обратите внимание, что количество потоков в это время превысило corePoolSize, но не достигло максимального значения maxPoolSize.
- Когда количество потоков в пуле потоков достигает максимального размера пула, политика будет соответственно отклонена.
Друзья, которым это нравится, могут следить за моей публичной учетной записью WeChat и время от времени публиковать статьи.