Сравнение и выбор ForkJoinPool и ThreadPoolExecutor

Java

В дополнение к общему ThreadPoolExecutor, Java также предоставляет специальный пул потоков ForkJoinPool. Этот класс примерно подобен классу ThreadPoolExecutor и реализует интерфейсы Executor и ExecutorService. При использовании этих интерфейсов ForkJoinPool использует неограниченную очередь для хранения задач, которые выполняются числом потоков, указанным в конструкторе пула потоков. Если количество потоков не задано, по умолчанию используется количество потоков, доступных на текущем компьютере, или количество ЦП, настроенное в контейнере Docker.

ForkJoinPool часто используется для реализации алгоритмов «разделяй и властвуй», которые разлагают задачу на несколько подзадач с аддитивностью, а затем выполняют эти подзадачи параллельно, Наконец, выполняют операцию агрегирования результатов работы подзадач для получения окончательного результата. например, алгоритм быстрой сортировки.

При использовании алгоритма «разделяй и властвуй» следует отметить, что он имеет тенденцию создавать большое количество задач, но вы вряд ли создадите столько потоков, сколько задач для их выполнения. Например, для сортировки массива из 10 миллионов элементов процесс декомпозиции подзадач выглядит следующим образом: разбить массив пополам, отсортировать, а затем объединить два подмассива один раз Этот процесс может быть рекурсивным, пока длина подмассива не станет нечетной или длина уже маленькая.

Предполагая декомпозицию до длины подмассива

Нетрудно обнаружить, что пока дочерняя задача не будет завершена, ее родительская задача не может быть выполнена, и если мы используем ThreadPoolExecutor для реализации этого алгоритма, производительность будет довольно низкой. Однако потоку в ForkJoinPool не нужно ждать завершения подзадачи: когда задача приостановлена, он может выполнять другие отложенные задачи.

Возьмем простой пример: Сейчас есть массив типа double, и нам нужно посчитать количество элементов в массиве меньше 0,5.Для решения этой задачи мы используем стратегию разделяй и властвуй.

public class TestForkJoinPool {
    private static double[] d;
    private class ForkJoinTask extends RecursiveTask<Integer> {
        private int first;
        private int last;
        public ForkJoinTask(int first, int last) {
            this.first = first;
            this.last = last;
        }
        @Override
        protected Integer compute() {
            int subCount = 0;
            if (last - first < 10) {
                for (int i = first; i <= last; i++) {
                    if (d[i] < 0.5) {
                        subCount++;
                    }
                }
                return subCount;
            } else {
                int mid = (first + last) >>> 1;
                ForkJoinTask left = new ForkJoinTask(first, mid);
                left.fork();
                ForkJoinTask right = new ForkJoinTask(mid + 1, last); 
               right.fork(); 
               subCount = left.join(); 
               subCount += right.join();
            }
            return subCount;
        }
    }

Ключевыми здесь являются методы fork() и join().Такого рода рекурсия не может быть достигнута с помощью ThreadPoolExecutor. Эти два метода используют серию внутренних очередей для каждого потока для выполнения задач, а также реализуют переключение между задачами, выполняемыми потоками. Эти детали прозрачны для разработчика. Итак, как выбрать классы ForkJoinPool и ThreadPoolExecutor в практических приложениях?

Прежде всего, метод fork/join приводит к приостановке выполнения задачи, в результате чего для выполнения всех задач требуется всего несколько потоков. Если вы передадите массив из 2 миллионов элементов в приведенном выше коде, будет целых 4 миллиона задач, но для их выполнения потребуется всего несколько потоков или даже один. Для выполнения аналогичной задачи с помощью ThreadPoolExecutor потребуется 4 миллиона потоков, поскольку каждый поток должен дождаться завершения своих подзадач, которые могут завершиться только в том случае, если в пуле потоков доступны дополнительные потоки. Таким образом, пауза fork/join позволяет нам использовать алгоритмы, которые мы не могли бы использовать в противном случае, что является большим преимуществом в производительности.

Конечно, сценарии использования в примерах редко встречаются в реальном производстве и на самом деле чаще используются в следующих сценариях:

