Создание пула потоков вручную (Java)

Java

предисловие

На этот раз я реализовал простой пул потоков, в основном для последующего просмотра.ThreadPoolИсходный код подготовлен путем улучшения чужого кода и изучения чего-либо путем чтения чужого исходного кода, поэтому я специально написал эту статью, чтобы в будущем я мог просмотреть, как я научился. Конечно, я также надеюсь, что обмен этим может оказать хорошее влияние на других!

Использование пула потоков Java

Прежде чем внедрить пул резьбы самостоятельно, вы должны сначала знать, как его использовать. Потому что, познавая, как его использовать, вы можете понять написание некоторых кода. Я не буду в подробности о том, как использовать его здесь, просто нажмите Baidu или Google. Чтобы не позволить читателям тратить слишком много времени, ищите его, я искал это.статья, сказал это более ясно.

Обзор

Мы видим, что кромеThreadа такжеRunnable, остальные определяются нами самими, давайте объясним их по порядку.

Прежде чем мы приступим к анализу, поговорим о рабочем процессе пула потоков, который также удобен для всех, чтобы иметь психологическое дно, когда они будут смотреть его позже.

Пул потоков, как следует из его названия, представляет собой пул, в котором хранится несколько потоков. В компьютерных языках мы используем структуры данных для хранения потоков, а в этом пуле потоков мы используем очередь для хранения потоков для обработки задач. Поэтому, как только пул потоков запущен, в пуле потоков должно быть определенное количество потоков, поэтому нам не нужно беспокоиться о количестве потоков, просто нужно знать, что есть какие-то потоки, ожидающие выполнения. пользователю помещать задачи, выполняемые требуемыми потоками, в пул потоков Внутри пула. Затем потоки в пуле потоков будут автоматически выполнять задачи за вас.

Конечно, некоторые люди говорят, что я могу просто создать поток, когда выполняю задачу, так зачем беспокоиться. Нам нужно знать, что поток создается для задачи,

  1. Для создания потока требуется время, что влияет на скорость отклика.

  2. Системные ресурсы ограничены.Если необходимо создать десятки тысяч потоков, это будет сильно потреблять системные ресурсы и снижать стабильность системы.

На самом деле, когда задач много, некоторые потоки обрабатывают только некоторые очень легкие задачи и быстро их завершают.Если следующая задача только что пришла, а предыдущий поток только что закончил работу, то этот поток будет следующим.В этом случае ответ скорость повышается, а затем поток повторно используется для снижения потребления системных ресурсов. Не убьет ли он двух зайцев одним выстрелом?

Раньше это было просто совпадение, так что давайте немного смягчим условия. Если поток завершил выполнение задачи, не выходите первым. Вместо этого он ожидает выполнения задачи, и этот поток можно рассматривать как получившийвыполнять задачиКоманда! **就等着任务来,任务一来,我就去执行。任务执行结束,线程就等,直到下一个任务来。周而复始,直到手动关闭! **这就是线程池的本质。

Дальше возникает проблема, в пуле потоков, ожидающих выполнения задач, всего 5 потоков, а одновременно нужно выполнять 10 задач, затем выполняется 5 задач, где остальные 5? Это отброшено? Это не является первоначальным намерением нашего проекта пула потоков! Вы можете определенно думать об этом, это должна быть структура данных для хранения оставшихся потоков! (Мы используем хранилище очереди, которое тогда называлось рабочей очередью.) Поскольку время обработки задач потоками не обязательно, некоторые потоки должны быть быстрыми, а некоторые — медленными. Так что кто справится с этим первым, тот и справится с остальными задачами. Как говорится, те, кто может больше работать!

Еще один вопрос, если первые 5 потоков выполняются очень медленно, то последние 5 потоков придется ждать долго.В это время лучше сразу создавать потоки для работы.Да, про пул потоков тоже подумали когда он был разработан.Этот вопрос, мы поговорим об этом позже, когда мы проектируем, давайте сначала посмотрим здесь!

Поскольку задействована многопоточность, должна быть задействована синхронизация.Какой объект необходимо синхронизировать? Конечно, это очередь задач. Нам нужно знать, что очень вероятно, что одновременно будет много пар потоков.та же очередь задачБерем задачи и ставим задачи, поэтому чтобы добиться синхронизации используемsynchronizedСинхронизация реализации ключевого слова, то есть добавить блокировку в эту очередь задач, какой поток может получить блокировку очереди задач, какой поток может получить задачу. Поток не получал эту блокировку и т. д., если только он не был прерван или отключен.

