короткая книгаЦзян И Джонни, пожалуйста, указывайте первоисточник для перепечатки, спасибо!
В этой статье анализируется и интерпретируется исходный код на основе JDK1.7. Мы начнем обсуждение этой темы с простого случая, а в конце статьи автор поделится некоторым опытом, чтобы опоздавшие не наступили на яму.
Если я вам нравлюсь, вы можете обратить внимание на мой публичный номер~Больше галантереи~
ThreadPoolExecutor — это тип инструмента пула потоков, предоставляемый JUC, а также параллельная среда с большинством сценариев приложений на языке Java.Можно сказать, что почти все, что требует асинхронного или параллельного выполнения, может использовать пул потоков Java. Итак, прежде всего, сравним «схему простого использования потоков» и «схему использования пула потоков ThreadPoolExecutor», в чем разница в решении задачи.
Кейс: Писец
В Средние века существовала профессия писца, которая работала копировальным аппаратом, переписывая книгу за книгой. Если в это время есть мастерская писца, то писцов всего 2, и они должны переписать 10 книг.
В этом примере мы «пишем управление потоками сами» и «управление потоками с помощью ThreadPoolExecutor» соответственно.
public static class Book {
private static AtomicInteger id = new AtomicInteger(0); // 书名生成器
private String bookName; // 书名
public void copy() { // 抄写书籍
System.out.println("start copy " + bookName);
try {
Thread.sleep(100L); // sleep 100ms
} catch (Exception e) {
// ignore
}
System.out.println("end copy " + bookName);
}
public Book() {
bookName = "book-" + String.valueOf(id.incrementAndGet()); // 书名自动生成
}
}
Внедрить управление нитью себя
// 提前准备好十本书
final BlockingQueue<Book> books = new LinkedBlockingDeque<Book>(10);
for (int i = 0; i < 10; i++) {
try {
books.put(new Book());
} catch (Exception e) {
// ignore
}
}
System.out.println("start work...");
// 创建两个书籍抄写员线程
Thread[] scribes = new Thread[2];
for (int scribeIndex = 0; scribeIndex < 2; scribeIndex++) {
scribes[scribeIndex] = new Thread(new Runnable() {
public void run() {
for (; ; ) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("time arrives, stop writing...");
}
try {
Book currentBook = books.poll(5, TimeUnit.SECONDS);
currentBook.copy();
} catch (Exception e) {
System.out.println("time arrives, stop writing...");
return;
}
}
}
});
scribes[scribeIndex].setDaemon(false); // 设置为非守护线程
scribes[scribeIndex].start();
}
// 工作已经安排下去了,安心等待就好了
try {
Thread.sleep(10000l);
} catch (Exception e) {
// ignore
}
// 时间到了,提醒两个抄写员停止抄写
for (int scribeIndex = 0; scribeIndex < 2; scribeIndex++) {
scribes[scribeIndex].interrupt();
}
System.out.println("end work...");
Я написал много кода для выполнения вышеуказанных функций, давайте посмотрим, как для его выполнения используется ThreadPoolExecutor.
System.out.println("start work...");
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < 10; i ++) {
executorService.submit(new Runnable() {
public void run() {
new Book().copy();
}
});
}
// 工作已经安排下去了,安心等待就好了
try {
Thread.sleep(10000l);
} catch (Exception e) {
// ignore
}
executorService.shutdownNow();
System.out.println("end work...");
Весь процесс очень понятен, они:написание задач,создание темы,начало потока,убить нить.
Но часто проблема не ограничивается вышеперечисленным.
Дилемма разработчика
Первым разработчикам параллельного программирования приходилось многое делать самим, и с помощью пула потоков Java можно было выполнить следующую работу:
1)управление потоками, создание потока, запуск, уничтожение и т.д.;
2)повторное использование потока, создание потоков принесет определенные накладные расходы серверу, как уменьшить накладные расходы на частое повторное создание потоков;
3)Эластичное масштабирование, сервер обычно имеет пиковые периоды и низкие пиковые периоды, можно ли эластично масштабировать пул потоков, например, можно ли повторно использовать поток, если он не используется в течение длительного времени после его успешного создания, чтобы уменьшить потери системных ресурсов или возможность увеличения емкости пула потоков в любое время;
4)политика отказа, количество потоков ограничено и задач для обработки много, а задачи, превышающие возможности системы, отклоняются или блокируются и ожидаются;
5)Обработка исключенийНить может столкнуться с исключением или ошибкой во время процесса выполнения, как разработчики должны правильно иметь эти исключения или ошибки;
6)Назначение задачи, независимо от того, основано ли назначение задач на принципе «первым поступил», «первым обслужен» или какой-либо политике приоритетов.
И т. д., и т. д. В этот раз мы представим фреймворк пула потоков ThreadPoolExecutor, разработанный Дугом Ли, и посмотрим, как Бог решает вышеуказанные проблемы.
Анализ исходного кода ThreadPoolExecutor
Прежде всего, прежде чем интерпретировать исходный код, нам нужно представить некоторые важные понятия ThreadPoolExecutor.
жизненный цикл
При разработке пула потоков ThreadPoolExecutor весь пул потоков среды выполнения задач разделен на 5 жизненных циклов:
RUNNING: позволяет получать новые задачи и обрабатывать задачи в очереди
SHUTDOWN: Новые задачи больше не поступают, обрабатываются только задачи в очереди.
STOP: Не только больше не поступают новые задачи, но и задачи в очереди больше не перевариваются и не обрабатываются, и пытаются прервать поток, выполняющий задачу
TIDYING: все задачи завершены, количество рабочих потоковworkCount
также устанавливается в 0, и состояние потока также устанавливается вTIDYING, и начните вызывать функцию ловушки, завершенную()
TERMINATED: Функция крюкаterminated()
Законченный
Схема трансформации каждого жизненного цикла выглядит следующим образом:
Как видно из рисунка, изменения на протяжении всего жизненного цикланеобратимыйиз.
слово состояния
ThreadPoolExecutor упаковывает состояние пула потоков и емкость пула потоков в переменную типа int, как показано на следующем рисунке.
Тематические биты состояния пула
государство | перечисление значений высокого порядка | положительный и отрицательный |
---|---|---|
RUNNING | 111 | Отрицательное число (-536870912) |
SHUTDOWN | 000 | 0 |
STOP | 001 | Положительный номер (536870912) |
TIDYING | 010 | Положительное число (1073741824) |
TERMINATED | 011 | Положительное число (1610612736) |
Следовательно, из расстановки значений состояния можно узнатьTERMINATED > TIDYING > STOP >SHUTDOWN > RUNNING
Код в ThreadPoolExecutor выглядит так:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 状态字的高比特位存放线程池状态信息
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 打包/提取状态字信息
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
// 判断当前线程池是否正在执行
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
Основной процесс выполнения пула потоков
Введение кода
Во-первых, мы создаем пул потоков.
1. Создание пула потоков
ExecutorService executorService = Executors.newFixedThreadPool(2);
Здесь используется фабричный метод, предоставляемый Executors, и могут быть созданы следующие четыре типа пулов потоков:
newFixedThreadPool. Этот метод будет использоваться для создания пула потоков с фиксированным размером (в настоящее время corePoolSize = maxPoolSize), и пул потоков будет создаваться каждый раз при отправке задачи до тех пор, пока пул потоков не достигнет максимального числа, а размер пул потоков после этого не изменится;
newCachedThreadPool. Этот метод создает кешируемый пул потоков (corePoolSize = 0, maxPoolSize = Integer.MAX_VALUE в это время), и простаивающие потоки будут автоматически переработаны в течение более 60 секунд. Риск этого пула потоков заключается в том, что если серверное приложение достигнет пиковый запрос. В течение периода новые потоки будут создаваться непрерывно, пока память не будет исчерпана;
newSingleThreadExecutor. Этот метод создает однопоточный пул потоков, который последовательно выполняет задачи в том порядке, в котором они поставлены в очередь (например:FIFO,LIFO, приоритет);
newScheduledThreadPool. Этот метод создает пул потоков фиксированной длины, который может выполнять задачи с задержкой или по времени;
2. Задача представления
Общая логика отправки задачи следующая:
1) Когда пул потоков меньше, чем corePoolSize, вновь отправленная задача создаст новый поток для выполнения задачи, даже если в это время в пуле потоков есть простаивающие потоки;
2) Когда пул потоков достигает размера corePoolSize, вновь отправленная задача будет помещена в workQueue, ожидая, пока задача в пуле потоков будет запланирована и выполнена;
3) Когда рабочая очередь заполнена и максимальный размер пула > corePoolSize, вновь отправленная задача создаст новый поток для выполнения задачи;
4) Когда количество отправленных задач превышает максимальный размер пула, вновь отправленные задачи обрабатываются RejectedExecutionHandler;
5) Когда пул потоков превышает количество потоков corePoolSize, а время простоя достигает keepAliveTime, закрываем неиспользуемые потоки;
Затем давайте посмотрим, как исходный код реализует приведенное выше описание
После успешного создания пула потоков мы отправляем задачу в пул потоков:
executorService.submit(new Runnable() {
public void run() {
new Book().copy();
}
});
После отправки в пул потоков:
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);// 包装出一个新的任务
execute(ftask); // 线程池的入口
return ftask;
}
можно увидетьThreadPoolExecutor
Способ входаexecute(Runnable commad)
. Логика выполнения этого метода следующая:
int c = ctl.get();
// 1. 如果当前线程池中线程总数少于核心线程数,则添加新线程到线程池中,
// 并且由新线程执行刚提交进来的任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 2. 可能刚才在创建新线程成功的同时,线程池被关闭了,因此需要double-check,
// 如果此时线程池已经被关闭了,那么回滚刚才被添加进来的任务
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 如果此时核心线程数(corePoolSize)已经满了,并且任务队列也满了,
// 尝试增加线程到maximumPoolSize大小,如果仍然失败,执行拒绝策略
else if (!addWorker(command, false))
reject(command);
В приведенном выше кодеctl.get()
метод,workerCountOf()
,а такжеisRunning()
Методы - это все операции чтения и записи слова состояния, упомянутые выше.Мы не будем расширять эту часть для читателей.Заинтересованные читатели могут узнать сами.
Далее посмотрим, что делает addWorker:
private boolean addWorker(Runnable firstTask, boolean core) {
// 这部分省略的代码都是对状态字进行修改,添加并创建线程之前,
// 需要递增work记数(此时需要线程安全地操作)
...
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
...
w = new Worker(firstTask); // 此处封装出了一个新的Work,这个类我们稍后会介绍
final Thread t = w.thread;
if (t != null) {
...
// 获得线程池状态,如果线程池已经被关闭了,则不再创建新的线程
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
...
workerAdded = true;
...
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 如果任务启动或者提交到线程池失败,
// 则执行回滚操作(从工作线程池中移除失败添加的worker、减少状态字中的任务计数)
addWorkerFailed(w);
}
return workerStarted;
}
3. Выполнение задачи
задача выполняется вWorker
класс, покаWorker
класс является унаследованнымRunnable
Интерфейс класса.
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
...
public void run() {
runWorker(this);
}
...
}
Вы можете видеть, что класс Worker вызывает внешнийrunWorker()
метод. Поэтому можно понять, что основная логика выполнения задачи внешняя.runWorker()
выполняется в методе
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
...
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { // 循环读取任务
...
try {
beforeExecute(wt, task); // 用户实现的回调方法,任务启动前
Throwable thrown = null;
try {
task.run();// 任务执行
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 用户实现的回调方法,任务执行后
}
} finally {
task = null;
w.completedTasks++;
...
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
beforeExecute и afterExecute — это два метода ловушек, которые определяют действия, которые должны выполняться, когда поток начинает выполняться и завершает выполнение, что должно быть реализовано разработчиком.
Еще одна вещь, которую следует отметить, это метод getTask(), вызываемый в методе runWorker.Внутри этого метода, если возникают следующие условия, он возвращает значение null и завершает цикл выполнения рабочего потока: 1) Текущее количество потоков вот-вот превысит maxPoolSize 2) Пул потоков закрыт 3) Текущее количество потоков больше, чем corePoolSize, но меньше, чем maxPoolSize, и это связано с тем, что получение данных из BlockingQueue превышает время ожидания (по умолчанию 60 секунд).
Код реализован следующим образом:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 校验当前线程池状态
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
boolean timed; // Are workers subject to culling?
for (;;) {
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
// 如果线程超过指定时间内(默认60秒)没有获取到任务,说明有线程即将过期
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
4. Отказ от задания
Если поток отправляется в пул потоков, в текущем пуле потоков возникает одно из следующих условий:
1) Очередь задач пула потоков заполнена
2) Пул потоков закрыт (вызовshutdown
функция илиshutdownNow
функция)
вызовет заранее заданную стратегию обратного вызова,ThreadPoolExecutor
Всего предусмотрено четыре стратегии:
1)AbortPolicy (прервать): стратегия напрямую вызовет исключение RejectedExecutionException, и вызывающий объект получит исключение;
2)DiscardPolicy (отбросить): Используя эту стратегию, пул потоков будет молча отбрасывать задачу, не зная об этом вызывающей стороне;
3)CallerRunsPolicy (запуск вызывающего абонента): эта стратегия не отбрасывает задачу и не генерирует исключение, а возвращает задачу вызывающей стороне, тем самым уменьшая поток новых задач;
4)DiscardoldestPolicy (отказаться от самого старого): Эта стратегия откажется от выполнения задачи вот-вот повернется, то «отказ от самой старой» приведет к отказу от задачи с наивысшим приоритетом, лучше не «отказаться от самой старой» политики насыщения и использования приоритетной очереди вместе; Здесь код реализует только нашу стратегию show ** CallerRunsPolicy (запуск вызывающего абонента) **:
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
// 策略实现
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
Конечно, разработчики также могут определить свои собственные стратегии насыщения в соответствии с потребностями бизнеса.
5. Разрушение пула потоков
ThreadPoolExecutor
Предоставляет два метода уничтожения пула потоков, а именноshutdown()
иshutdownNow()
shutdown()
Метод заключается в том, чтобы просто установить состояние пула потоков наSHUTDOWN, и отклоняет все входящие запросы, но задачи, уже находящиеся в очереди задач, по-прежнему будут использоваться в обычном режиме.
иshutdownNow()
Выполнение метода проще, и это заставит его закрытьExecutorService
, он также попытается отменить выполняемую задачу и вернуть все задачи, которые были отправлены, но не запущены. Разработчики могут записывать эти задачи в журнал и сохранять их для последующей обработки. Кроме того, попытка отменить выполняемую задачу просто попытка выполнить поток.Чтобы прервать, пользователь должен написать конкретную стратегию прерывания ответа потока. Код реализован следующим образом:
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
Осторожно ступаем в яму: рассказ об опыте работы с пулом потоков
Не используйте ThreadLocal
УходитеThreadPoolExecutor
используется в пуле потоковThreadLocal
,Так какThreadPoolExecutor
, потоки мультиплексируются, поэтому используйте здесьThreadLocal
Он будет использоваться несколькими задачами, поэтому это может привести к загрязнению грязными данными. нужно использовать с осторожностью
Установите значение corePoolSize разумно
Возьмите кусок кода в качестве примера:
// 10个线程,因为任务多,这里用LinkedBlockingQueue
private static final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
private static final ExecutorService service = new ThreadPoolExecutor(0, 10,
60L, TimeUnit.SECONDS, queue
);
corePoolSize=0 в коде, то есть количество потоков ядра равно 1. Если количество задач больше 10, то сначала будут созданы потоки maxPoolSize на выполнение, а остальные задачи будут добавлены в очередь для исполнения.
пока вThreadPoolExecutor
В реализации, когда workQueue заполнена и maxPoolSize>corePoolSize, вновь отправленная задача создаст новый поток для выполнения задачи.
Следовательно, очередь не будет заполнена, поэтому никогда не будет создано нитей maxPoolSize, то есть наша задача по-прежнему выполняется в одном потоке, и ожидание того, что несколько потоков могут использоваться одновременно, не может быть достигнуто.
прерывание потока
Несмотря на то чтоThreadPoolExecutor
при условииshutdownNow()
метод, после вызова этого метода он попытается прервать все потоки, но прерывание не гарантирует, что поток завершится в этот момент, поэтому разработчику необходимо реализовать стратегию прерывания потока. Об этой части вDoug Leaиз"Java Concurrency In Practice«Раздел 7.1.2 уже обсуждался полностью, и я не буду повторяться здесь.
финализировать функцию
В частности, важно отметить, чтоThreadPoolExecutor
существует одинfinalize
функция, конкретная реализация выглядит следующим образом:
protected void finalize() {
shutdown();
}
вызывается в этом методеshutdown()
функция, поэтому, если вы действительно не хотите останавливать выполнение пула потоков, не позволяйте пулу потоков выходить за рамки вашего кода.
Меня зовут Цзян И Джонни.