Как изящно использовать пул потоков в Java?

Java задняя часть

Зачем использовать пул потоков?

Чем больше потоков, тем лучше?

  1. Поток — это объект в java, а также ресурс операционной системы, для создания и уничтожения потоков требуется время. Если время создания + небольшое время встречи > время выполнения задачи, это нерентабельно.
  2. Объекты Java занимают память кучи, а потоки операционной системы занимают системную память.Согласно спецификации JVM, максимальный размер стека потока по умолчанию составляет 1 М. Это пространство стека должно быть выделено из системной памяти. Слишком много потоков будет потреблять много памяти.
  3. Операционная система должна часто переключать контексты потоков (каждый поток хочет быть запущенным), что влияет на производительность.

Введение пула потоков должно облегчить контроль количества потоков на стороне.

Пул потоков

Основные понятия пула потоков

Пул потоков состоит из следующих четырех основных компонентов:

  1. менеджер пула потоков: используется для создания пулов потоков и управления ими, включая создание пулов потоков, уничтожение пулов потоков и добавление новых задач;
  2. рабочий поток: потоки в пуле потоков находятся в состоянии ожидания, когда нет задач, и могут выполнять задачи циклически;
  3. интерфейс задач: оправдание, которое каждая задача должна реализовать для рабочего потока, чтобы запланировать выполнение задачи, в основном указывает вход задачи, завершение работы после выполнения задачи, статус выполнения задачи и т. д.;
  4. очередь задач: Используется для хранения необработанных задач. Обеспечивает буферный механизм.

Определение интерфейса пула потоков и класс реализации

Можно считать, что ScheduledThreadPoolExector — самый богатый класс реализации.

ExecutorService

public interface ExecutorService extends Executor {
    /**
     * 优雅关闭线程池,之前提交的任务将被执行,但是不会接受新的任务。
     */
    void shutdown();

    /**
     * 尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行任务的列表。
     */
    List<Runnable> shutdownNow();

    /**
     * 如果此线程池已关闭,则返回true.
     */
    boolean isShutdown();

    /**
     * 如果关闭后的所有任务都已完成,则返回true
     */
    boolean isTerminated();

    /**
     * 监测ExecutorService是否已经关闭,直到所有任务完成执行,或超时发生,或当前线程被中断。
     */
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

    /**
     * 提交一个用于执行的Callable返回任务,并返回一个Future,用于获取Callable执行结果。
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     * 提交可运行任务以执行,并返回Future,执行结果为传入的result
     */
    <T> Future<T> submit(Runnable task, T result);

    /**
     * 提交可运行任务以执行,并返回Future对象,执行结果为null
     */
    Future<?> submit(Runnable task);

    /**
     * 执行给定的任务集合,执行完毕后,则返回结果。
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

    /**
     * 执行给定的任务集合,执行完毕或者超时后,则返回结果,其他任务终止。
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;

    /**
     * 执行给定的任务,任意一个执行成功则返回结果,其他任务终止。
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

    /**
     * 执行给定的任务,任意一个执行成功或者超时后,则返回结果,其他任务终止
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}

ScheduledExecutorService

public interface ScheduledExecutorService extends ExecutorService {

    /**
     * 创建并执行一个一次性任务,过了延迟时间就会被执行
     */
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

    /**
     * 创建并执行一个一次性任务,过了延迟时间就会被执行
     */
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

