1. Введение
В предыдущей статье мы представили класс задач таймера Timer, который появился в JDK 1.3 и находится в пакете java.util. А сегодня сказалScheduledThreadPoolExecutor
Он находится в пакете JUC, недавно добавленном в JDK1.5.
Сегодня поговорим об этом классе.
2. Введение в API
Внутренняя структура класса иTimer
Еще немного похоже, тоже 3 класса:
-
ScheduledThreadPoolExecutor
: Интерфейс, используемый программистами. -
DelayedWorkQueue
: Очередь для хранения задач. -
ScheduledFutureTask
: поток, выполняющий задачу.
Введение в методы строительства:
// 使用给定核心池大小创建一个新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize)
// 使用给定初始参数创建一个新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler)
// 使用给定的初始参数创建一个新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory)
// 使用给定初始参数创建一个新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler)
ScheduledThreadPoolExecutor
Поддерживается до 3 параметров: количество основных потоков, фабрика потоков и политика отклонения.
Почему нет максимального количества потоков? так какScheduledThreadPoolExecutor
Внутри безграничная очередь,maximumPoolSize
Это больше не имеет смысла.
Позвольте мне снова представить его метод API, пожалуйста, простите меня за копирование документации JDK, просто относитесь к этому как к памятке следующим образом:
protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) // 修改或替换用于执行 callable 的任务。
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) // 修改或替换用于执行 runnable 的任务。
void execute(Runnable command) // 使用所要求的零延迟执行命令。
boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() // 获取有关在此执行程序已 shutdown 的情况下、是否继续执行现有定期任务的策略。
boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() // 获取有关在此执行程序已 shutdown 的情况下是否继续执行现有延迟任务的策略。
BlockingQueue<Runnable> getQueue() // 返回此执行程序使用的任务队列。
boolean remove(Runnable task) // 从执行程序的内部队列中移除此任务(如果存在),从而如果尚未开始,则其不再运行。
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) // 创建并执行在给定延迟后启用的 ScheduledFuture。
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) // 创建并执行在给定延迟后启用的一次性操作。
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) // 创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) // 创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。
void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) // 设置有关在此执行程序已 shutdown 的情况下是否继续执行现有定期任务的策略。
void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) // 设置有关在此执行程序已 shutdown 的情况下是否继续执行现有延迟任务的策略。
void shutdown() // 在以前已提交任务的执行中发起一个有序的关闭,但是不接受新任务。
List<Runnable> shutdownNow() // 尝试停止所有正在执行的任务、暂停等待任务的处理,并返回等待执行的任务列表。
<T> Future<T> submit(Callable<T> task) // 提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。
Future<?> submit(Runnable task) // 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
<T> Future<T> submit(Runnable task, T result) // 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
Наиболее часто используются следующие методы:
// 使用给定核心池大小创建一个新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize)
// 创建并执行在给定延迟后启用的一次性操作。
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
// 创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
// 创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
В дополнение к конструктору по умолчанию есть 3schedule
метод. Разберем их внутреннюю реализацию.
3. Построить внутреннюю реализацию
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
Мы заинтересованы в этомDelayedWorkQueue
Очередь. Он также является блокирующей очередью. Структура данных для этой очереди — куча. В то же время этоqueue
Тоже сопоставимо, какое сравнение? задача должна быть выполненаcompareTo
метод, логика сравнения этого метода такова: сравните время выполнения задачи, если время выполнения задачи одинаковое, сравните время присоединения задачи.
следовательно,ScheduledFutureTask
Есть 2 переменные:
-
time
: время выполнения задачи. -
sequenceNumber
: время присоединения задачи.
Эти две переменные используются для сравнения порядка выполнения задач. Порядок всего планирования — это логика.
4. Разница между несколькими методами расписания
Просто сказал, что есть 3schedule
метод:
-
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
Создает и выполняет одноразовое действие, включенное после заданной задержки. -
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
Создает и выполняет периодическое действие, которое сначала включается после заданной начальной задержки, а последующие действия имеют заданный период;initialDelay
начать выполнение послеinitialDelay+period
после исполнения, то вinitialDelay + 2 * period
после казни и так далее. -
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
Создает и выполняет периодическое действие, которое сначала включается после заданной начальной задержки, за которой следует заданная задержка между завершением каждого выполнения и началом следующего.
Первый метод выполняется через заданное время и завершается после выполнения.
Интересно то, что второй метод и третий метод напрямую отличаются.
Оба метода можно вызывать повторно. Однако логика повторных вызовов другая, и именно здесь лучше использовать Timer.
Что у них общего, так это:Перед выполнением следующей задачи необходимо дождаться завершения предыдущей задачи.
Разница в следующем:График у них примерно разный.
scheduleAtFixedRate
Период выполнения метода фиксирован, то есть он использует время начала выполнения предыдущей задачи в качестве отправной точки, а также период времени после этого для планирования следующей задачи.
scheduleWithFixedDelay
Метод заключается в использовании времени окончания предыдущей задачи в качестве отправной точки, добавлении времени периода после этого и планировании следующей задачи.
какие отличия есть?
Если время выполнения задачи очень короткое, то разницы выше нет. Однако, если время выполнения задачи очень большое, превышающее время периода, то вылезает разница.
Предположим.
мы установилиperiod
Время составляет 2 секунды, в то время как задача занимает 5 секунд.
В этом разница между двумя методами.
scheduleAtFixedRate
Метод будет выполняться сразу после окончания предыдущей задачи, а интервал между ним и временем начала выполнения предыдущей задачи составляет5секунд (потому что он должен дождаться завершения предыдущей задачи).
scheduleWithFixedDelay
Метод будет выполнен после окончания предыдущей задачи.Примечание: **Подождите еще 2 секунды, прежде чем ** начнет выполняться, тогда интервал между ним и временем начала выполнения предыдущей задачи составляет7второй.
Итак, мы используемScheduledThreadPoolExecutor
В процессе следует учитывать, что время выполнения задачи не может превышать время интервала, если оно превышает, то лучше использоватьscheduleAtFixedRate
метод предотвращения накопления задач.
Конечно, это также зависит от конкретного бизнеса. Не могу обобщать. Но обязательно обратите внимание на разницу между двумя методами.
5. Реализация планового метода
Давайте посмотрим на внутреннюю реализацию метода scheduleAtFixedRate.
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
СоздаватьScheduledFutureTask
объект, затем украсьте этоFuture
, реализация этого класса возвращается напрямую, а у подклассов может быть своя реализация, украшающая слой вне этой задачи.
затем выполнитьdelayedExecute
метод, который, наконец, возвращаетFuture
.
этоScheduledFutureTask
Реализовано множество интерфейсов, таких какFuture
,Runnable
,Comparable
,Delayed
Ждать.
ScheduledFutureTask
строится следующим образом:
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
Слой за слоем класс сначала проходит атомарный статическийint
Номер очереди задачи объекта, а затем создатьCallable
, этот Callable является адаптером, который адаптируетсяRunnable
иCallable
, этоRunnable
упаковано вcallabe
, егоcall
метод заключается в вызове метода запуска данной задачи. Конечно, здесьresult
бесполезен.
Если вы проходитеcallable
, тогда позвониFutureTask
изrun
метод, который устанавливает реальное возвращаемое значение.
Здесь используется шаблон адаптера, что довольно интересно.
В общем, этоScheduledFutureTask
на основеFutureTask
, оFutureTask
Мы представили его из исходного кода ранее.
И сам переопределяет несколько методов:compareTo
,getDelay
,run
,isPeriodic
4 метода.
нам еще предстоит увидетьdelayedExecute
реализация.
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
// 添加进队列。
super.getQueue().add(task);
// 如果线程池关闭了,且不可以在当前状态下运行任务,且从队列删除任务成功,就给任务打上取消标记。
// 第二个判断是由两个变量控制的(下面是默认值):
// continueExistingPeriodicTasksAfterShutdown = false 表示关闭的时候应取消周期性任务。默认关闭
// executeExistingDelayedTasksAfterShutdown = true。表示关闭的时候应取消非周期性的任务。默认不关闭。
// running 状态下,canRunInCurrentRunState 必定返回 ture。
// 非 running 状态下,canRunInCurrentRunState 根据上面的两个值返回。
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 开始执行任务
ensurePrestart();
}
}
Расскажите о вышеописанном методе.
- Определить, следует ли закрывать, если закрыто, то задача отклоняется.
- Если он не находится в закрытом состоянии, он добавляется в очередь, и порядок добавления очереди был указан ранее, согласно
ScheduledFutureTask
изcompareTo
Приходит метод, сначала сравните время выполнения, а потом сравните порядок добавления. - Если во время этого процесса пул потоков закрывается, судят о том, следует ли отменять задачу в это время, по двум переменным, которые написаны в комментариях. Стратегия по умолчанию — отменить, если это периодическая задача, и не отменять в противном случае.
- Если пул потоков не закрыт. Просто вызовите поток в пуле потоков, чтобы выполнить задачу.
Общий процесс показан на рисунке:
Обратите внимание на картинку выше, если это периодическая задача, очередь будет возвращена после завершения выполнения.
Где это можно увидеть?
ScheduledFutureTask
изrun
метод:
public void run() {
// 是否是周期性任务
boolean periodic = isPeriodic();
// 如果不可以在当前状态下运行,就取消任务(将这个任务的状态设置为CANCELLED)。
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 如果不是周期性的任务,调用 FutureTask # run 方法
else if (!periodic)
ScheduledFutureTask.super.run();
// 如果是周期性的。
// 执行任务,但不设置返回值,成功后返回 true。(callable 不可以重复执行)
else if (ScheduledFutureTask.super.runAndReset()) {
// 设置下次执行时间
setNextRunTime();
// 再次将任务添加到队列中
reExecutePeriodic(outerTask);
}
}
Логика следующая:
- Если он больше не может работать в текущем состоянии, отмените задачу.
- Если это не периодическая задача, выполните
FutureTask
изrun
метод. - Если это периодическая задача, ее необходимо выполнить
runAndReset
метод. - После завершения выполнения перепишите и установите следующее время выполнения текущей задачи, а затем добавьте ее в очередь.
И управление всем процессом исполненияScheduledThreadPoolExecutor
родительский классThreadPoolExecutor
изrunWorker
метод. Среди них метод будет брать данные из очереди, то есть очереди вызововtake
метод.
оDelayedWorkQueue
изtake
метод, один из которыхleader
Переменная, если лидер не пустой, значит уже есть ожидающий поток, то блокируем текущий поток, если он пустой, значит первый элемент очереди обновился, устанавливаем текущий поток вleader
.
ЭтоLeader-Follower
образец, говорит Дуг Ли.
Конечно,take
Общая логика метода остается прежней. Получить данные из головы очереди. использоватьCondition
Сделайте координацию между потоками.
5. Резюме
оScheduledThreadPoolExecutor
Расписание уроков мы почти разобрали, так что давайте подведем итоги.
ScheduledThreadPoolExecutor
Это пул потоков задач по времени, похожий наTimer
, но чемTimer
Сильный и крепкий.
Например, в отличие от Таймера, задача нештатная, а вся система планирования совершенно бесполезна.
чемTimer
переборRate
модель(Rate 和 Delay
).
Отличие этих двух режимов в том, что время начала выполнения задачи разное.Rate
Рассчитывается от времени начала выполнения предыдущей задачи;Delay
рассчитывается от времени окончания предыдущей задачи.
Поэтому, если время самой задачи превышает время интервала, время интервала двух режимов не будет согласовано.
А порядок задач осуществляетсяScheduledFutureTask
изcompareTo
Метод отсортирован, правило заключается в том, чтобы сначала сравнить время выполнения, а если время совпадает, то сравнить время соединения.
Следует также отметить, что если во время выполнения задачи возникнет исключение, оно больше не повторится. так какScheduledFutureTask
изrun
метод не делалcatch
иметь дело с. Следовательно, программист должен обрабатывать его вручную, что намного лучше, чем исключение Timer, которое напрямую обходится системе планирования.
ScheduledThreadPoolExecutor
реализуется на основеThreadPoolExecutor
, большинство функций повторно используются в родительском классе,Просто сбросьте время после окончания выполнения и снова верните задачу в очередь, формируя задание на время.