1. Введение
Прежде чем представить пул потоков, давайте разберемся в нескольких вещах:
- Создание и уничтожение потоков требует затрат, таких как время создания потока и связанные с ним вычислительные ресурсы. Если на веб-сервере создается поток для каждого входящего запроса, большинство запросов обрабатываются легко. Тогда стоимость создания потока очень велика по сравнению со стоимостью обработки запроса, что влияет на общую производительность.
- Когда количество потоков может сделать ЦП занятым и зеленым, а затем снова создать потоки, потоки в основном находятся в состоянии простоя.Помимо того, что они занимают память, дополнительные потоки могут также конкурировать с другими потоками за ресурсы ЦП.Накладные расходы на производительность.
- Существует ограничение на количество потоков, которые могут быть созданы, при превышении этого ограничения может возникнуть ошибка
OutOfMemoryError
аномальный.
В настоящее время, если есть вещь, которая может управлять жизненным циклом потоков, повторно использовать существующие потоки и простым способом отделять отправку и выполнение задач. Да, это всеПул потоков(пул потоков), чтобы понять пул потоков в Java, вы должны сначала понятьThreadPoolExecutor
этот класс.
2. Детали ThreadPoolExecutor
схема наследования классов
Конструктор
/线程池配置信息,volatile修饰保证变量在多线程下的可见性
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private volatile long keepAliveTime;
private final BlockingQueue<Runnable> workQueue;
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
из JDK вышеThreadPoolExecutor
Из исходного кода конструктора класса видно, что всего конструктор имеет 7 параметров, значение которых описано ниже:
параметр | значение |
---|---|
corePoolSize |
Базовый размер, который представляет собой количество основных потоков в пуле потоков. |
maximumPoolSize |
Максимальный размер, который представляет собой максимальное количество потоков, разрешенных в пуле потоков. |
keepAliveTime |
Время выживания, когда поток не выполняет задачи, время простоя превышает это время и он будет помечен как пригодный для повторного использования, пока размер пула потоков не превысит базовый размер, помеченный поток будет завершен |
unit |
keepAliveTime единицы, естьDAYS ,HOURS ,MINUTES ,SECONDS ,MILLISECONDS ,MICROSECONDS ,NANOSECONDS 7 единиц в наличии |
workQueue |
Рабочая очередь, очередь блокировки, используемая для хранения задач, ожидающих выполнения. |
threadFactory |
фабрика ниток. Пул потоков создает поток, вызывая фабрику потоков.Thread newThread(Runnable r) создать нить |
handler |
Стратегия насыщения. Когда очередь блокировки заполнена, текущее количество потоков в пуле потоков достигло максимального значения и ни один поток не находится в состоянии простоя, политика насыщения будет реализована для отправленных задач в это время. (Если задача отправляется закрытому Исполнителю, стратегия насыщения также будет выполнена) |
ThreadPoolExecutor
В классе четыре перегруженных конструктора.Каждый конструктор должен указать первые 5 параметров из таблицы выше.Последние два параметра можно указать по желанию.Если не указать, конструктор будет использовать значение по умолчанию.фабрика нитейистратегия насыщения:
Фабрика нитей (ThreadFactory)
Все потоки создания пула потоков передаютсяThreadFactory
изThread newThread(Runnable r)
метод создания. НижеExecutors
Исходный код метода фабрики потоков по умолчанию в классе.
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
Из приведенного выше видно, что фабрика потоков по умолчанию создаетне опекун, приоритетThread.NORM_PRIORITY
разгром. Если вы хотите настроить собственную фабрику потоков в соответствии с вашими потребностями, просто реализуйтеThreadFactory
интерфейсThread newThread(Runnable r)
метод.
Политика насыщения (RejectedExecutionHandler)
в JDKThreadPoolExecutor
класс предлагает 4 различныхRejectedExecutionHandler
выполнить:
-
AbortPolicy
Политика насыщения по умолчанию, стратегия выдает непроверенное (исключение во время выполнения)RejectedExecutionException
. -
DiscardPolicy
Ничего не делайте, просто бросьте задачу -
CallerRunsPolicy
Выполнение задачи в вызывающем потоке -
DiscardOldestPolicy
Отменить первую задачу в очереди блокировки, а затем повторно передать задачу в пул потоков для выполнения.
Точно так же это может быть достигнуто путемRejectedExecutionHandler
Пользовательская стратегия насыщения интерфейса.
Состояние пула потоков и количество потоков
/代表线程池当前状态和线程数量的原子变量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; /COUNT_BITS为29
private static final int CAPACITY = (1 << COUNT_BITS) - 1; /CAPACITY为能表示的最大线程数。
/线程池状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
/对线程池状态和线程数量进行打包和拆包的函数:
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
/判断线程池状态的三个函数
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/线程数量增1,成功返回true,失败返回false
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/线程数量减1,成功返回true,失败返回false
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/线程数量减1,失败则重试直到成功
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
AtomicInteger
переменная типаctl
Старшие 3 бита используются для представления текущего состояния пула потоков, а младшие 29 бит используются для представления текущего количества потоков.
Пул потоков Java имеет 5 различных состояний, а именно работает (RUNNING
),закрытие(SHUTDOWN
),останавливаться(STOP
),аккуратный(TIDYING
),Заканчивать(TERMINATED
).
существуетThreadPoolExecutor
Он представлен 5 целочисленными константами, каждая из которых представлена старшими 3 битами:
-
RUNNING
Старшие 3 бита111, пул потоков в этом состоянии будет получать новые задачи и обрабатывать задачи в очереди блокировки -
SHUTDOWN
Старшие 3 бита000, пул потоков в этом состоянии не будет получать новые задачи, а будет обрабатывать задачи в очереди блокировки. перечислитьvoid shutdown()
реализация метода -
STOP
Старшие 3 бита001, потоки в этом состоянии не получают новых задач, не обрабатывают задачи в очереди блокировки и прерывают выполнение задач. перечислитьList<Runnable> shutdownNow()
выполнить. -
TIDYING
Старшие 3 бита010, когда задача, блокирующая очередь, завершена или пул потоков остановлен после закрытия пула потоков, тоworkerCount
(текущее количество потоков) равно 0, и пул потоков будет вызываться после входа в это состояние.terminated()
способ входаTERMINATED
государство. -
TERMINATED
Старшие 3 бита011
запустить пул потоков
При созданииThreadPoolExecutor
После объекта в пуле потоков нет потоков. обычно звонятvoid execute(Runnable command)
Поток создается и запускается при выполнении задачи, но основной поток можно создать и запустить заранее, вызвав следующий метод (запускается в методе addWorker):
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
Процесс реализации
Как показано выше, при вызовеvoid execute(Runnable command)
Когда этот метод выполняет задачу:
- Определить, меньше ли количество потоков в текущем пуле потоков, чем размер основного пула потоков, если да, создать поток и запустить его, в противном случае перейти к шагу 2
- Определите, заполнена ли очередь задач. Если она не заполнена, добавьте задачу в очередь блокировки. Если она заполнена, перейдите к шагу 3.
- Определить, меньше ли количество потоков в текущем пуле потоков, чем максимальный размер пула потоков, если да, создать поток и запустить его, в противном случае выполнить политику насыщения
public void execute(Runnable command) {
/任务为空,抛出空指针异常
if (command == null)
throw new NullPointerException();
int c = ctl.get();
/判断当前线程数量是否小于核心线程数
if (workerCountOf(c) < corePoolSize) {
/是则添加一个核心线程(true表示核心线程)到线程池,并且启动线程执行任务(addWorker方法里会启动)
if (addWorker(command, true))
return; /添加成功则返回
c = ctl.get();
}
/线程池处于运行状态则向阻塞队列添加该任务
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
/判断线程池是否处于运行状态,不是就移除刚才添加的任务
if (! isRunning(recheck) && remove(command))
/移除成功就执行饱和策略,这样整个方法就结束了
reject(command);
/否则若处于运行状态或移除失败,这时无论处于哪种情况任务都在阻塞队列里,判断当前线程数量是否为0
else if (workerCountOf(recheck) == 0)
若是则添加一个线程并启动
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
метод addWorker
boolean addWorker(Runnable firstTask, boolean core)
Цель метода состоит в том, чтобы создатьWorker
объект и запустить поток в этом объекте (Worker
внутри одногоThread
поле типа).
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<Worker>();
private int largestPoolSize;
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/如果线程池不处于运行状态,理论上不应该添加一个执行该任务的线程,但如果满足下面三个条件的话就可以通过:
1. 线程池状态是关闭
2. 要执行的任务为空
3. 阻塞队列不为空
因为线程池关闭后不允许提交任务,但关闭后会执行完阻塞队列的任务,所以允许添加一个firstTask为空的线程
来帮助执行完阻塞队列里的任务
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
/若当前线程池的线程数量达到了线程池所允许的最大线程数或所指定要添加线程类型的线程数量则返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
/到这里前面的限制条件都通过,现在尝试将线程数量增一,成功则退出最外层的循环
if (compareAndIncrementWorkerCount(c))
break retry;
/失败则重新获取线程池状态,状态改变则从最外层循环开始执行,不变则从内循环开始执行
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
/构造一个Worker对象,每个Worker对象绑定一个线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
/若线程池处于运行状态或处于关闭且firstTask为null
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
线程提前启动,则抛出异常
if (t.isAlive())
throw new IllegalThreadStateException();
/将w加到Worker的集合里
workers.add(w);
获取Worker集合大小,若大小比largestPoolSize大小大,则更新一下
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
/添加成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
/若添加成功则启动线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
/若启动失败(t线程为空或添加过程中抛出异常)则执行addWorkerFailed方法
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Рабочий класс
Потоки, поддерживаемые пулом потоков, на самом деле представляют собой набор объектов Worker, который инкапсулирует потоки и наследует их.AbstractQueuedSynchronizer
класс и реализуетRunnable
интерфейс, переписанvoid run()
метод. А зачем наследоватьAbstractQueuedSynchronizer
класс, см. нижеrunWorker
Объяснение метода.
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread;
Runnable firstTask;
/绑定这个对象线程已执行完成的任务数
volatile long completedTasks;
Worker(Runnable firstTask) {
/阻止中断,在任务获取前不允许中断
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/线程启动时执行的方法
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
/获取锁,不可重入
public void lock() { acquire(1); }
/尝试获取锁
public boolean tryLock() { return tryAcquire(1); }
/释放锁
public void unlock() { release(1); }
/判断锁是否被独占
public boolean isLocked() { return isHeldExclusively(); }
/中断已开始执行的线程,这个就是为什么要设置setState(-1)的一个原因了,这个方法会被`shutdownNow()`方法调用。
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
метод runWorker
выше, почемуWorker
класс для наследованияAbstractQueuedSynchronizer
, на самом деле состояние блокировки используется для того, чтобы отличить бездействующие потоки от небездействующих потоков.runWorker
В методе:
- Нет блокировки при получении задачи (состояние простоя, поток может быть прерван)
- Блокировать только тогда, когда задача должна быть выполнена (поток не может быть прерван)
вызовvoid tryTerminate()
иvoid shutdown()
При использовании этих двух методов бездействующие потоки прерываются, поэтому потоки, не выполняющие задачи, могут быть прерваны.
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); /允许中断,与Worker构造函数的setState(-1)是一对的
boolean completedAbruptly = true;
try {
/获取到任务才进入循环
while (task != null || (task = getTask()) != null) {
/加锁,表示非空闲状态
w.lock();
/1. 如果线程池状态大于等于STOP并且本线程未中断,则应该执行中断方法
2. 或者执行Thread.interrupted()方法判断本线程是否中断并且清除中断状态,
如果发现线程池状态大于等于STOP则执行中断方法。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
/ThreadPoolExecutor中的beforeExecute(wt, task)方法一个空方法,用来留给继承ThreadPoolExecutor的类
来重写该方法并在任务执行前执行
beforeExecute(wt, task);
Throwable thrown = null;
try {
/执行获取到的任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
/ThreadPoolExecutor中的afterExecute(task,thrown)方法也是一个空方法,用来留给继承
ThreadPoolExecutor的类来重写该方法并在任务执行后执行
afterExecute(task, thrown);
}
} finally {
task = null;
/该线程执行的任务加1,即使抛出异常
w.completedTasks++;
/释放锁,表示回到空闲状态
w.unlock();
}
}
/执行到这一步表示是由于获取不到任务而正常退出的,所以completedAbruptly为false
completedAbruptly = false;
} finally {
/无论怎样退出都要执行
processWorkerExit(w, completedAbruptly);
}
}
метод getTask
private Runnable getTask() {
/表示获取任务是否已超时
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/1. 若线程池状态大于等于停止状态,此时线程池不再处理队列的任务,并且会回收所有线程(不管空不空闲),
所以此时应该把线程池线程数量减1,并且获取的任务为空
/2. 处于关闭状态且任务队列为空,表示任务队列为空且不会有任务提交,所以线程数减1,并且获取的任务为空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
/是否启用超时机制。当允许核心线程超时或当前线程数超过核心线程则启用
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/如果线程数量超过线程池所允许的最大线程数或者启用超时机制情况下获取任务超时,理论上应该回收线程。
但是如果该线程是线程池中的最后一个线程且任务队列不为空就可以不回收,继续运行,要是还有其他线程或者任务队列为空则回收该线程。
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
/尝试将线程数量减1,成功返回null,失败继续从循环开始处开始。这里为什么不是用decrementWorkerCount()
这种不会失败的方法减1而采用这种方式。是因为 wc > 1,如果线程池不只有一个线程它们互相发现不只一个线程,
且它们同时执行不会失败的将线程数量减一的方法,到时线程池线程数量可能就为0了,哪么队列中的任务就没线程执行了。
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/1. 如果启用超时机制就执行poll()方法,在keepAliveTime纳秒内还没获取就返回null。
2. 如果未启用超时机制就执行take()方法,队列没任务就一直阻塞直到有任务。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
/到这里就是因为超时获取不到任务
timedOut = true;
} catch (InterruptedException retry) {
/在执行take()过程中被中断并不算超时
timedOut = false;
}
}
}
метод processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) {
/由于不是获取不到任务而正常退出的,得在这里将线程数减1,正常退出的在getTask()方法有这个减1操作
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
/加锁,因为HashSet和completedTaskCount不是线程安全的
mainLock.lock();
try {
/将线程执行的任务数统一加到线程池维护的completedTaskCount字段
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
/尝试将线程池设置为结束状态
tryTerminate();
int c = ctl.get();
/满足当前线程池状态小于STOP(运行或关闭状态)才继续
if (runStateLessThan(c, STOP)) {
若线程是异常退出runWorker方法就直接添加一个没有带初始任务的非核心线程
if (!completedAbruptly) {
/这三行代码找出当前线程池所至少存在的线程数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
/如果当前线程数已经大于等于min,就直接返回,否则添加一个没有带初始任务的非核心线程
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
На следующем рисунке показан нормальный процесс выполнения пула потоков после отправки задач в пул потоков:
метод tryTerminate
terminate
(Конец) — это последнее состояние пула потоков, и его можно перевести только из закрытого или остановленного состояния в конечное состояние.
final void tryTerminate() {
for (;;) {
int c = ctl.get();
/如果满足下面任意一个条件就没办法到达结束状态
1. 线程池处于运行状态
2. 线程池状态是TIDYING或已经是结束状态
3. 线程池处于关闭状态且任务队列不为空
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
/当前线程数量不为0也无法到达结束状态
if (workerCountOf(c) != 0) {
/中断一个空闲线程
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
/尝试将线程池状态设置为TIDYING,失败重循环开始处开始
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
/terminated()是一个空方法,留给继承ThreadPoolExecutor的类覆盖
terminated();
} finally {
/尝试将线程池状态设置为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}
закрыть операцию
Мы можем позвонитьvoid shutdown()
Метод закрывает пул потоков.После закрытия пула потоков ему не разрешается принимать новые задачи.
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
/ 安全策略判断
checkShutdownAccess();
/设置线程池状态为SHUTDOWN状态
advanceRunState(SHUTDOWN);
/中断所有空闲线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
/尝试结束线程池
tryTerminate();
}
остановить операцию
Мы можем запустить и завершить работу, позвонивvoid shutdownNow()
Метод останавливает пул потоков, после остановки пул потоков не может принимать новые задачи, не будет выполнять задачи в блокирующей очереди, а также прервет все текущие потоки.
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
/ 安全策略判断
checkShutdownAccess();
/设置线程池状态为STOP状态
advanceRunState(STOP);
/中断所有线程,不管是空闲还是非空闲
interruptWorkers();
/取出阻塞队列的所有任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
/尝试结束线程池
tryTerminate();
return tasks;
}
3. Конфигурация пула потоков
Executors
Предоставляются четыре статических фабричных метода для создания пулов потоков с четырьмя различными конфигурациями:
-
newFixedThreadPool(int nThreads)
Примите переменную nThreads типа int, создайтеколичество основных потоковимаксимальное количество потоковкак для
nThreads
Пул потоков (то есть максимальное количество потоков равно nThreads) и используется неограниченная блокирующая очередь.LinkedBlockingQueue
. Если тайм-аут основного потока не установлен, время ожидания созданного потока не истечет.
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
-
newSingleThreadExecutor()
Создаватьколичество основных потоковимаксимальное количество потоковПул потоков равен 1 (то есть максимальное количество потоков равно 1) и используется неограниченная блокирующая очередь.
LinkedBlockingQueue
, если тайм-аут основного потока не установлен, созданный поток не будет истечен. Единственный поток может гарантировать последовательное выполнение задач.Если выполнение единственного потока завершается из-за исключения,processWorkerExit
Метод, наконец, определит, следует ли закончить с исключением и создать новый поток для продолжения работы.
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
-
newCachedThreadPool()
Создаватьколичество основных потоков0,максимальное количество потоковза
Integer.MAX_VALUE
Пул потоков имеет тайм-аут 60 секунд, поэтому поток будет перезапущен, если он простаивает более 60 секунд. Синхронная очередь используется как блокирующая очередь, синхронная очередь не хранит элементы, и вставка выполняется на одном конце, а вставка на другом конце должна быть успешно вставлена, иначе операция вставки будет заблокирована и будет ждать.
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
-
newScheduledThreadPool()
Создайте основной поток с несколькими
corePoolSize
Пул потоков используется для периодического выполнения задач в течение заданного времени.ScheduledThreadPoolExecutor
унаследовано отThreadPoolExecutor
.
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}