Ядро Java (2) Глубокое понимание пула потоков ThreadPool

Java задняя часть GitHub исходный код

Threadpool

В этой статье вы получите следующую информацию:

  • Интерпретация исходного кода пула потоков
  • Анализ потока выполнения пула потоков
  • Реализация пула потоков с возвращаемым значением
  • Отложенная реализация пула потоков

Чтобы облегчить понимание читателей, эта статья будет начинаться с более мелкого к более глубокому, начиная с использования пулов потоков, а затем расширяя до расширенного содержания, такого как интерпретация исходного кода и анализ исходного кода, Читатели могут выбрать порядок чтения и главы, которые им нужно понять в соответствии с их собственной ситуацией.

1. Преимущества пулов потоков

Пул потоков может более полно использовать системные ресурсы, такие как ЦП, память, сеть, ввод-вывод и т. д. Основные функции пула потоков заключаются в следующем:

  • Пул потоков можно использовать для повторного использования потоков и управления максимальным числом параллелизма;
  • Реализовать стратегию кэширования задач и механизм отклонения;
  • Реализовать отложенное выполнение

В Руководстве по разработке Java для Alibaba указано, что ресурсы потоков должны предоставляться через пул потоков, как показано на следующем рисунке:

线程池规定

Во-вторых, использование пула потоков

В этом разделе будут представлены создание и использование семи пулов потоков, состояние пулов потоков и введение в параметры ThreadPoolExecutor.

2.1 Создание пула потоков

Пулы потоков могут использовать Executors и ThreadPoolExecutors.Существует шесть способов создания пулов потоков с использованием Executors, как показано ниже:

线程池创建

