Коллеги имеют кое-что сказать: как делает ThreadPoolexecutor Recycle Threats

Java

Недавно мой любознательный коллега задал мне ряд вопросов, один из которых касался пула потоков ThreadPoolExecutor:Как ThreadPoolExecutor повторно использует потоки? на основе "Talk is cheap, show me the code.Основной принцип, давайте посмотрим непосредственно на исходный код ThreadPoolExecutor.

написать впереди

Подробное введение в принцип работы ThreadPoolExecutor не является целью этой статьи, здесь рекомендуется детская обувь для растрескивания стен.Java Thread Pount Pool Принцип реализации и практика бизнес-групп в Соединенных ШтатахЭту статью лучше всего читать с исходным кодом JDK, и эффект будет лучше. Также прилагаются заметки, которые я сделал ранее (это карта разума, надеюсь, она поможет друзьям, которые копают, систематизируют свои идеи):JUC: принцип работы ThreadPoolExecutor

Ладно, без лишних слов, давайте к делу~

Обзор ThreadPoolExecutor

Сначала получите некоторые необходимые знания, иначе вы можете запутаться, когда будете смотреть на содержание, которое следует.

  • Использование ThreadPoolExecutorctlПеременные хранят состояние пула потоков, высокое значение 3 указывает на рабочее состояние.runStateмладшие 29 бит указывают количество рабочих потоковworkerCount

  • Рабочий статусrunStateИмеются следующие значения:
    • RUNNING: получать новые отправки задач и обрабатывать задачи в очереди буфера задач.
    • SHUTDOWN: не получать новые отправки задач, а обрабатывать задачи в очереди буфера задач.
    • STOP: не получает новые отправки задач, не обрабатывает задачи в очереди буфера задач и прерывает задачи в процессе выполнения.
    • TIDYING: все задачи завершены, количество рабочих потоков равно 0, поток переходит вTIDYINGгосударство позвонитterminated()функция крючка.
    • TERMINATED:terminated()Выполнение метода завершено.
  • runStateПоток между состояниями выглядит следующим образом:

  • ThreadPoolExecutor определяют представленную задачу из рабочей нити, которая не связана напрямую. Это создает модель производителя-потребителя внутри: подавитель задачи является производителем, а рабочая нить играет роль потребителя, который отвечает за задача реализации. Процесс работы всего ThreadPoolExecutuecuteCuteCute отображается на следующем рисунке: (рисунок поступает изПринцип реализации пула потоков Java и его практика в бизнесе Meituan)

Как отправляются задачи: метод execute() класса ThreadPoolExecutor

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // ctl变量存储着当前线程池的运行状态runState和总线程数量workerCount
    int c = ctl.get();
    // 当前工作者线程数小于设置的corePoolSize值
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 当前线程池状态runState为RUNNING,且成功提交任务到任务缓冲队列中
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 重新检查线程池状态,必要时对刚提交到队列中的任务执行回退即remove操作
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            // 如果当前可用的工作者线程数为0,创建新的工作者线程
            addWorker(null, false);
    }
    // 如果工作者线程数大于corePoolSize,且任务缓冲队列workQueue已满,则创建新的工作者线程,直到数量到达maximumPoolSize上限
    else if (!addWorker(command, false))
        reject(command);
}

Вышеизложенное можно прочитать на блок-схеме нижеexecute()Исходный код метода: execute()Более важный метод появляется в методеaddWorker(), давайте щелкнем и посмотрим, что делает этот метод:

/**
* @param firstTask 提交的待执行的任务,可能为null,此时代表应该创建新的工作线程
* 处理任务缓冲队列中待处理的任务。
* @param core 布尔值,为true代表使用corePoolSize作为边界,为false则使用maximumPoolSize
/*
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 检查当前线程池状态
        // 需要注意的是,当runState==SHUTDOWN时,此时线程池不允许提交新的任务,但需要把任务缓冲队列中的任务处理完,所以如果任务缓冲队列非空且提交的firstTask参数为null,代表应该创建新的工作线程处理队列中待完成的任务,此时不应该直接return
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // CAPACITY是理论上工作线程数量的最大值(2^29)
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // CAS非阻塞地更新ThreadPoolExecutor本身维护的工作线程数量,更新成功则跳出循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 重新获取一次线程池状态,如果运行状态有变更,则回到retry块的开头重新开始
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // 如果重新获取的线程池运行状态没有变更,证明只是CAS更新失败,则只需要重新执行CAS更新工作线程数量的逻辑即可。
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
				
                // 正如前面提到的,需要addWorker时,线程池状态可能为RUNNING,也可能为SHUTDOWN
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 将新创建的worker添加到全局维护的worker集合中(workers其实是一个HashSet)
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        // 跟踪最大的池大小
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 这里会执行worker的run()方法
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

addWorker()Метод выглядит длинным, но на самом деле он мало что дает, всего две вещи:

  • обнаружитьrunStateСтатус, обновление CASworkerCount.
  • Под эксклюзивной защитой ReentrantLock создайте новый воркер и проверьте еще разrunStateсостояние, вновь созданный рабочий поток безопасно добавляется вworkers(Hashset Collection) и выполнение, сохраняя при сохраненииlargestPoolSizeПеременная (для отслеживания максимального размера пула).

Вы также можете прочитать вышеизложенное в соответствии с приведенной ниже блок-схемой.addWorker()Исходный код:

Как выполняется отправленная задача: метод runWorker() класса ThreadPoolExecutor

Worker — это закрытый класс внутри ThreadPoolExecutor, который определяется следующим образом (часть кода опущена):

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
{
    /** 该Worker所在的线程,即工作线程本体 */
    final Thread thread;
    /** 初始化时带入的预备执行的任务,可能为null */
    Runnable firstTask;
    /** 记录着这个Worker完成的总任务数 */
    volatile long completedTasks;

    /** 构造函数,执行一些初始化操作 */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** 具体的运行逻辑委托给外部的即ThreadPoolExecutor的runWorker()方法执行  */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    // Worker通过继承AQS类自定义了一个独占锁,重写了相关方法,这里省略不给出
    .......
}

