Реализация Java завершает выполнение запланированных задач в пуле потоков

интервью Java Netty

от развития

Недавно в проекте появилось новое требование, заключающееся в реализации функции, которая может динамически добавлять временные задачи. Сказав это, некоторые люди могут сказать, что это просто, просто используйте кварц, просто и грубо. Однако кварцевый каркас слишком тяжел, а небольшие проекты совсем не просты в эксплуатации. Конечно, некоторые люди скажут, что jdk предоставляет интерфейс таймера, которого вполне достаточно. Но требования нашего проекта — полностью многопоточная модель, а таймер — однопоточный, поэтому в итоге арендодатель выбрал пул потоков jdk.

Что такое пул потоков

Java предоставляет четыре пула потоков через Executors, а именно: **newCachedThreadPool :** создание кэшируемого пула потоков. Если длина пула потоков превышает требования к обработке, простаивающие потоки можно гибко перезапускать. Если перезапуска нет, будут созданы новые потоки.newFixedThreadPool :Создайте пул потоков фиксированной длины, чтобы контролировать максимальное количество одновременных потоков. Превышающие потоки будут ждать в очереди.newScheduledThreadPool :Создайте пул потоков фиксированной длины для поддержки запланированного и периодического выполнения задач.newSingleThreadExecutor :Создайте однопоточный пул потоков, который использует только один рабочий поток для выполнения задач, гарантируя, что все задачи выполняются в указанном порядке (FIFO, LIFO, приоритет).

NewScheduledThreadPool, используемый в проекте арендодателя, вот и все, сколько бы арендодателей ни было на работе, погуглите, их много.

Получение службы пула потоков

Арендодатель получает услугу пула потоков через режим singleton. Код выглядит следующим образом:

/**
 * 线程池创建.
 * @author wuhf
 * @date 2018/01/16
 */
public class ThreadPoolUtils {

    private static ScheduledExecutorService executorService;

    private ThreadPoolUtils() {
        //手动创建线程池.
        executorService = new ScheduledThreadPoolExecutor(10,
                new BasicThreadFactory.Builder().namingPattern("syncdata-schedule-pool-%d").daemon(true).build());
    }

    private static class PluginConfigHolder {
        private final static ThreadPoolUtils INSTANCE = new ThreadPoolUtils();
    }

    public static ThreadPoolUtils getInstance() {
        return PluginConfigHolder.INSTANCE;
    }

    public ScheduledExecutorService getThreadPool(){
        return executorService;
    }

}

Прервать реализацию кода работающего потока

Без лишних слов, код выглядит следующим образом:

/**
 * 中断线程池的某个任务.
 */
public class InterruptThread implements Runnable {

    private int num;

    public InterruptThread (int num){
        this.num = num;
    }

    public static void main(String[] args) throws InterruptedException {

        Thread interruptThread = new Thread(new InterruptThread(1));
        ScheduledFuture<?> t = ThreadPoolUtils.getInstance().getThreadPool().scheduleAtFixedRate(interruptThread,0,2,
                TimeUnit.SECONDS);

        InterruptThread interruptThread1 = new InterruptThread(2);
        ThreadPoolUtils.getInstance().getThreadPool().scheduleAtFixedRate(interruptThread1,0,2,
                TimeUnit.SECONDS);

        InterruptThread interruptThread2 = new InterruptThread(3);
        ThreadPoolUtils.getInstance().getThreadPool().scheduleAtFixedRate(interruptThread2,0,2,
                TimeUnit.SECONDS);
        Thread.sleep(5000);

		//终止正在运行的线程interruptThread
        t.cancel(true);
        while (true){

        }
    }

    @Override
    public void run() {
        System.out.println("this is a thread" + num);
    }
}

Побить пит-рекорд

Когда домовладелец использовал следующий код, он внезапно подумал о том, как остановить выполнение потока, когда необходимо остановить запланированную задачу.

ThreadPoolUtils.getInstance().getThreadPool().scheduleAtFixedRate(interruptThread,0,2,
                TimeUnit.SECONDS);

Поскольку у меня есть такая потребность, давайте поищем в Google. Я искал большую часть времени, но не могу найти никакой соответствующей информации. Все они представляют собой глубокий анализ пула потоков Java. Либо глобальные переменные, либо еще что-то, а решение, удовлетворяющее арендодателя, не найдено.

Поскольку потока нет, давайте посмотрим на базовый исходный код scheduleAtFixedRate, чтобы увидеть, что это такое. Конечно же, я видел конкретную реализацию метода scheduleAtFixedRate в исходном коде и обнаружил, что его возвращаемое значение — ScheduledFuture.

 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

Тогда давайте спустимся и посмотрим, что в ScheduledFuture, это не разочаровало арендодателя, я видел это

public boolean cancel(boolean mayInterruptIfRunning) {
            boolean cancelled = super.cancel(mayInterruptIfRunning);
            if (cancelled && removeOnCancel && heapIndex >= 0)
                remove(this);
            return cancelled;
}
            
//从线程的运行队列中移除当前线程
public boolean remove(Runnable task) {
        boolean removed = workQueue.remove(task);
        tryTerminate(); // In case SHUTDOWN and now empty
        return removed;
}

Поднимитесь и проверьте, что такое super.cancel(mayInterruptIfRunning), мы видим это,

//通过调用线程的interrupt方法终止线程运行
public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

Здесь решаются все проблемы.

Подвести итог

В проекте всегда есть сложные решения.Когда Google не легко найти, это также может быть хорошим способом посмотреть исходный код jdk. Наконец, давайте опубликуем блог арендодателя. Блог был перенесен в Tencent Cloud и управляется несколькими университетами-партнерами. Содержание охватывает Java, Android и т. д. Если вы заинтересованы, пожалуйста, продолжайте.Блог арендодателя и его друзей