Два дня назад и однажды ночью, когда я был погружён в радость набора кода, я услышал невероятный восклицание от моего коллеги по соседству: Как могла команда отправки пула потоков выполняться дольше секунды?
Метод отправки пула потоков выполняется более одной секунды? Это неправильно. Отправка пула потоков должна быть быстрой операцией, и при нормальных обстоятельствах она не должна занимать больше одной секунды.
Глядя на код, кажется, что нет никакой проблемы, это простой код для отправки задач.
executor.execute( () -> {
// 具体的任务代码
// 这里有个for循环
});
Хотя в выполняемом задании есть цикл for, который может занимать много времени, когда выполнение отправляет задачу, оно фактически не выполняет задание, поэтому это не должно быть вызвано этой причиной.
анализировать
Видя эту ситуацию, первое, о чем мы думаем, — это процесс обработки, когда пул потоков отправляет задачи:
Затем последовательно проанализируйте операции, которые могут занять больше секунды:
Сколько времени занимает создание темы?
Согласно приведенному выше рисунку, мы можем знать, что если количество основных потоков установлено слишком большим, новые основные потоки могут постоянно создаваться для выполнения задач. Точно так же, если пул основных потоков и очередь задач заполнены, для выполнения задач будут созданы дополнительные потоки.
Создание потоков занимает много времени, и пул потоков Java блокируется, когда здесь создаются потоки.
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
Давайте напишем простую программу, которая может имитировать трудоемкую работу пула потоков.Следующий код создает 2w потоков, что на моем компьютере занимает около 6000 миллисекунд.
long before = System.currentTimeMillis();
for (int i = 0; i < 20000; i++) {
// doSomething里面睡眠一秒
new Thread(() -> doSomething()).start();
}
long after = System.currentTimeMillis();
// 下面这行在我的电脑里输出6139
System.out.println(after - before);
Но, взглянув на наш мониторинг, количество потоков было относительно хорошим, что не должно быть причиной. Кроме того, новые потоки в этом месте вряд ли достигнут такого масштаба.
Сколько времени нужно, чтобы войти в очередь задач?
Очередь задач пула потоков является синхронизированной очередью. Таким образом, операция постановки в очередь является синхронной.
Несколько часто используемых очередей синхронизации:
-
LinkedBlockingQueue
Сцепленная очередь блокировки, базовая структура данных представляет собой связанный список, а размер по умолчанию равен
Integer.MAX_VALUE
, вы также можете указать размер. -
ArrayBlockingQueue
Очередь блокировки массива, базовая структура данных представляет собой массив, и необходимо указать размер очереди.
-
SynchronousQueue
Синхронная очередь, внутренняя емкость равна 0, каждая операция ввода должна ждать операции взятия, и наоборот.
-
DelayQueue
Очередь с задержкой, элемент в очереди может быть получен из очереди только по истечении указанного времени задержки.
Поэтому используйте специальные очереди синхронизации, иначе это может привести кexecute
Метод блокируется более чем на секунду, напримерSynchronousQueue
. В сочетании со специальной «стратегией отторжения» можно вызвать это явление, пример мы приведем ниже.
политика отказа?
Когда количество потоков достигает максимального количества потоков, будет принята стратегия обработки отказа.Четыре стратегии обработки отказа:
- ThreadPoolExecutor.AbortPolicy: по умолчанию отклоняет политику обработки, отменяет задачу и выдает исключение.
- ThreadPoolExecutor.DiscardPolicy: отбрасывает входящие задачи, но не генерирует исключения.
- ThreadPoolExecuture.discardoldestPolicy: Откажитесь от задачи во главе очереди (старшая) и повторите попытку исполнителя (если он снова не удается, повторите процесс).
- ThreadPoolExecutor.CallerRunsPolicy: эта задача обрабатывается вызывающим потоком.
Видно, что первые три стратегии обработки отказа будут «отбрасывать» задачу, а последняя — нет. Последняя стратегия отбраковки взаимодействует с вышеуказаннойSynchronousQueue
, это может привести к ситуации, с которой мы столкнулись. Образец кода:
Executor executor = new ThreadPoolExecutor(2,2, 2,
TimeUnit.MILLISECONDS,new SynchronousQueue<>(),
new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 3; i++) {
long before = System.currentTimeMillis();
executor.execute( () -> {
// doSomething里面睡眠一秒
doSomething();
});
long after = System.currentTimeMillis();
// 下面这段代码,第三行会输出1001
System.out.println(after - before);
}
SimpleAsyncTaskExecutor
Так будут ли проблемы, с которыми мы сталкиваемся, вызваны вышеуказанными причинами? С этими догадками мы пошли искать код, определяющий исполнителя.
SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
executor.setConcurrencyLimit(20);
Установка максимального количества одновременных запросов на 20, кажется, не проблема, подождите, этоSimpleAsyncTaskExecutor
Что это за фигня?
Кажется, это пул потоков, предоставленный Spring... (Голос постепенно становится неуверенным)
эм... Я посмотрел определение пакета org.springframework.core.task, который действительно предоставляется Spring. Что касается того, является ли это пулом потоков, сначала посмотрите на диаграмму классов:
достигнутоExecutor
интерфейс, но почему не в дереве наследованияThreadPoolExecutor
? Мы предполагаем, что Spring сам реализует пул потоков? Хотя это и не должно быть необходимо.
исходный код
С сомнениями мы продолжали смотреть на исходный код этого класса. В основном видитеexecute
метод, обнаружите, что перед каждым выполнением вы должны вызыватьbeforeAccess
метод, в этом методе есть такой странный код:
Цикл while проверяет и ожидает, если текущее количество параллельных потоков больше или равно установленному максимальному значению.
Нашел причину, это должно быть виновником. Но почему Spring разработан таким образом?
Над аннотацией класса SimpleAsyncTaskExecutor мы нашли сообщение автора:
* <p><b>NOTE: This implementation does not reuse threads!</b> Consider a
* thread-pooling TaskExecutor implementation instead, in particular for
* executing a large number of short-lived tasks.
Общее значение таково: эта реализация не использует повторно потоки. Если вы хотите повторно использовать потоки, используйте реализацию пула потоков. Это используется для выполнения многих задач, которые занимают очень мало времени.
В этот момент открывается правда.
отражение
Разберитесь перед использованием интерфейса
Основная причина этой проблемы заключается в том, что мы думали, что SimpleAsyncTaskExecutor является «пулом потоков», но это не так! ! !
Когда мы используем проекты с открытым исходным кодом, мы часто используем их напрямую, мы не смотрим внимательно на их исходный код, и мы можем не рассматривать его среду приложения четко. Уже поздно ждать, пока возникнет проблема с программой.
Так что лучше разобраться перед использованием интерфейса, хотя бы посмотреть официальную документацию или документацию/примечания по интерфейсу.
Даже если есть реальная проблема, просмотр исходного кода по-прежнему является способом устранения проблемы, потому что код мертв, он не будет лгать.
соглашение о коде
У Али такая спецификация кода: не рекомендуется напрямую использовать пул потоков в классе Executors, а использоватьThreadPoolExecutor
Таким образом, учащиеся, которые пишут, должны иметь более четкое представление о правилах работы пула потоков, чтобы избежать риска исчерпания ресурсов.
Раньше я не совсем понимал это, я думал, что использование класса Executors может улучшить читаемость, JDK предоставляет такой инструментальный класс, поэтому он мне не нужен. Только когда я столкнулся с этой проблемой, я понял благие намерения этого закона.
Если мы используем канонический способ использования пула потоков вместо использования так называемого «пула потоков», предоставляемого Spring, мы не столкнемся с этой проблемой.
Четкие обязанности интерфейса
Подумайте еще раз, почему коллеги считают это пулом потоков? Потому что его имя класса и имя метода слишком похожи на пул потоков. он достигаетExecutor
интерфейсexecute
метод, из-за чего мы ошибочно приняли его за пул потоков.
Итак, вернемся кExecutor
За что отвечает этот интерфейс? Мы можем сделать это в JDKexecute
См. это примечание к методу:
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*/
Общая идея состоит в том, что входящая команда будет выполнена в определенное время в будущем.Эта команда может быть выполнена в новом потоке, в пуле потоков или в потоке, который вызывает этот метод.Как ее выполнить определяется класс реализации.решить.
Так что этоExecutor
В обязанности этого класса входит не предоставление интерфейса для пула потоков, а предоставление интерфейса для «выполнения команд в будущем».
Следовательно, что действительно представляет собой значение пула потоков, так этоThreadPoolExecutor
класс вместоExecutor
интерфейс.
Когда мы пишем код, мы также должны четко определить обязанности интерфейса. Таким образом, когда другие будут использовать ваш интерфейс или читать исходный код, они не будут сбиты с толку.
Об авторе
Я Ясин, пигментный материал и веселый программист.
Публичный аккаунт WeChat: составлена программа
Персональный сайт: https://yasinshaw.com
Подписывайтесь на мой официальный аккаунт и развивайтесь вместе со мной~
В этой статье используетсяmdniceнабор текста