Как освоить пул потоков в JDK
Пул потоков в параллельном пакете JDK — это вопрос, который часто рассматривается в интервью.Анализ исходного кода ThreadPoolExecutorстатья. Из-за ограниченного места я не говорил об общих контрольных точках в интервью и о том, какие пункты следует освоить. Эта статья действительно немного длинная, для ее чтения лучше использовать компьютер в сочетании с исходным кодом. Сегодня я расскажу о том, какие моменты, по моему мнению, должен освоить ThreadPoolExecutor, эти моменты следует освоить, о чем часто спрашивают на собеседованиях. Теперь киньте несколько вопросов, если сможете на них ответить, то не надо смотреть вниз.
- Каковы наиболее часто используемые параметры в ThreadPoolExecutor и каковы их функции? После отправки задачи какой стратегии будет следовать ThreadPoolExecutor, чтобы создать поток для выполнения отправленной задачи?
- Каковы состояния ThreadPoolExecutor и каков поток между состояниями?
- В какой момент времени создаются потоки в ThreadPoolExecutor? Это после отправки задачи? Можно ли его создать до отправки задачи?
- Когда запускается поток, созданный в ThreadPoolExecutor?
- ThreadPoolExecutor на самом деле является пулом потоков, так как же он повторно использует потоки?
- Может ли один и тот же поток, созданный в ThreadPoolExecutor, выполнять несколько задач одновременно? Если нет механизма, гарантирующего, что один и тот же поток в ThreadPoolExecutor может выполнить только одну задачу, будет ли у него возможность выполнить другую задачу?
- В чем разница между shutdown и shutdownNow, методами отключения пула неработающих потоков в ThreadPoolExecutor?
- После отправки задач в ThreadPoolExecutor через метод submit возникнет ли проблема, если методы shutdown или shutdownNow не вызываются после выполнения всех задач?
- Предоставляет ли ThreadPoolExecutor точки расширения для выполнения каких-либо действий до или после выполнения задачи?
Если ответ на пропуске, ха-ха
Каковы параметры ThreadPoolExecutor и стратегия создания потоков?
Параметры ThreadPoolExecutor
-
corePoolSize
Количество основных потоков в пуле потоков -
mmaximumPoolSize
Максимальное количество потоков в пуле потоков -
keepAliveTime
Когда количество потоков в пуле потоков превышает corePoolSize, как долго ждать получения задач из workQueue -
unit
Единицей времени, соответствующей keepAliveTime, является класс TimeUnit. -
workQueue
Очередь блокировки используется для хранения отправленных задач, когда количество потоков в пуле потоков превышает corePoolSize. -
threadFactory
Принятая пулом потоков, фабрика потоков создает потоки в пуле потоков. -
handler
RejectedExecutionHandler, используемый, когда потоки в строке потока превышают максимальный размер пула, отклоняя обработчик выполнения.
Создайте стратегию потока
Кратко представим, что после отправки задачи в пул потоков пул потоков создает поток для выполнения процесса отправки задачи.
1. Когда количество потоков в пуле потоков, используемом для выполнения задачи, меньше, чем corePoolSize (количество основных потоков) при отправке задачи, пул потоков использует ThreadFactory (фабрика потоков) для создания потоков для выполнения отправленных задач. задача. В противном случае выполните вторые 2 шага.
2. Когда задача отправлена, количество потоков в пуле потоков, используемом для выполнения задачи, больше, чем corePoolSize (количество основных потоков), но workQueue не заполнена, пул потоков сохранит отправленную задачу в сначала workQueue (рабочая очередь), ожидание пула потоков. После выполнения других отправленных задач поток в цикле выведет выполнение задачи из workQueue в цикле. В противном случае перейдите к шагу 3.
3. Когда задача в пуле потоков больше, чем corePoolSize (количество основных потоков) при отправке задачи, а workQueu заполнена, но не превышает maxunPoolSize (максимальное количество потоков), пул потоков использует ThreadFacory (фабрика потоков) для создания потоков для выполнения отправленной задачи. В противном случае перейдите к 4.
4. Когда задача отправлена, задача выполнения в пуле потоков превышает максимальный размер unPoolSize, и выполняется политика отклонения (RejectedExecutionHanlder), настроенная в пуле потоков.
Поэтому вы должны быть очень осторожны при настройке параметров ThreadPoolExecutor.Не рекомендуется использовать большой ArrayBlockQueue или неограниченный LinkedBlockQueue, а также не следует устанавливать слишком большой размер corePoolSize. Для задач с интенсивным использованием ЦП можно установить меньшее значение (данные CUP +1), чтобы избежать ненужного переключения контекста; для задач с интенсивным вводом-выводом можно установить большее значение corePoolSize, что позволит избежать длительного ожидания ввода-вывода, пока CUP простаивает. Рекомендуется использовать threadFactory, определенный сам по себе, чтобы потоки, которые он создает, было легко различить и легко найти проблему.
Каковы состояния пула потоков и каков поток между состояниями?
- ВЫПОЛНЯЕТСЯ: Запуск, получение новых задач или обработка задач в очереди.
- SHUTDOWN: Close, новые задачи не будут получены, но значение задачи в очереди будет обработано со значением 0.
- STOP: Остановить, больше не получать новые задачи, не обрабатывать задачи в очереди и прерывать обработку задач.
- TIDYING: Все задачи завершены. Размер очереди равен 0, и поток, который перейдет в состояние TIDYING, выполнит метод terminated().
- TERMINATED: было выполнено Terminate Terminated().
Поток состояний выглядит следующим образом:
В какой момент времени создаются потоки в пуле?
В какой момент времени создаются потоки в ThreadPoolExecutor? Это после отправки задачи? Можно ли его создать до отправки задачи?
Как правило, после отправки задачи пул потоков будет использовать фабрику потоков для создания потоков, но не тогда, когда число потоков в пуле потоков достигнет corePoolSize или maxmumPoolSize. Основные потоки можно создать заранее с помощью метода prestartCoreThread или метода prestartAllCoreThreads перед отправкой задачи. Для получения подробной информации см. следующий рисунок:
Когда запускается поток, созданный в ThreadPoolExecutor?
Реализация потока в пуле потоков создается в методе addWorker, подробнее см. анализ метода addWorker в предыдущей статье. После создания поток запускается. Потоки, созданные в пуле потоков, инкапсулируются в объект Worker, а класс Worker реализует интерфейс Runnable, а потоки в пуле потоков ссылаются на рабочего. Когда поток запускается, у него фактически есть возможность дождаться, пока операционная система запланирует и выполнит метод run класса Worker.
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
//创建的线程引用了worker
this.thread = getThreadFactory().newThread(this);
}
ThreadPoolExecutor на самом деле является пулом потоков, так как же он повторно использует потоки?
Как только пул потоков создаст поток через ThreadFactory, он инкапсулирует созданный поток в объект Worker и одновременно запустит поток. Вновь созданный поток будет выполнять только что отправленную задачу и будет постоянно выводить выполнение задачи из workerQueue. Повторное использование потоков из пула потоков достигается за счет непрерывного удаления задач из workerQueue. Для анализа исходного кода см. анализ метода runWorkers.
Может ли один и тот же поток, созданный в ThreadPoolExecutor, выполнять несколько задач одновременно?
При этом несколько задач не могут выполняться одновременно, и только одна задача может выполняться при выполнении другой задачи. Как упоминалось выше, поток, созданный ThreadFacory в пуле потоков, в конечном итоге будет инкапсулирован в Worker, и поток ссылается на Worker.После запуска потока задача фактически выполняется в методе запуска в Worker и, наконец, запускается снова выполняет задачу. Делегируйте метод runWorker класса ThreadPoolExecutor.
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{...}
Worder реализует Runnable, с одной стороны, и наследует AQS, с другой стороны. Внедрив AQS, Worker имеет семантику эксклюзивной блокировки.Каждый раз, когда задача отправляется, она сначала блокируется, а затем разблокируется после завершения задачи. Именно эта операция блокировки и разблокировки гарантирует, что один и тот же поток должен завершить текущую задачу, прежде чем он получит возможность выполнить другую задачу.
В чем разница между shutdown и shutdownNow, методами отключения пула неработающих потоков в ThreadPoolExecutor?
Метод выключения заключается в том, чтобы установить состояние пула потоков в SHUTDOWN. В это время новые задачи не могут быть отправлены (отправка вызовет исключение), задачи workerQueue будут продолжать выполняться, а пул потоков отправит сигнал прерывания для этих бездействующих потоков. Бездействующий поток на самом деле не является потоком, который не выполняет задачи. Как поток, инкапсулированный в worker, может быть заблокирован, и реализация потока здесь будет простаивать. Ниже приведен исходный код для отправки сигнала прерывания в бездействующий поток.
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
//w.tryLock()用于加锁,看线程是否在执行任务
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
Метод shutdownNow должен установить состояние пула потоков на STOP.В это время новые задачи не могут быть отправлены (отправка вызовет исключение), и все потоки в пуле потоков получат сигнал прерывания. Ответ конкретного потока зависит от ситуации.Если поток заблокирован вызовом метода ожидания, присоединения или собственного метода ожидания объекта, состояние прерывания будет очищено, и в то же время будет сгенерировано исключение InterruptedException. Для других случаев обратитесь к описанию метода Thread.interrupt. Метод shutdownNow отправляет информацию о прерывании всем потокам.Исходный код выглядит следующим образом:
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
//加锁操作保证中断过程中不会新woker被创建
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
После отправки задач в ThreadPoolExecutor через метод submit возникнет ли проблема, если методы shutdown или shutdownNow не вызываются после выполнения всех задач?
Будет проблема, если основной поток не указан для разрешения тайм-аутов. Допустимый тайм-аут основного потока относится к методу блокировки, используемому для ожидания поступления задачи при получении задачи из wakerQueue, или задача извлекается из очереди синхронной блокировки путем установки тайм-аута. То есть получить задачу через метод опроса BlockingQueue или метод взятия для получения задачи. Обратитесь к анализу метода getTask в предыдущем анализе исходного кода. Если метод shutdown или shutdownNow не вызывается, основной поток всегда блокируется и приостанавливается из-за вызова метода BlockingQueue.take в методе getTask для получения задачи. Основной поток всегда будет находиться в состоянии блокировки, что приводит к утечке памяти, и основной поток не может завершиться, если только он не будет принудительно остановлен. Если вы попытаетесь запустить следующую программу, вы обнаружите, что программа не может выйти.
public class Test {
public static void main(String args[]) {
ExecutorService executorService = new ThreadPoolExecutor(3, 3,10L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("thread name " + Thread.currentThread().getName());
}
});
}
}
При использовании пула потоков необходимо не забыть вызвать метод shutdown или shutdownNow в конкретном сценарии, чтобы закрыть пул потоков. Метод shutdown подходит для сценариев, в которых должны быть выполнены все отправленные задачи, а метод shutdownNow подходит для сценариев, в которых вас не волнует, завершены ли отправленные задачи.
Предоставляет ли ThreadPoolExecutor точки расширения для выполнения каких-либо действий до или после выполнения задачи?
Пул потоков предоставляет три точки расширения, а именно методы beforeExecutor и afterExecutor до и после вызова метода запуска или метода вызова отправленной задачи; другая точка расширения заключается в том, что состояние пула потоков изменяется с состояния TIDYING на TERMINATED состояние Будет вызван завершенный метод.
Суммировать
Изначально я просто хотел написать немного, но когда я это написал, я понял, что это было немного длинно. Эта статья в основном знакомит с важными моментами в ThreadPoolExecutor, которые я лично считаю более важными, а также разбирает ThreadPoolExecutor и находит отклонения в моем предыдущем понимании.
Прочитав три вещи ❤️
Если вы считаете, что этот контент очень полезен для вас, я хотел бы пригласить вас сделать мне три небольших одолжения:
-
Лайки, репосты и ваши "лайки и комментарии" - движущая сила моего творчества.
-
Обратите внимание на общедоступный номер 『яванская гнилая свиная кожа’, время от времени делясь оригинальными знаниями.
-
В то же время вы можете рассчитывать на последующие статьи🚀
Автор: Йе И
Источник:club.perf.com/article/191…