  • Слияние наборов результатов (не простое накопление в примере).
  • При разработке алгоритма вполне можно ограничить количество задач.

В других случаях проще разделить массив на несколько частей, а затем использовать ThreadPoolExecutor для открытия нескольких потоков для обхода подмассивов.Например, используйте пул потоков с числом основных потоков и максимальным числом потоков, равным 4, и LinkedBlockingQueue в качестве очереди задач, разделите ее на 4 поровну и используйте 4 потока для обхода 4 подмассивов, чтобы не создавалось слишком много задач, а производительность была более впечатляющей. Вот тестовое сравнение:

количество потоков ForkJoinPool ThreadPoolExecutor
1 285±15ms          5ms
4 86±20ms          1ms

Основная причина такого большого разрыва заключается в том, что алгоритм «разделяй и властвуй» генерирует большое количество объектов задач, а накладные расходы на управление этими объектами задач снижают производительность ForkJoinPool, а также влияют на GC. Так что этого следует избегать, если есть другие альтернативы.

кража работы

Как упоминалось выше, первый принцип использования ForkJoinPool — обеспечить разумное разделение задач. Помимо приостановки задач, у него есть еще одна более мощная функция, заключающаяся в том, что он реализует кражу работы. У каждого потока в своем пуле есть своя выделенная очередь задач. Поток будет расставлять приоритеты для задач в своей собственной очереди. Если очередь пуста, он будет искать задачи в очередях других потоков. Таким образом, даже если выполнение одной из 4 миллионов задач занимает много времени, другие потоки в ForkJoinPool могут выполнить остальные. ThreadPoolExecutor не может этого сделать, если это произойдет с ним, другие потоки не смогут взять на себя дополнительные задачи.

Затем измените исходный пример, чтобы значение элемента в массиве менялось в соответствии с его индексом.

for (int i = first; i <= last; i++) {
    if (d[i] < 0.5) {
        subCount++;
    }
    for (int j = 0; j < i; j++) {
        d[i] += j;
    }
}

Поскольку внешний цикл основан на позиции элемента в массиве, время вычисления будет пропорционально позиции элемента.Например, вычисление значения d[0] будет очень быстрым, но вычисление d[d.length -1] требует больше времени.

В этом сценарии, если используется ThreadPoolExecutor и массив делится на 4 части для расчета, время расчета четвертого подмассива (при условии последовательного деления) намного больше, чем время расчета первого подмассива. Как только поток, вычисляющий первый подмассив, завершит свою задачу, он перейдет в состояние ожидания.

Если вместо этого вы используете ForkJoinPool, хотя один поток застрянет в вычислении четвертого массива, другие потоки могут продолжать работать, ничего не делая. Ниже приведены результаты тестового сравнения:

количество потоков ForkJoinPool ThreadPoolExecutor
1 31±3s 30±3s
4 6±1s 10±2s

Если используется только один поток, результаты двух в основном одинаковы. Когда количество потоков достигает 4, ForkJoinPool имеет определенное преимущество. Когда некоторые задачи в серии задач занимают больше времени, чем другие, это может привести к несбалансированной ситуации,Можно сделать вывод, что когда задачу можно разделить на набор сбалансированной эффективности выполнения, разделение и использование ThreadPoolExecutor даст лучшую производительность, в противном случае больше подходит ForkJoinPool.

На самом деле здесь можно провести дальнейшую настройку производительности, но это с уклоном в сторону алгоритмического уровня: понять, когда заканчивать рекурсию. В приведенном выше примере рекурсия заканчивается, когда размер массива меньше 10. Но когда эффективность исполнения уравновешена, правильнее остановиться на 500 000.

Однако в случае дисбаланса меньшие подмассивы добьются лучшей производительности или продолжат использовать приведенный выше пример, где значения элементов в массиве будут изменяться в соответствии с их индексами, а ниже приведены результаты теста (в порядке чтобы сэкономить время и сократить до 200 000 элементов):

размер подмассива ForkJoinPool
100000 17988±100ms
50000 10613±100ms
10000 4964±100ms
1000 3940±100ms
100 3735±100ms
10 3687±100ms

Такая корректировка значений листа распространена в таких алгоритмах. Реализация быстрой сортировки Java со значением листа 47.

автоматический параллелизм

Java имеет возможность автоматически распараллеливать определенные типы кода, и эта возможность зависит от ForkJoinPool. JVM создаст для этого общий пул потоков fork-join, который представляет собой статический объект класса ForkJoinPool, размер которого по умолчанию равен количеству доступных процессоров на машине.

Среди методов класса Arrays распространен такой вид автоматического параллелизма, как сортировка массива по алгоритму быстрой сортировки, методы, работающие с каждым элементом массива и так далее. Также полезно при потоковой обработке, когда с каждым элементом коллекции можно работать (последовательно или параллельно).

Вот пример, который создает коллекцию пользовательских объектов, а затем вычисляет коэффициент активности для каждого пользователя:

List<User> users= ...;
Stream<User> stream = users.parallelStream();
stream.forEach(u -> {
    int val=calculate(u);
    ...
});

Метод foreach() создаст задачу для каждого пользовательского объекта, а затем каждая задача будет обработана общим ForkJoinPool в JVM.

Изменение размера общедоступного ForkJoinPool так же важно, как и изменение размера других пулов потоков. По умолчанию в общем пуле потоков столько потоков, сколько доступно ЦП на машине. Если на машине работает несколько JVM, часто необходимо ограничить количество потоков, чтобы JVM не конкурировали за ресурсы. Точно так же, если сервер хочет выполнять другие запросы параллельно, но вы хотите убедиться, что для этого достаточно ресурсов ЦП, вы можете рассмотреть возможность уменьшения количества потоков в общем пуле потоков. Конечно, если задачи в общем пуле часто блокируют ожидание ввода-вывода, может возникнуть необходимость увеличить размер общего пула.

Чтобы настроить размер общего пула, это можно сделать, изменив системное свойство Java Djava.util.concurrent.ForkJoinPool.common.parallelism=N. Это имеет определенную связь с версией.До 192 версии Java8 нужно ставить вручную.Можно использовать следующие способы

ForkJoinPool.commonPool().getParallelism()

Чтобы просмотреть размер текущего общедоступного пула, обратите внимание, чтоНастраивать этот метод во время работы бесполезно., вы должны изменить его перед загрузкой класса ForkJoinPool.

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","20");

Здесь следует отметить еще одну вещь: метод foreach() использует как поток в выполняемом операторе, так и поток в общем пуле для обработки элементов в потоке. Поэтому, если вам нужно настроить размер общего пула при использовании параллельных потоков или других методов автоматического распараллеливания, вы можете уменьшить ожидаемое значение на 1.

Ссылка: "Производительность OReilly.Java"