Здравствуйте, я криворукий.
Сегодня я поделюсь с вами расширенным пулом потоков, и я думаю, что идея расширения очень хороша.
Не волнуйтесь, я здесь с заглавной партией, не думаю, что кто-то будет тестировать эту штуку на собеседовании, но в работе можно столкнуться с соответствующими сценариями.
Чтобы получить этот пул потоков, я сначала дам вам сценарий для легкого понимания.
Возьмите следующий смайлик в качестве примера.
Предположим, у нас есть два программиста, назовем их богатыми и преуспевающими.
Смайлик выше — это портрет работы этих двух программистов за день, который представлен программой.
Сначала мы создаем объект, который представляет, что делает программист в данный момент:
public class CoderDoSomeThing {
private String name;
private String doSomeThing;
public CoderDoSomeThing(String name, String doSomeThing) {
this.name = name;
this.doSomeThing = doSomeThing;
}
}
Затем используйте код, чтобы описать, что делают богатство и процветание:
public class NbThreadPoolTest {
public static void main(String[] args) {
CoderDoSomeThing rich1 = new CoderDoSomeThing("富贵", "启动Idea");
CoderDoSomeThing rich2 = new CoderDoSomeThing("富贵", "搞数据库,连tomcat,crud一顿输出");
CoderDoSomeThing rich3 = new CoderDoSomeThing("富贵", "嘴角疯狂上扬");
CoderDoSomeThing rich4 = new CoderDoSomeThing("富贵", "接口访问报错");
CoderDoSomeThing rich5 = new CoderDoSomeThing("富贵", "心态崩了,卸载Idea");
CoderDoSomeThing www1 = new CoderDoSomeThing("旺财", "启动Idea");
CoderDoSomeThing www2 = new CoderDoSomeThing("旺财", "搞数据库,连tomcat,crud一顿输出");
CoderDoSomeThing www3 = new CoderDoSomeThing("旺财", "嘴角疯狂上扬");
CoderDoSomeThing www4 = new CoderDoSomeThing("旺财", "接口访问报错");
CoderDoSomeThing www5 = new CoderDoSomeThing("旺财", "心态崩了,卸载Idea");
}
}
Краткое объяснение имен переменных показывает, что я все еще думаю об этом.
Быть богатым — значит иметь деньги, поэтому имя переменной — rich.
Wangcai — это Wangwangwang, поэтому имя переменной — www.
Если вы посмотрите на имя моего класса, NbThreadPoolTest, то поймете, что мне нужно использовать пул потоков.
В реальной ситуации богатые и зажиточные люди могут заниматься своими делами, не мешая друг другу, то есть они должны быть своими потоками.
Делая свои вещи, не мешая друг другу, похоже, можно использовать пул потоков.
Итак, я изменил программу, чтобы она выглядела так, чтобы использовать пул потоков:
public class NbThreadPoolTest {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
List<CoderDoSomeThing> coderDoSomeThingList = new ArrayList<>();
coderDoSomeThingList.add(new CoderDoSomeThing("富贵", "启动Idea"));
coderDoSomeThingList.add(new CoderDoSomeThing("富贵", "搞数据库,连tomcat,crud一顿输出"));
coderDoSomeThingList.add(new CoderDoSomeThing("富贵", "嘴角疯狂上扬"));
coderDoSomeThingList.add(new CoderDoSomeThing("富贵", "接口访问报错"));
coderDoSomeThingList.add(new CoderDoSomeThing("富贵", "心态崩了,卸载Idea"));
coderDoSomeThingList.add(new CoderDoSomeThing("旺财", "启动Idea"));
coderDoSomeThingList.add(new CoderDoSomeThing("旺财", "搞数据库,连tomcat,crud一顿输出"));
coderDoSomeThingList.add(new CoderDoSomeThing("旺财", "嘴角疯狂上扬"));
coderDoSomeThingList.add(new CoderDoSomeThing("旺财", "接口访问报错"));
coderDoSomeThingList.add(new CoderDoSomeThing("旺财", "心态崩了,卸载Idea"));
coderDoSomeThingList.forEach(coderDoSomeThing -> {
executorService.execute(() -> {
System.out.println(coderDoSomeThing.toString());
});
});
}
}
Вышеприведенная программа предназначена для инкапсуляции вещей, сделанных Fugui и Wangcai, в список, а затем обхода списка и добавления всех вещей в нем, то есть «выполнения задач» в пул потоков.
Затем, после выполнения вышеуказанной программы, возможный вывод будет следующим:
На первый взгляд, нет никакой проблемы, и богатство, и процветание действуют одновременно.
Но если присмотреться, все делают что-то не в том порядке.
Например, Ванцай выглядит немного «шизофреником», а уголки его рта начали дико подниматься, как только Идея была запущена.
Итак, вот что я хочу.
Какую вещь я хочу?
Это делается для того, чтобы богатство и процветание выполнялись одновременно, а также для того, чтобы они выполняли действия в определенном порядке, то есть в соответствии с порядком, в котором я помещал их в пул потоков.
Формально это описывается так:
Мне нужен такой пул потоков, который может гарантировать, что поступающие задачи будут разделены на задачи по определенному измерению, а затем выполнены в том порядке, в котором задачи были отправлены. Этот пул потоков может повысить пропускную способность за счет параллельной обработки (несколько потоков), гарантируя при этом, что задачи в определенном диапазоне выполняются в строгом порядке.
Используя мой предыдущий пример, «по определенному измерению» — это имя человека, которое является измерением богатства и богатства.
Как ты это делаешь?
анализ еды
Что бы я сделал?
Прежде всего, я могу быть уверен, что пул потоков JDK не может этого сделать.
Потому что с точки зрения принципа пула потоков параллелизм и последовательный порядок не могут быть удовлетворены одновременно.
Вы понимаете, что я имею в виду?
Например, если я хочу использовать пул потоков для обеспечения порядка, то это так:
Существует только один пул потоков, который может гарантировать порядок.
Но есть ли в этом смысл?
В этом есть какой-то смысл, потому что он не занимает основной поток, но смысла в нем мало, ведь он кастрирует важную способность «многопоточность».
Итак, как нам повысить параллелизм в этом сценарии?
Подождите, похоже, у нас уже есть пул потоков, который гарантирует в первую очередь.
Итак, если мы расширим его по горизонтали и сделаем еще несколько, разве у нас не будет возможности распараллелить?
Затем вышеупомянутое «по определенному измерению», если есть несколько пулов потоков только с одним потоком, я также могу сопоставить «измерение» и «каждый пул потоков» в соответствии с этим измерением.
Программно это выглядит так:
Место, помеченное ①, предназначено для создания нескольких пулов потоков только с одним потоком, чтобы обеспечить порядок потребления.
Место, обозначенное ②, предназначено для сопоставления отношений между именами людей и пулами потоков через карту. Это просто указание. Например, мы также можем использовать модуль номера пользователя, чтобы найти соответствующий пул потоков. Например, если номер пользователя нечетный, используйте один пул потоков, а если номер пользователя четный, используйте другая нить.
Следовательно, нет необходимости определять, сколько пулов потоков только с одним потоком определяется с точки зрения количества данных в «определенном измерении».Они также многоразовые, в этом месте есть небольшой поворот, чтобы развернуться.
Место, отмеченное ③, должно перейти к соответствующему пулу потоков на карте в соответствии с именем.
Из результатов вывода проблем нет:
Когда вы увидите здесь друзей, вы скажете: Ты не обманываешь?
Разве не хорошо сказать, что есть пул потоков, вы сделали несколько.
Если вы хотите смотреть на вещи под этим углом, вы сузите дорогу.
Вы хотите иметь большой пул потоков, а существует множество пулов потоков только с одним потоком.
Это открывает макет.
Способ, который я написал выше, представляет собой очень простую демонстрацию, которая в основном приводит к идее этой схемы.
Я хочу представить проект с открытым исходным кодом, основанный на этой идее.
Ее написал большой босс крупной компании, я посмотрел исходный код и был поражен: написание действительно чертовски хорошо.
Я дам вам последний вариант использования и сначала выведу:
От случая, использование также очень просто.
Отличие от собственного использования JDK заключается в той части, которую я выделил.
Сначала создайте объект KeyAffinityExecutor, чтобы заменить собственный пул потоков.
KeyAffinityExecutor, который включает слово Affinity.
В переводе имеет тот же смысл:
Поэтому KeyAffinityExecutor переводится как пул потоков, аналогичный ключу, когда вы поймете его функцию и область применения, вы почувствуете, что имя не тыкается.
Далее вызывается метод executeEx объекта KeyAffinityExecutor, и в него можно передать еще один параметр, который представляет собой размерность, отличающую определенный тип той же задачи, например, здесь я привожу поле имени.
Из варианта использования можно сказать, что упаковка очень хорошая и работает из коробки.
Использование KeyAffinityExecutor
Давайте сначала поговорим об использовании этого класса.
Соответствующий адрес проекта с открытым исходным кодом:
Если вы хотите использовать его, вам нужно импортировать следующий адрес maven:
<dependency>
<groupId>com.github.phantomthief</groupId>
<artifactId>more-lambdas</artifactId>
<version>0.1.55</version>
</dependency>
Его основным кодом является этот интерфейс:
com.github.phantomthief.pool.KeyAffinityExecutor
В этом интерфейсе много комментариев, вы можете опустить его и посмотреть.
Здесь я в основном показываю комментарии автора к интерфейсу, так он представил свой инструмент.
Это пул потоков, который использует в указанном порядке соответствия ключей.
KeyAffinityExecutor — это специальный пул потоков задач.
Это может гарантировать, что задачи, поставленные в одном и том же ключе, будут выполняться последовательно в порядке отправки. Он очень подходит для сценариев, в которых требуется параллельная обработка для повышения пропускной способности, при этом гарантируя, что задачи в определенном диапазоне выполняются в строгом порядке.
Встроенная реализация KeyAffinityExecutor предназначена для сопоставления указанного ключа с фиксированным пулом однопоточных потоков и будет поддерживать несколько (настраиваемых) пулов однопоточных потоков внутри для поддержания определенной степени параллелизма задач.
Следует отметить, что KeyAffinityExecutor, определенный этим интерфейсом, не требует запуска задач с одним и тем же ключом в одном потоке.Хотя класс реализации может реализовать его таким образом, это не является обязательным требованием, поэтому при использовании Please not полагаться на такие предположения.
Многие спрашивают, в чем разница между этим и использованием массива пулов потоков и реализацией простого модуля?
На самом деле, большинство сценариев не сильно отличаются друг от друга, но когда возникает перекос данных, данные, которые хэшируются в одно и то же место, могут задерживаться из-за перекоса данных в горячих точках.
При низком уровне параллелизма (можно установить пороговое значение) эта реализация выберет для доставки наиболее простаивающий пул потоков, максимально изолирует искаженные данные и уменьшит влияние на другие данные.
Во введении автора кратко объясняются сценарии применения и внутренние принципы проекта, которые аналогичны тому, что мы анализировали ранее.
Кроме того, есть два момента, на которые следует обратить особое внимание.
Первое место здесь:
В качестве объекта измерения задачи различения, если это настраиваемый объект, его хэш-код и эквиваленты должны быть переписаны, чтобы гарантировать, что он может играть роль в идентификации.
Напоминание здесь такое же, как и причина, по которой методы hashCode и equals должны быть переписаны, если ключ HashMap является объектом.
Основы программирования, просто упомяните об этом, не вдаваясь в подробности.
О втором месте следует сказать осторожно, оно принадлежит его основной идее.
Он не использовал простой метод по модулю, потому что в простом сценарии по модулю данные могут быть искажены.
Вот как я понимаю мысль автора.
Во-первых, давайте объясним, что такое перекос данных по модулю.Давайте возьмем простой пример:
В приведенном выше фрагменте кода я добавил новый символ «мастер рыбной ловли». При этом к объекту добавляется поле id.
Предположим, мы берем остаток от 2 для поля id:
Тогда ситуация будет такова, что идентификатор, соответствующий мастеру и расширенному результату, равен 1, и они будут использовать один и тот же пул потоков.
Очевидно, что из-за частых операций мастера «ловить» стало хот-данные, в результате чего происходит наклон пула соединений под номером 0, что в свою очередь влияет на нормальную работу Фугуи.
И какова стратегия KeyAffinityExecutor?
Для доставки будет выбран наиболее простаивающий пул потоков.
Как понять это?
Тем не менее приведенный выше пример, если мы создадим пул потоков следующим образом:
KeyAffinityExecutor executorService =
KeyAffinityExecutor.newSerializingExecutor(3, 200, "MY-POOL-%d");
Первый параметр равен 3, что означает, что будут созданы 3 пула потоков только с одним потоком в этом пуле потоков.
Затем, когда мы используем его для отправки задач, поскольку измерение является измерением идентификатора, у нас есть только три идентификатора, поэтому мы просто заполняем этот пул потоков:
На данный момент искажения данных нет.
Однако что, если я ранее изменил параметр построения пула потоков с 3 на 2?
KeyAffinityExecutor executorService =
KeyAffinityExecutor.newSerializingExecutor(2, 200, "MY-POOL-%d");
Метод отправки остается без изменений, а также добавляется логика задержки задач с id 1 и 2 для наблюдения за тем, как обрабатываются данные с id 3:
Несомненно, пула потоков точно не хватает при подаче операции ловли мастера выполнения, что делать?
В это время, по описанию автора, "для доставки будет выбран наиболее простаивающий пул потоков".
Я использую такие данные, чтобы проиллюстрировать:
Поэтому при выполнении основной операции ловли он выберет один из двух вариантов.
Как выбрать?
Выбирается тот, у кого самая низкая степень параллелизма.
Поскольку в задаче есть время задержки, мы можем наблюдать, что параллелизм потока, выполняющего богатые, равен 5, а параллелизм потока, выполняющего богатые, равен 6.
Поэтому при выполнении мастером операции лова для обработки будет выбран поток с параллелизмом 5.
В этом сценарии происходит перекос данных. Но предпосылка тильта изменилась, и в настоящее время нет доступных потоков.
Итак, авторы говорят: «Как можно больше изолируйте искаженные данные».
Самая большая разница между этими двумя схемами заключается в использовании ресурсов потока.Если это просто по модулю, могут быть доступные потоки, когда происходит перекос данных.
Если это метод KeyAffinityExecutor, он может гарантировать, что при возникновении перекоса данных потоки в пуле потоков должны быть израсходованы.
Затем вы почувствуете тонкие различия между двумя программами.
Исходный код KeyAffinityExecutor
Исходников не так уж и много, всего таких категорий:
Но большая часть его исходного кода написана на лямбда-выражениях, которые в основном являются функциональным программированием.Если вы слабы в этой области, это будет выглядеть немного сложнее.
Если вы хотите освоить его исходный код, я предлагаю вытащить проект локально и начать с его тестовых случаев:
Я сообщу вам некоторые ключевые моменты, которые я видел, чтобы вы могли разобраться в своих мыслях, когда вы пойдете, чтобы увидеть это своими глазами.
Прежде всего, следует начать с его метода построения, автор очень четко обозначил значение каждого параметра:
Предположим, что наш конструктор такой, что означает построить 3 пула потоков только с одним потоком, а размер очереди каждого пула потоков равен 200:
KeyAffinityExecutor executorService =
KeyAffinityExecutor.newSerializingExecutor(3, 200, "WHY-POOL-%d");
Прежде всего, мы должны выяснить, где находится логика построения «пула потоков только с одним потоком».
Этот метод скрыт в конструкторе:
com.github.phantomthief.pool.KeyAffinityExecutorUtils#executor(java.lang.String, int)
Здесь вы можете увидеть «пул потоков только с одним потоком», о котором мы говорили, а также можно указать длину очереди:
Этот метод возвращает интерфейсы поставщиков, необходимые для использования подчиненных.
Далее нам предстоит выяснить, где отражается цифра «3»?
Он скрыт в методе сборки конструктора, который в итоге вызовет этот метод:
com.github.phantomthief.pool.impl.KeyAffinityImpl#KeyAffinityImpl
Когда вы нажимаете точку останова в этом месте, а затем смотрите Debug, это очень ясно:
Что касается нескольких ключевых параметров этой части коробки, позвольте мне объяснить:
Первый — это параметр count, который мы определили как 3. Тогда диапазон (0,3) равен 0,1,2.
Затем есть поставщик, который является интерфейсом поставщика, возвращаемым методом исполнителя, о котором мы упоминали ранее.Вы можете видеть, что он инкапсулирует пул потоков.
Затем следует очень важная операция: map(ValueRef::new).
Объект ValueRef в этой операции имеет решающее значение:
com.github.phantomthief.pool.impl.KeyAffinityImpl.ValueRef
Ключевым моментом является переменная параллелизма в этом объекте.
Помните упомянутое ранее предложение «выбрать самый бездействующий исполнитель (пул потоков)»?
Как я могу сказать, что он простаивает?
Он зависит от переменной параллелизма.
Соответствующий код находится здесь:
com.github.phantomthief.pool.impl.KeyAffinityImpl#select
Если вы можете перейти к точке останова, это означает, что текущий ключ не был сопоставлен ранее, поэтому вам нужно указать для него пул потоков.
Операция указания этого пула потоков заключается в зацикливании всей коллекции, которая содержит объекты ValueRef:
Таким образом, метод compareInt(ValueRef::concurrency) должен выбрать тот, у которого наименьший параллелизм среди всех текущих пулов потоков.
Если этот пул потоков никогда не использовался или в настоящее время не используются никакие задачи, то параллелизм должен быть равен 0, и будут выбраны все.
Если используются все пулы потоков, выбирается пул потоков с наименьшим значением параллелизма.
Я просто даю вам общее представление, если вы хотите узнать больше, вы можете сами перейти к исходному коду.
Если вы очень хорошо знаете, как использовать лямбда-выражения, вы будете чувствовать себя действительно элегантно и комфортно писать.
Если вы не знаете лямбды...
Так почему бы тебе не поторопиться учиться?
Также я нашел две знакомые вещи.
Друзья, посмотрите, что это:
Разве это не просто динамическая настройка параметров пула потоков?
Второе это:
Я также писал о динамической настройке в RabbitMQ, и я также подчеркнул эти три места:
- Добавлены {@link #setCapacity(int)} и {@link #getCapacity()}.
- {@link #capacity} Граница оценки изменена с == на >=
- Некоторые триггеры сигналов signal() заменены на signalAll().
Кроме того, автор также упомянул, что версия RabbitMQ будет иметь ошибки, вызывающие NPE.
Я не изучал это подробно, если вам интересно, вы можете сравнить код, и вы сможете понять, в чем проблема.
Разговор о Дуббо
Зачем говорить о Даббо?
Потому что я, кажется, также нашел следы KeyAffinityExecutor в Dubbo.
Почему вы говорите, что кажется?
Потому что в конце концов он не был объединен с кодовой базой.
Соответствующая ссылка здесь:
В этом представлении представлено так много документов:
Внутри мы можем найти вещи, с которыми мы знакомы:
На самом деле идеи одни и те же, но вы обнаружите, что даже если идеи одни и те же, структура кода, написанная двумя разными людьми, все равно сильно отличается.
Здесь Dubbo немного четче разделяет уровень кода, например, определяет абстрактный объект AbstractKeyAffinity, а затем реализует две схемы случайного и минимального параллелизма.
В этих деталях есть отличия.
Но поставщик этого кода не стал его использовать и предложил альтернативу:
В этом представлении он в основном представил этот класс:
org.apache.dubbo.common.threadpool.serial.SerializingExecutor
Вы знаете этот класс из названия, оно делает упор на сериализацию.
Взгляните на его тестовые примеры, и вы узнаете, как он используется:
Во-первых, входные данные его конструктора — это другой пул потоков.
Затем отправьте задачу с помощью метода выполнения SerializingExecutor.
Что мы делаем внутри задачи, так это вынимаем из карты ключ, соответствующий val, а затем добавляем 1 и ставим обратно.
Все мы знаем, что описанная выше операция является потоконебезопасной в случае многопоточности, и конечный результат должен быть меньше количества циклов.
Однако, если это однопоточный случай, это определенно хорошо.
Итак, как сопоставить пул потоков с одним потоком?
SerializingExecutor делает именно это.
И его принцип очень прост, основной код — всего несколько строк.
Во-первых, он сам создает очередь:
Отправленные задачи помещаются в очередь.
Затем выполняйте по одному.
Как обеспечить поочередную реализацию?
Есть много методов, вот объект AtomicBoolean для управления:
Таким образом реализуется сценарий сериализации многопоточных задач.
Меня просто удивляет, что класс SerializingExecutor в настоящее время не используется в Dubbo.
Однако, если вам нужно реализовать такую странную функцию, например, кто-то дает вам пул потоков, но у вас есть какое-то соображение в вашем процессе, вам нужно сериализовать задачу, и вы не должны трогать чужой пул потоков в это время. ., то можно вспомнить, что у Dubbo здесь есть готовое, более элегантное, качественное решение.
Последнее слово
Ладно, вижу тут, вперед, смотреть, ставить лайки и ставить любые, я не против, если вы их все аранжируете. Написание статей утомительно и нуждается в небольшой положительной обратной связи.
Вот один для всех читателей и друзей:
Эта статья была включена в мой личный блог, играть может каждый.