Здесь следует отметить, чтоприостановить блокировкуа такжеожидание блокировкиразница.

  1. приостановить блокировкуОн вызывается после того, как поток получает блокировкуawaitМетод только войдет в состояние, предпосылка в том, что блокировка получена первой. После уведомления он проснется, а затем изawaitПосле выполнения кода.

  2. ожидание блокировкиЭто связано с тем, что другие потоки все еще удерживают блокировку.Если поток не получил блокировку в это время, он войдет в последовательность entrySet блокировки и подождет, пока блокировка будет снята, а затем захватит ее.

После приведенного выше объяснения мы можем в основном понять идею дизайна и принцип пула потоков и добавить немного контента ниже.

  1. Внутри пула потоков есть две структуры данных (очереди) для хранения потребностей.поток, выполняющий задачу(также называемые рабочими потоками) и задачи, которые необходимо выполнить.

  2. Поток, инициализированный пулом потоков, помещается врабочая очередьВнутри задача, которую пользователь хочет выполнить, помещается вочередь задач

  3. После того, как пользователь добавит задачу, поток рабочей очереди будет уведомлен о получении задачи!

  4. Если поток рабочей очереди пуст и очередь задач не пуста, какой поток получает блокировку и какой поток может получить задачу в очереди задач, и тогда количество задач в очереди задач равно -1.

  5. Когда много потоков пытаются получить блокировку, только один поток может ее получить.Другие потоки, которые не получили блокировку, не блокируются в ожидании, а ожидают получения блокировки!

  6. Если очередь задач пуста после получения блокировки, блокировка будет приостановлена. Если вы проснулись из-за уведомления, продолжите операцию 3 4 5 6.

Давайте сначала взглянем на блок-схему всего нашего пула потоков, чтобы вы знали, что происходит, когда будете его проектировать!

Обработать

BaseThreadPool

Сначала посмотрите на основные свойства этого класса

public class BaseThreadPool extends Thread implements ThreadPool { 
	
    /*初始化线程数*/
    private int initSize;

    /*最大工作线程数*/
    private int maxSize;

    /*核心线程数*/
    private int coreSize;

    /*当前活跃线程数*/
    private  int activityCount = 0;

    /*指定任务队列的大小数*/
    private int queueSize;

    /*创建工作线程的工厂,在构造方法由线程池规定好*/
    private ThreadFactory threadFactory;

    /*1. 任务队列,在构造方法由线程池规定好*/
    private RunnableQueue runnableQueue;

    //2. 工作队列
    private final static Queue<ThreadTask> threadQueue = new ArrayDeque<>();

    //3. 本线程池默认的拒绝策略
    private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.IgnoreDenyPolicy();

    /*4. 默认的线程工厂*/
    private final static ThreadFactory DEFAULT_THREAD_FACTORY =new DefaultThreadFactory();

    /*线程池是否关闭,默认为false*/
    boolean isShutdown = false;

    private  long keepAliveTime;

