введение
В повседневной разработке пул потоков — это технология, которая используется очень часто.Будь то многопоточность на сервере для получения пользовательских запросов или многопоточность на стороне клиента для обработки данных, технология пула потоков будет использоваться, поэтому вы может полностью понять использование пулов потоков. Принцип реализации, стоящий за ним, и разумная оптимизация размера пула потоков очень необходимы. В этой статье будут объяснены основные функции пулов потоков и принципы, лежащие в их основе, с ответами на десять часто задаваемых вопросов, в надежде быть полезными для всех.
- Приведите пример, чтобы проиллюстрировать, почему вы должны использовать пул потоков и каковы его преимущества?
- Какие основные пулы потоков предусмотрены в jdk1.8?
- Связь между основными компонентами пула потоков?
- Жизненный цикл ExecutorService?
- Может ли поток в пуле потоков установить тайм-аут?
- Как отменить поток в пуле потоков?
- Как установить подходящий размер пула потоков?
- Как установить правильный размер очереди при использовании ограниченных очередей?
- Как выбрать подходящую политику отказа при использовании ограниченной очереди, если очередь заполнена?
- Как подсчитать время выполнения потоков в пуле потоков?
1. Приведите пример, иллюстрирующий, почему вам следует использовать пул потоков и каковы его преимущества?
Давайте сначала рассмотрим такой сценарий: сервер получает сообщения от нескольких клиентов, прослушивая порт 8888 в одном потоке. Чтобы избежать блокировки основного потока, каждый раз при получении сообщения запускается новый поток для его обработки, так что основной поток может непрерывно получать новые сообщения. Простая реализация кода без использования пула потоков выглядит следующим образом:
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(8888);
while (true) {
try {
Socket socket = serverSocket.accept();
new Thread(() -> {
try {
InputStream inputStream = socket.getInputStream();
//do something
} catch (IOException e) {
e.printStackTrace();
}
}).start();
} catch (IOException e) {
}
}
}
Создавая каждый раз новый поток, основной поток не будет блокироваться, что улучшает способность сервера получать сообщения. Но есть несколько очень очевидных проблем:
- Потребление памяти непрерывными потоками инициализации ограничено в любое время, а неограниченные новые потоки будут занимать много места в памяти.
- В случае ограниченных ресурсов ЦП создание большего количества потоков не только не может достичь цели одновременной обработки клиентских сообщений, но, наоборот, из-за более частого переключения между потоками время обработки будет больше, а эффективность будет ниже.
- Создание и уничтожение самого потока потребляет ресурсы сервера.
- Неудобно централизованно управлять потоками. И эти проблемы можно решить с помощью пулов потоков.
2. Какие основные пулы потоков представлены в jdk1.8 и сценарии их использования?
-
newFixedThreadPool — пул потоков с фиксированным количеством потоков. Он имеет такое же количество основных потоков (corePoolSize) и максимальное количество потоков (maximumPoolSize). В то же время он использует неограниченную очередь блокировки LinkedBlockingQueue для хранения дополнительных задач, то есть, когда количество потоков, достигающих nThreads, выполняется, все последующие потоки будут попадать в LinkedBlockingQueue, а новые потоки создаваться не будут.
Сценарий использования: Поскольку количество потоков фиксировано, он обычно подходит для предсказуемых сред выполнения параллельных задач.
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
-
newCachedThreadPool, пул кэшируемых потоков. Количество основных потоков по умолчанию (corePoolSize) равно 0, максимальное количество потоков (maximumPoolSize) равно Integer.MAX_VALUE, а также имеет время истечения 60 секунд.Когда поток простаивает более 60 секунд, он будет переработанный. Внутренне SynchronousQueue используется как блокирующая очередь.
Сценарий использования: Из-за нехватки емкости SynchronousQueue newCachedThreadPool не подходит для долгосрочных задач. Потому что, если время выполнения одной задачи слишком велико, всякий раз, когда нет простаивающих потоков, будут открываться новые потоки, и количество потоков может достигать Integer.MAX_VALUE, а очередь хранилища не может кэшировать задачи, что может легко привести к Проблемы с ООМ. Поэтому сценарии его использования, как правило, заключаются в выполнении большого количества краткосрочных задач.
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
-
newSingleThreadExecutor, пул потоков с одним потоком. Количество основных потоков по умолчанию (corePoolSize) и максимальное количество потоков (maximumPoolSize) равны 1, и используется неограниченная очередь блокировки LinkedBlockingQueue.
Сценарий использования: поскольку выполняется только один поток, а другие задачи ставятся в очередь, он подходит для среды, в которой один поток последовательно выполняет упорядоченные задачи.
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
-
newScheduledThreadPool и newSingleThreadScheduledExecutor, пулы потоков, которые выполняют отложенные или периодические задачи, используют встроенную очередь блокировки DelayedWorkQueue. Вы можете видеть, что его возвращаемым результатом является ScheduledExecutorService, который расширяет интерфейс ExecutorService и предоставляет методы для отсрочки и периодического выполнения задач.
Сценарий использования: используется для задач с отложенным запуском или задач, которые необходимо выполнять периодически.
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
-
newWorkStealingPool — пул потоков, предоставляемый jdk1.8 для выполнения параллельных задач. Уровень параллелизма по умолчанию — поток доступного в настоящее время максимального числа доступных ЦП.
Сценарий использования: используется для задач, которые требуют много времени и могут быть сегментированы и распараллелены.
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
3. Какова связь между основными компонентами пула потоков?
Короче говоря, пул потоков можно разделить на четыре компонента: Executor, ExecutorService, Executors и ThreadPoolExecutor.
- Интерфейс Executor определяет метод execute, который принимает Runnable в качестве параметра. Это также абстракция структуры пула потоков. Она разделяет, что может делать пул потоков и как это делать. Его также можно рассматривать как модель производителя и потребителя. Исполнитель отвечает за производственные задачи, а за конкретные пулы потоков отвечают Задачи позволяют пользователям более гибко переключать конкретную стратегию пула потоков, а также являются основой для разнообразия пула потоков.
public interface Executor {
void execute(Runnable command);
}
Итак, как в ThreadPoolExecutor реализовать метод execute? Давайте взглянем на исходный код метода execute в ThreadPoolExecutor, комментарии в нем слишком подробные и являются хорошим примером хороших комментариев. Вот лишь краткое резюме: во-первых, когда количество рабочих потоков меньше, чем количество основных потоков, оно попытается добавить рабочих в очередь на выполнение, если основных потоков недостаточно, задача будет добавлена в очередь.Если запись не удалась, будет принята стратегия отказа.
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
//ctl通过位运算同时标记了线程数量以及线程状态
int c = ctl.get();
//workerCountOf方法用来统计当前运行的线程数量
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
- Интерфейс ExecutorService наследуется от интерфейса Executor и предоставляет более полную функцию управления пулом потоков. А статус пула потоков делится на три типа: работает, закрыт и завершен. В то же время предоставляется отправка с возвращаемым значением, чтобы облегчить лучший контроль над отправленными задачами.
public interface ExecutorService extends Executor {
//关闭线程池,关闭状态
void shutdown();
//立即关闭线程池,关闭状态
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
//提交一个Callable类型的任务,带Future返回值
<T> Future<T> submit(Callable<T> task);
//提交一个Runnable类型的任务,带Future返回值
Future<?> submit(Runnable task);
//一段时间后终止线程池,终止状态
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
......
}
Это по-прежнему объясняется в ThreadPoolExecutor.Состояние пула потоков расширяется в ThreadPoolExecutor и определяется 5 состояний.Эти 5 состояний представлены старшими 3 битами Integer.SIZE. код показывает, как показано ниже:
* The runState provides the main lifecycle control, taking on values:
* 能够接受新任务也能处理队列中的任务
* RUNNING: Accept new tasks and process queued tasks
* 不能接受新任务,但能处理队列中的任务
* SHUTDOWN: Don't accept new tasks, but process queued tasks
不能接受新任务,也不能处理队列中的任务,同时会中断正在执行的任务
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
所有的任务都被终止,工作线程为0
* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method
terminated方法执行完成
* TERMINATED: terminated() has completed
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int RUNNING = -1 << COUNT_BITS;//101
private static final int SHUTDOWN = 0 << COUNT_BITS;//000
private static final int STOP = 1 << COUNT_BITS;//001
private static final int TIDYING = 2 << COUNT_BITS;//010
private static final int TERMINATED = 3 << COUNT_BITS;//011
Давайте посмотрим на преобразование этих пяти состояний через интерфейс ExecutorService:
public interface ExecutorService extends Executor {
//关闭线程池,线程池状态会从RUNNING变为SHUTDOWN
void shutdown();
//立即关闭线程池RUNNING或者SHUTDOWN到STOP
List<Runnable> shutdownNow();
//STOP、TIDYING以及TERMINATED都返回true
boolean isShutdown();
//TERMINATED状态返回true
boolean isTerminated();
//一段时间后终止线程池,TERMINATED
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
......
}
-
Executors предоставляет ряд статических методов для получения пулов потоков, которые эквивалентны фабрикам пулов потоков.Они представляют собой инкапсуляцию ThreadPoolExecutor, которая упрощает детали реализации пользователей, переключающихся между Executor и ExecutorService.
-
ThreadPoolExecutor — это реализация Executor и ExecutorService, предоставляющая конкретную реализацию пула потоков.
4. Каков жизненный цикл ExecutorService?
Эта проблема была объяснена выше.Жизненный цикл ExecutorService можно разделить на три состояния: работающий, закрытый и завершенный через определение интерфейса.
ThreadPoolExecutor предоставляет более подробные пять состояний в конкретной реализации: RUNNING, SHUTDOWN, STOP, TIDYING и TERMINATED. Описание различных состояний и переходов можно найти в ответе на предыдущий вопрос.
5. Может ли поток в пуле потоков установить тайм-аут?
Потоки в пуле потоков можно контролировать по тайм-ауту.Отправляйте задачи через отправку ExecutorService, которая вернет результат типа Future.Давайте посмотрим на код интерфейса Future:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
//获取返回结果,并在出现错误或者中断时throws Exception
V get() throws InterruptedException, ExecutionException;
//timeout时间内获取返回结果,并在出现错误、中断以及超时时throws Exception
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Future определяет методы get() и get(long timeout, блок TimeUnit).Метод get() блокирует текущий вызов до тех пор, пока не будет получен возвращаемый результат, а get(long timeout, блок TimeUnit) блокируется в течение указанного времени. Затем будет выброшено исключение TimeoutException. Таким образом, цель управления временем ожидания потока может быть достигнута. Простой пример использования выглядит следующим образом:
Future<String> future = executor.submit(callable);
try {
future.get(2000, TimeUnit.SECONDS);
} catch (InterruptedException e1) {
//中断后处理
} catch (ExecutionException e1) {
//抛出异常处理
} catch (TimeoutException e1) {
//超时处理
}
Здесь есть проблема, потому что метод get блокируется --- реализован LockSupport.park, как получить время ожидания каждого потока, когда в пуле потоков много потоков? Помимо реализации собственного пула потоков или собственной фабрики потоков, я не придумал лучшего способа использовать функции самого ThreadPoolExecutor. Очень глупое решение — начать прослушивание с тем же количеством потоков, что и пул потоков. Если у вас есть лучший способ, пожалуйста, оставьте сообщение.
6. Как отменить поток в пуле потоков?
Эта проблема такая же, как и решение вышеописанной проблемы: он также отправляет задачу через отправку ExecutorService, получает Future и вызывает метод отмены в Future для достижения цели.
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
}
Метод отмены имеет параметр mayInterruptIfRunning, если он равен true, это означает, что задача может принять и обработать прерывание, и вызывается метод прерывания. Если false, то это означает, что если задача не запущена, она не будет запущена, и метод прерывания не будет вызван.
Суть отмены фактически достигается через прерывание, то есть, если сам поток не может ответить на прерывание, бесполезно вызывать метод отмены. Как правило, потоки, заблокированные методами lockInterruptently, park и await, могут реагировать на прерывания, а работающие потоки должны быть прерваны самими разработчиками.
7. Как установить подходящий размер пула потоков?
Как установить подходящий размер пула потоков, я не думаю, что для этого вопроса есть фиксированная формула. Другими словами, есть только несколько простых правил настройки, но они различны для конкретных предприятий и могут быть проанализированы только в соответствии с тестом среды на месте.
Установка соответствующего размера пула потоков разделена на две части: одна — максимальный размер пула потоков, а другая — минимальный размер пула потоков. В ThreadPoolExecutor это отражается в максимальном количестве потоков (maximumPoolSize) и количестве основных потоков (corePoolSize).
Параметр максимального размера пула потоков тесно связан с количеством ядер ЦП на текущем компьютере. Вообще говоря, если вы хотите максимально использовать ЦП, вы можете установить его равным количеству ядер ЦП. Например, сервер с 4-ядерным процессором может быть установлен на 4. Но на самом деле ситуация совсем другая, потому что часто выполняемые нами задачи будут включать ввод-вывод.Например, если задача выполняет операцию запроса данных из базы данных, то ЦП фактически не используется в этот период полностью, так что мы можем Правильно расширьте размер maxPoolSize. В некоторых случаях задача будет нагружать ЦП.Такая установка большего количества потоков не только не повысит эффективность, но и значительно снизит эффективность из-за создания и уничтожения потоков и накладных расходов на переключение.Поэтому размер максимальный пул потоков должен быть основан на бизнесе.Подходящий размер может быть установлен только после конкретной проверки ситуации.
Минимальный размер пула потоков относительно легко установить по сравнению с максимальным размером пула потоков, поскольку минимальный размер потока обычно можно оценить и установить в соответствии с бизнес-ситуацией. Например, в большинстве случаев будут выполняться две задачи, а небольшая вероятность того, что если запущено более 2 задач, вы можете напрямую установить минимальный размер пула потоков равным 2. Но вам нужно знать, как долго будет выполняться более 2 задач на каждом интервале.Если будет более 2 задач каждые 2 минуты, то мы можем установить время истечения потока немного больше, например, 4 минуты, Таким образом, даже если часто выполняется более 2 задач, можно использовать кэшированный пул потоков.
В общем, установка максимального и минимального пулов потоков — это проблема без фиксированной формулы.Необходимо учитывать реальную бизнес-ситуацию и конфигурацию машины, а также проводить дополнительные тесты в соответствии с реальной бизнес-ситуацией для достижения оптимальной настройки. Прежде чем все будет решено, можно использовать принцип KISS архитектуры программного обеспечения, а максимальное и минимальное количество потоков можно установить на количество ядер процессора, а оптимизация будет выполнена позже.
8. Как установить соответствующий размер очереди при использовании ограниченных очередей?
Чтобы установить соответствующий размер очереди, вы должны сначала понять, когда очередь будет использоваться. В реализации ThreadPoolExecutor использование очередей немного отличается. Сначала он будет использовать потоки размера основного пула потоков, затем добавлять задачи в очередь, а затем расширяться до потоков максимального размера пула потоков, когда очередь заполнена. То есть использование очереди не в том, чтобы ждать, пока потоков не хватит, а в том, чтобы использовать ее, когда основных потоков недостаточно. Я не совсем понимаю намерение этой конструкции.Согласно книге «Полное руководство по производительности Java», она обеспечивает два дроссельных клапана, первый — это очередь, а второй — максимальный пул потоков. Но это не дает пользователям наилучшего опыта.Поскольку будет использоваться самый большой пул потоков, почему бы не использовать его в первый раз?
Зная, когда ThreadPoolExecutor использует пул потоков, очень удобно оценить соответствующий размер очереди. Если время выполнения одной задачи составляет 100 мс, минимальное количество потоков — 2, а максимальная задержка, которую может выдержать пользователь, — 2 с, то мы можем просто вычислить размер очереди: 2/2 с/100 мс = 10, поэтому максимальная задержка при заполнении очереди Не более 2 с. Конечно, есть и другие влияющие факторы, такие как некоторые задачи, превышающие или меньшие 100 мс, использование максимального пула потоков и т. д., вы можете вносить простые корректировки на этой основе.
9. Как при использовании ограниченной очереди выбрать подходящую политику отказа, если очередь заполнена?
В ThreadPoolExecutor предусмотрено четыре типа RejectedExecutionHandler, каждый из которых имеет четкое разделение труда и выбрать его несложно. Это: AbortPolicy, DiscardPolicy, DiscardOldestPolicy и CallerRunsPolicy. Исходный код их размещен ниже и дано краткое описание.При использовании вы можете выбрать в соответствии с вашими потребностями.
//AbortPolicy
//默认的拒绝策略,直接抛出RejectedExecutionException异常供调用者做后续处理
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
//DiscardPolicy
//不做任何处理,将任务直接抛弃掉
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
//DiscardOldestPolicy
//抛弃队列中的下一个任务,然后尝试做提交。这个使用我觉得应该是在知道当前要提交的任务比较重要,必须要被执行的场景
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
//CallerRunsPolicy
//直接使用调用者线程执行,相当于同步执行,会阻塞调用者线程,不太友好感觉。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
10. Как считать время выполнения потоков в пуле потоков?
Чтобы подсчитать время выполнения потоков в пуле потоков, вам нужно знать, где и когда выполняются потоки в пуле потоков? Знайте состояние выполнения потока, а затем добавляйте свою собственную обработку до и после выполнения потока, поэтому сначала найдите код, выполняемый потоком, в ThreadPoolExecutor:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); //执行task.run()的前置方法
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//执行task.run()的后置方法
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
Видно, что в методе runWorker методы beforeExecute и afterExecute выполняются до и после task.run(), то есть до и после выполнения задачи Оба метода переопределяются в унаследованном классе ThreadPoolExecutor, обеспечивая большую гибкость. После наследования ThreadPoolExecutor мы можем делать все, что нам нужно, до и после выполнения задачи, конечно, включая статистику времени выполнения задачи.
Кстати, студенты, знакомые с исходным кодом Spring, видят здесь, находят ли они, что он похож на пре- и постпроцессор постпроцессора в spring? Разница в том, что один переопределяется наследованием, а другой реализуется интерфейсом.
Суммировать
На самом деле проблем, связанных с фреймворком пула потоков, гораздо больше, включая ThreadFactory, ForkJoinPool и т. д. Есть еще много мест, на изучение которых стоит потратить время. Эта статья является лишь небольшим подведением итогов после прочтения исходного кода jdk, «Практики параллельного программирования на Java» и «Авторитетного руководства по оптимизации производительности Java».
Использованная литература:
- «Практика параллельного программирования на Java»
- Полное руководство по оптимизации производительности Java