Играйте с пулом потоков от 0 до 1

Java
Играйте с пулом потоков от 0 до 1

Обычно мы не используем класс потока напрямую.ThreadВместо многопоточного программирования используйте более удобный пул потоков для планирования задач и управления ими. Пул потоков похож на общий велосипед, нам нужно получить его только тогда, когда он нам нужен. Можно даже сказать, что пул потоков лучше, нам просто нужно передать ему задачу, и он запустится в нужное время. Но если вы используете его напрямуюThreadclass нам нужно создавать, запускать и ждать поток каждый раз, когда мы выполняем задачу, и сложно управлять потоком в целом, что является непростой задачей. Теперь, когда у нас есть пул потоков, давайте оставим хлопоты пулу потоков.

Предыдущая статья об использовании пула потоков и его исходного кода слишком длинная и объемная, и я чувствую, что ее не очень легко понять. Поэтому я реорганизовал контент, разделил его на две статьи и добавил немного контента, надеясь, что всем будет легче понять соответствующий контент.

В этой статье мы начнем с концепции и общего использования пулов потоков и сначала представим общее использование пулов потоков. Затем мы подробно представим часто используемые настраиваемые элементы в пуле потоков, такие как очередь задач, политика отклонения и т. д., и, наконец, представим четыре общие конфигурации пула потоков. С помощью этой статьи вы сможете освоить использование пулов потоков и использовать пулы потоков для гибкого планирования потоков на практике.

Для чтения этой статьи требуется базовое понимание многопоточного программирования, например, что такое поток, какую проблему решает многопоточность и так далее. Читатели, которые не знают, могут обратиться к статье, которую я опубликовал ранее.«На этот раз давайте полностью освоим многопоточность Java (2/10)»

Как правило, наиболее часто используемым классом реализации пула потоков являетсяThreadPoolExecutor, далее мы познакомимся с основным использованием этого класса. JDK проделал хорошую работу по инкапсуляции пула потоков, я считаю, что этот процесс будет очень простым.

Базовое использование пула потоков

Создать пул потоков

Поскольку пул потоков — это класс Java, самый прямой способ его использования должен быть новым.ThreadPoolExecutorОбъекты, такие какThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>() ). Таким образом, каждый параметр в конструкторе Что это значит? Мы не можем беспокоиться о деталях, продолжайте использование поездки в пуле, вернитесь, чтобы изучить эту проблему позже.

Отправить задачу

Когда пул потоков создан, мы можем отправлять задачи в пул потоков для выполнения. Отправка задач в пул потоков довольно проста, нам просто нужно передать исходныйThreadконструктор классаRunnableобъект передается в пул потоковexecuteметод илиsubmitметод подойдет.executeМетоды иsubmitМетод в основном нет разницы, единственная разница между двумяsubmitметод вернетFutureОбъект, используемый для проверки выполнения асинхронной задачи и получения результата выполнения (после завершения асинхронной задачи).

Мы можем сначала попробовать, как использовать более простойexecuteметод, пример кода выглядит следующим образом:

public class ThreadPoolTest {

    private static int count = 0;

    public static void main(String[] args) throws Exception {
        Runnable task = new Runnable() {
            public void run() {
                for (int i = 0; i < 1000000; ++i) {
                    synchronized (ThreadPoolTest.class) {
                        count += 1;
                    }
                }
            }
        };

        // 重要:创建线程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 0L,
        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

        // 重要:向线程池提交两个任务
        threadPool.execute(task);
        threadPool.execute(task);

        // 等待线程池中的所有任务完成
        threadPool.shutdown();
        while (!threadPool.awaitTermination(1L, TimeUnit.MINUTES)) {
            System.out.println("Not yet. Still waiting for termination");
        }
        
        System.out.println("count = " + count);
    }
}

Результат после запуска — два миллиона, и мы успешно реализовали первую программу с использованием пула потоков. Итак, вернемся к предыдущему вопросу: каковы функции параметров, передаваемых при создании пула потоков?

Углубленный анализ пулов потоков

Создайте параметры пула резьбы

НижеThreadPoolExecutorОпределение конструктора для:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

