Серия мертвых потоков Java самостоятельно пишет пул потоков

Java

Добро пожаловать, чтобы обратить внимание на мою общедоступную учетную запись «Брат Тонг читает исходный код», проверить больше статей из серии исходного кода и поплавать в океане исходного кода с братом Тонгом.

mythreadpool

(удобнее просматривать исходный код на горизонтальном экране мобильного телефона)

проблема

(1) Какие факторы необходимо учитывать при написании пула потоков самостоятельно?

(2) Как протестировать пул потоков, написанный вами?

Введение

Пул потоков — это технология, часто используемая в параллельном программировании на Java, так как же написать пул потоков самостоятельно? В этой статье брат Тонг поможет вам написать доступный пул потоков.

анализ атрибутов

Пул потоков, как следует из названия, это прежде всего «пул», в котором размещаются потоки, и потоки используются для выполнения задач.

во-первых, потоки в пуле потоков должны быть классифицированы, некоторые из них являются основными потоками, некоторые — неосновными потоками, поэтому нам нужны две переменные для определения количества основных потоков coreSize и максимального количества потоков maxSize.

Зачем различать, является ли это основным потоком? Это нужно для контроля количества потоков в системе.

Когда количество потоков в пуле потоков не достигает количества основных потоков coreSize, можно добавить поток к задаче, а также повысить эффективность выполнения задачи.

Когда количество потоков в пуле потоков достигает количества основных потоков, необходимо контролировать количество потоков и приходить в расширенную очередь для задач.Если выполнение задачи достаточно быстрое, эти основные потоки могут быстро завершить выполнение задач в очереди, а нового добавления, необходимого для потока, вообще нет.

Когда задачи в очереди заполнены, основные потоки сами по себе не могут вовремя обработать задачи, поэтому в это время необходимо добавлять новые потоки, но потоки не могут увеличиваться до бесконечности, поэтому необходимо контролировать максимальное количество потоков. максимальный размер.

Второй, Нам нужна очередь задач для хранения задач. Эта очередь должна быть потокобезопасной. Обычно мы используем BlockingQueue в качестве блокирующей очереди. Конечно, также можно использовать ConcurrentLinkedQueue (обратите внимание, что ConcurrentLinkedQueue не является блокирующей очередью и не может использоваться в пуле потоков jdk).

Наконец, когда задач становится все больше и обработка потока происходит несвоевременно, рано или поздно он дойдет до состояния, очередь заполнена, а количество потоков достигло максимального количества потоков, что мне при этом делать время? В это время необходимо принять стратегию отклонения, которая представляет собой стратегию того, что делать с этими задачами, которые не могут быть обработаны вовремя.Обычные стратегии включают отбрасывание текущей задачи, отбрасывание самой старой задачи, обработку самого вызывающего абонента и выбрасывание исключений.

Согласно приведенному выше описанию, Мы определяем пул потоков, для которого требуется всего четыре переменные: количество основных потоков coreSize, максимальное количество потоков maxSize, очередь блокировки BlockingQueue и политика отклонения RejectPolicy.

Кроме того, чтобы дать пулу потоков имя, мы добавляем еще одну переменную: имя пула потоков.

Таким образом, мы получили следующие свойства и методы построения пула потоков:

public class MyThreadPoolExecutor implements Executor {
    /**
     * 线程池的名称
     */
    private String name;
    /**
     * 核心线程数
     */
    private int coreSize;
    /**
     * 最大线程数
     */
    private int maxSize;
    /**
     * 任务队列
     */
    private BlockingQueue<Runnable> taskQueue;
    /**
     * 拒绝策略
     */
    private RejectPolicy rejectPolicy;

    public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
        this.name = name;
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        this.taskQueue = taskQueue;
        this.rejectPolicy = rejectPolicy;
    }
}

анализ потока задач

Согласно приведенному выше анализу атрибутов, в основном мы получили полную логику потока задач:

во-первых, если количество запущенных потоков меньше количества основных потоков, непосредственно создайте новый основной поток для запуска новой задачи.

Второй, если количество запущенных потоков достигает количества основных потоков, новая задача помещается в очередь.

потом, а если очередь также заполнена, создайте новый неосновной поток для запуска новой задачи.

