Глубокое понимание пула потоков Java

Java

1. Введение

Что такое пул потоков

Пул потоков — это форма многопоточности, при которой задачи добавляются в очередь во время обработки, а затем автоматически запускаются после создания потоков.

Зачем использовать пул потоков

Если количество одновременных запросов велико, но время выполнения каждого потока очень короткое, потоки будут создаваться и уничтожаться часто. В результате эффективность системы будет значительно снижена, а затраты времени и ресурсов на частое создание и уничтожение потоков могут превысить фактическую работу.

Это из-за этой проблемы, необходимо ввести пул резьбы. использоватьПреимущества пулов потоковЕсть следующие моменты:

  • Сокращение потребления ресурсов- Уменьшите стоимость создания и уничтожения потоков за счет повторного использования уже созданных потоков.
  • Улучшить отзывчивость- При поступлении задачи задачу можно выполнить немедленно, не дожидаясь создания потока.
  • Улучшить управляемость потоками- Потоки являются дефицитными ресурсами. Если они создаются без ограничений, это не только потребляет системные ресурсы, но и снижает стабильность системы. Использование пулов потоков можно использовать для унифицированного распределения, настройки и мониторинга. Но чтобы разумно использовать пул потоков, вы должны хорошо знать его принципы.

2. Структура исполнителя

Платформа Executor — это платформа для вызова, планирования, выполнения и управления асинхронными задачами в соответствии с набором политик выполнения с целью предоставления механизма для отделения «отправки задачи» от «как задачи выполняются».

Обзор основного API

Основной API платформы Executor выглядит следующим образом:

  • Executor- Простой интерфейс для запуска задач.
  • ExecutorService- расширенныйExecutorинтерфейс. Расширяемость:
    • Поддержка потоков с возвращаемыми значениями;
    • Поддержка управления жизненным циклом потоков.
  • ScheduledExecutorService- расширенныйExecutorServiceинтерфейс. Расширяемость: поддерживает периодическое выполнение задач.
  • AbstractExecutorService - ExecutorServiceРеализация интерфейса по умолчанию.
  • ThreadPoolExecutor- Основной класс фреймворка Executor, который наследуетAbstractExecutorServiceсвоего рода.
  • ScheduledThreadPoolExecutor - ScheduledExecutorServiceРеализация интерфейса, пул потоков, который может регулярно планировать задачи.
  • Executors- можно вызватьExecutorsСтатический фабричный метод для создания пула потоков и возвратаExecutorServiceобъект.

Executor

ExecutorВ интерфейсе определен только одинexecuteспособ получитьRunnableобъект.

public interface Executor {
    void execute(Runnable command);
}

ExecutorService

ExecutorServiceинтерфейс наследуетExecutorинтерфейс, который также обеспечиваетinvokeAll,invokeAny,shutdown,submitи другие методы.

public interface ExecutorService extends Executor {

    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Из определения методов он поддерживает, не сложно видеть, что: по сравнению сExecutorинтерфейс,ExecutorServiceОсновные расширения интерфейса:

  • Поддерживаются потоки с возвращаемыми значениями -sumbit,invokeAll,invokeAnyВсе методы поддерживают входящиеCallableобъект.
  • Поддержка управления жизненным циклом потока —shutdown,shutdownNow,isShutdownи другие методы.

ScheduledExecutorService

ScheduledExecutorServiceрасширенный интерфейсExecutorServiceинтерфейс.

Помимо поддержки всех возможностей предыдущих двух интерфейсов, он также поддерживает потоки планирования времени.

public interface ScheduledExecutorService extends ExecutorService {

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}

Его интерфейс расширения предоставляет следующие возможности:

  • scheduleметод может выполнятьRunnableилиCallableЗадача.
  • scheduleAtFixedRateМетоды иscheduleWithFixedDelayМетод периодически выполняет задачу через заданный интервал времени.

3. ThreadPoolExecutor

java.uitl.concurrent.ThreadPoolExecutorклассExecutorОсновной класс в фреймворке. Поэтому данная статья будет посвящена этому классу.

Важные поля

ThreadPoolExecutorИмеются следующие важные поля:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

Описание параметра:

