Java fork/join framework

Java

Ref

Что такое фреймворк Fork/Join

Fork/JoinФреймворк, то есть фреймворк ветвления/слияния, — это фреймворк, предусмотренный в Java 7 для параллельного выполнения задач.Это фреймворк, который делит большую задачу на несколько маленьких задач и, наконец, суммирует результаты каждой маленькой задачи для получения результаты большого дела.Fork/Joinкадр, иMapReduceПринцип аналогичен, оба реализуют параллельные вычисления, разделяя большие задачи на маленькие задачи, в основном используяразделяй и властвуйИдея реализовать многозадачные параллельные вычисления.

ForkЭто разделение большой задачи на несколько подзадач для параллельного выполнения.JoinОн состоит в том, чтобы объединить результаты выполнения этих подзадач и в итоге получить результат этой большой задачи. такие как вычисление1+2+。。+10000, который можно разделить на 10 подзадач, каждая из которых суммирует 1000 чисел, и, наконец, суммирует результаты этих 10 подзадач.Fork/JoinСхема операции выглядит следующим образом.

Fork/JoinЗадачи, созданные фреймворком, нужно пройтиForkJoinPoolначать,ForkJoinPoolЭто пул потоков, особенностью которого является то, что количество потоков устанавливается в соответствии с количеством ядер процессора.ForkJoinPoolчерез кражу работы (work-stealing) алгоритм для улучшения использования ЦП.

алгоритм кражи работы

кража работы(work-stealing) заключается в том, что поток крадет задачи из других очередей для выполнения. Блок-схема операции по краже работы выглядит следующим образом.

В каждом потоке поддерживается очередь для хранения задач, которые необходимо выполнить, а алгоритм кражи работы позволяет украсть очередь из очередей других потоков.самый последний (Oldest, хвост очереди)Это позволяет избежать конкуренции с потоком, которому принадлежит текущая задача.

Чтобы уменьшить конкуренцию между потоком задачи-ворователя и потоком задачи-крады, обычно используется двусторонняя очередь.Поток задачи-крады всегда выполняет задачу из головы двусторонней очереди, а поток задачи-крада всегда выполняет задачу из хвоста двусторонней очереди Берет задачу на выполнение.

Как показано на рисунке выше, Thread2 берет последнюю Task1 из очереди Thread1 для выполнения, а Thread1 берет для выполнения Task2, что позволяет избежать конкуренции.

Преимущества алгоритма кражи работы

  • Полное использование потоков для параллельных вычислений
  • Уменьшение конкуренции между потоками

Недостатки алгоритма кражи работы

  • В некоторых случаях будет конкуренция (только одна задача в деке)
  • потребляет больше системных ресурсов

На практике перехват работы означает, что эти задачи примерно равномерно распределяются междуForkJoinPoolиспользуется для перераспределения и балансировки задач между рабочими потоками в пуле. На рисунке ниже показан этот процесс, когда задача в очереди рабочего потока делится на две подзадачи, подзадача «крадет» простаивающий рабочий поток. Как упоминалось ранее, этот процесс может повторяться до тех пор, пока условие, указывающее, что подзадачи должны выполняться последовательно, не станет истинным.

Базовый класс фреймворка Fork/Join

Рассмотрим, как спроектироватьFork/Joinрамки, необходимо учитывать следующие 2 момента

  1. Шаг 1 Разделите задачу. Сначала нужно иметьforkКласс используется для разделения большой задачи на подзадачи.Возможно, подзадачи все еще очень велики, поэтому необходимо продолжать деление до тех пор, пока подзадачи не станут достаточно маленькими.

  2. Шаг 2 выполняет задачу и объединяет результаты. Разделенные подзадачи помещаются в двустороннюю очередь, а затем несколько потоков запуска получают выполнение задачи из двусторонней очереди. Все результаты подзадач помещаются в очередь, запускается поток для получения данных из очереди, а затем выполняется слияние данных.

Fork/JoinИспользуйте 2 класса, чтобы сделать оба вышеперечисленных

  • ForkJoinTask: мы хотим использоватьFork/JoinFramework, вы должны сначала создатьForkJoinTaskЗадача. Обеспечивает выполнение в задачахfork()а такжеjoin()механизм работы. Обычно нам не нужно прямое наследованиеForkJoinTaskclass, и нужно только наследовать его подклассы,Fork/JoinФреймворк предоставляет следующие два подкласса
    • RecursiveAction: для задач, которые не возвращают результатов.
    • RecursiveTask: Используется для задач, которые возвращают результаты.
  • ForkJoinPool:ForkJoinTaskнужно пройтиForkJoinPoolДля выполнения подзадачи, разделенные задачей, будут добавлены в двустороннюю очередь, поддерживаемую текущим рабочим потоком, и попадут в начало очереди. Когда в очереди рабочего потока нет задачи, он случайным образом получает задачу из хвоста очереди других рабочих потоков.

Кроме того, фреймворк также предоставляет следующие 2 класса

  • ForkJoinWorkerThread:ДаForkJoinPoolвнутриworker thread,воплощать в жизньForkJoinTask, который имеетForkJoinPool.WorkQueueчтобы сохранить выполнениеForkJoinTask.
  • ForkJoinPool.WorkQueue: сохранить для выполненияForkJoinTask.

