Способ расширения ThreadPoolExecutor

Java задняя часть API

Обзор


В мире JAVA, если вы хотите выполнять некоторые задачи параллельно, вы можете использовать ThreadPoolExecutor. В большинстве случаев прямое использование ThreadPoolExecutor может удовлетворить требования, но в некоторых сценариях, таких как мгновенный большой трафик, чтобы улучшить отклик и пропускную способность, лучше расширить ThreadPoolExecutor.

ИТ-специалисты JAVA во всей вселенной должны знать процесс выполнения ThreadPoolExecutor:

  • Если основной поток все еще может с этим справиться, то константа создать новую ветку;
  • Если основной поток не справляется, задача ставится в очередь;
  • очередь полная( Это означает, что задача вставки не удалась), затем начните создавать MAX потоков.После того, как количество потоков достигает MAX, очередь все еще заполнена, и выдается исключение RejectedExecutionException.

В этом процессе выполнения есть небольшая проблема, то есть, когда основной поток не может обработать запрос, он Добавляйте задачу в очередь сразу, если очередь очень длинная и задач много, будут частые задачи ставятся в очередь и Операция удаления задачи из очереди.

Согласно фактическому измерению давления, эта операция также имеет определенный расход. По сути, очередь SynchronousQueue, предоставляемая JAVA, представляет собой очередь нулевой длины.Задачи напрямую передаются производителем потребителю, и процесс входа в очередь отсутствует.Видно, что разработчики JAVA API также считается стоимость входа в очередь.

Кроме того, когда задач слишком много, они сразу выбрасываются в очередь, а MAX-поток не работает.Если задач в очереди слишком много, занят только бедный основной поток, что также повлияет на производительность. .

Когда основной поток не может справиться с запросом, можно ли его отложить? Как насчет операции входа в очередь? Пусть поток MAX запустится как можно скорее, чтобы помочь обработать задачу.

То есть, когда основной поток не может справиться с запросом, если количество потоков в текущем пуле потоков все еще меньше, чем количество потоков MAX, продолжайте создавать новые задачи обработки потоков и вставляйте задачу до тех пор, пока количество потоков достигает MAX. в очереди

Мы делаем это, переопределяя метод предложения очереди.

  @Override
public  boolean offer(Runnable o) {
    int currentPoolThreadSize = executor.getPoolSize();
    //如果线程池里的线程数量已经到达最大,将任务添加到队列中
    if (currentPoolThreadSize == executor.getMaximumPoolSize()) {
        return super.offer(o);
    }
    //说明有空闲的线程,这个时候无需创建core线程之外的线程,而是把任务直接丢到队列里即可
    if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
        return super.offer(o);
    }

    //如果线程池里的线程数量还没有到达最大,直接创建线程,而不是把任务丢到队列里面
    if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
        return false;
    }

    return super.offer(o);
}

Обратите внимание, что

if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
        return super.offer(o);
}

Это значит, что основной поток еще может ее обработать, а есть незанятые потоки, и задача ставится в очередь. Как определить, есть ли простаивающие потоки в пуле потоков? Его можно реализовать с помощью счетчика, который увеличивается на 1 при каждом выполнении метода execute и уменьшается на 1 при выполнении afterExecute.

    @Override
    public void execute(Runnable command) {
        submittedTaskCount.incrementAndGet();
        //代码未完整,待补充。。。。。
    }
 @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submittedTaskCount.decrementAndGet();
    }

Таким образом, когда

executor.getSubmittedTaskCount() < currentPoolThreadSize

Когда есть незадействованный поток.


полный код


Класс EnhancedThreadPoolExecutor


package executer;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class EnhancedThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {

    /**
     * 计数器,用于表示已经提交到队列里面的task的数量,这里task特指还未完成的task。
     * 当task执行完后,submittedTaskCount会减1的。
     */
    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

    public EnhancedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, TaskQueue workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new ThreadPoolExecutor.AbortPolicy());
        workQueue.setExecutor(this);
    }

    /**
     * 覆盖父类的afterExecute方法,当task执行完成后,将计数器减1
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submittedTaskCount.decrementAndGet();
    }


    public int getSubmittedTaskCount() {
        return submittedTaskCount.get();
    }


    /**
     * 覆盖父类的execute方法,在任务开始执行之前,计数器加1。
     */
    @Override
    public void execute(Runnable command) {
        submittedTaskCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            //当发生RejectedExecutionException,尝试再次将task丢到队列里面,如果还是发生RejectedExecutionException,则直接抛出异常。
            BlockingQueue<Runnable> taskQueue = super.getQueue();
            if (taskQueue instanceof TaskQueue) {
                final TaskQueue queue = (TaskQueue)taskQueue;
                if (!queue.forceTaskIntoQueue(command)) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("队列已满");
                }
            } else {
                submittedTaskCount.decrementAndGet();
                throw rx;
            }
        }
    }
}

TaskQueue

package executer;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;

