Сегодня поговорим о "динамическом обновлении" пула потоков.

задняя часть опрос
Сегодня поговорим о "динамическом обновлении" пула потоков.

Адрес GitHub проекта динамического пула потоков (Dynamic-ThreadPool),Нажмите, чтобы посетить

Пул потоков — этоИнструменты для объединения идей для управления потоками. Использование пула потоков может

#setCorePoolSize

  1. new corePoolSize
  2. corePoolSizeдляnew corePoolSize
  3. Определите, больше ли рабочих потоков в пуле потоков, чемnew corePoolSize, если условие истинно, выполнение прервет избыточный бездействующий поток
  4. Если вышеуказанные условия не выполняются, судьяcorePoolSizeЭто меньше, чемnew corePoolSize, если меньше, необходимо создать новый основной поток

Что касается четвертого шага, есть небольшое знание, оптимизация, сделанная автором пула потоков, чтобы гарантировать, что ресурсы потоков не будут потрачены впустую.

При выполнении четвертого шага через аннотацию я знаю, что не знаю, сколько потоков нужно создать.Чтобы гарантировать, что ресурсы потока не будут потрачены впустую, здесь будет основыватьсяworkQueue#sizeи дельта для расчета количества потоков k, которые необходимо создать

Math#minОн вернет меньшее из двух значений. Xiaobian думает о трех ситуациях. Давайте предположим здесь.

  1. ПредположениеworkQueue#size == 0, то k также равно нулю, что доказывает отсутствие заблокированной задачи, которую необходимо выполнить,k-- > 0Выражение не выполняется, оно не будет выполнено#addWorker
  2. ПредположениеworkQueue#size > 0 && < delta, в это время в очереди задач есть задачи, которые нужно выполнить.k-- > 0Выражение установлено, и общий случай создастworkQueue#sizeНовый основной поток, обычно другие потоки в пуле потоковworkQueueКогда задача будет очищена, она выскочит из процесса создания
  3. ПредположениеworkQueue#size > 0 && > delta, который создает не более дельта-новых основных потоков

Вот и вдохновение для нас,Написание кода — это не просто уборка снега, но подумать о том, есть ли возможности для улучшения кода с глобальной точки зрения

Я спросил себя,Основные потоки не будут переработаны, если нет задач, если количество основных потоков установлено слишком большим, это не пустая трата ресурсов после пикового периода, вам нужно самостоятельно корректировать количество обратно?

Мы можем контролировать это, установив параметр при создании пула потоков.allowsCoreThreadTimeOutПо умолчанию False, т. е. основные потоки остаются активными, даже когда они простаивают. Если True, основные потоки используютkeepAliveTimeтайм-аут ожидания работы

Динамическая яма с основной резьбой

Следует отметить один очень важный момент,Может произойти сбой, если установлено количество основных потоков. Например, максимальное количество потоков равно 5, а количество активных потоков в текущем пуле потоков равно 5. В настоящее время, если для количества основных потоков установлено значение 10, это не должно действовать. Почему?

Предположим, что состояние выполнения пула потоков следующее: основной поток — 3, максимальный поток — 5 и активный поток в пуле потоков — 5. В это время вызовите#setCorePoolSizeДинамически установить количество основных потоков на 10

После выполнения вышеуказанных операций вызовите#executeИнициировать выполнение задачи в пул потоков, внутренняя логика обработки следующая

  1. Судя по тому, что количество ядер в текущем пуле потоков равно 10, а текущий рабочий поток равен 5, тогдаположить начало#addWorkerдобавить тему
  2. #addWorkerбудуКоличество рабочих потоков + 1, на этот раз этот рабочий процесс не считается добавленным в пул потоков.
  3. Затем создается класс-оболочка потока Worker и выполняется Start, поскольку сам Worker содержит объект потока,Start также является потоком операций для выполнения задач.
  4. получить задание#getTaskОдин шаг заключается в том, что динамическая модификация количества потоков ядра не вступает в силу, то есть, когда выполнение задачи в очереди действительно получается, она будет выполняться первой.Определите, превышает ли текущее количество рабочих потоков максимальное количество потоков.
  5. Поскольку выше указана операция +1 для рабочих потоков, количество рабочих потоков в пуле равно 6, и устанавливается условное выражение оценки.Количество рабочих потоков для выполнения -1 операций,и уничтожить этого работника

Вот пул потоков для получения задач очереди#getTaskФрагмент кода, давайте посмотрим примерно