Каждый параметр имеет следующие значения:

  1. corePoolSize, размер основного пула потоков, общий пул потоков будет содержать как минимум это количество потоков;
  2. maxPoolSize, максимальный размер пула потоков, то есть максимальное количество потоков в пуле потоков;
  3. keepAliveTime и блок вместе образуют тайм-аут,keepAliveTimeэто количество времени,unitявляется единицей времени, а единица плюс количество составляют окончательный период ожидания. Этот тайм-аут указывает, что если пул потоков содержит болееcorePoolSizeколичество потоков, поток будет уничтожен, когда время простоя потока превысит время ожидания;
  4. workQueue – это блокирующая очередь для задач. Если в пуле потоков недостаточно доступных потоков, задачи будут помещены в эту блокирующую очередь для выполнения. Тип передаваемой здесь очереди определяет стратегию обработки этих задач пулом потоков. Конкретный тип будет представлен ниже;
  5. threadFactory — объект фабрики потоков, с помощью которого пул потоков создает потоки. Мы можем передать пользовательскую реализациюThreadFactoryКласс интерфейса используется для изменения логики создания потока, его можно не указывать и использовать по умолчанию.Executors.defaultThreadFactory()В качестве фабрики по умолчанию;
  6. Обработчик, стратегия отклонения, объекты, которые не обрабатываются, когда пул потоков не выполняет или не сохраняет новые отправленные задачи, и существует несколько из следующих стратегий:
    • ThreadPoolExecutor.AbortPolicy, стратегия по умолчанию, поведение заключается в прямом броскеRejectedExecutionExceptionаномальный
    • ThreadPoolExecutor.CallerRunsPolicy, используйте поток, в котором находится вызывающий объект, для выполнения задачи
    • ThreadPoolExecutor.DiscardOldestPolicy, отмените самую раннюю отправленную задачу в очереди блокировки и повторите попытку выполнения метода
    • ThreadPoolExecutor.DiscardPolicy, молча удалить задачу напрямую, не возвращая никаких ошибок

Большинство читателей, увиденных здесь, могут не понимать конкретную роль каждого параметра, поэтому мы использовали эти параметры конфигурации кода в исходном коде пула потоков, чтобы глубже понять значение каждого параметра.

Реализация метода выполнения

Обычно мы используемexecuteметод отправляет нашу задачу, так что же делает пул потоков в процессе? существуетThreadPoolExecutorКатегорияexecute()В исходном коде метода мы в основном делаем четыре вещи:

  1. Если количество потоков в текущем пуле потоков меньше, чем количество основных потоковcorePoolSize, ЧерезthreadFactoryСоздайте новую тему и пропустите задачу ввода в качестве первой задачи в потоке;
  2. Если количество потоков в текущем пуле потоков достигло количества основных потоковcorePoolSize, то пройдетБлокировка очереди workerQueueизofferметод для добавления задач в очередь для сохранения и ожидания бездействия потока перед выполнением;
  3. Если количество потоков достигло Corepoolsize и не может вставить эту задачу (например, полную) в очередь блоков, пул потоков добавит поток для выполнения задачи, если количество потоков не достигло максимального количества потоков.maximumPoolSize;
  4. Если максимальное количество потоков действительно достигнуто, оно пройдетобработчик объекта политики отклоненияотказаться от этой задачи.

Общий процесс выполнения выглядит следующим образом: сплошная черная точка слева представляет начало процесса, а черные концентрические круги внизу представляют его конец:

Функции всех параметров, кроме времени ожидания в параметрах конструктора пула потоков, указаны выше, я думаю, что каждый может понять значение каждого параметра в соответствии с описанным выше процессом. Но есть один термин, который мы еще не объяснили подробно, а именноочередь блокировкизначение.

Блокирующая очередь в пуле потоков

в пуле потоковочередь блокировкиОн специально используется для хранения отложенных задач, которые должны ждать, пока поток не будет простаивать, иочередь блокировкиЭто такая структура данных, это очередь (похожая на список), которая может хранить от 0 до N элементов. Мы можем вставлять и извлекать элементы из этой очереди, а операцию извлечения можно понимать как операцию получения и удаления элемента из очереди. Когда в очереди нет элементов, операция получения в эту очередь будет заблокирована до тех пор, пока элемент не будет вставлен, и не будет активирована; когда очередь заполнена, операция вставки в эту очередь будет заблокирована до тех пор, пока элемент не будет вставлен. Он проснется только после того, как всплывет.

Такая структура данных хорошо подходит для сцены пула-потока, когда работник не может обрабатывать задачу, будет ввести состояние блокировки, просыпаться до после подачи новых задач.

