ScheduledThreadPoolExecutor пула потоков

задняя часть исходный код UML
ScheduledThreadPoolExecutor пула потоков

Оригинальная статья, краткое изложение опыта и жизненные перипетии на всем пути от набора в школу до фабрики.

Нажмите, чтобы узнать подробностиwww.codercc.com

1. Введение в ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor можно использовать для выполнения асинхронных задач или периодического выполнения задач после заданной задержки.По сравнению с таймером планирования задач его функция более мощная.Таймер может использовать только один фоновый поток для выполнения задач, а ScheduledThreadPoolExecutor может передать конструктор.чтобы указать количество фоновых потоков. Схема UML класса ScheduledThreadPoolExecutor выглядит следующим образом:

ScheduledThreadPoolExecutor类的UML图.png
Диаграмма UML класса ScheduledThreadPoolExecutor.png
  1. Как видно из диаграммы UML, ScheduledThreadPoolExecutor наследуетThreadPoolExecutor, то есть ScheduledThreadPoolExecutor имеет базовые функции execute() и submit() для отправки асинхронных задач.Вы можете увидеть эту статью. Однако класс ScheduledThreadPoolExecutor реализуетScheduledExecutorService, этот интерфейс определяет функцию ScheduledThreadPoolExecutor, которая может задерживать и периодически выполнять задачи;
  2. ПланируютthreadPoolExecutoCutor также имеет два важных класса внутренних классов:DelayedWorkQueueиScheduledFutureTask. Видно, что DelayedWorkQueue реализует интерфейс BlockingQueue, представляющий собой блокирующую очередь, а ScheduledFutureTask наследует класс FutureTask, что также означает, что этот класс используется для возврата результатов асинхронных задач. Эти два ключевых класса будут подробно рассмотрены ниже.

1.1 Конструктор

ScheduledThreadPoolExecutor имеет следующие конструкторы:

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
};
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
};
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), handler);
};
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

Видно, что поскольку ScheduledThreadPoolExecutor наследует ThreadPoolExecutor, его метод построения фактически вызывает ThreadPoolExecutor, и введение ThreadPoolExecutor может бытьВы можете увидеть эту статью, Разобравшись со значением нескольких параметров конструктора ThreadPoolExecutor, разобраться несложно. Видно, что количество потоков в пуле основных потоков ScheduledThreadPoolExecutor равно указанному corePoolSize. Когда количество потоков в пуле основных потоков достигает corePoolSize, задача будет отправлена ​​в ограниченную очередь блокировки DelayedWorkQueue. Описание DelayedWorkQueue подробно ниже Максимальное количество потоков, разрешенное пулом потоков, равно Integer.MAX_VALUE, что означает, что теоретически это неограниченный пул потоков.

1.2 Уникальные методы

ScheduledThreadPoolExecutor реализуетScheduledExecutorServiceинтерфейс, определяющийУникальные функции асинхронных задач, которые можно выполнять с задержкой, и асинхронных задач, которые можно выполнять периодически., соответствующие методы:

//达到给定的延时时间后,执行任务。这里传入的是实现Runnable接口的任务,
//因此通过ScheduledFuture.get()获取结果为null
public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);
//达到给定的延时时间后,执行任务。这里传入的是实现Callable接口的任务,
//因此,返回的是任务的最终计算结果
 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);
//是以上一个任务开始的时间计时,period时间过去后,
//检测上一个任务是否执行完毕,如果上一个任务执行完毕,
//则当前任务立即执行,如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
//当达到延时时间initialDelay后,任务开始执行。上一个任务执行结束后到下一次
//任务执行,中间延时时间间隔为delay。以这种方式,周期性执行任务。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

2. Задачи, которые можно выполнять периодически --- ScheduledFutureTask

SchedleledThreadPoolExecutor Наиболее важным функциями является возможность периодически выполнения асинхронных задач при вызовеschedule,scheduleAtFixedRate和scheduleWithFixedDelay方法Когда на самом деле это класс планируемого измельчения, который преобразует представленную задачу, как видно из исходного кода. Возьмите метод расписания в качестве примера:

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

