Анализ исходного кода пула потоков Java

исходный код

Зачем использовать пулы потоков

  1. Созданные потоки повторно используются с помощью технологии пула, чтобы избежать потерь, вызванных частым созданием и уничтожением потоков, снизить потребление ресурсов и повысить скорость отклика.
  2. Когда сервер запускает большое количество задач, создание большого количества потоков будет потреблять пространство памяти сервера и влиять на использование сервера.Пул потоков может играть роль управления потоками.
  3. Пул потоков является расширяемым, что позволяет разработчикам добавлять к нему дополнительные функции.

Общий дизайн пула потоков и анализ исходного кода

Давайте взглянем на макрос работающего механизма пула потоков.

image.png

Как видно из работающего механизма пула потоков, внутри пула потоков строится модель производитель-потребитель, которая развязывает поток и задачу и не имеет прямого отношения. будет временно храниться в очереди задач, таким образом хорошо буферизуя задачи

состояние работы пула потоков

Сам пул потоков имеет состояние, и следующие пять состояний пула потоков

Рабочий статус описание статуса
RUNNING Пул потоков может получать новые отправки задач, а также может нормально обрабатывать задачи в очереди блокировки.
SHUTDOWN Пул потоков больше не принимает новые отправки задач и может продолжать обрабатывать задачи в очереди блокировки.
STOP Новые задачи не принимаются, а существующие задачи в очереди блокировки отбрасываются, кроме того, это прерывает выполнение задач.
TIDYING После того, как все задачи выполнены (включая задачи в очереди блокировки), количество активных потоков в текущем пуле потоков уменьшается до 0, и вызывается завершенный метод.
TERMINATED Состояние завершения пула потоков. Когда завершаемый метод выполняется, пул потоков переходит в это состояние.

Поток состояния пула потоков

переход состояния метод перехода состояния
RUNNING -> SHUTDOWN При вызове метода завершения работы пула потоков или при неявном вызове метода финализации (метод завершения работы будет вызываться внутри этого метода)
РАБОТА, ВЫКЛЮЧЕНИЕ -> СТОП Когда вызывается метод shutdownNow пула потоков
SHUTDOWN -> TIDYING Когда и пул потоков, и очередь блокировки становятся пустыми
STOP -> TIDYING Когда пул потоков становится пустым
TIDYING->TERMINATED Когда завершенный метод выполняется

image.png

Так как же пул потоков управляет своим собственным состоянием выполнения и количеством потоков в пуле потоков?

// ctl:高三位表示线程池运行状态,低29位表示线程池线程运行数量
// 一个变量存储两个值的好处是不必费心思(比如加锁)去维护两个状态的一致性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 获取线程池当前的运行状态(~:按位取反,即0变成1,1变成0。)
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 获取线程池当前运行的线程数量
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 通过线程池状态和运行的线程数量获取ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }

Далее анализ исходного кода

Давайте взглянем на макрос отношения наследования каждого класса в пуле потоков.

Основным классом реализации пула потоков в Java является ThreadPoolExecutor.

image.png

Executor: Он предоставляет только интерфейс для выполнения задач. Пользователям не нужно обращать внимание на то, как создавать потоки и как планировать потоки. Им нужно только предоставить объект Runnable.

ExecutorService: на основе выполнения задач добавляются новые интерфейсы, такие как отправка задач, а также управление и контроль жизненного цикла пула потоков.

AbstractExecutorService: абстрактный класс, который последовательно соединяет процесс выполнения задач, гарантируя, что реализация нижнего уровня должна сосредоточиться только на одном методе выполнения задач.

ThreadPoolExecutor: с одной стороны, он поддерживает свой собственный жизненный цикл, а с другой стороны, он одновременно управляет потоками и задачами, так что их можно хорошо комбинировать для выполнения параллельных задач.

Задачи выполнения пула потоков

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        // 如果线程池中的线程数量小于coolPoolSize
        if (workerCountOf(c) < corePoolSize) {
            // 添加一个线程,并且把提交过来的线程当成firsttask
            if (addWorker(command, true))
                return;
            // 因为线程池的状态和运行的线程数量可能随时都会改变,所以要对线程池时刻进行检查
            c = ctl.get();
        }
        // 进入这个判断是因为上面的判断不符合条件,要么是corePoolSize已达上限,要么是添加线程失败
        // 那么就要进行入队操作,入队操作之前要先判断线程池的状态
        if (isRunning(c) && workQueue.offer(command)) {
          // 再次获取ctl值,时时刻刻做判断
            int recheck = ctl.get();
            // 如果线程池不是在运行状态,那么就会执行后面的remove操作,相当于一次回滚,把这次执行的线程	
            // remove掉
            if (! isRunning(recheck) && remove(command))
              	// 执行拒绝策略
                reject(command);
                // 走到这里说明线程池处于执行状态
            else if (workerCountOf(recheck) == 0)
              	// 工作线程为0,创建一个非核心线程,防止存在有任务但是没有线程执行的情况
                addWorker(null, false);
        }
        // 创建新的线程失败了,直接拒绝
        else if (!addWorker(command, false))
            reject(command);
    }

