Оригинальный адрес:у-у-у-у. xilidu.com/2018/02/09/…
смотрел недавноИскусство параллельного программирования на JavaПри рассмотрении принципа и параметров пула потоков я обнаружил проблему, если corePoolSize = 0 и очередь блокировки не ограничена. Как будет работать пул потоков?
Давайте сначала рассмотрим пул потоков, описанный в книге.execute()
Рабочая логика:
- Если текущий поток меньше, чем corePoolSize, для выполнения задачи создается новый поток.
- Если количество запущенных потоков равно или превышает размер corePoolSize, добавьте задачу в BlockingQueue.
- Если задачи в BlockingQueue превышают верхний предел, для обработки задач создается новый поток.
- Если количество потоков, созданных за одно выполнение, превышает максимальный размер пула, задача будет отклонена политикой отклонения.
После прочтения этих четырех шагов в описании действительно есть лазейка. Что, если количество основных потоков равно 0, а очередь блокировки также не ограничена? Согласно приведенной выше логике, ни один поток не должен запускаться, и тогда поток будет добавлен в очередь на неопределенный срок. Тогда что?
Так что я сделал тест, чтобы увидеть, что произойдет?
public class threadTest {
private final static ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,0, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger();
while (true) {
executor.execute(() -> {
System.out.println(atomicInteger.getAndAdd(1));
});
}
}
}
в результатеSystem.out.println(atomicInteger.getAndAdd(1));
Выполняется оператор, противоречащий описанию выше. В итоге что случилось? Какова логика потока создания пула потоков? Давайте все же посмотрим на исходный код, чтобы увидеть, какова логика пула потоков?
ctl
Чтобы понять пул потоков, нам сначала нужно понять параметр управления состоянием ctl в пуле потоков.
- ctl пула потоков является атомарным AtomicInteger.
- Этот ctl содержит два параметра:
- workerCount Количество активированных потоков
- runState — состояние текущего пула потоков.
- Его младшие 29 бит используются для хранения текущего количества потоков, поэтому теоретическое максимальное количество потоков в пуле потоков равно 536870911; старшие 3 бита используются для представления состояния текущего пула потоков, а значения три старших бита соответствуют состоянию следующим образом:
- 111: RUNNING
- 000: SHUTDOWN
- 001: STOP
- 010: TIDYING
- 110: TERMINATED
Для использования пула потоков ctl предусмотрено три метода:
// Packing and unpacking ctl
// 获取线程池的状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取线程池的工作线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
// 根据工作线程数和线程池状态获取 ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
execute
Внешний мир отправляет задачи в пул потоков через метод execute.
Сначала посмотрите на код:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//如果工作线程数小于核心线程数,
if (workerCountOf(c) < corePoolSize) {
//执行addWork,提交为核心线程,提交成功return。提交失败重新获取ctl
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果工作线程数大于核心线程数,则检查线程池状态是否是正在运行,且将新线程向阻塞队列提交。
if (isRunning(c) && workQueue.offer(command)) {
//recheck 需要再次检查,主要目的是判断加入到阻塞队里中的线程是否可以被执行
int recheck = ctl.get();
//如果线程池状态不为running,将任务从阻塞队列里面移除,启用拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果线程池的工作线程为零,则调用addWoker提交任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//添加非核心线程失败,拒绝
else if (!addWorker(command, false))
reject(command);
}
addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
//获取线程池状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 判断是否可以添加任务。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//获取工作线程数量
int wc = workerCountOf(c);
//是否大于线程池上限,是否大于核心线程数,或者最大线程数
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS 增加工作线程数
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//如果线程池状态改变,回到开始重新来
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
//上面的逻辑是考虑是否能够添加线程,如果可以就cas的增加工作线程数量
//下面正式启动线程
try {
//新建worker
w = new Worker(firstTask);
//获取当前线程
final Thread t = w.thread;
if (t != null) {
//获取可重入锁
final ReentrantLock mainLock = this.mainLock;
//锁住
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN ==> 线程处于RUNNING状态
// 或者线程处于SHUTDOWN状态,且firstTask == null(可能是workQueue中仍有未执行完成的任务,创建没有初始任务的worker线程执行)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 当前线程已经启动,抛出异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//workers 是一个 HashSet 必须在 lock的情况下操作。
workers.add(w);
int s = workers.size();
//设置 largeestPoolSize 标记workAdded
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//如果添加成功,启动线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//启动线程失败,回滚。
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Первый взглядaddWork()
Два параметра, первый — это поток, который должен быть отправлен Runnable firstTask, а второй параметр — логического типа, указывающий, является ли это основным потоком.
Есть три вызова в execute()addWork()
Мы анализируем их один за другим.
- Впервые, состояние
if (workerCountOf(c) < corePoolSize)
Это понятно, количество рабочих потоков меньше количества основных потоков, и задача отправляется. такaddWorker(command, true)
. - второй раз, если
workerCountOf(recheck) == 0
Если число рабочих равно 0, тоaddWorker(null,false)
. почему это здесьnull
? Команда была отправлена в очередь блокировки доworkQueue.offer(command)
. Поэтому отправьте пустой поток и возьмите его прямо из очереди блокировки. - В третий раз, если в пуле потоков нет RUNNING или происходит сбой очереди блокировки предложений,
addWorker(command,false)
, ну понятно, соответсвие есть, очередь на блокировку заполнена, отправьте задачу в пул непрофильных потоков. Сравните с максимальным пулом потоков.
Пока что возобновлениеexecute()
Логика должна быть:
- Если текущий поток меньше, чем corePoolSize, для выполнения задачи создается новый поток.
- Если количество запущенных потоков равно или превышает размер corePoolSize, добавьте задачу в BlockingQueue.
- Если присоединение к BlockingQueue прошло успешно, необходимо дважды проверить состояние пула потоков.Если пул потоков не запущен, удалите задачу из BlockingQueue и запустите политику отклонения.
- Если пул потоков находится в состоянии выполнения, проверьте, равен ли рабочий поток 0. Если 0, для обработки задачи создается новый поток. Если количество запускаемых потоков больше максимального размера пула, задача будет отклонена политикой отклонения.
- При присоединении к BlockingQueue . Если это не удается, создается новый поток для обработки задачи.
- Если количество запускаемых потоков больше максимального размера пула, задача будет отклонена политикой отклонения.
Суммировать
Вспоминая вопросы, с которых я начал:
Если corePoolSize = 0 и очередь блокировки не ограничена. Как будет работать пул потоков?
На этот вопрос не должно быть сложно ответить.
Наконец
Искусство параллельного программирования на JavaЭто хорошая книга для изучения параллельного программирования на Java, и я рекомендую ее всем.
В то же время я надеюсь, что все будут хорошо думать при чтении технических данных, комбинировать исходный код, открывать, задавать вопросы и решать проблемы. Такое обучение может быть эффективным и тщательным.
Добро пожаловать в мой публичный аккаунт WeChat