    private  TimeUnit timeUnit ;

Из приведенных выше свойств мы знаем, что наш собственный класс пула потоков зависит от нескольких классов.

С последующимRunnableQueue,DenyPolicy,ThreadFactory.

И из обзорной диаграммы мы знаем,BaseThreadPoolэто то, что мы определилиThreadPoolИнтерфейс и наследует класс Thread и переопределяет метод запуска.

Логика выполнения будет проанализирована позже, здесь ее можно пропустить.

@Override
    public void run() { // BaseThreadPool
        while (!isShutdown && !isInterrupted()){
            try {
                timeUnit.sleep(keepAliveTime);
            } catch (InterruptedException e) {
               //到这里就是关闭线程池了
                isShutdown = true;
                continue;
            }
//          这里同步代码块,保证了每次访问的时候都是最新的数据!
            synchronized (this){
                if(isShutdown) break;
//                任务队列不为空,并且当前可以工作的线程小于coreCount,那么说明工作线程数不够,就先增加到maxSize
//                比如说coreSize 为20,initSize为10,maxSize 为30,
//                突然一下子来了20分线程进来,但是工作线程只有15个,由于某种原因可能那15个工作现场还没执行完,那么此时的任务队列肯定还有剩余的,发现此时线程还没到coreSize
//                那么就直接开maxSize个线程先把
                if(runnableQueue.size() > 0){
                    for (int i = runnableQueue.size(); i < maxSize; i++) {
                        newThread();
                    }
                }
//                任务队列为空,并且当前可以工作的线程数大于coreCount,工作线程数太多啦!那么就减少到coreCount
                if(runnableQueue.size() == 0 &&  activityCount > coreSize){
                    for (int i = coreSize; i < activityCount; i++) {
                        removeThread();
                    }
                }
            }
        }
    }

Давайте сначала посмотрим на метод построения BaseThreadPool.

//1 用户传入初始化线程数,最大线程数,核心线程数,和任务队列的大小即可
public BaseThreadPool(int initSize, int maxSize, int coreSize,int queueSize) {
   /*这里创建线程的工厂和拒绝策略都是用自己定义好的对象*/  this(initSize,maxSize,coreSize,queueSize,DEFAULT_THREAD_FACTORY,DEFAULT_DENY_POLICY,10,TimeUnit.SECONDS);
    }

// 2
public BaseThreadPool(int initSize, int maxSize, int coreSize, int queueSize, ThreadFactory threadFactory, DenyPolicy denyPolicy, long keepAliveTime, TimeUnit timeUnit) {
        this.initSize = initSize; //初始化线程池的初始化线程数
        this.maxSize = maxSize; // 初始化线程池可以拥有最大的线程数
        this.coreSize = coreSize; // 这个值的意义后面说
        this.threadFactory = threadFactory; //初始化创建线程池的工厂
        //自定义存放任务的队列
        this.runnableQueue = new LinkRunnableQueue(queueSize,denyPolicy,this); //RunnableQueue的实现类,自己定义
        this.keepAliveTime = keepAliveTime;
        this.timeUnit = timeUnit;
        this.init(); //初始化函数
    }

// ---init()

 public void init(){

        /*启动本线程池*/
        this.start();//BaseThreadPool 继承了 Thread,原因后面说

        /*初始化initSize个线程在线程池中*/
        for (int i = 0; i < initSize; i++) {
            newThread();
        }
    }

//  newThread()

  public void newThread(){
        /*创建工作线程,然后让工作线程等待任务到来被唤醒*/
        Woker woker = new Woker(runnableQueue);
        Thread thread = threadFactory.createThread(woker);

        /*将线程和任务包装在一起*/
        ThreadTask threadTask = new ThreadTask(thread,woker);
        threadQueue.offer(threadTask);
        this.activityCount++;
        /*启动刚才新建的线程*/
        thread.start();
    }


// 再看看DefaultThreadFactory,就是
/*工厂创建一个新的线程*/
public class DefaultThreadFactory implements ThreadFactory {

    private static final AtomicInteger GROUP_COUNTER  = new AtomicInteger(0); //线程组号
    //计数
    private static  AtomicInteger COUNTER = new AtomicInteger(1);
    private static final ThreadGroup group  = new ThreadGroup("MyThreadPool-" + GROUP_COUNTER.getAndIncrement());

    @Override
    public Thread createThread(Runnable runnable) {
        return new Thread(group,runnable,"threadPool-" + COUNTER.getAndIncrement());
    }
}

Вот как мы можемnew Thread(new Runnable(){....}).startСоздайте и запустите тему. это позвонитьThreadнужно пройти вRunnableсоздание экземпляра конструктораThreadкласс, переопределяяRunnableвнутриrunМетод может указать, что нужно делать по теме при запуске.

Мы виделиDefaultThreadFactoryСуществует только один способ создать поток — указать задачи, которые необходимо выполнить после запуска потока, и переименовать поток, то есть использовать метод, описанный выше. Так что перейти к нужно перейти кcreateThreadметод реализацииRunnableтип. И этот классWoker

Давайте посмотрим на код Вокера

//------------Woker BaseThreadPool依赖的类

/*工作线程的任务*/
public class Woker implements Runnable{
    /*任务队列,方便后面取出任务*/
    private RunnableQueue runnableQueue;

    /*方便判断该内部任务对应的线程是否运行,确保可见性!*/
    private volatile boolean running = true;

    public Woker(RunnableQueue runnableQueue) {
        this.runnableQueue = runnableQueue;
    }

    @Override
    public void run() {
        /*当前对应的线程正在运行并且没有被中断*/
        while (running && !Thread.currentThread().isInterrupted()){
            //调用take的时候,如果任务队列没任务就会阻塞在这,直到拿到任务
            Runnable task = runnableQueue.take();
            task.run();
        }
    }

