Анализ исходного кода пула потоков планирования Java ScheduledThreadPoolExecutor

Java исходный код

Класс ScheduledThreadPoolExecutor в последнее время широко используется в недавно приобретенном проекте для выполнения некоторых запланированных задач.У меня никогда раньше не было возможности изучить исходный код этого класса.В этот раз я воспользовался возможностью изучить его.

Оригинальный адрес:woo woo Краткое описание.com/afraid/18 hair 4 from 95AC…

Этот класс в основном основан на классе ThreadPoolExecutor для вторичной разработки, поэтому студентам, не знакомым с процессом выполнения пула потоков Java, рекомендуется сначала прочитать мою предыдущую статью.
Что следует знать, когда интервьюер спрашивает о пулах потоков?

1. Процесс исполнения

  1. В отличие от ThreadPoolExecutor, при отправке задачи в ScheduledThreadPoolExecutor задача упаковывается как объект ScheduledFutureTask для присоединения к очереди задержки и запуска потока пробуждения.

  2. Когда задачи, отправленные пользователем, добавляются в очередь отложенных, они будут упорядочены по времени выполнения, то есть задача в начале очереди должна быть выполнена раньше. Поток пробуждения получит задачу из очереди задержки, и если время выполнения задачи подошло, она начнет выполняться. В противном случае заблокируйте и подождите оставшееся время задержки, прежде чем пытаться получить задание.

  3. После выполнения задачи, если задача является задачей, которую необходимо выполнять периодически и неоднократно, она будет повторно добавлена ​​в очередь задержки после расчета следующего времени выполнения.

2. Углубленный анализ исходного кода

Сначала посмотрите на несколько конструкторов класса планируемого обратного управления:

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), handler);
    }

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }

Примечание. В конструкторах здесь используется super, который на самом деле является конструктором ThreadPoolExecutor.
Здесь следует отметить три вещи:

  1. Использование DelayedWorkQueue в качестве очереди блокировки недоступно для пользователей для пользовательских настроек, таких как класс ThreadPoolExecutor. Очередь — это основной компонент класса ScheduledThreadPoolExecutor, который будет подробно описан позже.
  2. Параметр maxPoolSize здесь недоступен для пользователей, потому что элементы в DelayedWorkQueue будут расширены, когда начальная вместимость превысит 16, то есть очередь не будет заполнена, и параметр maxPoolSize не будет действовать, даже если установлено.
  3. Рабочий поток не имеет времени перезапуска по той же причине, что и в пункте 2, поскольку операция перезапуска не запускается. Таким образом, время выживания потока здесь установлено равным 0.

Опять же: чтобы понять вышеприведенные три очка, сначала нужно понимать точки знаний ThreadPoolexecuteCutor.

Когда мы создадим пул расписания, вы можете начать отправку задачи. Здесь проанализируйте исходный код трех распространенных API:

Первый — метод расписания, что означает, что задача запускается после указанного времени задержки и будет выполняться только один раз.

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        //参数校验
        if (command == null || unit == null)
            throw new NullPointerException();
        //这里是一个嵌套结构,首先把用户提交的任务包装成ScheduledFutureTask
        //然后在调用decorateTask进行包装,该方法是留给用户去扩展的,默认是个空方法
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        //包装好任务以后,就进行提交了
        delayedExecute(t);
        return t;
    }

Ориентируйтесь на исходный код отправленной задачи:

    private void delayedExecute(RunnableScheduledFuture<?> task) {
        //如果线程池已经关闭,则使用拒绝策略把提交任务拒绝掉
        if (isShutdown())
            reject(task);
        else {
            //与ThreadPoolExecutor不同,这里直接把任务加入延迟队列
            super.getQueue().add(task);
            //如果当前状态无法执行任务,则取消
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                //这里是增加一个worker线程,避免提交的任务没有worker去执行
                //原因就是该类没有像ThreadPoolExecutor一样,woker满了才放入队列
                ensurePrestart();
        }
    }

