Параллельное программирование — ScheduledThreadPoolExecutor

Java задняя часть программист

1. Введение

В предыдущей статье мы представили класс задач таймера Timer, который появился в JDK 1.3 и находится в пакете java.util. А сегодня сказалScheduledThreadPoolExecutorОн находится в пакете JUC, недавно добавленном в JDK1.5.

Сегодня поговорим об этом классе.

2. Введение в API

Внутренняя структура класса иTimerЕще немного похоже, тоже 3 класса:

  • ScheduledThreadPoolExecutor: Интерфейс, используемый программистами.
  • DelayedWorkQueue: Очередь для хранения задач.
  • ScheduledFutureTask: поток, выполняющий задачу.

Введение в методы строительства:

// 使用给定核心池大小创建一个新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize)  
// 使用给定初始参数创建一个新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler)  
// 使用给定的初始参数创建一个新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory)  
// 使用给定初始参数创建一个新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler)  

ScheduledThreadPoolExecutorПоддерживается до 3 параметров: количество основных потоков, фабрика потоков и политика отклонения.

Почему нет максимального количества потоков? так какScheduledThreadPoolExecutorВнутри безграничная очередь,maximumPoolSizeЭто больше не имеет смысла.

Позвольте мне снова представить его метод API, пожалуйста, простите меня за копирование документации JDK, просто относитесь к этому как к памятке следующим образом:

protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) // 修改或替换用于执行 callable 的任务。
  
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) // 修改或替换用于执行 runnable 的任务。        
    
void execute(Runnable command) // 使用所要求的零延迟执行命令。  

boolean	getContinueExistingPeriodicTasksAfterShutdownPolicy() // 获取有关在此执行程序已 shutdown 的情况下、是否继续执行现有定期任务的策略。      
   
boolean	getExecuteExistingDelayedTasksAfterShutdownPolicy() // 获取有关在此执行程序已 shutdown 的情况下是否继续执行现有延迟任务的策略。  

BlockingQueue<Runnable>	getQueue() // 返回此执行程序使用的任务队列。     

boolean	remove(Runnable task) // 从执行程序的内部队列中移除此任务(如果存在),从而如果尚未开始,则其不再运行。     
  
<V> ScheduledFuture<V>	schedule(Callable<V> callable, long delay, TimeUnit unit) // 创建并执行在给定延迟后启用的 ScheduledFuture。    
    
ScheduledFuture<?>	schedule(Runnable command, long delay, TimeUnit unit) // 创建并执行在给定延迟后启用的一次性操作。  
     
ScheduledFuture<?>	scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) // 创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。 
     
ScheduledFuture<?>	scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) // 创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。  
   
void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) // 设置有关在此执行程序已 shutdown 的情况下是否继续执行现有定期任务的策略。  
   
void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) // 设置有关在此执行程序已 shutdown 的情况下是否继续执行现有延迟任务的策略。  
      
void shutdown() // 在以前已提交任务的执行中发起一个有序的关闭,但是不接受新任务。
    
List<Runnable>	shutdownNow() // 尝试停止所有正在执行的任务、暂停等待任务的处理,并返回等待执行的任务列表。  
     
<T> Future<T> submit(Callable<T> task) //  提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。 
   
Future<?> submit(Runnable task) //  提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。  
     
<T> Future<T> submit(Runnable task, T result) //  提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。

Наиболее часто используются следующие методы:

// 使用给定核心池大小创建一个新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize)  

// 创建并执行在给定延迟后启用的一次性操作。  
ScheduledFuture<?>	schedule(Runnable command, long delay, TimeUnit unit) 
  
// 创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。 
ScheduledFuture<?>	scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) 

// 创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。       
ScheduledFuture<?>	scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) 

В дополнение к конструктору по умолчанию есть 3scheduleметод. Разберем их внутреннюю реализацию.

3. Построить внутреннюю реализацию

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

Мы заинтересованы в этомDelayedWorkQueueОчередь. Он также является блокирующей очередью. Структура данных для этой очереди — куча. В то же время этоqueueТоже сопоставимо, какое сравнение? задача должна быть выполненаcompareToметод, логика сравнения этого метода такова: сравните время выполнения задачи, если время выполнения задачи одинаковое, сравните время присоединения задачи.

следовательно,ScheduledFutureTaskЕсть 2 переменные:

  • time: время выполнения задачи.
  • sequenceNumber: время присоединения задачи.

Эти две переменные используются для сравнения порядка выполнения задач. Порядок всего планирования — это логика.