оглядыватьсяaddWorker()метод, когда вновь созданный Worker успешно помещается в коллекцию Worker, потокstart()метод будет называться так, чтобы рабочий объектrun()будет вызван метод, иrun()внутри называетсяrunWorker(), поэтому давайте сосредоточимся на этом методе:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 如果该Worker被创建时的firstTask不为null,则首先直接执行该task任务
        // 否则调用getTask()方法从任务缓冲队列中获取任务并执行
        while (task != null || (task = getTask()) != null) {
            // 获取Worker自定义的独占锁,确保该Worker在执行任务过程中不会被外部中断
            w.lock();
            // 如果检测到当前线程池状态已经进入到STOP,则需要进一步确认当前线程是否已经被中断
            // 如果没有被中断,则需要执行中断操作。
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 万事俱备,执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 对于跳出while循环,即不再从任务缓冲队列中获取任务并执行的Worker,需要执行回收逻辑
        // 这里也就是本文关心的ThreadPoolExecutor回收线程的地方
        processWorkerExit(w, completedAbruptly);
    }
}

runWorker()Процесс исполнения таков:

  1. пока цикл продолжаетсяgetTask()способ получить задание.
  2. getTask()Метод извлекает задачу из очереди блокировки.
  3. Если пул потоков вошелSTOPсостояние, затем убедитесь, что текущий поток находится в прерванном состоянии, в противном случае убедитесь, что текущий поток не находится в прерванном состоянии.
  4. выполнять задания.
  5. еслиgetTask()РезультатnullЗатем выпрыгите из петли и выполнитеprocessWorkerExit()Метод, уничтожь ветку.

Приведенное выше можно прочитать на блок-схеме нижеrunWorker()Исходный код:

Как перерабатывается поток ThreadPoolExecutor: processWorkerExit()

Это уже известно из содержания предыдущего разделаprocessWorkerExit()метод будет вrunWorker()Метод выполнен, вы можете сначала увидеть, что он делает:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        // 实际这里就是回收线程的主要操作了,移除线程池对该线程的引用,使其可以被JVM正常地回收
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();
    // 由于引起线程回收的可能性有很多,线程池还要判断是什么引发了这次回收,
    // 是否要改变线程池的现阶段状态,是否要根据新状态,重新分配线程,于是就有了下面这部分逻辑
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

Таким образом, восстановление нитей в пуле резьбы JVM зависимое автоматическое восстановление, резьба пула резьбы, чтобы выполнить задание состоит в том, чтобы поддерживать определенное количество ссылок на состояние текущего пула резьбы, чтобы предотвратить эту часть нити JVM, когда Нитки пула-пул необходимо решить, какое восстановление и необходимо устранить ссылки.

Когда пул потоков ThreadPoolExecutor повторно использует потоки?

Из предыдущего контента вы можете узнать, что работает WorkerrunWorker()метод, он будет вgetTask()метод возвращаетnull, то есть выйти из цикла, когда из очереди буфера задач не получено ни одной задачи, и выполнитьprocessWorkerExit()Перезапустить текущий рабочий поток, поэтому, когда ThreadPoolExecutor перезапускает поток, это фактически зависит отgetTask()когда вернутьсяnullДавайте рассмотрим внизgetTask()Что происходит внутри:

private Runnable getTask() {
    boolean timedOut = false;

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 检测线程池是否正在终止,若此时runState进入了STOP或者任务缓冲队列未空,则减少工作线程数量并返回null
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 当允许核心线程超时或者工作线程数大于设置的核心线程数量上限时,timed被设为true
        // 后文的分析我们基于allowCoreThreadTimeOut=false的前提,即核心线程不会超时
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 从任务缓冲队列中获取任务
            // 根据timed的值来确定调用限时获取的poll()方法还是阻塞获取的take()方法
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                // 成功从任务缓冲队列中获取到任务则直接返回
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            // 发生中断就重置timedOut,并重新一轮循环
            timedOut = false;
        }
    }
}

