Использование ThreadPoolExecutor
Совет: если у вас есть какие-либо вопросы, пожалуйста, свяжитесь с личным сообщением, адрес исходного кода ниже, пожалуйста, получите его самостоятельно
предисловие
ThreadPoolExecutor — это класс пула потоков, который существует только после JDK 1.5. JDK помогает нам реализовать newSingleThreadExecutor, newFixedThreadPool, newCachedThreadPool и другие удобные пулы потоков, созданные на основе ThreadPoolExecutor. Так почему же эти пулы потоков не рекомендуются в спецификациях разработки Alibaba Woolen Cloth? Я верю, что вы будете просвещены после прочтения этой статьи.Совет: Ниже приведен основной текст этой статьи, следующие случаи приведены для справки.
1. Техническое введение
1. Что такое пул потоков?
Пул потоков — это форма многопоточности, при которой задачи добавляются в очередь во время обработки, а затем автоматически запускаются после создания потоков. Потоки пула потоков являются фоновыми потоками. Каждый поток использует размер стека по умолчанию, работает с приоритетом по умолчанию и находится в многопоточном апартаменте. Если поток бездействует в управляемом коде (например, в ожидании события), пул потоков вставляет другой рабочий поток, чтобы все процессоры были заняты. Если все потоки пула потоков всегда заняты, но очередь содержит ожидающую работу, пул потоков через некоторое время создаст еще один рабочий поток, но количество потоков никогда не превысит максимальное. Потоки, которые превышают максимальное значение, могут быть помещены в очередь, но они не запустятся, пока не закончатся другие потоки. ---Выдержка из энциклопедии Baidu2. Используйте шаги
1. Введение параметра ThreadPoolExecutor
имя параметра | Тип параметра | Значение параметра |
---|---|---|
corePoolSize | int | Размер пула основных потоков |
maximumPoolSize | int | Максимальный размер пула потоков |
keepAliveTime | long | максимальное время простоя потока |
unit | TimeUnit | единица времени |
workQueue | BlockingQueue | очередь ожидания потока |
threadFactory | ThreadFactory | фабрика по созданию ниток |
handler | RejectedExecutionHandler | политика отказа |
Давайте посмотрим на базовый исходный код метода execute класса ThreadPoolExecutor для анализа. | ||
Хорошо, согласно решению: |
1. Если запущено меньше потоков, чем потоков corePoolSize, попробуйте запустить новый поток с данной командой в качестве первой задачи.
2. Если задача может быть успешно поставлена в очередь, то нам все равно нужно перепроверить, следует ли добавить поток (потому что существующий поток умер с момента последней проверки), или пул был закрыт с момента входа в этот метод. Так что мы перепроверяем состояние и при необходимости откатываем очередь, если она остановлена, или запускаем новый поток, если потока нет.
3. Если мы не можем поставить задачу в очередь, то попробуйте добавить новый поток. Если это не удается, мы знаем, что у нас нет или насыщение, и отклоняем задачу.
2. использование newSingleThreadExecutor
Код выглядит следующим образом (пример):
@Test
public void testNewSingleThreadExecutor() {
ExecutorService threaPool = Executors.newSingleThreadExecutor();
long start = System.currentTimeMillis();
System.out.println("线程池执行开始");
int idx = 10;
while (--idx > 0) {
threaPool.execute(() -> {
try {
LOGGER.info("线程执行中");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
});
}
threaPool.shutdown();
for (; ; ) {
if (threaPool.isTerminated())
break;
}
long end = System.currentTimeMillis();
System.out.println("线程池执行结束,总用时:" + (end - start) + " ms ");
}
Результат запуска этого метода тестирования выглядит следующим образом:Обратите внимание на место, которое я отметил красным прямоугольником, для выполнения используется только один поток, каков принцип? Давайте посмотрим на исходный код newSingleThreadExecutor.
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
Создан пул потоков ThreadPoolExecutor с 1 основным потоком и 1 максимальным потоком выполнения. Очередь ожидания — LinkedBlockingQueue. Давайте нажмем и посмотрим, что представляет собой конструктор LinkedBlockingQueue по умолчанию.
Вы можете видеть, что это очередь с емкостью по умолчанию Interger.MAX_VALUE.
Вывод: newSingleThreadExecutor — это основной поток, равный 1, максимально допустимый поток в пуле потоков — 1, а очередь ожидания — это бесконечный пул потоков, поэтому вы должны знать, почему он открывает только один поток для выполнения.
3. Использование newFixedThreadPool
Код выглядит следующим образом (пример):
@Test
public void testNewFixedThreadPool() {
ExecutorService threaPool = Executors.newFixedThreadPool(5);
long start = System.currentTimeMillis();
System.out.println("线程池执行开始");
int idx = 20;
while (--idx >= 0) {
threaPool.execute(() -> {
try {
LOGGER.info("线程执行中");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
});
}
threaPool.shutdown();
for (; ; ) {
if (threaPool.isTerminated())
break;
}
long end = System.currentTimeMillis();
System.out.println("线程池执行结束,总用时:" + (end - start) + " ms ");
}
Сначала посмотрим на результаты выполнения
ОК, глядя на результаты выполнения, мы видим, что открыто только 5 потоков, а в пакетном режиме выполняется 5. Далее давайте посмотрим на его исходный код.
Также строится пул потоков ThreadPoolExecutor.Параметры: количество основных потоков и максимальное количество потоков в пуле потоков - входящие параметры. Модульный тест проходит 5, поэтому 5 потоков открываются для запуска, и эти 5 потоки повторно используются после запуска для выполнения в очереди.
Вывод: newFixedThreadPool — это пул потоков, который выполняет фиксированный размер в соответствии с входящими параметрами.
4. использование newCachedThreadPool
Код выглядит следующим образом (пример):
@Test
public void testNewCachedThreadPool() {
ExecutorService threaPool = Executors.newCachedThreadPool();
long start = System.currentTimeMillis();
System.out.println("线程池执行开始");
int idx = 200;
while (--idx >= 0) {
threaPool.execute(() -> {
LOGGER.info("线程执行中");
});
}
threaPool.shutdown();
for (; ; ) {
if (threaPool.isTerminated())
break;
}
long end = System.currentTimeMillis();
System.out.println("线程池执行结束,总用时:" + (end - start) + " ms ");
}
Хорошо, это отличается от приведенного выше. Давайте выполним 200 потоков. Давайте сначала посмотрим на результаты выполнения.Очевидно, что она отличается от вышеописанной.При выполнении задач с малым временем выполнения многократно используются потоки для выполнения.В чем причина? Давайте сначала посмотрим на исходный код
Создан основной поток с номером 0, максимальный поток выполнения — Interger.MAX_VALUE, и обратите внимание, что здесь используется очередь SynchronousQueue, SynchronousQueue не имеет емкости, это небуферизованная ожидающая очередь, блокирующая очередь, которая не хранит элементы и будет непосредственная отправка задач потребителям: перед продолжением добавления новых элементов необходимо дождаться использования добавленных элементов в очереди.
SynchronousQueue, что касается ее базового принципа, я напишу статью об очередях позже, поэтому не буду здесь вдаваться в подробности.
Вывод: newCachedThreadPool — это пул потоков, который можно расширять до бесконечности, при отсутствии бездействующего потока он создаст новый поток, а при наличии бездействующего потока будет использовать для обработки незанятый поток.
5. Рекомендации по использованию пулов потоков
Благодаря приведенным выше тестовым примерам и анализу исходного кода я считаю, что у каждого есть определенное понимание пула потоков, которое можно резюмировать следующим образом:
1. newSingleThreadExecutor: запускается только один поток, эффективность обработки низкая, размер блокирующей очереди не ограничен, если в очереди накапливается слишком много данных, это приведет к потреблению ресурсов.
2. newFixedThreadPool: пул потоков фиксированного размера, который может контролировать количество одновременных потоков, но размер блокирующей очереди не ограничен.Если в очереди накапливается слишком много данных, это приведет к потреблению ресурсов.
3. newCachedThreadPool: больше подходит для обработки бизнеса с коротким временем выполнения, но если потоки создаются без ограничений, это может привести к слишком большому использованию памяти и OOM, а также к чрезмерному переключению ЦП и потреблению слишком большого количества ресурсов.
Поэтому рекомендуется реализовать собственный ThreadPoolExecutor в соответствии с бизнес-сценариями, особенно для систем с высокой степенью параллелизма и большим трафиком, поэтому Alibaba не рекомендует использовать вышеуказанные пулы потоков.
Примечание автора
Это кажется простым? Для более подробного использования, пожалуйста, нажмите ниже, чтобы просмотреть исходный код, следуйте за мной, чтобы показать вам более расширенное использование
Адрес источника:Нажмите здесь, чтобы просмотреть исходный код.