Что такое ForkJoinPool?
Когда дело доходит до пулов потоков, многие люди думают о некоторых предустановленных пулах потоков, предоставляемых Executors, таких как однопоточные пулы потоков.SingleThreadExecutor
бассейн резьбы с фиксированным размеромFixedThreadPool
Но мало кто заметит, что также предусмотрен специальный пул потоков:WorkStealingPool
, щелкнем в этот метод, и мы увидим, что, в отличие от других методов, этот пул потоков не проходитThreadPoolExecutor
создавать, ноForkJoinPool
создавать:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Между этими двумя пулами потоков нет отношения наследования, но отношение уровня:
Например, мы хотим посчитать совокупную сумму от 1 до 100. Если мы используем ForkJoinPool для его реализации, мы можем разделить 1-100 на 5-битные секции и разделить их на 20 секций как 20 задач.Каждая задача вычисляет только сама себя. результаты в интервале, и, наконец, результаты этих 20 задач суммируются, чтобы получить кумулятивную сумму 1-100
Как использовать ForkJoinPool?
Суть ForkJoinPool в двух моментах:
- Если задача небольшая: рассчитать результат напрямую
-
Если задача большая:
- Разделить на N подзадач
- Вызов fork() подзадачи для выполнения вычисления
- Вызовите join() подзадач, чтобы объединить результаты
Далее, давайте сделаем пример накопления 1-100:
- Сначала определим задачи, которые нам нужно выполнить:
class Task extends RecursiveTask<Integer> {
private int start;
private int end;
private int mid;
public Task(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
if (end - start < 6) {
// 当任务很小时,直接进行计算
for (int i = start; i <= end; i++) {
sum += i;
}
System.out.println(Thread.currentThread().getName() + " count sum: " + sum);
} else {
// 否则,将任务进行拆分
mid = (end - start) / 2 + start;
Task left = new Task(start, mid);
Task right = new Task(mid + 1, end);
// 执行上一步拆分的子任务
left.fork();
right.fork();
// 拿到子任务的执行结果
sum += left.join();
sum += right.join();
}
return sum;
}
}
здесьRecursiveTask
даForkJoinTask
подкласс ,ForkJoinTask
СноваFuture
Подкласс , если вы не знаете класс Future, его можно понимать как класс, используемый для получения результата выполнения асинхронных операций
Сначала мы определяем некоторые данные, необходимые для задачи, в классе Task, такие как начальная позиция и конечная позиция. Дело в методе вычислений, в котором реализованы только что упомянутые шаги: если задача небольшая (судя по данным задачи), выполняется вычисление, в противном случае задача разбивается, выполняется с помощью fork() и проходит через join() ), чтобы получить результат вычисления
- Отправить задачу в пул потоков
Мы только что определили класс задачи, а затем нам нужно отправить эту задачу в пул потоков:
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
Task countTask = new Task(1, 100);
ForkJoinTask<Integer> result = forkJoinPool.submit(countTask);
System.out.println("result: " + result.get());
forkJoinPool.shutdown();
}
Обратите внимание, что здесь можно передать параметр parallel при инициализации ForkJoinPool.Если этот параметр не передан, то по умолчанию в качестве параметра parallel будет использоваться количество процессоров.
После создания объекта задачи и пула потоков используйте метод submit для отправки задачи, которая возвращаетForkJoinTask<T>
Тип объекта, вызовите его метод get, чтобы получить результат выполнения
В то же время следует отметить, что пул потоков также должен вызывать метод shutdown для завершения работы.
Принцип ForkJoinPool
В ForkJoinPool есть три важные роли:
- ForkJoinWorkerThread: рабочий поток, который внутренне инкапсулирует поток.
- WorkQueue: очередь задач
- ForkJoinTask: задача, унаследованная от Future, разделенная на два типа: подчинение и задача по смыслу
В пуле потоков очередь задач хранится с помощью массива, в котором хранятся все отправленные задачи:
- существуетнечетное числорасположение
- существуетчетноезадача сохранения местоположения
submission
Относится к локально отправленным задачам, таким как задачи, отправленные с помощью отправки и выполнения; а такжеtask
Это подзадача, добавленная методом fork. Эти две задачи различаются только по смыслу, поэтому они хранятся вместе в очереди задач и различаются по местоположению.
Ядро ForkJoinPool
Если вы хотите понять принцип ForkJoinPool, вы должны понять его суть.Есть два момента.разделяй и властвуй, второйалгоритм кражи работы. Метод «разделяй и властвуй» считается излишним говорить, он заключается в улучшении параллелизма путем разделения больших задач на маленькие задачи. Главное, о чем стоит поговорить, это алгоритм кражи работы, принцип работы алгоритма:
Все потоки пытаются найти и выполнитьОтправленные задачи, или созданный другой задачейПодзадачи
Положите эту функцию, чтобы попытаться избежать ситуации, когда нить «ничего не делает» после выполнения собственной задачи. Между тем, порядок кражи это ФИФО