getTask()Основная логика заключается в том, что в бесконечном цикле for изworkQueueУберите задачу для выполнения из очереди блокировки, верните задачу, если она может быть получена обычным образом, или продолжайте получать ее в цикле до тех пор, пока получение не будет успешным, если толькоСостояние пула потоков переходит в режим остановкиилиСпециальность пуста,илиКоличество рабочих потоков превышает максимальный размер пула maxPoolSize.,илиполучить тайм-аут задачи,getTask()Метод выйдет из цикла for и вернется.

Прочитайте блок-схему нижеgetTask()Исходный код:

Вернемся к предмету нашего беспокойства,getTask()когда он вернетсяnull? На самом деле блок-схема была дана в основном в двух местах:

  1. Во-первых, остановлен ли пул потоков в соответствии с исходным кодом.

Когда ThreadPoolExecutorshutdown()Вызывается метод, входит пул потоковSHUTDOWNстатус, возможны две ситуации:

  • перечислитьshutdown(), все задачи выполнены и завершены, на данный моментgetTask()заблокирован вworkQueue.take()

отshutdown()Например, его внутренний стек вызовов вызоветinterruptIdleWorkers(false);interruptIdleWorkers()Внутри выглядит так:

этоshutdown()будет прерывать каждый бездействующий рабочий поток, хранящийся в пуле потоков (getTask()Метод не держит пользовательскую эксклюзивную блокировку объекта Worker.Если не верите, перейдите к исходному коду 🌚 ),getTask()метод начнется сworkQueue.take()Вернитесь в блокировку (поскольку очередь блокировки пула потоков поддерживает реагирование на прерывания) и войдите в следующий раунд для итерации в обычном режиме, после чего программа перейдет к первому условному суждению «остановлен ли пул потоков», отвечающему требованиям условия и вернуть ноль.

  • перечислитьshutdown()В это время в очереди буфера задач все еще есть невыполненные задачи.getTask()в нормальном цикле

ПредположениеgetTask()идтиworkQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS), он получает задачи из очереди буфера задач, если вызывается метод shutdown(), рабочий поток прерывается из-заworkQueue.poll()Метод реагирует на прерывания и по желанию сразу возвращает null,getTask()Он будет продолжать входить в следующий раунд для итерации. В это время он перейдет к первому решению условия "остановлен ли пул потоков". В это время условие не выполняется, потому чтоworkQueueне пусто, поэтомуgetTask()будет продолжать работать нормально доworkQueueЕсли он пуст, то есть оставшиеся задачи будут обработаны до того, как она обычно вышла из цикла и возвращается в NULL.

В этом втором случае также есть очень частный случай, который необходимо обсудить отдельно, а именно вызовshutdown()В это время в очереди буфера задач осталось только 2 задачи, но рабочих потоков в это время все еще 4. По результатам, рассмотренным выше, легко предположить, что 2 из 4 рабочих потоков будут работать нормально. .getTask()Первое условное суждение «остановлен ли пул потоков» завершается и возвращается.null, то 2 рабочих потока останутся заблокированными вgetTask()изworkQueue.take()shutdown()Каждому подходящему работнику будет отправлен только один сигнал прерывания, после чего эти два сигнала будут заблокированы.workQueue.take()Как Worker нормально выходит из цикла и перерабатывается? Это зависит от того, что мы упоминали ранее, отвечает за переработку рабочих потоков.processWorkerExit()метод, который будет внутренне вызыватьtryTerminate()метод, и этоtryTerminate()будет вызываться внутреннеinterruptIdleWorkers(true)метод, при передачеtrue, этот метод будет прерывать только один из многих бездействующих рабочих процессов, удерживаемых текущим пулом потоков, то есть один из двух заблокированных рабочих процессов, упомянутых выше, будет разбужен сигналом прерывания и нормально выйдет из цикла, и этот вызывается сигналом прерывания. Пробужденный Worker находится в процессе перезапуска (processWorkerExit()Called) отправит новый сигнал прерывания другому заблокированному рабочему потоку, который похож на домино, серия заблокированных рабочих потоков будет нормально просыпаться из-за прерывания одного из рабочих потоков.Заблокированные рабочие потоки, в конечном итоге все рабочие потоки перерабатываются.

  1. Во-вторых, является ли текущее количество рабочих потоков слишком большим, что соответствует исходному коду.

Это легко объяснить, соответствующий сценарийshutdown()Не вызывается, пул потоков находится вRUNNINGгосударство,workQueueВсе задачи были загружены и выполнены. В этом сценарии пул потоков уменьшит количество рабочих потоков доcorePoolSizeРазмер (предполагая allowcorethreadtimeout = false),timedиtimedOutстанетtrue, а из-заworkQueueОн пуст, поэтому второе условное суждение «не слишком ли много текущих рабочих потоков» будет оцениваться какtrue, а затем выйти из цикла и вернутьсяnull.

На данный момент ясно, что «пул потоков THREADPOOLEXECUTOR для восстановления потока».

постскриптум

После разбора кидайте этому прилежному коллеге, коллега называет его хорошим парнем🐶

Наконец, ссылочные ресурсы даются как обычно: