Потоки и пулы потоков в Java — Пулы потоков

Java

Приквел:Потоки и пулы потоков в Java — Потоки

Пул потоков

Что такое пул потоков?

Проще говоря, пул потоков относится к созданию нескольких потоков заранее.Когда есть задача для обработки, поток в пуле потоков будет обрабатывать задачу.После завершения обработки поток не будет уничтожен, а продолжится ждать следующего задания. Поскольку создание и удаление потоков потребляют системные ресурсы, когда бизнесу необходимо часто создавать и уничтожать потоки, рассмотрите возможность использования пулов потоков для повышения производительности системы.

Что может пул потоков?

В The Art of Java Concurrent Programming использование пулов потоков может помочь:

  • Снизить потребление ресурсов. За счет повторного использования уже созданных потоков потребление, вызванное созданием и уничтожением потоков, может быть уменьшено.
  • Улучшить отзывчивость. Когда задача поступает, ее можно выполнить немедленно, не дожидаясь создания потока.
  • Улучшить управляемость потоками. Потоки являются дефицитными ресурсами. Если их создавать без ограничений, они не только потребляют системные ресурсы, но и снижают стабильность системы. Использование пулов потоков можно использовать для унифицированного распределения, настройки и мониторинга.

Как создать пул потоков

Сначала создайте класс, реализующий интерфейс Runnable.

package demo;

import java.util.Date;

/**
 * @author yuanyiwen
 * @create 2020-02-28 16:05
 * @description
 */
public class DemoThread implements Runnable {

    private String command;

    public DemoThread(String command) {
        this.command = command;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " 开始时间 : " + new Date());
        processCommand();
        System.out.println(Thread.currentThread().getName() + " 结束时间 : " + new Date());
    }

    private void processCommand() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return "DemoThread{" +
                "command='" + command + '\'' +
                '}';
    }
}

Здесь давайте используем ThreadPoolExecutor для создания пула потоков для тестирования:

package demo;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author yuanyiwen
 * @create 2020-02-28 16:19
 * @description
 */
public class DemoThreadPoolExecutor {

    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE  = 10;
    private static final int QUEUE_CAPACITY = 100;
    private static final Long KEEP_ALIVE_TIME = 1L;

    public static void main(String[] args) {
        // 使用线程池来创建线程
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                // 核心线程数为 :5
                CORE_POOL_SIZE,
                // 最大线程数 :10
                MAX_POOL_SIZE,
                // 等待时间 :1L
                KEEP_ALIVE_TIME,
                // 等待时间的单位 :秒
                TimeUnit.SECONDS,
                // 任务队列为 ArrayBlockingQueue,且容量为 100
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                // 饱和策略为 CallerRunsPolicy
                new ThreadPoolExecutor.CallerRunsPolicy()
        );

        for(int i = 0; i < 15; i++) {
            // 创建WorkerThread对象,该对象需要实现Runnable接口
            Runnable worker = new DemoThread("任务" + i);
            // 通过线程池执行Runnable
            threadPoolExecutor.execute(worker);
        }
        // 终止线程池
        threadPoolExecutor.shutdown();
        while (!threadPoolExecutor.isTerminated()) {

        }
        System.out.println("全部线程已终止");
    }
}

Наконец, давайте посмотрим на текущие результаты:

Можно видеть, что когда число основных потоков равно 5, даже если всего нужно запустить 15 потоков, каждый раз одновременно будут выполняться только 5 задач, а оставшиеся задачи будут помещены в очередь ожидания, ожидание бездействия основного потока. В общем, шаги следующие:

Платформа исполнителя

Платформа Executor была введена после Java5. После Java 5 лучше запускать поток через Executor, чем использовать метод запуска потока. В дополнение к тому, что он более управляем и эффективен (реализован с помощью пула потоков, экономит накладные расходы), есть ключевой момент: это помогает избежать этой проблемы выхода.

этот побег

Этот переход означает, что другие потоки сохраняют ссылку на объект до возврата конструктора, и вызов методов для объектов, которые еще не были полностью сконструированы, может привести к странным ошибкам.

Обычно есть два условия, которые должны быть выполнены, чтобы вызвать этот escape: одно — создать внутренний класс в конструкторе, а другое — опубликовать внутренний класс в конструкторе.

Поскольку освобожденный объект внутреннего класса имеет свои собственные права доступа к внешнему классу this, при доступе к внешнему классу this через объект внутреннего класса внешний класс может не сконструироваться, что приведет к некоторым неожиданным проблемам.

Типичный сценарий побега выглядит следующим образом:

public class DemoThisEscape {

    private int a = 10;

    public DemoThisEscape() {
        // 在外部类的构造函数中调用内部类
        new Thread(new InnerClass()).start();
    }