  • ctl - Используется для управления рабочим состоянием пула потоков и количеством эффективных потоков в пуле потоков.. Он содержит две части информации:
    • Текущее состояние пула потоков (runState)
    • Количество допустимых потоков в пуле потоков (workerCount)
    • можно увидеть,ctlиспользовалIntegerтип для сохранения, старшие 3 бита сохраняютсяrunState, младшие 29 бит сохраняютсяworkerCount.COUNT_BITS29,CAPACITYЭто 1, сдвинутая влево на 29 бит минус 1 (29 1 с), эта константа представляетworkerCountВерхний предел составляет около 500 миллионов.
  • Статус выполнения — существует пять статусов выполнения пула потоков:
    • RUNNING - Рабочий статус. Принимает новые задачи, а также может обрабатывать задачи в очередях блокировки.
    • SHUTDOWN - Неполноценный. Не принимает новые задачи, но может обрабатывать задачи в очереди блокировки.
      • в пуле потоковRUNNINGгосударство, звонитеshutdownметод приведет пул потоков в это состояние.
      • finalizeМетод также вызывается во время выполненияshutdownметод входит в это состояние.
    • STOP - состояние остановки. Не принимает новые задачи и не обрабатывает задачи в очереди. Прерывает поток, обрабатывающий задачу. в пуле потоковRUNNINGилиSHUTDOWNгосударство, звонитеshutdownNowметод приведет пул потоков в это состояние.
    • TIDYING - Организовать статус. Если все задачи завершены,workerCount(Количество допустимых потоков) равно 0, пул потоков будет вызываться после перехода в это состояние.terminatedспособ входаTERMINATEDгосударство.
    • TERMINATED - Прекращено. существуетterminatedВойдите в это состояние после выполнения метода. дефолтterminatedМетод ничего не делает. ВходитьTERMINATEDУсловия следующие:
      • Пул потоков неRUNNINGгосударство;
      • состояние пула потоков неTIDYINGстатус илиTERMINATEDгосударство;
      • Если состояние пула потоковSHUTDOWNиworkerQueueПусто;
      • workerCount0;
      • настраиватьTIDYINGСтатус успешно.

Метод строительства

ThreadPoolExecutorКонструкторов четыре, первые три основаны на четвертой реализации. Четвертый конструктор определяется следующим образом:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {

Описание параметра:

  • corePoolSize - количество основных потоков. Когда новая задача проходитexecuteКогда метод отправлен, пул потоков выполнит следующие суждения:
    • Если число запущенных потоков меньшеcorePoolSize, создается новый поток для обработки задачи, даже если другие потоки в пуле потоков простаивают.
    • Если количество потоков в пуле потоков больше или равноcorePoolSizeи меньше чемmaximumPoolSize, то только когдаworkQueueПри заполнении создается новый поток для обработки задачи;
    • Если установленоcorePoolSizeиmaximumPoolSizeТо же самое, размер создаваемого пула потоков фиксирован. В это время, если будет отправлена ​​новая задача, еслиworkQueueне заполнен, поместите запрос вworkQueue, ожидая перехода незанятого потока изworkQueueПодбирать задачи и обрабатывать их;
    • Если количество запущенных потоков больше или равноmaximumPoolSize, то еслиworkQueueзаполнен, используйтеhandlerзаданная стратегия обработки задачи;
    • Поэтому при сдаче задания порядок оцениванияcorePoolSize => workQueue => maximumPoolSize.
  • maximumPoolSize - максимальное количество потоков.
    • Если очередь заполнена, а количество уже созданных потоков меньше максимального количества потоков, пул потоков создаст новые потоки для выполнения задач.
    • Стоит отметить, что этот параметр не действует, если используется неограниченная очередь задач.
  • keepAliveTime:Тема поддерживает время жизни.
    • Когда количество потоков в пуле потоков превышаетcorePoolSizeКогда в это время нет новой отправки задачи, потоки вне основного потока не будут немедленно уничтожены, а будут ждать, пока время ожидания не превыситkeepAliveTime.
    • Следовательно, если есть много задач, и время выполнения каждой задачи относительно короткое, на этот раз может быть увеличено для улучшения использования потоков.
  • unit - keepAliveTimeединица времени. Есть 7 значений. Необязательными единицами измерения являются дни (DAYS), часы (HOURS), минуты (MINUTES), миллисекунды (MILLISECONDS), микросекунды (MICROSECONDS, тысячные доли миллисекунды) и наносекунды (NANOSECONDS, тысячные доли микросекунды).
  • workQueue - очередь задач, ожидающих выполнения. Блокирующая очередь для хранения задач, ожидающих выполнения. Можно выбрать следующие очереди блокировки.
    • ArrayBlockingQueue - Ограниченная очередь блокировки.
      • Эта очередьОчередь в порядке очереди на основе массива (FIFO).
      • Эта очередь должна быть создана с указанным размером.
    • LinkedBlockingQueue - неограниченная очередь блокировки.
      • Эта очередьОчередь в порядке очереди (FIFO) на основе связанного списка.
      • Если этот размер очереди не указан при создании, по умолчанию он равенInteger.MAX_VALUE.
      • пропускная способность обычно вышеArrayBlockingQueue.
      • использоватьLinkedBlockingQueueзначит:maximumPoolSizeне будет работать, максимальное количество потоков, которое может создать пул потоков, равноcorePoolSize, так как очередь ожидания задач является неограниченной очередью.
      • Executors.newFixedThreadPoolиспользовал эту очередь.
    • SynchronousQueue - Отправленная задача не будет сохранена, но будет создан новый поток непосредственно для выполнения новой задачи..
      • Каждая операция вставки должна ждать, пока другой поток не вызовет операцию удаления, иначе операция вставки останется заблокированной.
      • пропускная способность обычно вышеLinkedBlockingQueue.
      • Executors.newCachedThreadPoolиспользовал эту очередь.
    • PriorityBlockingQueue - Неограниченная очередь блокировки с приоритетом.
  • threadFactory - фабрика нитей. Каждому созданному потоку можно дать более осмысленное имя через фабрику потоков.
  • handler - стратегия насыщения. этоRejectedExecutionHandlerпеременная типа. Когда очередь и пул потоков заполнены, что указывает на насыщение пула потоков, необходимо принять стратегию для обработки новых отправленных задач. Пул потоков поддерживает следующие стратегии:
    • AbortPolicy- Отменить задачу и создать исключение. Это также политика по умолчанию.
    • DiscardPolicy- Отменить задачу без создания исключения.
    • DiscardOldestPolicy- Отменить задачу в начале очереди и повторить задачу (повторить процесс).
    • CallerRunsPolicy- Используйте только поток вызывающего абонента для запуска задачи.
    • Если ни одна из вышеперечисленных стратегий не может удовлетворить ваши потребности, вы также можете реализоватьRejectedExecutionHandlerинтерфейс для настройки стратегии обработки. Например, регистрация или сохранение задач, которые невозможно обработать.

выполнить метод

По умолчанию после создания пула потоков в пуле потоков нет, а потоки создаются только после отправки задач.

Для отправки задачи вы можете использоватьexecuteметод, этоThreadPoolExecutorОсновной метод, с помощью которого можноОтправить задачу в пул потоков для выполнения пулом потоков.

executeРабочий процесс метода выглядит следующим образом:

  1. еслиworkerCount < corePoolSize, затем создайте и запустите поток для выполнения только что отправленной задачи;
  2. еслиworkerCount >= corePoolSize, а очередь блокировки в пуле потоков не заполнена, добавьте задачу в очередь блокировки;
  3. еслиworkerCount >= corePoolSize && workerCount < maximumPoolSize, и очередь блокировки в пуле потоков заполнена, создайте и запустите поток для выполнения только что отправленной задачи;
  4. еслиworkerCount >= maximumPoolSize, а очередь блокировки в пуле потоков заполнена, задача обрабатывается в соответствии с политикой отклонения.Метод обработки по умолчанию – прямой вызов исключения.

Другие важные методы

существуетThreadPoolExecutorВ классе также есть несколько важных методов:

  • submit- похожий наexecute, но для потоков с возвращаемыми значениями.submitметод находится вExecutorServiceметоды, объявленные в , вAbstractExecutorServiceУже есть конкретная реализация.ThreadPoolExecutorПрямое повторное использованиеAbstractExecutorServiceизsubmitметод.
  • shutdown- Пул потоков не будет остановлен немедленно, но будет остановлен после того, как все задачи в очереди кэша задач будут выполнены, но новые задачи приниматься не будут.
    • Переключите пул потоков наSHUTDOWNгосударство;
    • и позвониinterruptIdleWorkersМетод запрашивает прерывание всех бездействующих рабочих процессов;
    • последний звонокtryTerminateПопытка завершить пул потоков.
  • shutdownNow- Немедленно прервите пул потоков и попытайтесь прервать выполнение задачи, очистите очередь кэша задач и верните задачу, которая не была выполнена. иshutdownМетод похож, разница:
    • установить состояние наSTOP;
    • Прервать все рабочие потоки, независимо от того, простаивают они или нет;
    • Удалить невыполненную задачу из очереди блокировки и вернуться.
  • isShutdown- называетсяshutdownилиshutdownNowПосле метода,isShutdownметод вернет true.
  • isTerminaed- Когда все задачи закрыты, пул потоков успешно закрыт, затем вызовитеisTerminaedметод вернет true.
  • setCorePoolSize- Установить размер количества основных потоков.
  • setMaximumPoolSize- Установить максимальное количество потоков размера.
  • getTaskCount- общее количество задач, выполняемых и не выполняемых пулом потока;
  • getCompletedTaskCount- Количество задач, выполненных пулом потоков, значение меньше или равноtaskCount;
  • getLargestPoolSize- Максимальное количество потоков, когда-либо созданных пулом потоков. С помощью этих данных можно узнать, заполнен ли пул потоков, то есть достиг ли онmaximumPoolSize;
  • getPoolSize- текущее количество потоков в пуле потоков;
  • getActiveCount- Количество потоков, выполняющих задачи в текущем пуле потоков.

Пример использования

public class ThreadPoolExecutorDemo {

    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 500, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());

        for (int i = 0; i < 100; i++) {
            threadPoolExecutor.execute(new MyThread());
            String info = String.format("线程池中线程数目:%s,队列中等待执行的任务数目:%s,已执行玩别的任务数目:%s",
                threadPoolExecutor.getPoolSize(),
                threadPoolExecutor.getQueue().size(),
                threadPoolExecutor.getCompletedTaskCount());
            System.out.println(info);
        }
        threadPoolExecutor.shutdown();
    }