Наконец, если количество неосновных потоков также достигает максимума, то выполняется политика отклонения.

mythreadpool

Логика кода примерно такая:

    @Override
    public void execute(Runnable task) {
        // 正在运行的线程数
        int count = runningCount.get();
        // 如果正在运行的线程数小于核心线程数,直接加一个线程
        if (count < coreSize) {
            // 注意,这里不一定添加成功,addWorker()方法里面还要判断一次是不是确实小
            if (addWorker(task, true)) {
                return;
            }
            // 如果添加核心线程失败,进入下面的逻辑
        }

        // 如果达到了核心线程数,先尝试让任务入队
        // 这里之所以使用offer(),是因为如果队列满了offer()会立即返回false
        if (taskQueue.offer(task)) {
            // do nothing,为了逻辑清晰这里留个空if
            // 【本篇文章由公众号“彤哥读源码”原创】
        } else {
            // 如果入队失败,说明队列满了,那就添加一个非核心线程
            if (!addWorker(task, false)) {
                // 如果添加非核心线程失败了,那就执行拒绝策略
                rejectPolicy.reject(task, this);
            }
        }
    }

Создать анализ потока

во-первыхОсновой для создания потоков является то, достигло ли количество запущенных потоков количества основных потоков или максимального количества потоков, поэтому нам также нужна переменная runningCount для записи количества запущенных потоков.

Второй, эта переменная runningCount должна быть добавлена ​​или вычтена в параллельной среде, поэтому необходимо использовать инструкцию CAS Unsafe для управления изменением ее значения. Когда используется CAS, к этой переменной добавляется изменчивая модификация. Для удобства здесь мы напрямую используем AtomicInteger, чтобы быть типом этой переменной.

потом, так как он находится в параллельной среде, необходимо оценить runningCount

Наконец, создайте поток и запускайте новые задачи, а также продолжайте брать задачи из очереди для запуска [Эта статья изначально была создана общедоступной учетной записью «Tong Ge Reading Source Code»].

mythreadpool

Логика кода следующая:

    private boolean addWorker(Runnable newTask, boolean core) {
        // 自旋判断是不是真的可以创建一个线程
        for (; ; ) {
            // 正在运行的线程数
            int count = runningCount.get();
            // 核心线程还是非核心线程
            int max = core ? coreSize : maxSize;
            // 不满足创建线程的条件,直接返回false
            if (count >= max) {
                return false;
            }
            // 修改runningCount成功,可以创建线程
            if (runningCount.compareAndSet(count, count + 1)) {
                // 线程的名字
                String threadName = (core ? "core_" : "") + name + sequence.incrementAndGet();
                // 创建线程并启动
                new Thread(() -> {
                    System.out.println("thread name: " + Thread.currentThread().getName());
                    // 运行的任务
                    Runnable task = newTask;
                    // 不断从任务队列中取任务执行,如果取出来的任务为null,则跳出循环,线程也就结束了
                    while (task != null || (task = getTask()) != null) {
                        try {
                            // 执行任务
                            task.run();
                        } finally {
                            // 任务执行完成,置为空
                            task = null;
                        }
                    }
                }, threadName).start();

                break;
            }
        }

        return true;
    }

Проведите логический анализ задачи

Для извлечения задач из очереди следует использовать метод take(). Этот метод будет блокироваться до тех пор, пока задача не будет извлечена или прервана. Если он будет прерван, он вернет значение null, чтобы текущий поток мог спокойно завершиться runningCount минус один.

    private Runnable getTask() {
        try {
            // take()方法会一直阻塞直到取到任务为止
            return taskQueue.take();
        } catch (InterruptedException e) {
            // 线程中断了,返回null可以结束当前线程
            // 当前线程都要结束了,理应要把runningCount的数量减一
            runningCount.decrementAndGet();
            return null;
        }
    }

Ну вот мы и закончили писать собственный пул потоков, давайте вместе подумаем, как его протестировать?

Логический анализ тестов

Давайте рассмотрим метод построения нашего собственного пула потоков:

    public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
        this.name = name;
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        this.taskQueue = taskQueue;
        this.rejectPolicy = rejectPolicy;
    }

имя, это только что передано;

