Пул потоков
Что такое пул потоков?
Проще говоря, пул потоков относится к созданию нескольких потоков заранее.Когда есть задача для обработки, поток в пуле потоков будет обрабатывать задачу.После завершения обработки поток не будет уничтожен, а продолжится ждать следующего задания. Поскольку создание и удаление потоков потребляют системные ресурсы, когда бизнесу необходимо часто создавать и уничтожать потоки, рассмотрите возможность использования пулов потоков для повышения производительности системы.
Что может пул потоков?
В The Art of Java Concurrent Programming использование пулов потоков может помочь:
- Снизить потребление ресурсов. За счет повторного использования уже созданных потоков потребление, вызванное созданием и уничтожением потоков, может быть уменьшено.
- Улучшить отзывчивость. Когда задача поступает, ее можно выполнить немедленно, не дожидаясь создания потока.
- Улучшить управляемость потоками. Потоки являются дефицитными ресурсами. Если их создавать без ограничений, они не только потребляют системные ресурсы, но и снижают стабильность системы. Использование пулов потоков можно использовать для унифицированного распределения, настройки и мониторинга.
Как создать пул потоков
Сначала создайте класс, реализующий интерфейс Runnable.
package demo;
import java.util.Date;
/**
* @author yuanyiwen
* @create 2020-02-28 16:05
* @description
*/
public class DemoThread implements Runnable {
private String command;
public DemoThread(String command) {
this.command = command;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 开始时间 : " + new Date());
processCommand();
System.out.println(Thread.currentThread().getName() + " 结束时间 : " + new Date());
}
private void processCommand() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return "DemoThread{" +
"command='" + command + '\'' +
'}';
}
}
Здесь давайте используем ThreadPoolExecutor для создания пула потоков для тестирования:
package demo;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author yuanyiwen
* @create 2020-02-28 16:19
* @description
*/
public class DemoThreadPoolExecutor {
private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 100;
private static final Long KEEP_ALIVE_TIME = 1L;
public static void main(String[] args) {
// 使用线程池来创建线程
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
// 核心线程数为 :5
CORE_POOL_SIZE,
// 最大线程数 :10
MAX_POOL_SIZE,
// 等待时间 :1L
KEEP_ALIVE_TIME,
// 等待时间的单位 :秒
TimeUnit.SECONDS,
// 任务队列为 ArrayBlockingQueue,且容量为 100
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
// 饱和策略为 CallerRunsPolicy
new ThreadPoolExecutor.CallerRunsPolicy()
);
for(int i = 0; i < 15; i++) {
// 创建WorkerThread对象,该对象需要实现Runnable接口
Runnable worker = new DemoThread("任务" + i);
// 通过线程池执行Runnable
threadPoolExecutor.execute(worker);
}
// 终止线程池
threadPoolExecutor.shutdown();
while (!threadPoolExecutor.isTerminated()) {
}
System.out.println("全部线程已终止");
}
}
Наконец, давайте посмотрим на текущие результаты:
Можно видеть, что когда число основных потоков равно 5, даже если всего нужно запустить 15 потоков, каждый раз одновременно будут выполняться только 5 задач, а оставшиеся задачи будут помещены в очередь ожидания, ожидание бездействия основного потока. В общем, шаги следующие:
Платформа исполнителя
Платформа Executor была введена после Java5. После Java 5 лучше запускать поток через Executor, чем использовать метод запуска потока. В дополнение к тому, что он более управляем и эффективен (реализован с помощью пула потоков, экономит накладные расходы), есть ключевой момент: это помогает избежать этой проблемы выхода.
этот побег
Этот переход означает, что другие потоки сохраняют ссылку на объект до возврата конструктора, и вызов методов для объектов, которые еще не были полностью сконструированы, может привести к странным ошибкам.
Обычно есть два условия, которые должны быть выполнены, чтобы вызвать этот escape: одно — создать внутренний класс в конструкторе, а другое — опубликовать внутренний класс в конструкторе.
Поскольку освобожденный объект внутреннего класса имеет свои собственные права доступа к внешнему классу this, при доступе к внешнему классу this через объект внутреннего класса внешний класс может не сконструироваться, что приведет к некоторым неожиданным проблемам.
Типичный сценарий побега выглядит следующим образом:
public class DemoThisEscape {
private int a = 10;
public DemoThisEscape() {
// 在外部类的构造函数中调用内部类
new Thread(new InnerClass()).start();
}
private class InnerClass implements Runnable {
@Override
public void run() {
// 在这里通过 DemoThisEscape.this 引用尚未构造完毕的对象,比如这样 :
System.out.println(DemoThisEscape.this.a);
}
}
}
При использовании пула потоков для унифицированного планирования потоков шаг ручного запуска потока в программе опускается, тем самым избегая ситуации запуска потока в конструкторе, поэтому он может эффективно избежать этого выхода.
Общие параметры ThreadPoolExecutor
1. corePoolSize: количество потоков ядра.
Определяет минимальное количество потоков, которые могут выполняться одновременно.
2. maxPoolSize : максимальное количество потоков
Когда задачи, хранящиеся в очереди, достигают емкости очереди, текущее количество потоков, которые могут выполняться одновременно, увеличивается до максимального количества потоков.
3. keepAliveTime: время ожидания
Когда количество потоков больше, чем количество основных потоков, максимальное время, в течение которого выживают избыточные простаивающие потоки.
4. unit : Единица времени.
Единица времени для параметра keepAliveTime, включаяTimeUnit.SECONDS
,TimeUnit.MINUTES
,TimeUnit.HOURS
,TimeUnit.DAYS
и Т. Д.
5. workQueue: очередь задач
Очередь задач используется для хранения очереди задач, ожидающих выполнения.
6. threadFactory: фабрика ниток
Фабрика потоков, используемая для создания потоков, как правило, по умолчанию.
7. обработчик: политика отклонения
Также известная как стратегия насыщения; когда слишком много задач отправляется и не может быть обработано вовремя, для обработки задач можно использовать специальную стратегию.
Стратегия насыщения ThreadPoolExecutor:Относится к стратегии, выполняемой ThreadPoolTaskExecutor, когда количество одновременно выполняющихся потоков достигает максимального количества потоков и очередь заполняется.
Общие правила отказа включают:
-
ThreadPoolExecutor.AbortPolicy:бросать
RejectedExecutionException
отклонить обработку новых задач — это стратегия отклонения по умолчанию, используемая в Spring. -
ThreadPoolExecutor.CallerRunsPolicy:Поток вызывает
execute
сам, то есть напрямую вызываяexecute
метод, работающий в потоке (run
) отклоненная задача, которая отбрасывается, если исполнитель был закрыт. Эта стратегия обеспечивает простой механизм управления обратной связью, который может замедлить отправку новых задач, но может привести к задержкам. Эту стратегию можно выбрать, если приложение может терпеть эту задержку и не может отбрасывать запросы задач. - ThreadPoolExecutor.DiscardPolicy:Не обрабатывайте новые задачи, просто отбрасывайте их.
- ThreadPoolExecutor.DiscardOldestPolicy:Эта политика отклонит самый старый незавершенный запрос задачи.
Почему рекомендуется использовать ThreadPoolExecutor для создания потоков?
Протокол 1. Ресурсы потоков должны предоставляться через пул потоков, и явное создание потоков в приложении запрещено.
Преимущество использования пула потоков заключается в сокращении времени, затрачиваемого на создание и уничтожение потоков, и накладных расходов на системные ресурсы, а также решении проблемы нехватки ресурсов. Если пул потоков не используется, это может привести к тому, что система создаст большое количество потоков одного типа, что может вызвать проблему нехватки памяти или «чрезмерного переключения».
Протокол 2: Обязательный пул потоков не может использоватьсяExecutors
создавать, а черезThreadPoolExecutor
Метод конструктора, этот способ обработки позволяет авторам более четко представлять правила выполнения пула потоков и избегать риска исчерпания ресурсов.
Недостатки Executors, возвращающие объекты пула потоков, заключаются в следующем:
FixedThreadPool
иSingleThreadExecutor
: запрос разрешендлина очередиInteger.MAX_VALUE, может бытьнакопить много запросов, что приводит к ООМ.
CachedThreadPool
иScheduledThreadPool
: разрешено создаватьколичество потоковInteger.MAX_VALUE, может бытьСоздавайте много тем, что приводит к ООМ.
Несколько общих пулов потоков
Фиксированный пул потоков FixThreadPool
FixThreadPool: пул потоков с фиксированным количеством повторно используемых потоков.
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(
nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
Механизм исполнения:
- Если количество запущенных в данный момент потоков меньше
corePoolSize
, когда приходит новая задача, создается новый поток для выполнения задачи; - Количество запущенных в данный момент потоков равно
corePoolSize
После этого, если появится новая задача, задача будет добавлена вLinkedBlockingQueue
; - После того, как поток в пуле потоков выполнит свою работу, он будет повторно запускаться с
LinkedBlockingQueue
Получить задачи для выполнения.
FixThreadPool использует неограниченную очередьLinkedBlockingQueue
(Емкость очереди — Integer.MAX_VALUE), и это принесет в пул потоков следующее:Оказать влияние :
- Когда количество потоков в пуле потоков достигает
corePoolSize
, новые задачи будут ожидать в неограниченной очереди, поэтому количество потоков в пуле потоков не будет превышатьcorePoolSize
; - Так как используется неограниченная очередь, то
maximumPoolSize
Будет недопустимым параметром, так как невозможно иметь полную очередь задач, поэтому FixedThreadPool'scorePoolSize
,maximumPoolSize
устанавливается на одно и то же значение, иkeepAliveTime
будет недопустимым параметром; - Запуск FixedThreadPool (относится к невыполнению
shutdown()
илиshutdownNow()
) не отклонит задачу, поэтому при наличии большого количества задач это может привести к неработоспособности.
Однопоточный пул SingleThreadExecutor
SingleThreadExecutor — это пул потоков только с одним потоком.
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(
1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
В основном это то же самое, что и FixThreadPool, за исключением того, что в пуле есть только один поток.
Пул потоков кеша CachedThreadPool
CachedThreadPool — это пул потоков, который создает новые потоки по мере необходимости, но повторно использует созданные ранее потоки по мере их появления.
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(
0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
ТотcorePoolSize
установлен на 0,maximumPoolSize
Установлено значение Integer.MAX.VALUE, которое не ограничено. Хотя он неограничен, потому что пул потоков также имеет механизм уничтожения, то есть, если поток не использовался в течение 60 секунд, поток будет уничтожен, что экономит много ресурсов.
Однако, если основной поток отправляет задачи быстрее, чемmaximunPool
В зависимости от скорости, с которой поток обрабатывает задачи, CachedThreadPool будет продолжать создавать новые потоки, что по-прежнему может вызывать перегрузку ЦП или переполнение памяти.
Механизм исполнения:
- сначала выполнить
offer
Операция, отправьте задачу в очередь задач. Если в текущем максимальном пуле выполняются незанятые потокиpoll
операция, а основной потокoffer
с неработающими потокамиpoll
При успешном сопряжении основной поток передает задачу незанятому потоку для выполнения, что считаетсяexecute()
Выполнение метода завершено, в противном случае выполняются следующие шаги. - когда начальный
maximum
пусто, илиmaximumPool
Когда нет свободных потоков вpoll
работать. В этот момент CachedThreadPool создаст новый поток для выполнения задачи,execute()
Выполнение метода завершено.
Как составить размер пула потоков?
переключатель контекста
В многопоточном переменном программировании количество общих потоков больше, чем количество ядер ЦП, и одно ядро ЦП может использоваться только одним потоком в любой момент времени. Чтобы эти потоки выполнялись эффективно, стратегия, принятая ЦП, заключается в выделении временных интервалов для каждого потока и ротации. Когда квант времени потока истечет, он снова будет в состоянии готовности для использования другими потоками, что является переключением контекста.
В двух словах, текущая задача сохранит свое состояние перед переключением на другую задачу после выполнения кванта времени ЦП, так что при переключении обратно на эту задачу в следующий раз ее можно будет напрямую загрузить в предыдущее состояние. Процесс задачи от сохранения до перезагрузки — это переключение контекста.
Переключение контекста обычно требует больших вычислительных ресурсов. То есть это требует значительного процессорного времени, порядка наносекунд для каждого переключения в десятках или сотнях переключений в секунду. Следовательно, переключение контекста означает, что оно потребляет много процессорного времени для системы, по сути, это может быть самая трудоемкая операция в операционной системе.
Одна из многих вещей, которые Linux сравнивает с другими операционными системами (включая другие Unix-подобные системы), заключается в том, что переключение контекста и переключение режимов занимает очень мало времени.
простой проект решения
Задачи с интенсивным использованием ЦП (N+1):
Эта задача в основном потребляет ресурсы ЦП.Количество потоков может быть установлено равным N (количество ядер ЦП) + 1. На один поток больше, чем число ядер ЦП, чтобы предотвратить случайное прерывание потока из-за ошибки страницы, или задача вызвано другими причинами воздействия подвески. Как только задача будет приостановлена, ЦП будет бездействовать, и дополнительный поток в этом случае может полностью использовать время простоя ЦП.
Задачи с интенсивным вводом-выводом (2N):
Когда применяется задача такого типа, система будет тратить большую часть времени на обработку взаимодействия ввода-вывода, и поток не будет занимать ЦП для обработки в течение периода времени обработки ввода-вывода.В это время ЦП может быть передан в пользование другим потокам. . Таким образом, при выполнении задач с интенсивным вводом-выводом мы можем настроить больше потоков, а конкретный метод расчета — 2N.
Справочная статья :JavaGuide