Вы также были отравлены этим «пулом потоков» Spring?

Java

Два дня назад и однажды ночью, когда я был погружён в радость набора кода, я услышал невероятный восклицание от моего коллеги по соседству: Как могла команда отправки пула потоков выполняться дольше секунды?

Метод отправки пула потоков выполняется более одной секунды? Это неправильно. Отправка пула потоков должна быть быстрой операцией, и при нормальных обстоятельствах она не должна занимать больше одной секунды.

Глядя на код, кажется, что нет никакой проблемы, это простой код для отправки задач.

executor.execute( () -> {
    // 具体的任务代码
    // 这里有个for循环
});

Хотя в выполняемом задании есть цикл for, который может занимать много времени, когда выполнение отправляет задачу, оно фактически не выполняет задание, поэтому это не должно быть вызвано этой причиной.

анализировать

Видя эту ситуацию, первое, о чем мы думаем, — это процесс обработки, когда пул потоков отправляет задачи:

线程池原理图
Схема пула потоков

Затем последовательно проанализируйте операции, которые могут занять больше секунды:

Сколько времени занимает создание темы?

Согласно приведенному выше рисунку, мы можем знать, что если количество основных потоков установлено слишком большим, новые основные потоки могут постоянно создаваться для выполнения задач. Точно так же, если пул основных потоков и очередь задач заполнены, для выполнения задач будут созданы дополнительные потоки.

Создание потоков занимает много времени, и пул потоков Java блокируется, когда здесь создаются потоки.

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();

Давайте напишем простую программу, которая может имитировать трудоемкую работу пула потоков.Следующий код создает 2w потоков, что на моем компьютере занимает около 6000 миллисекунд.

long before = System.currentTimeMillis();
for (int i = 0; i < 20000; i++) {
    // doSomething里面睡眠一秒
    new Thread(() -> doSomething()).start();
}
long after = System.currentTimeMillis();
// 下面这行在我的电脑里输出6139
System.out.println(after - before);

Но, взглянув на наш мониторинг, количество потоков было относительно хорошим, что не должно быть причиной. Кроме того, новые потоки в этом месте вряд ли достигнут такого масштаба.

Сколько времени нужно, чтобы войти в очередь задач?

Очередь задач пула потоков является синхронизированной очередью. Таким образом, операция постановки в очередь является синхронной.

Несколько часто используемых очередей синхронизации:

  1. LinkedBlockingQueue

    Сцепленная очередь блокировки, базовая структура данных представляет собой связанный список, а размер по умолчанию равенInteger.MAX_VALUE, вы также можете указать размер.

  2. ArrayBlockingQueue

    Очередь блокировки массива, базовая структура данных представляет собой массив, и необходимо указать размер очереди.

  3. SynchronousQueue

    Синхронная очередь, внутренняя емкость равна 0, каждая операция ввода должна ждать операции взятия, и наоборот.

  4. DelayQueue

    Очередь с задержкой, элемент в очереди может быть получен из очереди только по истечении указанного времени задержки.

Поэтому используйте специальные очереди синхронизации, иначе это может привести кexecuteМетод блокируется более чем на секунду, напримерSynchronousQueue. В сочетании со специальной «стратегией отторжения» можно вызвать это явление, пример мы приведем ниже.

политика отказа?

Когда количество потоков достигает максимального количества потоков, будет принята стратегия обработки отказа.Четыре стратегии обработки отказа:

  1. ThreadPoolExecutor.AbortPolicy: по умолчанию отклоняет политику обработки, отменяет задачу и выдает исключение.
  2. ThreadPoolExecutor.DiscardPolicy: отбрасывает входящие задачи, но не генерирует исключения.
  3. ThreadPoolExecuture.discardoldestPolicy: Откажитесь от задачи во главе очереди (старшая) и повторите попытку исполнителя (если он снова не удается, повторите процесс).
  4. ThreadPoolExecutor.CallerRunsPolicy: эта задача обрабатывается вызывающим потоком.

Видно, что первые три стратегии обработки отказа будут «отбрасывать» задачу, а последняя — нет. Последняя стратегия отбраковки взаимодействует с вышеуказаннойSynchronousQueue, это может привести к ситуации, с которой мы столкнулись. Образец кода:

Executor executor = new ThreadPoolExecutor(2,2, 2, 
                     TimeUnit.MILLISECONDS,new SynchronousQueue<>(), 
                     new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 3; i++) {
    long before = System.currentTimeMillis();
    executor.execute( () -> {
        // doSomething里面睡眠一秒
        doSomething();
    });
    long after = System.currentTimeMillis();
    // 下面这段代码,第三行会输出1001
    System.out.println(after - before);
}

SimpleAsyncTaskExecutor

