(удобнее просматривать исходный код на горизонтальном экране мобильного телефона)
Примечание. Часть анализа исходного кода Java основана на версии java8, если не указано иное.
Примечание. Эта статья основана на классе пула потоков ForkJoinPool «разделяй и властвуй».
Введение
С развитием и широким использованием многоядерных процессоров на аппаратном обеспечении параллельное программирование стало технологией, которую программисты должны освоить, и на собеседованиях часто проверяются знания интервьюеров о параллелизме.
Сегодня давайте рассмотрим вопрос интервью:
Как в полной мере воспользоваться преимуществами многоядерного процессора и вычислить сумму всех целых чисел в очень большом массиве?
препарировать
- Однопоточное добавление?
Самое простое, что мы можем придумать, это однопоточное добавление, цикл for выполнен.
- Пулы потоков складываются?
При дальнейшей оптимизации мы, естественно, подумаем об использовании пула потоков для добавления сегментов и, наконец, добавим результаты каждого сегмента.
- разное?
Да, наш сегодняшний главный герой — ForkJoinPool, но как его добиться? Кажется, он мало используется ^^
три реализации
Хорошо, анализ окончен, давайте посмотрим на три реализации напрямую, без чернил, и подадим напрямую.
/**
* 计算1亿个整数的和
*/
public class ForkJoinPoolTest01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 构造数据
int length = 100000000;
long[] arr = new long[length];
for (int i = 0; i < length; i++) {
arr[i] = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
}
// 单线程
singleThreadSum(arr);
// ThreadPoolExecutor线程池
multiThreadSum(arr);
// ForkJoinPool线程池
forkJoinSum(arr);
}
private static void singleThreadSum(long[] arr) {
long start = System.currentTimeMillis();
long sum = 0;
for (int i = 0; i < arr.length; i++) {
// 模拟耗时,本文由公从号“彤哥读源码”原创
sum += (arr[i]/3*3/3*3/3*3/3*3/3*3);
}
System.out.println("sum: " + sum);
System.out.println("single thread elapse: " + (System.currentTimeMillis() - start));
}
private static void multiThreadSum(long[] arr) throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
int count = 8;
ExecutorService threadPool = Executors.newFixedThreadPool(count);
List<Future<Long>> list = new ArrayList<>();
for (int i = 0; i < count; i++) {
int num = i;
// 分段提交任务
Future<Long> future = threadPool.submit(() -> {
long sum = 0;
for (int j = arr.length / count * num; j < (arr.length / count * (num + 1)); j++) {
try {
// 模拟耗时
sum += (arr[j]/3*3/3*3/3*3/3*3/3*3);
} catch (Exception e) {
e.printStackTrace();
}
}
return sum;
});
list.add(future);
}
// 每个段结果相加
long sum = 0;
for (Future<Long> future : list) {
sum += future.get();
}
System.out.println("sum: " + sum);
System.out.println("multi thread elapse: " + (System.currentTimeMillis() - start));
}
private static void forkJoinSum(long[] arr) throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
// 提交任务
ForkJoinTask<Long> forkJoinTask = forkJoinPool.submit(new SumTask(arr, 0, arr.length));
// 获取结果
Long sum = forkJoinTask.get();
forkJoinPool.shutdown();
System.out.println("sum: " + sum);
System.out.println("fork join elapse: " + (System.currentTimeMillis() - start));
}
private static class SumTask extends RecursiveTask<Long> {
private long[] arr;
private int from;
private int to;
public SumTask(long[] arr, int from, int to) {
this.arr = arr;
this.from = from;
this.to = to;
}
@Override
protected Long compute() {
// 小于1000的时候直接相加,可灵活调整
if (to - from <= 1000) {
long sum = 0;
for (int i = from; i < to; i++) {
// 模拟耗时
sum += (arr[i]/3*3/3*3/3*3/3*3/3*3);
}
return sum;
}
// 分成两段任务,本文由公从号“彤哥读源码”原创
int middle = (from + to) / 2;
SumTask left = new SumTask(arr, from, middle);
SumTask right = new SumTask(arr, middle, to);
// 提交左边的任务
left.fork();
// 右边的任务直接利用当前线程计算,节约开销
Long rightResult = right.compute();
// 等待左边计算完毕
Long leftResult = left.join();
// 返回结果
return leftResult + rightResult;
}
}
}
Брат Тонг тайно сообщает вам, что на самом деле при вычислении сложения 100 миллионов целых чисел один поток является самым быстрым.Мой компьютер составляет около 100 мс, но использование пула потоков сделает его медленнее.
Итак, чтобы продемонстрировать удивительные возможности ForkJoinPool, я поставил каждое число как/3*3/3*3/3*3/3*3/3*3
После одной операции он используется для моделирования времени расчета.
Проверьте результаты:
sum: 107352457433800662
single thread elapse: 789
sum: 107352457433800662
multi thread elapse: 228
sum: 107352457433800662
fork join elapse: 189
Видно, что ForkJoinPool по-прежнему значительно улучшен по сравнению с обычными пулами потоков.
Вопрос: Может ли общий пул потоков реализовать метод вычисления ForkJoinPool, то есть большие задачи делятся на средние задачи, средние задачи делятся на маленькие задачи и, наконец, агрегируются?
Вы можете попробовать (-᷅_-᷄)
Хорошо, теперь мы официально приступаем к анализу ForkJoinPool.
разделяй и властвуй
- Основная идея
Разделите крупномасштабную проблему на более мелкие подзадачи, затем разделите и властвуйте и, наконец, объедините решения подзадач, чтобы получить решение исходной проблемы.
- шаг
(1) Разделить исходную задачу:
(2) Решите подзадачи:
(3) Объединить решения подзадач с решениями исходной задачи.
В «разделяй и властвуй» подзадачи обычно независимы друг от друга, поэтому подзадачи часто решаются путем рекурсивного вызова алгоритма.
- Типичные сценарии применения
(1) Бинарный поиск
(2) Умножение больших целых чисел
(3) Умножение матриц Штрассена
(4) Наложение шахматной доски
(5) Сортировка слиянием
(6) Быстрая сортировка
(7) Выбор линейного времени
(8) Ханойская башня
Система наследования ForkJoinPool
ForkJoinPool — это новый класс пула потоков в java 7. Его система наследования выглядит следующим образом:
И ForkJoinPool, и ThreadPoolExecutor наследуются от абстрактного класса AbstractExecutorService, поэтому это почти то же самое, что и использование ThreadPoolExecutor, за исключением того, что задача становится ForkJoinTask.
Здесь используется очень важный конструктивный принцип - принцип открытого-закрытого - закрытый для модификации и открытый для расширения.
Видно, что дизайн интерфейса всей системы пула потоков вначале очень хорош. Добавление класса пула потоков не будет мешать исходному коду, и он также может использовать исходные функции.
ForkJoinTask
два основных метода
- fork()
Метод fork() аналогичен методу потока Thread.start(), но вместо фактического запуска потока он помещает задачу в рабочую очередь.
- join()
Метод join() похож на метод Thread.join() потока, но вместо простой блокировки потока он использует рабочий поток для выполнения других задач. Когда рабочий поток вызывает метод join(), он будет обрабатывать другие задачи, пока не заметит, что целевая подзадача завершена.
три подкласса
- RecursiveAction
Задача без возвращаемого значения.
- RecursiveTask
Задача с возвращаемым значением.
- CountedCompleter
Задачи с возвращаемым значением нет, и обратный вызов может быть запущен после завершения задачи.
Внутренности ForkJoinPool
ForkJoinPool реализован внутри с использованием алгоритма «кражи работы».
(1) Каждый рабочий поток имеет свою собственную рабочую очередь WorkQueue;
(2) Это двусторонняя очередь, которая является частной для потока;
(3) Подзадача fork в ForkJoinTask будет помещена в начало очереди рабочего потока, выполняющего задачу, и рабочий поток будет обрабатывать задачи в рабочей очереди в порядке LIFO;
(4) Чтобы максимизировать использование ЦП, простаивающие потоки будут «воровать» задачи из очередей других потоков для выполнения;
(5) Воровать задачи из хвоста очереди работ, чтобы уменьшить конкуренцию;
(6) Работа двусторонней очереди: push()/pop() вызывается только в рабочем потоке своего владельца, а poll() вызывается, когда другие потоки крадут задачи;
(7) Когда останется только последнее задание, конкуренция все равно будет, что достигается через CAS;
Лучшие практики ForkJoinPool
(1) Это наиболее подходит для задач с интенсивными вычислениями.Эта статья была первоначально создана общедоступной подчиненной учетной записью «Tong Ge Reading Source Code»;
(2) ManagedBlocker можно использовать, когда необходимо заблокировать рабочий поток;
(3) ForkJoinPool.invoke()/invokeAll() не следует использовать внутри RecursiveTask;
Суммировать
(1) ForkJoinPool особенно подходит для реализации алгоритма «разделяй и властвуй»;
(2) ForkJoinPool и ThreadPoolExecutor дополняют друг друга, это не отношения между тем, кто кого заменяет, и два применимых сценария различны;
(3) ForkJoinTask имеет два основных метода — fork() и join(), а также три важных подкласса — RecursiveAction, RecursiveTask и CountedCompleter;
(4) ForkjoinPool внутренне реализован на основе алгоритма «кражи работы»;
(5) Каждый поток имеет свою собственную рабочую очередь, которая представляет собой двустороннюю очередь, получающую доступ к задачам из головы очереди, а другие потоки крадут задачи из хвоста;
(6) ForkJoinPool больше всего подходит для задач с интенсивными вычислениями, но ManagedBlocker также можно использовать для блокирующих задач;
(7) Fork() можно вызывать меньше раз внутри RecursiveTask, а текущий поток можно использовать для обработки, что является навыком;
пасхальные яйца
Как использовать ManagedBlocker?
Ответ: ManagedBlocker эквивалентен явному указанию инфраструктуре ForkJoinPool заблокировать, и ForkJoinPool запустит другой поток для выполнения задачи, чтобы максимально использовать ЦП.
Пожалуйста, посмотрите пример ниже и разберитесь сами ^^.
/**
* 斐波那契数列
* 一个数是它前面两个数之和
* 1,1,2,3,5,8,13,21
*/
public class Fibonacci {
public static void main(String[] args) {
long time = System.currentTimeMillis();
Fibonacci fib = new Fibonacci();
int result = fib.f(1_000).bitCount();
time = System.currentTimeMillis() - time;
System.out.println("result,本文由公从号“彤哥读源码”原创 = " + result);
System.out.println("test1_000() time = " + time);
}
public BigInteger f(int n) {
Map<Integer, BigInteger> cache = new ConcurrentHashMap<>();
cache.put(0, BigInteger.ZERO);
cache.put(1, BigInteger.ONE);
return f(n, cache);
}
private final BigInteger RESERVED = BigInteger.valueOf(-1000);
public BigInteger f(int n, Map<Integer, BigInteger> cache) {
BigInteger result = cache.putIfAbsent(n, RESERVED);
if (result == null) {
int half = (n + 1) / 2;
RecursiveTask<BigInteger> f0_task = new RecursiveTask<BigInteger>() {
@Override
protected BigInteger compute() {
return f(half - 1, cache);
}
};
f0_task.fork();
BigInteger f1 = f(half, cache);
BigInteger f0 = f0_task.join();
long time = n > 10_000 ? System.currentTimeMillis() : 0;
try {
if (n % 2 == 1) {
result = f0.multiply(f0).add(f1.multiply(f1));
} else {
result = f0.shiftLeft(1).add(f1).multiply(f1);
}
synchronized (RESERVED) {
cache.put(n, result);
RESERVED.notifyAll();
}
} finally {
time = n > 10_000 ? System.currentTimeMillis() - time : 0;
if (time > 50)
System.out.printf("f(%d) took %d%n", n, time);
}
} else if (result == RESERVED) {
try {
ReservedFibonacciBlocker blocker = new ReservedFibonacciBlocker(n, cache);
ForkJoinPool.managedBlock(blocker);
result = blocker.result;
} catch (InterruptedException e) {
throw new CancellationException("interrupted");
}
}
return result;
// return f(n - 1).add(f(n - 2));
}
private class ReservedFibonacciBlocker implements ForkJoinPool.ManagedBlocker {
private BigInteger result;
private final int n;
private final Map<Integer, BigInteger> cache;
public ReservedFibonacciBlocker(int n, Map<Integer, BigInteger> cache) {
this.n = n;
this.cache = cache;
}
@Override
public boolean block() throws InterruptedException {
synchronized (RESERVED) {
while (!isReleasable()) {
RESERVED.wait();
}
}
return true;
}
@Override
public boolean isReleasable() {
return (result = cache.get(n)) != RESERVED;
}
}
}
Добро пожаловать, чтобы обратить внимание на мою общедоступную учетную запись «Брат Тонг читает исходный код», проверить больше статей из серии исходного кода и поплавать в океане исходного кода с братом Тонгом.