Говоря о ForkJoinPool

Java

Что такое ForkJoinPool?

Когда дело доходит до пулов потоков, многие люди думают о некоторых предустановленных пулах потоков, предоставляемых Executors, таких как однопоточные пулы потоков.SingleThreadExecutorбассейн резьбы с фиксированным размеромFixedThreadPoolНо мало кто заметит, что также предусмотрен специальный пул потоков:WorkStealingPool, щелкнем в этот метод, и мы увидим, что, в отличие от других методов, этот пул потоков не проходитThreadPoolExecutorсоздавать, ноForkJoinPoolсоздавать:

    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

Между этими двумя пулами потоков нет отношения наследования, но отношение уровня:

ThreadPoolExecutor должен быть знаком с ним. Это базовый пул потоков, в котором хранятся потоки. Когда необходимо выполнить задачу, для выполнения берется поток из пула потоков. И ForkJoinPool не так уж прост и не заменяет ThreadPoolExecutor, этот пул потоков предназначен для достижения "разделяй и властвуй«Созданный по этой идее, путем разделения больших задач на маленькие задачи, а затем суммирования результатов маленьких задач получается конечный результат, который очень похож на идею MapReduce.

Например, мы хотим посчитать совокупную сумму от 1 до 100. Если мы используем ForkJoinPool для его реализации, мы можем разделить 1-100 на 5-битные секции и разделить их на 20 секций как 20 задач.Каждая задача вычисляет только сама себя. результаты в интервале, и, наконец, результаты этих 20 задач суммируются, чтобы получить кумулятивную сумму 1-100

Как использовать ForkJoinPool?

Суть ForkJoinPool в двух моментах:

  1. Если задача небольшая: рассчитать результат напрямую
  2. Если задача большая:
    • Разделить на N подзадач
    • Вызов fork() подзадачи для выполнения вычисления
    • Вызовите join() подзадач, чтобы объединить результаты

Далее, давайте сделаем пример накопления 1-100:

  1. Сначала определим задачи, которые нам нужно выполнить:
class Task extends RecursiveTask<Integer> {

    private int start;

    private int end;
    private int mid;

    public Task(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        if (end - start < 6) {
            // 当任务很小时,直接进行计算
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            System.out.println(Thread.currentThread().getName() + " count sum: " + sum);
        } else {
            // 否则,将任务进行拆分
            mid = (end - start) / 2 + start;
            Task left = new Task(start, mid);
            Task right = new Task(mid + 1, end);

            // 执行上一步拆分的子任务
            left.fork();
            right.fork();

            // 拿到子任务的执行结果
            sum += left.join();
            sum += right.join();
        }

        return sum;
    }
}

здесьRecursiveTaskдаForkJoinTaskподкласс ,ForkJoinTaskСноваFutureПодкласс , если вы не знаете класс Future, его можно понимать как класс, используемый для получения результата выполнения асинхронных операций

Сначала мы определяем некоторые данные, необходимые для задачи, в классе Task, такие как начальная позиция и конечная позиция. Дело в методе вычислений, в котором реализованы только что упомянутые шаги: если задача небольшая (судя по данным задачи), выполняется вычисление, в противном случае задача разбивается, выполняется с помощью fork() и проходит через join() ), чтобы получить результат вычисления

  1. Отправить задачу в пул потоков

Мы только что определили класс задачи, а затем нам нужно отправить эту задачу в пул потоков:

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        Task countTask = new Task(1, 100);
        ForkJoinTask<Integer> result = forkJoinPool.submit(countTask);

        System.out.println("result: " + result.get());

        forkJoinPool.shutdown();
    }

Обратите внимание, что здесь можно передать параметр parallel при инициализации ForkJoinPool.Если этот параметр не передан, то по умолчанию в качестве параметра parallel будет использоваться количество процессоров.

После создания объекта задачи и пула потоков используйте метод submit для отправки задачи, которая возвращаетForkJoinTask<T>Тип объекта, вызовите его метод get, чтобы получить результат выполнения

В то же время следует отметить, что пул потоков также должен вызывать метод shutdown для завершения работы.

Принцип ForkJoinPool

В ForkJoinPool есть три важные роли:

  • ForkJoinWorkerThread: рабочий поток, который внутренне инкапсулирует поток.
  • WorkQueue: очередь задач
  • ForkJoinTask: задача, унаследованная от Future, разделенная на два типа: подчинение и задача по смыслу

В пуле потоков очередь задач хранится с помощью массива, в котором хранятся все отправленные задачи:

  1. существуетнечетное числорасположение
  2. существуетчетноезадача сохранения местоположения

submissionОтносится к локально отправленным задачам, таким как задачи, отправленные с помощью отправки и выполнения; а такжеtaskЭто подзадача, добавленная методом fork. Эти две задачи различаются только по смыслу, поэтому они хранятся вместе в очереди задач и различаются по местоположению.

Ядро ForkJoinPool

Если вы хотите понять принцип ForkJoinPool, вы должны понять его суть.Есть два момента.разделяй и властвуй, второйалгоритм кражи работы. Метод «разделяй и властвуй» считается излишним говорить, он заключается в улучшении параллелизма путем разделения больших задач на маленькие задачи. Главное, о чем стоит поговорить, это алгоритм кражи работы, принцип работы алгоритма:

Все потоки пытаются найти и выполнитьОтправленные задачи, или созданный другой задачейПодзадачи

Положите эту функцию, чтобы попытаться избежать ситуации, когда нить «ничего не делает» после выполнения собственной задачи. Между тем, порядок кражи это ФИФО