    static class MyThread implements Runnable {

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " 执行");
        }

    }

}

Четыре, Исполнители

JDKExecutorsВ классе предусмотрено несколько репрезентативных пулов потоков, эти пулы потоковоснованы наThreadPoolExecutorиндивидуальная реализация.

При фактическом использовании пулов потоков мы часто не используем напрямуюThreadPoolExecutorвместо этого используйте репрезентативный экземпляр пула потоков, предоставленный в JDK.

newSingleThreadExecutor

Создать однопоточный пул резьбы.

Для выполнения задач будет создан только один рабочий поток, гарантирующий выполнение всех задач в указанном порядке (FIFO, LIFO, приоритет).Если единственный поток завершается аварийно, новый поток заменит его..

Самые большие особенности одного рабочего потока:Гарантированное последовательное выполнение задач.

Пример:

public class SingleThreadExecutorDemo {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 100; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 执行");
                }
            });
        }
        executorService.shutdown();
    }

}

newFixedThreadPool

Создайте пул потоков фиксированного размера.

Каждый раз, когда задача отправляется, создается новый рабочий поток.Если количество рабочих потоков достигает максимального количества потоков в пуле потоков, отправленная задача будет сохранена в очереди блокировки..

FixedThreadPoolЭто типичный и превосходный пул потоков, обладающий преимуществами пула потоков, повышающими эффективность программы и снижающими накладные расходы при создании потоков. Однако, когда пул потоков простаивает, то есть когда в пуле потоков нет исполняемых задач, он не будет освобождать рабочие потоки и будет занимать определенные системные ресурсы.