Ключевым моментом здесь является строка кода super.getQueue().add(task) Класс ScheduledThreadPoolExecutor внутренне реализует очередь задержки на основе структуры данных кучи. Метод add в конечном итоге попадет в метод offer, давайте посмотрим:

        public boolean offer(Runnable x) {
            //参数校验
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                //查看当前元素数量,如果大于队列长度则进行扩容
                int i = size;
                if (i >= queue.length)
                    grow();
                //元素数量加1
                size = i + 1;
                //如果当前队列还没有元素,则直接加入头部
                if (i == 0) {
                    queue[0] = e;
                    //记录索引
                    setIndex(e, 0);
                } else {
                    //把任务加入堆中,并调整堆结构,这里就会根据任务的触发时间排列
                    //把需要最早执行的任务放在前面
                    siftUp(i, e);
                }
                //如果新加入的元素就是队列头,这里有两种情况
                //1.这是用户提交的第一个任务
                //2.新任务进行堆调整以后,排在队列头
                if (queue[0] == e) {
                    //这个变量起优化作用,后面说
                    leader = null;
                    //加入元素以后,唤醒worker线程
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }

Благодаря вышеописанной логике мы успешно добавили отправленную задачу в очередь задержки.Как упоминалось ранее, после добавления задачи будет запущен поток waker.Задачей этого потока является непрерывное извлечение задач из очереди задержки на выполнение. Это то же самое, что и ThreadPoolExecutor, давайте посмотрим на исходный код для получения элементов из очереди задержки:

        public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    //取出队列中第一个元素,即最早需要执行的任务
                    RunnableScheduledFuture<?> first = queue[0];
                    //如果队列为空,则阻塞等待加入元素时唤醒
                    if (first == null)
                        available.await();
                    else {
                        //计算任务执行时间,这个delay是当前时间减去任务触发时间
                        long delay = first.getDelay(NANOSECONDS);
                        //如果到了触发时间,则执行出队操作
                        if (delay <= 0)
                            return finishPoll(first);
                        first = null; 
                        //这里表示该任务已经分配给了其他线程,当前线程等待唤醒就可以
                        if (leader != null)
                            available.await();
                        else {
                            //否则把给任务分配给当前线程
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                //当前线程等待任务剩余延迟时间
                                available.awaitNanos(delay);
                            } finally {
                                //这里线程醒来以后,什么时候leader会发生变化呢?
                                //就是上面的添加任务的时候
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                //如果队列不为空,则唤醒其他woker线程
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

Почему здесь добавлена ​​переменная-лидер для распределения задач в очереди блокировки? Причина в том, чтобы сократить ненужное время ожидания. Например, первая задача в очереди будет выполнена через 1 минуту, затем пользователь продолжит присоединяться к рабочему потоку при отправке новых задач.Если все вновь отправленные задачи поставлены в очередь в конце очереди, то есть , новый воркер теперь возьмет эту задачу.Первая задача ждет время задержки выполнения.Когда задача достигнет времени триггера, она разбудит много рабочих потоков, что явно не нужно.

Когда задача будет вынесена потоком Woker, будет выполнен метод RUN.Поскольку задача упакована в объект ScheduledFutureTask, давайте посмотрим на метод RUN этого класса:

        public void run() {
            boolean periodic = isPeriodic();
            //如果当前线程池已经不支持执行任务,则取消
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                //如果不需要周期性执行,则直接执行run方法然后结束
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                //如果需要周期执行,则在执行完任务以后,设置下一次执行时间
                setNextRunTime();
                //把任务重新加入延迟队列
                reExecutePeriodic(outerTask);
            }
        }

Выше приведен полный процесс выполнения метода расписания.

Класс ScheduledThreadPoolExecutor предоставляет два метода для периодического выполнения задач, scheduleAtFixedRate и scheduleWithFixedDelay Давайте посмотрим на разницу.

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        //删除不必要的逻辑,重点看区别
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          //二者唯一区别
                                          unit.toNanos(period));
        //...
    }

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        //...
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          //二者唯一区别
                                          unit.toNanos(-delay));
       //..
    }

Первый передает время задержки цикла в ScheduledFutureTask, а второй устанавливает его как отрицательное число для передачи. В чем разница? Посмотрите исходный код метода setNextRunTime, который задает следующее время выполнения задачи в отделочных работах после завершения выполнения задачи:

        private void setNextRunTime() {
            long p = period;
            //大于0是scheduleAtFixedRate方法,表示执行时间是根据初始化参数计算的
            if (p > 0)
                time += p;
            else
            //小于0是scheduleWithFixedDelay方法,表示执行时间是根据当前时间重新计算的
                time = triggerTime(-p);
        }

То есть, когда задача отправляется с использованием метода scheduleAtFixedRate, было определено время задержки для последующего выполнения задачи, которое составляет InitialDelay, InitialDelay + период, InitialDelay + 2 * период и так далее.
Когда для отправки задачи вызывается метод scheduleWithFixedDelay, время задержки первого выполнения равно initialDelay, а время каждого последующего выполнения — это момент времени после завершения выполнения предыдущей задачи плюс период задержки выполнения.

3. Резюме

Можно сказать, что ScheduledThreadPoolExecutor выполняет некоторые расширенные операции с ThreadPoolExecutor, он просто переупаковывает задачи и блокирует очереди. Блокирующая очередь DelayedWorkQueue этого класса реализована на основе кучи. В этой статье подробно не рассматривается настройка вставки и удаления данных структуры кучи. Заинтересованные учащиеся могут приватно отправлять сообщения или оставлять комментарии.