Обзор
В мире JAVA, если вы хотите выполнять некоторые задачи параллельно, вы можете использовать ThreadPoolExecutor. В большинстве случаев прямое использование ThreadPoolExecutor может удовлетворить требования, но в некоторых сценариях, таких как мгновенный большой трафик, чтобы улучшить отклик и пропускную способность, лучше расширить ThreadPoolExecutor.
ИТ-специалисты JAVA во всей вселенной должны знать процесс выполнения ThreadPoolExecutor:
- Если основной поток все еще может с этим справиться, то константа создать новую ветку;
- Если основной поток не справляется, задача ставится в очередь;
- очередь полная( Это означает, что задача вставки не удалась), затем начните создавать MAX потоков.После того, как количество потоков достигает MAX, очередь все еще заполнена, и выдается исключение RejectedExecutionException.
В этом процессе выполнения есть небольшая проблема, то есть, когда основной поток не может обработать запрос, он Добавляйте задачу в очередь сразу, если очередь очень длинная и задач много, будут частые задачи ставятся в очередь и Операция удаления задачи из очереди.
Согласно фактическому измерению давления, эта операция также имеет определенный расход. По сути, очередь SynchronousQueue, предоставляемая JAVA, представляет собой очередь нулевой длины.Задачи напрямую передаются производителем потребителю, и процесс входа в очередь отсутствует.Видно, что разработчики JAVA API также считается стоимость входа в очередь.
Кроме того, когда задач слишком много, они сразу выбрасываются в очередь, а MAX-поток не работает.Если задач в очереди слишком много, занят только бедный основной поток, что также повлияет на производительность. .
Когда основной поток не может справиться с запросом, можно ли его отложить? Как насчет операции входа в очередь? Пусть поток MAX запустится как можно скорее, чтобы помочь обработать задачу.
То есть, когда основной поток не может справиться с запросом, если количество потоков в текущем пуле потоков все еще меньше, чем количество потоков MAX, продолжайте создавать новые задачи обработки потоков и вставляйте задачу до тех пор, пока количество потоков достигает MAX. в очереди
Мы делаем это, переопределяя метод предложения очереди.
@Override
public boolean offer(Runnable o) {
int currentPoolThreadSize = executor.getPoolSize();
//如果线程池里的线程数量已经到达最大,将任务添加到队列中
if (currentPoolThreadSize == executor.getMaximumPoolSize()) {
return super.offer(o);
}
//说明有空闲的线程,这个时候无需创建core线程之外的线程,而是把任务直接丢到队列里即可
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(o);
}
//如果线程池里的线程数量还没有到达最大,直接创建线程,而不是把任务丢到队列里面
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
return super.offer(o);
}
Обратите внимание, что
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(o);
}
Это значит, что основной поток еще может ее обработать, а есть незанятые потоки, и задача ставится в очередь. Как определить, есть ли простаивающие потоки в пуле потоков? Его можно реализовать с помощью счетчика, который увеличивается на 1 при каждом выполнении метода execute и уменьшается на 1 при выполнении afterExecute.
@Override
public void execute(Runnable command) {
submittedTaskCount.incrementAndGet();
//代码未完整,待补充。。。。。
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
submittedTaskCount.decrementAndGet();
}
Таким образом, когда
executor.getSubmittedTaskCount() < currentPoolThreadSize
Когда есть незадействованный поток.
полный код
Класс EnhancedThreadPoolExecutor
package executer;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class EnhancedThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
/**
* 计数器,用于表示已经提交到队列里面的task的数量,这里task特指还未完成的task。
* 当task执行完后,submittedTaskCount会减1的。
*/
private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
public EnhancedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, TaskQueue workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new ThreadPoolExecutor.AbortPolicy());
workQueue.setExecutor(this);
}
/**
* 覆盖父类的afterExecute方法,当task执行完成后,将计数器减1
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
submittedTaskCount.decrementAndGet();
}
public int getSubmittedTaskCount() {
return submittedTaskCount.get();
}
/**
* 覆盖父类的execute方法,在任务开始执行之前,计数器加1。
*/
@Override
public void execute(Runnable command) {
submittedTaskCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
//当发生RejectedExecutionException,尝试再次将task丢到队列里面,如果还是发生RejectedExecutionException,则直接抛出异常。
BlockingQueue<Runnable> taskQueue = super.getQueue();
if (taskQueue instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)taskQueue;
if (!queue.forceTaskIntoQueue(command)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("队列已满");
}
} else {
submittedTaskCount.decrementAndGet();
throw rx;
}
}
}
}
TaskQueue
package executer;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
public class TaskQueue extends LinkedBlockingQueue<Runnable> {
private EnhancedThreadPoolExecutor executor;
public TaskQueue(int capacity) {
super(capacity);
}
public void setExecutor(EnhancedThreadPoolExecutor exec) {
executor = exec;
}
public boolean forceTaskIntoQueue(Runnable o) {
if (executor.isShutdown()) {
throw new RejectedExecutionException("Executor已经关闭了,不能将task添加到队列里面");
}
return super.offer(o);
}
@Override
public boolean offer(Runnable o) {
int currentPoolThreadSize = executor.getPoolSize();
//如果线程池里的线程数量已经到达最大,将任务添加到队列中
if (currentPoolThreadSize == executor.getMaximumPoolSize()) {
return super.offer(o);
}
//说明有空闲的线程,这个时候无需创建core线程之外的线程,而是把任务直接丢到队列里即可
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(o);
}
//如果线程池里的线程数量还没有到达最大,直接创建线程,而不是把任务丢到队列里面
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
return super.offer(o);
}
}
TestExecuter
package executer;
import java.util.concurrent.TimeUnit;
public class TestExecuter {
private static final int CORE_SIZE = 5;
private static final int MAX_SIZE = 10;
private static final long KEEP_ALIVE_TIME = 30;
private static final int QUEUE_SIZE = 5;
static EnhancedThreadPoolExecutor executor = new EnhancedThreadPoolExecutor(CORE_SIZE,MAX_SIZE,KEEP_ALIVE_TIME, TimeUnit.SECONDS , new TaskQueue(QUEUE_SIZE));
public static void main(String[] args){
for (int i = 0; i < 15; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
Thread.currentThread().sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
System.out.println("线程池中现在的线程数目是:"+executor.getPoolSize()+", 队列中正在等待执行的任务数量为:"+ executor.getQueue().size());
}
}
}
Сначала запустите код, чтобы увидеть, ожидается ли он. Непосредственно выполните основной метод в классе TestExecuter, и результаты будут следующими:
线程池中现在的线程数目是:1, 队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:2, 队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:3, 队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:4, 队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:5, 队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:6, 队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:7, 队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:8, 队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:9, 队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:10, 队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:10, 队列中正在等待执行的任务数量为:1
线程池中现在的线程数目是:10, 队列中正在等待执行的任务数量为:2
线程池中现在的线程数目是:10, 队列中正在等待执行的任务数量为:3
线程池中现在的线程数目是:10, 队列中正在等待执行的任务数量为:4
线程池中现在的线程数目是:10, 队列中正在等待执行的任务数量为:5
Видно, что при увеличении количества потоков до количества ядер задач в очереди нет. Пока количество потоков не увеличится до числа MAX, равного 10, в очереди есть задачи. В соответствии с нашими ожиданиями.
Если закомментировать метод offer в классе TaskQueue, то есть метод offer, не переопределяющий очередь, то результаты будут следующими:
线程池中现在的线程数目是:1, 队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:2, 队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:3, 队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:4, 队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:5, 队列中正在等待执行的任务数量为:0
线程池中现在的线程数目是:5, 队列中正在等待执行的任务数量为:1
线程池中现在的线程数目是:5, 队列中正在等待执行的任务数量为:2
线程池中现在的线程数目是:5, 队列中正在等待执行的任务数量为:3
线程池中现在的线程数目是:5, 队列中正在等待执行的任务数量为:4
线程池中现在的线程数目是:5, 队列中正在等待执行的任务数量为:5
线程池中现在的线程数目是:6, 队列中正在等待执行的任务数量为:5
线程池中现在的线程数目是:7, 队列中正在等待执行的任务数量为:5
线程池中现在的线程数目是:8, 队列中正在等待执行的任务数量为:5
线程池中现在的线程数目是:9, 队列中正在等待执行的任务数量为:5
线程池中现在的线程数目是:10, 队列中正在等待执行的任务数量为:5
Видно, что при увеличении количества потоков до количества ядер задачи в очереди уже есть.
Думай дальше
Что делать при использовании ThreadPoolExecutor, если возникает исключение RejectedExecutionException? Код в этой статье предназначен для повторной вставки задачи в очередь, и если она по-прежнему не выполняется, сразу создается исключение отклонения.
@Override
public void execute(Runnable command) {
submittedTaskCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
//当发生RejectedExecutionException,尝试再次将task丢到队列里面,如果还是发生RejectedExecutionException,则直接抛出异常。
BlockingQueue<Runnable> taskQueue = super.getQueue();
if (taskQueue instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)taskQueue;
if (!queue.forceTaskIntoQueue(command)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("队列已满");
}
} else {
submittedTaskCount.decrementAndGet();
throw rx;
}
}
}
Класс TaskQueue предоставляет метод forceTaskIntoQueue для вставки задач в очередь.
Существует еще одно решение, заключающееся в использовании другого пула потоков для выполнения задач, когда первый пул потоков создает исключение Reject, перехватывает его и использует второй пул потоков для обработки задачи.