Пример:

public class FixedThreadPoolDemo {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 100; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 执行");
                }
            });
        }
        executorService.shutdown();
    }

}

newCachedThreadPool

Создайте кешируемый пул потоков.

  • Если длина пула потоков превышает количество потоков, необходимых для обработки задачи, некоторые простаивающие потоки будут перезапущены;
  • Если задача не отправляется в пул потоков в течение длительного времени, то есть если рабочий поток простаивает в течение заданного времени (по умолчанию 1 минута), рабочий поток будет автоматически завершен. После завершения, если вы отправляете новую задачу, пул потоков воссоздает рабочий поток.
  • Этот пул потоков не ограничивает размер пула потоков, размер пула потоков полностью зависит от максимального размера потока, который может создать операционная система (или JVM). Поэтому используйтеCachedThreadPoolПри обязательно обратите внимание на контроль количества задач, иначе из-за одновременного выполнения большого количества потоков система будет парализована.

Пример:

public class CachedThreadPoolDemo {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 100; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 执行");
                }
            });
        }
        executorService.shutdown();
    }

}

newScheduleThreadPool

Создайте пул потоков бесконечного размера. Этот пул потоков поддерживает потребности в синхронизированном и периодическом выполнении задач.

public class ScheduledThreadPoolDemo {

    public static void main(String[] args) {
        schedule();
        scheduleAtFixedRate();
    }

    private static void schedule() {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        for (int i = 0; i < 100; i++) {
            executorService.schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 执行");
                }
            }, 1, TimeUnit.SECONDS);
        }
        executorService.shutdown();
    }

    private static void scheduleAtFixedRate() {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        for (int i = 0; i < 100; i++) {
            executorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 执行");
                }
            }, 1, 1, TimeUnit.SECONDS);
        }
        executorService.shutdown();
    }

}

использованная литература