Jianshu Zhan Xiaolang перепечатано, пожалуйста, укажите первоисточник, спасибо!
Оглядываясь назад на эту статью, которую я написал ранее, я вспоминаю, что источник интереса к чтению исходного кода, похоже, исходит от пула потоков Java.Когда холмы будут покорены один за другим, вы обнаружите, что попали в большую яму, потому что есть слишком много областей, которые вы не понимаете.
предисловие
Потоки являются дефицитными ресурсами. Если они создаются без ограничений, это не только потребляет системные ресурсы, но и снижает стабильность системы. Разумное использование пулов потоков для равномерного распределения, настройки и мониторинга потоков имеет следующие преимущества:
1. Уменьшить потребление ресурсов 2. Увеличить скорость отклика 3. Улучшить управляемость потоками.
Платформа Executor, представленная в Java 1.5, разделяет отправку и выполнение задач.Вам нужно только определить задачи и отправить их в пул потоков, не заботясь о том, как выполняется задача, какой поток выполняется и когда он выполняется.
demo
1,Executors.newFixedThreadPool(10)
Инициализировать исполнитель пула потоков, содержащий 10 потоков;
2. Пройтиexecutor.execute
Метод отправляет 20 задач, и каждая задача печатает имя текущего потока;
3. Жизненный цикл потока, ответственного за выполнение задачи, управляется инфраструктурой Executor;
ThreadPoolExecutor
Исполнители — это фабричный класс для пулов потоков Java, с помощью которого можно быстро инициализировать пул потоков, отвечающий потребностям бизнеса, напримерExecutors.newFixedThreadPool
метод может создать пул потоков с фиксированным числом потоков.
Суть его заключается в инициализации объекта ThreadPoolExecutor с различными параметрами, конкретные параметры описываются следующим образом:
corePoolSize
Количество основных потоков в пуле потоков. При отправке задачи пул потоков создает новый поток для выполнения задачи до тех пор, пока текущее количество потоков не станет равным corePoolSize; если текущее количество потоков равно corePoolSize, задачи, которые продолжаются для отправки сохраняются в очереди блокировки, ожидая выполнения; если выполняется метод prestartAllCoreThreads() пула потоков, пул потоков заранее создаст и запустит все основные потоки.
maximumPoolSize
Максимальное количество потоков, разрешенных в пуле потоков. Если текущая очередь блокировки заполнена, а задача продолжает отправляться, создается новый поток для выполнения задачи при условии, что текущее количество потоков меньше maxPoolSize;
keepAliveTime
Время выживания потока, когда он простаивает, то есть время, в течение которого поток продолжает существовать, когда нет выполнения задачи; по умолчанию этот параметр полезен только тогда, когда количество потоков больше, чем corePoolSize;
unit
Единица keepAliveTime;
workQueue
Очередь блокировки, используемая для сохранения задач, ожидающих выполнения, и задачи должны реализовывать интерфейс Runable.В JDK предусмотрены следующие очереди блокировки:
1. ArrayBlockingQueue: Ограниченная очередь блокировки на основе структуры массива, сортировка задач по FIFO;
2. LinkedBlockingQuene: очередь блокировки на основе структуры связанного списка, сортировка задач по FIFO, пропускная способность обычно выше, чем у ArrayBlockingQuene;
3. SynchronousQuene: блокирующая очередь, которая не хранит элементы.Каждая операция вставки должна ждать, пока другой поток не вызовет операцию удаления, иначе операция вставки всегда находится в состоянии блокировки, а пропускная способность обычно выше, чем у LinkedBlockingQuene;
4. priorityBlockingQuene: неограниченная очередь блокировки с приоритетом;
threadFactory
Фабрика для создания потоков, с помощью фабрики настраиваемых потоков вы можете установить идентифицирующее имя потока для каждого вновь созданного потока.
handler
Стратегия насыщения пула потоков. Когда блокирующая очередь заполнена и нет простаивающих рабочих потоков, если вы продолжаете отправлять задачи, вы должны принять стратегию для обработки задачи. Пул потоков предоставляет 4 стратегии:
1. AbortPolicy: генерировать исключение напрямую, политика по умолчанию;
2. CallerRunsPolicy: использовать поток, в котором находится вызывающий объект, для выполнения задачи;
3. DiscardOldestPolicy: отменить верхнюю задачу в очереди блокировки и выполнить текущую задачу;
4. DiscardPolicy: отклонить задачу напрямую Конечно, вы также можете реализовать интерфейс RejectedExecutionHandler в соответствии со сценарием приложения и настроить политику насыщения, например запись задач, которые не могут быть обработаны журналами или постоянным хранилищем.
Exectors
Фабричный класс Exectors предоставляет интерфейс инициализации пула потоков, в основном следующим образом:
newFixedThreadPool
Инициализируйте пул потоков с заданным количеством потоков, где corePoolSize == maxPoolSize, и используйте LinkedBlockingQuene в качестве очереди блокировки, но когда в пуле потоков нет исполняемых задач, поток не будет освобожден.
newCachedThreadPool
1. Инициализировать пул потоков, который может кэшировать потоки.Кэш по умолчанию — 60 с.Количество потоков в пуле потоков может достигать Integer.MAX_VALUE, то есть 2 147483647. SynchronousQueue используется внутри в качестве очереди блокировки;
2. В отличие от пула потоков, созданного newFixedThreadPool, newCachedThreadPool автоматически освобождает ресурсы потока, когда время простоя потока превышает keepAliveTime, когда не выполняется ни одна задача.При отправке новой задачи, если нет незанятого потока, будет создан новый поток. выполнить задачу и привести к определенным накладным расходам системы;
Поэтому при использовании этого пула потоков необходимо уделять внимание контролю количества одновременных задач, иначе создание большого количества потоков может привести к серьезным проблемам с производительностью.
newSingleThreadExecutor
В инициализированном пуле потоков есть только один поток. Если поток завершается аварийно, новый поток будет воссоздан для продолжения выполнения задачи. Единственный поток может гарантировать последовательное выполнение отправленных задач, а LinkedBlockingQueue используется внутри в качестве блокирующая очередь.
newScheduledThreadPool
Инициализированный пул потоков может периодически выполнять отправленные задачи в течение заданного времени.В реальных бизнес-сценариях пул потоков можно использовать для периодической синхронизации данных.
Принцип реализации
За исключением специальной внутренней реализации newScheduledThreadPool, несколько других пулов потоков реализованы на основе класса ThreadPoolExecutor.
внутреннее состояние пула потоков
Функция переменной AtomicInteger ctl очень мощная: младшие 29 бит используются для указания количества потоков в пуле потоков, а старшие 3 бита используются для указания рабочего состояния пула потоков:
1. БЕГ:-1<<COUNT_BITS
, то есть старшие 3 бита равны 111, пул потоков в этом состоянии будет получать новые задачи и обрабатывать задачи в очереди блокировки;
2. ВЫКЛЮЧЕНИЕ:0<<COUNT_BITS
, то есть старшие 3 бита равны 000, пул потоков в этом состоянии не будет получать новые задачи, а будет обрабатывать задачи в очереди блокировки;
3. СТОП:1<<COUNT_BITS
, то есть старшие 3 бита равны 001, поток в этом состоянии не будет получать новых задач, а также не будет обрабатывать задачи в очереди блокировки, и будет прерывать запущенные задачи;
4. УБОРКА:2<<COUNT_BITS
, то есть старшие 3 бита равны 010;
5. ПРЕКРАЩЕНО:3<<COUNT_BITS
, то есть старшие 3 бита равны 011;
отправка задачи
Платформа пула потоков предоставляет два способа отправки задач и выбор разных способов в соответствии с различными бизнес-требованиями.
Executor.execute()
Задачи, отправленные через метод Executor.execute(), должны реализовывать интерфейс Runnable, задачи, отправленные таким образом, не могут получить возвращаемое значение, поэтому невозможно определить, успешно ли выполнена задача.
ExecutorService.submit()
Задача, отправленная методом ExecutorService.submit(), может получить возвращаемое значение после выполнения задачи.
выполнение задачи
Когда задача отправляется в пул потоков, как пул потоков обрабатывает задачу?
выполнить реализацию
Конкретный процесс выполнения выглядит следующим образом:
1. Метод workerCountOf получает текущее количество потоков в пуле потоков в соответствии с младшими 29 битами ctl.Если количество потоков меньше, чем corePoolSize, выполнить метод addWorker для создания новой задачи выполнения потока, в противном случае выполнить шаг (2);
2. Если пул потоков находится в состоянии RUNNING и отправленная задача успешно помещена в очередь блокировки, то выполняется шаг (3), иначе выполняется шаг (4);
3. Снова проверьте состояние пула потоков.Если в пуле потоков нет RUNNING и задача успешно удалена из очереди блокировки, выполните метод reject для обработки задачи;
4. Выполните метод addWorker, чтобы создать новый поток для выполнения задачи.Если addWorker не выполняется, выполните метод reject для обработки задачи;
реализация addWorker
Это видно из реализации метода execute: addWorker в основном отвечает за создание новых потоков и выполнение задач, код реализован следующим образом:
Это только первая половина реализации метода addWoker:
1. Определите состояние пула потоков.Если значение состояния пула потоков больше или равно SHUTDOWN, отправленная задача не будет обработана и возвращена напрямую;
2. Определить, является ли создаваемый поток основным потоком с помощью параметра core.Если для ядра установлено значение true, а текущее количество потоков меньше, чем corePoolSize, он выйдет из цикла и начнет создавать новый поток. реализация следующая:
Рабочий поток пула потоков реализуется классом Worker.Под гарантией блокировки ReentrantLock экземпляр Worker вставляется в HashSet и запускается поток в Worker.Класс Worker устроен следующим образом:
1. Унаследовав класс AQS, он может легко реализовать операцию прерывания рабочего потока;
2. Реализует интерфейс Runnable, который может выполняться как задача в рабочем потоке;
3. Текущая представленная задача firstTask передается в качестве параметра конструктору Worker;
Из реализации метода построения класса Worker видно, что когда фабрика потоков создает поток потока, он передает в качестве параметра this самого экземпляра Worker.Когда метод start выполняется для запуска потока поток, он, по сути, выполняет метод runWorker Worker.
реализация runWorker
Метод runWorker является ядром пула потоков:
1. После запуска потока блокировка снимается с помощью метода разблокировки, а состояние AQS устанавливается в 0, что указывает на то, что операция прервана;
2. Получить первую задачу firstTask, выполнить метод run задачи, но перед выполнением задачи будет выполнена операция блокировки, а после выполнения задачи блокировка будет снята;
3. До и после выполнения задачи вы можете настроить методы beforeExecute и afterExecute согласно бизнес-сценарию;
4. После завершения выполнения firstTask ожидающая задача получается из очереди блокировки через метод getTask, если задачи в очереди нет, то метод getTask будет заблокирован и приостановлен, и не будет занимать ресурсы процессора;
реализация getTask
Вся операция getTask выполняется в режиме вращения:
1. workQueue.take: если блокирующая очередь пуста, текущий поток будет приостановлен и ожидает, когда задача будет добавлена в очередь, поток будет разбужен, а метод take вернет задачу и выполнит ее;
2. workQueue.poll: если в течение времени keepAliveTime в очереди блокировки по-прежнему нет задачи, вернуть null;
Поэтому потоки, реализованные в пуле потоков, всегда могут выполнять задачи, отправленные пользователями.
Реализация Future и Callable
Задача, отправленная методом ExecutorService.submit(), может получить возвращаемое значение после выполнения задачи.
В реальных бизнес-сценариях Future и Callable в основном появляются парами, Callable отвечает за получение результатов, а Future отвечает за получение результатов.
1. Интерфейс Callable похож на Runnable, за исключением того, что Runnable не имеет возвращаемого значения.
2. Помимо возврата обычных результатов для задач Callable, при возникновении исключения также будет возвращено исключение, то есть Future может получать различные результаты асинхронного выполнения задач;
3. Метод Future.get блокирует основной поток до тех пор, пока не будет выполнена задача Callable;
отправить реализацию
Вызываемая задача, отправленная методом submit, будет инкапсулирована в объект FutureTask.
FutureTask
1. FutureTask имеет разные состояния на разных этапах и инициализируется в NEW;
2. Класс FutureTask реализует интерфейс Runnable, так что FutureTask может быть отправлен в пул потоков для выполнения с помощью метода Executor.execute, и, наконец, будет выполнен метод run FutureTask;
Реализация FutureTask.get
Внутренне основной поток блокируется методом awaitDone Конкретная реализация выглядит следующим образом:
1. Если основной поток прерывается, генерируется исключение прерывания;
2. Оцените текущее состояние FutureTask, если оно больше, чем COMPLETING, это означает, что задача была выполнена, а затем вернуться напрямую;
3. Если текущее состояние равно COMPLETING, это означает, что задача была выполнена, в это время основному потоку нужно только отказаться от ресурсов процессора через метод yield и дождаться перехода состояния в NORMAL;
4. Инкапсулировать текущий поток через класс WaitNode и добавить его в список ожидающих через UNSAFE;
5. Окончательно подвесить нить через парк или паркнанос из LockSupport;
Реализация FutureTask.run
Метод FutureTask.run выполняется в пуле потоков, а не в основном потоке 1. Выполнением метода вызова задачи Callable 2. При успешном выполнении вызова результат сохраняется заданным методом 3. При наличии является исключением при выполнении вызова, затем сохраните исключение через setException;
set
setException
В методах set и setException статус FutureTask изменяется через UnSAFE, а метод finishCompletion выполняется для уведомления основного потока о выполнении задачи;
finishCompletion
1. При выполнении метода get класса FutureTask основной поток будет инкапсулирован в узел WaitNode и сохранен в списке ожидающих;
2. После выполнения задачи FutureTask установить значение ожидающих через UNSAFE, и разбудить основной поток через метод unpark класса LockSupport;
я ноль изобретательности