В первую очередь старые правила, рекомендую свою группу обмена пингвинами:
Учащиеся, заинтересованные в общении со springboot для быстрой разработки, могут добавить следующую группу пингвинов.
нормальная нить
- 1. Реализация: наследовать поток или реализовать интерфейс Runnable.
- 1. Наследовать поток, только одиночное наследование
- 2. Реализуйте интерфейс Runnable (который может реализовать внутреннее совместное использование ресурсов), и интерфейс может быть реализован более
- 3. Классическая проблема: продажа билетов в витрине
- 2. Создайте экземпляр объекта
- 3. Выполняйте задания
- 4. Уничтожить поток для повторного использования ресурсов
считать:
Что нам делать, когда необходимо обработать несколько ресурсов, открыв потоки? Вы повторяли следующий процесс:
create -> run -> destroy
Мы знаем, что каждая операция компьютера требует большого потребления ресурсов, работа 5 потоков может не иметь никакого эффекта, а 5w? 50 000 сотворений и разрушений всего за 50 000 казней? Выполнение задач может затрачивать значительное количество времени на обработку этих созданий и уничтожений.
Пул потоков
Функции
- 1. Решить проблему выполнения нескольких потоков в процессорном блоке
- 2. Сократить время простоя процессорного блока
- 3. Увеличение пропускной способности процессорного блока во время работы (почему вы так говорите? Мы уменьшаем отходы создания и уничтожения каждого потока для нескольких задач и повышаем эффективность выполнения задач)
сочинение
- 1. Менеджер пула потоков (ThreadPool): отвечает за создание, управление, уничтожение пулов потоков и добавление задач.
- 2. Рабочий поток (PoolWorker): если задачи нет, он будет ждать, и задача может быть зациклена и повторена.
- 3. Интерфейс задачи (Task): каждая задача должна реализовывать интерфейс.Рабочий поток отвечает за планирование выполнения задачи, указание входа в задачу, завершение работы после завершения задачи и статус выполнения задачи, и Т. Д.
- 4. Очередь задач: храните необработанные задачи и предоставляйте механизм буферизации задач.
например: касса супермаркета: группа кассового обслуживания, один кассир, работа кассира, люди, ожидающие обналичивания
Класс пула потоков JDK: java.util.concurrent.Executors и интерфейс исполнителя пула потоков JDK: java.util.concurrent.Executor
В исполнителях jdk предоставляет следующие связанные пулы потоков:
статический метод | Тип созданного пула потоков | Фактическая реализация возвращаемого значения |
---|---|---|
newFixedThreadPool(int) | Фиксированный пул потоков | ThreadPoolExecutor |
newWorkStealingPool() | Пул параллельных потоков по количеству ядер процессора | ForkJoinPool |
newSingleThreadExecutor() | Отдельный пул потоков для одного потока | FinalizableDelegatedExecutorService |
newCachedThreadPool() | пул кеш-потоков | ThreadPoolExecutor |
newSingleThreadScheduledExecutor() | Отдельный пул потоков синхронизации потоков | DelegatedScheduledExecutorService |
newScheduledThreadPool(int) | Пул временных потоков | ScheduledThreadPoolExecutor |
newSingleThreadExecutor() пул потоков потоков
Почему я должен использовать пул потоков, чтобы проиллюстрировать это? На самом деле, мы развили простое, чтобы иметь дело со сложным. Сначала код:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
Мы видим, что возвращаемое значение вышеприведенного метода — это все ExecutorService, но на самом деле создается экземпляр FinalizableDelegatedExecutorService Давайте войдем и посмотрим исходный код, как показано ниже:
static class FinalizableDelegatedExecutorService extends DelegatedExecutorService {
//构造方法
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
//对象销毁的时候调用
protected void finalize() {
super.shutdown();
}
}
В приведенном выше коде мы ясно видим, что FinalizableDelegatedExecutorService — это только инкапсуляция DelegatedExecutorService, и единственная реализация — завершать ExecutorService при уничтожении объекта.
На этом этапе мы должны вернуться к анализу DelegatedExecutorService и конкретного кода в приведенном выше методе.
Давайте посмотрим на реализацию пула однопоточных потоков по умолчанию следующим образом:
new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
//此处的代码实现了一个ExecutorService,分别有几个参数?何解?
//
public class ThreadPoolExecutor extends AbstractExecutorService {
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
}
//我们可以看到几个参数的字面意思分别是:
//corePoolSize 核心线程数量,包括空闲线程
//maximumPoolSize 最大线程数量
//keepAliveTime 保持活跃时间(参照后续源码,这里应该是:当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间)
//unit keepAliveTime 参数的时间单位
//workQueue 执行前用于保持任务的队列。此队列仅保持由 execute方法提交的 Runnable任务
//Executors.defaultThreadFactory() 默认线程工厂
//defaultHandler 超出线程和任务队列的任务的处理程序,实现为:new AbortPolicy(),当然这里默认是没有处理的,需要我们手动实现
//这里,我们接着看默认的线程工厂,毕竟线程池核心是需要线程来执行任务,所以此处先看线程来源。
static class DefaultThreadFactory implements ThreadFactory {
//池数量,指定原子操作
private static final AtomicInteger poolNumber = new AtomicInteger(1);
//线程组
private final ThreadGroup group;
//线程数量,指定原子操作
private final AtomicInteger threadNumber = new AtomicInteger(1);
//线程名称前缀
private final String namePrefix;
DefaultThreadFactory() {
//获取系统安全管理器
SecurityManager s = System.getSecurityManager();
//创建线程组,由是否获取系统安全管理器决定
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
//构造线程名称
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
//创建线程
public Thread newThread(Runnable r) {
//将线程组、Runnable接口(线程实际执行代码块)、线程名、线程所需要的堆栈大小为0
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
//如果为守护线程,取消守护状态,必须在线程执行前调用这个setDaemon方法
if (t.isDaemon())
t.setDaemon(false);
//默认任务优先级,值为5
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
//上面的默认线程工厂,提供给了我们一个非守护线程的线程,由原子操作保证线程唯一,任务优先级默认(最低1,最高10,默认5,此处优先级为5)
Прочитав вышеизложенное, можно подытожить: однопоточный пул потоков по умолчанию имеет только один поток и один пул потоков, время ожидания новых задач равно 0, а для связывания потоков добавляются атомарные операции.
Это здесь? Конечно, нет, теперь нам нужно взглянуть на более конкретный ThreadPoolExecutor, чтобы глубже понять пул потоков.
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
*所有的构造方法均指向这里,所以我们看一下这个就足够
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//参数检查,说明线程池不能线程=0,也不能最大线程数量不大于0切最大线程数量不能少于核心线程数量,等待任务最长时间不能小于0
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
//等待任务队列、线程工厂、超任务队列的处理程序
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
//上面的判断,可以看做是一种防御式编程,所有的问题预先处理,后续无需考虑类似问题
//构造线程池相关设定阈值
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
}
//到了这里其实我们不必先追究具体的实现,还是先看看AbstractExecutorService吧。
//抽象的执行服务
public abstract class AbstractExecutorService implements ExecutorService {
//执行方法
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
//获取任务数量
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
//任务集合
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
//执行完成服务
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
try {
// 记录异常
ExecutionException ee = null;
//超时时间线
final long deadline = timed ? System.nanoTime() + nanos : 0L;
//使用迭代器获取任务
Iterator<? extends Callable<T>> it = tasks.iterator();
// 确定开始一项任务
futures.add(ecs.submit(it.next()));
//任务数量减少
--ntasks;
//正在执行任务标志
int active = 1;
//循环执行任务
for (;;) {
//获取任务队列中第一个任务
Future<T> f = ecs.poll();
//任务为空,如果还有任务则执行任务(任务数量减1,提交任务到执行队列,正在执行任务数量+1)
//正在执行任务数为0,说明任务执行完毕,中断任务循环
//若有超时检查,则执行超时检查机制
//上述情况都不满足,则取出任务队列头,并将其从队列移除
if (f == null) {
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0)
break;
else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
else
f = ecs.take();
}
//任务不为空
if (f != null) {
//正在执行标志-1
--active;
try {
//返回执行结果
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
//取消所有任务
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
//执行方法
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
//和上面类似,这里也是创建任务队列
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
//迭代进行任务执行
try {
//创建任务,并添加到任务队列
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
//设置超时时间标记
final long deadline = System.nanoTime() + nanos;
final int size = futures.size();
//在执行器没有多少多并行性的情况下,交替执行时间检查和调用。
for (int i = 0; i < size; i++) {
execute((Runnable)futures.get(i));
nanos = deadline - System.nanoTime();
//任务超时,返回任务队列
if (nanos <= 0L)
return futures;
}
//遍历任务并返回任务执行结果
for (int i = 0; i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
//超时
if (nanos <= 0L)
return futures;
try {
//给定执行时间等待任务完成并返回结果
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
nanos = deadline - System.nanoTime();
}
}
done = true;
return futures;
} finally {
//未完成则取消执行
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
/**
*创建任务队列
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
/**
* 提交任务到执行队列
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
}
Благодаря приведенному выше коду мы в основном поняли, как пул потоков создает очередь задач и выполняет задачи, поэтому здесь нам нужно только обратить внимание на некоторые ключевые методы ThreadPoolExecutor, чтобы понять, как работает пул потоков, и соответствующие несколько шаблонов пулов потоков. можно вывести.
Прежде всего, перед тем, как на этот раз посмотреть исходный код, мы должны подумать об этом и разобраться в общем процессе выполнения пула потоков:
Ранее мы кратко упомянули несколько основных параметров ThreadPoolExecutor, давайте кратко суммируем их ниже:
public class ThreadPoolExecutor extends AbstractExecutorService {
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
}
- corePoolSize: максимальное количество основных потоков в пуле потоков.
Основной поток: при создании нового потока в пуле потоков, если общее количество текущих потоков меньше, чем corePoolSize, вновь созданный поток является основным потоком. Если оно превышает corePoolSize, вновь созданный поток не является основным потоком. Основной поток всегда будет жить в пуле потоков по умолчанию, даже если основной поток ничего не делает (состояние простоя). Если для атрибута allowCoreThreadTimeOut ThreadPoolExecutor указано значение true, то, если основной поток не работает (состояние простоя), он будет уничтожен через определенный период времени (определяется параметром под продолжительностью).
- maxPoolSize: максимальное значение общего количества потоков в пуле потоков.
Общее количество потоков = основные потоки + неосновные потоки.
- keepAliveTime: период простоя неосновных потоков в пуле потоков.
Неосновной поток будет уничтожен, если он не работает (состояние простоя) дольше времени, заданного этим параметром.Если установлено значение allowCoreThreadTimeOut = true, он будет воздействовать на основной поток.
- unit: единица измерения keepAliveTime
TimeUnit — это тип перечисления, который включает: НАНОСЕКУНДЫ: 1 микросекунда = 1 микросекунда / 1000 МИКРОСЕКУНДЫ: 1 микросекунда = 1 миллисекунда / 1000 МИЛЛИСЕКУНДЫ: 1 мс = 1 секунда / 1000 СЕКУНДЫ : секунды МИНУТЫ: очки ЧАСЫ: часы ДНИ : дни
- workQueue: очередь задач в пуле потоков: поддерживает объекты Runnable, ожидающие выполнения
Когда все основные потоки будут работать, в эту очередь будут добавлены новые задачи для обработки, если очередь заполнена, будут созданы новые неосновные потоки для выполнения задач.
-
threadFactory: способ создания потоков.
-
обработчик: обработчик исключений.
Теперь, когда известно о выполнении задачи, как она ставится в очередь?
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* 1. 如果运行的线程少于corepoolSize大小,新任务会直接开启新的线程执行。
* 对addWorker的调用原子性地检查运行状态和workerCount,从而通过返回false防止假警报,假警报会在不应该的情况下添加线程。
*
* 2. 如果一个任务成功的加入队列,我们需要再次检查是否需要开启新的线程来执行。
* 可能原因有:已有任务执行完毕,或者线程池已经被结束。
*
*
* 3. 如果不能对任务进行排队,则尝试添加一个新任务线程。
* 如果它失败了,我们知道我们已经关闭或饱和了所以拒绝这个任务。
*/
//运行状态标签
int c = ctl.get();
//正在执行的线程数 < 核心线程数 ,则立即执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//再次检查是否运行且没超过任务队列容量
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//需要执行的任务不在运行状态且不在等待队列,则将这个任务异常抛出
if (! isRunning(recheck) && remove(command))
reject(command);
//运行任务数量为空,则将核心线程外的线程任务置空
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//超过核心线程数且其他线程任务也添加失败,抛出异常
else if (!addWorker(command, false))
reject(command);
}
Увидев это, мы смутно поняли, что задачи ставятся в очередь на выполнение.Все очереди задач ставятся в очередь на выполнение одинаково.Так какие у нас очереди задач?
-
LinkedBlockingQueue: Линейная очередь блокировки. Получить задачу, если она не превышает corePoolSize, создать новый поток для выполнения, в противном случае войти в очередь блокировки для ожидания
-
ArrayBlockingQueue: Очередь блокировки массива. Массив специальной диагностики имеет фиксированную длину, то есть длина этой очереди фиксирована. При получении новой задачи, если она не превышает corePoolSize, создается новый поток для выполнения, если превышает, создается новый поток (общее количество потоков
-
SynchronousQueue: синхронизировать очередь. Поскольку это синхронная очередь, это означает, что новые задачи будут выполняться по мере их поступления. То есть количество основных потоков бесконечно.
-
DelayQueue: очередь задержки, вы также можете узнать, что задача должна быть отложена, прослушивая имя.Когда очередь получает задачу, она сначала присоединяется к очереди, и задача будет выполняться только по достижении указанного времени задержки.
То есть на данный момент мы в основном проанализировали несколько ядер пула потоков: собственный тип пула потоков jdk, фабрика потоков в пуле потоков (для производственных потоков), выполнение задач пула потоков, очередь задач пула потоков, пул потоков тип очереди. Мы можем завершить эту статью, резюмируя картину.Конечно, для конкретной реализации других типов пулов потоков, пожалуйста, проверьте исходный код самостоятельно.
Мысль: есть ли что-то подобное в Java-разработке, работающее подобным образом?