Процесс выполнения фреймворка Fork/Join

  1. ForkJoinPoolКаждый рабочий поток поддерживает двустороннюю рабочую очередь (WorkQueue), в очереди хранятся задачи (ForkJoinTask).
  2. Каждый рабочий поток порождает новую задачу на лету (вызовfork()), поставить голову очереди работ (Задание в голове очереди имеет наименьшее время ожидания), а рабочие потоки обрабатывают свои собственные рабочие очереди, используяFIFOобразом, то есть каждый раз задача берется от руководителя команды на выполнение.
  3. Каждый рабочий поток, обрабатывая свою рабочую очередь, попытается украсть задание (или изpoolзадачи или рабочие очереди из других рабочих потоков), украденные задачи располагаются в конце рабочих очередей других потоков, то есть, когда рабочий поток крадет задачи из других рабочих потоков, он используетLIFOСпособ.
  4. при встречеjoin(), если необходимоjoinзадача еще не завершена, другие задачи будут обрабатываться первыми и ожидать их завершения.
  5. Засыпает, когда у него нет ни собственного квеста, ни квеста на кражу.

FIFO: первым пришел, первым вышел, первым пришел, первым ушел. LIFO: последний пришел, первый вышел, последний пришел, первый ушел.

Разветвление/присоединение с помощью демо

public class CountTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ForkJoinPool forkJoinPool = new ForkJoinPool();
        //创建一个计算任务,计算 由1加到12
        CountTask countTask = new CountTask(1, 12);
        Future<Integer> future = forkJoinPool.submit(countTask);
        System.out.println("最终的计算结果:" + future.get());
    }
}

class CountTask extends RecursiveTask<Integer> {

    private static final int THRESHOLD = 2;
    private int start;
    private int end;


    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        boolean canCompute = (end - start) <= THRESHOLD;

        //任务已经足够小,可以直接计算,并返回结果
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            System.out.println("执行计算任务,计算    " + start + "到 " + end + "的和  ,结果是:" + sum + "   执行此任务的线程:" + Thread.currentThread().getName());

        } else { //任务过大,需要切割
            System.out.println("任务过大,切割的任务:  " + start + "加到 " + end + "的和       执行此任务的线程:" + Thread.currentThread().getName());
            int middle = (start + end) / 2;
            //切割成两个子任务
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end);
            //执行子任务
            leftTask.fork();
            rightTask.fork();
            //等待子任务的完成,并获取执行结果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
            //合并子任务
            sum = leftResult + rightResult;
        }
        return sum;
    }
}


Результат работы программы следующий

任务过大,切割的任务: 1加到 12的和 执行此任务的线程:ForkJoinPool-1-worker-1
任务过大,切割的任务: 7加到 12的和 执行此任务的线程:ForkJoinPool-1-worker-3
任务过大,切割的任务: 1加到 6的和 执行此任务的线程:ForkJoinPool-1-worker-2
执行计算任务,计算 7到 9的和 ,结果是:24 执行此任务的线程:ForkJoinPool-1-worker-3
执行计算任务,计算 1到 3的和 ,结果是:6 执行此任务的线程:ForkJoinPool-1-worker-1
执行计算任务,计算 4到 6的和 ,结果是:15 执行此任务的线程:ForkJoinPool-1-worker-1
执行计算任务,计算 10到 12的和 ,结果是:33 执行此任务的线程:ForkJoinPool-1-worker-3
最终的计算结果:78

Из результатов видно, что представленная вычислительная задача выполняется потоком 1, а поток 1 впервые разрезается на две подзадачи «с 7 по 12» и «с 1 по 6», и эти две подзадачи выполняются. Затем эти две задачи крадет поток 2 и поток 3. Во внутренней очереди потока 1 нет задач. В это время поток 2 и поток 3 также выполняют отсечение задачи и отправляют две подзадачи соответственно, поэтому поток 1 также крадет задачи (здесь все кражи из потока 2). Подзадачи).

Обработка исключений в Fork/Join Framework

ForkJoinTaskИсключения могут генерироваться во время выполнения, но мы не можем напрямую перехватывать исключения в основном потоке, поэтомуForkJoinTaskпри условииisCompletedAbnormally()метод, чтобы проверить, вызвало ли задание исключение или было отменено, и может пройтиForkJoinTaskизgetExceptionметод получает исключение. Используйте следующий код

if(task.isCompletedAbnormally()) {
   System.out.println(task.getException());
}

getExceptionметод возвращаетThrowableобъект, возвращаемый, если задача была отмененаCancellationException. Возвращает, если задача не была завершена или не было выдано исключениеnull.

FAQ

Разница между ForkJoinPool с использованием отправки и вызова отправки

  • invokeЭто синхронное выполнение.После вызова вам нужно дождаться завершения задачи, прежде чем выполнять следующий код.
  • submitвыполняется асинхронно, только когдаFutureперечислитьgetзаблокирует, когда .

В чем разница между наследованием RecursiveTask и RecursiveAction?

  • наследоватьRecursiveTask: применимо к сценариям с возвращаемыми значениями.
  • наследоватьRecursiveAction: подходит для сценариев без возвращаемого значения.

В чем разница между вызовом подзадачи fork и invokeAll?

  • fork: Позвольте дочернему потоку выполнить задачу самостоятельно, родительский поток контролирует выполнение дочернего потока и тратит родительский поток впустую.
  • invokeAll: дочерний и родительский потоки выполняют задачу вместе, что позволяет более эффективно использовать пул потоков.