Принцип реализации пула потоков ThreadPoolExecutor

Java задняя часть база данных SQL
Принцип реализации пула потоков ThreadPoolExecutor

Оригинальная статья и краткое изложение опыта и жизненные перипетии от набора в школу до фабрики А

Нажмите, чтобы узнать подробностиwww.codercc.com

1. Зачем использовать пулы потоков

В реальном использовании потоки занимают много системных ресурсов, и если потоки не управляются должным образом, легко вызвать системные проблемы. Поэтому в большинстве параллельных фреймворков будет использоватьсяПул потоковДля управления потоками использование пулов потоков для управления потоками имеет следующие преимущества:

  1. Сокращение потребления ресурсов. Сведите к минимуму потери производительности системы за счет повторного использования существующих потоков и сокращения количества отключений потоков;
  2. Улучшить скорость отклика системы. При повторном использовании потоков процесс создания потоков исключается, поэтому общая скорость отклика системы повышается;
  3. Улучшить управляемость потоками. Потоки — дефицитные ресурсы, если их создавать без ограничений, то они будут не только потреблять системные ресурсы, но и снижать стабильность системы, поэтому для управления потоками необходимо использовать пулы потоков.

2. Как работает пул потоков

Когда параллельная задача отправляется в пул потоков, процесс назначения потоков в пул потоков для выполнения задачи показан на следующем рисунке:

线程池执行流程图.jpg
Блок-схема выполнения пула потоков.jpg

Как видно из рисунка, процесс задачи, представленный пулом потоков, в основном такой, как:

  1. Сначала определите пул потоковосновной пул потоковВсе ли потоки выполняют задачи. Если нет, создайте новый поток для выполнения только что отправленной задачи, в противном случае все потоки в основном пуле потоков выполняют задачу, затем перейдите к шагу 2;
  2. судить о текущемочередь блокировкиПолна ли она, если нет, поместить отправленную задачу в очередь на блокировку, иначе перейти к шагу 3;
  3. судитьВсе потоки в пуле потоковВыполняются ли все задачи, если нет, создается новый поток для выполнения задачи, в противном случае она передается на обработку стратегии насыщения.

3. Создание пула потоков

Создание пула потоков в основномThreadPoolExecutorКласс для завершения, ThreadPoolExecutor имеет много перегруженных методов построения, через метод построения с наибольшим количеством параметров, чтобы понять, какие параметры необходимо настроить для создания пула потоков. Конструктор ThreadPoolExecutor:

ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

Параметры описаны ниже:

  1. corePoolSize: указывает размер основного пула потоков. При отправке задачи, если количество потоков в текущем пуле основных потоков не достигает значения corePoolSize, для выполнения отправленной задачи будет создан новый поток.Даже если в текущем пуле основных потоков есть незанятые потоки. Если количество потоков в текущем пуле основных потоков достигло значения corePoolSize, потоки не будут воссозданы. если называетсяprestartCoreThread()илиprestartAllCoreThreads(), все основные потоки будут созданы и запущены при создании пула потоков.
  2. maxPoolSize: указывает максимальное количество потоков, которые может создать пул потоков. Если очередь блокировки заполнена и количество потоков в текущем пуле потоков не превышает maxPoolSize, для выполнения задачи будет создан новый поток.
  3. keepAliveTime: время выживания бездействующего потока. Если количество потоков в текущем пуле потоков превысило corePoolSize, а время простоя потоков превышает keepAliveTime, эти простаивающие потоки будут уничтожены, что может максимально снизить потребление системных ресурсов.
  4. единица измерения: единица измерения времени. Укажите единицу времени для keepAliveTime.
  5. workQueue: очередь блокировки. Очередь блокировки для хранения задач, об очередях блокировкиВы можете увидеть эту статью. можно использоватьArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, PriorityBlockingQueue.
  6. threadFactory: инженерный класс для создания потоков. Вы можете задать более осмысленное имя для каждого созданного потока, указав фабрику потоков.Если есть проблема параллелизма, также легко найти причину проблемы.
  7. обработчик: политика насыщения. Когда очередь блокировки пула потоков заполнена и указанный поток открыт, это означает, что текущий пул потоков насыщен, и тогда необходимо принять стратегию для решения этой ситуации. Используемые стратегии следующие:
    1. AbortPolicy: напрямую отклонить отправленную задачу и броситьRejectedExecutionExceptionаномальный;
    2. CallerRunsPolicy: для выполнения задачи используйте только тот поток, в котором находится вызывающий объект;
    3. DiscardPolicy: отменить задачу без ее обработки;
    4. DiscardOldestPolicy: отменить задачу с наибольшим временем хранения в очереди блокировки и выполнить текущую задачу.

Логика выполнения пула потоков

