Серия многопоточности в Java: структура Executors

Java задняя часть GitHub API

содержание

  1. Знакомство с интерфейсом исполнителя
  2. Введение в общие интерфейсы ExecutorService
  3. Знакомство с некоторыми методами создания пулов потоков
  4. Вопрос ответ

Знакомство с интерфейсом исполнителя

Executor — это интерфейс, предоставляющий метод execute, который получает параметр Runable следующим образом.

public interface Executor {
    void execute(Runnable command);
}

Общая схема структуры классов и интерфейсов фреймворка Executor

Executor框架的常用类和接口结构图

Объект потока и объект, возвращаемый выполнением потока

线程对象及线程执行返回的对象

объект потока

Объект потока — это задача, отправленная в пул потоков, который может реализовывать интерфейс Runable или интерфейс Callable. Возможно, здесь возникнет вопрос, почему интерфейс Runable и интерфейс Callable никак не связаны друг с другом, но оба могут выполняться как задачи? Вы можете подумать об этом, конец статьи объяснит это

Будущий интерфейс

Интерфейс Future и класс FutureTask используются для получения результатов, возвращаемых потоком после асинхронного выполнения.Вы можете видеть, что метод отправки интерфейса ExecutorService ниже возвращает Future.

Введение в общие интерфейсы ExecutorService

Далее давайте взглянем на ExecutorService, который наследует интерфейс Executor.

public interface ExecutorService extends Executor {
    //正常关闭(不再接收新任务,执行完队列中的任务)
    void shutdown();
	//强行关闭(关闭当前正在执行的任务,返回所有尚未启动的任务清单)
    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);
	...
}

Введение в конструктор ThreadPoolExecutor

Прежде чем вводить метод ношения пула потоков, мы должны сначала ввести класс ThreadPoolExecutor.Большинство методов фабрики Executors возвращают объекты ThreadPoolExecutor.Давайте посмотрим на его конструктор.

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {...}

Введение параметра

параметр Типы имея в виду
corePoolSize int количество основных потоков
maximumPoolSize int максимальное количество потоков
keepAliveTime long время выживания
unit TimeUnit единица времени
workQueue BlockingQueue Очередь для хранения потоков
threadFactory ThreadFactory Фабрика для создания потоков
handler RejectedExecutionHandler избыточные обработчики потоков (политика отклонения)

Знакомство с некоторыми методами создания пулов потоков

Зачем говорить об интерфейсе ExecutorService? Это потому, что большая часть того, что мы возвращаем, когда используем методы Executors, — это ExecutorService. Executors предоставляет несколько методов для создания пулов потоков, и я расскажу об этих методах далее.

newFixedThreadPool(int nThreads)
创建一个线程的线程池,若空闲则执行,若没有空闲线程则暂缓在任务队列中。

newWorkStealingPool()
创建持有足够线程的线程池来支持给定的并行级别,并通过使用多个队列,减少竞争,它需要穿一个并行级别的参数,如果不传,则被设定为默认的CPU数量。

newSingleThreadExecutor()
该方法返回一个固定数量的线程池  
该方法的线程始终不变,当有一个任务提交时,若线程池空闲,则立即执行,若没有,则会被暂缓在一个任务队列只能怪等待有空闲的线程去执行。

newCachedThreadPool() 
返回一个可根据实际情况调整线程个数的线程池,不限制最大线程数量,若有空闲的线程则执行任务,若无任务则不创建线程,并且每一个空闲线程会在60秒后自动回收。

newScheduledThreadPool(int corePoolSize)
返回一个SchededExecutorService对象,但该线程池可以设置线程的数量,支持定时及周期性任务执行。
 
newSingleThreadScheduledExecutor()
创建一个单例线程池,定期或延时执行任务。  
 

Далее объясняются следующие распространенные методы, создание единого не будет объясняться

метод newFixedThreadPool

Этот метод создает пул потоков с заданным количеством потоков. Количество потоков, которые можно хранить, не ограничено (неограниченная очередь). Он подходит для сценариев, в которых задачи потоков выполняются быстро.

FixedThreadPool的execute()的运行示意图

Взгляните на внутреннюю реализацию фабрики Executors.

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

Видно, что возвращается объект ThreadPoolExecutor, количество основных потоков и максимальное количество потоков являются входящими параметрами, время выживания равно 0, единица измерения времени — миллисекунды, а очередь блокировки — неограниченная очередь LinkedBlockingQueue.

Так как очередь использует неограниченную очередь LinkedBlockingQueue, максимальное количество потоков, maxPoolSize и keepAliveTime, являются недопустимыми параметрами, и политика отклонения также будет недействительной.Почему?

