7 способов создать пул потоков, настоятельно рекомендуется их использовать...

Java
7 способов создать пул потоков, настоятельно рекомендуется их использовать...

Согласно закону Мура: количество транзисторов, которые могут поместиться на интегральной схеме, удваивается каждые 18 месяцев, поэтому количество транзисторов в процессоре будет увеличиваться.

Однако с течением времени количество транзисторов, которые могут быть размещены на интегральной схеме, имеет тенденцию к насыщению, и закон Мура постепенно нарушается, поэтому многоядерные процессоры постепенно становятся массовым явлением, а соответствующие многопоточные программирование тоже стало популярным и популярным.Это конечно давно.Пока что многопоточное программирование стало необходимым профессиональным навыком для программистов.Тогда мы обсудим самую важную тему в многопоточном программировании,"пул потоков ".

Что такое пул потоков?

ThreadPool — это механизм управления потоками и их использования, основанный на идее объединения потоков. Он заранее сохраняет несколько потоков в «пуле». Когда возникает задача, он может избежать накладных расходов на производительность, вызванных повторным созданием и уничтожением потоков. Для выполнения соответствующего нужно только извлечь соответствующий поток из «пула». задача. .

Идея объединения также широко используется в компьютерах, таких как:

  • Пул памяти: подайте заявку на память заранее, улучшите скорость приложения памяти и уменьшите фрагментацию памяти.
  • Пул соединений: предварительно подавайте заявки на подключения к базе данных, повышайте скорость подачи заявок на подключения и снижайте нагрузку на систему.
  • Пул экземпляров (пул объектов): перерабатывайте объекты, чтобы уменьшить дорогостоящее потребление ресурсов во время инициализации и выпуска.

Преимущества пулов потоков в основном отражены в следующих четырех пунктах:

  1. Сокращение потребления ресурсов: повторное использование созданных потоков с помощью технологии объединения, чтобы уменьшить потери, вызванные созданием и уничтожением потоков.
  2. Улучшить отзывчивость: При поступлении задачи ее можно выполнить немедленно, не дожидаясь создания потока.
  3. Улучшить управляемость потоками: Потоки являются дефицитными ресурсами.Если они создаются без ограничений, они не только потребляют системные ресурсы, но и вызывают дисбаланс планирования ресурсов из-за необоснованного распределения потоков, снижая стабильность системы. Используйте пулы потоков для унифицированного распределения, настройки и мониторинга.
  4. Обеспечьте все более и более мощные функции: пул потоков является расширяемым, что позволяет разработчикам добавлять к нему дополнительные функции. Например, пул потоков с отложенной синхронизацией ScheduledThreadPoolExecutor позволяет регулярно откладывать или выполнять задачи.

в то же времяAlibaba также предписывает в своем Руководстве по разработке Java, что ресурсы потоков должны предоставляться через пул потоков, и не разрешается явно создавать потоки в приложениях..

Описание: Преимущество пула потоков заключается в сокращении затрат времени на создание и уничтожение потоков и накладных расходов системных ресурсов, а также решении проблемы нехватки ресурсов. Если пул потоков не используется, это может привести к тому, что система создаст большое количество потоков одного типа, что может вызвать проблему нехватки памяти или «чрезмерного переключения».

Узнав, что такое пул потоков и зачем его использовать, давайте посмотрим, как использовать пул потоков.

использование пула потоков

Существует 7 способов создания пулов потоков, но в целом их можно разделить на 2 категории:

  • Класс прошелThreadPoolExecutorСоздан пул потоков;
  • Другой класс черезExecutorsСоздан пул потоков.

image.png

Всего существует 7 способов создания пулов потоков (6 из которых создаютсяExecutorsсоздано, 1 черезThreadPoolExecutorсозданный):

  1. Executors.newFixedThreadPool: создайте пул потоков фиксированного размера, который может контролировать количество параллельных потоков, а лишние потоки будут ожидать в очереди;
  2. Executors.newCachedThreadPool: создание кэшируемого пула потоков.Если количество потоков превышает требования к обработке, кеш будет восстановлен через определенный период времени.Если количества потоков недостаточно, будет создан новый поток;
  3. Executors.newSingleThreadExecutor: создайте пул потоков с одним числом потоков, что может гарантировать порядок выполнения «первым поступил, первым вышел»;
  4. Executors.newScheduledThreadPool: создайте пул потоков, который может выполнять отложенные задачи;
  5. Executors.newSingleThreadScheduledExecutor: создание однопоточного пула потоков, который может выполнять отложенные задачи;
  6. Executors.newWorkStealingPool: создание пула потоков для упреждающего выполнения (порядок выполнения задач не определен) [Добавлено в JDK 1.8].
  7. ThreadPoolExecutor: самый примитивный способ создания пула потоков, он содержит 7 параметров для установки, которые будут подробно рассмотрены позже.