Теперь, когда вы знаете, в чем проблема, как решить проблему сбоя динамической настройки?

На самом деле все просто,То есть при настройке потока ядра вы можете установить максимальное количество потоков одновременно.. Динамическая настройка действительна до тех пор, пока количество рабочих потоков не превышает максимальное количество потоков.

Этот раздел относится кКак установить параметры пула потоков? Meituan дал ответ, который шокировал интервьюера

MaximumPoolSize (максимальное количество потоков)

Указывает максимальное количество потоков, которые можно создать в пуле потоков. пройти через#setMaximumPoolSizeСбросить максимальное количество потоков, логика модификации следующая

Исходный код установки максимального количества потоков в пуле потоков относительно прост и не содержит сложной логики, процесс выглядит следующим образом

  1. судитьnew maximumPoolSizeВерны ли параметры, если условия не выполняются, выбрасывается исключение для завершения процесса
  2. настраиватьnew maximumPoolSizeЗаменить максимальное количество потоков в пуле потоков
  3. Если рабочий поток пула потоков больше, чемnew maximumPoolSize, Затем избыточный Worker инициировал процесс прерывания

ThreadFactory (фабрика нитей)

Функция фабрики потоков заключается в создании потока для пула потоков, и вы можете установить собственный поток при создании потока.префикс имени (важно), Установите, следует ли демонировать поток, приоритет потока и способ обработки исключения потока.

Хотя фабрики потоков могут сбрасывать параметры после запуска,Но не рекомендуется. Поскольку уже запущенный поток не будет уничтожен, если ранее запущенный поток не будет уничтожен, очень вероятно, что в пуле потоков появятся два потока с разной семантикой.

В примере кода создается пул потоков и указывается имя префикса фабрики потоков. Запустите задачу в пуле потоков, чтобы в нем была фабрика перед созданием потока.

Затем создайте новую фабрику после потока, замените внутреннюю фабрику пула потоков и запустите задачу, чтобы создать максимальное количество потоков, мы можем просмотреть журнал

Неудивительно, что потоки, созданные двумя фабриками потоков, борются друг с другом, и если не предпринимать никаких специальных действий, это продолжается вечно. Итак, подводя итог вышесказанному, не рекомендуется модифицировать фабрику потоков в бизнесе, иначе это будут ваши собственные люди~

Другие параметры

Остальные два динамически настраиваемых параметра относительно просты, поэтому я не буду приводить примеры по одному, вы можете просто посмотреть исходный код.

  • KeepAliveTime
  • RejectedExecutionHandler

Также есть очень важный параметр, который необходимо динамически обновлять, т.размер очереди блокировки. Некоторые друзья могут спросить, почему бы напрямую не заменить очередь блокировки?

На самом деле можно напрямую заменить блокирующую очередь, но если ее заменить напрямую, это вызовет много проблем.Для самого прямого примера,С задачами укладки в исходной очереди справиться непросто., нет необходимости усложнять вещи, которые изменяют способность решать проблему. Таким образом, при динамическом выполнении вместо замены рассматривается только размер очереди блокировки.

здесь сLinkedBlockingQueueНапример, очередь не предоставляет метод для изменения размера очереди в исходном коде, потому что переменная, представляющая размер очередиcapacityИзменено с последним ключевым словом

Вы можете рассмотреть, на основе этой окончательной модификации, как расширить модификацию емкости блокирующей очереди.

динамическая очередь блокировки

Пул потоковМодель «производитель-потребитель», кэшировать задачи через очередь блокировки,Рабочий поток получает задачи из очереди блокировки. Интерфейс рабочей очереди — BlockingQueue.Когда очередь пуста, поток, получивший элемент,Подождите, пока очередь не станет пустой, когда очередь будет заполнена, поток, хранящий элемент,Ожидание освобождения очереди

图片来源美团技术博客

Очередь блокировки динамически устанавливает размер очереди, и существует множество способов работы. Вы можете добавить некоторые расширения в соответствии с исходной логикой или переписать определенные методы.Реализация не исправлена. Ниже приведены несколько схем, которые могут реализовать функцию динамической блокировки очереди.

  1. Скопируйте реализацию исходного кода очереди блокировки, добавьте метод #set, чтобы сделать переменную емкости
  2. Наследовать очередь блокировки и переписать основной метод на исходной основе.
  3. Наследовать очередь блокировки и динамически изменять емкость путем отражения

Если вам не нужно переписывать исходную очередь блокировки для получения дополнительных функций,я предпочитаю первое, код будет более лаконичным и стабильным. Ниже сLBQНапример