image.png

Задача добавления пула потоков

// 入参core为true的话,会拿corePoolSize作为临界条件,false的话会拿maximumPoolSize作为临界条件
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            // 获取线程池运行的状态
            int rs = runStateOf(c);
        // 先做一个到底应不应该创建线程的判断
        // 这个判断条件可以简化为 (rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || 					
        // workQueue.isEmpty()))
        // 分为三种情况:1. rs > SHUTDOWN (线程池状态处于STOP、TIDYING、TERMINATED,添加工作线程失败)
        //             2.rs >= SHUTDOWN && firstTask != null 
        //             3.rs >= SHUTDOWN && workQueue.isEmpty()
        // 在这三种情况下,都不会再创建新的线程
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // 线程池中正在运行的线程的数量
            int wc = workerCountOf(c);
            // 判断线程是否已达上限
            // 如果添加corePoolSize中的线程,判断是否超过corePoolSize的上限
            // 如果添加maximumPoolSize中的线程,判断是否超过maximumPoolSize的上限
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 新增线程数,如果成功,则跳出循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 再次获取c
            c = ctl.get();  // Re-read ctl
            // 线程池的状态是否等于最开始的状态
            if (runStateOf(c) != rs)
              	// 不等于表示线程池有改变,需要重新执行一遍之前的操作
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
    	// 创建了一个worker对象
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // ReentrantLock独占锁
            mainLock.lock();
            try {
                // 再次获取线程池状态
                int rs = runStateOf(ctl.get());
                // 先做一个是否可以运行线程的判断
                // 1.线程池状态处于运行状态
                // 2.线程池状态处于SHUTDOWN状态但task==null,因为SHUTDOWN状态不接受新的任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 将worker添加到一个hashset中
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                      	// 用于记录出现过的最大线程数。
                        largestPoolSize = s;
                    // 做一个标志,表示worker线程添加到了hashset当中
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
            	// 执行线程
                t.start();
                // 工作线程已启动的标志
                workerStarted = true;
            }
        }
    } finally {
    	// 线程启动失败,做一些回滚的操作
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

Рабочий класс

РабочийThreadPoolExecutorОн реализует интерфейс Runnable и наследует AQS.Реализация интерфейса Runnable означает, что Worker является потоком, а наследование AQS заключается в реализации функции монопольной блокировки.

    private final class Worker
      extends AbstractQueuedSynchronizer
      implements Runnable{
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
      	// 执行任务的线程
        final Thread thread;
        /** Initial task to run.  Possibly null. */
      	// 执行的任务
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
          // 新建线程的时候,设置state -1 是为了防止线程还未执行时(线程只有在执行的时候才会被中断),就被					// 其它线程显式调用shutdown方法中断了,因为中断是要判断state大于等于0才会中断
          setState(-1); 
          this.firstTask = firstTask;
          // 新建了一个线程
          this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

Способ создания нового потока в классе Worker имеет реализацию по умолчанию.

static class DefaultThreadFactory implements ThreadFactory {
      	// 系统中,可能不止一个线程池,所以这个是个静态字段
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
      	// 每个线程归属于特定的线程池,所以这个字段非静态
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            // 线程名字前缀
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
              	// 对于线程池的线程来说,都是用户线程
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
              	// 线程优先级都是一样的
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
final void runWorker(Worker w) {
        // 获取当前线程
        Thread wt = Thread.currentThread();
        // 获取任务
        Runnable task = w.firstTask;
        // 显式将任务置为空,防止产生错乱的问题,下一次拿到重复的
        w.firstTask = null;
        // 将线程state置为0(创建Worker的时候state为-1),运行线程时允许线程中断
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 循环判断任务(firstTask或从队列中获取的task)是否为空
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
              	// 判断线程池的状态,是否处于stop状态,或者线程是否被中断
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 回调,可以适当做扩展
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                      	// 执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    // 线程执行完成个数,起到一个统计的作用
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

  // 从阻塞队列中获取任务
  private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            // 获取线程池状态
            int rs = runStateOf(c);

            // 1.线程池状态是STOP,TIDYING,TERMINATED
       	    // 2.线程池shutdown并且队列是空的.
            // 满足以上两个条件之一则工作线程数wc减去1,然后直接返回null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // allowCoreThreadTimeOut 是否允许核心工作线程超时销毁,默认是false,可以设置为true
            // 工作线程数大于核心线程数
            // 满足两个条件之一,timed为true
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
						
            // 1.(工作线程数 > maximumPoolSize) || (timed == true && timedOut == true)
            // 2.工作线程数 > 1或者队列为空
            // 一般情况下,在工作线程数 > maximumPoolSize 并且任务队列为空的情况下会触发这个条件
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
              	// 如果timed为true,通过poll()方法做超时拉取,keepAliveTime时间内没有等待到有效的任务,则返回null
            	// 如果timed为false,通过take()做阻塞拉取,会阻塞到有下一个有效的任务时候再返回(一般不会是null)	
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
              	// 取不到任务的时候timedOut = true
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

очередь задач

getTask() имеет шаг получения задач из очереди задач.Разные очереди задач имеют разные политики доступа к задачам.Ниже представлены необязательные очереди задач:

название описывать
ArrayBlockingQueue Ограниченная очередь блокировки, реализованная с помощью массива, который сортирует элементы в соответствии с принципом «первым пришел – первым обслужен» (FIFO) и использует повторные блокировки для управления параллелизмом.
LinkedBlockingQueue Ограниченная очередь, состоящая из структуры связанного списка, которая сортирует элементы в порядке поступления (FIFO).
PriorityBlockingQueue Неограниченная очередь, поддерживающая приоритезацию потоков
DelayQueue Неограниченная очередь, реализующая PriorityBlockingQueue для реализации отложенного получения.При создании элемента можно указать, сколько времени потребуется для получения текущего элемента из очереди.
SynchronousQueue Блокирующая очередь, которая не хранит элементы, каждая операция размещения должна ждать операции взятия. Поддерживаются как честные, так и несправедливые блокировки. Executors.newCachedThreadPool() использует SynchronousQueue
LinkedTransferQueue Неограниченная блокирующая очередь, состоящая из структуры связанного списка, эквивалентна другим очередям.Очередь LinkedTransferQueue имеет больше методов передачи и tryTransfer.
LinkedBlockingDeque Двунаправленная очередь блокировки, состоящая из структуры связанного списка

переработка пула потоков

Основная функция getTask() - получить контроль над количеством задач и потоков. Если пул потоков не должен содержать столько потоков, он вернет null. Из кода видно, что есть два места, где нулевые значения возвращаются.

Первое место:

  • Состояние пула потоков: STOP, TIDYING, TERMINATED.
  • Пул потоков закрыт, а очередь пуста.

Второе место

  • Когда количество рабочих потоков > maxPoolSize и очередь задач пуста
  • Количество рабочих потоков > corePoolSize и очередь задач пуста
// completedAbruptly:true表示用户是异常退出
private void processWorkerExit(Worker w, boolean completedAbruptly) {
  	// 如果工作线程是异常退出,那么工作线程数减1
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 统计已完成的任务数
            completedTaskCount += w.completedTasks;
            // 将工作线程数移除
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
	// 尝试中断空闲线程
        tryTerminate();

        int c = ctl.get();
  	// 如果当前线程池状态处于RUNNING、SHUTDOWN状态
        if (runStateLessThan(c, STOP)) {
          	// 工作线程非异常
            if (!completedAbruptly) {
              	// allowCoreThreadTimeOut这个值表示是否允许核心工作线程超时销毁
              	// 如果允许,那么核心工作线程数最小为0
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
              	// 如果最小保留的核心线程数为0并且任务队列不为空,表示至少还需要一个线程将任务完成
                if (min == 0 && ! workQueue.isEmpty())
                    // 最小线程数改为1
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 如果当前运行的Worker数比当前需要的Worker数少的话,会调用addWorker,添加新的Worker
            addWorker(null, false);
        }
    }

политика отклонения пула потоков

политика отказа Как отказаться
AbortPolicy Непосредственно вызывает исключение времени выполнения
DiscardPolicy Тихо отбрасывать отправленные задачи, ничего не делать и не создавать исключений
DiscardOldestPolicy Удалите самую длинную задачу в очереди блокировки (элемент head) и оставьте свободное место в очереди для текущей отправленной задачи, чтобы поместить ее в очередь.
CallerRunsPolicy Отправленная задача запускается непосредственно тем потоком, который отправил задачу.
    // 直接由提交任务的线程来运行这个提交的任务
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }			



    // 直接抛出异常	
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

    /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    // 默默地丢弃掉被拒绝的任务
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
      	// 这个方法什么都不做
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

    /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
      	// 将队头元素删除掉,移除掉最老的任务
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            // 判断线程池是否已关闭
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

Три способа отправки задач в абстрактный класс AbstractExecutorService

Есть три способа отправки, независимо от того, каким способом, последний — преобразовать переданную задачу в вызываемый объект для обработки, а затем вызвать

Метод execute, объявленный в интерфейсе Executor, обрабатывается единообразно

	// 这三种提交任务的方式都是类似的
	public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
        }

		
	public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
        }
		

	public FutureTask(Runnable runnable, V result) {
      	// Runnable对象转换成了Callable对象
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
        }


	static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
          	// 注意这里返回的result是null,层层传递进来的
            return result;
        }
    }

контрольная работа

    public static void main(String[] args) {

        ExecutorService executors = new ThreadPoolExecutor(2,
                5,
                10, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(4));

        IntStream.range(0, 10).forEach(i -> {
            executors.submit(() -> {
                IntStream.range(0, 50).forEach(j -> System.out.println(Thread.currentThread().getName()));
            });
        });


        executors.shutdown();
    }

использованная литература

blog.CSDN.net/Не будь 007/Ах…

Специальности.Meituan.com/2020/04/02/…