// 使用Executors方式创建
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(2);
ScheduledExecutorService singleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
ExecutorService workStealingPool = Executors.newWorkStealingPool();
// 原始创建方式
ThreadPoolExecutor tp = new ThreadPoolExecutor(10, 10, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

2.1.1 Интерпретация пула потоков

  1. newSingleThreadExecutor(), который характеризуется тем, что количество рабочих потоков ограничено 1, оперирует неограниченной рабочей очередью, поэтому гарантирует, что все задачи выполняются последовательно, максимум одна задача будет активной, а пользователям не разрешено изменять экземпляр пула потоков, поэтому вы можете избежать изменения количества потоков.
  2. newCachedThreadPool(), который представляет собой пул потоков, используемый для обработки большого количества краткосрочных рабочих задач, имеет несколько отличительных характеристик: он будет пытаться кэшировать потоки и повторно использовать их, а когда кэшированные потоки недоступны, будут создаваться новые рабочие потоки. создан; если поток бездействует более 60 секунд, он будет завершен и удален из кеша; при длительном бездействии этот пул потоков не будет потреблять никаких ресурсов. Внутри он использует SynchronousQueue в качестве рабочей очереди.
  3. newFixedThreadPool(int nThreads), повторно использует указанное количество (nThreads) потоков, за которыми используется неограниченная рабочая очередь, и в любой момент времени активны не более nThreads рабочих потоков. Это означает, что если количество задач превышает количество активных очередей, он будет ждать появления в рабочей очереди незанятого потока; если рабочий поток выйдет, будет создан новый рабочий поток, чтобы восполнить указанное количество nThreads. .
  4. newSingleThreadScheduledExecutor() создает единый пул потоков и возвращает ScheduledExecutorService, который может планировать или периодически выполнять работу.
  5. newScheduledThreadPool(int corePoolSize) похож на newSingleThreadScheduledExecutor(), он создает ScheduledExecutorService, который может выполнять плановое или периодическое планирование работы, разница заключается в одном рабочем потоке или нескольких рабочих потоках.
  6. newWorkStealingPool(int parallelism), это пул потоков, который часто упускается из виду.ForkJoinPool,использоватьWork-StealingАлгоритмы, обрабатывающие задачи параллельно, без гарантии порядка обработки.
  7. ThreadPoolExecutor — самый примитивный способ создания пула потоков.Вышеуказанные 1-3 метода создания — все инкапсуляция ThreadPoolExecutor.

Суммировать:Среди них newSingleThreadExecutor, newCachedThreadPool, newFixedThreadPool — реализация инкапсуляции ThreadPoolExecutor, newSingleThreadScheduledExecutor, newScheduledThreadPool — инкапсуляция подкласса ThreadPoolExecutor ScheduledThreadPoolExecutor, который используется для выполнения отложенных задач, а newWorkStealingPool — новый метод, добавленный в Java 8.

2.1.2 Значение пула с одним потоком

Из приведенного выше кода видно, что newSingleThreadExecutor и newSingleThreadScheduledExecutor создают однопоточные пулы, так в чем же смысл однопоточных пулов?

Хотя это однопоточный пул, он предоставляет такие функции, как рабочая очередь, управление жизненным циклом и обслуживание рабочих потоков.

2.2 Интерпретация ThreadPoolExecutor

ThreadPoolExecutor — это основной метод пула потоков. Давайте посмотрим на внутреннюю реализацию ThreadPoolExecutor и на то, как класс инкапсуляции вызывает ThreadPoolExecutor.

Начнем с конструктора, исходный код конструктора выглядит следующим образом:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

Описание параметра:

  • corePoolSize: так называемое количество основных потоков, которое можно примерно понимать как количество долгосрочных резидентных потоков (если не установлено значение allowCoreThreadTimeOut). Это значение может сильно отличаться для разных пулов потоков, например newFixedThreadPool установит его в nThreads, а для newCachedThreadPool будет равно 0.
  • maxPoolSize: как следует из названия, это максимальное количество потоков, которое может быть создано, когда потоков недостаточно. Для того же сравнения, для newFixedThreadPool, конечно же, это nThreads, потому что его требование — фиксированный размер, а для newCachedThreadPool — Integer.MAX_VALUE.
  • keepAliveTime: время поддержания активности бездействующего потока. Если время простоя потока превышает это значение, он будет закрыт. Обратите внимание, что условия для того, чтобы это значение вступило в силу, должны быть выполнены: время простоя превышает это значение, а количество потоков в пуле потоков меньше или равно количеству основных потоков corePoolSize. Конечно, основные потоки не будут закрыты по умолчанию, если только не установлено значение allowCoreThreadTimeOut(true), тогда основные потоки также могут быть перезапущены.
  • TimeUnit: единица измерения времени.
  • BlockingQueue: столбец сброса задач используется для хранения задач, которые должны быть выполнены в пуле потоков.
  • threadFactory: используется для создания потоков, обычно мы можем использовать значение по умолчанию.
  • обработчик: когда пул потоков заполнен, но отправляется новая задача, то, какую стратегию следует принять, определяется этим. Есть несколько способов на выбор, например, генерация исключения, прямой отказ, а затем возврат и т. д., или вы можете реализовать соответствующий интерфейс для реализации собственной логики.

Давайте посмотрим на вызов класса инкапсуляции пула потоков в ThreadPoolExecutor:

Исходный код пакета newSingleThreadExecutor для ThreadPoolExecutor выглядит следующим образом:

public static ExecutorService newSingleThreadExecutor() {
    return new Executors.FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>()));
}

Исходный код пакета newCachedThreadPool для ThreadPoolExecutor выглядит следующим образом:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

Исходный код пакета newFixedThreadPool для ThreadPoolExecutor выглядит следующим образом:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

Исходный код инкапсуляции ScheduledExecutorService для ThreadPoolExecutor выглядит следующим образом:

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}

newSingleThreadScheduledExecutor использует ScheduledThreadPoolExecutor, подкласс ThreadPoolExecutor, как показано на следующем рисунке:

Threadpool

Исходный код инкапсуляции newScheduledThreadPool в ThreadPoolExecutor выглядит следующим образом:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

newScheduledThreadPool также использует ScheduledThreadPoolExecutor, подкласс ThreadPoolExecutor.

2.3 Статус пула потоков

Глядя на исходный код ThreadPoolExecutor, состояние потока выглядит следующим образом:

Threadpool