Копировать очередь блокировки

Этот метод прост и груб, прямоLinkedBlockingQueueСкопируйте код и измените его на новое имяResizableCapacityLinkedBlockIngQueue, затем поставьтеcapacityмодифицированныйfinalКлючевое слово удаляется, а#setCapacityметод

Переопределить основные методы

Большинство блоггеров в Интернете используют вышеописанный способ копирования очередей блокировки, позже я обсуждал динамику очередей блокировки с двумя большими парнями, а потом нашел версию иностранного программиста с GitHub, управляющую очередями блокировки с помощью семафоров. из,«Реализация семафора GitHub LinkedBQ»

Очередь содержит блокировкуРазмер очереди и самореализующиеся семафоры. Каждый раз, когда изменяется размер очереди блокировки, семафор также увеличивается или уменьшается.

Отражение изменяет емкость

Функцию динамического изменения очереди блокировки также можно реализовать с помощью отражения. Перед модификацией я рассматривал этот способБудут ли проблемы с безопасностью потоков?, используя группу потоков Jmeter и изменяя емкость поочередно, было проведено несколько раундов испытаний, и заключение теста былоНет проблем с безопасностью потоков

Чтобы изменить размер очереди блокировки с помощью отражения,Я не рекомендую. Во-первых, этот метод жесткого кодирования не является элегантным, а во-вторых, его совместимость с последующими версиями JDK не гарантируется на 100%.

Принимая во внимание все, хотя способность модификации отражения может достичь желаемого эффекта, делать это не рекомендуется.

В заключение

В статье редактор обобщает распространенную ситуацию с использованием пулов потоков в отрасли.

  1. пул потоковПараметр не может выполнять быструю динамическую настройку
  2. Без разумного контроля, что приводит кпотерять инициативутак же какЭффективно предотвращать возможные проблемы

Основываясь на первой проблеме настройки динамических параметров, я написал первую статью из серии динамических пулов потоков. Да, позже будет больше статей о динамическом пуле потоков, включая, помимо прочего, следующие идеи.

  1. Как реализовать мониторинг пула потоков в реальном времени,Исторические данные индикатораКак агрегировать показы администратора
  2. Стыковка тревожных сообщений с разных платформ для достиженияНастраиваемые элегантные эффекты множественного приема
  3. Можно ли использовать динамический пул потоков?Конфигурационный центр сравнительного анализа, подключитесь к серверу для управления модификацией параметров триггера

Эти функции на самом деле реализуются в пуле динамического потока группы США.Naihe не является открытым исходным кодом. И у моего собственного проекта есть болевые точки в этом отношении, так что толькоповторить колесо

Первоначально на написание версии, основанной на промежуточном программном обеспечении и способнойЗавершите динамический пул потоков стыковочной платформы.. Может быть, это потому, что это слишком просто, я думаю, что чего-то не хватает. Позже, ночью, когда я не мог уснуть, мой мозг был широко открыт. Разве это не сработало бы, если бы я не полагался на промежуточное ПО?

Тогда естьПроект динамического пула потоков DTP (Dynamic-ThreadPool), и назовите элемент groupId, начинающийся с io. В настоящее время DTP — это всего лишь проект, в котором редактор может использовать свои способности к разработке и осведомленность о продукте, а ежедневное время разработки кода сосредоточено на выходных и субботах.

Проект DTP разделен на две основные части:Сервер и SpringBoot Starter, серверная сторона служит реестром всех клиентских пулов потоков и хранилищем данных исторического индикатора для вызова и отображения консоли, а стартер будет использоваться клиентской стороной в качестве Jar для взаимодействия с серверной стороной.

В настоящее время в DTP реализованы серверная и клиентская части.Взаимодействие с динамическим обновлением параметров, ссылка на реализацию исходного кода взята из предыдущей версии Nacos 2.0.Механизм длительного опроса и прослушивания событий

Теперь, когда вы решили не полагаться на ПО промежуточного слоя, проблема очевидна.Одноточечные проблемы в онлайн-среде. Поскольку после развертывания кластера данные не могут распространяться и транслироваться между узлами.Обратитесь к Eureka за моделью распределенной точки доступа.

В итоге после просмотра проекта я почувствовал себя довольно хорошо, а мои трудолюбивые друзья заказали ⚡️ Звезда, желаю тебе добра.

Код все еще обновляется, адрес исходного кода:GitHub.com/longtai94/…