4. Разница между несколькими методами расписания

Просто сказал, что есть 3scheduleметод:

  1. ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)Создает и выполняет одноразовое действие, включенное после заданной задержки.

  2. ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)Создает и выполняет периодическое действие, которое сначала включается после заданной начальной задержки, а последующие действия имеют заданный период;initialDelayначать выполнение послеinitialDelay+periodпосле исполнения, то вinitialDelay + 2 * periodпосле казни и так далее.

  3. ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)Создает и выполняет периодическое действие, которое сначала включается после заданной начальной задержки, за которой следует заданная задержка между завершением каждого выполнения и началом следующего.

Первый метод выполняется через заданное время и завершается после выполнения.

Интересно то, что второй метод и третий метод напрямую отличаются.

Оба метода можно вызывать повторно. Однако логика повторных вызовов другая, и именно здесь лучше использовать Timer.

Что у них общего, так это:Перед выполнением следующей задачи необходимо дождаться завершения предыдущей задачи.

Разница в следующем:График у них примерно разный.

scheduleAtFixedRateПериод выполнения метода фиксирован, то есть он использует время начала выполнения предыдущей задачи в качестве отправной точки, а также период времени после этого для планирования следующей задачи.

scheduleWithFixedDelayМетод заключается в использовании времени окончания предыдущей задачи в качестве отправной точки, добавлении времени периода после этого и планировании следующей задачи.

какие отличия есть?

Если время выполнения задачи очень короткое, то разницы выше нет. Однако, если время выполнения задачи очень большое, превышающее время периода, то вылезает разница.

Предположим.

мы установилиperiodВремя составляет 2 секунды, в то время как задача занимает 5 секунд.

В этом разница между двумя методами.

scheduleAtFixedRateМетод будет выполняться сразу после окончания предыдущей задачи, а интервал между ним и временем начала выполнения предыдущей задачи составляет5секунд (потому что он должен дождаться завершения предыдущей задачи).

scheduleWithFixedDelayМетод будет выполнен после окончания предыдущей задачи.Примечание: **Подождите еще 2 секунды, прежде чем ** начнет выполняться, тогда интервал между ним и временем начала выполнения предыдущей задачи составляет7второй.

Итак, мы используемScheduledThreadPoolExecutorВ процессе следует учитывать, что время выполнения задачи не может превышать время интервала, если оно превышает, то лучше использоватьscheduleAtFixedRateметод предотвращения накопления задач.

Конечно, это также зависит от конкретного бизнеса. Не могу обобщать. Но обязательно обратите внимание на разницу между двумя методами.

5. Реализация планового метода

Давайте посмотрим на внутреннюю реализацию метода scheduleAtFixedRate.

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

СоздаватьScheduledFutureTaskобъект, затем украсьте этоFuture, реализация этого класса возвращается напрямую, а у подклассов может быть своя реализация, украшающая слой вне этой задачи.

затем выполнитьdelayedExecuteметод, который, наконец, возвращаетFuture.

этоScheduledFutureTaskРеализовано множество интерфейсов, таких какFuture,Runnable,Comparable,DelayedЖдать.

ScheduledFutureTaskстроится следующим образом:

ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    this.time = ns;
    this.period = period;
    this.sequenceNumber = sequencer.getAndIncrement();
}

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}

static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}

Слой за слоем класс сначала проходит атомарный статическийintНомер очереди задачи объекта, а затем создатьCallable, этот Callable является адаптером, который адаптируетсяRunnableиCallable, этоRunnableупаковано вcallabe, егоcallметод заключается в вызове метода запуска данной задачи. Конечно, здесьresultбесполезен.

Если вы проходитеcallable, тогда позвониFutureTaskизrunметод, который устанавливает реальное возвращаемое значение.

Здесь используется шаблон адаптера, что довольно интересно.

В общем, этоScheduledFutureTaskна основеFutureTask, оFutureTaskМы представили его из исходного кода ранее.

И сам переопределяет несколько методов:compareTo,getDelay,run,isPeriodic4 метода.

нам еще предстоит увидетьdelayedExecuteреализация.

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        // 添加进队列。
        super.getQueue().add(task);
        // 如果线程池关闭了,且不可以在当前状态下运行任务,且从队列删除任务成功,就给任务打上取消标记。
        // 第二个判断是由两个变量控制的(下面是默认值):
        // continueExistingPeriodicTasksAfterShutdown = false 表示关闭的时候应取消周期性任务。默认关闭
        // executeExistingDelayedTasksAfterShutdown = true。表示关闭的时候应取消非周期性的任务。默认不关闭。
        // running 状态下,canRunInCurrentRunState 必定返回 ture。
        // 非 running 状态下,canRunInCurrentRunState 根据上面的两个值返回。
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            // 开始执行任务
            ensurePrestart();
    }
}