Интерпретация состояния потока (следующее содержимое взято из:Java OOP.com/post/java - он...

  • РАБОТАЕТ: Это ничего не говорит, это самое нормальное состояние: принимать новые задачи и обрабатывать задачи в очереди ожидания;
  • SHUTDOWN: не принимает новые отправленные задачи, но продолжает обрабатывать задачи в очереди ожидания;
  • STOP: не принимать новые отправки задач, больше не обрабатывать задачи в очереди ожидания и прерывать поток, выполняющий задачу;
  • УБОРКА: Все задачи уничтожены, workCount равен 0. Когда состояние пула потоков будет преобразовано в состояние TIDYING, будет выполнен метод ловушки terminated();
  • TERMINATED: после завершения метода Terminated() статус пула потоков станет следующим;

RUNNING определяется как -1, SHUTDOWN определяется как 0, а остальные больше 0, поэтому, когда он равен 0, задача не может быть отправлена.Если он больше 0, даже выполнение задачи необходимо прервать. .

Прочитав введение этих состояний, читатели обычно могут догадаться, что переходы между состояниями происходят в девяти случаях из десяти. Процесс перехода каждого состояния выглядит следующим образом:

  • RUNNING -> SHUTDOWN: этот переход состояния происходит, когда вызывается shutdown(), что также является наиболее важным;
  • (РАБОТАЕТ или ВЫКЛЮЧАЕТСЯ) -> STOP: Когда вызывается shutdownNow(), происходит переход этого состояния.
  • SHUTDOWN -> TIDYING: когда очередь задач и пул потоков очищаются, они будут преобразованы из SHUTDOWN в TIDYING;
  • STOP -> TIDYING: этот переход происходит, когда очередь задач пуста;
  • УБОРКА -> ПРЕКРАЩЕНИЕ: Как упоминалось ранее, когда метод terminated() завершается;

2.4 Выполнение пула потоков

Сказав так много, давайте посмотрим, как пул потоков выполняет задачи.Существует два метода отправки задач пула потоков:

  • execute
  • submit

Среди них execute может принимать только задачи типа Runnable, которые используются следующим образом:

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
singleThreadExecutor.execute(new Runnable() {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
});

submit может принимать задачи типа Runnable или Callable, которые используются следующим образом:

ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(new Runnable() {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
});

2.4.1 Реализация пула потоков с возвращаемым значением

Используйте submit для передачи класса Callable, чтобы получить возвращаемое значение выполненной задачи Callable — это функция, добавленная в JDK 1.5, чтобы дополнить случай, когда Runnable не имеет возврата.

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<Long> result = executorService.submit(new Callable<Long>() {
    @Override
    public Long call() throws Exception {
        return new Date().getTime();
    }
});
try {
    System.out.println("运行结果:" + result.get());
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

2.4.2 Реализация отложенного пула потоков

В пуле потоков newSingleThreadScheduledExecutor и newScheduledThreadPool возвращают ScheduledExecutorService, который используется для выполнения пула отложенных потоков. Код выглядит следующим образом:

// 延迟线程池
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
scheduledThreadPool.schedule(new Runnable() {
    @Override
    public void run() {
        System.out.println("time:" + new Date().getTime());
    }
}, 10, TimeUnit.SECONDS);

Полный адрес загрузки образца: GitHub.com/VIP stone/Срочно…

3. Интерпретация исходного кода пула потоков

В чтении исходного кода пула потоков есть небольшая хитрость: его можно читать последовательно в порядке выполнения пула потоков, что облегчает понимание реализации пула потоков.

Интерпретация процесса чтения исходного кода

Давайте начнем чтение с метода отправки задачи execute() пула потоков. Из execute() мы обнаружим, что основным методом, выполняемым пулом потоков, является addWorker(). В addWorker() мы обнаруживаем, что начальный поток вызывает метод start(), который вызывает После метода start() будет выполняться метод run() класса Worker, а во время выполнения будет вызываться runWorker(). Ключ к запуску программы лежит в getTask( ).После метода getTask() поток закрывается, а также рабочий процесс всего пула потоков Это сделано, давайте спустимся и посмотрим вместе (если вы не поняли эту статью, вы можете вернуться и прочтите его еще раз после прочтения исходного кода).

3.1 интерпретация исходного кода execute()

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();

    // 如果当前线程数少于核心线程数,那么直接添加一个 worker 来执行任务,
    // 创建一个新的线程,并把当前任务 command 作为这个线程的第一个任务(firstTask)
    if (workerCountOf(c) < corePoolSize) {
        // 添加任务成功,那么就结束了。提交任务嘛,线程池已经接受了这个任务,这个方法也就可以返回了
        // 至于执行的结果,到时候会包装到 FutureTask 中。
        // 返回 false 代表线程池不允许提交任务
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 到这里说明,要么当前线程数大于等于核心线程数,要么刚刚 addWorker 失败了

    // 如果线程池处于 RUNNING 状态,把这个任务添加到任务队列 workQueue 中
    if (isRunning(c) && workQueue.offer(command)) {
        /* 这里面说的是,如果任务进入了 workQueue,我们是否需要开启新的线程
         * 因为线程数在 [0, corePoolSize) 是无条件开启新的线程
         * 如果线程数已经大于等于 corePoolSize,那么将任务添加到队列中,然后进到这里
         */
        int recheck = ctl.get();
        // 如果线程池已不处于 RUNNING 状态,那么移除已经入队的这个任务,并且执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 如果线程池还是 RUNNING 的,并且线程数为 0,那么开启新的线程
        // 到这里,我们知道了,这块代码的真正意图是:担心任务提交到队列中了,但是线程都关闭了
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果 workQueue 队列满了,那么进入到这个分支
    // 以 maximumPoolSize 为界创建新的 worker,
    // 如果失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}

3.2 Интерпретация исходного кода addWorker()

// 第一个参数是准备提交给这个线程执行的任务,之前说了,可以为 null
// 第二个参数为 true 代表使用核心线程数 corePoolSize 作为创建线程的界线,也就说创建这个线程的时候,
//         如果线程池中的线程总数已经达到 corePoolSize,那么不能响应这次创建线程的请求
//         如果是 false,代表使用最大线程数 maximumPoolSize 作为界线
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 这个非常不好理解
        // 如果线程池已关闭,并满足以下条件之一,那么不创建新的 worker:
        // 1. 线程池状态大于 SHUTDOWN,其实也就是 STOP, TIDYING, 或 TERMINATED
        // 2. firstTask != null
        // 3. workQueue.isEmpty()
        // 简单分析下:
        // 还是状态控制的问题,当线程池处于 SHUTDOWN 的时候,不允许提交任务,但是已有的任务继续执行
        // 当状态大于 SHUTDOWN 时,不允许提交任务,且中断正在执行的任务
        // 多说一句:如果线程池处于 SHUTDOWN,但是 firstTask 为 null,且 workQueue 非空,那么是允许创建 worker 的
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 如果成功,那么就是所有创建线程前的条件校验都满足了,准备创建线程执行任务了
            // 这里失败的话,说明有其他线程也在尝试往线程池中创建线程
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 由于有并发,重新再读取一下 ctl
            c = ctl.get();
            // 正常如果是 CAS 失败的话,进到下一个里层的for循环就可以了
            // 可是如果是因为其他线程的操作,导致线程池的状态发生了变更,如有其他线程关闭了这个线程池
            // 那么需要回到外层的for循环
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    /* 
     * 到这里,我们认为在当前这个时刻,可以开始创建线程来执行任务了,
     * 因为该校验的都校验了,至于以后会发生什么,那是以后的事,至少当前是满足条件的
     */

    // worker 是否已经启动
    boolean workerStarted = false;
    // 是否已将这个 worker 添加到 workers 这个 HashSet 中
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        // 把 firstTask 传给 worker 的构造方法
        w = new Worker(firstTask);
        // 取 worker 中的线程对象,之前说了,Worker的构造方法会调用 ThreadFactory 来创建一个新的线程
        final Thread t = w.thread;
        if (t != null) {
            // 这个是整个类的全局锁,持有这个锁才能让下面的操作“顺理成章”,
            // 因为关闭一个线程池需要这个锁,至少我持有锁的期间,线程池不会被关闭
            mainLock.lock();
            try {

                int c = ctl.get();
                int rs = runStateOf(c);

                // 小于 SHUTTDOWN 那就是 RUNNING,这个自不必说,是最正常的情况
                // 如果等于 SHUTDOWN,前面说了,不接受新的任务,但是会继续执行等待队列中的任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // worker 里面的 thread 可不能是已经启动的
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    // 加到 workers 这个 HashSet 中
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize 用于记录 workers 中的个数的最大值
                    // 因为 workers 是不断增加减少的,通过这个值可以知道线程池的大小曾经达到的最大值
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 添加成功的话,启动这个线程
            if (workerAdded) {
                // 启动线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 如果线程没有启动,需要做一些清理工作,如前面 workCount 加了 1,将其减掉
        if (! workerStarted)
            addWorkerFailed(w);
    }
    // 返回线程是否启动成功
    return workerStarted;
}

Как видно из этого кода, вызывается t.start();;

3.3 Интерпретация исходного кода runWorker()

Согласно приведенному выше коду, после вызова t.start() Worker немедленно будет вызван метод run() Worker.Исходный код run() выглядит следующим образом:

public void run() {
    runWorker(this);
}

Исходный код runWorker() выглядит следующим образом:

//  worker 线程启动后调用,while 循环(即自旋!)不断从等待队列获取任务并执行
//  worker 初始化时,可指定 firstTask,那么第一个任务也就可以不需要从队列中获取
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 该线程的第一个任务(若有)
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 允许中断
    w.unlock(); 
    boolean completedAbruptly = true;
    try {
        // 循环调用 getTask 获取任务
        while (task != null || (task = getTask()) != null) {
            w.lock();          
            // 若线程池状态大于等于 STOP,那么意味着该线程也要中断
              /**
               * 若线程池STOP,请确保线程 已被中断
               * 如果没有,请确保线程未被中断
               * 这需要在第二种情况下进行重新检查,以便在关中断时处理shutdownNow竞争
               */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 这是一个钩子方法,留给需要的子类实现
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 到这里终于可以执行任务了
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    // 这里不允许抛出 Throwable,所以转换为 Error
                    thrown = x; throw new Error(x);
                } finally {
                    // 也是一个钩子方法,将 task 和异常作为参数,留给需要的子类实现
                    afterExecute(task, thrown);
                }
            } finally {
                // 置空 task,准备 getTask 下一个任务
                task = null;
                // 累加完成的任务数
                w.completedTasks++;
                // 释放掉 worker 的独占锁
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 到这里,需要执行线程关闭
        // 1. 说明 getTask 返回 null,也就是说,这个 worker 的使命结束了,执行关闭
        // 2. 任务执行过程中发生了异常
        //    第一种情况,已经在代码处理了将 workCount 减 1,这个在 getTask 方法分析中说
        //    第二种情况,workCount 没有进行处理,所以需要在 processWorkerExit 中处理
        processWorkerExit(w, completedAbruptly);
    }
}

3.4 Интерпретация исходного кода getTask()

В runWorker есть getTask(), давайте посмотрим на конкретную реализацию:

// 此方法有三种可能
// 1. 阻塞直到获取到任务返回。默认 corePoolSize 之内的线程是不会被回收的,它们会一直等待任务
// 2. 超时退出。keepAliveTime 起作用的时候,也就是如果这么多时间内都没有任务,那么应该执行关闭
// 3. 如果发生了以下条件,须返回 null
//     池中有大于 maximumPoolSize 个 workers 存在(通过调用 setMaximumPoolSize 进行设置)
//     线程池处于 SHUTDOWN,而且 workQueue 是空的,前面说了,这种不再接受新的任务
//     线程池处于 STOP,不仅不接受新的线程,连 workQueue 中的线程也不再执行
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
   for (;;) {
   			// 允许核心线程数内的线程回收,或当前线程数超过了核心线程数,那么有可能发生超时关闭
            // 这里 break,是为了不往下执行后一个 if (compareAndDecrementWorkerCount(c))
            // 两个 if 一起看:如果当前线程数 wc > maximumPoolSize,或者超时,都返回 null
            // 那这里的问题来了,wc > maximumPoolSize 的情况,为什么要返回 null?
            // 换句话说,返回 null 意味着关闭线程。
            // 那是因为有可能开发者调用了 setMaximumPoolSize 将线程池的 maximumPoolSize 调小了
            // 如果此 worker 发生了中断,采取的方案是重试
            // 解释下为什么会发生中断,这个读者要去看 setMaximumPoolSize 方法,
            // 如果开发者将 maximumPoolSize 调小了,导致其小于当前的 workers 数量,
            // 那么意味着超出的部分线程要被关闭。重新进入 for 循环,自然会有部分线程会返回 null
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // CAS 操作,减少工作线程数
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
            // 如果此 worker 发生了中断,采取的方案是重试
            // 解释下为什么会发生中断,这个读者要去看 setMaximumPoolSize 方法,
            // 如果开发者将 maximumPoolSize 调小了,导致其小于当前的 workers 数量,
            // 那么意味着超出的部分线程要被关闭。重新进入 for 循环,自然会有部分线程会返回 null
                timedOut = false;
            }
        }
}

В-четвертых, процесс выполнения пула потоков

Поток выполнения пула потоков выглядит следующим образом:

Threadpool

V. Резюме

Резюме этой статьи представлено в форме вопроса и ответа, который цитируется из «Глубокая интерпретация идей дизайна пула потоков Java и реализация исходного кода», а адрес ссылки прилагается внизу.

1. Каковы ключевые атрибуты пула потоков?

  • Потоки между corePoolSize и maxPoolSize будут переработаны.Конечно, потоки corePoolSize также могут быть переработаны, установив (allowCoreThreadTimeOut(true)).

  • workQueue используется для хранения задач, при добавлении задач, если текущее количество потоков превышает corePoolSize, то задачи вставляются в очередь, а за вытаскивание задач из очереди будут отвечать потоки в пуле потоков.

  • keepAliveTime используется для установки времени простоя, если количество потоков превышает corePoolSize, а время простоя некоторых потоков превышает это значение, будет выполнена операция закрытия этих потоков.

  • rejectExecutionHandler используется для обработки ситуации, когда пул потоков не может выполнить задачу.По умолчанию он генерирует исключение RejectedExecutionException, игнорирует задачу, использует поток, отправивший задачу, для выполнения задачи и удаляет самую долгую ожидающую задачу в очереди. , а затем отправляет задачу. Четыре стратегии, по умолчанию выдается исключение.

2. Когда время создания потока в пуле потоков?

  • Если текущее количество потоков меньше, чем corePoolSize, при отправке задачи создается новый поток, и этот поток выполняет задачу;

  • Если текущее количество потоков достигло значения corePoolSize, добавьте отправленные задачи в очередь и подождите, пока потоки в пуле потоков не получат задачи из очереди;

  • Если очередь заполнена, то создать новый поток для выполнения задачи, нужно следить, чтобы количество потоков в пуле не превышало maxPoolSize, и если количество потоков превышает maxPoolSize в это время, политика отклонения выполняется.

3. Как быть с исключениями во время выполнения задачи?

Если при выполнении задачи возникает исключение, поток, выполняющий задачу, будет остановлен, вместо того чтобы продолжать получать другие задачи. Затем запускается новый поток, заменяющий его.

4. Когда будет реализована политика отказа?

  • Количество рабочих процессов достигает corePoolSize, задача успешно ставится в очередь, и пул потоков одновременно закрывается, и закрытие пула потоков не удаляет задачу из очереди, после чего выполняется политика отклонения. Это очень пограничная проблема. Поставьте в очередь и закройте пул потоков для параллельного выполнения. Читатели внимательно рассмотрят, как метод execute входит в первую команду reject (команду).
  • Количество воркеров больше или равно corePoolSize, готовы присоединиться к очереди, но очередь заполнена и задача не может присоединиться к очереди, затем подготовиться к запуску нового потока, но количество потоков достигло максимального размера пула, затем выполните политику отклонения.

6. Ссылки

Книга: "Эффективное кодирование: руководство по разработке на Java"

Основные технологии Java 36 лекций:t.cn/EwUJvWA

Углубленная интерпретация идей дизайна пула потоков Java и реализация исходного кода:Java OOP.com/post/java - он...

Пул потоков Java — анализ исходного кода ThreadPoolExecutor (на основе Java8):Ууууу. ИМО OC.com/article/429…

Рекомендация курса:

线程池创建