    public void stop(){
        running = false;
    }

}

Мы виделиrunМетод, эта задача собираетсяочередь задачВозьмите задание внутрь и выполните его. Пока текущая работа не будет остановлена ​​или текущий поток не будет прерван. и этоочередь задачЭто объект, который мы указали при вызове конструктора, то есть этот код

this.runnableQueue = new LinkRunnableQueue(queueSize,denyPolicy,this);

см. далееLinkRunnableQueueкак это достигается

public class LinkRunnableQueue implements RunnableQueue{//BaseThreadPool依赖的类

    //指定任务队列的大小
    private int limit;

    //也是使用BaseThreadPool传进来的默认拒绝策略
    private DenyPolicy denyPolicy;

    //这里传进BaseThreadPool实例
    private ThreadPool threadPool;

   	//这个就是真正存储Runnable实例对象的数据结构!也就是一个链表
    private LinkedList<Runnable> queue = new LinkedList<>();

    //构造函数,也就是初始化这个类的属性
    public LinkRunnableQueue(int queueSize,DenyPolicy denyPolicy,ThreadPool pool) {
        this.limit = queueSize;
        this.denyPolicy = denyPolicy;
        this.threadPool = pool;
    }

    //任务队列添加任务,这个方法一般由线程池的execute方法调用
    @Override
    public void offer(Runnable runnable) {
        //因为任务队列只有一个,可能会有多个线程同时操作任务队列,所以要考虑同步问题
        //取得queue的锁才能加入任务,拿不到所就进入queue的entrySet
        synchronized (queue){
        if(queue.size() > limit){
            //如果此时任务队列超过限制的值,那么就拒绝!
            denyPolicy.reject(runnable,threadPool);
        }else{
            //把任务加入到任务队列呗
            queue.addLast(runnable);
            //唤醒等待的线程,这些线程在queue的waitSet里面,要结合take方法
            queue.notifyAll();
        }
    }
}