Видно, что черезdecorateTaskпреобразует входящий Runnable вScheduledFutureTaskсвоего рода. Самая большая роль пула потоков заключается в разделении задач и потоков.Потоки в основном являются исполнителями задач, а задачи теперь называются ScheduledFutureTask. Сразу после этого любой поток, который думает о выполнении задачи, всегда будет вызыватьrun()метод. Чтобы гарантировать, что ScheduledThreadPoolExecutor может задерживать выполнение задачи и периодически выполнять задачи, ScheduledFutureTask переопределяет метод запуска:

public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
		//如果不是周期性执行任务,则直接调用run方法
        ScheduledFutureTask.super.run();
		//如果是周期性执行任务的话,需要重设下一次执行任务的时间
    else if (ScheduledFutureTask.super.runAndReset()) {
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}

Из исходного кода видно, что в переписанном методе запускаif (!periodic)Определить, является ли текущая задача периодической задачей, если нет, вызвать ее напрямуюrun()方法; иначе выполнитьsetNextRunTime()метод для сброса времени выполнения следующей задачи и передачиreExecutePeriodic(outerTask)способ поместить следующую задачу для выполнения вDelayedWorkQueueсередина.

Таким образом, можно сделать вывод, что:ScheduledFutureTaskОсновная функция заключается в дальнейшей инкапсуляции асинхронных задач в зависимости от того, является ли текущая задача периодической. Если это не периодическая задача (вызов метода расписания), передать ее напрямуюrun()Выполните, если это периодическая задача, вам необходимо сбросить время следующего выполнения после каждого выполнения, а затем продолжать поместить следующую задачу в очередь блокировки.

3. DelayedWorkQueue

Еще одним важным классом в ScheduledThreadPoolExecutor является DelayedWorkQueue. Чтобы понять, что его ScheduledThreadPoolExecutor может задерживать выполнение асинхронных задач и периодически выполнять задачи, DelayedWorkQueue соответствующим образом инкапсулируется. DelayedWorkQueue — это структура данных на основе кучи, аналогичная DelayQueue и PriorityQueue. При выполнении задач на время время выполнения каждой задачи разное, поэтому задача DelayedWorkQueue состоит в том, чтобы расположить в порядке возрастания времени выполнения, а задача, время выполнения которой ближе к текущему времени, находится в начале очереди.

Зачем использовать DelayedWorkQueue?

Когда выполняется запланированная задача, необходимо удалить задачу, которая должна быть выполнена в последнее время, поэтому каждый раз, когда задача удаляется из очереди в очереди, она должна быть той, у которой наибольшее время выполнения в текущей очереди, поэтому это естественно использовать приоритетную очередь.

DelayedWorkQueue — это очередь с приоритетом, которая может гарантировать, что каждая задача, удаленная из очереди, является самым продвинутым временем выполнения в текущей очереди.Поскольку это очередь, основанная на структуре кучи, наихудшее время структуры кучи при выполнении операций вставки и удаления. равно O(logN).

Структура данных DelayedWorkQueue

//初始大小
private static final int INITIAL_CAPACITY = 16;
//DelayedWorkQueue是由一个大小为16的数组组成,数组元素为实现RunnableScheduleFuture接口的类
//实际上为ScheduledFutureTask
private RunnableScheduledFuture<?>[] queue =
    new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;

Видно, что нижний слой DelayedWorkQueue состоит из массивов.DelayedWorkQueue может прочитать статью этого блоггера, очень подробно.

Относительно DelayedWorkQueue можно сделать следующие выводы:DelayedWorkQueue — это структура данных на основе кучи, которая сортирует каждую задачу в хронологическом порядке и ставит задачу с более коротким временем выполнения в начало очереди, чтобы она могла быть выполнена первой..

4. Процесс выполнения ScheduledThreadPoolExecutor

Теперь мы понимаем два внутренних класса ScheduledFutueTask и DelayedWorkQueue ScheduledThreadPoolExecutor, которые на самом деле являются двумя наиболее важными ключевыми факторами в рабочем процессе пула потоков:Задачи и очереди блокировки. Теперь после того, как мы посмотрим на планируемуюthreadPoolExecutor, отправьте процесс выполнения задач в целом. Способ планируемогоTREADPoolExecutor Чтобы запланировать пример, конкретный исходный код для:

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
	//将提交的任务转换成ScheduledFutureTask
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    //延时执行任务ScheduledFutureTask
	delayedExecute(t);
    return t;
}