coreSize, мы предполагаем 5;

maxSize, мы предполагаем 10;

taskQueue, очередь задач, так как мы задаем границу, то будем использовать простейшую ArrayBlockingQueue, вместимость задана 15, так что в ней можно хранить до 15 задач;

rejectPolicy, политика отклонения, мы принимаем политику отбрасывания текущей задачи, хорошо, давайте реализуем ее.

/**
 * 丢弃当前任务
 */
public class DiscardRejectPolicy implements RejectPolicy {
    @Override
    public void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor) {
        // do nothing
        System.out.println("discard one task");
    }
}

Итак, такой пул потоков создан, и далее идет выполнение задачи.Предположим, через цикл for непрерывно добавляется 100 задач.

public class MyThreadPoolExecutorTest {
    public static void main(String[] args) {
        Executor threadPool = new MyThreadPoolExecutor("test", 5, 10, new ArrayBlockingQueue<>(15), new DiscardRejectPolicy());
        AtomicInteger num = new AtomicInteger(0);

        for (int i = 0; i < 100; i++) {
            threadPool.execute(()->{
                try {
                    Thread.sleep(1000);
                    System.out.println("running: " + System.currentTimeMillis() + ": " + num.incrementAndGet());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

Давайте проанализируем эту программу:

(1) Во-первых, непрерывно создаются 5 основных потоков и выполняются новые задачи;

(2) В очередь встали следующие 15 задач;

(3) Когда очередь заполнена, создается 5 потоков подряд и выполняются новые задачи;

(4) Следующие задачи не могут быть выполнены, и используются все стратегии отбрасывания;

(5) Таким образом, действительно успешных задач должно быть 5 + 15 + 5 = 25 задач;

Запустить его:

thread name: core_test2
thread name: core_test5
thread name: core_test3
thread name: core_test4
thread name: core_test1
thread name: test6
thread name: test7
thread name: test8
thread name: test9
discard one task
thread name: test10
discard one task
...省略被拒绝的任务
【本篇文章由公众号“彤哥读源码”原创】
discard one task
running: 1570546871851: 2
running: 1570546871851: 8
running: 1570546871851: 7
running: 1570546871851: 6
running: 1570546871851: 5
running: 1570546871851: 3
running: 1570546871851: 4
running: 1570546871851: 1
running: 1570546871851: 10
running: 1570546871851: 9
running: 1570546872852: 14
running: 1570546872852: 20
running: 1570546872852: 19
running: 1570546872852: 17
running: 1570546872852: 18
running: 1570546872852: 16
running: 1570546872852: 15
running: 1570546872852: 12
running: 1570546872852: 13
running: 1570546872852: 11
running: 1570546873852: 21
running: 1570546873852: 24
running: 1570546873852: 23
running: 1570546873852: 25
running: 1570546873852: 22

Видно, что создано 5 основных потоков и 5 неосновных, а задач успешно выполнено 25. С завершением проблем нет, отлично ^^.

Суммировать

(1) Основными факторами, которые следует учитывать при самостоятельном написании пула потоков, являются: количество основных потоков, максимальное количество потоков, очередь задач и политика отклонения.

(2) При создании потоков всегда помните о ловушках параллелизма;

пасхальные яйца

Мы знаем, что пул потоков, поставляемый с jdk, имеет два параметра: keepAliveTime и unit, что они делают?

О: Они используются для управления тем, когда следует уничтожать второстепенные потоки. Конечно, основные потоки также могут быть уничтожены. Пожалуйста, ждите подробного анализа в следующей главе.

Полный исходный код

Интерфейс исполнителя

public interface Executor {
    void execute(Runnable command);
}

Класс реализации пула потоков MyThreadPoolExecutor

/**
 * 自动动手写一个线程池
 */
public class MyThreadPoolExecutor implements Executor {

    /**
     * 线程池的名称
     */
    private String name;
    /**
     * 线程序列号
     */
    private AtomicInteger sequence = new AtomicInteger(0);
    /**
     * 核心线程数
     */
    private int coreSize;
    /**
     * 最大线程数
     */
    private int maxSize;
    /**
     * 任务队列
     */
    private BlockingQueue<Runnable> taskQueue;
    /**
     * 拒绝策略
     */
    private RejectPolicy rejectPolicy;
    /**
     * 当前正在运行的线程数【本篇文章由公众号“彤哥读源码”原创】
     * 需要修改时线程间立即感知,所以使用AtomicInteger
     * 或者也可以使用volatile并结合Unsafe做CAS操作(参考Unsafe篇章讲解)
     */
    private AtomicInteger runningCount = new AtomicInteger(0);

    public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
        this.name = name;
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        this.taskQueue = taskQueue;
        this.rejectPolicy = rejectPolicy;
    }

    @Override
    public void execute(Runnable task) {
        // 正在运行的线程数
        int count = runningCount.get();
        // 如果正在运行的线程数小于核心线程数,直接加一个线程
        if (count < coreSize) {
            // 注意,这里不一定添加成功,addWorker()方法里面还要判断一次是不是确实小
            if (addWorker(task, true)) {
                return;
            }
            // 如果添加核心线程失败,进入下面的逻辑
        }

        // 如果达到了核心线程数,先尝试让任务入队
        // 这里之所以使用offer(),是因为如果队列满了offer()会立即返回false
        if (taskQueue.offer(task)) {
            // do nothing,为了逻辑清晰这里留个空if
        } else {
            // 如果入队失败,说明队列满了,那就添加一个非核心线程
            if (!addWorker(task, false)) {
                // 如果添加非核心线程失败了,那就执行拒绝策略
                rejectPolicy.reject(task, this);
            }
        }
    }

    private boolean addWorker(Runnable newTask, boolean core) {
        // 自旋判断是不是真的可以创建一个线程
        for (; ; ) {
            // 正在运行的线程数
            int count = runningCount.get();
            // 核心线程还是非核心线程
            int max = core ? coreSize : maxSize;
            // 不满足创建线程的条件,直接返回false
            if (count >= max) {
                return false;
            }
            // 修改runningCount成功,可以创建线程
            if (runningCount.compareAndSet(count, count + 1)) {
                // 线程的名字
                String threadName = (core ? "core_" : "") + name + sequence.incrementAndGet();
                // 创建线程并启动
                new Thread(() -> {
                    System.out.println("thread name: " + Thread.currentThread().getName());
                    // 运行的任务【本篇文章由公众号“彤哥读源码”原创】
                    Runnable task = newTask;
                    // 不断从任务队列中取任务执行,如果取出来的任务为null,则跳出循环,线程也就结束了
                    while (task != null || (task = getTask()) != null) {
                        try {
                            // 执行任务
                            task.run();
                        } finally {
                            // 任务执行完成,置为空
                            task = null;
                        }
                    }
                }, threadName).start();

                break;
            }
        }

        return true;
    }

    private Runnable getTask() {
        try {
            // take()方法会一直阻塞直到取到任务为止
            return taskQueue.take();
        } catch (InterruptedException e) {
            // 线程中断了,返回null可以结束当前线程
            // 当前线程都要结束了,理应要把runningCount的数量减一
            runningCount.decrementAndGet();
            return null;
        }
    }

}

Интерфейс политики отклонения RejectPolicy

public interface RejectPolicy {
    void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor);
}

DiscardRejectPolicy класс реализации политики отмены

/**
 * 丢弃当前任务
 */
public class DiscardRejectPolicy implements RejectPolicy {
    @Override
    public void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor) {
        // do nothing
        System.out.println("discard one task");
    }
}

тестовый класс

public class MyThreadPoolExecutorTest {
    public static void main(String[] args) {
        Executor threadPool = new MyThreadPoolExecutor("test", 5, 10, new ArrayBlockingQueue<>(15), new DiscardRejectPolicy());
        AtomicInteger num = new AtomicInteger(0);

        for (int i = 0; i < 100; i++) {
            threadPool.execute(()->{
                try {
                    Thread.sleep(1000);
                    System.out.println("running: " + System.currentTimeMillis() + ": " + num.incrementAndGet());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

    }
}

---

Добро пожаловать, чтобы обратить внимание на мою общедоступную учетную запись «Брат Тонг читает исходный код», проверить больше статей из серии исходного кода и поплавать в океане исходного кода с братом Тонгом.

qrcode