Эта статья участвовала в "Проект «Звезда раскопок»«Выиграйте креативные подарочные наборы и бросьте вызов творческим поощрениям.
Здравствуйте, я криворукий.
Когда я несколько дней назад болтал с большим парнем, он сказал, что недавно мониторил пул потоков и только что закончил разработку функции динамической настройки.
Напоминает мне статью, которую я писал об этом раньше, поэтому я нашел ее и посмотрел:"Как установить параметры пула потоков? Мейтуан дал ответ, который шокировал интервьюера. 》
Потом мне указали на проблему, и я хорошенько над ней подумал, и мне показалось, что действительно осталась дыра.
Чтобы лучше описать эту яму, позвольте мне сначала рассмотреть некоторые ключевые моменты динамической настройки пулов потоков.
Во-первых, зачем вам динамически настраивать параметры пула потоков?
Потому что с развитием бизнеса может возникнуть ситуация, когда пул потоков начинает хватать, но постепенно становится полным.
Это приведет к отклонению последующих отправленных задач.
Не существует универсальной схемы конфигурации, и соответствующие параметры должны плавать вместе с системой.
Таким образом, мы можем отслеживать пул потоков в нескольких измерениях, например, одним из измерений является мониторинг использования очереди.
Когда использование очереди превышает 80%, отправляется раннее предупреждающее сообщение, чтобы напомнить соответствующему ответственному лицу о бдительности.Вы можете перейти на соответствующую фоновую страницу управления, чтобы настроить параметры пула потоков, чтобы предотвратить отклонение задачи.
Когда кто-то спросит вас, как настроить различные параметры пула потоков в будущем, вы сначала запомните ответ на статью из восьми частей, которая разделена на интенсивные операции ввода-вывода и интенсивные ресурсы ЦП.
Плюс один: Но, помимо этих решений, я использую еще один набор решений, когда реально решаю проблему».
Затем повторите слова выше.
Итак, какие параметры может изменять пул потоков?
Обычно количество основных потоков и максимальное количество потоков можно регулировать.
Пул потоков также напрямую предоставляет соответствующий метод set:
Но на самом деле есть еще один ключевой параметр, который нужно регулировать, это длина очереди.
Да, кстати, чтобы объяснить, очередь по умолчанию, используемая в этой статье,LinkedBlockingQueue
.
Его емкость является окончательно измененной, что означает, что она не может быть изменена после того, как она указана:
Поэтому настройка длины очереди требует небольшого мозгового штурма.
Что касается того, как обойти финальное ограничение, я расскажу об этом позже, позвольте сначала дать вам код.
Обычно я не публикую большие куски кода, но почему я разместил его в этот раз?
Потому что я обнаружил, что моя предыдущая статья не была опубликована, а код, который я написал раньше, не знал, куда деваться.
Итак, я снова горько постучал...
import cn.hutool.core.thread.NamedThreadFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadChangeDemo {
public static void main(String[] args) {
dynamicModifyExecutor();
}
private static ThreadPoolExecutor buildThreadPoolExecutor() {
return new ThreadPoolExecutor(2,
5,
60,
TimeUnit.SECONDS,
new ResizeableCapacityLinkedBlockingQueue<>(10),
new NamedThreadFactory("why技术", false));
}
private static void dynamicModifyExecutor() {
ThreadPoolExecutor executor = buildThreadPoolExecutor();
for (int i = 0; i < 15; i++) {
executor.execute(() -> {
threadPoolStatus(executor,"创建任务");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
threadPoolStatus(executor,"改变之前");
executor.setCorePoolSize(10);
executor.setMaximumPoolSize(10);
ResizeableCapacityLinkedBlockingQueue<Runnable> queue = (ResizeableCapacityLinkedBlockingQueue)executor.getQueue();
queue.setCapacity(100);
threadPoolStatus(executor,"改变之后");
}
/**
* 打印线程池状态
*
* @param executor
* @param name
*/
private static void threadPoolStatus(ThreadPoolExecutor executor, String name) {
BlockingQueue<Runnable> queue = executor.getQueue();
System.out.println(Thread.currentThread().getName() + "-" + name + "-:" +
"核心线程数:" + executor.getCorePoolSize() +
" 活动线程数:" + executor.getActiveCount() +
" 最大线程数:" + executor.getMaximumPoolSize() +
" 线程池活跃度:" +
divide(executor.getActiveCount(), executor.getMaximumPoolSize()) +
" 任务完成数:" + executor.getCompletedTaskCount() +
" 队列大小:" + (queue.size() + queue.remainingCapacity()) +
" 当前排队线程数:" + queue.size() +
" 队列剩余大小:" + queue.remainingCapacity() +
" 队列使用度:" + divide(queue.size(), queue.size() + queue.remainingCapacity()));
}
private static String divide(int num1, int num2) {
return String.format("%1.2f%%", Double.parseDouble(num1 + "") / Double.parseDouble(num2 + "") * 100);
}
}
После того, как вы вставите этот код, вы обнаружите, что у вас нетNamedThreadFactory
этот класс.
Не беда, я использую тот, что в hutool toolkit.Если у вас его нет, то можете его настроить или не передавать в конструкторе.Это не суть, это не большая проблема.
Большая проблема в том,ResizeableCapacityLinkedBlockingQueue
эта вещь.
Как это произошло?
Упоминалось в предыдущей статье:
Просто вставьте копию LinkedBlockingQueue, измените имя, удалите последний модификатор параметра Capacity и укажите соответствующий метод получения/установки.
По ощущениям очень просто, он может менять динамические параметры мощности.
Однако, когда я писал это в то время, мне казалось, что там была яма.
Ведь если это так просто, то почему официалы оформляют его как окончательный?
Где яма?
оLinkedBlockingQueue
Принцип работы здесь обсуждаться не будет, нужно проговаривать содержание восьминогого текста.
Суть в том, что в упомянутом выше сценарии, если я напрямую удалю модификатор final и предоставлю соответствующий ему метод get/set, где будет pit.
Прежде всего, если нет специальных инструкций, исходный код в этой статье — версия JDK 8.
Давайте посмотрим на метод put:
В основном смотрите на эту обрамленную часть.
Емкость в известном нам состоянии while представляет собой текущую емкость.
Так что же такое count.get?
количество элементов в текущей очереди.
count.get == capacity означает, что очередь заполнена, затем выполнитеnotFull.await()
Приостановить текущую операцию размещения.
Давайте проверим это на простом примере:
Подайте заявку на очередь длиной 5, а затем вызовите метод put в цикле.Когда очередь заполнена, программа блокируется.
Сбрасывая текущий поток, мы можем узнать, что основной поток действительно заблокирован в том месте, которое мы проанализировали ранее:
Итак, вы думаете об этом. Если я изменю вместимость очереди на другое значение, будет ли об этом известно этому месту?
Он не может этого воспринять, он ждет, пока проснутся другие.
Теперь давайте заменим очередь моей измененной очередью для проверки.
Идея программы проверки, приведенной ниже, заключается в выполнении операции размещения очереди в подпотоке до тех пор, пока емкость не будет заполнена и заблокирована.
Затем основной поток изменяет емкость на 100.
В приведенной выше программе эффект, которого я хочу добиться, заключается в том, что при расширении емкости дочерний поток не должен продолжать блокироваться.
Но после предыдущего анализа мы знаем, что дочерний поток здесь не разбудится.
Итак, вывод такой:
Дочерний поток все еще заблокирован, поэтому он не оправдывает ожиданий.
Так что же нам делать в это время?
Конечно, взять на себя инициативу проснуться.
То есть изменить логику setCapacity:
public void setCapacity(int capacity) {
final int oldCapacity = this.capacity;
this.capacity = capacity;
final int size = count.get();
if (capacity > size && size >= oldCapacity) {
signalNotFull();
}
}
Основная логика состоит в том, чтобы найти, что если емкость расширена, то вызовите ееsignalNotFull
метод:
Разбудите поток, который был припаркован.
Если вы видите это и думаете, что немного запутались, значит, вы не знаете, что делают эти штуки LinkedBlockingQueue:
Поторопитесь и потратьте час, чтобы пополнить очки знаний, связанные с LinkedBlockingQueue. Подобные вещи часто проверяют на собеседованиях.
Хорошо, давайте вернемся к этому.
После изменения нашего пользовательского метода setCapacity и повторного запуска программы появляется ожидаемый результат:
В дополнение к изменению метода setCapacity, я случайно вызвал еще один ответ во время написания статьи:
После вызова метода setCapacity снова вызовите метод put, и вы также сможете получить ожидаемый результат:
Когда мы наблюдаем за методом put, мы видим, что правда та же самая:
После вызова метода setCapacity снова вызовите метод put.Поскольку условие кода с пометкой ① не выполняется, он не будет заблокирован.
Таким образом, вы можете перейти к месту, отмеченному ②, чтобы разбудить заблокированный поток.
Следовательно, он также достигает цели изменения длины очереди и замаскированного пробуждения заблокированной задачи.
В конечном итоге необходимо выполнить операцию пробуждения.
Ну какая элегантность?
Это должен быть первый способ инкапсулировать логику в методе setCapacity и работать более элегантно.
Второй способ в большей степени применим к ситуации "сам не знаешь зачем, все равно писать такие программы - это нормально".
Теперь мы знаем, в чем заключается проблема динамической настройки длины очереди в пуле потоков.
То есть после заполнения очереди поток, вызывающий метод put, будет заблокирован.Даже если другой поток вызовет метод setCapacity и изменит длину очереди, если ни один поток не инициирует операцию put снова, заблокированный поток не будет заблокирован. будить.
Да или нет?
Ты понимаешь?
правильно?
Это неправильно, друзья.
Друзья, которые часто кивали, увидев предыдущий контент, обратите внимание.
Это место вот-вот начнет вращаться.
начать поворот
При добавлении объектов в очередь в пуле потоков вместо команды put используется команда offer:
Давайте посмотрим, что делает команда offer:
После заполнения очереди верните false напрямую, и блокировки не будет.
То есть нет необходимости просыпаться, как я упоминал ранее, в пуле потоков, потому что заблокированных потоков вообще нет.
В процессе общения с начальником он упомянулVariableLinkedBlockingQueue
с вещами.
Этот класс находится в пакете MQ, и из него узнаётся упомянутая ранее модификация метода setCapacity:
При этом в проекте используется и его метод put:
Поэтому возможно, что в ситуации, которую мы проанализировали ранее, есть нить, которую нужно разбудить.
Однако подумайте об этом, метод put не используется в пуле потоков, просто чтобы избежать такой ситуации?
Да, это.
Однако это недостаточно строго: если вы знаете, что есть проблема, зачем оставлять здесь дыру?
Вы внимательно изучаете VariableLinkedBlockingQueue в MQ, и его можно использовать, даже когда метод put заблокирован.
На самом деле, писать здесь кажется бесполезным знанием, кроме того, что вы можете ознакомиться с LinkedBlockingQueue.
Тем не менее, я могу сделать это бесполезное знание полезным.
Потому что это на самом деле маленькая деталь.
Предположим, я иду на собеседование. Когда я упоминаю метод динамической настройки во время интервью, я непреднамеренно ущипну эту маленькую деталь. Даже если я на самом деле не реализовал динамическую настройку, это кажется очень реальным, когда я упоминаю такую маленькую деталь.
Интервьюер услышал: очень хорошо, есть целые и части, подделки быть не должно.
В VariableLinkedBlockingQueue есть еще несколько деталей, в качестве примера возьмем метод put:
Судя поcount.get() >= capacity
сталcount.get() = capacity
, цель состоит в том, чтобы поддержать сценарий, в котором емкость изменяется с большой на маленькую.
Таких мест несколько, поэтому я не буду перечислять их по одному.
Дьявол кроется в деталях.
Учащиеся должны хорошо его рассмотреть.
JDK bug
На самом деле первоначальный план был написан заранее, и я планировал закончить его, потому что просто хотел добавить детали, которых раньше не замечал.
Однако я был дешев и обратился к списку ошибок JDK, чтобы найти LinkedBlockingQueue, чтобы увидеть, есть ли какие-либо другие преимущества.
Я не ожидал этого, и действительно был небольшой сюрприз.
Во-первых, это ошибка, возникшая 29 декабря 2019 года:
Судя по заголовку, я хочу расширить возможности LinkedBlockingQueue, чтобы можно было изменить его емкость.
В дополнение к описанию сцены ниже, он также должен захотеть взаимодействовать с пулом потоков, найти дескриптор очереди, перейти к базовой логике, связать систему мониторинга, просмотреть страницу конфигурации и нажать на набор. динамических адаптационных ударов.
Но чиновник не принял это предложение.
В ответе говорилось, что ребята, которые написали параллельный пакет, очень осторожно относились к добавлению вещей в параллельный класс. Они считали, что добавление динамически изменяемых функций в ThreadPoolExecutor принесет или уже принесло множество ошибок.
Я понимаю простое предложение: предложение все еще хорошее, но я не смею двигаться. Параллельно с этим куском дергать все тело, не знаю какие мотыльки вылезут.
Таким образом, чтобы достичь этой функции, или вы должны найти способ.
Это также объясняет, почему final используется для изменения емкости очереди, ведь уменьшение функции уменьшит вероятность ошибок.
Вторая ошибка интересна и соответствует нашей потребности в динамической настройке пула потоков:
Это ошибка, зарегистрированная в марте 2020 года, которая описывает исключение отклонения, возникающее при обновлении количества основных потоков пула потоков.
В части описания ошибки он разместил много кода, но код, который он написал, был очень сложным и непростым для понимания.
К счастью, г-н Мартин написал упрощенную версию, которая понятна с первого взгляда и ее легче понять:
Что делает этот код?Позвольте мне кратко рассказать вам об этом.
Во-первых, в основном методе есть цикл, в котором вызывается тестовый метод, когда тестовый метод выдает исключение, цикл завершается.
Затем в тестовом методе каждый раз создается новый пул потоков, а затем длина очереди плюс максимальное количество потоков отправляются в пул потоков, и, наконец, пул потоков закрывается.
Существует также еще один поток, который изменяет количество основных потоков в пуле потоков с 1 на 5.
Вы можете открыть ссылку на ошибку, упомянутую выше, вставить этот код и запустить его, это просто невероятно.
Босс Мартин, он тоже думает, что это баг.
Честно говоря, я запускал кейс и думал, что это должна быть ошибка, но после личной аттестации Дуга Ли он не подумал, что это ошибка.
Основная причина в том, что эта ошибка действительно находится за пределами моего понимания, а конкретная причина четко не указана в ссылке, из-за чего я очень долго ее искал и даже хотел в какой-то момент сдаться.
Но после того, как, наконец, локализовав проблему, он вздохнул: Блин, и все? Это не имеет особого смысла.
Давайте посмотрим, как ведет себя проблема:
После запуска вышеуказанной программы будет выброшено исключение RejectedExecutionException, то есть пул потоков откажется выполнять задачу.
Но, как мы анализировали ранее, количество циклов for — это количество задач, которые может просто удерживать пул потоков:
По логике не должно быть проблем, верно?
Вот тут приятели, задававшие вопрос, недоумевали:
Он сказал: "Я очень озадачен. Количество задач, которые я отправляю, никогда не превысит queueCapacity+maxThreads. Почему пул потоков генерирует исключение RejectedExecutionException? И эту проблему очень сложно отладить, потому что при добавлении к задаче какой-либо задержки проблема не будет воспроизводиться.
Его вывод таков: "Эта проблема очень необъяснима, но я могу воспроизводить ее стабильно, но время возникновения проблемы очень случайное каждый раз, когда я ее воспроизвожу. Я не могу понять это. Я думаю, что это ошибка. Пожалуйста, помогите".
Я не буду говорить, какова основная причина обнаруженного мной бага.
Посмотрим, что скажет старик:
Точка зрения старика состоит всего из четырех слов:
Старик сказал, что не убедил себя в том, что вышеуказанная программа должна быть успешно запущена.
Это значит, что он думает, что аномалия тоже нормальна. Но он не сказал почему.
Через день он добавил еще одно предложение:
Позвольте мне сначала перевести это для вас:
Он сказал, что когда метод submit пула потоков и setCorePoolSize или prestartAllCoreThreads существуют одновременно и выполняются в разных потоках, между ними будет конкуренция.
Существует короткое окно, когда новый поток предварительно запущен, но не полностью готов принять задачи в очереди. Очередь все еще заполнена в этом окне.
Решение на самом деле очень простое, например, можно убрать логику предварительного запуска потоков в методе setCorePoolSize, но если использовать метод prestartAllCoreThreads, прежние проблемы все равно возникнут.
Но в любом случае, я все еще не уверен, что это проблема, которую нужно исправить.
Как, слова отца выглядят очень смущенными?
Да, я прочитал этот отрывок в начале раз 10 и весь запутался, но когда я понял причину этой проблемы, мне все равно пришлось вздохнуть:
Или старик резюмировал по месту, без единого слова чепухи.
Какова причина?
Во-первых, давайте взглянем на два места в примере кода, которые управляют пулом потоков:
Это один поток, который изменяет количество основных потоков, то есть один поток в пуле потоков CompletableFuture по умолчанию ForkJoinPool.
Отправка задач в пул потоков — это другой поток, основной поток.
Отец первого предложения, вот в чем дело:
Гонки, это водить, это ехать быстро, это конкурировать с ... смысл.
Это многопоточный сценарий, основной поток и потоки в ForkJoinPool соревнуются, то есть может возникнуть проблема кто придет первым.
Затем посмотрим, что делает метод setCorePoolSize:
Место, отмеченное ①, предназначено для вычисления разницы между вновь установленным количеством основных потоков и исходным количеством основных потоков.
Полученная разница используется в месте, отмеченном ②.
То есть разница между разницей и количеством задач, которые в настоящее время очередят в очереди в текущей очереди, тем меньше.
Например, текущий номер основного потока равен 2, на этот раз я хочу изменить его на 5. В очереди стоят 10 задач.
Тогда разница будет 5-2=3, то есть дельта=3 на метке ①.
workQueue.size — это 10 задач, поставленных в очередь.
Это Math.min(3,10), поэтому k=3 на метке ②.
Смысл в том, что необходимо добавить 3 основных потока, чтобы помочь обрабатывать задачи в очереди.
Однако должно быть правильно, что вы хотите добавить еще 3?
Возможно ли, что задачи в очереди были обработаны в процессе добавления, и возможно ли, что задач вообще не более 3-х?
Итак, что является условием завершения цикла, кроме честного повторения цикла k раз?
Когда очередь пуста:
В то же время, если вы посмотрите на большой комментарий над кодом, то поймете, что он описывает на самом деле то же самое, что и я.
Хорошо, давайте перейдем к addWorker, где я хочу, чтобы вы увидели:
После серии рассуждений в этом методе он войдет в логику new Worker(), то есть в рабочий поток.
Затем добавьте этот поток в worker.
worker — это коллекция HashSet, в которой хранятся рабочие потоки:
Посмотрите на два раунда кода, которые я создал, начиная сworkers.add(w)
прибытьt.start()
.
Между присоединением к коллекции и фактическим запуском все еще есть некоторая логика.
Короткий период времени, в течение которого выполняется логика в середине, старик назвал «окном».
есть окно, в котором новые потоки находятся в процессе предварительного запуска, но еще не принимают задачи.
То есть будет окно, когда новый поток предзапущен, но еще не принял задачу.
Что будет с этим окном?
Это следующая фраза:
очередь может оставаться (временно) заполненной.
Очередь все еще может быть заполнена, но только временно.
Далее давайте посмотрим на это вместе:
Итак, как вы понимаете подчеркнутое предложение выше?
Добавьте реальную сцену, которая является предыдущим примером кода, просто настройте параметры:
Количество основных потоков в этом пуле потоков равно 1, максимальное количество потоков равно 2, длина очереди равна 5, а максимальное количество задач, которые могут быть размещены, равно 7.
Другой поток выполняет операцию, которая изменяет пул основных потоков с 1 на 2.
Предположим, мы записываем, что отправка пула потоков отправила 6 задач, а момент времени отправки седьмой задачи — T1.
Зачем подчеркивать этот момент времени?
Потому что при отправке седьмой задачи необходимо включить количество неосновных потоков.
Конкретный исходный код находится здесь:
java.util.concurrent.ThreadPoolExecutor#execute
То есть очередь в это время заполнена.workQueue.offer(command)
То, что возвращается, является fasle. Так что идтиaddWorker(command, false)
метод.
Код переходит к строке 1378, то есть T1.
Если метод addWorker в строке 1378 возвращает значение false, это означает, что добавление рабочего потока завершилось неудачно и возникает исключение отклонения.
Предыдущий пример программы выдает исключение отклонения, поскольку здесь возвращается fasle.
Таким образом, возникает вопрос: почему addWorker в строке 1378 возвращает false после выполнения?
Поскольку это условие в настоящее время не выполняетсяwc >= (core ? corePoolSize : maximumPoolSize)
:
wc — текущий пул потоков, количество рабочих потоков.
Принесите наше прежнее состояние, и всеwc >=(false?2:2)
.
То есть wc=2.
Почему оно должно быть равно 2, а не 1?
Откуда взялось больше?
Правда одна: в это время также выполняется addWorker в методе setCorePoolSizeworkers.add(w)
, в результате чего wc изменится с 1 на 2.
Сбой, поэтому возникает исключение отклонения.
Так почему же в большинстве случаев не выбрасываются исключения?
потому что изworkers.add(w)
прибытьt.start()
Это временное окно очень короткое.
В большинстве случаев после выполнения addWorker в методе setCorePoolSize он поймет, что задача взята из очереди и выполнена.
В этом случае, после отправки очередной задачи через пул потоков, обнаруживается, что место в очереди еще есть, и оно помещается в очередь, а метод addWorker вообще выполняться не будет.
Причина, это правда.
Эту проблему многопоточности действительно трудно воспроизвести, как я ее обнаружил?
Добавить журнал.
Как добавить логи в исходный код?
Я не только сделал кастомную очередь, но и склеил копию исходного кода пула потоков, чтобы можно было добавлять логи:
Кроме того, на самом деле мой план позиционирования тоже очень неточен.
При отладке многопоточности лучше не использовать System.out.println, бывают ямки!
Сцены
Вернемся назад и посмотрим на план, данный стариком:
На самом деле это дает два.
Первый — удалить логику addworker в методе setCorePoolSize.
Второй заключается в том, что в исходной программе, то есть программе, заданной вопрошающим, используется метод prestartAllCoreThreads, а в этой должен вызываться метод addWorker, так что определенный шанс возникновения предыдущей проблемы все же есть.
Однако старик не понял, почему он так написал?
Я думаю, может быть, он просто не придумал нужную сцену?
На самом деле упомянутая выше ошибка все еще может проявляться в сценарии динамической корректировки.
Хотя, вероятность возникновения очень низкая, а условия очень суровые.
Однако шанс все же есть.
Если такое случается, когда ваши коллеги ковыряются в голове, вы говорите: ну, я это уже видел, это баг. Не обязательно каждый раз.
Это еще одна маленькая деталь, до которой можно дотянуться.
Но если вы столкнетесь с этим вопросом во время интервью, это глупый вопрос.
бессмысленно.
Принадлежность, интервьюер не знал, где увидеть точку зрения, которая чувствует себя очень сильной, и должен показать, что он очень сильный.
Но чего он не знал, так это того, что этот вопрос:
Последнее слово
Ладно, смотри сюда, ставь лайк. Написание статей утомительно и нуждается в небольшой положительной обратной связи.
Вот один для всех читателей и друзей:
Эта статья взята из моего личного блога, приглашаю всех поиграть: