Всем привет,я третий ребенок.Приятно снова встретиться с вами.Температура недавно понизилась,так что согревайтесь пожалуйста.
В этом разделе используется пул потоков Java. Далее мы шаг за шагом перевернем пул потоков вверх дном.
Введение: Третий ребенок снимает деньги
Есть программист, зовут его третий.
У третьего ребенка нет денег в кармане, поэтому он поспешил заняться банковским делом.
Я встал рано утром, и сестра банка пожелала мне доброго утра.
Третий ребенок увидел, что прилавок пуст, а все пять копеек в карточке вынуты.
Третий в этот день встал поздно, и бизнес-окно было заполнено.
Приходилось входить в зону очереди, узнавать мобильный телефон и ждать.
Третий ребенок спал до третьего выстрела, и очередь была заполнена.
Когда управляющий увидел проем, товарищи по очереди поспешили это сделать.
В тот день было так жарко, что все очереди у прилавка были израсходованы.
Третий ребенок злится, когда видит это, как вы думаете, что должен сделать менеджер?
Менеджер махнул рукой и улыбнулся, такая сцена уже привычна. Четыре способа справиться с этим, угадайте, что бы я сделал.
- Небольшие банки перегружены, а устаревшие системы парализованы.
- Мы сожалеем о маленьком храме, который попросил вас прийти и найти того, кто это сделает.
- В зависимости от вашей срочной ситуации, приходите и уходите в команду, чтобы добавить пробку.
- На самом деле нет никакого способа сделать это сегодня, если вы не можете, вы можете изменить в другой день.
Да-да, по сути, этот процесс такой же, как пул потоков JDKThreadPoolExecutor
Рабочий процесс аналогичен: сначала продайте пропуск, а затем объедините рабочий процесс пула потоков, чтобы убедиться, что вы внезапно прозреете.
Настоящий бой: пул потоков управляет потоками обработки данных
Давайте просто поговорим о поддельном дескрипторе, покажем вам код, давайте перейдем к практике использования пула потоков в сочетании с бизнес-сценариями. ——Когда многие студенты были опрошены, принцип пула потоков был хорошо понят, когда их спрашивали, как его использовать в проекте, они останавливались. Прочитав этот пример, быстро подумайте, что можно применить в проекте.
Сценарии применения
Сценарий применения очень простой.Наш проект - это система аудита.Когда дело доходит до учета каждый год нам необходимо предоставлять данные в стороннюю учетную систему для учета.
Здесь есть проблема: в силу исторических причин интерфейс, предоставляемый учетной системой, поддерживает только один пуш, а фактический объем данных составляет 300 000. Если сделать один пуш, то это займет не менее одной недели.
Поэтому рассмотрите возможность использования многопоточного метода для передачи данных.Тогда как осуществляется управление потоком?Пул потоков.
Зачем использовать пул потоков для управления потоками? Конечно, для повторного использования потока.
Идея тоже очень проста, запустить несколько потоков, и каждый поток читает и проталкивает данные, которые не были пропушены в интервале (start, count) из базы данных.
конкретная реализация кода
Я извлек эту сцену, основной код:
Код относительно длинный, поэтому используется карбоновое благоустройство, код не понятен, не беда, я залил исполняемый код на удаленный склад, адрес склада:git ee.com/fighter3/дней спустя…, Этот пример относительно прост. Учащиеся, которые не использовали пулы потоков, могут рассмотреть, есть ли у вас какие-либо сценарии обработки и очистки данных, которые можно применить. Вы можете извлечь из этого уроки и интерпретировать их.
Тема этой статьи — пулы потоков, поэтому мы сосредоточимся на коде пула потоков:
Создание пула потоков
//核心线程数:设置为操作系统CPU数乘以2
private static final Integer CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
//最大线程数:设置为和核心线程数相同
private static final Integer MAXIMUM_POOl_SIZE = CORE_POOL_SIZE;
//创建线程池
ThreadPoolExecutor pool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOl_SIZE * 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
Пул потоков создается непосредственно ThreadPoolExecutor:
- Количество основных потоков установлено равным количеству ЦП × 2.
- Поскольку требуются сегментированные данные, максимальное количество потоков равно количеству основных потоков.
- использовать очередь блокировки
LinkedBlockingQueue
- Запретить использование политики по умолчанию
Пул потоков отправляет задачи
//提交线程,用数据起始位置标识线程
Future<Integer> future = pool.submit(new PushDataTask(start, LIMIT, start));
- Поскольку требуется возвращаемое значение, используйте
submit()
Отправьте задание, если используетеexecute()
Отправьте задачу без возвращаемого значения.
Код не несет ответственности, вы можете просто спуститься и запустить.
Итак, как именно работает пул потоков? Давайте посмотрим вниз.
Принцип: Принцип реализации пула потоков
Рабочий процесс пула потоков
Метод строительства
Когда мы создавали пул потоков, мы использовалиThreadPoolExecutor
Конструктор:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
Давайте посмотрим на значение нескольких параметров:
-
corePoolSize
: количество основных потоков -
maximumPoolSize
: максимально допустимое количество потоков (основные потоки + неосновные потоки). -
workQueue
: очередь задач пула потоковОчередь блокировки, используемая для сохранения задач, ожидающих выполнения.Обычные очереди блокировки:
-
ArrayBlockingQueue
: ограниченная блокирующая очередь на основе структуры массива -
LinkedBlockingQueue
: Блокировка очереди на основе связанного списка структуры -
SynchronousQueue
: Не сохранять блок очереди блокировки -
PriorityBlockingQueue
: бесконечная очередь блокировки с приоритетом
-
-
handler
: политика отказа от насыщения пула потоковПлатформа пула потоков JDK предлагает четыре стратегии:
-
AbortPolicy
: Выбрасывает исключение напрямую, стратегия по умолчанию. -
CallerRunsPolicy
: Запустить задачу в потоке вызывающего абонента. -
DiscardOldestPolicy
: Отменить самую старую задачу в очереди задач -
DiscardPolicy
: не обрабатывать, отменить текущую задачу
Вы также можете реализовать в соответствии с вашими собственными сценариями приложений
RejectedExecutionHandler
интерфейс для настройки стратегии. -
Вышеупомянутые четыре параметра тесно связаны с рабочим процессом пула потоков. Давайте посмотрим на оставшиеся три параметра.
-
keepAliveTime
: максимальное время бездействия неосновных потоков. -
unit
: время, в течение которого неосновные потоки в пуле потоков остаются активными. -
threadFactory
: Фабрика, используемая при создании нового потока, которую можно использовать для установки имени потока и т. д.
Рабочий процесс пула потоков
Зная несколько параметров, как эти параметры применяются?
отexecute()
Взяв в качестве примера задачу отправки метода, давайте посмотрим на рабочий процесс пула потоков:
При отправке задач в пул потоков:
- Если текущий запущенный поток меньше
核心线程数corePoolSize
, создается новый поток для выполнения задачи - Если запущено равное или большее количество потоков
核心线程数corePoolSize
, добавьте задачу в任务队列workQueue
- если
任务队列workQueue
заполнен, создайте новый поток для обработки задачи - Если новые потоки создаются так, что текущее общее количество потоков превышает
最大线程数maximumPoolSize
, задание будет отклонено,线程池拒绝策略handler
воплощать в жизнь
Объединяя жизненные примеры, с которых мы начали, правильно ли это:
Анализ исходного кода работы пула потоков
Приведенный выше анализ процесса позволяет нам интуитивно понять принцип работы пула потоков.Давайте рассмотрим детали в исходном коде.
Отправить тему (выполнить)
Пул потоков выполняет задачи следующим образом:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//获取当前线程池的状态+线程个数变量的组合值
int c = ctl.get();
//1.如果正在运行线程数少于核心线程数
if (workerCountOf(c) < corePoolSize) {
//开启新线程运行
if (addWorker(command, true))
return;
c = ctl.get();
}
//2. 判断线程池是否处于运行状态,是则添加任务到阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
//二次检查
int recheck = ctl.get();
//如果当前线程池不是运行状态,则从队列中移除任务,并执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
//如若当前线程池为空,则添加一个新线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//最后尝试添加线程,如若添加失败,执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
Давайте взглянемexecute()
Подробная блок-схема:
Добавить тему (addWorker)
существуетexecute
В коде метода есть ключевой методprivate boolean addWorker(Runnable firstTask, boolean core)
, этот метод в основном завершает две части:增加线程数
,添加任务,并执行
.
- Давайте сначала посмотрим на первую часть, чтобы увеличить количество потоков:
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 1.检查队列是否只在必要时为空(判断线程状态,且队列不为空)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//2.循环CAS增加线程个数
for (;;) {
int wc = workerCountOf(c);
//2.1 如果线程个数超限则返回 false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//2.2 CAS方式增加线程个数,同时只有一个线程成功,成功跳出循环
if (compareAndIncrementWorkerCount(c))
break retry;
//2.3 CAS失败,看线程池状态是否变化,变化则跳到外层,尝试重新获取线程池状态,否则内层重新CAS
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
//3. 到这说明CAS成功了
boolean workerStarted = false;
boolean workerAdded = false;
- Затем посмотрите вторую часть, чтобы добавить задачи и выполнить их.
Worker w = null;
try {
//4.创建worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//4.1、加独占锁 ,为了实现workers同步,因为可能多个线程调用了线程池的excute方法
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//4.2、重新检查线程池状态,以避免在获取锁前调用了shutdown接口
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//4.3添加任务
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//4.4、添加成功之后启动任务
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
Давайте посмотрим на общий процесс:
Поток выполнения (runWorker)
После отправки пользовательского потока в пул потоковWorker
воплощать в жизнь,Worker
является наследованием в пуле потоковAQS
,выполнитьRunnable
Пользовательский класс для интерфейса, который представляет собой объект, выполняющий конкретную задачу.
Давайте посмотрим на его метод построения:
Worker(Runnable firstTask) {
setState(-1); // 在调用runWorker之前禁止中断
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); //创建一个线程
}
- В конструкторе сначала установите состояние =-1, теперь есть простая монопольная блокировка без повторного входа, состояние = 0 означает, что блокировка не была получена, состояние = 1 означает, что блокировка была получена, а размер состояния установлен равным -1 во избежание Поток был прерван до запуска метода runWorker()
- firstTask записывает первую задачу рабочего потока
- поток - это поток, который выполняет задачу
этоrun
вызов метода напрямуюrunWorker
, реальный поток выполнения находится в нашемrunWorker
В методе:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 允许中断
boolean completedAbruptly = true;
try {
//获取当前任务,从队列中获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
…………
try {
//执行任务前做一些类似统计之类的事情
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行任务完毕后干一些些事情
afterExecute(task, thrown);
}
} finally {
task = null;
// 统计当前Worker 完成了多少个任务
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//执行清理工作
processWorkerExit(w, completedAbruptly);
}
}
Код выглядит много, но на самом деле суть в том, чтобы срезать ветки и лозы.task.run()
Пусть нить течет.
Получить задачу (getTask)
Наша задача вышеrunWorker
я вижу это предложениеwhile (task != null || (task = getTask()) != null)
, выполняемая задача либо в настоящее время передается вfirstTask
, а также черезgetTask()
получить этоgetTask
Основная цельПолучить задачи из очереди.
private Runnable getTask() {
//poll()方法是否超时
boolean timedOut = false;
//循环获取
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 1.线程池未终止,且队列为空,返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//工作线程数
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//2.判断工作线程数是否超过最大线程数 && 超时判断 && 工作线程数大于0或队列为空
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//从任务队列中获取线程
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//获取成功
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
Подводя итог, модель Worker, выполняющего задачи, выглядит следующим образом [8]:
резюме
вот, поймиexecute
иworker
некоторых процессов, можно сказать, что на самом делеThreadPoolExecutor
Реализация представляет собой производственно-потребительскую модель.
Когда пользователь добавляет задачи в пул потоков, это эквивалентно производителю, производящему элементы. workers
Когда потоки в рабочем наборе потоков непосредственно выполняют задачи или получают задачи из очереди задач, они эквивалентны потребителям, потребляющим элементы.
жизненный цикл пула потоков
Представление состояния пула потоков
существуетThreadPoolExecutor
В нем определены некоторые состояния, и в то же время старшие и младшие биты используются для созданияctl
Этот параметр может сохранять состояние и количество потоков, что очень умно! [6]
//记录线程池状态和线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//29
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
Старшие 3 бита представляют статус, а младшие 29 бит записывают количество потоков:
Поток состояния пула потоков
Пул потоков определяет в общей сложности пять состояний, давайте посмотрим, как протекают эти состояния [6]:
- RUNNING: Состояние выполнения, прием новых задач и обработка задач в очереди.
- SHUTDOWN: состояние выключения (был вызван метод выключения). Не принимать новые задачи, а обрабатывать задачи в очереди.
- STOP: Состояние Stopped (вызван метод shutdownNow). Не принимать новые задачи, не обрабатывать задачи в очереди и прерывать обработку задач.
- TIDYING: все задачи были завершены, а workerCount равен 0. После того, как пул потоков перейдет в это состояние, будет вызван метод terminated() для перехода в состояние TERMINATED.
- TERMINATED: Завершенное состояние, состояние после завершения вызова метода terminated().
Применение: создание надежного пула потоков
Правильно настроить пул потоков
Что касается построения пула потоков, нам нужно обратить внимание на две конфигурации:размер пула потоковиочередь задач.
размер пула потоков
Что касается размера пула потоков, то не существует «золотого правила», которое нужно неукоснительно соблюдать, по характеру задачи его можно условно разделить наCPU密集型任务
,IO密集型任务
и混合型任务
.
- Задачи, интенсивно использующие ЦП: задачи, интенсивно использующие ЦП, должны быть настроены с минимальным количеством потоков, например, пул потоков с потоками Ncpu+1.
- Задачи с интенсивным вводом-выводом: потоки задач с интенсивным вводом-выводом не всегда выполняют задачи, поэтому вам следует настроить как можно больше потоков, например 2*Ncpu.
- Смешанные задачи. При необходимости смешанные задачи можно разделить на задачи с интенсивным использованием ЦП и задачи с интенсивным вводом-выводом.
Конечно, это всего лишь предложение, на самом деле, как его настроить, зависит от комбинации事前评估和测试
,事中监控
для определения приблизительного размера пула потоков. Размер пула потоков также можно регулировать динамически без записи мертвых.
очередь задач
Очередь задачи, как правило, рекомендуется использовать ограниченную очередь, очередь может казаться неограниченной очередью задач неограниченное накопление, что приведет к исключительному исключительному исключительству памяти.
Мониторинг пула потоков
[1] Если в системе используется большое количество пулов потоков, необходимо отслеживать пул потоков, чтобы при возникновении проблемы ее можно было быстро локализовать в соответствии с использованием пула потоков.
Пул потоков можно контролировать с помощью параметров и методов, предоставляемых пулом потоков:
- getActiveCount(): количество потоков в пуле потоков, выполняющих задачи.
- getCompletedTaskCount() : количество выполненных задач в пуле потоков, значение меньше или равно taskCount.
- getCorePoolSize() : количество основных потоков в пуле потоков.
- getLargestPoolSize(): максимальное количество потоков, когда-либо созданных пулом потоков. Благодаря этим данным вы можете узнать, заполнен ли пул потоков, то есть достигнут ли максимальный размер пула.
- getMaximumPoolSize(): максимальное количество потоков в пуле потоков.
- getPoolSize() : текущее количество потоков в пуле потоков.
- getTaskCount() : общее количество задач, выполненных и не выполненных пулом потоков.
Вы также можете отслеживать, расширяя пул потоков:
- Настройте пул потоков, унаследовав пул потоков, перепишите методы beforeExecute, afterExecute и терминированные методы пула потоков,
- Также можно выполнить некоторый код для мониторинга до выполнения задачи, после выполнения и до закрытия пула потоков. Например, отслеживать среднее время выполнения, максимальное время выполнения и минимальное время выполнения задач и т. д.
End
Эта статья начинается с жизненного сценария, шаг за шагом от реального боя к принципу, чтобы получить глубокое понимание пула потоков.
А вы узнали, так называемая так называемая四种线程池
В статье это не упоминается — конечно, из-за недостатка места в следующей статье будет организован пул потоков для создания класса инструмента.Executors
.
Пул потоков также является ключевым полем битвы для интервью Какие вопросы будут заданы в интервью?
Все это в пути.点赞
,关注
Не теряйтесь, увидимся в следующий раз!
Ссылаться на:
[1] «Искусство параллельного программирования на Java».
[2] «Практика программирования на Java».
[3]. По правде говоря, на этот раз вы точно сможете легко изучить пулы потоков.
[4]. Требования к собеседованию: анализ пула потоков Java
[6] Брат Сяо Фу "Руководство по Java Face Sutra"
[7] «Красота параллельного программирования на Java».
[8]. Принцип реализации пула потоков Java и его практика в бизнесе Meituan