   	//线程从任务队列里面拿任务,如果拿不到就会阻塞,直到有任务来并且抢到
    @Override
    public Runnable take() {
        //这里之前也说过了,要先拿到锁才能拿任务
        synchronized (queue){
            //如果任务队列为空,那么肯定拿不了,所以就等待呗
            while (queue.size() == 0){
                try {
                    //这个线程在这里就等待让出锁,直到执行offer方法从而被唤醒,然后
                    //再重新抢到锁,这里是个循环,如果被唤醒后,也抢到锁了,但是队列
                    //还是空的话,继续等待
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //到这里执行这个方法的线程就是抢到锁了,然后得到任务啦!
            return queue.removeFirst();
        }

    }

    //返回调用该方法时任务队列有多少个任务在等待
    @Override
    public int size() {
       synchronized (queue){
           return queue.size();
       }
    }
}

Комментарии кода были объяснены очень четко, здесь в основном для того, чтобы понять, почему в РаботеRunnable task = runnableQueue.take()Нет задачи заблокировать ожидание, суть в том

1 После получения блокировки объекта очереди в очереди задач нет задач, снимите блокировку объекта, который фактически хранит задачу, и войдите в очередь ожидания объекта, чтобы дождаться пробуждения.

2 Конечно, если блокировка не получена, он будет ждать получения блокировки, и тогда будет то же самое, что и 1.

Если вы не понимаете, что вы здесь видите, вы можете вернуться назад и ознакомиться с базовыми знаниями о потоках Java иsynchronizedПодробное объяснение, чтобы знания можно было лучше соединить последовательно!

Далее мы смотрим нарабочая очередьна что это похоже.

ThreadTask — это внутренний класс BaseThreadPool.

//把工作线程和内部任务绑定在一起
    class ThreadTask{
        Thread thread;
        Woker woker;
        public ThreadTask(Thread thread, Woker woker) {
            this.thread = thread;
            this.woker = woker;
        }
    }

Из приведенного выше кода мы знаем, что ThreadTask предназначен для совместной инкапсуляции рабочего потока и задач рабочего потока.пул потоков закрытКогда задача, которую должен выполнить поток, может быть остановлена!

Операция отключения пула потоков,BaseThreadPoolметод класса

  /*shutdown 就要把 Woker 给停止 和 对应的线程给中断*/
    @Override
    public void shutDown() {
        synchronized (this){
            if(isShutDown())
                return;
            //设置标志位,让线程池线程也执行完run方法,然后退出线程。
            isShutdown = true;
            /*全部线程停止工作*/
            for (ThreadTask task: threadQueue
                 ) {
                //1 这里就是把Woker实例对象的running置为false
                task.woker.stop();
                //2 中断执行对应任务的线程
                task.thread.interrupt();
            }
        }
    }

Вы можете видеть, что закрытие пула потоков означает обход очереди, в которой хранятся рабочие потоки.1 и 2 являются условиями цикла while, которые уничтожают объект Woker., так что объект WorkerrunВыполнение метода завершается. (Здесь вы можете посмотреть класс WokerrunМетод поймет, что я сказал)

Мы сказали в начале,BaseThreadPoolКогда он начинается, это на самом деле поток, в егоinitметод называетсяstartметод выполненияrunЛогика внутри, код запуска мы смотрели раньше, но не анализировали его, давайте разберем сейчас

@Override 
    public void run() { //BaseThreadPool类的方法
        //还记得shutDown()方法里面的 isShutdown = true语句吗?
        //作用就是为了让这里下一次判断while循环的时候退出,然后执行完run啦!
        while (!isShutdown && !isInterrupted()){
            try {
                timeUnit.sleep(keepAliveTime);
            } catch (InterruptedException e) {
                //如果线程池这个线程被中断
                //到这里就是关闭线程池了,也是把isShutdown设置为我true!
                isShutdown = true;
                continue;
            }
//          这里同步代码块,保证了每次访问的时候都是最新的数据!
            synchronized (this){
                if(isShutdown) break;
				//任务队列不为空,并且当前可以工作的线程小于coreCount,那么说明工作				   //线程数不够,就先增加到maxSize.
				//比如说coreSize 为20,initSize为10,maxSize 为30,
				//突然一下子来了20分线程进来,但是工作线程只有15个,由于某种原因可能
                //那15个工作现场还没执行完,那么此时的任务队列肯定还有剩余的,发现此
                //时线程还没到coreSize
				//那么就直接开maxSize个线程先把
                //如果发现现在工作的的线程已经过了coreSize就先不增加线程数啦
                if(runnableQueue.size() > 0 && activityCount < coreSize){
                    for (int i = runnableQueue.size(); i < maxSize; i++) {
                        newThread();
                    }
                }
//                任务队列为空,并且当前可以工作的线程数大于coreCount,工作线程数太多啦!那么就减少到coreCount基本大小把
                if(runnableQueue.size() == 0 &&  activityCount > coreSize){
                    for (int i = coreSize; i < activityCount; i++) {
                        removeThread();
                    }
                }
            }
        }
    }


//----------removeThread()
//   线程池中去掉某个工作线程,这里的操作是不是很类似shutDown的内容
    public void removeThread(){
        this.activityCount--;
        ThreadTask task = threadQueue.remove();
        task.woker.stop();//就是破坏Woker对象的while循环的条件
    }

Приведенные выше заметки ясно объясняют это, если вам что-то непонятно, прочитайте еще несколько статей, просто смоделируйте идею сами!

существуетrunметод, важная часть связана с динамическим изменением количества потоков в пуле потоков.

coreSize: базовый размер пула потоков, эквивалент разделительной линии.

initSize: Размер инициализации пула потоков, что тут сказать по этому поводу

ActivityCount: текущее количество рабочих потоков.

maxSIze: максимальное количество потоков в пуле потоков

говорить об отношениях между ними

Когда очередь задач не пуста

  1. Когда activityCount

  2. Когда activityCount >= coreSize, это означает, что количество рабочих потоков в текущем пуле потоков достигло базового размера, если есть задача, нужно подождать!

Примечание: механизм расширения здесь представляет собой простое расширение. Пул потоков, реализованный в Java, не расширяется, как я уже сказал, что решает проблему в начале. Конкретное время - анализ исходного кода. Вот простая реализация!

тестовое задание

тестовый код

package blogDemo.ThreadDemo;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Test {
    public static void main(String[] args) {
        ThreadPool threadPool = new BaseThreadPool(4,30,6,30);
        for (int i = 0; i < 20; i++) {
            threadPool.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " is running and done.");
            });
        }

    }
}

Результаты теста

код проекта

GitHub.com/J IE Mingli/T…

Суммировать

Эта статья написана здесь. Когда вы читаете статью, вы можете читать код, читая объяснение, что облегчит понимание. Я надеюсь, что это поможет читателям понять собственный пул потоков java. В следующей статье будет проанализирована сама java , Исходный код пула потоков с ним!