предисловие
Я думал, что пул потоков довольно прост (обычно используется, и принцип тоже был проанализирован), в этот раз я хочу написать пул потоков сам, чтобы понять его более глубоко; но я обнаружил, что не думал об этом, когда Я приземлился на детали в процессе написания, так просто. После сравнения с исходным кодом я действительно должен восхищатьсяDoug Lea
.
Я думаю, что большинство людей идут прямо кjava.util.concurrent.ThreadPoolExecutor
При просмотре исходного кодаAQS
содержания, поэтому не так просто разобраться в конкретных деталях.
Вместо того, чтобы анализировать исходный код один за другим, лучше реализовать упрощенную версию самостоятельно.Конечно, упрощенная версия не означает отсутствие функций, и это необходимо для обеспечения согласованности основной логики.
Отсюда и цель этой статьи:
Напишите полностью оборудованный пул потоков самостоятельно, и в то же время вы поймете принцип работы пула потоков и как разумно использовать пул потоков в своей работе.
Прежде чем начать заново, рекомендуется, чтобы друзья, которые не очень хорошо знакомы с пулами потоков, ознакомились с этими статьями:
Здесь я перехватил часть содержания, может быть, я могу зарыть предзнаменование (яма).
Подробности смотрите по этим двум ссылкам.
- Как изящно использовать и понимать пулы потоков
- Некоторые детали, которые вы не можете пропустить в пуле потоков
Из-за ограничений по объему данную статью можно разделить на две части.
Создать пул потоков
Теперь войдите в тему, создайте новуюCustomThreadPool
класс, это работает так:
Проще говоря, это кидать задачи в пул потоков, а потерянные задачи будут буферизоваться в очереди, пул потоков хранит фактически по однойThread
, они будут продолжать получать выполнение задачи из только что буферизованной очереди.
Процесс по-прежнему довольно прост.
Давайте посмотрим на эффект нашего самостоятельно созданного пула потоков:
Инициализируется пул потоков с 3 ядрами, 5 максимальными потоками и 4 размером очереди.
Сначала в нее было сброшено 10 задач.Так как размер блокирующей очереди равен 4, а максимальное количество потоков равно 5, то в итоге будет создано 5 потоков (верхний предел) из-за отсутствия буферизации в очереди.
Через какое-то время задачи не отправляются (sleep
) автоматически масштабируется до трех потоков (гарантированно не меньше, чем количество основных потоков).
Конструктор
Посмотрим, как это реализовано.
Ниже приведен конструктор этого пула потоков:
Будут следующие основные параметры:
-
miniSize
Минимальное количество потоков, эквивалентноеThreadPool
Количество основных потоков в . -
maxSize
Максимальное количество потоков. -
keepAliveTime
Время активности потока. -
workQueue
блокирующая очередь. -
notify
интерфейс уведомлений.
примерно то же самоеThreadPool
Параметры в те же, и эффект аналогичен.
Следует отметить, что один из инициализированныхworkers
Переменные-члены:
/**
* 存放线程池
*/
private volatile Set<Worker> workers;
public CustomThreadPool(int miniSize, int maxSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, Notify notify) {
workers = new ConcurrentHashSet<>();
}
workers
это поток, работающий в конечном пуле потоков хранилища, вj.u.c
исходный код представляет собойHashSet
Поэтому все операции над ним нужно заблокировать.
Для простоты я определил потокобезопасныйSet
называетсяConcurrentHashSet
.
На самом деле принцип очень прост иHashSet
похоже наHashMap
для хранения данных используйте егоkey
неповторяемые функции для достиженияset
, просто здесьHashMap
безопасен ли параллелизмConcurrentHashMap
быть реализованным.
Это гарантирует, что пишет и делеции к этому безрезультатно.
Однако из-заConcurrentHashMap
изsize()
Функция не точная, поэтому использую здесь отдельнуюAtomicInteger
для подсчета размера контейнера.
Создать основной поток
При добавлении задачи в пул потоков на самом деле нужно сделать довольно много вещей, самое главное — создать поток и сохранить его в пуле потоков.
Конечно, мы не можем создавать неограниченное количество потоков, иначе было бы бессмысленно использовать пулы потоков. тогдаminiSize maxSize
Эти два параметра имеют свое значение.
Но на каком этапе вступают в игру эти два параметра? Это то, что должно быть ясно в первую очередь.
Из этой блок-схемы видно, что первым шагом является определение того, превышает ли оно количество основных потоков, и если нет, то его создание.
В сочетании с кодом можно обнаружить, что при выполнении задачи он будет оценивать, больше ли оно, чем количество основных потоков, чтобы создавать потоки.
worker.startTask()
Часть выполнения задачи будет проанализирована позже.
здесьminiSize
Поскольку он будет использоваться в многопоточных сценариях, он также используетсяvolatile
Ключевые слова для обеспечения видимости.
буфер очереди
В сочетании с приведенной выше блок-схемой вторым шагом является определение того, может ли очередь хранить задачи (независимо от того, заполнена ли она).
Приоритет будет сохранен в очереди.
до самого верха
После сбоя записи он определит, превышает ли размер текущего пула потоков максимальное количество потоков, и если нет, продолжит создание потоков для выполнения.
В противном случае выполнение попытается заблокировать очередь записи (j.u.c
Политика отказа будет применяться здесь)
Вышеупомянутые шаги такие же, как на блок-схеме, так что вы видите какие-либо ямы?
всегда будь осторожен
Из двух шагов приведенной выше блок-схемы видно, что прямоесоздать новую тему.
Этот процесс относится к промежуточномуЗапись непосредственно в очередь блокировкиНакладные расходы очень велики, в основном по следующим двум причинам:
- Создание потока будет заблокировано.Хотя функция записи ConcurrentHashMap наконец-то используется, возможность блокировки все же есть.
- Будет создан новый поток, а создание потока также требует обращения к API операционной системы, что дорого.
Поэтому в идеале мы должны избегать этих двух шагов и пытаться позволить задачам, брошенным в пул потоков, попасть в очередь блокировки.
выполнять задачи
Задача добавлена, как выполняется?
упоминается при создании задачиworker.startTask()
функция:
/**
* 添加任务,需要加锁
* @param runnable 任务
*/
private void addWorker(Runnable runnable) {
Worker worker = new Worker(runnable, true);
worker.startTask();
workers.add(worker);
}
То есть он будет создан при создании потока для выполнения задачи.Worker
объект, используя егоstartTask()
метод выполнения задачи.
Итак, давайте посмотримWorker
Как выглядит объект:
По сути, это также и сам поток, и задачи, которые необходимо выполнить, хранятся в переменных-членах.task
место.
Главное - выполнить заданиеworker.startTask()
этот шаг.
public void startTask() {
thread.start();
}
На самом деле это работаетworker
сама ветка, см. нижеrun
метод.
- Первым шагом является выполнение задачи, переданной при создании потока (
task.run
), а затем продолжит получать задачи из очереди на выполнение до тех пор, пока не перестанут получаться новые задачи. - После выполнения задачи встроенный счетчик будет равен -1, что удобно для оповещения о выполнении всех задач.
- Рабочий поток завершается после того, как он не может получить задачу и должен освободить себя из пула потоков (
workers.remove(this)
).
Получить задачи из очереди
фактическиgetTask
Это также очень важный метод, который инкапсулирует получение задач из очереди и перезапускает потоки, которые не нужно поддерживать в рабочем состоянии.
Очевидно, что основная роль заключается в получении задач из очереди, но есть два места, на которые стоит обратить внимание:
- Когда количество потоков превышает количество основных потоков, задача должна быть получена из очереди через время поддержания активности при получении задачи; если задача не может быть получена, очередь должна быть пустой, поэтому возврат
null
позже в вышеrun()
Этот поток будет завершен в середине, таким образом, будет достигнута цель повторного использования потока, что является эффектом, который мы продемонстрировали ранее. - Блокировка здесь обязательна, причина блокировки в том, что здесь обязательно будет параллелизм, а отсутствие блокировки приведет к
workers.size() > miniSize
Условие выполняется несколько раз, в результате чего поток полностью перезапускается.
закрыть пул потоков
Наконец, давайте поговорим о закрытии потока;
Возьмем тестовый код только что в качестве примера.Если мы не закроем поток после отправки задачи, мы обнаружим, что программа не завершится даже после выполнения задачи.
Только что из исходного кода на самом деле очень легко увидеть, что причина того, что вы не выходите, заключается в том, чтоWorker
Поток всегда будет блокироватьсяtask = workQueue.take();
При этом, даже если нить уменьшится, она не будет меньше количества основных нитей.
Это также может быть доказано стеком:
Здесь заблокировано ровно три темы.
И закрытие потока обычно имеет следующие два вида:
- Немедленное отключение: после выполнения метода отключения, независимо от текущего состояния работы пула потоков, он будет немедленно остановлен по всей плате, что приведет к потере задач.
- Не принимает новые задачи и выходит из пула потоков, дождавшись завершения выполнения существующих задач.
закрыть сейчас
Давайте посмотрим на первый立即关闭
:
/**
* 立即关闭线程池,会造成任务丢失
*/
public void shutDownNow() {
isShutDown.set(true);
tryClose(false);
}
/**
* 关闭线程池
*
* @param isTry true 尝试关闭 --> 会等待所有任务执行完毕
* false 立即关闭线程池--> 任务有丢失的可能
*/
private void tryClose(boolean isTry) {
if (!isTry) {
closeAllTask();
} else {
if (isShutDown.get() && totalTask.get() == 0) {
closeAllTask();
}
}
}
/**
* 关闭所有任务
*/
private void closeAllTask() {
for (Worker worker : workers) {
//LOGGER.info("开始关闭");
worker.close();
}
}
public void close() {
thread.interrupt();
}
Легко видеть, что в конце концов он проходит через все потоки в пуле потоков.worker
Потоки выполняют свои функции прерывания один за другим.
Давайте проверим это:
Можно обнаружить, что три задачи, брошенные позже, на самом деле не выполняются.
закрыть, когда закончите
иизящное завершение работыэто не то же самое:
/**
* 任务执行完毕后关闭线程池
*/
public void shutdown() {
isShutDown.set(true);
tryClose(true);
}
Здесь он вынесет еще одно суждение и прервет поток только после выполнения всех задач.
В то же время, когда поток необходимо перезапустить, он попытается закрыть поток:
Посмотрим реальный эффект:
перерабатывающая нить
Вышеупомянутая более или менее переработка потоков, по сути, сводка состоит из следующих двух пунктов:
- После выполнения
shutdown/shutdownNow
метод установит состояние пула потоков в закрытое состояние, так что до тех пор, покаworker
Потоки Thring вернутся к пустым, когда вы получите задачу из очереди, что приведет кworker
Нить перерабатывается. - Как только размер пула потоков превысит количество основных потоков, время поддержания активности будет использоваться для получения задач из очереди, поэтому, если оно не может быть получено, верните
null
Запускается рециркуляция.
Но если наша очередь достаточно велика, чтобы количество потоков не превышало количество основных потоков, это не вызовет перезапуска.
Например, здесь я настраиваю размер очереди на 10, чтобы задачи накапливались в очереди, а не создавалось пятьworker
нить.
Так всегдаThread-1~3
Эти три потока повторно планируют задачи.
Суммировать
На этот раз реализовано большинство основных функций пула потоков, и я считаю, что если вы прочитаете его и снова коснетесь, у вас будет другое понимание пула потоков.
Подводя итог текущему содержанию:
- Пул потоков и размер очереди должны быть спроектированы разумно, а задачи должны получаться и выполняться из очереди, насколько это возможно.
- Используйте с осторожностью
shutdownNow()
Метод закрывает пул потоков, что приведет к потере задачи (если это не разрешено бизнесом). - Если задач много, время выполнения потока мало и может быть увеличено
keepalive
значение, так что поток не перерабатывается в максимально возможной степени, чтобы поток можно было использовать повторно.
В то же время в следующий раз я поделюсь некоторыми новыми функциями пула потоков, такими как:
- Выполнить поток с возвращаемым значением.
- Как насчет обработки исключений?
- Как я буду уведомлен, когда все задачи будут выполнены?
Весь исходный код этой статьи:
Ваши лайки и репост - лучшая поддержка для меня