В пуле потоков различные типы пула потоков очереди блокировки будут оказывать различное влияние на поведение, вот три наиболее часто используемых типа очереди блокировки:

  1. прямая очередь, сSynchronousQueueКласс представлен, а в очереди нет задач. Когда поток отправки задачи пытается добавить отложенную задачу в очередь, она будет заблокирована, пока поток обработки задачи не попытается получить отложенную задачу из очереди, он напрямую свяжется с потоком отправки задачи в заблокированном состоянии, и поток отправки задачи будет Задача будет передана непосредственно потоку выполнения задачи;
  2. неограниченная очередь, сLinkedBlockingQueueПредставлены классы, а в очереди может храниться неограниченное количество задач. Этот тип очереди никогда не перестанет помещать задачи в очередь, потому что очередь заполнена, поэтому мы можем обнаружить, что при использовании неограниченной очереди количество потоков в пуле потоков может достигать только количества основных потоков и не будет расти. Теперь параметр maxPoolSize максимального количества потоков не действует;
  3. ограниченная очередь, сArrayBlockingQueueКлассы представлены и могут содержать фиксированное количество задач. Этот вид очереди чаще используется на практике, потому что он не приведет к чрезмерному потреблению ресурсов из-за сохранения слишком большого количества задач (неограниченная очередь) и не повлияет на производительность системы (прямая очередь), поскольку поток отправки задач заблокирован. В целом ограниченные очереди на практике более сбалансированы.

Прочтите исходный код метода Execute.

В среде IDE, такой как IDEA, мы можем щелкнуть в нашем примере кодаThreadPoolExecutorКласс переходит на JDKThreadPoolExecutorИсходный код класса. В исходном коде мы можем увидеть многоеjava.util.concurrentРазличные комментарии, оставленные создателем пакета "Doug Lea", на картинке ниже скриншот исходного кода этого класса.

Содержание этих заметок очень ценно для справки, и рекомендуется, чтобы способные читатели и друзья могли прочитать их самостоятельно. Далее мы шаг за шагом раскроем класс пула потоков.ThreadPoolExecutorТайна исходного кода. Но этот шаг необязателен, его можно пропустить.

НижеThreadPoolExecutorсерединаexecuteМетод имеет исходный код, объясненный на китайском языке. Заинтересованные друзья могут сравнить его с описанным выше процессом для справки:

public void execute(Runnable command) {
    // 检查提交的任务是否为空
    if (command == null)
        throw new NullPointerException();
    
    // 获取控制变量值
    int c = ctl.get();
    // 检查当前线程数是否达到了核心线程数
    if (workerCountOf(c) < corePoolSize) {
        // 未达到核心线程数,则创建新线程
        // 并将传入的任务作为该线程的第一个任务
        if (addWorker(command, true))
            // 添加线程成功则直接返回,否则继续执行
            return;

        // 因为前面调用了耗时操作addWorker方法
        // 所以线程池状态有可能发生了改变,重新获取状态值
        c = ctl.get();
    }

    // 判断线程池当前状态是否是运行中
    // 如果是则调用workQueue.offer方法将任务放入阻塞队列
    if (isRunning(c) && workQueue.offer(command)) {
        // 因为执行了耗时操作“放入阻塞队列”,所以重新获取状态值
        int recheck = ctl.get();
        // 如果当前状态不是运行中,则将刚才放入阻塞队列的任务拿出,如果拿出成功,则直接拒绝这个任务
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            // 如果线程池中没有线程了,那就创建一个
            addWorker(null, false);
    }
    // 如果放入阻塞队列失败(如队列已满),则添加一个线程
    else if (!addWorker(command, false))
        // 如果添加线程失败(如已经达到了最大线程数),则拒绝任务
        reject(command);
}

В этом исходном коде мы видим, что пул потоков создаетсяaddWorkerметод создания потока, Worker здесь относится кThreadPoolExecutorКласс, используемый для переноса и управления потокамиWorker类对象。 Если вы хотите знатьWorkerДля конкретного процесса выполнения класса вы можете прочитать следующую статью о углубленном анализе процесса выполнения задач пула резьбы.

сверхурочное время

Тогда есть еще одна вещь, которую мы не упомянулисверхурочное времяКакую роль она сыграла в этом процессе? Как видно спереди, количество потоков делится на количество основных потоков и максимальное количество потоков. Когда у потока нет задач для выполнения, он блокируется при получении новых задач из очереди. В это время мы называем этот поток бездействующим потоком. После отправки новой задачи поток выйдет из состояния блокировки и начать выполнение новой задачи Task.

Если общее количество потоков в текущем пуле потоков больше, чем количество основных потоков, пока время простоя любого потока превышает время ожидания, поток будет уничтожен; если общее количество потоков в пуле потоков меньше или равно количеству потоков ядра, то поток тайм-аута будет уничтожен Не будет уничтожен (за исключением некоторых особых случаев). Здесь в игру вступает параметр тайм-аута.