Расскажите о вышеописанном методе.

  1. Определить, следует ли закрывать, если закрыто, то задача отклоняется.
  2. Если он не находится в закрытом состоянии, он добавляется в очередь, и порядок добавления очереди был указан ранее, согласноScheduledFutureTaskизcompareToПриходит метод, сначала сравните время выполнения, а потом сравните порядок добавления.
  3. Если во время этого процесса пул потоков закрывается, судят о том, следует ли отменять задачу в это время, по двум переменным, которые написаны в комментариях. Стратегия по умолчанию — отменить, если это периодическая задача, и не отменять в противном случае.
  4. Если пул потоков не закрыт. Просто вызовите поток в пуле потоков, чтобы выполнить задачу.

Общий процесс показан на рисунке:

image.png

Обратите внимание на картинку выше, если это периодическая задача, очередь будет возвращена после завершения выполнения.

Где это можно увидеть?

ScheduledFutureTaskизrunметод:

public void run() {
    // 是否是周期性任务
    boolean periodic = isPeriodic();
    // 如果不可以在当前状态下运行,就取消任务(将这个任务的状态设置为CANCELLED)。
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    // 如果不是周期性的任务,调用 FutureTask # run 方法
    else if (!periodic)
        ScheduledFutureTask.super.run();
    // 如果是周期性的。
    // 执行任务,但不设置返回值,成功后返回 true。(callable 不可以重复执行)
    else if (ScheduledFutureTask.super.runAndReset()) {
        // 设置下次执行时间
        setNextRunTime();
        // 再次将任务添加到队列中
        reExecutePeriodic(outerTask);
    }
}

Логика следующая:

  1. Если он больше не может работать в текущем состоянии, отмените задачу.
  2. Если это не периодическая задача, выполнитеFutureTaskизrunметод.
  3. Если это периодическая задача, ее необходимо выполнитьrunAndResetметод.
  4. После завершения выполнения перепишите и установите следующее время выполнения текущей задачи, а затем добавьте ее в очередь.

И управление всем процессом исполненияScheduledThreadPoolExecutorродительский классThreadPoolExecutorизrunWorkerметод. Среди них метод будет брать данные из очереди, то есть очереди вызововtakeметод.

оDelayedWorkQueueизtakeметод, один из которыхleaderПеременная, если лидер не пустой, значит уже есть ожидающий поток, то блокируем текущий поток, если он пустой, значит первый элемент очереди обновился, устанавливаем текущий поток вleader.

ЭтоLeader-Followerобразец, говорит Дуг Ли.

Конечно,takeОбщая логика метода остается прежней. Получить данные из головы очереди. использоватьConditionСделайте координацию между потоками.

5. Резюме

оScheduledThreadPoolExecutorРасписание уроков мы почти разобрали, так что давайте подведем итоги.

ScheduledThreadPoolExecutorЭто пул потоков задач по времени, похожий наTimer, но чемTimerСильный и крепкий.

Например, в отличие от Таймера, задача нештатная, а вся система планирования совершенно бесполезна.

чемTimerпереборRateмодель(Rate 和 Delay).

Отличие этих двух режимов в том, что время начала выполнения задачи разное.RateРассчитывается от времени начала выполнения предыдущей задачи;Delayрассчитывается от времени окончания предыдущей задачи.

Поэтому, если время самой задачи превышает время интервала, время интервала двух режимов не будет согласовано.

А порядок задач осуществляетсяScheduledFutureTaskизcompareToМетод отсортирован, правило заключается в том, чтобы сначала сравнить время выполнения, а если время совпадает, то сравнить время соединения.

Следует также отметить, что если во время выполнения задачи возникнет исключение, оно больше не повторится. так какScheduledFutureTaskизrunметод не делалcatchиметь дело с. Следовательно, программист должен обрабатывать его вручную, что намного лучше, чем исключение Timer, которое напрямую обходится системе планирования.

ScheduledThreadPoolExecutorреализуется на основеThreadPoolExecutor, большинство функций повторно используются в родительском классе,Просто сбросьте время после окончания выполнения и снова верните задачу в очередь, формируя задание на время.