предисловие
Я считаю, что все в большей или меньшей степени использовали потоки в проектах, а потоки — это ценные ресурсы, которые нельзя создавать часто и которые следует повторно использовать для других задач, поэтому у нас есть свой пул потоков.
Использование пула потоков
Вы знаете, как мы можем создать пул потоков?
Конечно, я это знаю, JDK в основном предоставляет три метода создания пулов потоков.
- Executors.newFixedThreadPool(int nThreads): создать пул потоков с фиксированным количеством потоков.
- Executors.newSingleThreadExecutor(): создает пул потоков из одного потока.
- Executors.newCachedThreadPool(): создает пул потоков «неограниченного размера».
Как использовать пул потоков
ExecutorService threadPool = Executors.newFixedThreadPool(5);
threadPool.execute(() -> {
System.out.println("执行任务");
});
threadPool.shutdown();
Принцип пула потоков
Можете ли вы рассказать мне принцип пула потоков?
Основные параметры и состояние пула потоков
Упомянутый выше метод создания пула потоков на самом деле реализуется путем создания класса ThreadPoolExecutor, поэтому мы можем непосредственно посмотреть на принцип реализации этого класса.
Во-первых, давайте посмотрим на его метод построения
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
Поговорим о значении этих основных параметров
- corePoolSize : количество основных потоков в пуле потоков
- maxPoolSize : максимальное количество потоков в пуле потоков
- keepAliveTime : Период ожидания неосновных потоков.Когда время простоя неосновных потоков в системе превышает keepAliveTime, они будут перезапущены. Если для свойства allowCoreThreadTimeOut ThreadPoolExecutor установлено значение true, этот параметр также представляет период ожидания основного потока.
- unit : блок тайм-аута
- workQueue: очередь задач в пуле потоков, которая в основном используется для хранения задач, которые были отправлены, но еще не выполнены.
- обработчик : стратегия обработки, когда пул потоков не может обработать задачу.
Конечно, простое знание этих параметров не имеет большого значения, нам все равно придется смотреть на класс ThreadPoolExecutor глобально.
Прежде всего, давайте разберемся с состояниями, определенными в пуле потоков, они проходят через все тело.
// ctl存储了两个值,一个是线程池的状态,
// 另一个是活动线程数(workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池最多允许2^29-1个(大概5亿)线程存在,
// 当然首先要你的系统能新建这么多个
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
Из определения приведенных выше переменных-членов мы можем узнать, что пул потоков допускает до 500 миллионов (2 ^ 29-1) действий потоков, так почему бы не 2 ^ 31-1? Поскольку разработчик считает, что это значение достаточно велико, Если вы считаете, что это узкое место в будущем, он будет заменен типом Long.
В то же время в пуле потоков есть пять состояний, которые решают жизненный цикл пула потоков.
- РАБОТАЕТ: Состояние работы. Принимать новые задачи и обрабатывать задачи в очереди
- SHUTDOWN : Состояние отключения (вызывается метод отключения). Не принимать новые задачи, а обрабатывать задачи в очереди
- STOP : состояние остановки (вызывается метод shutdownNow). Не принимать новые задачи, не обрабатывать задачи в очереди и прерывать обработку задачи
- TIDYING: все задачи были завершены, workerCount (статус активного завершения, когда выполняется terminated(), он будет обновлен до этого номера потока статуса) равен 0, после того, как пул потоков перейдет в этот статус, он вызовет метод terminated() войти в статус TERMINATED
- TERMINATED: завершенное состояние, которое будет обновлено до этого состояния, когда будет выполнено Terminated().
Схема потока состояний выглядит следующим образом:
принцип
Мы знаем, что когда мы выполняем задачу, вызывается метод execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1. 如果工作线程数小于核心线程数(corePoolSize),则创建一个工作线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2. 如果当前是running状态,并且任务队列能够添加任务
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 2.1 如果不处于running状态了(使用者可能调用了shutdown方法),
// 则将刚才添加到任务队列的任务移除
if (! isRunning(recheck) && remove(command))
reject(command);
// 2.2 如果当前没有工作线程,
// 则新建一个工作线程来执行任务(任务已经被添加到了任务队列)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 队列已经满了的情况下,则新启动一个工作线程来执行任务
else if (!addWorker(command, false))
reject(command);
}
В методе addWorker все еще есть некоторая необходимая логика оценки, например, находится ли текущий пул потоков в нерабочем состоянии, пуста ли очередь и т. д. рабочих потоков превышает максимальный размер пула и запускает рабочие потоки для выполнения задач.
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
for (;;) {
int wc = workerCountOf(c);
// 1. 判断当前工作线程是否满足条件
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 2. 增加工作线程数量
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
// 3. 创建工作线程
w = new Worker(firstTask);
final Thread t = w.thread;
workers.add(w);
if (workerAdded) {
// 4. 运行工作线程
t.start();
workerStarted = true;
}
return workerStarted;
}
Итак, подводя итогРабочий процесс пула потоковследующее:
- Отправить задачу, если количество рабочих потоков в пуле потоков меньше, чем corePoolSize, создать новый рабочий поток для выполнения задачи
- Если текущий рабочий поток пула потоков уже равен corePoolSize, новая задача помещается в рабочую очередь для выполнения.
- Если рабочая очередь заполнена и количество рабочих потоков меньше maxPoolSize, создайте новый рабочий поток для выполнения задачи.
- Если количество рабочих потоков в текущем пуле потоков достигло максимального размера пула и новая задача не может быть помещена в очередь задач, для соответствующей обработки будет использоваться соответствующая стратегия (по умолчанию используется стратегия отклонения).
Если вы думаете, что вышеизложенное трудно запомнить, я расскажу вам историю о ресторане с горячими горшками, и вы лучше поймете.
Раньше был ресторан с горячими горшками под названием Zhu Shuai Shuai Hot Pot. Босс - красивый молодой человек, который только что бросил работу программиста и начал свой собственный бизнес. Ресторан с горячими горшками невелик и может иметь только 10 столов ( corePoolSize).Заходите в магазин посидеть (в магазине есть кондиционер), приходите поздно, магазин полон, а люди, которые придут позже, выстроятся в очередь (workQueue).
Очередь росла, и, увидев, что выхода нет, Чжу Шуайшуай поставил снаружи несколько временных столов (непрофильные рабочие потоки), чтобы гости могли поесть на улице. Если кто-то в магазине закончил есть или временный стол снаружи закончил есть, пусть люди в очереди едят. Когда было поздно и очереди уже не было, начальник попросил людей убрать временный столик на улицу, ведь на улице было не очень хорошо, и он боялся, что придет руководство города. Если бизнес идет очень хорошо и приходит слишком много людей, это выходит за рамки возможностей ресторана с горячими горшками, и вы можете только попросить их пойти в другое место.
Приведенную выше историю нужно тщательно распробовать, и в конце концов вы обнаружите, что код исходит из жизни.
Выше говорилось о рабочих потоках, что, черт возьми, такое рабочий поток? На самом деле рабочий поток относится к нашему классу Worker, который является приватным классом в ThreadPoolExecutor.
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
// 运行worker的线程(new Thread(this))
final Thread thread;
// 需要执行的任务
Runnable firstTask;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
}
Видно, что Woker не только наследует AbstractQueuedSynchronizer (который реализует функцию эксклюзивной блокировки), но и реализует интерфейс Runnable.
Фактически, каждый поток в пуле потоков инкапсулирован в объект Worker, а то, что поддерживает ThreadPool, на самом деле является набором объектов Worker.
Worker использует себя в качестве задачи для создания потока и в то же время назначает внешнюю задачу своей собственной переменной-члену задачи, что эквивалентно переносу задачи.
Метод addWorker() выполняет worker.thread.start(), который фактически выполняет метод runWorker Worker.
final void runWorker(Worker w) {
// 1. 获取任务开始执行任务,如果获取不到任务,当前的worker就会被JVM回收
while (task != null || (task = getTask()) != null) {
task.run();
}
}
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 1. 判断线程池是否关闭
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 2. 判断是否需要进行超时控制。
// allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;
// wc > corePoolSize,表示当前线程池中的工作线程数量大于核心线程数量;
// 对于超过核心线程数量的这些线程,需要进行超时控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 3. wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
// timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// 4. timed如果为true,则通过阻塞队列的poll方法进行超时控制,
//如果在keepAliveTime时间内没有获取到任务,则返回null。如果为false,则直接阻塞
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
}
}
Особо следует отметить в приведенном выше коде пункт 3 в getTask() , который предназначен для управления количеством рабочих потоков, доступных для пула потоков. Из предыдущего анализа мы можем знать, что если количество рабочих потоков в текущем пуле потоков превышает corePoolSize и меньше maxPoolSize, а workQueue заполнена, рабочие потоки могут быть добавлены, но если задача не получена timeout, то есть если timedOut имеет значение true В данном случае это означает, что workQueue уже пуста, а значит в пуле потоков больше нет потоков для выполнения задач.Можно уничтожить больше потоков, чем corePoolSize и сохранить количество потоки в corePoolSize.
политика отказа
Вы только что упомянули о стратегии отказа. Какая у вас есть стратегия отказа?
Существует четыре основных типа стратегий отказа:
- AbortPolicy : всегда выдает исключение RejectedExecutionException
- CallerRunsPolicy : если пул потоков не закрыт, он будет передан потоку, вызывающему пул потоков для выполнения.
- DiscardPolicy: отменить задачу напрямую и ничего не делать.
- DiscardOldestPolicy : отбросить самую старую задачу в очереди, а затем повторно отправить текущую задачу в пул потоков.
Различия между пулами потоков, созданными исполнителями
Можете ли вы сказать мне разницу между пулами потоков, созданными Исполнителями, о которых вы упоминали в начале?
newFixedThreadPool
Пул потоков с фиксированным числом потоков, corePoolSize равен maxPoolSize, а используемая блокирующая очередь — LinkedBlockingQueue, которая представляет собой неограниченную очередь. постоянно добавляться в очередь, что может привести к потере памяти.
newSingleThreadExecutor
Создайте пул потоков из одного потока, corePoolSize = maxPoolSize = 1, а также используйте неограниченную очередь LinkedBlockingQueue.Когда количество задач велико и пул потоков слишком поздно обрабатывается, это может привести к переполнению памяти.
newCachedThreadPool
Создайте кешируемый пул потоков, corePoolSize = 1, maxPoolSize = Interger.MAX_VALUE, но используйте SynousQueue, эта очередь особенная, нет внутренней структуры для хранения каких-либо элементов, поэтому при большом количестве задач созданный поток (corePoolSize = 1) Если задача долго не обрабатывается, всегда будет создаваться поток для ее обработки, а также есть вероятность ООМ.
Кэш в cacheThreadPool на самом деле относится к SynousQueue.При вставке данных в эту очередь, если нет задачи для выборки, процесс вставки будет заблокирован.
Поскольку вы сказали, что существует возможность OOM, как вам создать пул потоков?
В реальных условиях не рекомендуется создавать пул потоков через Executors, а черезnew ThreadPoolExecutor
Не рекомендуется использовать неограниченные очереди, а использовать ограниченные очереди, такие как ArrayBlockingQueue. И стратегия отказа зависит от ваших собственных потребностей (если предоставленная система не устраивает, напишите ее сами)
В то же время настройка количества потоков ядра не настолько велика, насколько это возможно, можно сказать только, что это значение устанавливается в соответствии с вашими потребностями, вообще говоря, его разумно можно настроить в соответствии со следующими двумя пунктами.
- Для задач с интенсивным вводом-выводом вы можете увеличить количество потоков, например количество ЦП * 2.
- Для вычислительных задач (большое количество операций в памяти) можно задать меньшее количество потоков, например, равное количеству ЦП
Конечно, это соображение имеет много аспектов, связанных не только с программой, но и с аппаратными и другими ресурсами, короче, это больше отладка во время теста.
Все, не думайте, что то, что я сказал выше, ерунда, пожалуйста, будьте уверены и уберите эту мысль.