Зачем использовать пулы потоков
- Созданные потоки повторно используются с помощью технологии пула, чтобы избежать потерь, вызванных частым созданием и уничтожением потоков, снизить потребление ресурсов и повысить скорость отклика.
- Когда сервер запускает большое количество задач, создание большого количества потоков будет потреблять пространство памяти сервера и влиять на использование сервера.Пул потоков может играть роль управления потоками.
- Пул потоков является расширяемым, что позволяет разработчикам добавлять к нему дополнительные функции.
Общий дизайн пула потоков и анализ исходного кода
Давайте взглянем на макрос работающего механизма пула потоков.
Как видно из работающего механизма пула потоков, внутри пула потоков строится модель производитель-потребитель, которая развязывает поток и задачу и не имеет прямого отношения. будет временно храниться в очереди задач, таким образом хорошо буферизуя задачи
состояние работы пула потоков
Сам пул потоков имеет состояние, и следующие пять состояний пула потоков
Рабочий статус | описание статуса |
---|---|
RUNNING | Пул потоков может получать новые отправки задач, а также может нормально обрабатывать задачи в очереди блокировки. |
SHUTDOWN | Пул потоков больше не принимает новые отправки задач и может продолжать обрабатывать задачи в очереди блокировки. |
STOP | Новые задачи не принимаются, а существующие задачи в очереди блокировки отбрасываются, кроме того, это прерывает выполнение задач. |
TIDYING | После того, как все задачи выполнены (включая задачи в очереди блокировки), количество активных потоков в текущем пуле потоков уменьшается до 0, и вызывается завершенный метод. |
TERMINATED | Состояние завершения пула потоков. Когда завершаемый метод выполняется, пул потоков переходит в это состояние. |
Поток состояния пула потоков
переход состояния | метод перехода состояния |
---|---|
RUNNING -> SHUTDOWN | При вызове метода завершения работы пула потоков или при неявном вызове метода финализации (метод завершения работы будет вызываться внутри этого метода) |
РАБОТА, ВЫКЛЮЧЕНИЕ -> СТОП | Когда вызывается метод shutdownNow пула потоков |
SHUTDOWN -> TIDYING | Когда и пул потоков, и очередь блокировки становятся пустыми |
STOP -> TIDYING | Когда пул потоков становится пустым |
TIDYING->TERMINATED | Когда завершенный метод выполняется |
Так как же пул потоков управляет своим собственным состоянием выполнения и количеством потоков в пуле потоков?
// ctl:高三位表示线程池运行状态,低29位表示线程池线程运行数量
// 一个变量存储两个值的好处是不必费心思(比如加锁)去维护两个状态的一致性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 获取线程池当前的运行状态(~:按位取反,即0变成1,1变成0。)
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取线程池当前运行的线程数量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 通过线程池状态和运行的线程数量获取ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
Далее анализ исходного кода
Давайте взглянем на макрос отношения наследования каждого класса в пуле потоков.
Основным классом реализации пула потоков в Java является ThreadPoolExecutor.
Executor: Он предоставляет только интерфейс для выполнения задач. Пользователям не нужно обращать внимание на то, как создавать потоки и как планировать потоки. Им нужно только предоставить объект Runnable.
ExecutorService: на основе выполнения задач добавляются новые интерфейсы, такие как отправка задач, а также управление и контроль жизненного цикла пула потоков.
AbstractExecutorService: абстрактный класс, который последовательно соединяет процесс выполнения задач, гарантируя, что реализация нижнего уровня должна сосредоточиться только на одном методе выполнения задач.
ThreadPoolExecutor: с одной стороны, он поддерживает свой собственный жизненный цикл, а с другой стороны, он одновременно управляет потоками и задачами, так что их можно хорошо комбинировать для выполнения параллельных задач.
Задачи выполнения пула потоков
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
// 如果线程池中的线程数量小于coolPoolSize
if (workerCountOf(c) < corePoolSize) {
// 添加一个线程,并且把提交过来的线程当成firsttask
if (addWorker(command, true))
return;
// 因为线程池的状态和运行的线程数量可能随时都会改变,所以要对线程池时刻进行检查
c = ctl.get();
}
// 进入这个判断是因为上面的判断不符合条件,要么是corePoolSize已达上限,要么是添加线程失败
// 那么就要进行入队操作,入队操作之前要先判断线程池的状态
if (isRunning(c) && workQueue.offer(command)) {
// 再次获取ctl值,时时刻刻做判断
int recheck = ctl.get();
// 如果线程池不是在运行状态,那么就会执行后面的remove操作,相当于一次回滚,把这次执行的线程
// remove掉
if (! isRunning(recheck) && remove(command))
// 执行拒绝策略
reject(command);
// 走到这里说明线程池处于执行状态
else if (workerCountOf(recheck) == 0)
// 工作线程为0,创建一个非核心线程,防止存在有任务但是没有线程执行的情况
addWorker(null, false);
}
// 创建新的线程失败了,直接拒绝
else if (!addWorker(command, false))
reject(command);
}
Задача добавления пула потоков
// 入参core为true的话,会拿corePoolSize作为临界条件,false的话会拿maximumPoolSize作为临界条件
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 获取线程池运行的状态
int rs = runStateOf(c);
// 先做一个到底应不应该创建线程的判断
// 这个判断条件可以简化为 (rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null ||
// workQueue.isEmpty()))
// 分为三种情况:1. rs > SHUTDOWN (线程池状态处于STOP、TIDYING、TERMINATED,添加工作线程失败)
// 2.rs >= SHUTDOWN && firstTask != null
// 3.rs >= SHUTDOWN && workQueue.isEmpty()
// 在这三种情况下,都不会再创建新的线程
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 线程池中正在运行的线程的数量
int wc = workerCountOf(c);
// 判断线程是否已达上限
// 如果添加corePoolSize中的线程,判断是否超过corePoolSize的上限
// 如果添加maximumPoolSize中的线程,判断是否超过maximumPoolSize的上限
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 新增线程数,如果成功,则跳出循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 再次获取c
c = ctl.get(); // Re-read ctl
// 线程池的状态是否等于最开始的状态
if (runStateOf(c) != rs)
// 不等于表示线程池有改变,需要重新执行一遍之前的操作
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建了一个worker对象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// ReentrantLock独占锁
mainLock.lock();
try {
// 再次获取线程池状态
int rs = runStateOf(ctl.get());
// 先做一个是否可以运行线程的判断
// 1.线程池状态处于运行状态
// 2.线程池状态处于SHUTDOWN状态但task==null,因为SHUTDOWN状态不接受新的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将worker添加到一个hashset中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
// 用于记录出现过的最大线程数。
largestPoolSize = s;
// 做一个标志,表示worker线程添加到了hashset当中
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 执行线程
t.start();
// 工作线程已启动的标志
workerStarted = true;
}
}
} finally {
// 线程启动失败,做一些回滚的操作
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Рабочий класс
РабочийThreadPoolExecutor
Он реализует интерфейс Runnable и наследует AQS.Реализация интерфейса Runnable означает, что Worker является потоком, а наследование AQS заключается в реализации функции монопольной блокировки.
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
// 执行任务的线程
final Thread thread;
/** Initial task to run. Possibly null. */
// 执行的任务
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// 新建线程的时候,设置state -1 是为了防止线程还未执行时(线程只有在执行的时候才会被中断),就被 // 其它线程显式调用shutdown方法中断了,因为中断是要判断state大于等于0才会中断
setState(-1);
this.firstTask = firstTask;
// 新建了一个线程
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
Способ создания нового потока в классе Worker имеет реализацию по умолчанию.
static class DefaultThreadFactory implements ThreadFactory {
// 系统中,可能不止一个线程池,所以这个是个静态字段
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
// 每个线程归属于特定的线程池,所以这个字段非静态
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
// 线程名字前缀
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
// 对于线程池的线程来说,都是用户线程
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
// 线程优先级都是一样的
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
final void runWorker(Worker w) {
// 获取当前线程
Thread wt = Thread.currentThread();
// 获取任务
Runnable task = w.firstTask;
// 显式将任务置为空,防止产生错乱的问题,下一次拿到重复的
w.firstTask = null;
// 将线程state置为0(创建Worker的时候state为-1),运行线程时允许线程中断
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 循环判断任务(firstTask或从队列中获取的task)是否为空
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 判断线程池的状态,是否处于stop状态,或者线程是否被中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 回调,可以适当做扩展
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
// 线程执行完成个数,起到一个统计的作用
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
// 从阻塞队列中获取任务
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
// 获取线程池状态
int rs = runStateOf(c);
// 1.线程池状态是STOP,TIDYING,TERMINATED
// 2.线程池shutdown并且队列是空的.
// 满足以上两个条件之一则工作线程数wc减去1,然后直接返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// allowCoreThreadTimeOut 是否允许核心工作线程超时销毁,默认是false,可以设置为true
// 工作线程数大于核心线程数
// 满足两个条件之一,timed为true
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 1.(工作线程数 > maximumPoolSize) || (timed == true && timedOut == true)
// 2.工作线程数 > 1或者队列为空
// 一般情况下,在工作线程数 > maximumPoolSize 并且任务队列为空的情况下会触发这个条件
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果timed为true,通过poll()方法做超时拉取,keepAliveTime时间内没有等待到有效的任务,则返回null
// 如果timed为false,通过take()做阻塞拉取,会阻塞到有下一个有效的任务时候再返回(一般不会是null)
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 取不到任务的时候timedOut = true
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
очередь задач
getTask() имеет шаг получения задач из очереди задач.Разные очереди задач имеют разные политики доступа к задачам.Ниже представлены необязательные очереди задач:
название | описывать |
---|---|
ArrayBlockingQueue | Ограниченная очередь блокировки, реализованная с помощью массива, который сортирует элементы в соответствии с принципом «первым пришел – первым обслужен» (FIFO) и использует повторные блокировки для управления параллелизмом. |
LinkedBlockingQueue | Ограниченная очередь, состоящая из структуры связанного списка, которая сортирует элементы в порядке поступления (FIFO). |
PriorityBlockingQueue | Неограниченная очередь, поддерживающая приоритезацию потоков |
DelayQueue | Неограниченная очередь, реализующая PriorityBlockingQueue для реализации отложенного получения.При создании элемента можно указать, сколько времени потребуется для получения текущего элемента из очереди. |
SynchronousQueue | Блокирующая очередь, которая не хранит элементы, каждая операция размещения должна ждать операции взятия. Поддерживаются как честные, так и несправедливые блокировки. Executors.newCachedThreadPool() использует SynchronousQueue |
LinkedTransferQueue | Неограниченная блокирующая очередь, состоящая из структуры связанного списка, эквивалентна другим очередям.Очередь LinkedTransferQueue имеет больше методов передачи и tryTransfer. |
LinkedBlockingDeque | Двунаправленная очередь блокировки, состоящая из структуры связанного списка |
переработка пула потоков
Основная функция getTask() - получить контроль над количеством задач и потоков. Если пул потоков не должен содержать столько потоков, он вернет null. Из кода видно, что есть два места, где нулевые значения возвращаются.
Первое место:
- Состояние пула потоков: STOP, TIDYING, TERMINATED.
- Пул потоков закрыт, а очередь пуста.
Второе место
- Когда количество рабочих потоков > maxPoolSize и очередь задач пуста
- Количество рабочих потоков > corePoolSize и очередь задач пуста
// completedAbruptly:true表示用户是异常退出
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果工作线程是异常退出,那么工作线程数减1
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 统计已完成的任务数
completedTaskCount += w.completedTasks;
// 将工作线程数移除
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试中断空闲线程
tryTerminate();
int c = ctl.get();
// 如果当前线程池状态处于RUNNING、SHUTDOWN状态
if (runStateLessThan(c, STOP)) {
// 工作线程非异常
if (!completedAbruptly) {
// allowCoreThreadTimeOut这个值表示是否允许核心工作线程超时销毁
// 如果允许,那么核心工作线程数最小为0
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果最小保留的核心线程数为0并且任务队列不为空,表示至少还需要一个线程将任务完成
if (min == 0 && ! workQueue.isEmpty())
// 最小线程数改为1
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 如果当前运行的Worker数比当前需要的Worker数少的话,会调用addWorker,添加新的Worker
addWorker(null, false);
}
}
политика отклонения пула потоков
политика отказа | Как отказаться |
---|---|
AbortPolicy | Непосредственно вызывает исключение времени выполнения |
DiscardPolicy | Тихо отбрасывать отправленные задачи, ничего не делать и не создавать исключений |
DiscardOldestPolicy | Удалите самую длинную задачу в очереди блокировки (элемент head) и оставьте свободное место в очереди для текущей отправленной задачи, чтобы поместить ее в очередь. |
CallerRunsPolicy | Отправленная задача запускается непосредственно тем потоком, который отправил задачу. |
// 直接由提交任务的线程来运行这个提交的任务
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
// 直接抛出异常
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
// 默默地丢弃掉被拒绝的任务
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
// 这个方法什么都不做
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
// 将队头元素删除掉,移除掉最老的任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 判断线程池是否已关闭
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
Три способа отправки задач в абстрактный класс AbstractExecutorService
Есть три способа отправки, независимо от того, каким способом, последний — преобразовать переданную задачу в вызываемый объект для обработки, а затем вызвать
Метод execute, объявленный в интерфейсе Executor, обрабатывается единообразно
// 这三种提交任务的方式都是类似的
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public FutureTask(Runnable runnable, V result) {
// Runnable对象转换成了Callable对象
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
// 注意这里返回的result是null,层层传递进来的
return result;
}
}
контрольная работа
public static void main(String[] args) {
ExecutorService executors = new ThreadPoolExecutor(2,
5,
10, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(4));
IntStream.range(0, 10).forEach(i -> {
executors.submit(() -> {
IntStream.range(0, 50).forEach(j -> System.out.println(Thread.currentThread().getName()));
});
});
executors.shutdown();
}