Так будут ли проблемы, с которыми мы сталкиваемся, вызваны вышеуказанными причинами? С этими догадками мы пошли искать код, определяющий исполнителя.

SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
executor.setConcurrencyLimit(20);

Установка максимального количества одновременных запросов на 20, кажется, не проблема, подождите, этоSimpleAsyncTaskExecutorЧто это за фигня?

Кажется, это пул потоков, предоставленный Spring... (Голос постепенно становится неуверенным)

эм... Я посмотрел определение пакета org.springframework.core.task, который действительно предоставляется Spring. Что касается того, является ли это пулом потоков, сначала посмотрите на диаграмму классов:

достигнутоExecutorинтерфейс, но почему не в дереве наследованияThreadPoolExecutor? Мы предполагаем, что Spring сам реализует пул потоков? Хотя это и не должно быть необходимо.

исходный код

С сомнениями мы продолжали смотреть на исходный код этого класса. В основном видитеexecuteметод, обнаружите, что перед каждым выполнением вы должны вызыватьbeforeAccessметод, в этом методе есть такой странный код:

beforeAccess
beforeAccess

Цикл while проверяет и ожидает, если текущее количество параллельных потоков больше или равно установленному максимальному значению.

Нашел причину, это должно быть виновником. Но почему Spring разработан таким образом?

Над аннотацией класса SimpleAsyncTaskExecutor мы нашли сообщение автора:

 * <p><b>NOTE: This implementation does not reuse threads!</b> Consider a
 * thread-pooling TaskExecutor implementation instead, in particular for
 * executing a large number of short-lived tasks.

Общее значение таково: эта реализация не использует повторно потоки. Если вы хотите повторно использовать потоки, используйте реализацию пула потоков. Это используется для выполнения многих задач, которые занимают очень мало времени.

В этот момент открывается правда.

отражение

Разберитесь перед использованием интерфейса

Основная причина этой проблемы заключается в том, что мы думали, что SimpleAsyncTaskExecutor является «пулом потоков», но это не так! ! !

Когда мы используем проекты с открытым исходным кодом, мы часто используем их напрямую, мы не смотрим внимательно на их исходный код, и мы можем не рассматривать его среду приложения четко. Уже поздно ждать, пока возникнет проблема с программой.

Так что лучше разобраться перед использованием интерфейса, хотя бы посмотреть официальную документацию или документацию/примечания по интерфейсу.

Даже если есть реальная проблема, просмотр исходного кода по-прежнему является способом устранения проблемы, потому что код мертв, он не будет лгать.

соглашение о коде

У Али такая спецификация кода: не рекомендуется напрямую использовать пул потоков в классе Executors, а использоватьThreadPoolExecutorТаким образом, учащиеся, которые пишут, должны иметь более четкое представление о правилах работы пула потоков, чтобы избежать риска исчерпания ресурсов.

Раньше я не совсем понимал это, я думал, что использование класса Executors может улучшить читаемость, JDK предоставляет такой инструментальный класс, поэтому он мне не нужен. Только когда я столкнулся с этой проблемой, я понял благие намерения этого закона.

Если мы используем канонический способ использования пула потоков вместо использования так называемого «пула потоков», предоставляемого Spring, мы не столкнемся с этой проблемой.

Четкие обязанности интерфейса

Подумайте еще раз, почему коллеги считают это пулом потоков? Потому что его имя класса и имя метода слишком похожи на пул потоков. он достигаетExecutorинтерфейсexecuteметод, из-за чего мы ошибочно приняли его за пул потоков.

Итак, вернемся кExecutorЗа что отвечает этот интерфейс? Мы можем сделать это в JDKexecuteСм. это примечание к методу:

/**
* Executes the given command at some time in the future.  The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*/

Общая идея состоит в том, что входящая команда будет выполнена в определенное время в будущем.Эта команда может быть выполнена в новом потоке, в пуле потоков или в потоке, который вызывает этот метод.Как ее выполнить определяется класс реализации.решить.

Так что этоExecutorВ обязанности этого класса входит не предоставление интерфейса для пула потоков, а предоставление интерфейса для «выполнения команд в будущем».

Следовательно, что действительно представляет собой значение пула потоков, так этоThreadPoolExecutorкласс вместоExecutorинтерфейс.

Когда мы пишем код, мы также должны четко определить обязанности интерфейса. Таким образом, когда другие будут использовать ваш интерфейс или читать исходный код, они не будут сбиты с толку.

Об авторе

Я Ясин, пигментный материал и веселый программист.

Публичный аккаунт WeChat: составлена ​​программа

Персональный сайт: https://yasinshaw.com

Подписывайтесь на мой официальный аккаунт и развивайтесь вместе со мной~

公众号
Нет публики

В этой статье используетсяmdniceнабор текста