Обзор знаний
Мы много говорили об инструментах параллелизма.«Цель» этих инструментов — позволить нам сосредоточиться только на самой задаче и игнорировать детали взаимодействия между потоками, что упрощает параллельное программирование и значительно повышает безопасность. Хотя «цель» класса инструментов для пользователя одна и та же, каждый класс инструментов имеет свои собственные уникальные сценарии применения, такие как:
- Я бы создавал потоки вручную, зачем использовать пул потоков?Вводит использование потоков управления пулом потоков для разбиения большой задачи на несколько подзадач для простого выполнения с помощьюЯ не умею пользоваться Java Future, подозреваю, что вы быстрее меня заварите чай, а это супер длинная картинка и текст! !Функция Future получает результат выполнения подзадачи —Комбинация этих двух может обрабатывать простые параллельные задачи
- Получите CompletableFuture, параллельное асинхронное программирование и в чем разница между написанием последовательных программ?С помощью CompletableFuture значительно снижается сложность асинхронного программирования —Организовывать задачи, используя последовательное мышление (агрегация И или ИЛИ)
- Поскольку ExecutorService рождается, почему рождается CompletionService?В связи с последовательным выполнением задач, во избежание ожидания блокировки——CompletionService — лучший выбор для пакетных параллельных задач.
Визуализируйте приведенные выше три общих сценария:
Объединяя приведенную выше картинку, я полагаю, что в вашем уме возникли конкретные методы реализации этих классов инструментов, и я чувствую, что это охватывает все параллельные сценарии.
TYTS, подпотоки в вышеуказанных методах не будут продолжать разбиваться на «под-под-» задачи после получения задач, то есть, даже если под-потоки получают большие или сложные задачи, они должны кусать пулю и усердно работать над их выполнением. Очевидно, что эта большая задача является сутью дела
Было бы неплохо, если бы большую задачу можно было разделить на более мелкие подзадачи, пока подзадачи не станут достаточно простыми, чтобы их можно было решать напрямую., это идея разделяй и властвуй
разделяй и властвуй
В компьютерных науках принцип «разделяй и властвуй» является важным алгоритмом. Буквальное толкование — «разделяй и властвуй», то есть решение сложной проблемы.разделен наДва или более того же или похожеподзадача, а затем разделите подзадачу на более мелкие подзадачи... До тех пор, пока последнюю подзадачу нельзя будет просто решить напрямую, решение исходной проблемы становится решением подзадачисливаться.
Эта методика лежит в основе многих эффективных алгоритмов, таких как алгоритмы сортировки (быстрая сортировка, сортировка слиянием), преобразование Фурье (быстрое преобразование Фурье). Если вы занимаетесь большими данными, MapReduce — типичная ветвь мышления, если вы хочу быть более подробным Чтобы понять алгоритм, связанный с разделяй и властвуй, пожалуйста, обратитесь к этомуАлгоритм и идеология декоргенации текстовой диаграммы
В сочетании с приведенным выше описанием я полагаю, что вы уже построили в своем уме модель «разделяй и властвуй»:
Можно ли решить все большие задачи методом «разделяй и властвуй»? Очевидно нет
Ситуации, в которых применим разделяй и властвуй
Вообще говоря, проблемы, которые можно решить методом «разделяй и властвуй», обычно имеют следующие характеристики:
-
Проблема может быть легко решена путем уменьшения размера проблемы до определенного уровня.
-
Задача может быть декомпозирована на несколько одинаковых задач меньшего масштаба, то есть задача обладает свойством оптимальной подструктуры.
-
Решения подзадач, разложенных на задачу, могут быть объединены в решение задачи;
-
Подзадачи, разлагаемые этой проблемой, независимы друг от друга, то есть подзадачи не содержат общих подподзадач.
Поняв основную идею алгоритма «разделяй и властвуй», давайте посмотрим, как Java использует идею «разделяй и властвуй» для разделения и объединения задач.
ForkJoin
Если есть подзадачи, естественно использовать многопоточность. Мы давно сказали,Потоки, выполняющие подзадачи, не могут создаваться по отдельности и управляются пулом потоков.. Придерживаясь той же концепции дизайна в сочетании с алгоритмом «разделяй и властвуй», ForkJoinPool и ForkJoinTask появляются в рамках ForkJoin. Как говорится:
Небо к земле, дождь к ветру. Континент против неба. Горные цветы на фоне морского дерева, а красное изречение на фоне неба.
Применяя существующие знания, простое понимание выглядит так:
Мы уже говорили бесчисленное количество раз, что JDK не будет изобретать велосипед. Мы говорим о сходстве здесь, чтобы дать всем простое и интуитивно понятное впечатление. Внутри должны быть некоторые различия. Давайте в общих чертах посмотрим на эти два класса:
ForkJoinTask
Это снова этот человек,Doug Lea
, почему так хорошо (сломанным тоном)
/**
* Abstract base class for tasks that run within a {@link ForkJoinPool}.
* A {@code ForkJoinTask} is a thread-like entity that is much
* lighter weight than a normal thread. Huge numbers of tasks and
* subtasks may be hosted by a small number of actual threads in a
* ForkJoinPool, at the price of some usage limitations.
*
* @since 1.7
* @author Doug Lea
*/
public abstract class ForkJoinTask<V> implements Future<V>, Serializable
можно увидетьForkJoinTask
ДостигнутоFuture
Интерфейс (то есть со свойствами интерфейса Future) также оправдывает свое название,fork()
а такжеjoin()
природа — это два ее основных метода
-
fork()
: выполнить подзадачу асинхронно (разделить, как указано выше) -
join()
: Заблокировать текущий поток и дождаться результата выполнения подзадачи (упомянутого выше слияния)
Кроме того, как видно из приведенного выше кода,ForkJoinTask
является абстрактным классом, в модели «разделяй и властвуй» он также имеет два абстрактных подклассаRecursiveAction
а такжеRecursiveTask
Так в чем же разница между этими двумя субабстрактными классами? Если вы откроете IDE, вы сможете увидеть разницу с первого взгляда, так просто
public abstract class RecursiveAction extends ForkJoinTask<Void>{
...
/**
* The main computation performed by this task.
*/
protected abstract void compute();
...
}
public abstract class RecursiveTask<V> extends ForkJoinTask<V>{
...
protected abstract void compute();
...
}
Оба класса определяютАннотацияметодcompute()
, подклассы необходимо переписать для реализации определенной логики
Какой логике должен следовать подкласс, чтобы переопределить этот метод?
Следуя идее «разделяй и властвуй», логика переписывания очень проста, то есть ответить на три вопроса:
- Когда дальше разделить задачу?
- Когда выполняется минимальная исполняемая задача, т. е. больше нет разбиений?
- Когда агрегировать результаты подзадачи
Использование «псевдокода» для перевода абзаца выше, вероятно, будет таким:
if(任务小到不用继续拆分){
直接计算得到结果
}else{
拆分子任务
调用子任务的fork()进行计算
调用子任务的join()合并计算结果
}
(Как программист, если вы написали рекурсивные операции, эту логику очень просто понять)
Представлено здесь, вы можете использовать ForkJoin, чтобы что-то делать - классикаFibonacciВы можете использовать мышление «разделяй и властвуй» для расчета (не верьте, вы можете следовать вышеизложенному один за другимАлгоритм «разделяй и властвуй»Задайте себе вопрос? ), брать напрямуюОфициальные документы(Обратите внимание на метод вычисления), добавьте дополнительный основной метод, чтобы увидеть:
@Slf4j
public class ForkJoinDemo {
public static void main(String[] args) {
int n = 20;
// 为了追踪子线程名称,需要重写 ForkJoinWorkerThreadFactory 的方法
final ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool -> {
final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
worker.setName("my-thread" + worker.getPoolIndex());
return worker;
};
//创建分治任务线程池,可以追踪到线程名称
ForkJoinPool forkJoinPool = new ForkJoinPool(4, factory, null, false);
// 快速创建 ForkJoinPool 方法
// ForkJoinPool forkJoinPool = new ForkJoinPool(4);
//创建分治任务
Fibonacci fibonacci = new Fibonacci(n);
//调用 invoke 方法启动分治任务
Integer result = forkJoinPool.invoke(fibonacci);
log.info("Fibonacci {} 的结果是 {}", n, result);
}
}
@Slf4j
class Fibonacci extends RecursiveTask<Integer> {
final int n;
Fibonacci(int n) {
this.n = n;
}
@Override
public Integer compute() {
//和递归类似,定义可计算的最小单元
if (n <= 1) {
return n;
}
// 想查看子线程名称输出的可以打开下面注释
//log.info(Thread.currentThread().getName());
Fibonacci f1 = new Fibonacci(n - 1);
// 拆分成子任务
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
// f1.join 等待子任务执行结果
return f2.compute() + f1.join();
}
}
Результат выполнения следующий:
На данный момент я считаю, что основное использование завершено.ForkJoinPool используется в приведенном выше коде, и проблема заключается в следующем:
Поскольку объединение — это класс идей, в Java уже есть
ThreadPoolExecutor
, зачем делать ещеForkJoinPool
Шерстяная ткань?
С помощью следующей картинки вспомним принцип реализации ThreadPoolExecutor (подробнее см.Зачем использовать пулы потоков):
На первый взгляд это типично生产者/消费者
модель,Потоки-потребители потребляют отправленные задачи из общей очереди задач.. Простая параллельная работа ThreadPoolExecutor в основном предназначена дляЗадачи с неопределенным временем выполнения(ввод-вывод или задачи по времени и т. д.)
Для JDK невозможно многократно строить колеса.На самом деле, идею «разделяй и властвуй» также можно понимать как отношение зависимости задачи родитель-потомок.Когда уровень зависимости очень глубокий, используйтеThreadPoolExecutor
разобраться с этой взаимосвязью заведомо не реально, поэтомуForkJoinPool
появился как функциональная добавка
ForkJoinPool
После того, как задачи разделены, есть зависимости, и конкуренция между потоками должна быть уменьшена, поэтому пусть потоки выполняют свои собственные задачи.ThreadPoolExecutor
в виде одной TaskQueue ,ForkJoinPool
Он представлен в виде нескольких TaskQueues, которые просто представлены в виде графика, который выглядит следующим образом:
Существует несколько очередей задач, поэтому в ForkJoinPool есть переменная-член в виде массива.WorkQueue[]
. Это снова проблема
Существует несколько очередей задач. В какую очередь помещается отправленная задача? (на картинке выше
Router Rule
часть)
Для этого требуется набор правил маршрутизации.Как можно понять из приведенного выше демо-кода, необходимо выполнить две основные задачи:
-
Есть внешние прямые подчинения (submission task)
-
Также есть задачи, которые разветвляются сами по себе (worker task)
Чтобы еще больше разграничить эти две задачи, Дуг Ли разработал простое правило маршрутизации:
- Буду
submission task
в массив WorkQueue「偶数」
подписка - Буду
worker task
в WorkQueue「奇数」
В индексах, и только нечетные индексы имеют потоки (рабочие) относительно них
Он должен быть обогащен локально.Картинка выше выглядит так:
Время выполнения каждой задачи разное (в глазах ЦП, конечно), задачи рабочей очереди быстро выполняющегося потока могут быть пустыми.В целях максимального использования ресурсов ЦП допускаются простаивающие потоки чтобы взять другое содержимое очередей задач, этот процесс называетсяwork-stealing
(кража работы)
Текущий поток хочет выполнить задачу, и другие потоки могут прийти, чтобы украсть задачу, что вызовет конкуренцию.Чтобы уменьшить конкуренцию, WorkQueue спроектирован как двусторонняя очередь:
- Поддержка операций LIFO (последним пришел-первым вышел) push (положить) и pop (взять) ——Управляйте верхней стороной
- Операции опроса (взятия), поддерживающие FIFO (первым поступил – первым обслужен) --Управляйте базовой стороной
Поток (воркер) оперирует своей собственной WorkQueue по умолчанию как операция LIFO (опционально FIFO).Когда поток (воркер) пытается украсть задачи в других WorkQueues, он в это время выполняет операцию FIFO, то есть ворует из базы сторону и обогащает ее фотографиями. Вот и все:
Преимущества этого очень очевидны:
- Операции LIFO могут выполняться только соответствующим работником, а push и pop не должны учитывать параллелизм.
- При разделении чем больше задача, тем больше она находится в конце WorkQueue, и ее можно как можно быстрее разложить, чтобы она как можно быстрее попала в расчет.
Это также видно из модификаторов переменных-членов WorkQueue (у base есть модификатор volatile, а у top нет):
volatile int base; // index of next slot for poll
int top; // index of next slot for push
На данный момент, я полагаю, вы уже понимаете основной принцип реализации ForkJoinPool, но он также будет сопровождаться множеством вопросов (как это реализовано?), таких как:
- Если есть конкуренция, нужны блокировки Как ForkJoinPool контролирует состояние?
- Как количество динамиков для контроля Forkjoinpool?
- Какова конкретная логика упомянутых выше правил маршрутизации?
- ......
Сохраните эти вопросы и немного посмотрите на исходный код, чтобы понять:
Анализ исходного кода (JDK 1.8)
Исходный код ForkJoinPool включает в себя множество битовых операций, основная часть будет объяснена здесь, если вы хотите понять глубже, вам нужно отследить и проверить это самостоятельно.
Сочетая вышеизложенное предзнаменование, вы должны знать, что в ForkJoinPool есть три важные роли:
- ForkJoinWorkerThread (наследует Thread): это поток (Worker), о котором мы упоминали выше.
- WorkQueue: двунаправленная очередь задач
- ForkJoinTask: объект, выполняемый работником
Весь процесс анализа исходного кода также объясняется методами этих классов, но прежде чем понять эти три роли, нам нужно понять, что ForkJoinPool проложил для этих трех ролей.
История должна начаться с метода построения ForkJoinPool.
Конструктор ForkJoinPool
public ForkJoinPool() {
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism) {
this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
В дополнение к указанным выше трем методам построения в JDK1.8 был добавлен еще один способ инициализации объекта ForkJoinPool (QQ: что это за шаблон проектирования?):
static final ForkJoinPool common;
/**
* @return the common pool instance
* @since 1.8
*/
public static ForkJoinPool commonPool() {
// assert common != null : "static init error";
return common;
}
Common инициализируется в статическом блоке (выполняется только один раз):
common = java.security.AccessController.doPrivileged
(new java.security.PrivilegedAction<ForkJoinPool>() {
public ForkJoinPool run() { return makeCommonPool(); }});
private static ForkJoinPool makeCommonPool() {
int parallelism = -1;
... 其他默认初始化内容
if (parallelism < 0 && // default 1 less than #cores
(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
parallelism = 1;
if (parallelism > MAX_CAP)
parallelism = MAX_CAP;
// 执行上面的构造方法
return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
"ForkJoinPool.commonPool-worker-");
}
Поскольку это одноэлементный универсальный ForkJoinPool, помните:
Если вы используете общий ForkJoinPool, лучше всего выполнять только вычислительные операции с интенсивным использованием ЦП и не иметь неопределенного содержимого ввода-вывода в задаче, чтобы не перетаскивать весь
Все вышеперечисленные конструкторы в конечном итоге вызывают этот закрытый метод:
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
Параметров очень много, вот значение каждого параметра:
серийный номер | имя параметра | описать/объяснить |
---|---|---|
1 | parallelism | Параллелизм, это не определенное количество потоков, конкретное количество потоков и длина WorkQueue рассчитываются на основе этого параллелизма с помощью вышеуказанногоmakeCommonPoolМетод может знать, что значение параллелизма по умолчанию равно количеству потоков ядра ЦП минус 1. |
2 | factory | Очень распространено, создайте фабричный интерфейс для ForkJoinWorkerThread. |
3 | handler | обработчик исключений для каждого потока |
4 | mode | Упомянутый выше режим WorkQueue, LIFO/FIFO; |
5 | workerNamePrefix | Имя префикса для ForkJoinWorkerThread |
6 | ctl | Поле основного управляющего потока пула потоков |
В конструкторе уже есть битовые операции, это слишком сложно:
Если вы хотите узнать значение переменных членов Config of Forkjoinpool, вы должны осторожно разобраться.
static final int SMASK = 0xffff; // short bits == max index
this.config = (parallelism & SMASK) | mode;
parallelism & SMASK
На самом деле это делается для того, чтобы значение параллелизма не могло быть больше SMASK.Все вышеперечисленные методы построения будут вызываться при передаче параллелизма.checkParallelism
Для проверки легитимности:
static final int MAX_CAP = 0x7fff; // max #workers - 1
private static int checkParallelism(int parallelism) {
if (parallelism <= 0 || parallelism > MAX_CAP)
throw new IllegalArgumentException();
return parallelism;
}
Вы можете видеть, что максимальное значение параллелизма равноMAX_CAP
сейчас,0x7fff
определенно меньше, чем0xffff
. Таким образом, значение config на самом деле:
this.config = parallelism | mode;
Предполагается, что параллелизмMAX_CAP
, а затем с режимом或运算
, из которых есть три режима:
- LIFO_QUEUE
- FIFO_QUEUE
- SHARED_QUEUE
Ниже приведен пример LIFO_QUEUE и FIFO_QUEUE:
// Mode bits for ForkJoinPool.config and WorkQueue.config
static final int MODE_MASK = 0xffff << 16; // top half of int
static final int LIFO_QUEUE = 0;
static final int FIFO_QUEUE = 1 << 16;
static final int SHARED_QUEUE = 1 << 31; // must be negative
такparallelism | mode
В зависимости от режима есть два результата, но вы получите подтверждающее сообщение:
17-й бит конфигурации представляет режим, а младшие 15 бит представляют параллелизм.
Когда нам нужно получить режим режима из конфигурации, просто используйте маску режима (MODE_MASK) и конфигурацию, чтобы сделать与运算
просто хорошо
Итак, картинка, резюмирующая конфиг:
long np = (long)(-parallelism); // offset ctl counts
Вышеприведенный код предназначен для преобразования параллелизма, дополняющего параллелизм, в длинный тип, вMAX_CAP
В качестве примера параллелизма значение np равно
Значение этого np будет использоваться для вычисления переменной-члена ForkJoinPool ctl:
// Active counts 活跃线程数
private static final int AC_SHIFT = 48;
private static final long AC_UNIT = 0x0001L << AC_SHIFT;
private static final long AC_MASK = 0xffffL << AC_SHIFT;
// Total counts 总线程数
private static final int TC_SHIFT = 32;
private static final long TC_UNIT = 0x0001L << TC_SHIFT;
private static final long TC_MASK = 0xffffL << TC_SHIFT;
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign
// 计算 ctl
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
-
np << AC_SHIFT
То есть np перемещается на 48 бит влево, так что исходные младшие 16 бит становятся старшими 16 битами, а затем используется маска AC (AC_MASK) для выполнения与运算
, то есть от 49 до 64 бит ctl представляют количество активных потоков -
np << TC_SHIFT
То есть сдвиньте np влево на 32 бита, чтобы исходные младшие 16 бит стали от 33 до 48 бит, а затем используйте маску TC, чтобы сделать与运算
, то есть от 33 до 48 бит ctl представляют общее количество потоков
Наконец, два снова объединяются по ИЛИ, если степень параллелизма все ещеMAX_CAP
, окончательный результат ctl:
На данный момент мы только что закончили чтение содержимого конструктора.Из окончательного заключения мы видим, что AC = TC после инициализации, а ctl — число меньше нуля, ctl — 64-битный длинный тип, и как построение младших 32 бит не отражается в конструкторе, но комментарии дают четкое описание:
/*
* Bits and masks for field ctl, packed with 4 16 bit subfields:
* AC: Number of active running workers minus target parallelism
* TC: Number of total workers minus target parallelism
* SS: version count and status of top waiting thread
* ID: poolIndex of top of Treiber stack of waiters
*
* When convenient, we can extract the lower 32 stack top bits
* (including version bits) as sp=(int)ctl. The offsets of counts
* by the target parallelism and the positionings of fields makes
* it possible to perform the most common checks via sign tests of
* fields: When ac is negative, there are not enough active
* workers, when tc is negative, there are not enough total
* workers. When sp is non-zero, there are waiting workers. To
* deal with possibly negative fields, we use casts in and out of
* "short" and/or signed shifts to maintain signedness.
*
* Because it occupies uppermost bits, we can add one active count
* using getAndAddLong of AC_UNIT, rather than CAS, when returning
* from a blocked join. Other updates entail multiple subfields
* and masking, requiring CAS.
*/
Этот комментарий в основном объясняет роль младших 32 бит (это будет отражено в исходном коде позже, вот впечатление, которое поможет прочитать исходный код позже), по смыслу комментария сначала улучшите значение ctl:
- **SS:** Состояние и номер версии верхнего рабочего потока в стеке (каждый поток будет содержать индекс рабочей очереди, в которой находится предыдущий ожидающий поток, когда он приостановлен, таким образом формируя стек ожидающих рабочих потоков, а вершина стека — это последний ожидающий поток потока), первый бит указывает на статус
1:不活动(inactive)
;0:活动(active)
, последние 15 представляют собой номер версии для предотвращения проблем с ABA. - ID:Индекс рабочей очереди, в которой находится верхний рабочий поток.
В примечании также говорится, что другойsp=(int)ctl
, то есть получить младшие 32 бита 64-битного ctl (SS | ID
), потому что младшие 32 бита — это значения, которые существуют только после создания потока, поэтому делается вывод, что если sp != 0, есть ожидающий рабочий поток, просто проснитесь и используйте его, не создавая новый нить. Таким образом, вы можете получить всю необходимую информацию о потоке через ctl
В дополнение к переменным-членам, созданным конструктором, ForkJoinPool также имеет очень важную переменную-членrunState
, как вы узнали ранее, пулы потоков также нуждаются в состоянии для управления
volatile int runState; // lockable status
// runState bits: SHUTDOWN must be negative, others arbitrary powers of two
private static final int RSLOCK = 1; //线程池被锁定
private static final int RSIGNAL = 1 << 1; //线程池有线程需要唤醒
private static final int STARTED = 1 << 2; //线程池已经初始化
private static final int STOP = 1 << 29; //线程池停止
private static final int TERMINATED = 1 << 30; //线程池终止
private static final int SHUTDOWN = 1 << 31; //线程池关闭
runState
Есть вышеперечисленные 6 переключателей состояний, судя по комментариям, толькоSHUTDOWN
Состояние — это отрицательное число, а остальные — целые числа. Для изменения состояния в параллельной среде необходимо использовать блокировки. ForkJoinPool блокирует и разблокирует пул потоков,lockRunState
а такжеunlockRunState
Реализовать (эти два метода можно временно пропустить без глубокого понимания, просто нужно понимать, что это блокировки, помогающие безопасно изменить состояние пула потоков)
Ничего страшного, что я не знаю больше, но я не могу не написать... Разве ты не должен прочитать это позже?
lockRunState
/**
* Acquires the runState lock; returns current (locked) runState.
*/
// 从方法注释中看到,该方法一定会返回 locked 的 runState,也就是说一定会加锁成功
private int lockRunState() {
int rs;
return ((((rs = runState) & RSLOCK) != 0 ||
!U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK)) ?
awaitRunStateLock() : rs);
}
- Поскольку RSLOCK = 1, если runState & RSLOCK == 0, это означает, что в данный момент блокировки нет, введите
或运算
Вторая половина КАС - Сначала попытайтесь заблокировать через CAS, вернитесь напрямую, если попытка удалась, и вызовите, если попытка не удалась.
awaitRunStateLock
метод
/**
* Spins and/or blocks until runstate lock is available. See
* above for explanation.
*/
private int awaitRunStateLock() {
Object lock;
boolean wasInterrupted = false;
for (int spins = SPINS, r = 0, rs, ns;;) {
//判断是否加锁(==0表示未加锁)
if (((rs = runState) & RSLOCK) == 0) {
// 通过CAS加锁
if (U.compareAndSwapInt(this, RUNSTATE, rs, ns = rs | RSLOCK)) {
if (wasInterrupted) {
try {
// 重置线程终端标记
Thread.currentThread().interrupt();
} catch (SecurityException ignore) {
// 这里竟然 catch 了个寂寞
}
}
// 加锁成功返回最新的 runState,for 循环的唯一正常出口
return ns;
}
}
else if (r == 0)
r = ThreadLocalRandom.nextSecondarySeed();
else if (spins > 0) {
r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift
if (r >= 0)
--spins;
}
// Flag1 如果是其他线程正在初始化占用锁,则调用 yield 方法让出 CPU,让其快速初始化
else if ((rs & STARTED) == 0 || (lock = stealCounter) == null)
Thread.yield(); // initialization race
// Flag2 如果其它线程持有锁,并且线程池已经初始化,则将唤醒位标记为1
else if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | RSIGNAL)) {
// 进入互斥锁
synchronized (lock) {
// 再次判断,如果等于0,说明进入互斥锁前刚好有线程进行了唤醒,就不用等待,直接进行唤醒操作即可,否则就进入等待
if ((runState & RSIGNAL) != 0) {
try {
lock.wait();
} catch (InterruptedException ie) {
if (!(Thread.currentThread() instanceof
ForkJoinWorkerThread))
wasInterrupted = true;
}
}
else
lock.notifyAll();
}
}
}
}
Строки с 33 по 34 (Flag1) и с 36 по 50 (Flag2) приведенного выше кода, если вы не читали последующий код, сейчас немного сложно понять, я объясню это заранее:
Flag1:Когда ForkJoinPool полностью инициализирован, непосредственно используется атомарная переменнаяstealCounter, потому что StealCounter будет присвоено значение во время инициализации (при вызове externalSubmit). Поэтому логика здесь такова, что когда состояние не STARTED или StealCounter пустой, пусть поток ждет, то есть другие потоки не были полностью инициализированы, пусть продолжают занимать инициализацию блокировки
Flag2:Когда мы говорили о модели ожидания/уведомления, мы говорили: не позволяйте бесконечным вращениям пытаться, ждите, если ресурсы не удовлетворены, уведомляйте, если ресурсы удовлетворены, поэтому, если(runState & RSIGNAL) == 0
Устанавливается, указывая на то, что есть поток, который нужно разбудить, просто разбудить сразу, иначе не тратьте ресурсы, подождите некоторое время
При чтении этого кода сразу возникают два вопроса:
Q1:Поскольку это блокировка, почему бы не использовать существующий колесный ReentrantLock?
**PS:** Если вы читали серию статей о параллелизмеСинхронизатор очереди Java AQS и применение ReentrantLock, вы узнаете, что ReentrantLock использует полное состояние поля для управления состоянием синхронизации. Но здесь состояние пула потоков также будет оцениваться при конкуренции за блокировки.Если это состояние инициализации, он будет активно уступать и отдавать ЦП, чтобы уменьшить конкуренцию; кроме того, использование полного runState для представления состояния также отражает более мелкую зернистость.
Q2:Хотя синхронизированный метод хорош, все мы знаем, что это относительно тяжеловесная блокировка, почему она до сих пор применяется здесь?
PS:Во-первых, синхронизация постоянно оптимизируется, и она уже не такая тяжелая, как при первом появлении.Кроме того, согласно кодовому значению флага 2, вероятность входа в синхронизированный блок синхронизации по-прежнему очень мала. )
Есть блокировка для естественной разблокировки, смотрите ниже unlockRunState
unlockRunState
Логика разблокировки относительно проста, и общая цель состоит в том, чтобы сбросить бит флага блокировки. Если состояние успешно изменено на целевое, естественная разблокировка прошла успешно, в противном случае это означает, что в ожидание вошел другой поток, и вам нужно вызвать notifyAll, чтобы проснуться и попытаться конкурировать снова.
/**
* Unlocks and sets runState to newRunState.
*
* @param oldRunState a value returned from lockRunState
* @param newRunState the next value (must have lock bit clear).
*/
private void unlockRunState(int oldRunState, int newRunState) {
if (!U.compareAndSwapInt(this, RUNSTATE, oldRunState, newRunState)) {
Object lock = stealCounter;
runState = newRunState; // clears RSIGNAL bit
if (lock != null)
synchronized (lock) { lock.notifyAll(); }
}
}
Эти два метода проходят через последующий анализ кода, уделите больше вниманияunlockRunState
Кроме того, вы можете видеть, что все уведомления используются notifyAll, а не notify. Мы уже обращали внимание на эту проблему раньше, вы помните, почему? Если не помните, откройтеОжидание механизма уведомления о параллельном программированиизапомнить
Первый пласт знаний почти готов, идем дальше
invoke/submit/execute
Возвращаясь к демонстрации с основной функцией в начале этой статьи, мы вызываем метод вызова для отправки задач в ForkJoinPool, Фактически, ForkJoinPool также поддерживает методы отправки и выполнения для отправки задач. Параллельный игровой процесс очень похож, и задачи этих трех типов методов также хорошо различаются:
- вызывать: отправить задачу и дождаться возврата результата выполнения
- submit: отправить и немедленно вернуть задачу ForkJoinTask реализует Future и может в полной мере использовать возможности Future.
- выполнить: только отправлять задачи
На основе этих трех категорий перегружаются еще несколько более мелких методов, которые здесь не перечислены:
public <T> T invoke(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
return task.join();
}
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
return task;
}
public void execute(ForkJoinTask<?> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
}
Я полагаю, вы обнаружили, что метод отправки задачи вызовет использование externalPush(task), и, наконец, появится главный герой исходного кода.
но......
Если вы посмотрите на код externalPush, первая строка — это объявление переменной массива WorkQueue.Чтобы сделать последующий процесс более плавным, мы должны проложить путь для некоторых знаний о WorkQueue (и проложить путь)
WorkQueue
Глядя на такое количество переменных-членов, я все еще в панике, но нам нужно поставить только несколько основных.
//初始队列容量
static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
//最大队列容量
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
// Instance fields
volatile int scanState; // versioned, <0: inactive; odd:scanning
int stackPred; // pool stack (ctl) predecessor 前任池(WorkQueue[])索引,由此构成一个栈
int nsteals; // number of steals 偷取的任务个数
int hint; // randomization and stealer index hint 记录偷取者的索引,方便后面顺藤摸瓜
int config; // pool index and mode
volatile int qlock; // 1: locked, < 0: terminate; else 0
volatile int base; // index of next slot for poll
int top; // index of next slot for push
ForkJoinTask<?>[] array; // the elements (initially unallocated) 任务数组
final ForkJoinPool pool; // the containing pool (may be null)
final ForkJoinWorkerThread owner; // owning thread or null if shared 当前工作队列的工作线程,共享模式下为null
volatile Thread parker; // == owner during call to park; else null 调用park阻塞期间为owner,其他情况为null
volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin 记录当前join来的任务
volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer 记录从其他工作队列偷取过来的任务
Как мы сказали выше, WorkQueue — двусторонняя очередь, у пула потоков есть runState, а у WorkQueue — scanState.
- Меньше нуля: неактивно (неактивное состояние)
- Нечетное число: сканирование (статус сканирования)
- Четное число: работает (рабочее состояние)
Пул потоков операций нуждается в блокировках, и очередь операций также нуждается в блокировках, поэтому qlock пригодится.
- 1: заблокировано
- 0: разблокировано
- меньше нуля: статус завершения
Конфиг в WorkQueue тоже есть, но он отличается от ForkJoinPool.Конфиг в WorkQueue записывает индекс и режим WorkQueue в массив WorkQueue[].
Напишем значение остальных полей в комментариях к коду.Вновь появляется главный герой, на этот раз это правда
externalPush
Как упоминалось ранее в статье, задачи подразделяются наsubmission task
а такжеworker task
,worker task
даfork
аут, входящий из этого подъезда естественно такой жеsubmission task
, что значит:
пройти через
invoke()
|submit()
|execute()
Задача, представленная другим способом,submission task
, будет помещен в массив WorkQueueчетноеположение индексапередача
fork()
Сгенерированная методом задача, называемая рабочей задачей, будет помещена в массив WorkQueue.нечетное числоположение индекса
Комментарии к этому методу также очень четко написаны, подробности см. в комментариях к коду.
/**
* Tries to add the given task to a submission queue at
* submitter's current queue. Only the (vastly) most common path
* is directly handled in this method, while screening for need
* for externalSubmit.
*
* @param task the task. Caller must ensure non-null.
*/
final void externalPush(ForkJoinTask<?> task) {
WorkQueue[] ws; WorkQueue q; int m;
//Flag1: 通过ThreadLocalRandom产生随机数,用于下面计算槽位索引
int r = ThreadLocalRandom.getProbe();
int rs = runState; //初始状态为0
//Flag2: 如果ws,即ForkJoinPool中的WorkQueue数组已经完成初始化,且根据随机数定位的index存在workQueue,且cas的方式加锁成功
if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
//对WorkQueue操作加锁
U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a; int am, n, s;
//WorkQueue中的任务数组不为空
if ((a = q.array) != null &&
(am = a.length - 1) > (n = (s = q.top) - q.base)) { //组长度大于任务个数,不需要扩容
int j = ((am & s) << ASHIFT) + ABASE; //WorkQueue中的任务数组不为空
U.putOrderedObject(a, j, task); //向Queue中放入任务
U.putOrderedInt(q, QTOP, s + 1);//top值加一
U.putIntVolatile(q, QLOCK, 0); //对WorkQueue操作解锁
//任务个数小于等于1,那么此槽位上的线程有可能等待,如果大家都没任务,可能都在等待,新任务来了,唤醒,起来干活了
if (n <= 1)
//唤醒可能存在等待的线程
signalWork(ws, q);
return;
}
//任务入队失败,前面加锁了,这里也要解锁
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
//Flag3: 不满足上述条件,也就是说上面的这些 WorkQueue[]等都不存在,就要通过这个方法一切从头开始创建
externalSubmit(task);
}
Выше были добавлены три флага, чтобы все могли лучше понять код, необходимо дополнительно пояснить:
Flag1:ThreadLocalRandom является производным от ThreadLocal. Проба по умолчанию для каждого потока равна 0. Когда поток вызывает ThreadLocalRandom.current(), начальное значение и проба будут инициализированы и поддерживаются внутри потока. Здесь полезно знать, что случайное число генерируется Конкретные детали Это стоит увидеть своими глазами
Flag2:Здесь много информации
// 二进制为:0000 0000 0000 0000 0000 0000 0111 1110
static final int SQMASK = 0x007e; // max 64 (even) slots
- Значение m представляет максимальную таблицу ниже массива WorkQueue.
- m & r гарантирует, что часть случайного числа r больше m недоступна.
- m & r & SQMASK Поскольку последняя цифра SQMASK равна 0, окончательный результат будет четным.
- r != 0 указывает, что текущий поток уже инициализировал некоторый контент
- rs > 0 означает, что runState ForkJoinPool также был инициализирован.
Flag3:Прочитав описание флага 2, вы сможете очень хорошо понять флаг 3. Если вы отправляете задачу в первый раз, вы должны перейти к флагу 3.externalSubmit
метод
externalSubmit
Этот метод очень длинный, но не превышает 80 строк, подробности смотрите в комментариях к методу.
//初始化所需要的一切
private void externalSubmit(ForkJoinTask<?> task) {
int r; // initialize caller's probe
//生成随机数
if ((r = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
r = ThreadLocalRandom.getProbe();
}
for (;;) {
WorkQueue[] ws; WorkQueue q; int rs, m, k;
boolean move = false;
// 如果线程池的状态为终止状态,则帮助终止
if ((rs = runState) < 0) {
tryTerminate(false, false); // help terminate
throw new RejectedExecutionException();
}
//Flag1: 再判断一次状态是否为初始化,因为在lockRunState过程中有可能状态被别的线程更改了
else if ((rs & STARTED) == 0 || // initialize
((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
int ns = 0;
//Flag1.1: 加锁
rs = lockRunState();
try {
if ((rs & STARTED) == 0) {
// 初始化stealcounter的值(任务窃取计数器,原子变量)
U.compareAndSwapObject(this, STEALCOUNTER, null,
new AtomicLong());
// create workQueues array with size a power of two
//取config的低16位(确切说是低15位),获取并行度
int p = config & SMASK; // ensure at least 2 slots
//Flag1.2: 如果你看过HashMap 的源码,这个就很好理解了,获取2次幂大小
int n = (p > 1) ? p - 1 : 1;
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
//初始化 WorkQueue 数组
workQueues = new WorkQueue[n];
// 标记初始化完成
ns = STARTED;
}
} finally {
// 解锁
unlockRunState(rs, (rs & ~RSLOCK) | ns);
}
}
//Flag2 上面分析过,取偶数位槽位,将任务放进偶数槽位
else if ((q = ws[k = r & m & SQMASK]) != null) {
// 对 WorkQueue 加锁
if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a = q.array;
int s = q.top;
// 初始化任务提交标识
boolean submitted = false; // initial submission or resizing
try { // locked version of push
//计算内存偏移量,放任务,更新top值
if ((a != null && a.length > s + 1 - q.base) ||
(a = q.growArray()) != null) {
int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);
U.putOrderedInt(q, QTOP, s + 1);
//提交任务成功
submitted = true;
}
} finally {
//WorkQueue解锁
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
// 任务提交成功了
if (submitted) {
//自然要唤醒可能存在等待的线程来处理任务了
signalWork(ws, q);
return;
}
}
//任务提交没成功,可以重新计算随机数,再走一次流程
move = true; // move on failure
}
//Flag3: 接Flag2,如果找到的槽位是空,则要初始化一个WorkQueue
else if (((rs = runState) & RSLOCK) == 0) { // create new queue
q = new WorkQueue(this, null);
// 设置工作队列的窃取线索值
q.hint = r;
// 如上面 WorkQueue 中config 的介绍,记录当前WorkQueue在WorkQueue[]数组中的值,和队列模式
q.config = k | SHARED_QUEUE;
// 初始化为 inactive 状态
q.scanState = INACTIVE;
//加锁
rs = lockRunState(); // publish index
if (rs > 0 && (ws = workQueues) != null &&
k < ws.length && ws[k] == null)
ws[k] = q; // else terminated
//解锁
unlockRunState(rs, rs & ~RSLOCK);
}
else
move = true; // move if busy
if (move)
r = ThreadLocalRandom.advanceProbe(r);
}
}
Flag1.1 :Есть деталь, о которой нужно сказать, мы находимся вСинхронизатор очереди Java AQS и применение ReentrantLockКогда я упомянул парадигму использования блокировок и почему они используются,ForkJoinPool
Этой парадигмы придерживаются и здесь.
Lock lock = new ReentrantLock();
lock.lock();
try{
...
}finally{
lock.unlock();
}
Flag1.2:Краткое описание этого процесса заключается в инициализации массивов WorkQueue[] разного размера в соответствии с разной степенью параллелизма.Размер массива должен быть n-й степенью 2, поэтому я дам вам таблицу, чтобы интуитивно понять взаимосвязь между параллелизмом и емкостью очереди:
параллелизм р | емкость |
---|---|
1, 2 | 4 |
3, 4 | 8 |
от 5 до 8 | 16 |
с 9 до 16 | 32 |
Флаг 1, 2, 3:Если вы понимаете описанный выше метод, то очевидно, что логический порядок внутри этого метода должен выполняться впервые.Flag1
——>Flag3
——>Flag2
externalSubmit вызывается, если задача была успешно отправленаsignalWork
метод
signalWork
Предвещаемое знание пригодится в больших масштабах (напала большая волна зомби), готовы?
Если функция переменной-члена ctl ForkJoinPool была забыта, включите ее и снова запомните.
//常量值
static final int SS_SEQ = 1 << 16; // version count
final void signalWork(WorkQueue[] ws, WorkQueue q) {
long c; int sp, i; WorkQueue v; Thread p;
// ctl 小于零,说明活动的线程数 AC 不够
while ((c = ctl) < 0L) { // too few active
// 取ctl的低32位,如果为0,说明没有等待的线程
if ((sp = (int)c) == 0) { // no idle workers
// 取TC的高位,如果不等于0,则说明目前的工作着还没有达到并行度
if ((c & ADD_WORKER) != 0L) // too few workers
//添加 Worker,也就是说要创建线程了
tryAddWorker(c);
break;
}
//未开始或者已停止,直接跳出
if (ws == null) // unstarted/terminated
break;
//i=空闲线程栈顶端所属的工作队列索引
if (ws.length <= (i = sp & SMASK)) // terminated
break;
if ((v = ws[i]) == null) // terminating
break;
//程序执行到这里,说明有空闲线程,计算下一个scanState,增加了版本号,并且调整为 active 状态
int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState
int d = sp - v.scanState; // screen CAS
//计算下一个ctl的值,活动线程数 AC + 1,通过stackPred取得前一个WorkQueue的索引,重新设置回sp,行程最终的ctl值
long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
//更新 ctl 的值
if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
v.scanState = vs; // activate v
//如果有线程阻塞,则调用unpark唤醒即可
if ((p = v.parker) != null)
U.unpark(p);
break;
}
//没有任务,直接跳出
if (q != null && q.base == q.top) // no more work
break;
}
}
Если предположить, что программа только что начала выполняться, количество активных потоков и общее количество потоков не должны удовлетворять требованиям параллелизма, тогда будет выполнен вызов.tryAddWorker
метод
tryAddWorker
Логика tryAddWorker очень проста, потому что это пул потоков операций, который также будет использоваться.lockRunState
/unlockRunState
блокировка управления
private void tryAddWorker(long c) {
//初始化添加worker表识
boolean add = false;
do {
//因为要添加Worker,所以AC和TC都要加一
long nc = ((AC_MASK & (c + AC_UNIT)) |
(TC_MASK & (c + TC_UNIT)));
//ctl还没被改变
if (ctl == c) {
int rs, stop; // check if terminating
if ((stop = (rs = lockRunState()) & STOP) == 0)
//更新ctl 的值,
add = U.compareAndSwapLong(this, CTL, c, nc);
unlockRunState(rs, rs & ~RSLOCK);
if (stop != 0)
break;
//ctl值更新成功,开始真正的创建Worker
if (add) {
createWorker();
break;
}
}
// 重新获取ctl,并且没有达到最大线程数,并且没有空闲的线程
} while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
}
Все идет хорошо, нам нужно вызвать метод createWorker для создания настоящего Worker, ситуация постепенно проясняется
createWorker
Представлены WorkerQueue и ForkJoinTask, последняя из трех важных ролей, упомянутых выше.ForkJoinWorkerThread
наконец появился
private boolean createWorker() {
ForkJoinWorkerThreadFactory fac = factory;
Throwable ex = null;
ForkJoinWorkerThread wt = null;
try {
//如果工厂已经存在了,就用factory来创建线程,会去注册线程,这里的this就是ForkJoinPool对象
if (fac != null && (wt = fac.newThread(this)) != null) {
//启动线程
wt.start();
return true;
}
} catch (Throwable rex) {
ex = rex;
}
//如果创建线程失败,就要逆向注销线程,包括前面对ctl等的操作
deregisterWorker(wt, ex);
return false;
}
То, как рабочий поток соответствует WorkQueue, скрыто вfac.newThread(this)
В этом методе следующий код показывает вызывающий процесс
public ForkJoinWorkerThread newThread(ForkJoinPool pool);
static final class DefaultForkJoinWorkerThreadFactory
implements ForkJoinWorkerThreadFactory {
public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return new ForkJoinWorkerThread(pool);
}
}
protected ForkJoinWorkerThread(ForkJoinPool pool) {
// Use a placeholder until a useful name can be set in registerWorker
super("aForkJoinWorkerThread");
this.pool = pool;
this.workQueue = pool.registerWorker(this);
}
Очевидно, что основное содержаниеregisterWorker
в методе
registerWorker
WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
this.pool = pool;
this.owner = owner;
// Place indices in the center of array (that is not yet allocated)
base = top = INITIAL_QUEUE_CAPACITY >>> 1;
}
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
UncaughtExceptionHandler handler;
//这里线程被设置为守护线程,因为,当只剩下守护线程时,JVM就会推出
wt.setDaemon(true); // configure thread
//填补处理异常的handler
if ((handler = ueh) != null)
wt.setUncaughtExceptionHandler(handler);
//创建一个WorkQueue,并且设置当前WorkQueue的owner是当前线程
WorkQueue w = new WorkQueue(this, wt);
int i = 0; // assign a pool index
//又用到了config的知识,提取出我们期望的WorkQueue模式
int mode = config & MODE_MASK;
//加锁
int rs = lockRunState();
try {
WorkQueue[] ws; int n; // skip if no array
//判断ForkJoinPool的WorkQueue[]都初始化完全
if ((ws = workQueues) != null && (n = ws.length) > 0) {
//一种魔数计算方式,用以减少冲突
int s = indexSeed += SEED_INCREMENT; // unlikely to collide
//假设WorkQueue的初始长度是16,那这里的m就是15,最终目的就是为了得到一个奇数
int m = n - 1;
//和得到偶数的计算方式一样,得到一个小于m的奇数i
i = ((s << 1) | 1) & m; // odd-numbered indices
//如果这个槽位不为空,说明已经被其他线程初始化过了,也就是有冲突,选取别的槽位
if (ws[i] != null) { // collision
int probes = 0; // step by approx half n
//步长加2,也就保证step还是奇数
int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
//一直遍历,直到找到空槽位,如果都遍历了一遍,那就需要对WorkQueue[]扩容了
while (ws[i = (i + step) & m] != null) {
if (++probes >= n) {
workQueues = ws = Arrays.copyOf(ws, n <<= 1);
m = n - 1;
probes = 0;
}
}
}
//初始化一个随机数
w.hint = s; // use as random seed
//如文章前面所说,config记录索引值和模式
w.config = i | mode;
//扫描状态也记录为索引值,如文章前面所说,奇数表示为scanning状态
w.scanState = i; // publication fence
//把初始化好的WorkQueue放到ForkJoinPool的WorkQueue[]数组中
ws[i] = w;
}
} finally {
//解锁
unlockRunState(rs, rs & ~RSLOCK);
}
//设置worker的前缀名,用于业务区分
wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
//返回当前线程创建的WorkQueue,回到上一层调用栈,也就将WorkQueue注册到ForkJoinWorkerThread里面了
return w;
}
Здесь поток успешно создан, но если поток не создан успешно, вам потребуется deregisterWorker, чтобы выполнить последующие действия.
deregisterWorker
Метод deregisterWorker получает ссылку на поток и только что созданное исключение в качестве параметров для выполнения последующих действий и отзыва работы, связанной с registerWorker.
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
WorkQueue w = null;
if (wt != null && (w = wt.workQueue) != null) {
WorkQueue[] ws; // remove index from array
//获取当前线程注册的索引值
int idx = w.config & SMASK;
//加锁
int rs = lockRunState();
//如果奇数槽位都不为空,则清空内容
if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
ws[idx] = null;
//解锁
unlockRunState(rs, rs & ~RSLOCK);
}
long c; // decrement counts
//死循环式CAS更改ctl的值,将前面AC和TC加1的值再减1,ctl就在那里,不增不减
do {} while (!U.compareAndSwapLong
(this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
(TC_MASK & (c - TC_UNIT)) |
(SP_MASK & c))));
//清空WorkQueue,将其中的task取消掉
if (w != null) {
w.qlock = -1; // ensure set
w.transferStealCount(this);
w.cancelAll(); // cancel remaining tasks
}
//可能的替换操作
for (;;) { // possibly replace
WorkQueue[] ws; int m, sp;
//如果线程池终止了,那就跳出循环即可
if (tryTerminate(false, false) || w == null || w.array == null ||
(runState & STOP) != 0 || (ws = workQueues) == null ||
(m = ws.length - 1) < 0) // already terminating
break;
//当前线程创建失败,通过sp判断,如果还存在空闲线程,则调用tryRelease来唤醒这个线程,然后跳出
if ((sp = (int)(c = ctl)) != 0) { // wake up replacement
if (tryRelease(c, ws[sp & m], AC_UNIT))
break;
}
//如果没空闲线程,并且还没有达到满足并行度的条件,那就得再次尝试创建一个线程,弥补刚刚的失败
else if (ex != null && (c & ADD_WORKER) != 0L) {
tryAddWorker(c); // create replacement
break;
}
else // don't need replacement
break;
}
if (ex == null) // help clean on way out
//处理异常
ForkJoinTask.helpExpungeStaleExceptions();
else // rethrow
ForkJoinTask.rethrow(ex);
}
Короче говоря, метод deregisterWorker отменяет регистрацию потока из пула потоков, очищает WorkQueue, одновременно обновляет ctl и, наконец, делает возможные замены.В соответствии со статусом пула потоков решите, найти ли замену своему собственному :
- Если есть бездействующий поток, разбудите его
- Пустых потоков нет, попробуйте еще раз создать новый рабочий поток.
Поток deregisterWorker четко объяснен, чтобы помочь вам полностью понять процесс, но процесс после успешного завершения registerWorker не завершен, мы должны продолжить работу с Worker, а затем вызватьwt.start()
работай
run
ForkJoinWorkerThread наследуется от Thread, после вызова метода start() естественно вызовет переопределенный им же метод run()
public void run() {
if (workQueue.array == null) { // only run once
Throwable exception = null;
try {
onStart();
//Work开始工作,处理workQueue中的任务
pool.runWorker(workQueue);
} catch (Throwable ex) {
exception = ex;
} finally {
try {
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception);
}
}
}
}
Фокус метода, естественно, состоит в том, чтобы войти в runWorker
runWorker
runWorker — очень обычная трилогия:
- сканирование: получать задачи путем сканирования
- Runtask: выполнить отсканированную задачу
- awaitWork: ни одна задача не переходит в ожидание
Подробности смотрите в примечаниях
final void runWorker(WorkQueue w) {
//初始化队列,并根据需要是否扩容为原来的2倍
w.growArray(); // allocate queue
int seed = w.hint; // initially holds randomization hint
int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift
//死循环更新偏移r,为扫描任务作准备
for (ForkJoinTask<?> t;;) {
//扫描任务
if ((t = scan(w, r)) != null)
//扫描到就执行任务
w.runTask(t);
//没扫描到就等待,如果等也等不到任务,那就跳出循环别死等了
else if (!awaitWork(w, r))
break;
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
}
}
Первый взгляд на метод сканирования
scan
Приближается механизм кражи задач ForkJoinPool Как украсть скрыто в методе сканирования
private ForkJoinTask<?> scan(WorkQueue w, int r) {
WorkQueue[] ws; int m;
//再次验证workQueue[]数组的初始化情况
if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
//获取当前扫描状态
int ss = w.scanState; // initially non-negative
//又一个死循环,注意到出口位置就好
//和前面逻辑类似,随机一个起始位置,并赋值给k
for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
int b, n; long c;
//如果k槽位不为空
if ((q = ws[k]) != null) {
//base-top小于零,并且任务q不为空
if ((n = (b = q.base) - q.top) < 0 &&
(a = q.array) != null) { // non-empty
//获取base的偏移量,赋值给i
long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
//从base端获取任务,和前文的描述的steal搭配上了,是从base端steal
if ((t = ((ForkJoinTask<?>)
U.getObjectVolatile(a, i))) != null &&
q.base == b) {
//是active状态
if (ss >= 0) {
//更新WorkQueue中数组i索引位置为空,并且更新base的值
if (U.compareAndSwapObject(a, i, t, null)) {
q.base = b + 1;
//n<-1,说明当前队列还有剩余任务,继续唤醒可能存在的其他线程
if (n < -1) // signal others
signalWork(ws, q);
//直接返回任务
return t;
}
}
else if (oldSum == 0 && // try to activate
w.scanState < 0)
tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
}
//如果获取任务失败,则准备换位置扫描
if (ss < 0) // refresh
ss = w.scanState;
r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
origin = k = r & m; // move and rescan
oldSum = checkSum = 0;
continue;
}
checkSum += b;
}
//k一直在变,扫描到最后,如果等于origin,说明已经扫描了一圈还没扫描到任务
if ((k = (k + 1) & m) == origin) { // continue until stable
if ((ss >= 0 || (ss == (ss = w.scanState))) &&
oldSum == (oldSum = checkSum)) {
if (ss < 0 || w.qlock < 0) // already inactive
break;
//准备inactive当前工作队列
int ns = ss | INACTIVE; // try to inactivate
//活动线程数AC减1
long nc = ((SP_MASK & ns) |
(UC_MASK & ((c = ctl) - AC_UNIT)));
w.stackPred = (int)c; // hold prev stack top
U.putInt(w, QSCANSTATE, ns);
if (U.compareAndSwapLong(this, CTL, c, nc))
ss = ns;
else
w.scanState = ss; // back out
}
checkSum = 0;
}
}
}
return null;
}
Если задача успешно просканирована, вам нужно вызвать метод runTask, чтобы фактически запустить задачу.
runTask
Скоро это будет близко к истине, у кражи есть задание, так что давайте приступим к делу
final void runTask(ForkJoinTask<?> task) {
if (task != null) {
scanState &= ~SCANNING; // mark as busy
//Flag1: 记录当前的任务是偷来的,至于如何执行task,是我们写在compute方法中的,我们一会看doExec() 方法
(currentSteal = task).doExec();
U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
execLocalTasks();
ForkJoinWorkerThread thread = owner;
//累加偷来的数量,亲兄弟明算帐啊,虽然算完也没啥实际意义
if (++nsteals < 0) // collect on overflow
transferStealCount(pool);
//任务执行完后,就重新更新scanState为SCANNING
scanState |= SCANNING;
if (thread != null)
thread.afterTopLevelExec();
}
}
Flag1:Метод doExec является ключом к реальному выполнению задачи. Он является ядром связывания нашего пользовательского метода вычислений. Давайте посмотрим на метод doExec.
doExec
Ситуация очень хорошая, держись, приоткрой завесу экзека, и увидишь суть
//ForkJoinTask中的抽象方法,RecursiveTask 和 RecursiveAction 都重写了它
protected abstract boolean exec();
final int doExec() {
int s; boolean completed;
if ((s = status) >= 0) {
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
s = setCompletion(NORMAL);
}
return s;
}
//RecursiveTask重写的内容,终于看到我们文章开头 demo 中的compute 了
protected final boolean exec() {
result = compute();
return true;
}
На этом мы увидели суть.Пройдя такой большой круг,не так просто окончательно подключиться к переписанному нами же методу вычислений,но последней песни awaitWork в трилогии runWorker все еще нет.Давайте посмотрим Смотреть
awaitWork
Выше написано, что скан приходит на задачу.Если скан не приходит на задачу, текущий поток должен быть заблокирован.Подробности отмечены в комментариях.Можно понять вкратце.
private boolean awaitWork(WorkQueue w, int r) {
if (w == null || w.qlock < 0) // w is terminating
return false;
for (int pred = w.stackPred, spins = SPINS, ss;;) {
if ((ss = w.scanState) >= 0)
break;
else if (spins > 0) {
r ^= r << 6; r ^= r >>> 21; r ^= r << 7;
if (r >= 0 && --spins == 0) { // randomize spins
WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc;
if (pred != 0 && (ws = workQueues) != null &&
(j = pred & SMASK) < ws.length &&
//前驱任务队列还在
(v = ws[j]) != null && // see if pred parking
//并且工作队列已经激活,说明任务来了了
(v.parker == null || v.scanState >= 0))
//继续自旋等一会,别返回false
spins = SPINS; // continue spinning
}
}
//自旋之后,再次检查工作队列是否终止,若是,退出扫描
else if (w.qlock < 0) // recheck after spins
return false;
else if (!Thread.interrupted()) {
long c, prevctl, parkTime, deadline;
int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK);
if ((ac <= 0 && tryTerminate(false, false)) ||
(runState & STOP) != 0) // pool terminating
return false;
if (ac <= 0 && ss == (int)c) { // is last waiter
prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
int t = (short)(c >>> TC_SHIFT); // shrink excess spares
if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))
return false; // else use timed wait
parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
}
else
prevctl = parkTime = deadline = 0L;
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this); // emulate LockSupport
w.parker = wt;
if (w.scanState < 0 && ctl == c) // recheck before park
U.park(false, parkTime);
U.putOrderedObject(w, QPARKER, null);
U.putObject(wt, PARKBLOCKER, null);
if (w.scanState >= 0)
break;
if (parkTime != 0L && ctl == c &&
deadline - System.nanoTime() <= 0L &&
U.compareAndSwapLong(this, CTL, c, prevctl))
return false; // shrink pool
}
}
return true;
}
На этом этапе весь процесс ForkJoinPool можно рассматривать как базовое понимание, но контент, о котором мы говорили ранее, взят из задачи отправки в качестве точки входа. Для метода вычисления, о котором мы только что говорили, мы написали собственную логику в соответствии с парадигмой алгоритма «разделяй и властвуй».Подробности см. в демонстрации в начале статьи.Ключевым моментом является то, что мы вызываем метод fork в вычислении, что дает нам представление о рабочей задаче. Сейчас есть возможность, продолжайте смотреть на метод fork
fork
Логика метода Fork очень проста: если текущий поток имеет тип ForkJoinWorkerThread, то есть он прошел зарегистрированный выше Worker, то напрямую вызывается метод push, чтобы поместить задачу в WorkQueue, принадлежащую текущему потоку. , в противном случае вызовите externalPush для повторного прохождения.Вся логика, упомянутая выше (решите ли вы пройти ее еще раз?)
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
//push 方法很简单,这里就不再过多解释了
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
if ((p = pool) != null)
p.signalWork(p.workQueues, this);
}
else if (n >= m)
growArray();
}
}
Если будет форк, будет и соединение, продолжаем смотреть на метод соединения ()
join
Основной вызов соединения находится в doJoin, но видя так много каскадных тернарных операторов, я впадаю в панику.
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
//status,task 的运行状态
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
Заменим метод doJoin на самый знакомый метод if/else, и вдруг станет понятно.
private int doJoin() {
int s;
Thread t;
ForkJoinWorkerThread wt;
ForkJoinPool.WorkQueue w;
if((s = status) < 0) { // 有结果,直接返回
return s;
}else {
if((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
// 如果是 ForkJoinWorkerThread Worker
if((w = (wt = (ForkJoinWorkerThread) t).workQueue).tryUnpush(this) // 类似上面提到的 scan,但是是专项尝试从本工作队列里取出等待的任务
// 取出了任务,就去执行它,并返回结果
&& (s = doExec()) < 0) {
return s;
}else {
// 也有可能别的线程把这个任务偷走了,那就执行内部等待方法
return wt.pool.awaitJoin(w, this, 0L);
}
}else {
// 如果不是 ForkJoinWorkerThread,执行外部等待方法
return externalAwaitDone();
}
}
}
Среди них awaitJoin и externalAwaitDone используют две стратегии Helper (помощь) и Compensating (компенсация). Вы можете прочитать эти две стратегии самостоятельно, особенно метод awaitJoin. Настоятельно рекомендуется прочитать его самостоятельно. здесь, здесь уже не развернуть
На этом содержание, связанное с ForkJoinPool, закончилось.Чтобы дать всем лучшее понимание механизма fork/join, мы все же нарисуем несколько картинок, объясняющих
Диаграмма разветвления/соединения
Предположим, что наша большая задача — это Task(8), а наименьшая единица, которая, наконец, разделена и преобразована в исполняемый файл, — это Task(1)
Общая цель разделения задач по принципу «разделяй и властвуй» заключается в следующем:
Внешне представлена большая Задача (8), в которой на четных слотах (Пожалуйста, обратите внимание на соответствие цвета)
Если степень параллелизма неудовлетворительна, для сканирования будет создан Worker 1, а задача task(8) будет украдена с базовой стороны и выполнена для вычисления, разветвления
Выньте две задачи (4) и поместите их в WorkQueue.
При выполнении задач он всегда будет подтверждать выполнение требований параллелизма, если нет, то продолжит создавать новых воркеров, при этом продолжит форковать задачи до минимальной единицы. Worker1 вытащит задачу (4) из верхней части, чтобы продолжить вычисления и разветвление, и повторно отправит в WorkQueue.
Задача (2) - это не самая маленькая единица, поэтому он будет продолжать выпустить задачу (2), и, наконец, раздача две задачи (1) толкает на работу
task(1) уже имеет наименьшую степень детализации и может быть непосредственно вызван для выполнения для получения конечного результата; в то время как Worker1 выполняет эти всплывающие операции, другие Worker, такие как Worker 2, также будут созданы для удовлетворения требований параллелизма. базовая сторона очереди, где находится Worker 1, ворует задачи
Worker 2 по-прежнему выполняет pop->fork в соответствии с этим правилом и, наконец, может выполнить задачу.Предполагая, что задача Worker 1 выполняется первой и результат должен быть объединен, когда задача (4) объединяется, возможно чтобы найти, кто украл задачу с помощью подсказки.(4), в это время найдите Worker2, если у Worker2 все еще есть задачи, которые не были выполнены, Worker1 украдет их обратно, чтобы помочь с выполнением, чтобы помочь друг другу, и наконец выполнить задание быстро
вопрос души
-
Почему ForkjoinPool более эффективен? Также рекомендуете использовать commonPool?
-
Нижний слой JDK1.8 Stream полностью использует ForkJoinPool. Вы знаете, где используется ForkJoinPool?
-
Сколько слотов может быть максимум у ForkJoinPool?
-
Некоторые люди в приведенном ниже коде говорят, что ForkJoinPool нельзя использовать полностью, а для отправки нескольких задач следует использовать invokeAll. Знаете почему? Как использовать fork/join без invokeAll?
protected Long compute() {
if (任务足够小) {
return cal();
}
SumTask subtask1 = new SumTask(...);
SumTask subtask2 = new SumTask(...);
// 分别对子任务调用fork():
subtask1.fork();
subtask2.fork();
// 分别获取合并结果:
Long subresult1 = subtask1.join();
Long subresult2 = subtask2.join();
return subresult1 + subresult2;
}
Суммировать
Это еще одна длинная статья.Многие друзья в частном порядке предложили мне разделить длинную статью.С одной стороны, читатели легко ее переваривают, а с другой стороны, я сам стал высокопродуктивным в плане количества. Несколько раз хотел открыть, но многие статьи при открытии теряли непрерывность (кривая забывчивости у всех). Я не вернулся в родной город на Новый год, так что есть время писать статьи. Для того, чтобы лучше понять исходный код, в статье много основополагающего контента.Увидев это, вы должны сильно устать.Если вы хотите нанизать воедино больше разрозненных знаний, то посмотрите комментарии к коду для послевкусия, и затем поклоняйтесь Дугу Ли вместе.
Ссылаться на
- Параллельное программирование на Java на практике
- Ляо Сюэфэн.com/article/114…
- Блог Woohoo.cn на.com/Abird/Fear/Ani…
- cloud.Tencent.com/developer/ ах…