В разделе параллельного программирования руководства по разработке Alibaba есть статья: Пулы потоков не могут использовать Executors для создания, но через ThreadPoolExecutor, причина его отключения с помощью анализа исходного кода.
написать впереди
Прежде всего, спасибо, что прочитали эту статью в промежутке между постройкой здания.Прочитав эту статью, вы узнаете:
- Определение пула потоков
- Несколько способов для исполнителей создавать пулы потоков
- Объекты ThreadPoolExecutor
- Связь между логикой задачи выполнения пула потоков и параметрами пула потоков
- Исполнители создают возвращаемые объекты ThreadPoolExecutor
- Тест исключения OOM
- Как определить параметры пула потоков
Если вы просто хотите узнать причину, вы можете вытащить ее прямо в сводку.
Определение пула потоков
Управление набором рабочих потоков. Повторное использование потоков через пул потоков имеет следующие преимущества:
- Уменьшите создание ресурсов => уменьшите накладные расходы на память, создайте потоки, занимающие память
- Уменьшите нагрузку на систему => создание потока требует времени, задерживает обработку запросов
- Улучшить стабильность => избежать создания бесконечного потока
OutOfMemoryError
[сокращенно ООМ]
Как исполнители создают пулы потоков
Создание пулов потоков в зависимости от типа возвращаемых объектов можно разделить на три категории:
- Создать и вернуть объект ThreadPoolExecutor
- Создание и возврат объекта ScheduleThreadPoolExecutor
- Создать и вернуть объект ForkJoinPool
В этой статье обсуждается только создание возвратовThreadPoolExecutor
объект
Объект ThreadPoolExecutor
ПредставляемExecutors
Прежде чем создавать метод пула потоков, давайте представим егоThreadPoolExecutor
, так как все эти статические методы создания пулов потоков возвращаютThreadPoolExecutor
объект, и мы вручную создаемThreadPoolExecutor
Отличие объектов в том, что нам не нужно самим передавать параметры конструктора.ThreadPoolExecutor
Есть четыре конструктора, но все они вызывают один и тот же:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
Описание параметра конструктора:
- corePoolSize => количество основных потоков пула потоков
- maxPoolSize => максимальное количество пулов потоков
- keepAliveTime => время выживания бездействующего потока
- единица => единица времени
- workQueue => буферная очередь, используемая пулом потоков
- threadFactory => фабрика, используемая пулом потоков для создания потоков
- обработчик => стратегия обработки пула потоков для отклоненных задач
Связь между логикой задачи выполнения пула потоков и параметрами пула потоков
Описание логики выполнения:- Определите, заполнено ли количество основных потоков, размер количества основных потоков и
corePoolSize
Параметр связан, если он не полный, создайте поток для выполнения задачи - Если основной пул потоков заполнен, определите, заполнена ли очередь, а также заполнена ли очередь и
workQueue
Параметр связан, если он не заполнен, он будет добавлен в очередь - Если очередь заполнена, определите, заполнен ли пул потоков, а также заполнен ли пул потоков и
maximumPoolSize
Параметр связан, если он не полный, создайте поток для выполнения задачи - Если пул потоков заполнен, политика отказа используется для обработки задач, которые невозможно выполнить.
handler
связанный с параметром
Исполнители создают возвращаемые объекты ThreadPoolExecutor
Executors
Существует три способа создания и возврата объекта ThreadPoolExecutor:
- Executors#newCachedThreadPool => Создать кэшируемый пул потоков
- Executors#newSingleThreadExecutor => Создать пул однопоточных потоков
- Executors#newFixedThreadPool => Создать пул потоков фиксированной длины
Executors#newCachedThreadPool метод
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CachedThreadPool
это пул потоков, который создает новые потоки по мере необходимости
- corePoolSize => 0, количество пулов основных потоков равно 0
- maxPoolSize => Integer.MAX_VALUE, максимальное количество пулов потоков — Integer.MAX_VALUE, можно считать, что потоки можно создавать бесконечно
- keepAliveTime => 60L
- единица => секунды
- workQueue => SynchronousQueue
Когда поставлена задача,corePoolSize
для 0, чтобы не создавать основные потоки,SynchronousQueue
Это очередь, которая не хранит элементы.Можно понять, что очередь всегда заполнена, поэтому в конечном итоге будут созданы неосновные потоки для выполнения задач. Для неосновных потоков они будут перезапущены при бездействии в течение 60 с.так какInteger.MAX_VALUE
Очень большой, можно считать, что потоки можно создавать бесконечно, и легко вызвать исключения OOM в случае ограниченных ресурсов
Executors#newSingleThreadExecutor метод
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
SingleThreadExecutor
это однопоточный пул потоков только с одним основным потоком
- corePoolSize => 1, количество пулов основных потоков равно 1
- maxPoolSize => 1, максимальное количество пулов потоков равно 1, то есть можно создать не более одного потока, и единственный поток — основной поток
- keepAliveTime => 0L
- единица => миллисекунды
- workQueue => LinkedBlockingQueue
При отправке задачи сначала создается основной поток для выполнения задачи. Если количество основных потоков превышает количество основных потоков, задача помещается в очередь.так какLinkedBlockingQueue
длинаInteger.MAX_VALUE
Очередь можно рассматривать как неограниченную очередь, поэтому в очередь может быть вставлено бесконечное количество задач, что легко вызвать, когда ресурсы ограничены.OOM
аномальный, а из-за неограниченной очередиmaximumPoolSize
иkeepAliveTime
Параметр будет недопустимым, и неосновной поток вообще не будет создан.
Executors#newFixedThreadPool метод
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool
Это пул фиксированных основных потоков, и количество фиксированных основных потоков передается пользователем.
- corePoolSize => nThreads, количество пулов основных потоков равно 1
- maxPoolSize => nThreads, максимальное количество пулов потоков равно nThreads, то есть максимально могут быть созданы только потоки nThreads
- keepAliveTime => 0L
- единица => миллисекунды
- рабочая очередь => LinkedBlockingQueue
это и
SingleThreadExecutor
Точно так же единственная разница заключается в количестве основных потоков, и из-заиспользуетLinkedBlockingQueue
, что легко вызвать, когда ресурсы ограниченыOOM
аномальный
Суммировать:
- FixedThreadPool и SingleThreadExecutor => Допустимая длина очереди запросов — Integer.MAX_VALUE, что может аккумулировать большое количество запросов, вызывая
OOM
аномальный - CachedThreadPool => Число потоков, разрешенных для создания, равно Integer.MAX_VALUE, что может создать большое количество потоков, вызывая
OOM
аномальный
Вот почему запрещено использоватьExecutors
Для создания пула потоков рекомендуется создать его самостоятельноThreadPoolExecutor
причина
Тест исключения OOM
Теоретически будетOOM
Исключение, необходимо протестировать волну, чтобы проверить предыдущее утверждение:
Класс теста: TaskTest.java
public class TaskTest {
public static void main(String[] args) {
ExecutorService es = Executors.newCachedThreadPool();
int i = 0;
while (true) {
es.submit(new Task(i++));
}
}
}
использоватьExecutors
созданныйCachedThreadPool
, Добавить потоки в пул потоков
Перед началом тестового классаJVM
Регулировка памяти должна быть меньше, иначе компьютер легко столкнется с проблемами [Не спрашивайте меня, почему я знаю, это Tie Hanhantian! ! ! 】,существуетidea
внутри:Run
-> Edit Configurations
JVM
Описание параметра:
- -Xms10M =>
Java Heap
значение инициализации памяти - -Xmx10M =>
Java Heap
максимальная память
результат операции:
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "main"
Disconnected from the target VM, address: '127.0.0.1:60416', transport: 'socket'
Начать отчет, когда создано более 3 недельных тем.OOM
Ошибка
Два других пула потоков тестироваться не будут, методы тестирования те же, но созданные пулы потоков разные.
Как определить параметры пула потоков
-
интенсивное использование процессора=> Рекомендуемый размер пула потоков
CPU
количество + 1,CPU
ЧислоRuntime.availableProcessors
метод получить -
Интенсивный ввод-вывод =>
CPU
количество *CPU
Использование * (1 + время ожидания потока / процессорное время потока) -
Гибридный=> Разделите задачи на
CPU
интенсивный иIO
Интенсивный, а затем использовать разные пулы потоков для обработки, чтобы каждый пул потоков можно было настроить в соответствии с его собственной рабочей нагрузкой. - очередь блокировки=> Рекомендуется использовать ограниченную очередь, что помогает избежать исчерпания ресурсов
-
политика отказа=> По умолчанию
AbortPolicy
Политика отказа, кинутая прямо в программуRejectedExecutionException
Исключение [потому что это исключение времени выполнения, а не обязательноеcatch
], этот подход недостаточно элегантен. Существует несколько рекомендуемых стратегий обработки отказов:- захват в программе
RejectedExecutionException
Exception, задача обрабатывается в пойманном исключении. против политики отказа по умолчанию - использовать
CallerRunsPolicy
Стратегия отклонения, эта стратегия дает задание потоку, вызывающему выполнение, для выполнения [обычно основной поток], в это время основной поток не сможет отправлять какие-либо задачи в течение определенного периода времени, так что рабочий поток может обработать выполняемую задачу. Темы, отправленные в этот момент, будут сохранены вTCP
В очереди полная очередь TCP повлияет на клиента, что приведет к незначительному снижению производительности. - Пользовательская политика отказа, просто нужно реализовать
RejectedExecutionHandler
интерфейс - Если задача не особо важна, используйте
DiscardPolicy
иDiscardOldestPolicy
Политика отказа также может отбросить задачу.
- захват в программе
Если вы используете статический метод Executors для созданияThreadPoolExecutor
объект, доступ к которому можно получить с помощьюSemaphore
Ограничение выполнения задач также позволяет избежать возникновенияOOM
аномальный
Из-за отсутствия опыта в определении параметров пула потоков, это все теоретические знания, опытные начальники могут добавить.
Учись понемногу каждый день, давай, добро пожаловать лайк
Прикреплен к предыдущим статьям: добро пожаловать, читайте, лайкайте и комментируйте
Связанные с шаблоном проектирования:
1. Паттерн синглтон, ты правда правильно пишешь?
2. (Стратегический режим + Заводской режим + карта) переключатель case в пакете Kill project
Связанные с JAVA8:
1. Оптимизируйте свой код с помощью Stream API
2. Уважаемый, вместо Date рекомендуется использовать LocalDateTime
Связанные с базой данных:
1. Сравнение эффективности запросов типов времени базы данных mysql datetime, bigint и timestamp.
2. Очень доволен! Наконец-то ступил на яму медленного запроса
- эффективный:
1. Создайте каркас Java, чтобы унифицировать стиль структуры командного проекта.
Связанный журнал:
1. Структура журнала, выберите Logback или Log4j2?
2. Файл конфигурации Logback пишется так, а TPS увеличен в 10 раз.
Связанные с инженерией:
1. Когда вам нечего делать, пишите LRU локальный кеш
2. Redis реализует аналогичный функциональный модуль
3. JMX Visual Monitoring Thread Pool
4. Управление разрешениями [Spring Security]
5. Spring пользовательская аннотация от входа до мастерства
6. Java имитирует посадку на Youku
7. QPS такой высокий, давайте писать многоуровневый кеш
8. Java использует phantomjs для создания снимков экрана
разное:
1. Используйте попытку с ресурсами, чтобы изящно закрыть ресурсы
2. Босс, зачем использовать сумму плавающего хранилища, чтобы вычесть мою оплату?