public class TaskQueue extends LinkedBlockingQueue<Runnable> {
    private EnhancedThreadPoolExecutor executor;

    public TaskQueue(int capacity) {
        super(capacity);
    }

    public void setExecutor(EnhancedThreadPoolExecutor exec) {
        executor = exec;
    }

    public boolean forceTaskIntoQueue(Runnable o) {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor已经关闭了,不能将task添加到队列里面");
        }
        return super.offer(o);
    }

    @Override
    public  boolean offer(Runnable o) {
        int currentPoolThreadSize = executor.getPoolSize();
        //如果线程池里的线程数量已经到达最大,将任务添加到队列中
        if (currentPoolThreadSize == executor.getMaximumPoolSize()) {
            return super.offer(o);
        }
        //说明有空闲的线程,这个时候无需创建core线程之外的线程,而是把任务直接丢到队列里即可
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            return super.offer(o);
        }

        //如果线程池里的线程数量还没有到达最大,直接创建线程,而不是把任务丢到队列里面
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }

        return super.offer(o);
    }
}

TestExecuter

package executer;

import java.util.concurrent.TimeUnit;

public class TestExecuter {
    private static final int CORE_SIZE = 5;

    private static final int MAX_SIZE = 10;

    private static final long KEEP_ALIVE_TIME = 30;

    private static final int QUEUE_SIZE = 5;

    static EnhancedThreadPoolExecutor executor = new EnhancedThreadPoolExecutor(CORE_SIZE,MAX_SIZE,KEEP_ALIVE_TIME, TimeUnit.SECONDS , new TaskQueue(QUEUE_SIZE));

    public static void main(String[] args){
        for (int i = 0; i < 15; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.currentThread().sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });

            System.out.println("线程池中现在的线程数目是:"+executor.getPoolSize()+",  队列中正在等待执行的任务数量为:"+ executor.getQueue().size());
        }
    }
}

Сначала запустите код, чтобы увидеть, ожидается ли он. Непосредственно выполните основной метод в классе TestExecuter, и результаты будут следующими:

线程池中现在的线程数目是:1,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:2,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:3,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:4,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:5,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:6,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:7,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:8,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:9,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:10,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:10,  队列中正在等待执行的任务数量为:1
线程池中现在的线程数目是:10,  队列中正在等待执行的任务数量为:2
线程池中现在的线程数目是:10,  队列中正在等待执行的任务数量为:3
线程池中现在的线程数目是:10,  队列中正在等待执行的任务数量为:4
线程池中现在的线程数目是:10,  队列中正在等待执行的任务数量为:5

Видно, что при увеличении количества потоков до количества ядер задач в очереди нет. Пока количество потоков не увеличится до числа MAX, равного 10, в очереди есть задачи. В соответствии с нашими ожиданиями.

Если закомментировать метод offer в классе TaskQueue, то есть метод offer, не переопределяющий очередь, то результаты будут следующими:

线程池中现在的线程数目是:1,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:2,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:3,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:4,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:5,  队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:5,  队列中正在等待执行的任务数量为:1
线程池中现在的线程数目是:5,  队列中正在等待执行的任务数量为:2
线程池中现在的线程数目是:5,  队列中正在等待执行的任务数量为:3
线程池中现在的线程数目是:5,  队列中正在等待执行的任务数量为:4
线程池中现在的线程数目是:5,  队列中正在等待执行的任务数量为:5
线程池中现在的线程数目是:6,  队列中正在等待执行的任务数量为:5
线程池中现在的线程数目是:7,  队列中正在等待执行的任务数量为:5
线程池中现在的线程数目是:8,  队列中正在等待执行的任务数量为:5
线程池中现在的线程数目是:9,  队列中正在等待执行的任务数量为:5
线程池中现在的线程数目是:10,  队列中正在等待执行的任务数量为:5

Видно, что при увеличении количества потоков до количества ядер задачи в очереди уже есть.


Думай дальше


Что делать при использовании ThreadPoolExecutor, если возникает исключение RejectedExecutionException? Код в этой статье предназначен для повторной вставки задачи в очередь, и если она по-прежнему не выполняется, сразу создается исключение отклонения.

 @Override
    public void execute(Runnable command) {
        submittedTaskCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            //当发生RejectedExecutionException,尝试再次将task丢到队列里面,如果还是发生RejectedExecutionException,则直接抛出异常。
            BlockingQueue<Runnable> taskQueue = super.getQueue();
            if (taskQueue instanceof TaskQueue) {
                final TaskQueue queue = (TaskQueue)taskQueue;
                if (!queue.forceTaskIntoQueue(command)) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("队列已满");
                }
            } else {
                submittedTaskCount.decrementAndGet();
                throw rx;
            }
        }
    }

Класс TaskQueue предоставляет метод forceTaskIntoQueue для вставки задач в очередь.

Существует еще одно решение, заключающееся в использовании другого пула потоков для выполнения задач, когда первый пул потоков создает исключение Reject, перехватывает его и использует второй пул потоков для обработки задачи.