    private class InnerClass implements Runnable {
        @Override
        public void run() {
            // 在这里通过 DemoThisEscape.this 引用尚未构造完毕的对象,比如这样 :
            System.out.println(DemoThisEscape.this.a);
        }
    }
}

При использовании пула потоков для унифицированного планирования потоков шаг ручного запуска потока в программе опускается, тем самым избегая ситуации запуска потока в конструкторе, поэтому он может эффективно избежать этого выхода.

Общие параметры ThreadPoolExecutor

1. corePoolSize: количество потоков ядра.

Определяет минимальное количество потоков, которые могут выполняться одновременно.

2. maxPoolSize : максимальное количество потоков

Когда задачи, хранящиеся в очереди, достигают емкости очереди, текущее количество потоков, которые могут выполняться одновременно, увеличивается до максимального количества потоков.

3. keepAliveTime: время ожидания

Когда количество потоков больше, чем количество основных потоков, максимальное время, в течение которого выживают избыточные простаивающие потоки.

4. unit : Единица времени.

Единица времени для параметра keepAliveTime, включаяTimeUnit.SECONDS,TimeUnit.MINUTES,TimeUnit.HOURS,TimeUnit.DAYSи Т. Д.

5. workQueue: очередь задач

Очередь задач используется для хранения очереди задач, ожидающих выполнения.

6. threadFactory: фабрика ниток

Фабрика потоков, используемая для создания потоков, как правило, по умолчанию.

7. обработчик: политика отклонения

Также известная как стратегия насыщения; когда слишком много задач отправляется и не может быть обработано вовремя, для обработки задач можно использовать специальную стратегию.

Стратегия насыщения ThreadPoolExecutor:Относится к стратегии, выполняемой ThreadPoolTaskExecutor, когда количество одновременно выполняющихся потоков достигает максимального количества потоков и очередь заполняется.

Общие правила отказа включают:

  • ThreadPoolExecutor.AbortPolicy:бросатьRejectedExecutionExceptionотклонить обработку новых задач — это стратегия отклонения по умолчанию, используемая в Spring.
  • ThreadPoolExecutor.CallerRunsPolicy:Поток вызываетexecuteсам, то есть напрямую вызываяexecuteметод, работающий в потоке (run) отклоненная задача, которая отбрасывается, если исполнитель был закрыт. Эта стратегия обеспечивает простой механизм управления обратной связью, который может замедлить отправку новых задач, но может привести к задержкам. Эту стратегию можно выбрать, если приложение может терпеть эту задержку и не может отбрасывать запросы задач.
  • ThreadPoolExecutor.DiscardPolicy:Не обрабатывайте новые задачи, просто отбрасывайте их.
  • ThreadPoolExecutor.DiscardOldestPolicy:Эта политика отклонит самый старый незавершенный запрос задачи.

Почему рекомендуется использовать ThreadPoolExecutor для создания потоков?

Протокол 1. Ресурсы потоков должны предоставляться через пул потоков, и явное создание потоков в приложении запрещено.

Преимущество использования пула потоков заключается в сокращении времени, затрачиваемого на создание и уничтожение потоков, и накладных расходов на системные ресурсы, а также решении проблемы нехватки ресурсов. Если пул потоков не используется, это может привести к тому, что система создаст большое количество потоков одного типа, что может вызвать проблему нехватки памяти или «чрезмерного переключения».

Протокол 2: Обязательный пул потоков не может использоватьсяExecutorsсоздавать, а черезThreadPoolExecutorМетод конструктора, этот способ обработки позволяет авторам более четко представлять правила выполнения пула потоков и избегать риска исчерпания ресурсов.

Недостатки Executors, возвращающие объекты пула потоков, заключаются в следующем:

FixedThreadPoolиSingleThreadExecutor: запрос разрешендлина очередиInteger.MAX_VALUE, может бытьнакопить много запросов, что приводит к ООМ.

CachedThreadPoolиScheduledThreadPool: разрешено создаватьколичество потоковInteger.MAX_VALUE, может бытьСоздавайте много тем, что приводит к ООМ.

Несколько общих пулов потоков

Фиксированный пул потоков FixThreadPool

FixThreadPool: пул потоков с фиксированным количеством повторно используемых потоков.

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(
            nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            threadFactory);
    }

Механизм исполнения:

  • Если количество запущенных в данный момент потоков меньшеcorePoolSize, когда приходит новая задача, создается новый поток для выполнения задачи;
  • Количество запущенных в данный момент потоков равноcorePoolSizeПосле этого, если появится новая задача, задача будет добавлена ​​вLinkedBlockingQueue;
  • После того, как поток в пуле потоков выполнит свою работу, он будет повторно запускаться сLinkedBlockingQueueПолучить задачи для выполнения.