Вот еще вопрос.Неограниченная очередь означает, что нет верхнего предела для задач.Если выполняемые задачи трудоемки, то новые задачи всегда будут храниться в пуле потоков, а задач в пуле потоков будет все больше и больше , Каковы будут последствия? Вы можете попробовать следующий код

public class Main {

    public static void main(String[] args){
        ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

        while (true){
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }

    }
}

образец кода

public class Main {

    public static void main(String[] args){
        ExecutorService pool = Executors.newFixedThreadPool(4);

        for (int i = 0; i < 8; i++) {
            int finalI = i + 1;
            pool.submit(() -> {
                try {
                    System.out.println("任务"+ finalI +":开始等待2秒,时间:"+LocalTime.now()+",当前线程名:"+Thread.currentThread().getName());
                    Thread.sleep(2000);
                    System.out.println("任务"+ finalI +":结束等待2秒,时间:"+LocalTime.now()+",当前线程名:"+Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });

        }
        pool.shutdown();
    }
}

выходной результат

任务4:开始等待2秒,时间:17:13:22.048,当前线程名:pool-1-thread-4
任务2:开始等待2秒,时间:17:13:22.048,当前线程名:pool-1-thread-2
任务3:开始等待2秒,时间:17:13:22.048,当前线程名:pool-1-thread-3
任务1:开始等待2秒,时间:17:13:22.048,当前线程名:pool-1-thread-1

任务2:结束等待2秒,时间:17:13:24.048,当前线程名:pool-1-thread-2
任务3:结束等待2秒,时间:17:13:24.048,当前线程名:pool-1-thread-3
任务1:结束等待2秒,时间:17:13:24.048,当前线程名:pool-1-thread-1
任务4:结束等待2秒,时间:17:13:24.048,当前线程名:pool-1-thread-4
任务6:开始等待2秒,时间:17:13:24.049,当前线程名:pool-1-thread-4
任务7:开始等待2秒,时间:17:13:24.049,当前线程名:pool-1-thread-1
任务5:开始等待2秒,时间:17:13:24.049,当前线程名:pool-1-thread-3
任务8:开始等待2秒,时间:17:13:24.049,当前线程名:pool-1-thread-2

任务5:结束等待2秒,时间:17:13:26.050,当前线程名:pool-1-thread-3
任务7:结束等待2秒,时间:17:13:26.050,当前线程名:pool-1-thread-1
任务8:结束等待2秒,时间:17:13:26.051,当前线程名:pool-1-thread-2
任务6:结束等待2秒,时间:17:13:26.050,当前线程名:pool-1-thread-4

Видно, что задачи 1-4 выполняются одновременно, и выполнение завершается через 2 секунды, а задачи 5-8 запускаются одновременно. Это означает, что внутри метода создается только 4 потока, а остальные задачи хранятся в очереди, ожидая выполнения.

метод newCachedThreadPool

Пул потоков, созданный методом newCachedThreadPool, автоматически создает новые потоки по мере необходимости.

CachedThreadPool的execute()的运行示意图

Взгляните на внутреннюю реализацию фабрики Executors.

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

Метод newCachedThreadPool также возвращает объект ThreadPoolExecutor, основной поток равен 0, максимальное количество потоков равно максимальному значению MAX_VALUE Integer, время выживания равно 60, единица измерения времени — секунды, а очередь SynchronousQueue.

Из входящих параметров можно узнать, что время выживания бездействующего потока в методе newCachedThreadPool составляет 60 секунд, и поток будет завершен, как только оно превысит 60 секунд. Здесь также подразумевается проблема: если исполняемый поток медленный, а скорость отправки задач выше, чем скорость выполнения потока, новые потоки будут создаваться непрерывно, что приведет к росту процессора и памяти.

Код зацикливается, чтобы добавить новые задачи потока, такие как newFixedThreadPool, и при запуске моего компьютера возникает следующая ошибка.

An unrecoverable stack overflow has occurred.

Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
	at java.lang.Thread.start0(Native Method)
	at java.lang.Thread.start(Thread.java:714)
	at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1368)
	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
	at com.learnConcurrency.executor.cachedThreadPool.Main.main(Main.java:11)
Process finished with exit code -1073741571 (0xC00000FD)

Что касается очереди SynchronousQueue, то это блокирующая очередь без пропускной способности, схема доставки задач выглядит следующим образом.

CachedThreadPool的任务传递示意图

образец кода

public class Main {
    public static void main(String[] args) throws Exception{
        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i = 0; i < 8; i++) {
            int finalI = i + 1;
            pool.submit(() -> {
                try {
                    System.out.println("任务"+ finalI +":开始等待60秒,时间:"+LocalTime.now()+",当前线程名:"+Thread.currentThread().getName());
                    Thread.sleep(60000);
                    System.out.println("任务"+ finalI +":结束等待60秒,时间:"+LocalTime.now()+",当前线程名:"+Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            //睡眠10秒
            Thread.sleep(10000);
        }
        pool.shutdown();
    }
}

Результаты

任务1:开始等待60秒,时间:17:15:21.570,当前线程名:pool-1-thread-1
任务2:开始等待60秒,时间:17:15:31.553,当前线程名:pool-1-thread-2
任务3:开始等待60秒,时间:17:15:41.555,当前线程名:pool-1-thread-3
任务4:开始等待60秒,时间:17:15:51.554,当前线程名:pool-1-thread-4
任务5:开始等待60秒,时间:17:16:01.554,当前线程名:pool-1-thread-5
任务6:开始等待60秒,时间:17:16:11.555,当前线程名:pool-1-thread-6
任务7:开始等待60秒,时间:17:16:21.555,当前线程名:pool-1-thread-7
任务1:结束等待60秒,时间:17:16:21.570,当前线程名:pool-1-thread-1
任务2:结束等待60秒,时间:17:16:31.554,当前线程名:pool-1-thread-2

任务8:开始等待60秒,时间:17:16:31.556,当前线程名:pool-1-thread-2
任务3:结束等待60秒,时间:17:16:41.555,当前线程名:pool-1-thread-3
任务4:结束等待60秒,时间:17:16:51.556,当前线程名:pool-1-thread-4
任务5:结束等待60秒,时间:17:17:01.556,当前线程名:pool-1-thread-5
任务6:结束等待60秒,时间:17:17:11.555,当前线程名:pool-1-thread-6
任务7:结束等待60秒,时间:17:17:21.556,当前线程名:pool-1-thread-7
任务8:结束等待60秒,时间:17:17:31.557,当前线程名:pool-1-thread-2

В примере кода каждая задача приостанавливается на 60 секунд, и каждый цикл добавляет задачу в спящий режим на 10 секунд.Из результатов выполнения видно, что 7 добавленных задач выполняются разными потоками, и в это время оба потока 1 и 2 выполнено. , задача 8 добавляется и выполняется созданным ранее пулом-1-потоком-2.

метод newScheduledThreadPool

Этот пул потоков в основном используется для задержки выполнения задач или для периодического выполнения задач.

Взгляните на внутреннюю реализацию фабрики Executors.

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

Здесь возвращается объект ScheduledThreadPoolExecutor, давайте углубимся и посмотрим

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

Здесь вызывается конструктор родительского класса.Родительским классом ScheduledThreadPoolExecutor является ThreadPoolExecutor, поэтому возвращаемый объект также является ThreadPoolExecutor. Количество основных потоков — это входящий параметр corePoolSize, максимальное значение потока — MAX_VALUE целого числа, время выживания — 0, единица времени — наносекунды, а очередь — DelayedWorkQueue.

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {}

Вот некоторые методы ScheduledExecutorService

public interface ScheduledExecutorService extends ExecutorService {
	//delay延迟时间,unit延迟单位,只执行1次,在经过delay延迟时间之后开始执行
    public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
	//首次执行时间时然后在initialDelay之后,然后在initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
	//首次执行时间时然后在initialDelay之后,然后延迟delay时间执行
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
}

Вопрос ответ

Запускаемый интерфейс и вызываемый интерфейс

Затем взгляните на запись задачи отправки.

Метод отправки реализован абстрактным классом AbstractExecutorService.

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

Видно, что входящий объект Runnable и Callable передаются в метод newTaskFor, а затем возвращается объект RunnableFuture.

Давайте еще раз посмотрим на метод newTaskFor.

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

Вот конструкторы, которые вызывают FutureTask, давайте посмотрим вниз

private Callable<V> callable;

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;      
}

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       
}

В классе FutureTask есть вызываемая переменная-член, и входящий объект Runnable продолжает вызывать вызываемый метод фабричного класса Executors для возврата объекта Callable.

public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}
//适配器
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}

Что ж, здесь раскрывается правда: объект Runnable окончательно адаптируется в объект Callable адаптером RunnableAdapter после серии вызовов методов. График вызова метода выглядит следующим образом

方法调用图

Адрес GitHub

адрес здесь

Я думаю, что хорошо указать звезду

В следующей статье будет представлен пользовательский пул потоков, а метод newWorkStealingPool будет обновлен позже.

использованная литература

[1] Искусство параллельного программирования на Java

[2] Практика параллельного программирования на Java