Значение однопоточного пулаИз приведенного выше кода видно, что newSingleThreadExecutor и newSingleThreadScheduledExecutor создают однопоточные пулы, так в чем же смысл однопоточных пулов? О: Хотя это однопоточный пул, он предоставляет такие функции, как очередь работ, управление жизненным циклом и обслуживание рабочих потоков.

Затем давайте рассмотрим конкретное использование создания каждого пула потоков.

1.FixedThreadPool

Создайте пул потоков фиксированного размера, чтобы контролировать количество одновременных потоков, а лишние потоки будут ждать в очереди.

Пример использования следующий:

public static void fixedThreadPool() {
    // 创建 2 个数据级的线程池
    ExecutorService threadPool = Executors.newFixedThreadPool(2);

    // 创建任务
    Runnable runnable = new Runnable() {
        @Override
        public void run() {
            System.out.println("任务被执行,线程:" + Thread.currentThread().getName());
        }
    };

    // 线程池执行任务(一次添加 4 个任务)
    // 执行任务的方法有两种:submit 和 execute
    threadPool.submit(runnable);  // 执行方式 1:submit
    threadPool.execute(runnable); // 执行方式 2:execute
    threadPool.execute(runnable);
    threadPool.execute(runnable);
}

Результат выполнения следующий:image.png

Если вы считаете, что приведенный выше метод громоздкий, вы также можете использовать более простой метод, как показано в следующем коде:

public static void fixedThreadPool() {
    // 创建线程池
    ExecutorService threadPool = Executors.newFixedThreadPool(2);
    // 执行任务
    threadPool.execute(() -> {
        System.out.println("任务被执行,线程:" + Thread.currentThread().getName());
    });
}

2.CachedThreadPool

Создайте кешируемый пул потоков. Если количество потоков превышает требования к обработке, кеш будет восстановлен через определенный период времени. Если количества потоков недостаточно, будут созданы новые потоки.

Пример использования следующий:

public static void cachedThreadPool() {
    // 创建线程池
    ExecutorService threadPool = Executors.newCachedThreadPool();
    // 执行任务
    for (int i = 0; i < 10; i++) {
        threadPool.execute(() -> {
            System.out.println("任务被执行,线程:" + Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
            }
        });
    }
}

Результат выполнения следующий:image.pngКак видно из приведенных выше результатов, пул потоков создал 10 потоков для выполнения соответствующих задач.

3.SingleThreadExecutor

Создайте пул потоков с одним номером потока, который гарантирует порядок выполнения в порядке поступления.

Пример использования следующий:

public static void singleThreadExecutor() {
    // 创建线程池
    ExecutorService threadPool = Executors.newSingleThreadExecutor();
    // 执行任务
    for (int i = 0; i < 10; i++) {
        final int index = i;
        threadPool.execute(() -> {
            System.out.println(index + ":任务被执行");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
            }
        });
    }
}

Результат выполнения следующий:image.png

4.ScheduledThreadPool

Создайте пул потоков, который может выполнять отложенные задачи.

Пример использования следующий:

public static void scheduledThreadPool() {
    // 创建线程池
    ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(5);
    // 添加定时执行任务(1s 后执行)
    System.out.println("添加任务,时间:" + new Date());
    threadPool.schedule(() -> {
        System.out.println("任务被执行,时间:" + new Date());
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
        }
    }, 1, TimeUnit.SECONDS);
}

Результат выполнения следующий:image.pngКак видно из приведенных выше результатов, задача выполняется через 1 секунду, что соответствует нашим ожиданиям.

5.SingleThreadScheduledExecutor

Создайте однопоточный пул потоков, который может выполнять отложенные задачи.

Пример использования следующий:

public static void SingleThreadScheduledExecutor() {
    // 创建线程池
    ScheduledExecutorService threadPool = Executors.newSingleThreadScheduledExecutor();
    // 添加定时执行任务(2s 后执行)
    System.out.println("添加任务,时间:" + new Date());
    threadPool.schedule(() -> {
        System.out.println("任务被执行,时间:" + new Date());
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
        }
    }, 2, TimeUnit.SECONDS);
}

Результат выполнения следующий:image.pngКак видно из приведенных выше результатов, задача выполняется через 2 секунды, что соответствует нашим ожиданиям.

6.newWorkStealingPool

Создайте пул потоков для вытесняющего выполнения (порядок выполнения задач не определен). Обратите внимание, что этот метод доступен только в версиях JDK 1.8+.

Пример использования следующий:

