Углубленный анализ пула потоков серии мертвых потоков Java - жизненный цикл

Java

threadpool_life

(удобнее просматривать исходный код на горизонтальном экране мобильного телефона)

Примечание. Часть анализа исходного кода Java основана на версии java8, если не указано иное.

Примечание. Часть исходного кода пула потоков относится к классу ThreadPoolExecutor, если не указано иное.

Введение

В прошлой главе мы вместе рассмотрели жизненный цикл потоков (помните шесть состояний?), но знаете ли вы, что пул потоков тоже имеет жизненный цикл? !

вопрос

(1) Каково состояние пула потоков?

(2) Как различные состояния влияют на задачи в очереди задач?

Сначала перейдите к исходному коду

На самом деле, когда мы говорили об архитектуре пула потоков, мы говорили о некоторых методах, таких как ShutDown()/shutDownNow(), которые связаны с жизненным циклом пула потоков.

Давайте сначала посмотрим на статус и связанные методы в жизненном цикле, определенном в пуле потоков ThreadPoolExecutor:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // =29
private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // =000 11111...

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS; // 111 00000...
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 000 00000...
private static final int STOP       =  1 << COUNT_BITS; // 001 00000...
private static final int TIDYING    =  2 << COUNT_BITS; // 010 00000...
private static final int TERMINATED =  3 << COUNT_BITS; // 011 00000...

// 线程池的状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 线程池中工作线程的数量
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 计算ctl的值,等于运行状态“加上”线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; }

Из приведенного выше кода мы можем получить:

(1) Состояние пула потоков и количество рабочих потоков хранится в управляющей переменной ctl, аналогично переменной состояния в AQS, но здесь используется напрямую AtomicInteger, и его также можно заменить на unsafe+ летучий;

(2) Старшие три бита ctl сохраняют текущее состояние, а младшие 29 бит сохраняют количество рабочих потоков, что означает, что количество потоков может быть не более (2^29-1), что является ПРОМЫШЛЕННОСТЬЮ. над;

(3) Существует пять состояний пула потоков, а именно: РАБОТАЕТ, ВЫКЛЮЧЕНИЕ, ОСТАНОВКА, УБОРКА, ПРЕКРАЩЕНИЕ;

(4) ВЫПОЛНЯЕТСЯ, что означает, что новые задачи приемлемы и задачи в очереди могут быть выполнены;

(5) SHUTDOWN, указывающее, что новые задачи не принимаются, но задачи в очереди могут выполняться;

(6) STOP, что означает, что новые задачи не принимаются, задачи в очереди больше не выполняются, а задачи, которые выполняются, прерываются;

(7) TIDYING, все задачи завершены, а количество рабочих потоков равно 0. Поток, который наконец перейдет в это состояние, выполнит хук-метод terminated(), и только один поток выполнит этот метод;

(8) TERMINATED, прерванное состояние, был выполнен хук-метод terminated();

блок-схема

Давайте посмотрим, как протекают эти состояния:

threadpool_life

(1) Когда создается новый пул потоков, его начальное состояние — RUNNING, что можно увидеть, когда ctl определен выше;

(2) РАБОТА->ЗАВЕРШЕНИЕ, при выполнении метода shutdown();

(3) RUNNING->STOP, при выполнении метода shutdownNow();

(4) SHUTDOWN->STOP, когда выполняется метод shutdownNow() [Эта статья изначально была создана общедоступным ведомым номером «Tongge Reading Source Code»];

(5) STOP->TIDYING, после выполнения shutdown() или shutdownNow() все задачи были завершены, а количество рабочих потоков равно 0, в это время будет выполняться метод terminated();

(6) УБОРКА->ПРЕРЫВАНИЕ, после выполнения метода terminated();

Анализ исходного кода

Вы считаете, что исходник статуса выложен, а картинка готова? Это определенно невозможно, давайте посмотрим, как это контролировать в исходном коде.

(1) БЕГ