Другие операции пула пула

Закройте пул резьбы

В коде, который раньше использовал пул потоков для выполнения задач, он использовался для ожидания выполнения всех задач в пуле потоков.shutdown()метод, который позволяет закрыть пул потоков. заThreadPoolExecutor, есть два основных способа закрыть пул потоков:

  1. shutdown(), пул потоков закрывается упорядоченным образом.После вызова пул потоков завершит выполнение отправленных задач, но не будет принимать новые задачи.
  2. shutdownNow(), непосредственно закрыть пул потоков, запущенные задачи в пуле потоков будут прерваны, задачи, ожидающие выполнения, не будут выполняться снова, но эти задачи, все еще ожидающие в очереди блокировки, будут возвращены в качестве возвращаемых значений.

Мониторинг рабочего состояния пула потоков

Мы можем получить текущую информацию о текущем пуле потоков, вызвав методы для объекта пула потоков, обычно используемые методы:

  • getTaskCount, пул потоков завершен, выполнение, общее количество задач, ожидающих выполнения, оценивается. Поскольку динамика задачи изменится в статистическом процессе, конечный результат не является точным значением;
  • GetCompletedTaskCount — общее количество задач, выполненных в пуле потоков, которое также является оценочным;
  • getLargestPoolSize — максимальное количество потоков, когда-либо созданных пулом потоков. С помощью этих данных можно узнать, заполнен ли пул потоков, то есть достигнут ли максимальный размер пула;
  • getPoolSize, текущее количество потоков в пуле потоков;
  • getActiveCount — оценка количества потоков, выполняющих задачи в текущем пуле потоков.

Четыре общих пула потоков

Во многих случаях мы не будем напрямую создаватьThreadPoolExecutorобъект класса, а по необходимости черезExecutorsНесколько статических методов для создания пулов потоков определенного назначения. Существует четыре часто используемых пула потоков:

  1. Кэшируемый пул потоков, используйтеExecutors.newCachedThreadPoolсоздание метода
  2. Пул потоков фиксированной длины, используйтеExecutors.newFixedThreadPoolсоздание метода
  3. Пул потоков задач задержки, используйтеExecutors.newScheduledThreadPoolсоздание метода
  4. Однопоточный пул потоков, используйтеExecutors.newSingleThreadExecutorсоздание метода

Давайте посмотрим на характеристики и применимые сценарии различных типов пулов резьбы через исходный код этих статических методов.

Кэшируемый пул потоков

Исходный код JDK, можно легко просматривать прыжок в IDE, следующееExecutors.newCachedThreadPoolисходный код в методе. Как видно из кода, кэшируемый пул потоков на самом деле создается путем непосредственного созданияThreadPoolExecutorКонструктор класса создан, но параметры в нем заданы, поэтому особых настроек делать не нужно. Таким образом, наше наблюдение сосредоточено на том, как настроить конкретную конфигурацию в этом методе.ThreadPoolExecutorобъекта и для каких сценариев подходит такой пул потоков.

Из приведенного ниже кода видно, что входящийThreadPoolExecutorЗначения конструктора: - corePoolSize Количество основных потоков равно 0, что означает, что количество потоков в пуле потоков может быть равно 0 - MaximumPoolSize Максимальное количество потоков равно Integer.MAX_VALUE, что означает, что в пуле потоков может быть не более чем бесконечное количество потоков. - Время ожидания установлено на 60 секунд, что означает, что потоки в пуле потоков будут перезапущены после бездействия в течение 60 секунд. - последним переданным являетсяSynchronousQueueТип очереди блокировки, что означает, что каждая вновь добавленная задача должна быть немедленно обработана рабочим потоком.

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

Поэтому кэшируемый пул потоков при добавлении задач будет отдавать приоритет простаивающим потокам, если нет, то будет создан новый поток. Верхнего предела на количество потоков нет, поэтому каждая задача будет сразу назначена рабочему потоку для выполнения , и ему не нужно находиться в очереди на блокировку.Подождите, если пул потоков простаивает долгое время, все потоки в нем будут уничтожены, экономя системные ресурсы.

  • преимущество
    • Задачи могут выполняться сразу после добавления, не входя в очередь блокировки для ожидания
    • Потоки не резервируются во время простоя, что позволяет экономить системные ресурсы.
  • недостаток
    • Нет ограничений на количество потоков, которые могут чрезмерно потреблять системные ресурсы.
  • Применимая сцена
    • Подходит для большого количества коротких задач и сценариев с высокими требованиями к времени отклика.

Пул потоков фиксированной длины