Метод легко понять, может быть отложен для выполнения задач ScheduledThreadPoolExecutor и характеристики цикла могут выполнять задачу, класс реализации сначала преобразуется в интерфейс Runnable ScheduledFutureTask. Затем звонкиdelayedExecuteметод для выполнения задач, этот метод также является ключевым методом, посмотрите исходный код:

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
		//如果当前线程池已经关闭,则拒绝任务
        reject(task);
    else {
		//将任务放入阻塞队列中
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
			//保证至少有一个线程启动,即使corePoolSize=0
            ensurePrestart();
    }
}

delayedExecuteПожалуйста, смотрите комментарии для основной логики метода.Видно, что важная логика метода будет вensurePrestart()метод, его исходный код:

void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

Видно, что логика этого метода очень проста, ключ в том, что он вызываетaddWorker方法, основная функция этого метода:новыйWorker类, когда задача будет выполнена, она вызоветWorker所重写的run方法, который будет продолжать выполнятьсяrunWorkerметод. существуетrunWorkerметод будет вызыватьсяgetTaskМетод постоянно получает задачи из очереди блокировки для выполнения, пока задача, полученная из очереди блокировки, не станет нулевой, поток завершится и завершится.. Метод addWorker — это метод класса ThreadPoolExecutor,Для анализа исходного кода ThreadPoolExecutor вы можете прочитать эту статью, которая очень подробная.

5. Резюме

  1. ScheduledThreadPoolExecutor inherits the ThreadPoolExecutor class. Therefore, the overall function is the same. The thread pool is mainly responsible for creating threads (Worker class), and the threads continuously obtain new asynchronous tasks from the blocking queue until there are no asynchronous tasks in the blocking очередь.但是相较于 ThreadPoolExecutor 来说,ScheduledThreadPoolExecutor 具有延时执行任务和可周期性执行任务的特性,ScheduledThreadPoolExecutor 重新设计了任务类ScheduleFutureTask, ScheduleFutureTask переписанrunМетод придает ему характеристики отложенного выполнения и периодического выполнения задач. Кроме того, очередь блокировкиDelayedWorkQueueЭто очередь, которая может быть отсортирована в соответствии с приоритетом и принимает базовую структуру данных кучи, так что задача, время выполнения которой ближе к текущему времени, помещается в начало очереди, чтобы поток мог получить задание на выполнение;

  2. Независимо от того, является ли пул потоков ThreadPoolExecutor или ScheduledThreadPoolExecutor, тремя ключевыми элементами схемы являются: задача, исполнитель и результат задачи. Их дизайнерская идея также состоит в том, чтобы полностью разделить эти три ключевых элемента.

    Исполнитель

    Механизм исполнения задачи полностью переданWorker类, который дополнительно инкапсулирует Thread. Чтобы отправить задачу в пул потоков, будь то метод выполнения и метод отправки ThreadPoolExecutor или метод расписания ScheduledThreadPoolExecutor, задача сначала перемещается в очередь блокировки, а затем с помощью метода addWork создается новый класс Work. , а поток запускается через метод runWorker, и Continuous получает асинхронные задачи из очереди блокировки на выполнение и передает их Worker для выполнения до тех пор, пока задача не может быть получена из очереди блокировки.

    Задача

    В ThreadPoolExecutor и ScheduledThreadPoolExecutor задачи относятся к классам реализации, реализующим интерфейсы Runnable и Callable. ThreadPoolExecutor преобразует задачи вFutureTaskкласс, а в ScheduledThreadPoolExecutor. Чтобы реализовать характеристики отложенного выполнения задачи и периодического выполнения задачи, задача будет преобразована вScheduledFutureTaskкласс, который наследует FutureTask и переопределяет метод запуска.

    Результат миссии

    После отправки задачи в ThreadPoolExecutor результат выполнения задачи можно получить через класс интерфейса Future, в ThreadPoolExecutor это фактически класс FutureTask, а в ScheduledThreadPoolExecutor —ScheduledFutureTaskсвоего рода