RUNNING относительно прост.Когда создается пул потоков, инициализируется ctl, а ctl инициализируется в состояние RUNNING, поэтому начальным состоянием пула потоков является состояние RUNNING.

// 初始状态为RUNNING
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

(2) ВЫКЛЮЧЕНИЕ

При выполнении метода shutdown() измените состояние на SHUTDOWN, что здесь точно сработает, потому что метод advanceRunState() является спиновым, и в случае неудачи он не завершится.

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 修改状态为SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 标记空闲线程为中断状态
        interruptIdleWorkers();
        onShutdown();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}
private void advanceRunState(int targetState) {
    for (;;) {
        int c = ctl.get();
        // 如果状态大于SHUTDOWN,或者修改为SHUTDOWN成功了,才会break跳出自旋
        if (runStateAtLeast(c, targetState) ||
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

(3) СТОП

При выполнении метода shutdownNow() состояние пула потоков изменяется на состояние STOP, и все потоки помечаются как прерванные.

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 修改为STOP状态
        advanceRunState(STOP);
        // 标记所有线程为中断状态
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        // 【本文由公从号“彤哥读源码”原创】
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

Что касается того, отвечает ли поток на прерывание, он фактически отвечает в методе очереди take () или poll () и, наконец, переходит к AQS.Когда они обнаруживают, что поток прерван, они выдают InterruptedException, а затем поймать это исключение в getTask(), И на следующем вращении выйти из текущего потока и уменьшить количество рабочих потоков.

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 如果状态为STOP了,这里会直接退出循环,且减少工作线程数量
        // 退出循环了也就相当于这个线程的生命周期结束了
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 真正响应中断是在poll()方法或者take()方法中
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            // 这里捕获中断异常
            timedOut = false;
        }
    }
}

Здесь возникает проблема, а именно, что делать с задачей, которая была извлечена и возвращена функцией getTask()?

На самом деле они будут выполняться нормально.Заинтересованные студенты могут сами взглянуть на метод runWorker().Мы разберем этот метод в следующем разделе.

(4) УБОРКА

После выполнения shutdown() или shutdownNow() он войдет в это состояние, если все задачи были прерваны и количество рабочих потоков равно 0.

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // 下面几种情况不会执行后续代码
        // 1. 运行中
        // 2. 状态的值比TIDYING还大,也就是TERMINATED
        // 3. SHUTDOWN状态且任务队列不为空
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        // 工作线程数量不为0,也不会执行后续代码
        if (workerCountOf(c) != 0) {
            // 尝试中断空闲的线程
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // CAS修改状态为TIDYING状态
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 更新成功,执行terminated钩子方法
                    terminated();
                } finally {
                    // 强制更新状态为TERMINATED,这里不需要CAS了
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

Коды, которые фактически обновляют состояния TIDYING и TERMINATED, находятся в методе tryTerminate().На самом деле, метод tryTerminated() вызывается во многих местах, таких как shutdown(), shutdownNow(), и когда поток завершается, так что почти каждый поток. Когда он, наконец, умрет, будет вызван метод tryTerminate(), но, в конце концов, только один поток действительно будет выполняться до места, где состояние модификации — TIDYING.

После изменения статуса на TIDYING, выполните метод terminated() и, наконец, измените статус на TERMINATED, что указывает на то, что пул потоков действительно мертв.

(5) ПРЕКРАЩЕНО

См. анализ в УБОРКЕ.

пасхальные яйца

В этой главе мы изучили жизненный цикл пула потоков с точки зрения определения состояния, блок-схемы, анализа исходного кода и т. д. Как вы справляетесь с этим?

В следующей главе мы начнем изучать основной процесс выполнения задач в пуле потоков.Студенты, которые боятся этого содержания, могут сначала прочитать две статьи «Рукописный пул потоков», написанные братом Тонгом, а затем изучить основные процесс пула потоков.Очень полезно.

Добро пожаловать, чтобы обратить внимание на мою общедоступную учетную запись «Брат Тонг читает исходный код», проверить больше статей из серии исходного кода и поплавать в океане исходного кода с братом Тонгом.

qrcode