public static void workStealingPool() {
    // 创建线程池
    ExecutorService threadPool = Executors.newWorkStealingPool();
    // 执行任务
    for (int i = 0; i < 10; i++) {
        final int index = i;
        threadPool.execute(() -> {
            System.out.println(index + " 被执行,线程名:" + Thread.currentThread().getName());
        });
    }
    // 确保任务执行完成
    while (!threadPool.isTerminated()) {
    }
}

Результат выполнения следующий:image.pngКак видно из вышеприведенных результатов, порядок выполнения задач неопределен, потому что выполняется упреждающе.

7.ThreadPoolExecutor

Самый примитивный способ создания пула потоков, он содержит 7 параметров для установки.

Пример использования следующий:

public static void myThreadPoolExecutor() {
    // 创建线程池
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
    // 执行任务
    for (int i = 0; i < 10; i++) {
        final int index = i;
        threadPool.execute(() -> {
            System.out.println(index + " 被执行,线程名:" + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

Результат выполнения следующий:image.png

Введение параметра ThreadPoolExecutor

ThreadPoolExecutor может установить до 7 параметров, как показано в следующем коде:

 public ThreadPoolExecutor(int corePoolSize,
                           int maximumPoolSize,
                           long keepAliveTime,
                           TimeUnit unit,
                           BlockingQueue<Runnable> workQueue,
                           ThreadFactory threadFactory,
                           RejectedExecutionHandler handler) {
     // 省略...
 }

Значения 7 параметров следующие:

Параметр 1: corePoolSize

Количество основных потоков, количество потоков, которые всегда активны в пуле потоков.

Параметр 2: максимальный размер пула

Максимальное количество потоков, максимальное количество потоков, разрешенных в пуле потоков, и максимальное количество потоков, которое может быть создано, когда очередь задач пула потоков заполнена.

Параметр 3: KeepAliveTime

Время, в течение которого может выжить максимальное количество потоков.Когда в потоке нет выполнения задачи, максимальный поток уничтожит часть и, наконец, сохранит количество основных потоков.

Параметр 4: единица измерения:

Единица используется в сочетании со временем выживания параметра 3. Вместе они используются для установки времени выживания потока.Единица времени параметра keepAliveTime имеет следующие 7 опций:

  • TimeUnit.DAYS: дни
  • TimeUnit.HOURS: Часы
  • TimeUnit.MINUTES: минуты
  • TimeUnit.SECONDS: секунды
  • TimeUnit.MILLISECONDS: миллисекунды
  • TimeUnit.MICROSECONDS: тонкий
  • TimeUnit.NANOSECONDS: наносекунды

Параметр 5: рабочая очередь

Очередь блокировки, используемая для хранения задач, ожидающих выполнения пулом потоков, является потокобезопасной и включает следующие 7 типов:

  • ArrayBlockingQueue: ограниченная очередь блокировки, состоящая из структур массива.
  • LinkedBlockingQueue: ограниченная очередь блокировки, состоящая из структуры связанного списка.
  • SynchronousQueue: блокирующая очередь, которая не хранит элементы, т. е. отправляет непосредственно в поток, не удерживая их.
  • PriorityBlockingQueue: неограниченная очередь блокировки, поддерживающая сортировку по приоритету.
  • DelayQueue: неограниченная блокирующая очередь, реализованная с использованием приоритетной очереди, из которой элементы могут быть извлечены только по истечении задержки.
  • LinkedTransferQueue: неограниченная очередь блокировки, состоящая из структуры связанного списка. Аналогичен SynchronousQueue, но также содержит неблокирующие методы.
  • LinkedBlockingDeque: Двунаправленная очередь блокировки, состоящая из структуры связанного списка.

Чаще используетсяLinkedBlockingQueueиSynchronous, стратегия организации очереди пула потоков такая же, какBlockingQueueСвязанный.

Параметр 6: threadFactory

Фабрика потоков, в основном используемая для создания потоков, по умолчанию имеет обычный приоритет, потоки без демона.

Параметр 7: обработчик

Стратегия отклонения, стратегия отказа от обработки задачи, в системе предусмотрено 4 варианта:

  • AbortPolicy: отклонить и создать исключение.
  • CallerRunsPolicy: используйте текущий вызывающий поток для выполнения этой задачи.
  • DiscardOldestPolicy: отменить задачу в начале (самой старой) очереди и выполнить текущую задачу.
  • DiscardPolicy: игнорировать и отклонять текущую задачу.

Политика по умолчаниюAbortPolicy.

Поток выполнения пула потоков

Процесс выполнения ключевых узлов ThreadPoolExecutor выглядит следующим образом:

  • Потоки создаются, когда количество потоков меньше количества основных потоков.
  • Когда количество потоков больше или равно количеству основных потоков, а очередь задач не заполнена, задача помещается в очередь задач.
  • Когда количество потоков больше или равно количеству основных потоков, а очередь задач заполнена: если количество потоков меньше максимального количества потоков, создать поток; если количество потоков равно максимальное количество потоков, возникает исключение и задача отклоняется.

Поток выполнения пула потоков показан на следующем рисунке:image.png

политика отклонения треда

Давайте продемонстрируем срабатывание политики отклонения ThreadPoolExecutor, мы используемDiscardPolicyСтратегия отклонения, она будет игнорировать и отбрасывать стратегию текущей задачи, код реализации выглядит следующим образом:

public static void main(String[] args) {
    // 任务的具体方法
    Runnable runnable = new Runnable() {
        @Override
        public void run() {
            System.out.println("当前任务被执行,执行时间:" + new Date() +
                               " 执行线程:" + Thread.currentThread().getName());
            try {
                // 等待 1s
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };
    // 创建线程,线程的任务队列的长度为 1
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1,
                                                           100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1),
                                                           new ThreadPoolExecutor.DiscardPolicy());
    // 添加并执行 4 个任务
    threadPool.execute(runnable);
    threadPool.execute(runnable);
    threadPool.execute(runnable);
    threadPool.execute(runnable);
}

Мы создали пул потоков с количеством основных потоков и максимальным количеством потоков, равным 1, и установили очередь задач пула потоков на 1, так что, когда у нас будет более 2 задач, будет запущена политика отклонения. , а результат выполнения выглядит следующим образом:image.pngИз приведенных выше результатов видно, что правильно выполняются только две задачи, а остальные избыточные задачи отбрасываются и игнорируются. Использование других стратегий отказа аналогично и не будет повторяться здесь.

Пользовательская политика отказа

В дополнение к четырем политикам отклонения, предоставляемым самой Java, мы также можем настроить политики отклонения.Пример кода выглядит следующим образом:

public static void main(String[] args) {
    // 任务的具体方法
    Runnable runnable = new Runnable() {
        @Override
        public void run() {
            System.out.println("当前任务被执行,执行时间:" + new Date() +
                               " 执行线程:" + Thread.currentThread().getName());
            try {
                // 等待 1s
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };
    // 创建线程,线程的任务队列的长度为 1
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1,
                                                           100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1),
                                                           new RejectedExecutionHandler() {
                                                               @Override
                                                               public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                                                                   // 执行自定义拒绝策略的相关操作
                                                                   System.out.println("我是自定义拒绝策略~");
                                                               }
                                                           });
    // 添加并执行 4 个任务
    threadPool.execute(runnable);
    threadPool.execute(runnable);
    threadPool.execute(runnable);
    threadPool.execute(runnable);
}

Результат выполнения программы следующий:image.png

Какой пул потоков использовать?

После изучения вышеизложенного у нас также есть определенное понимание всего пула потоков, так как же выбрать пул потоков?

Давайте посмотрим на ответ, который дает нам «Руководство по разработке Java» от Alibaba:

[Обязательное требование] Исполнителям не разрешено создавать пулы потоков, но используются ThreadPoolExecutors.Этот способ обработки позволяет авторам более точно указывать правила работы пулов потоков и избегать риска исчерпания ресурсов.

Примечание. Недостатки объекта пула потоков, возвращаемого Executors, заключаются в следующем:

1) FixedThreadPool и SingleThreadPool: допустимая длина очереди запросов — Integer.MAX_VALUE, что может привести к накоплению большого количества запросов, что приведет к OOM.

2) CachedThreadPool: разрешенное количество создаваемых потоков равно Integer.MAX_VALUE, и может быть создано большое количество потоков, что приведет к OOM.

Поэтому, учитывая вышеизложенное, мы рекомендуем использоватьThreadPoolExecutorСпособ создания пула потоков, потому что этот способ создания более контролируемый, а правила работы пула потоков более прояснены, что позволяет избежать некоторых неизвестных рисков.

Суммировать

В этой статье мы представили 7 способов создания пулов потоков, из которых наиболее рекомендуемым являетсяThreadPoolExecutorспособ создания пула потоков,ThreadPoolExecutorМожно установить до 7 параметров, конечно, 5 параметров также можно использовать в обычном режиме.ThreadPoolExecutorКогда задач слишком много (нельзя обработать), предусмотрены стратегии отклонения 4. Конечно, мы также можем настроить стратегию отказа. Надеюсь, содержание этой статьи поможет вам. Нелегко быть оригинальным, если вы думаете, что это хорошо, просто поставьте лайк и уходите!

Ссылки и благодарности

Специальности.Meituan.com/2020/04/02/…

блог woo woo woo.cn на.com/photographed/afraid/13…

Подпишитесь на официальный аккаунт «Java Chinese Community», чтобы узнать больше о галантерейных товарах.