После создания пула потоков через ThreadPoolExecutor, как происходит процесс выполнения после отправки задачи?Давайте посмотрим на исходный код. Исходный код метода execute выглядит следующим образом:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
	//如果线程池的线程个数少于corePoolSize则创建新线程执行当前任务
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
	//如果线程个数大于corePoolSize或者创建线程失败,则将任务存放在阻塞队列workQueue中
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
	//如果当前任务无法放进阻塞队列中,则创建新的线程来执行任务
    else if (!addWorker(command, false))
        reject(command);
}

См. комментарии для логики выполнения метода выполнения ThreadPoolExecutor. На следующем рисунке показана диаграмма выполнения метода execute класса ThreadPoolExecutor:

execute执行过程示意图.jpg
Схематическая диаграмма выполнения процесса выполнения.jpg

Логика выполнения метода execute имеет следующие ситуации:

  1. Если количество запущенных в данный момент потоков меньше, чем corePoolSize, будут созданы новые потоки для выполнения новых задач;
  2. Если количество запущенных потоков равно или больше, чем corePoolSize, отправленные задачи будут храниться в очереди блокировки workQueue;
  3. Если текущая очередь workQueue заполнена, для выполнения задачи будет создан новый поток;
  4. Если количество потоков превысило максимальный размер пула, для обработки будет использоваться стратегия насыщения RejectedExecutionHandler.

Следует отметить, что идея дизайна пула потоков заключается в использованииБазовый пул потоков corePoolSize, блокирующая очередь workQueue и максимальный пул потоков, такая стратегия кэширования для обработки задач, на самом деле, эта дизайнерская идея будет использоваться в рамках потребности.

4. Закрытие пула потоков

Закройте пул потоков, вы можете пройтиshutdownиshutdownNowэти два метода. Их принцип заключается в обходе всех потоков в пуле потоков, а затем прерывании потоков по очереди.shutdownиshutdownNowОтличия все же есть:

  1. shutdownNowСначала установите состояние пула потоков вSTOP, затем попробуйтеОстановить все запущенные и ожидающие задачии возвращает список задач, ожидающих выполнения;
  2. shutdownПросто установите состояние пула потоков наSHUTDOWNсостояние, затем прервите все потоки, которые не выполняют задачи

Видно, что метод shutdown продолжит выполнение выполняющейся задачи, а shutdownNow напрямую прервет выполняющуюся задачу. Любой из этих двух методов вызывается,isShutdownМетод вернет true, когда все потоки будут успешно закрыты, это означает, что пул потоков успешно закрыт, затем вызовитеisTerminatedметод вернет true.

5. Как правильно настроить параметры пула потоков?

Чтобы правильно настроить пул потоков, вы должны сначала проанализировать характеристики задачи, которые можно проанализировать со следующих точек зрения:

  1. Характер задач: задачи с интенсивным использованием ЦП, задачи с интенсивным вводом-выводом и смешанные задачи.
  2. Приоритет задачи: высокий, средний и низкий.
  3. Время выполнения задачи: долгое, среднее и короткое.
  4. Зависимость задачи: зависит ли она от других системных ресурсов, таких как соединения с базой данных.

Задачи с разным характером задач могут обрабатываться отдельно пулами потоков разного размера. Настройте как можно меньше потоков для задач с интенсивным использованием ЦП, таких как настройкаNcpu+1Пул потоков потоков. Задачи с интенсивным вводом-выводом должны ждать операций ввода-вывода, а потоки не всегда выполняют задачи, поэтому настройте как можно больше потоков, например2xNcpu. Смешанные задачи, если их можно разделить, разделите их на задачу с интенсивным использованием ЦП и задачу с интенсивным вводом-выводом.Пока время выполнения двух задач не слишком отличается, пропускная способность разделенного выполнения будет выше. к пропускной способности последовательного выполнения, если время выполнения двух задач слишком отличается, нет необходимости в декомпозиции. мы можем пройтиRuntime.getRuntime().availableProcessors()Метод получает количество процессоров текущего устройства.

Задачи с разными приоритетами можно обрабатывать с помощью приоритетной очереди PriorityBlockingQueue. Это позволяет выполнять задачи с более высоким приоритетом в первую очередь.Следует отметить, что если задачи с более высоким приоритетом всегда помещаются в очередь, задачи с более низким приоритетом могут никогда не выполняться.

Задачи с разным временем выполнения могут быть переданы для обработки в пулы потоков разного размера, или можно использовать приоритетную очередь, чтобы разрешить выполнение задач с коротким временем выполнения в первую очередь.

Зависит от задачи пула соединений с базой данных, потому что поток должен ждать, пока база данных вернет результат после отправки SQL.Если время ожидания больше, время простоя ЦП больше, то количество потоков должно быть установлено больше, чтобы лучше использовать ЦП.

И очередь блокировкиЛучше использовать ограниченную очередь, если используется неограниченная очередь, то после того, как задача окажется в очереди блокировки, она займет слишком много ресурсов памяти и даже вызовет сбой системы.

использованная литература

Искусство параллельного программирования на Java

Анализ исходного кода ThreadPoolExecutor, очень подробный