В проекте используется пул потоков, но на самом деле многие не знакомы с принципом, тут просто разобраться
ThreadPoolExecutor
Класс java.util.concurrent.ThreadPoolExecutor является основным классом в пуле потоков.
- Метод строительства
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
- параметр
corePoolSize количество основных потоков
maxPoolSize Максимальное количество потоков Общее количество потоков после того, как блокирующая очередь не может вместиться Включая corePoolSize
keepAliveTime Время ожидания Время выживания текущего бездействующего потока в пуле потоков после обслуживания задачи. Если времени достаточно, другие задачи могут быть обслужены
единица времени
WorkQueue Очередь блокировки Количество потоков больше, чем основной поток и помещается в очередь
фабрика пулов потоков threadFactory
Политика отклонения обработчика Очередь блокировки заполнена и достигнуто максимальное количество потоков Выполните политику отклонения
-
- corePoolSize: размер основного пула после создания пула потоков, то есть до того, как задачи не поступят вызывается () метод для предварительного создания потока.При поступлении задачи будет создан поток для выполнения задачи.Когда количество потоков в пуле потоков достигнет corePoolSize, прибывшая задача будет помещена в очередь кеша;
-
- keepAliveTime: указывает, как долго поток будет остановлен, если задача не выполняется. По умолчанию keepAliveTime будет работать только тогда, когда количество потоков в пуле потоков больше, чем corePoolSize, пока количество потоков в пуле потоков не будет больше, чем corePoolSize, то есть когда количество потоков в пуле потоков больше чем corePoolSize, если поток бездействует. Когда время достигает keepAliveTime, он завершится до тех пор, пока количество потоков в пуле потоков не превысит corePoolSize. Однако, если вызывается метод allowCoreThreadTimeOut(boolean), когда количество потоков в пуле потоков не больше, чем corePoolSize, параметр keepAliveTime также будет работать до тех пор, пока количество потоков в пуле потоков не станет равным 0;
- Процесс
1) Когда размер пула меньше, чем corePoolSize, создается новый поток и обрабатывается запрос
2) Когда размер пула равен corePoolSize, поместите запрос в workQueue, и простаивающие потоки в пуле будут получать задачи из workQueue и обрабатывать их.
3) Когда workQueue не может поставить вновь поступившую задачу, новый поток входит в пул и обрабатывает запрос (не дожидаясь очереди).Если размер пула достигает maxPoolSize, для обработки отклонения используется RejectedExecutionHandler.
4) Кроме того, при количестве потоков в пуле больше, чем corePoolSize, лишние потоки будут долго ждать keepAliveTime, а при отсутствии запроса на обработку уничтожатся сами
Шаги обработки: основной поток
- Популярное объяснение процесса
Предположим, есть фабрика, на фабрике 10 рабочих, каждый рабочий может одновременно выполнять только одну задачу. Следовательно, пока один из 10 рабочих простаивает, задача будет назначена этому бездействующему работнику;
Когда у 10 рабочих есть задачи, если есть задача, задача будет поставлена в очередь;
Если скорость роста числа новых задач намного выше, чем скорость выполнения задач рабочими, то начальник фабрики может захотеть принять меры по исправлению положения, например, снова нанять 4 временных рабочих;
Затем задачи также назначаются этим 4 временным работникам;
Если говорят, что скорости 14 рабочих недостаточно, начальник фабрики может подумать о том, чтобы не принимать новые задания или отказаться от некоторых предыдущих заданий.
Когда некоторые из 14 рабочих свободны, а темпы роста новых задач относительно медленны, начальник фабрики может рассмотреть вопрос об увольнении 4 временных рабочих и оставить только первоначальных 10. В конце концов, наем дополнительных рабочих стоит денег.
В этом примере corePoolSize равен 10, а maxPoolSize равен 14 (10+4).
Другими словами, corePoolSize — это размер пула потоков, на мой взгляд, maxPoolSize — это средство от пула потоков, то есть средство, когда количество задач вдруг становится слишком большим.
- очередь блокировки
1) ArrayBlockingQueue: очередь в порядке очереди на основе массива, размер которой необходимо указывать при создании очереди;
2) LinkedBlockingQueue: очередь в порядке очереди на основе связанного списка.Если размер очереди не указан при ее создании, значением по умолчанию является Integer.MAX_VALUE;
3) synchronousQueue: эта очередь особенная, она не сохраняет отправленные задачи, а напрямую создает новый поток для выполнения новой задачи.
- политика отказачетыре
AbortPolicy по умолчанию напрямую отбрасывает и выдает исключение
DiscardPolicy напрямую отбрасывает, не вызывая исключений
CallerRunsPolicy выполняется в основном потоке
DiscardOldestPolicy отбрасывает самую старую из очереди регистрации и выполняет текущую.
Пользовательская стратегия может реализовать RejectedExecutionHandler.
- Прецедент
public class Test {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5));
for(int i=0;i<15;i++){
MyTask myTask = new MyTask(i);
executor.execute(myTask);
System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+
executor.getQueue().size()+",已执行完别的任务数目:"+executor.getCompletedTaskCount());
}
executor.shutdown();
}
}
class MyTask implements Runnable {
private int taskNum;
public MyTask(int num) {
this.taskNum = num;
}
@Override
public void run() {
System.out.println("正在执行task "+taskNum);
try {
Thread.currentThread().sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task "+taskNum+"执行完毕");
}
}
- Результаты
正在执行task 0
线程池中线程数目:1,队列中等待执行的任务数目:0,已执行完别的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 1
线程池中线程数目:3,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 2
线程池中线程数目:4,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 3
线程池中线程数目:5,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 4
线程池中线程数目:5,队列中等待执行的任务数目:1,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:2,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:3,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:4,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:5,已执行完别的任务数目:0
线程池中线程数目:6,队列中等待执行的任务数目:5,已执行完别的任务数目:0
正在执行task 10
线程池中线程数目:7,队列中等待执行的任务数目:5,已执行完别的任务数目:0
正在执行task 11
线程池中线程数目:8,队列中等待执行的任务数目:5,已执行完别的任务数目:0
正在执行task 12
线程池中线程数目:9,队列中等待执行的任务数目:5,已执行完别的任务数目:0
正在执行task 13
线程池中线程数目:10,队列中等待执行的任务数目:5,已执行完别的任务数目:0
正在执行task 14
task 3执行完毕
task 0执行完毕
task 2执行完毕
task 1执行完毕
正在执行task 8
正在执行task 7
正在执行task 6
正在执行task 5
task 4执行完毕
task 10执行完毕
task 11执行完毕
task 13执行完毕
task 12执行完毕
正在执行task 9
task 14执行完毕
task 8执行完毕
task 5执行完毕
task 7执行完毕
task 6执行完毕
task 9执行完毕
-
Как разумно настроить пул потоков
-
Чтобы правильно настроить пул потоков, вы должны сначала проанализировать характеристики задачи, которые можно проанализировать со следующих точек зрения:
- Характер задач: задачи с интенсивным использованием ЦП, задачи с интенсивным вводом-выводом и смешанные задачи.
- Приоритет задачи: высокий, средний и низкий.
- Время выполнения задачи: долгое, среднее и короткое.
- Зависимость задачи: зависит ли она от других системных ресурсов, таких как соединения с базой данных.
Задачи с разным характером задач могут обрабатываться отдельно пулами потоков разного размера. Настройте как можно меньше потоков для задач с интенсивным использованием ЦП, таких как настройка Ncpu+1 пул потоков для потоков. Задачи с интенсивным вводом-выводом должны ждать операций ввода-вывода, а потоки не всегда выполняют задачи, поэтому настройте как можно больше потоков, например 2*Ncpu. Смешанные задачи, если их можно разделить, разделите их на задачу с интенсивным использованием ЦП и задачу с интенсивным вводом-выводом.Пока время выполнения двух задач не слишком отличается, пропускная способность разделения будет выше.Из-за пропускная способность последовательного выполнения, если время выполнения двух задач слишком отличается, нет необходимости в декомпозиции. Мы можем получить количество процессоров текущего устройства с помощью метода Runtime.getRuntime(). AvailableProcessors().
Задачи с разными приоритетами можно обрабатывать с помощью приоритетной очереди PriorityBlockingQueue. Это позволяет выполнять задачи с высоким приоритетом в первую очередь.Следует отметить, что если задачи с высоким приоритетом всегда помещаются в очередь, задачи с низким приоритетом могут никогда не выполняться.
Задачи с разным временем выполнения могут быть переданы для обработки в пулы потоков разного размера, или можно использовать приоритетную очередь, чтобы разрешить выполнение задач с коротким временем выполнения в первую очередь.
Зависит от задачи пула соединений с базой данных, потому что поток должен ждать, пока база данных вернет результат после отправки SQL.Если время ожидания больше, время простоя ЦП будет больше, тогда количество потоков должно быть установить больше, чтобы лучше использовать ЦП.
Ограниченные очереди рекомендуются, ограниченная очередь может повысить стабильность и возможности раннего предупреждения системы и может быть увеличена в соответствии с потребностями, например, до нескольких тысяч. После того, как очереди и пулы потоков фонового пула задач, используемые нашей группой, были заполнены, и постоянно выбрасывалось исключение заброшенных задач.В результате расследования было обнаружено, что возникла проблема с базой данных, которая вызвала выполнение SQL стать очень медленным, потому что пул потоков фоновых задач Все задачи в пуле потоков должны запрашивать и вставлять данные в базу данных, поэтому все рабочие потоки в пуле потоков блокируются, а невыполненные задачи накапливаются в пуле потоков . Если в это время мы установим для него неограниченную очередь, в пуле потоков будет все больше и больше очередей, которые могут заполнить память, в результате чего будет недоступна вся система, а не только фоновые задачи. Конечно, все задачи в нашей системе развернуты на отдельных серверах, и мы используем пулы потоков разного размера для запуска разных типов задач, но когда возникают такие проблемы, другие задачи также будут затронуты.
-
- Лучшие практики
//创建线程池指定有意义的线程名字, 方便出错是回溯
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("demo-pool-%d").build();
ExecutorService singleThreadPool = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
singleThreadPool.execute(()-> System.out.println(Thread.currentThread().getName())
);
public class TimerTaskThread extends Thread {
public TimerTaskThread(){
super.setName("TimerTaskThread");
…
}
…
}