Мы знаем, что потоки нужно создавать, когда они нужны, и уничтожать, когда они не нужны, но создание и уничтожение потоков — дорогостоящие операции.
Почему это дорого?
Хотя нам, программистам, легко создать поток, мы можем просто использовать new Thread() для его создания, но операционная система выполнит гораздо больше работы, ей нужно выдать系统调用
, перехват в ядре, вызов API-интерфейсов ядра для создания потоков, выделение ресурсов потокам и т. д. Эти операции имеют много накладных расходов.
Следовательно, в случае высокого параллелизма и большого трафика частое создание и уничтожение потоков будет сильно замедлять скорость отклика, так есть ли способ улучшить скорость отклика? Есть много способов. Попытка избежать создания и уничтожения потоков — это способ повысить производительность.复用
Вставайте, потому что производительность — наша самая большая ежедневная забота.
В этой статье давайте сначала разберемся с инфраструктурой Executor, а затем начнем с описания основных концепций пулов потоков, постепенно разберемся в основных классах пулов потоков, а затем постепенно введем принцип пулов потоков, чтобы шаг за шагом понять потоки. бассейны.
В Java этого эффекта можно добиться с помощью пула потоков. Сегодня мы подробно поговорим о Java.线程池
.
Платформа исполнителя
Зачем сначала говорить об Executor? Поскольку я думаю, что Executor является драйвером пула потоков, мы обычно используем новый метод Thread().start() для создания и выполнения потоков.Создайте тему и начните работу. А о создании пула потоков мы поговорим позже, что больше отражено вуправлять исполнениемначальство.
Общая структура Executor выглядит следующим образом, и мы представим каждый класс в структуре Executor ниже.
Давайте сначала познакомимся с Исполнителем
Интерфейс исполнителя
Исполнительjava.util.concurrent
Интерфейс верхнего уровня, этот интерфейс имеет только один метод, то естьexecute
метод. Обычно мы создаем и запускаем поток, используяnew Thread().start()
и метод execute в Executor вместо явного создания потоков. Executor предназначен для разделения сведений о представлении задачи и выполнении задачи. Используя инфраструктуру Executor, вы можете создавать потоки следующим образом.
Executor executor = Executors.xxx // xxx 其实就是 Executor 的实现类,我们后面会说
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
Метод execute получаетRunnable
Экземпляр, он используется для выполнения задачи, а задача представляет собой класс, реализующий интерфейс Runnable, но метод execute не может получить реализациюCallable
Класс интерфейса, то есть метод execute, не может получить задачу с возвращаемым значением.
Потоки, созданные методом execute, выполняются асинхронно, то есть вам не нужно ждать завершения каждой задачи перед выполнением следующей задачи.
Например, ниже приведен простой пример использования Executor для создания и выполнения потоков.
public class RunnableTask implements Runnable{
@Override
public void run() {
System.out.println("running");
}
public static void main(String[] args) {
Executor executor = Executors.newSingleThreadExecutor(); // 你可能不太理解这是什么意思,我们后面会说。
executor.execute(new RunnableTask());
}
}
Исполнитель эквивалентен патриарху. Начальник только отдает приказы. Если патриарх просит вас выполнить асинхронно, вы должны выполнить его асинхронно. Патриарх говорит нет.汇报
Вам не нужно сообщать о задаче, но у патриарха есть немного управления, поэтому помимо Исполнителя нам также нужно знать другие служебные операции, например, когда ваш поток завершается, когда он приостанавливается и оценивается текущий статус вашей темы и т. д.,ExecutorService
Просто большая домработница.
Интерфейс ExecutorService
ExecutorService также является интерфейсом, является расширением Executor и предоставляет некоторые методы, которых нет в Executor. Давайте представим эти методы.
void shutdown();
shutdown
После вызова метода ExecutorService упорядоченно закроет выполняющиеся задачи, но не будет принимать новые задачи. Если задача уже закрыта, этот метод не действует.
ExecutorService также имеет метод, аналогичный методу выключения.
List<Runnable> shutdownNow();
shutdownNow
Попытается остановить закрытие всех выполняемых задач, остановить ожидающие задачи и вернуть список задач, ожидающих выполнения.
Поскольку shutdown и shutdownNow так похожи, в чем разница?
- метод выключения будет просто
线程池
Статус установлен наSHUTWDOWN
, выполняющаяся задача будет продолжать выполняться, пул потоков будет ожидать завершения выполнения задачи, а неисполняемый поток будет прерван.- Метод shutdownNow установит состояние пула потоков в
STOP
, выполняемые и ожидающие задачи останавливаются, и возвращается список задач, ожидающих выполнения
ExecutorService также имеет три метода для оценки статуса потока:
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
-
isShutdown
Метод указывает, был ли закрыт исполнитель, если да, то возвращает true, иначе возвращает false. -
isTerminated
Этот метод означает, что все задачи были завершены после выключения.Если завершение возвращает false, необходимо отметить, что метод isTerminated никогда не будет истинным, если сначала не будут вызваны методы shutdown или shutdownNow. -
awaitTermination
Метод блокируется до тех пор, пока не завершится выполнение всех задач после запроса на завершение работы. Этот метод не очень прост для понимания, давайте рассмотрим его на небольшом примере.
public static ExecutorService executorService = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
executorService.submit(() -> {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
System.out.println("Waiting...");
boolean isTermination = executorService.awaitTermination(3, TimeUnit.SECONDS);
System.out.println("Waiting...Done");
if(isTermination){
System.out.println("All Thread Done");
}
System.out.println(Thread.currentThread().getName());
}
Если после вызова executorService.shutdown() все потоки завершают задачу, а isTermination возвращает true, программа выводит All Thread Done. будет возвращать ложь.
Еще одна причина, по которой ExecutorService является большим помощником, заключается в том, что он может не только размещать объекты Runnable, но и приниматьCallable
объект. В ExecutorService,submit
Эту роль играют методы.
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
Метод submit вернетFuture
объект,<T>
Представляет универсальный тип, который предназначен для возвращаемого значения, созданного Callable.Если метод вызова в задаче, отправленной методом отправки, возвращает целое число, то метод отправки возвращаетFuture<Integer>
,Так далее и тому подобное.
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
invokeAll
Этот метод используется для выполнения заданной комбинации задач. После завершения выполнения будет возвращен список задач. Каждый элемент в списке задач является задачей, и каждая задача будет включать статус задачи и результат выполнения. Аналогично, метод invokeAll метод также вернет объект Future.
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
invokeAny получит результат первой выполненной задачи, т.е.Callable<T>
Возвращаемое значение вызова в интерфейсе,При получении результата другие запущенные задачи прерываются,имеют阻塞性
.
Обязанности большого дворецкого более стандартны, чем у бригадира, и дела, которыми он управляет, тоже шире, но ведь большой дворецкий – это еще и опора семьи, он не будет выполнять специфическую работу. , он отвечает за завершение работы большой экономки.
Абстрактный класс AbstractExecutorService
AbstractExecutorService — это абстрактный класс, который реализует некоторые методы ExecutorService, достаточно экспертный, он будет анализировать, что должна делать большая экономка, а затем составлять конкретные планы требований большой домработницы, а затем находить его правую руку. человек.ThreadPoolExecutor
для достижения цели.
AbstractExecutorService Этот абстрактный класс в основном реализуетinvokeAll
иinvokeAny
метод, мы объясним анализ исходного кода этих двух методов позже.
Интерфейс ScheduledExecutorService
ScheduledExecutorService также является интерфейсом. Он расширяет интерфейс ExecutorService и предоставляет функции, которых нет в интерфейсе ExecutorService.定时执行器
, исполнитель по времени может запланировать запуск команд после определенной задержки или периодически.
В основном он имеет три метода интерфейса и один перегруженный метод. Давайте сначала взглянем на эти два перегруженных метода.
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
schedule
Метод может выполнить задачу после определенной задержки и может выполнить ее только один раз. Как видите, метод schedule также возвращаетScheduledFuture
Object, объект ScheduledFuture расширяет интерфейсы Future и Delayed и представляет результат асинхронного отложенного вычисления. Метод расписания поддерживает нулевые и отрицательные задержки, обе из которых считаются немедленными задачами.
Еще одна вещь, которую следует отметить, это то, что метод расписания может получать относительное время и период в качестве параметров вместо фиксированной даты, вы можете использоватьdate.getTime - System.currentTimeMillis()чтобы получить относительный интервал времени.
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
scheduleAtFixedRate указывает, что задача будет запланирована с фиксированной скоростью в то время, когдаinitialDelay
Затем выполнять непрерывно.
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
Этот метод очень похож на вышеописанный, это означает, что задача выполняется с фиксированным временем задержки.
Два метода scheduleAtFixedRate и scheduleWithFixedDelay легко спутать.Давайте рассмотрим пример, чтобы проиллюстрировать разницу между этими двумя методами.
public class ScheduleTest {
public static void main(String[] args) {
Runnable command = () -> {
long startTime = System.currentTimeMillis();
System.out.println("current timestamp = " + startTime);
try {
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("time spend = " + (System.currentTimeMillis() - startTime));
};
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
scheduledExecutorService.scheduleAtFixedRate(command,100,1000,TimeUnit.MILLISECONDS);
}
}
Вывод примерно такой
Видно, что временной интервал вывода текущей метки времени составляет около 1000 миллисекунд, поэтому можно сделать вывод, чтоscheduleAtFixedRate
задачи выполняются с постоянной скоростью.
Потом снова смотримscheduleWithFixedDelay
Метод такой же, как в тестовом классе выше, за исключением того, что мы заменяем scheduleAtFixedRate на scheduleWithFixedDelay .
scheduledExecutorService.scheduleWithFixedDelay(command,10,1000,TimeUnit.MILLISECONDS);
Затем наблюдайте за выводом
Видно, что интервал между двумя текущими метками времени примерно равен сумме 1000(фиксированное время) + задержка(затраты времени), которую можно определитьscheduleWithFixedDelay
выполняется с фиксированной задержкой.
Описание пула потоков
Давайте сначала разберемся, что такое пул потоков.池子
, какой бассейн? Это относится к пулу, который управляет одной и той же группой рабочих потоков, то есть пул потоков будет управлять внутренними рабочими потоками единообразно.
Согласно вики, пул потоков на самом деле является шаблоном проектирования программного обеспечения, используемым для реализации параллелизма в компьютерных программах.
![image-20210202200016478](/Users/mr.l/Library/Application Support/typora-user-images/image-20210202200016478.png)
Например, ниже приведена простая схема концепции пула потоков.
Примечание. Эта диаграмма является всего лишь концептуальной моделью, а не реальной реализацией пула потоков, надеюсь, читатели не будут сбиты с толку.
Можно видеть, что это фактически эквивалентнопроизводитель-потребительМодель, потоки в очереди задач будут поступать в пул потоков, который управляется пулом потоков. Каждый поток в пуле потоков является рабочим потоком. После выполнения рабочего потока он будет помещен в очередь завершения, представляя выполненная задача.
Приведенный выше рисунок имеет недостаток, то есть потоки в очереди будут уничтожены после выполнения, что вызовет потерю производительности и снизит скорость отклика.Целью использования пулов потоков часто является повторное использование потоков для повышения производительности программы.
Поэтому мы должны повторно использовать рабочий поток после выполнения и ждать следующего использования.
Создание пула потоков
О базовом механизме выполнения пула потоков мы говорили выше.Вы знаете, как повторно используются потоки, поэтому ничто не может появиться из воздуха, и то же самое верно для потоков, так как же они создаются? Далее я должен упомянуть класс инструментов, то естьExecutors
.
Исполнители такжеjava.util.concurrent
Являясь членом пакета, это фабрика для создания пулов потоков. Для создания пулов потоков можно использовать статические фабричные методы. Ниже приведены конкретные типы пулов потоков, которые могут создавать исполнители.
-
newFixedThreadPool
: newFixedThreadPool создаст фиксированное количество пулов потоков, которые программист может создать с помощьюExecutors.newFixedThreadPool(int nThreads)
При указании вручную поток создается каждый раз при отправке задачи, а значение nThreads — это максимальное количество активных потоков, разрешенных в любой момент времени. Если дополнительные задачи создаются, когда все потоки активны, эти вновь созданные потоки войдут в очередь ожидания для планирования потоков. Если какой-либо поток из-за аварии во время выполнения线程终止
, то поток в очереди ожидания будет использоваться вместо этого при выполнении последующих задач. -
newWorkStealingPool
: newWorkStealingPool — это недавно добавленный пул потоков в JDK1.8, основанный наfork-join
Реализация механизма пула потоков с использованиемWork-Stealing
алгоритм. newWorkStealingPool создаст достаточно потоков для поддержки параллелизма и использует несколько очередей для уменьшения конкуренции. Пул потоков кражи работы не гарантирует порядок выполнения отправленных задач. -
newSingleThreadExecutor
: newSingleThreadExecutor — однопоточный исполнитель, который будет создавать только单个
Поток для выполнения задачи, если этот поток аварийно завершится, для его замены будет создан другой поток. newSingleThreadExecutor обеспечит порядок выполнения задач в очереди задач, то есть выполнение задач有序的
. -
newCachedThreadPool
: newCachedThreadPool создаст кэшируемый пул потоков в соответствии с фактическими потребностями. Если количество потоков в пуле потоков превышает фактическое количество задач, которые необходимо обработать, то newCachedThreadPool перезапустит лишние потоки. Если фактическое количество обрабатываемых потоков не соответствует количеству задач, вы можете добавить новые потоки в пул потоков, и количество потоков в пуле потоков не ограничено. -
newSingleThreadScheduledExecutor
: newSingleThreadScheduledExecutor очень похож на newSingleThreadExecutor, за исключением того, что этот исполнитель с запланированным может выполнять задачи после определенной задержки или периодически. -
newScheduledThreadPool
: этот пул потоков подобен вышеприведенному запланированному исполнителю, за исключением того, что newSingleThreadScheduledExecutor добавляет на один больше, чем newScheduledThreadPool.DelegatedScheduledExecutorService
Proxy, который на самом деле является воплощением шаблона проектирования обертки.
Базовая реализация вышеупомянутых пулов потоков поддерживается ThreadPoolExecutor, поэтому, чтобы понять принцип работы этих пулов потоков, вам нужно сначала понять ThreadPoolExecutor.Давайте поговорим о ThreadPoolExecutor.
Класс ThreadPoolExecutor
ThreadPoolExecutor
родыjava.util.concurrent
Под классом инструмента можно сказать, что это основной класс в пуле потоков. Если вы хотите полностью понять пул потоков, вы должны сначала понять этот класс.
Если мы возьмем приведенное выше семейство в качестве примера, ThreadPoolExecutor является основой семейства, основой семейства. ThreadPoolExecutor выполняет большую работу.
Прежде всего, ThreadPoolExecutor предоставляет четыре конструктора, но первые три конструктора в конечном итоге вызовут последний конструктор для инициализации.
public class ThreadPoolExecutor extends AbstractExecutorService {
.....
// 1
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);
// 2
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
// 3
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
// 4
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
...
}
Итак, давайте посмотрим на последний пул потоков и посмотрим, какие параметры.Если я правильно считаю, должно быть 7 параметров (первичный математический уровень...)
- Во-первых, очень важным параметром является
corePoolSize
, Емкость/размер основного пула потоков, как бы вы это ни называли, я не думаю, что с этим что-то не так. Но вы должны понимать значение этого параметра,Он имеет очень тесную связь с принципом реализации пула потоков.. Вы только что создали пул потоков, а потоков в это время нет. Это легко понять, потому что у меня сейчас нет задач для выполнения. Что вы делаете с созданием потоков? И есть накладные расходы на создание потоков, так что никогда не поздно создавать потоки, когда приходят задачи. но! Я бы сказал, но при вызове методов prestartAllCoreThreads или prestartCoreThread потоки будут создаваться при отсутствии задач: первый создает потоки corePoolSize, а второй создает только один поток. Дедушка Леа хотел, чтобы мы стали программистами懒汉
, подождите, пока придет задание; но вы должны быть饿汉
, Закончите задание раньше времени. Если мы хотим быть ленивыми, после создания пула потоков количество потоков в пуле потоков равно 0. Когда приходит задача, будет создан поток для выполнения задачи.Когда количество потоков в пуле потоков достигает corePoolSize, поместит поступающие задачи в缓存队列
среди.
-
maximumPoolSize
: существует еще одна емкость пула потоков, но это максимальная емкость пула потоков, то есть самый большой поток, который может вместить пул потоков, а указанное выше значение corePoolSize — это только емкость основного потока.
Я знаю, что в это время у вас будут вопросы, то есть вы не знаете, как отличить мощность основного потока от максимальной мощности потока, верно? Мы объясним это позже.
-
keepAliveTime
: этот параметр является пулом потоков保活机制
, указывающее, как долго поток будет завершен без выполнения задачи. По умолчанию этот параметр вступает в силу, только если количество потоков больше, чем corePoolSize. Когда количество потоков больше, чем corePoolSize, если время ожидания любого бездействующего потока больше, чем keepAliveTime, поток будет устранен до тех пор, пока количество потоков не станет равным corePoolSize. Если вызывается метод allowCoreThreadTimeOut, количество потоков в пределах диапазона corePoolSize также будет действовать до тех пор, пока количество потоков не уменьшится до 0. -
unit
: Этот параметр легко сказать, этоTimeUnit
Переменная unit представляет единицу времени keepAliveTime. Типы единиц следующиеTimeUnit.DAYS; //天 TimeUnit.HOURS; //小时 TimeUnit.MINUTES; //分钟 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //纳秒
-
workQueue
: Понятие, представленное этим параметром, представляет собой очередь ожидания. Как мы сказали выше, если основной поток> corePoolSize, задача будет помещена в очередь ожидания. Выбор этой очереди ожидания также является знанием. Дедушка Леа показал нам три варианта ожидания в очереди-
SynchronousQueue
: на основе阻塞队列(BlockingQueue)
реализации, он будет напрямую передавать задачу потребителю и должен ждать, пока добавленные элементы в очереди будут использованы, прежде чем продолжать добавлять новые элементы. Блокирующие очереди, использующие SynchronousQueue, обычно требуют, чтобы maxPoolSizes был неограниченным, что равно Integer.MAX_VALUE, чтобы избежать отказа потоков выполнять операции. -
LinkedBlockingQueue
: LinkedBlockingQueue — неограниченная буферная очередь ожидания. Когда количество выполняемых в данный момент потоков достигает числа corePoolSize, оставшиеся элементы будут ожидать в очереди блокировки. -
ArrayBlockingQueue
: ArrayBlockingQueue – это очередь ожидания с ограниченным буфером, в которой можно указать размер буферной очереди. Когда число исполняемых потоков равно corePoolSize, лишние элементы буферизуются в очереди ArrayBlockingQueue и продолжают выполняться, когда есть незанятые потоки. Когда очередь ArrayBlockingQueue заполнена, если ему не удается присоединиться к очереди ArrayBlockingQueue, для выполнения будет открыт новый поток. Когда количество потоков достигнет максимального максимального размера пула, будет сообщено об ошибке, когда новый элемент попытается присоединиться к очереди ArrayBlockingQueue
-
-
threadFactory
: Фабрика потоков, этот параметр в основном используется для создания потоков; -
handler
: Политика отказа, политика отказа в основном имеет следующие значения.-
AbortPolicy
: отменяет задачу и создает исключение RejectedExecutionException. -
DiscardPolicy
: отменяет задачу напрямую, но не генерирует исключение. -
DiscardOldestPolicy
: Непосредственно отбросить задачу в начале очереди и повторить задачу (повторить процесс). -
CallerRunsPolicy
: задача обрабатывается вызывающим потоком.
-
Глубокое понимание пулов потоков
Выше я вкратце рассказал вам об базовой структуре пула потоков, в пуле потоков есть несколько очень важных параметров, которые можно внимательно попробовать, но проснитесь, приятель, следующий шаг — самое интересное.
состояние пула потоков
Во-первых, давайте поговорим о состоянии пула потоков.Состояние пула потоков является очень интересным моментом проектирования.ThreadPoolExecutor используетctl
Для хранения состояния пула потоков эти состояния также называются пулом потоков.生命周期
. Подумайте об этом тоже. Как бассейн ресурсов для хранения и управления потоками, сам пул резьбы должен иметь эти состояния, а изменения между состояниями могут лучше соответствовать нашим потребностям. CTL на самом деле являетсяAtomicInteger
Типы переменных для обеспечения原子性
.
Помимо хранения состояния пула потоков, ctl также хранитworkerCount
Эта концепция workerCount указывает количество эффективных потоков, а workerCount указывает количество рабочих потоков, которым разрешено запускаться, но не разрешено останавливаться. Значение workerCount отличается от фактического количества активных потоков.
Старшие и младшие биты ctl используются для определения того, является ли это статусом пула потоков или количеством рабочих потоков.Статус пула потоков высокий..
Здесь есть конструктивный момент, зачем использовать AtomicInteger вместо хранения большего AtomicLong или подобного?
Дело не в том, что Леа не рассматривала эту проблему.Для представления значения int текущий размер workerCount равен **(2^29)-1 (около 500 миллионов потоков), а не (2^31)-1 (2 миллиарда) может быть представлена нитью**. Эту переменную можно изменить на AtomicLong, если возникнут проблемы в будущем. Но до тех пор, пока это не потребуется, использование int делает этот код быстрее и проще, а хранилище int занимает меньше места.
runState имеет следующие состояния
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
Сначала мы переходим к диаграмме вращения состояний, а затем даем подробное объяснение в соответствии с диаграммой вращения состояний.
Эти состояния объясняются следующим
-
RUNNING
: если пул потоков находится в состоянии RUNNING, он может получать новые задачи и обрабатывать запущенные задачи. Из инициализации ctl можно узнать, что после создания пула потоков он будет находиться в состоянии RUNNING, а эффективное количество потоков в пуле потоков равно 0.
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
-
SHUTDOWN
: После вызова метода выключения состояние пула потоков изменится с RUNNING -> SHUTDOWN. Пул потоков в состоянии SHUTDOWN может обрабатывать запущенные задачи, но не может принимать новые задачи, что совпадает с описанием выключения, которое мы упоминалось выше. -
STOP
: Аналогично методу выключения, при вызове метода shutdownNow программа перейдет из состояния РАБОТАЕТ/ВЫКЛЮЧЕНИЕ -> СТОП, пул потоков в состоянии СТОП, не будет получать новые задачи, не будет обрабатывать добавленные задачи и прерывает выполняемые задачи. -
TIDYING
: Состояние TIDYING имеет предварительное условие, которое делится на два типа: первый заключается в том, что когда пул потоков находится в состоянии SHUTDOWN, а число потоков в очереди блокировки и пуле потоков пусты, будет выполнено SHUTDOWN -> TIDYING; другой: Когда пул потоков находится в состоянии STOP, когда количество пулов потоков пусто, он будет находиться в состоянии STOP -> TIDYING. Пул потоков, преобразованный в TIDYING, вызоветterminated
Этот метод ловушки, завершенный, является пустой реализацией в классе ThreadPoolExecutor.Если пользователь хочет выполнить соответствующую обработку, когда пул потоков становится TIDYING, это можно реализовать путем перегрузки завершенной функции. -
TERMINATED
: Состояние TERMINATED — это последнее состояние пула потоков.Когда пул потоков находится в состоянии TIDYING, после выполнения завершенного метода он будет находиться в состоянии TIDYING -> TERMINATED. В этот момент пул потоков полностью завершается.
важная переменная
Давайте взглянем на важные переменные в пуле потоков.
private final BlockingQueue<Runnable> workQueue;
Очередь блокировки, это имеет то же значение, что и параметры очереди блокировки, о которых мы упоминали выше, потому что при построении ThreadPoolExecutor значение параметра будет присвоено this.workQueue.
private final ReentrantLock mainLock = new ReentrantLock();
Основной пул потоков状态锁
, изменения состояния пула потоков (например, размер пула потоков, рабочее состояние) должны использовать эту блокировку.
private final HashSet<Worker> workers = new HashSet<Worker>();
рабочие держат коллекцию всех потоков в пуле потоков, только вышеперечисленныеmainLock
Доступ к замку возможен только.
private final Condition termination = mainLock.newCondition();
Ожидание условий, используемых для поддержки метода awaitTermination. Условие и блокировку можно использовать вместе для реализации механизма уведомления/ожидания.
private int largestPoolSize;
Наибольший размер пула представляет собой размер самого большого пула в пуле потоков, доступ к которому можно получить, только удерживая mainLock.
private long completedTaskCount;
completeTaskCount представляет количество выполненных задач, оно обновляется только при завершении задачи, и для доступа к нему необходимо удерживать mainLock.
private volatile ThreadFactory threadFactory;
threadFactory — это фабрика для создания потоков, все потоки будут использовать эту фабрику, вызываяaddWorker
метод создания.
private volatile RejectedExecutionHandler handler;
Обработчик представляет собой стратегию отказа, и обработчик будет вызываться, когда поток перенасыщен или находится на грани закрытия.
private volatile long keepAliveTime;
Время поддержания активности, которое относится к периоду ожидания для бездействующих потоков в ожидании работы.При наличии нескольких corePoolSize или allowCoreThreadTimeOut поток будет использовать этот период ожидания.
Ниже приведены некоторые другие относительно простые переменные, и я просто прокомментирую их.
private volatile boolean allowCoreThreadTimeOut; //是否允许为核心线程设置存活时间
private volatile int corePoolSize; //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int maximumPoolSize; //线程池最大能容忍的线程数
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy(); // 默认的拒绝策略
отправка задачи
Теперь мы знаем, что при создании ThreadPoolExecutor будет находиться в рабочем состоянии.В это время количество потоков равно 0. Когда задача прибудет, пул потоков создаст поток для выполнения задачи, и наш фокус будет размещены наотправка задачина этом процессе.
Обычно мы использовали бы
executor.execute()
Чтобы выполнить задачу, я видел этот процесс выполнения во многих книгах и руководствах блога.Ниже приведена диаграмма выполнения и блок-схема выполнения ThreadPoolExecutor, нарисованная некоторыми книгами и руководствами блога.
Схема выполнения
Схема процесса
Метод выполнения ThreadPoolExecutor делится на следующие четыре случая.
- Если количество текущих рабочих потоков меньше, чем corePoolSize, для выполнения задачи будет создан новый поток, на этом шаге необходимо получить mainLock
全局锁
. - Если размер выполняемого потока не меньше размера corePoolSize, добавьте задачу в очередь блокировки BlockingQueue.
- Если задача не может быть добавлена в BlockingQueue, явление в это время заключается в том, что очередь заполнена. В это время необходимо создать новый поток для обработки задачи. На этом этапе также необходимо получить глобальную блокировку mainLock.
- Если создание нового потока приведет к тому, что текущий поток превысит
maximumPoolSize
, задача будет отклонена и будет использоватьсяRejectedExecutionHandler.rejectEExecution()
Метод отклоняет новую задачу.
ThreadPoolExecutor использует приведенную выше общую идею дизайна, чтобы избежать получения глобальных блокировок при выполнении метода execute, поскольку частое получение глобальных блокировок будет серьезной проблемой.可伸缩瓶颈
, поэтому почти все вызовы метода execute проходят через шаг 2.
Процесс выполнения execute указан выше.Вообще говоря, этот процесс выполнения объясняет очень важные моменты, но недостаточно подробно.После того, как я проверил ThreadPoolExecute и некоторые статьи анализа исходного кода, я обнаружил, что это не так просто.Давайте возьмем сначала посмотрите на выполнение.Исходный код , я дал китайские комментарии
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取 ctl 的值
int c = ctl.get();
// 判断 ctl 的值是否小于核心线程池的数量
if (workerCountOf(c) < corePoolSize) {
// 如果小于,增加工作队列,command 就是一个个的任务
if (addWorker(command, true))
// 线程创建成功,直接返回
return;
// 线程添加不成功,需要再次判断,每需要一次判断都会获取 ctl 的值
c = ctl.get();
}
// 如果线程池处于运行状态并且能够成功的放入阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
// 再次进行检查
int recheck = ctl.get();
// 如果不是运行态并且成功的从阻塞队列中删除
if (! isRunning(recheck) && remove(command))
// 执行拒绝策略
reject(command);
// worker 线程数量是否为 0
else if (workerCountOf(recheck) == 0)
// 增加工作线程
addWorker(null, false);
}
// 如果不能增加工作线程的数量,就会直接执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
Ниже приведена блок-схема выполнения, которую я нарисовал в соответствии с исходным кодом.
Далее мы анализируем процесс выполнения, который может быть немного многословным, поскольку несколько основных процессов были упомянуты выше, но для целостности процесса мы упомянем его здесь снова.
- Если количество ядер в пуле потоков меньше
corePoolSize
, то addWorker будет использоваться для создания нового потока, а процесс addworker будет проанализирован ниже. Если создание прошло успешно, метод execute вернется напрямую. Если он не был создан успешно, это может быть связано с тем, что пул потоков был закрыт.Это может быть связано с тем, что workerCountOf(c) = corePoolSize. - Если пул потоков все еще находится в состоянии «Выполняется», задача будет добавлена в очередь блокировки.
double-check
Дважды проверьте, продолжайте выполнять следующие шаги, если присоединение не удалось, это может быть связано с тем, что поток очереди заполнен.В это время будет оцениваться, может ли он присоединиться к пулу потоков.Если пул потоков также заполнен, отклонение политика будет выполняться напрямую.Если пул потоков заполнен, можно присоединиться, выполнение метода завершается. - Двойная проверка на шаге 2 в основном предназначена для того, чтобы определить, может ли задача, введенная в workQueue, быть выполнена: если пул потоков больше не находится в состоянии Running, он должен отказаться от добавления задач и удалить задачи из очереди workQueue. Если пул потоков находится в состоянии Running, но удаление из workQueue не удается, причина может заключаться в том, что другие потоки выполняют задачу, и политика отклонения будет выполняться напрямую.
- Если поток находится в состоянии «Выполняется» и задачу нельзя удалить из очереди, оцените, равен ли рабочий поток 0. Если он не равен 0, выполнение завершено.Если рабочий поток равен 0, для увеличения будет использоваться addWorker. рабочий поток и выполнение завершены.
Добавить рабочий поток
Как видно из приведенного выше процесса выполнения, добавление воркера требует большой работы, что также является сложным моментом для сравнения цен.Давайте разберем его вместе.Это исходный код воркера.
private boolean addWorker(Runnable firstTask, boolean core) {
// retry 的用法相当于 goto
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 仅在必要时检查队列是否为空。
// 线程池状态有五种,state 越小越是运行状态
// rs >= SHUTDOWN,表示此时线程池状态可能是 SHUTDOWN、STOP、TIDYING、TERMINATED
// 默认 rs >= SHUTDOWN,如果 rs = SHUTDOWN,直接返回 false
// 默认 rs < SHUTDOWN,是 RUNNING,如果任务不是空,返回 false
// 默认 RUNNING,任务是空,如果工作队列为空,返回 false
//
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 执行循环
for (;;) {
// 统计工作线程数量
int wc = workerCountOf(c);
// 如果 worker 数量>线程池最大上限 CAPACITY(即使用int低29位可以容纳的最大值)
// 或者 worker数量 > corePoolSize 或 worker数量>maximumPoolSize ),即已经超过了给定的边界
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 使用 CAS 增加 worker 数量,增加成功,跳出循环。
if (compareAndIncrementWorkerCount(c))
break retry;
// 检查 ctl
c = ctl.get(); // Re-read ctl
// 如果状态不等于之前获取的 state,跳出内层循环,继续去外层循环判断
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
/*
worker数量+1成功的后续操作
* 添加到 workers Set 集合,并启动 worker 线程
*/
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 包装 Runnable 对象
// 设置 firstTask 的值为 -1
// 赋值给当前任务
// 使用 worker 自身这个 runnable,调用 ThreadFactory 创建一个线程,并设置给worker的成员变量thread
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 在持有锁的时候重新检查
// 如果 ThreadFactory 失败或在获得锁之前关闭,请回退。
int rs = runStateOf(ctl.get());
//如果线程池在运行 running<shutdown 或者 线程池已经 shutdown,且firstTask==null
// (可能是 workQueue 中仍有未执行完成的任务,创建没有初始任务的 worker 线程执行)
//worker 数量 -1 的操作在 addWorkerFailed()
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// workers 就是一个 HashSet 集合
workers.add(w);
// 设置最大的池大小 largestPoolSize,workerAdded 设置为true
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
//如果启动线程失败
// worker 数量 -1
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Это действительно долгий метод, хочется немного вырвать кровью, на самом деле моя печень перестала двигаться, но когда я думаю о читателях, которые читают эту статью, я могуподпишись на меня, даже глоток старой крови того стоит.
Схема выполнения этого метода выглядит следующим образом.
Мы не будем описывать это здесь словами, но на блок-схеме выше есть один объект, который привлек мое внимание, а именноworker
объект, этот объект представляет рабочий поток в пуле потоков, так что же это за рабочий объект?
рабочий объект
Рабочий находится в г.ThreadPoolExecutor
Внутри он расширяет класс AQS и реализует интерфейс Runnable. Класс Worker в основном поддерживает состояние управления прерыванием во время выполнения потока. Он обеспечивает операции захвата и снятия блокировки. В рабочей реализации мы использовали нереентерабельные мьютексы вместо повторяющихся блокировок, потому что Леа считала, что мы не должны иметь возможность повторно получать блокировки при вызове таких методов управления, как setCorePoolSize.
Исходный код рабочего объекта относительно прост и стандартен, здесь мы говорим только о методе построения рабочего объекта, то есть
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
Создание рабочего объекта требует трех шагов:
- Начальное состояние AQS равно -1, прерывание() в настоящее время не разрешено. Только после запуска рабочего потока и выполнения метода runWorker() состояние может быть установлено на 0 для прерывания.
- Назначьте firstTask глобальной переменной, которая является текущим классом
- пройти через
ThreadFactory
Создайте новую тему.
###Выполняется задача
Наш предыдущий процесс в основном проанализирован выполнением метода выполнения пула резьбы. Этот процесс выполнения эквивалентен процессу подачи задач, и мы здесь, чтобы сказать,Получить задачу из очереди и запустить ееэтого рабочего процесса.
Обычно мы начинаем с начальной задачи, поэтому нам не нужно получать первую задачу. В противном случае, пока пул потоков все еще находится в состоянии выполнения, мы будем вызыватьgetTask
способ получить задание. Метод getTask может возвращать значение null, что может быть вызвано изменением состояния пула потоков или изменением параметра конфигурации. Другая ситуация может быть связана с异常
Он срабатывает, о чем мы подробно поговорим позже.
Давайте взглянемrunWorker
Исходный код метода:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 允许打断
// new Worker() 是 state==-1,此处是调用 Worker 类的 tryRelease() 方法,
// 将 state 置为0
w.unlock();
boolean completedAbruptly = true;
try {
// 调用 getTask() 获取任务
while (task != null || (task = getTask()) != null) {
// 获取全局锁
w.lock();
// 确保只有在线程 STOPING 时,才会被设置中断标志,否则清除中断标志。
// 如果一开始判断线程池状态 < STOPING,但 Thread.interrupted() 为 true,
// 即线程已经被中断,又清除了中断标示,再次判断线程池状态是否 >= stop
// 是,再次设置中断标示,wt.interrupt()
// 否,不做操作,清除中断标示后进行后续步骤
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 执行前需要调用的方法,交给程序员自己来实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行后需要调用的方法,交给程序员自己来实现
afterExecute(task, thrown);
}
} finally {
// 把 task 置为 null,完成任务数 + 1,并进行解锁
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
// 最后处理 worker 的退出
} finally {
processWorkerExit(w, completedAbruptly);
}
}
Вот блок-схема runWorker
Обратите внимание, что последнийprocessWorkerExit
метод, на самом деле делается много вещей, включая суждениеcompletedAbruptly
Логическое значение, указывающее, следует ли завершить задачу, получить блокировку, попытаться удалить рабочий процесс из очереди, а затем попытаться прервать. Далее будет оцениваться статус прерывания. Если текущее состояние пула потоков меньше СТОП, вместо него будет создан новый рабочий Уничтоженный рабочий.
приобретение задачи
Получение задачи — это процесс выполнения метода getTask, который в основном используется для получения и удаления задач. Введите ссылку для анализа исходного кода ниже
private Runnable getTask() {
// 判断最后一个 poll 是否超时。
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 必要时检查队列是否为空
// 对线程池状态的判断,两种情况会 workerCount-1,并且返回 null
// 线程池状态为 shutdown,且 workQueue 为空(反映了 shutdown 状态的线程池还是要执行 workQueue 中剩余的任务的)
// 线程池状态为 stop(shutdownNow() 会导致变成 STOP)(此时不用考虑 workQueue 的情况)
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// 是否需要定时从 workQueue 中获取
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果工作线程的数量大于 maximumPoolSize 会进行线程剔除
// 如果使用了 allowCoreThreadTimeOut ,并且工作线程不为0或者队列有任务的话,会直接进行线程剔除
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
Блок-схема выполнения метода getTask выглядит следующим образом.
выход из рабочего потока
Выход из рабочего потока — это последний шаг runWorker, этот шаг определяет, будет ли рабочий поток внезапно завершен, и попытается ли он завершить поток, и нужно ли увеличить поток, чтобы заменить исходный рабочий поток.
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// worker数量 -1
// completedAbruptly 是 true,突然终止,说明是 task 执行时异常情况导致,即run()方法执行时发生了异常,那么正在工作的 worker 线程数量需要-1
// completedAbruptly 是 false 是突然终止,说明是 worker 线程没有 task 可执行了,不用-1,因为已经在 getTask() 方法中-1了
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
// 从 Workers Set 中移除 worker
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试终止线程,
tryTerminate();
// 是否需要增加 worker 线程
// 线程池状态是 running 或 shutdown
// 如果当前线程是突然终止的,addWorker()
// 如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker()
// 故如果调用线程池 shutdown(),直到workQueue为空前,线程池都会维持 corePoolSize 个线程,
// 然后再逐渐销毁这 corePoolSize 个线程
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
Исходный код немного великоват, и вы, возможно, какое-то время не сможете понять приведенный выше исходный код, но вы можете сначала вставить комментарии, а когда у вас будет время, вам нужно неоднократно стимулировать и углублять свое впечатление!
другие пулы потоков
Давайте посмотрим на принципы построения других пулов потоков, в основном с участиемFixedThreadPool, SingleThreadExecutor, CachedThreadPool.
newFixedThreadPool
newFixedThreadPool называется многоразовым固定线程数
Пул потоков . Ниже приведен исходный код newFixedThreadPool.
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
Как видите, для параметров corePoolSize и maxPoolSize newFixedThreadPool заданы параметры, указанные при создании FixedThreadPool.nThreads
, то есть в newFiexedThreadPool количество основных потоков — это максимальное количество потоков.
Ниже приведена схема выполнения newFixedThreadPool.
Рабочий процесс newFixedThreadPool выглядит следующим образом.
- Если количество запущенных в данный момент потоков меньше, чем corePoolSize, для выполнения задачи создается новый поток addworker.
- Если количество потоков текущего потока равно corePoolSize, задача будет добавлена непосредственно в
LinkedBlockingQueue
В неограниченной очереди блокировки, если верхний предел LinkedBlockingQueue не указан, по умолчанию используется размер Integer.MAX_VALUE. - После выполнения задач в пуле потоков newFixedThreadPool будет повторно получать задачи из LinkedBlockingQueue для выполнения.
По сравнению с ThreadPoolExecutor, newFixedThreadPool в основном внес следующие изменения.
-
Количество основных потоков равно максимальному количеству потоков, поэтому newFixedThreadPool имеет только две максимальные емкости: одна — это емкость потоков пула потоков, а другая — емкость потоков неограниченной очереди блокировки LinkedBlockingQueue.
-
Здесь видно, что есть еще одно изменение 0L, то есть keepAliveTime = 0L, keepAliveTime — время ожидания потока после достижения максимальной мощности рабочего потока, 0L означает, что когда количество потоков в пуле потоков больше чем corePoolsize, запасные потоки будут немедленно завершены.
-
Из-за использования неограниченной очереди работающий newFixedThreadPool не будет отклонять задачу, то есть метод RejectedExecutionHandler.rejectedExecution не будет вызываться.
newSingleThreadExecutor
В newSingleThreadExecutor есть только один рабочий поток, что означает, что это Executor только с одним рабочим потоком.
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
Видно, что в newSingleThreadExecutor и corePoolSize, и MaximumPoolSize установлены в 1, таймаута нет, также используется неограниченная блокирующая очередь LinkedBlockingQueue, кроме corePoolSize и maxPoolSize, остальные почти такие же, как у newFixedThreadPool.
Ниже приведена диаграмма выполнения newSingleThreadExecutor.
Процесс выполнения newSingleThreadExecutor такой же, как у newFixedThreadPool, за исключением того, что число рабочих потоков newSingleThreadExecutor равно 1.
newCachedThreadPool
newCachedThreadPool – это пул потоков, который создает рабочие потоки по мере необходимости. Максимальное число пулов потоков newCachedThreadPool – целое число.MAX_VALUE, а60
секунды, используяSynchronousQueue
Небуферизованная блокирующая очередь.
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
Схема его выполнения следующая
- Во-первых, метод SynchronousQueue.offer будет выполняться первым, если в текущем выполняющемся максимальном пуле есть незанятые потоки.
SynchronousQueue.poll
, задача будет передана для выполнения в незанятый поток, метод execute завершается, в противном случае выполнение продолжится вниз. - Если ни один поток в MaximumPool не выполняет метод SynchronousQueue.poll, в этом случае newCachedThreadPool создаст новый поток для выполнения задачи, и метод execute завершится.
- Завершенный поток выполнит операцию опроса, что приведет к тому, что бездействующий поток будет ждать до 60 секунд в SynchronousQueue. Если новая задача будет отправлена в течение 60 секунд, поток ожидания выполнит только что отправленное задание, в противном случае поток ожидания завершится.
Ключевым моментом здесь является очередь SynchronousQueue, которая представляет собой блокирующую очередь без пропускной способности.Каждая операция вставки должна ожидать соответствующей операции удаления другим потоком.. На самом деле это своего рода передача задачи, как показано на следующем рисунке.
На самом деле, есть еще и пул потоковScheduledThreadPoolExecutor
, поэтому в этой статье я не буду вдаваться в подробности.
Рекомендации по использованию пула потоков
Вот несколько моментов, которые следует учитывать при использовании пулов потоков на практике.
-
Избегайте наложения задач, такие как упомянутый выше newFixedThreadPool, который создает заданное количество потоков, но рабочая очередь не ограничена, что приводит к тому, что при слишком малом количестве потоков рабочей очереди скорость обработки не успевает за скоростью поступления очередь.В этом случае очень вероятно, что может вызвать OOM, может быть использован для диагностики
jmap
Проверьте, не поставлено ли в очередь большое количество задач. - В производственной практике вполне вероятно, что логика не является строгой или рабочий поток не может быть вовремя освобожден.утечка резьбы, на этот раз лучше проверить стек потоков
- Избегайте проблем с синхронизацией, таких как взаимоблокировки
- Старайтесь избегать операций при использовании пулов потоков
ThreadLocal
, потому что время жизни рабочего потока может превышать время жизни задачи.
Параметры размера пула потоков
Интервьюеры также часто проверяют параметр размера пула потоков.任务类型
настроить размер пула потоков
- Если это задача, интенсивно использующая ЦП, это означает, что ЦП является дефицитным ресурсом.В это время мы обычно не можем увеличить вычислительную мощность, увеличив количество потоков, потому что слишком много потоков приведет к частому переключению контекста.В этом В этом случае рекомендуется Разумное количество потоков
N(CPU)数 + 1
. - Если это задача с интенсивным вводом-выводом, это означает, что требуется больше ожидания.В настоящее время вы можете обратиться к рекомендуемому методу Brain Goetz.Количество потоков = количество ядер ЦП × (1 + среднее время ожидания / среднее время работы). Эталонное значение может быть N(ЦП) ядер * 2.
Конечно, это только справочное значение, и конкретные параметры необходимо настроить в соответствии с реальной ситуацией.Например, вы можете сначала установить размер пула потоков на эталонное значение, а затем наблюдать за работой задачи, загрузкой системы, и использование ресурсов для внесения соответствующих корректировок.
постскриптум
Эта статья писалась давно, так как раньше я мало что знала о пулах потоков, потратила много сил на ее изучение, надеюсь, эта статья будет вам полезна.
Кроме того, добавьте мой WeChat becxuan, присоединяйтесь к группе ежедневных вопросов, делитесь одним вопросом интервью каждый день, пожалуйста, обратитесь к моему Github для получения дополнительной информации, станьте лучшим лучшимJavaer, эта статья была включена, см. оригинальную ссылку для получения подробной информации.
Я лично перелил шесть PDF-файлов. После того, как программист поиска WeChat cxuan обратил внимание на официальный аккаунт, он ответил cxuan в фоновом режиме и получил все PDF-файлы.Эти PDF-файлы следующие