входящийThreadPoolExecutorЗначения конструктора:

  • Основные потоки corePoolSize и максимальные потоки maxPoolSize являются фиксированными значениями.nThreads, то есть количество потоков в пуле потоков останется равнымnThreads, поэтому он называется «пул потоков фиксированной длины».
  • Тайм-аут установлен на 0 миллисекунд, потому что в пуле потоков есть только основные потоки, поэтому нет необходимости учитывать время ожидания.
  • Последний параметр использует неограниченную очередь, поэтому в случае, когда все потоки обрабатывают задачи, вы можете добавлять задачи в очередь блокировки на неопределенный срок для выполнения
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

Количество потоков в пуле потоков фиксированной длины будет постепенно увеличиваться до nThreads, и после этого простаивающие потоки не будут освобождаться, а количество потоков останется на уровнеnThreadsКусок. Если при добавлении задачи все потоки заняты, задача будет добавлена ​​в очередь блокировки для выполнения, а верхнего предела общего количества задач в очереди блокировки нет.

  • преимущество
    • Количество потоков фиксировано, а потребление системных ресурсов контролируемо
  • недостаток
    • В случае резкого увеличения количества задач пул потоков не будет эластично расти, что вызовет задержку времени выполнения задач
    • При использовании неограниченной очереди, если задано слишком малое количество потоков, это может привести к слишком большому количеству невыполненных задач, что приведет к слишком позднему выполнению задач и чрезмерному потреблению ресурсов.
  • Применимая сцена
    • Сценарии, в которых пиковый объем задач не будет слишком большим, и задача не требует большого времени отклика

Пул потоков отложенных задач

В отличие от двух предыдущих методов,Executors.newScheduledThreadPoolто, что возвращаетсяScheduledExecutorServiceОбъект интерфейса может предоставлять такие функции, как отложенное выполнение и выполнение по времени. Конфигурация пула потоков имеет следующие характеристики:

  • maxPoolSize Максимальное количество потоков не ограничено, и может быть создано большое количество новых потоков для выполнения задач, когда количество задач велико.
  • Тайм-аут равен 0, и поток будет уничтожен сразу после бездействия.
  • При использовании отложенной рабочей очереди элементы в отложенной рабочей очереди имеют соответствующие сроки действия, и всплывают только просроченные элементы.
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

Реализован пул потоков отложенных задач.ScheduledExecutorServiceИнтерфейс в основном используется для ситуаций, когда требуется отложенное выполнение и выполнение по времени.

пул потоков с одним потоком

В пуле однопоточных потоков есть только один рабочий поток, который обеспечивает выполнение добавленных задач в указанном порядке (первым пришел, первым вышел, последним пришел, первым вышел, приоритет). Но если в пуле потоков только один поток, почему мы используем пул потоков, а не напрямуюThreadШерстяная ткань? В этом случае есть два основных преимущества: первое — мы можем легко отправлять задачи на асинхронное выполнение через общий пул потоков, не управляя сами жизненным циклом потоков; второе — мы можем использовать очередь задач и указывать порядок выполнения задач, легко реализовать функцию управления задачами.

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

Суммировать

В этой статье мы начнем с концепции и базового использования пула потоков и глубоко проанализируем весь процесс отправки задачи и роль каждого параметра конструктора пула потоков в реальном запущенном процессе пула потоков с помощью исходного кода метода execute. Также действительно прочитайте класс пула потоковThreadPoolExecutorИсходный код метода execute. Наконец, мы представляем другие общие операции пулов потоков и четыре часто используемых пула потоков.

На этом наше знакомство с исходным кодом пула потоков завершено, и я надеюсь, что после прочтения этой статьи у вас сложится общее впечатление об использовании и запущенном процессе пула потоков. Почему только общее впечатление? Потому что я думаю, что многие читатели, у которых нет соответствующей основы, могут просто иметь собственное представление о пуле потоков после прочтения этого, и некоторые детали могут быть не полностью охвачены. Поэтому я предлагаю, чтобы после прочтения этой статьи вы, возможно, захотели вернуться к началу статьи и прочитать ее еще несколько раз.Я верю, что второе чтение принесет вам другой опыт, потому что я также читаю ее для третий раз.ThreadPoolExecutorИсходный код класса действительно ломает некоторые из этих важных соединений.

Введение

В этой статье мы рассмотрели только базовое использование пула потоков и метод отправки задач.executeИсходный код. Итак, после того, как задача, как быть представленным в пул потоков выполнения это? В следующей статье мы можем найти ответ, в следующей статье мы подробно проанализируем процесс выполнения задачи пула потоков.