задний план
Я полагаю, что в процессе интервью вы сталкивались со многими интервьюерами, которые спрашивали о тредах.После тредов есть пул потоков. От простого к сложному, это такой процесс, и это правда, что многие люди меньше контактируют с пулом потоков в своей работе. Самое большее - создать его, а затем отправить в него поток. Для некоторых опытных интервьюеров, посмотрите Вы может задавать много вопросов, связанных с пулом потоков. Вместо того, чтобы задавать вопросы в оцепенении, лучше усердно учиться. Не трудитесь в это время.
Что такое пул потоков?
Пул потоков — это форма многопоточной обработки.Во время обработки задачи отправляются в пул потоков, и выполнение задач управляется пулом потоков.
Если для каждого запроса создается поток, ресурсы сервера будут быстро исчерпаны. Использование пула потоков может уменьшить количество создаваемых и уничтожаемых потоков. Каждый рабочий поток можно использовать повторно и выполнять несколько задач.
Если мы используем пример из жизни, чтобы проиллюстрировать, мы можем рассматривать пул потоков как команду обслуживания клиентов.Если 1000 человек звонят для консультации в одно и то же время, согласно нормальной логике, 1000 сотрудников службы поддержки клиентов необходимы, чтобы ответить на телефонный звонок и Обслужить покупателей. На самом деле необходимо учитывать многие вещи, например: достаточно ли ресурсов и относительно высока ли стоимость найма такого количества людей. Обычной практикой является набор 100 человек для создания центра обслуживания клиентов.При поступлении звонка служба поддержки клиентов, на которую не ответили, будет назначена для оказания услуг.Если более 100 человек консультируются одновременно, клиент будет предложено подождать и разобраться с этим позже.Когда обслуживание клиентов бесплатно, вы можете продолжить.Обслуживайте следующего клиента, чтобы добиться рационального использования ресурса и максимизировать выгоды.
Типы пулов потоков в Java
1. newSingleThreadExecutor
Как создать:
ExecutorService pool = Executors.newSingleThreadExecutor();
Однопоточный пул потоков. В этом пуле потоков работает только один поток, что эквивалентно одному потоку, последовательно выполняющему все задачи. Если единственный поток завершается аварийно, его место занимает новый поток. Этот пул потоков гарантирует выполнение всех задач в том порядке, в котором они были отправлены.
Как использовать:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPool {
public static void main(String[] args) {
ExecutorService pool = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
pool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t开始发车啦....");
});
}
}
}
Результат выглядит следующим образом:
pool-1-thread-1 开始发车啦....
pool-1-thread-1 开始发车啦....
pool-1-thread-1 开始发车啦....
pool-1-thread-1 开始发车啦....
pool-1-thread-1 开始发车啦....
pool-1-thread-1 开始发车啦....
pool-1-thread-1 开始发车啦....
pool-1-thread-1 开始发车啦....
pool-1-thread-1 开始发车啦....
pool-1-thread-1 开始发车啦....
Из вывода видно, что постоянно работает только один поток.
2.newFixedThreadPool
Как создать:
ExecutorService pool = Executors.newFixedThreadPool(10);
Создайте пул потоков фиксированного размера. Поток создается каждый раз при отправке задачи до тех пор, пока поток не достигнет максимального размера пула потоков. Как только размер пула потоков достигнет максимального значения, он останется неизменным.Если поток завершится из-за аварийного выполнения, пул потоков будет пополнен новым потоком.
Как использовать:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPool {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
pool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t开始发车啦....");
});
}
}
}
Результат выглядит следующим образом:
pool-1-thread-1 开始发车啦....
pool-1-thread-4 开始发车啦....
pool-1-thread-3 开始发车啦....
pool-1-thread-2 开始发车啦....
pool-1-thread-6 开始发车啦....
pool-1-thread-7 开始发车啦....
pool-1-thread-5 开始发车啦....
pool-1-thread-8 开始发车啦....
pool-1-thread-9 开始发车啦....
pool-1-thread-10 开始发车啦....
3. newCachedThreadPool
Как создать:
ExecutorService pool = Executors.newCachedThreadPool();
Создайте кешируемый пул потоков. Если размер пула потоков превышает количество потоков, необходимых для обработки задач, некоторые простаивающие потоки будут перезапущены.Когда количество задач увеличится, в пул потоков будут добавлены новые потоки для обработки задач.
Использование показано в пункте 2 выше.
4.newScheduledThreadPool
Как создать:
ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
Этот пул потоков поддерживает потребности в синхронизированном и периодическом выполнении задач.
Как использовать:
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ThreadPool {
public static void main(String[] args) {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
for (int i = 0; i < 10; i++) {
pool.schedule(() -> {
System.out.println(Thread.currentThread().getName() + "\t开始发车啦....");
}, 10, TimeUnit.SECONDS);
}
}
}
Приведенная выше демонстрация предназначена для задержки выполнения задач на 10 секунд.Если вы хотите выполнять периодические задачи, вы можете использовать следующий метод для выполнения один раз в секунду.
//pool.scheduleWithFixedDelay也可以
pool.scheduleAtFixedRate(() -> {
System.out.println(Thread.currentThread().getName() + "\t开始发车啦....");
}, 1, 1, TimeUnit.SECONDS);
5.newWorkStealingPoolnewWorkStealingPool доступен только в jdk1.8, он будет динамически создавать и закрывать потоки в соответствии с требуемым уровнем параллелизма, снижать конкуренцию за счет использования нескольких очередей и реализовывать ForkJoinPool на нижнем уровне. Преимущество ForkJoinPool заключается в том, что он может в полной мере использовать преимущества нескольких процессоров и многоядерных процессоров, разбивать задачу на несколько «маленьких задач» и размещать несколько «маленьких задач» на нескольких процессорных ядрах для параллельного выполнения. выполняются «маленькие задачи», эти результаты выполнения могут быть объединены.
Расскажите о политике отклонения пула потоков
Когда задача запроса продолжает поступать, а система не может ее обработать в данный момент, стратегия, которую мы должны принять, состоит в том, чтобы отказать в обслуживании. Интерфейс RejectedExecutionHandler предоставляет возможность отказаться от пользовательских методов обработки задачи. В ThreadPoolExecutor уже включены четыре стратегии обработки.
- Политика AbortPolicy: эта политика напрямую вызывает исключение, препятствуя правильной работе системы.
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
- Политика CallerRunsPolicy: Пока пул потоков не закрыт, эта политика запускает текущую отброшенную задачу непосредственно в потоке вызывающего объекта.
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
- Политика DiscardOleddestPolicy: эта политика отбрасывает самый старый запрос, то есть задачу, которая должна быть выполнена, и пытается снова отправить текущую задачу.
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
- Политика DiscardPolicy: эта политика автоматически отбрасывает задачи, которые не могут быть обработаны без какой-либо обработки.
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
В дополнение к четырем стратегиям отклонения, предоставляемым JDK по умолчанию, мы можем настроить стратегию отклонения в соответствии с потребностями нашего бизнеса.Способ настройки очень прост, и мы можем напрямую реализовать интерфейс RejectedExecutionHandler.
Например, в интеграции Spring есть настраиваемая политика отклонения CallerBlocksPolicy, которая вставляет задачи в очередь до тех пор, пока очередь не освободится и вставка не пройдет успешно, в противном случае она будет заблокирована в соответствии с максимальным временем ожидания до истечения времени ожидания.
package org.springframework.integration.util;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class CallerBlocksPolicy implements RejectedExecutionHandler {
private static final Log logger = LogFactory.getLog(CallerBlocksPolicy.class);
private final long maxWait;
/**
* @param maxWait The maximum time to wait for a queue slot to be
* available, in milliseconds.
*/
public CallerBlocksPolicy(long maxWait) {
this.maxWait = maxWait;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
try {
BlockingQueue<Runnable> queue = executor.getQueue();
if (logger.isDebugEnabled()) {
logger.debug("Attempting to queue task execution for " + this.maxWait + " milliseconds");
}
if (!queue.offer(r, this.maxWait, TimeUnit.MILLISECONDS)) {
throw new RejectedExecutionException("Max wait time expired to queue task");
}
if (logger.isDebugEnabled()) {
logger.debug("Task execution queued");
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RejectedExecutionException("Interrupted", e);
}
}
else {
throw new RejectedExecutionException("Executor has been shut down");
}
}
}
Как его использовать после определения? Определение бесполезно, его нужно использовать в пуле потоков.Вы можете настроить пул потоков следующими способами и указать политику отказа.
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10, 100, 10, TimeUnit.SECONDS, workQueue, new CallerBlocksPolicy());
В чем разница между выполнением и отправкой?
В предыдущем объяснении мы использовали метод execute для выполнения задач.В дополнение к методу execute есть также метод submit, который также может выполнять отправленные нами задачи.
В чем разница между этими двумя методами? В каких сценариях он применяется? Проведем простой анализ.
execute подходит для сценариев, где не нужно обращать внимание на возвращаемое значение, нужно только кинуть поток в пул потоков на выполнение.
public class ThreadPool {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(10);
pool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t开始发车啦....");
});
}
}
Метод submit подходит для сценариев, требующих внимания к возвращаемому значению. Определение метода submit выглядит следующим образом:
public interface ExecutorService extends Executor {
  ...
  <T> Future<T> submit(Callable<T> task);
  <T> Future<T> submit(Runnable task, T result);
  Future<?> submit(Runnable task);
  ...
}
Его подкласс AbstractExecutorService реализует метод отправки.Можно увидеть, что независимо от того, является ли параметр Callable или Runnable, он в конечном итоге будет инкапсулирован в RunnableFuture, а затем вызовет execute для выполнения.
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
Давайте посмотрим, как используются эти три метода:
submit(Callable task);
public class ThreadPool {
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(10);
Future<String> future = pool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "Hello";
}
});
String result = future.get();
System.out.println(result);
}
}
submit(Runnable task, T result);
public class ThreadPool {
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(10);
Data data = new Data();
Future<Data> future = pool.submit(new MyRunnable(data), data);
String result = future.get().getName();
System.out.println(result);
}
}
class Data {
String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
class MyRunnable implements Runnable {
private Data data;
public MyRunnable(Data data) {
this.data = data;
}
@Override
public void run() {
data.setName("yinjihuan");
}
}
Future<?> submit(Runnable task);Прямая отправка Runnable не получит возвращаемого значения, возвращаемое значение равно null.
Пять сценариев использования пула потоков
-
newSingleThreadExecutor: однопоточный пул потоков, который можно использовать в сценариях, где требуется последовательное выполнение и выполняется только один поток.
-
newFixedThreadPool: пул потоков фиксированного размера, который можно использовать для ограничения количества потоков в условиях известного давления параллелизма.
-
newCachedThreadPool: пул потоков, который можно расширять до бесконечности, что больше подходит для обработки задач с относительно небольшим временем выполнения.
-
newScheduledThreadPool: пул потоков, который можно откладывать и запускать регулярно, подходит для сценариев, требующих нескольких фоновых потоков для выполнения периодических задач.
-
newWorkStealingPool: пул потоков с несколькими очередями задач, который может уменьшить количество подключений и создать потоки с текущим количеством доступных ЦП для параллельного выполнения.
Закрытие пула потоков
Закрыть пул потоков можно, вызвав методы shutdownNow и shutdown.
shutdownNow: выполнить прерывание() для всех выполняемых задач, остановить выполнение, отменить все задачи, которые еще не запущены, и вернуться к списку задач, которые еще не запущены.
public class ThreadPool {
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(1);
for (int i = 0; i < 5; i++) {
System.err.println(i);
pool.execute(() -> {
try {
Thread.sleep(30000);
System.out.println("--");
} catch (Exception e) {
e.printStackTrace();
}
});
}
Thread.sleep(1000);
List<Runnable> runs = pool.shutdownNow();
}
}
Вышеприведенный код имитирует сценарий немедленной отмены, добавления 5 задач потока в пул потоков, а затем приостановки в течение определенного периода времени. Пул потоков имеет только один поток. Если в это время вызывается shutdownNow, необходимо прерывание выполняющаяся задача и вернуть потоки 4. Для невыполненных задач консоль выводит следующее:
0
1
2
3
4
[fs.ThreadPool$$Lambda$1/990368553@682a0b20,
fs.ThreadPool$$Lambda$1/990368553@682a0b20,
fs.ThreadPool$$Lambda$1/990368553@682a0b20,
fs.ThreadPool$$Lambda$1/990368553@682a0b20]
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at fs.ThreadPool.lambda$0(ThreadPool.java:15)
at fs.ThreadPool$$Lambda$1/990368553.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
shutdown: когда мы вызываем завершение работы, пул потоков больше не будет принимать новые задачи, но не будет принудительно завершать задачи, которые были отправлены или выполняются.
public class ThreadPool {
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(1);
for (int i = 0; i < 5; i++) {
System.err.println(i);
pool.execute(() -> {
try {
Thread.sleep(30000);
System.out.println("--");
} catch (Exception e) {
e.printStackTrace();
}
});
}
Thread.sleep(1000);
pool.shutdown();
pool.execute(() -> {
try {
Thread.sleep(30000);
System.out.println("--");
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
Вышеприведенный код имитирует состояние выполнения, затем вызывает завершение работы, а затем добавляет к нему задачи. От добавления нужно отказаться. См. вывод:
0
1
2
3
4
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task fs.ThreadPool$$Lambda$2/1747585824@3d075dc0 rejected from java.util.concurrent.ThreadPoolExecutor@214c265e[Shutting down, pool size = 1, active threads = 1, queued tasks = 4, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at fs.ThreadPool.main(ThreadPool.java:24)
Существуют также некоторые бизнес-сценарии, которым необходимо знать, завершены ли все задачи в пуле потоков. Когда мы закрываем пул потоков, мы можем использовать isTerminated, чтобы определить, завершены ли все потоки. назвали это результатом выключения.
public class ThreadPool {
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(1);
for (int i = 0; i < 5; i++) {
System.err.println(i);
pool.execute(() -> {
try {
Thread.sleep(3000);
System.out.println("--");
} catch (Exception e) {
e.printStackTrace();
}
});
}
Thread.sleep(1000);
pool.shutdown();
while(true){
if(pool.isTerminated()){
System.out.println("所有的子线程都结束了!");
break;
}
Thread.sleep(1000);
}
}
}
пользовательский пул потоков
В процессе фактического использования большинство из нас использует Executors для создания пулов потоков для прямого использования, если есть какие-то другие требования, такие как указание политики отклонения пула потоков, тип очереди блокировки, префикс имен потоков и т. д. ., мы можем Решение состоит в том, чтобы использовать собственный пул потоков.
Если вы просто хотите изменить префикс имени потока, вы можете настроить ThreadFactory для достижения этого.Существует параметр ThreadFactory в Executors.new.... Если он не указан, используется DefaultThreadFactory.
Ядром пользовательского пула потоков является создание объекта ThreadPoolExecutor и указание параметров.
Давайте посмотрим на определение конструктора ThreadPoolExecutor:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) ;
- corePoolSize Размер пула потоков определяет, будет ли вновь отправленная задача выполняться в новом потоке или помещена в очередь задач, а также является наиболее важным параметром пула потоков. Как правило, в начале пула потоков нет, и только когда приходит задача и количество потоков меньше, чем corePoolSize, потоки создаются.
- максимальный размер пула Максимальное количество потоков, максимальное количество потоков, которое может создать пул потоков.
- KeepAliveTime Максимальное время выживания избыточных бездействующих потоков после того, как число потоков превысит corePoolSize.
- единица измерения единица времени
- работаОчередь Очередь, в которой хранятся задачи, которые слишком поздно обработать, называется BlockingQueue.
- нитьФабрика Фабричный класс для создания потоков, который может определять имя потока, приоритет и т. д.
- обработчик Стратегия отказа, как с ней справиться, когда задача слишком поздняя, чтобы с ней справиться, объяснялась ранее.
Поняв приведенную выше информацию о параметрах, мы можем определить собственный пул потоков. Я заменил LinkedBlockingQueue на ArrayBlockingQueue и указал размер очереди. Когда задача превышает размер очереди, для ее обработки используется политика отклонения CallerRunsPolicy.
Преимущество этого в том, что размер очереди строго контролируется, и нет ситуации, когда задачи добавляются постоянно, иногда обработка задач идет медленно, а количество задач слишком велико, что займет много времени. памяти и привести к переполнению памяти.
Конечно, вы также можете управлять записью, отправленной в пул потоков, например CountDownLatch, Semaphore и т. д.
/**
* 自定义线程池<br>
* 默认的newFixedThreadPool里的LinkedBlockingQueue是一个无边界队列,如果不断的往里加任务,最终会导致内存的不可控<br>
* 增加了有边界的队列,使用了CallerRunsPolicy拒绝策略
* @author yinjihuan
*
*/
public class FangjiaThreadPoolExecutor {
private static ExecutorService executorService = newFixedThreadPool(50);
private static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10000), new DefaultThreadFactory(), new CallerRunsPolicy());
}
public static void execute(Runnable command) {
executorService.execute(command);
}
public static void shutdown() {
executorService.shutdown();
}
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "FSH-pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
}
Для большего обмена технологиями, пожалуйста, обратите внимание на общедоступную учетную запись WeChat: Yuantiandi