FixThreadPool использует неограниченную очередьLinkedBlockingQueue(Емкость очереди — Integer.MAX_VALUE), и это принесет в пул потоков следующее:Оказать влияние :

  • Когда количество потоков в пуле потоков достигаетcorePoolSize, новые задачи будут ожидать в неограниченной очереди, поэтому количество потоков в пуле потоков не будет превышатьcorePoolSize;
  • Так как используется неограниченная очередь, тоmaximumPoolSizeБудет недопустимым параметром, так как невозможно иметь полную очередь задач, поэтому FixedThreadPool'scorePoolSize,maximumPoolSizeустанавливается на одно и то же значение, иkeepAliveTimeбудет недопустимым параметром;
  • Запуск FixedThreadPool (относится к невыполнениюshutdown()илиshutdownNow()) не отклонит задачу, поэтому при наличии большого количества задач это может привести к неработоспособности.

Однопоточный пул SingleThreadExecutor

SingleThreadExecutor — это пул потоков только с одним потоком.

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(
                    1, 1,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>(),
                    threadFactory));
}

В основном это то же самое, что и FixThreadPool, за исключением того, что в пуле есть только один поток.

Пул потоков кеша CachedThreadPool

CachedThreadPool — это пул потоков, который создает новые потоки по мере необходимости, но повторно использует созданные ранее потоки по мере их появления.

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(
            0, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),
            threadFactory);
}

ТотcorePoolSizeустановлен на 0,maximumPoolSizeУстановлено значение Integer.MAX.VALUE, которое не ограничено. Хотя он неограничен, потому что пул потоков также имеет механизм уничтожения, то есть, если поток не использовался в течение 60 секунд, поток будет уничтожен, что экономит много ресурсов.

Однако, если основной поток отправляет задачи быстрее, чемmaximunPoolВ зависимости от скорости, с которой поток обрабатывает задачи, CachedThreadPool будет продолжать создавать новые потоки, что по-прежнему может вызывать перегрузку ЦП или переполнение памяти.

Механизм исполнения:

  • сначала выполнитьofferОперация, отправьте задачу в очередь задач. Если в текущем максимальном пуле выполняются незанятые потокиpollоперация, а основной потокofferс неработающими потокамиpollПри успешном сопряжении основной поток передает задачу незанятому потоку для выполнения, что считаетсяexecute()Выполнение метода завершено, в противном случае выполняются следующие шаги.
  • когда начальныйmaximumпусто, илиmaximumPoolКогда нет свободных потоков вpollработать. В этот момент CachedThreadPool создаст новый поток для выполнения задачи,execute()Выполнение метода завершено.

Как составить размер пула потоков?

переключатель контекста

В многопоточном переменном программировании количество общих потоков больше, чем количество ядер ЦП, и одно ядро ​​ЦП может использоваться только одним потоком в любой момент времени. Чтобы эти потоки выполнялись эффективно, стратегия, принятая ЦП, заключается в выделении временных интервалов для каждого потока и ротации. Когда квант времени потока истечет, он снова будет в состоянии готовности для использования другими потоками, что является переключением контекста.

В двух словах, текущая задача сохранит свое состояние перед переключением на другую задачу после выполнения кванта времени ЦП, так что при переключении обратно на эту задачу в следующий раз ее можно будет напрямую загрузить в предыдущее состояние. Процесс задачи от сохранения до перезагрузки — это переключение контекста.

Переключение контекста обычно требует больших вычислительных ресурсов. То есть это требует значительного процессорного времени, порядка наносекунд для каждого переключения в десятках или сотнях переключений в секунду. Следовательно, переключение контекста означает, что оно потребляет много процессорного времени для системы, по сути, это может быть самая трудоемкая операция в операционной системе.

Одна из многих вещей, которые Linux сравнивает с другими операционными системами (включая другие Unix-подобные системы), заключается в том, что переключение контекста и переключение режимов занимает очень мало времени.

простой проект решения

Задачи с интенсивным использованием ЦП (N+1):

Эта задача в основном потребляет ресурсы ЦП.Количество потоков может быть установлено равным N (количество ядер ЦП) + 1. На один поток больше, чем число ядер ЦП, чтобы предотвратить случайное прерывание потока из-за ошибки страницы, или задача вызвано другими причинами воздействия подвески. Как только задача будет приостановлена, ЦП будет бездействовать, и дополнительный поток в этом случае может полностью использовать время простоя ЦП.

Задачи с интенсивным вводом-выводом (2N):

Когда применяется задача такого типа, система будет тратить большую часть времени на обработку взаимодействия ввода-вывода, и поток не будет занимать ЦП для обработки в течение периода времени обработки ввода-вывода.В это время ЦП может быть передан в пользование другим потокам. . Таким образом, при выполнении задач с интенсивным вводом-выводом мы можем настроить больше потоков, а конкретный метод расчета — 2N.


Справочная статья :JavaGuide