    /**
     * 创建并执行一个周期性任务,过了给定的初始化延迟时间,会第一次被执行。执行过程中发生了异常,那么任务停止
     * 一次任务执行时长超过了周期时间,下一次任务会等到该次任务执行结束后,立刻执行,这也是它和scheduleWithTixedDelay的重要区别
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    /**
     * 创建并执行一个周期性任务,过了给定的初始化延迟时间,会第一次被执行。执行过程中发生了异常,那么任务停止
     * 一次任务执行时长超过了周期时间,下一次任务会在该次任务执行结束的时间基础上,计算执行延时。
     * 对于超时周期的长时间处理任务的不同处理方式,这是它和scheduleAtFixedRate的重要区别
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}

Вспомогательный класс пула потоков

Во время использования вы можете создать экземпляр пула потоков самостоятельно или использовать Executors для создания фабрики пула потоков.

newFixedThreadPool(int nThreads)

Создайте пул потоков фиксированного размера с неверно истолкованной емкостью очереди задач. Количество основных потоков = максимальное количество потоков.

newCachedThreadPool()

Создается буферизованный пул потоков неограниченного размера. Его очередь задач является синхронной очередью. Задача добавляется в пул, если в пуле есть незанятый поток, то она будет выполняться с незанятым потоком, если нет, то для выполнения будет создан новый поток. Потоки в пуле, простаивающие более 60 секунд, будут уничтожены и освобождены. Количество потоков зависит от количества задач. Подходит для выполнения асинхронных задач, которые занимают меньше времени. Основные потоки пула = 0, максимальное количество потоков = Integer.MAX_VALUE

newSingleThreadExecutor()

Пул с одним потоком, в котором только один поток выполняет неограниченную очередь задач. Пул потоков гарантирует, что порядок, в котором добавляются задачи, выполняется по одной за раз. Когда единственный поток прерывается из-за задачи, создается новый поток для продолжения выполнения последующих задач. Отличие от newFixedThreadPool(1) заключается в том, что размер пула одного пула потоков жестко запрограммирован в методе newSingleThreadExecutor и не может быть изменен.

newScheduledThreadPool(int corePoolSize)

Пул потоков, который может периодически выполнять задачи. Количество основных потоков в пуле задается параметром, максимальное количество потоков = Integer.MAX_VALUE

Процесс выполнения пула потоков задач

Как я могу подтвердить правильное количество потоков?

  • Если приложения интенсивно используют ЦП, размер потока устанавливается равным (N — общее количество ядер ЦП) N + 1
  • Если это приложение с интенсивным вводом-выводом, размер пула потоков устанавливается равным 2N + 1 (n — общее количество ЦП).
  • Чем выше процент времени ожидания потока (IO), тем больше потоков требуется.
  • Чем выше процент процессорного времени потока, тем меньше требуется потоков.

Самой быстрой частью системы является ЦП, поэтому именно ЦП определяет предел пропускной способности системы. Повышенная вычислительная мощность ЦП может увеличить верхний предел пропускной способности системы. Однако, согласно эффекту короткой платы, реальная пропускная способность системы не может быть рассчитана исключительно на основе ЦП. Чтобы улучшить пропускную способность системы, вам нужно начать с «недостатков системы» (таких как сетевая задержка, ввод-вывод):

  1. Попробуйте улучшить коэффициент распараллеливания операций с короткими досками, таких как технология многопоточной загрузки;
  2. Расширение возможностей короткой платы, например замена IO на NIO;

Анализ использования пула потоков

public class ExecutorsUse {
    /**
     * 测试: 提交15 个执行时间需要3秒的任务,看线程池的状况
     *
     * @param threadPoolExecutor 传入不同的线程池,看不同的结果
     * @throws Exception
     */
    public void testCommon(ThreadPoolExecutor threadPoolExecutor) throws Exception {
        // 测试: 提交15个执行时间需要3秒的任务,看超过大小的2个,对应的处理情况
        for (int i = 0; i < 15; i++) {
            int n = i;
            threadPoolExecutor.submit(() -> {
                        try {
                            System.out.println("开始执行:" + n);
                            Thread.sleep(3000L);
                            System.err.println("执行结束:" + n);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
            );
            System.out.println("任务提交成功 :" + i);
        }
        // 查看线程数量,查看队列等待数量
        Thread.sleep(500L);
        System.out.println("当前线程池线程数量为:" + threadPoolExecutor.getPoolSize());
        System.out.println("当前线程池等待的数量为:" + threadPoolExecutor.getQueue().size());
        // 等待15秒,查看线程数量和队列数量(理论上,会被超出核心线程数量的线程自动销毁)
        Thread.sleep(15000L);
        System.out.println("当前线程池线程数量为:" + threadPoolExecutor.getPoolSize());
        System.out.println("当前线程池等待的数量为:" + threadPoolExecutor.getQueue().size());
    }

    /**
     * 1、线程池信息: 核心线程数量5,最大数量10,无界队列,超出核心线程数量的线程存活时间:5秒, 指定拒绝策略
     *
     * @throws Exception
     */
    private void threadPoolExecutorTest1() throws Exception {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        testCommon(threadPoolExecutor);
        // 预计结果:线程池线程数量为:5,超出数量的任务,其他的进入队列中等待被执行
    }

    /**
     * 2、 线程池信息: 核心线程数量5,最大数量10,队列大小3,超出核心线程数量的线程存活时间:5秒, 指定拒绝策略的
     *
     * @throws Exception
     */
    private void threadPoolExecutorTest2() throws Exception {
        // 创建一个 核心线程数量为5,最大数量为10,等待队列最大是3 的线程池,也就是最大容纳13个任务。
        // 默认的策略是抛出RejectedExecutionException异常,java.util.concurrent.ThreadPoolExecutor.AbortPolicy
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(3), new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.err.println("有任务被拒绝执行了");
            }
        });
        testCommon(threadPoolExecutor);
        // 预计结果:
        // 1、 5个任务直接分配线程开始执行
        // 2、 3个任务进入等待队列
        // 3、 队列不够用,临时加开5个线程来执行任务(5秒没活干就销毁)
        // 4、 队列和线程池都满了,剩下2个任务,没资源了,被拒绝执行。
        // 5、 任务执行,5秒后,如果无任务可执行,销毁临时创建的5个线程
    }

    /**
     * 3、 线程池信息: 核心线程数量5,最大数量5,无界队列,超出核心线程数量的线程存活时间:5秒
     *
     * @throws Exception
     */
    private void threadPoolExecutorTest3() throws Exception {
        // 和Executors.newFixedThreadPool(int nThreads)一样的
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        testCommon(threadPoolExecutor);
        // 预计结:线程池线程数量为:5,超出数量的任务,其他的进入队列中等待被执行
    }

    /**
     * 4、 线程池信息:
     * 核心线程数量0,最大数量Integer.MAX_VALUE,SynchronousQueue队列,超出核心线程数量的线程存活时间:60秒
     *
     * @throws Exception
     */
    private void threadPoolExecutorTest4() throws Exception {

        // SynchronousQueue,实际上它不是一个真正的队列,因为它不会为队列中元素维护存储空间。与其他队列不同的是,它维护一组线程,这些线程在等待着把元素加入或移出队列。
        // 在使用SynchronousQueue作为工作队列的前提下,客户端代码向线程池提交任务时,
        // 而线程池中又没有空闲的线程能够从SynchronousQueue队列实例中取一个任务,
        // 那么相应的offer方法调用就会失败(即任务没有被存入工作队列)。
        // 此时,ThreadPoolExecutor会新建一个新的工作者线程用于对这个入队列失败的任务进行处理(假设此时线程池的大小还未达到其最大线程池大小maximumPoolSize)。

        // 和Executors.newCachedThreadPool()一样的
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        testCommon(threadPoolExecutor);
        // 预计结果:
        // 1、 线程池线程数量为:15,超出数量的任务,其他的进入队列中等待被执行
        // 2、 所有任务执行结束,60秒后,如果无任务可执行,所有线程全部被销毁,池的大小恢复为0
        Thread.sleep(60000L);
        System.out.println("60秒后,再看线程池中的数量:" + threadPoolExecutor.getPoolSize());
    }

    /**
     * 5、 定时执行线程池信息:3秒后执行,一次性任务,到点就执行 <br/>
     * 核心线程数量5,最大数量Integer.MAX_VALUE,DelayedWorkQueue延时队列,超出核心线程数量的线程存活时间:0秒
     *
     * @throws Exception
     */
    private void threadPoolExecutorTest5() throws Exception {
        // 和Executors.newScheduledThreadPool()一样的
        ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(5);
        threadPoolExecutor.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("任务被执行,现在时间:" + System.currentTimeMillis());
            }
        }, 3000, TimeUnit.MILLISECONDS);
        System.out.println(
                "定时任务,提交成功,时间是:" + System.currentTimeMillis() + ", 当前线程池中线程数量:" + threadPoolExecutor.getPoolSize());
        // 预计结果:任务在3秒后被执行一次
    }

    /**
     * 6、 定时执行线程池信息:线程固定数量5 ,<br/>
     * 核心线程数量5,最大数量Integer.MAX_VALUE,DelayedWorkQueue延时队列,超出核心线程数量的线程存活时间:0秒
     *
     * @throws Exception
     */
    private void threadPoolExecutorTest6() throws Exception {
        ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(5);
        // 周期性执行某一个任务,线程池提供了两种调度方式,这里单独演示一下。测试场景一样。
        // 测试场景:提交的任务需要3秒才能执行完毕。看两种不同调度方式的区别
        // 效果1: 提交后,2秒后开始第一次执行,之后每间隔1秒,固定执行一次(如果发现上次执行还未完毕,则等待完毕,完毕后立刻执行)。
        // 也就是说这个代码中是,3秒钟执行一次(计算方式:每次执行三秒,间隔时间1秒,执行结束后马上开始下一次执行,无需等待)
        threadPoolExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(3000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务-1 被执行,现在时间:" + System.currentTimeMillis());
            }
        }, 2000, 1000, TimeUnit.MILLISECONDS);

        // 效果2:提交后,2秒后开始第一次执行,之后每间隔1秒,固定执行一次(如果发现上次执行还未完毕,则等待完毕,等上一次执行完毕后再开始计时,等待1秒)。
        // 也就是说这个代码钟的效果看到的是:4秒执行一次。 (计算方式:每次执行3秒,间隔时间1秒,执行完以后再等待1秒,所以是 3+1)
        threadPoolExecutor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(3000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务-2 被执行,现在时间:" + System.currentTimeMillis());
            }
        }, 2000, 1000, TimeUnit.MILLISECONDS);
    }

    /**
     * 7、 终止线程:线程池信息: 核心线程数量5,最大数量10,队列大小3,超出核心线程数量的线程存活时间:5秒, 指定拒绝策略的
     *
     * @throws Exception
     */
    private void threadPoolExecutorTest7() throws Exception {
        // 创建一个 核心线程数量为5,最大数量为10,等待队列最大是3 的线程池,也就是最大容纳13个任务。
        // 默认的策略是抛出RejectedExecutionException异常,java.util.concurrent.ThreadPoolExecutor.AbortPolicy
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(3), new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.err.println("有任务被拒绝执行了");
            }
        });
        // 测试: 提交15个执行时间需要3秒的任务,看超过大小的2个,对应的处理情况
        for (int i = 0; i < 15; i++) {
            int n = i;
            threadPoolExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("开始执行:" + n);
                        Thread.sleep(3000L);
                        System.err.println("执行结束:" + n);
                    } catch (InterruptedException e) {
                        System.out.println("异常:" + e.getMessage());
                    }
                }
            });
            System.out.println("任务提交成功 :" + i);
        }
        // 1秒后终止线程池
        Thread.sleep(1000L);
        threadPoolExecutor.shutdown();
        // 再次提交提示失败
        threadPoolExecutor.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("追加一个任务");
            }
        });
        // 结果分析
        // 1、 10个任务被执行,3个任务进入队列等待,2个任务被拒绝执行
        // 2、调用shutdown后,不接收新的任务,等待13任务执行结束
        // 3、 追加的任务在线程池关闭后,无法再提交,会被拒绝执行
    }

    /**
     * 8、 立刻终止线程:线程池信息: 核心线程数量5,最大数量10,队列大小3,超出核心线程数量的线程存活时间:5秒, 指定拒绝策略的
     *
     * @throws Exception
     */
    private void threadPoolExecutorTest8() throws Exception {
        // 创建一个 核心线程数量为5,最大数量为10,等待队列最大是3 的线程池,也就是最大容纳13个任务。
        // 默认的策略是抛出RejectedExecutionException异常,java.util.concurrent.ThreadPoolExecutor.AbortPolicy
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(3), new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.err.println("有任务被拒绝执行了");
            }
        });
        // 测试: 提交15个执行时间需要3秒的任务,看超过大小的2个,对应的处理情况
        for (int i = 0; i < 15; i++) {
            int n = i;
            threadPoolExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("开始执行:" + n);
                        Thread.sleep(3000L);
                        System.err.println("执行结束:" + n);
                    } catch (InterruptedException e) {
                        System.out.println("异常:" + e.getMessage());
                    }
                }
            });
            System.out.println("任务提交成功 :" + i);
        }
        // 1秒后终止线程池
        Thread.sleep(1000L);
        List<Runnable> shutdownNow = threadPoolExecutor.shutdownNow();
        // 再次提交提示失败
        threadPoolExecutor.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("追加一个任务");
            }
        });
        System.out.println("未结束的任务有:" + shutdownNow.size());

        // 结果分析
        // 1、 10个任务被执行,3个任务进入队列等待,2个任务被拒绝执行
        // 2、调用shutdownnow后,队列中的3个线程不再执行,10个线程被终止
        // 3、 追加的任务在线程池关闭后,无法再提交,会被拒绝执行
    }

    public static void main(String[] args) throws Exception {
//        new ExecutorsUse().threadPoolExecutorTest1();
//        new ExecutorsUse().threadPoolExecutorTest2();
//          new ExecutorsUse().threadPoolExecutorTest3();
        new ExecutorsUse().threadPoolExecutorTest4();
//        new ExecutorsUse().threadPoolExecutorTest5();
//        new ExecutorsUse().threadPoolExecutorTest6();
//        new ExecutorsUse().threadPoolExecutorTest7();
//        new ExecutorsUse().threadPoolExecutorTest8();
    }
}