Параллелизм в Java — Fork/Join Framework

Java

Статья основана на jdk1.7, на основе изучения «Искусства параллельного программирования на Java», понимания структуры Fork/Join.

Что такое фреймворк Fork/Join

Платформа Fork/Join — это платформа, предоставляемая Java 7 для параллельного выполнения задач, которая делит большую задачу на несколько небольших задач и, наконец, суммирует результаты каждой маленькой задачи для получения результатов большой задачи.

Его основная идея: разделяй и властвуй.

алгоритм кражи работы

Алгоритм кражи работы — это когда поток крадет задачи из других очередей для выполнения.

Что нужно для использования алгоритма кражи работы? Если нам нужно выполнить относительно большую задачу, мы можем разделить задачу на несколько независимых подзадач.Чтобы уменьшить конкуренцию между потоками, мы помещаем эти подзадачи в разные очереди, и назначаем их каждой очереди.Создаем отдельный поток для выполнения задачи в очереди, а поток соответствует очереди один к одному.Например, поток A отвечает за обработку задач в очереди A. Однако некоторые потоки сначала завершат задачи в своих очередях, в то время как в очередях, соответствующих другим потокам, все еще есть задачи, ожидающие обработки. Вместо ожидания поток, завершивший свою работу, может помочь другим потокам, поэтому он крадет задачу из очереди других потоков для выполнения. В это время они будут обращаться к одной и той же очереди, поэтому, чтобы уменьшить конкуренцию между крадущим потоком задачи и украденным потоком задачи, обычно используется двусторонняя очередь, и украденный поток задачи всегда будет брать задачу из голову двусторонней очереди и выполнить ее.Поток, крадущий задачу, всегда берет задачу из хвоста очереди и выполняет ее.

Преимущество алгоритма кражи работы состоит в том, что он полностью использует потоки для параллельных вычислений и уменьшает конкуренцию между потоками.Недостаток заключается в том, что в некоторых случаях все еще существует конкуренция, например, когда в двойном цикле есть только одна задача. закончилась очередь. И это потребляет больше системных ресурсов, таких как создание нескольких потоков и нескольких очередей.

вводить

Разработка фреймворка Fork/Join разделена на два этапа:

Первый шаг — разделить задачу. Прежде всего, нам нужен класс-форк, чтобы разделить большую задачу на подзадачи.Возможно, подзадачи все еще очень большие, поэтому нам нужно продолжать деление, пока подзадачи не станут достаточно маленькими.

Второй шаг выполняет задачу и объединяет результаты. Разделенные подзадачи помещаются в двусторонние очереди, а затем несколько потоков запуска получают задачи из двусторонних очередей для выполнения. Все результаты подзадач помещаются в очередь, запускается поток для получения данных из очереди, а затем данные объединяются.

Fork/Join использует два класса для выполнения обоих вышеперечисленных действий:

  • Forkjointask: Чтобы использовать Forkjoin Framework, мы должны сначала создать задачу Forkjoin. Он предоставляет механизм для выполнения вилки () и присоединения () операций в задачах. Обычно нам не нужно напрямую наследовать класс Forkjointask, но нужно только наследовать свои подклассы. Рамка Fork / Join предоставляет следующие два подкласса:

    RecursiveAction:用于没有返回结果的任务。 RecursiveTask :用于有返回结果的任务。

  • ForkJoinPool: ForkJoinTask должен выполняться через ForkJoinPool.Подзадачи, разделенные задачей, будут добавлены в двустороннюю очередь, поддерживаемую текущим рабочим потоком, и попадут в начало очереди. Когда в очереди рабочего потока нет задачи, он случайным образом получает задачу из хвоста очереди других рабочих потоков.

использовать

Рассчитано с использованием структуры Fork/Join: 1+2+3+...+100000000.

Первое, что нужно учитывать при использовании фреймворка Fork/Join, это как разделить задачу.Если мы хотим, чтобы каждая подзадача выполняла сложение до 10 000 чисел, то мы устанавливаем порог сегментации на 10 000. Поскольку это сложение 100 000 000 чисел, это не будет Для стоп-деления первый раз делится на две части, а именно 1~50000000 и 50000001~100000000, второй раз 1~50000000 продолжает делиться на 1~25000000 и 25000001~50000000, и 50000001 ~100000000 делится на 500000001~7500 75000001~100000000 ......, продолжайте делить, пока разница между началом и концом не станет меньше или равна 10000.

import java.util.concurrent.*;
public class CountTask extends RecursiveTask<Long> {

    /**
     * 阀值
     */
    private static final long THRESHOLD = 10000;
    // 开始数
    private long start;
    // 结束数
    private long end;

    public CountTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long sum = 0;
        // 如果足够小就计算
        boolean canComplute = (end - start) <= THRESHOLD;
        if(canComplute) {
            for(long i = start; i <= end; i++) {
                sum += i;
            }
        } else {  // 否则,对大任务进行拆分
            // 对半分
            long middle = (start + end) /2;
            // 进行递归
            CountTask left = new CountTask(start, middle);
            CountTask right = new CountTask(middle + 1, end);
            // 执行子任务
            invokeAll(left, right);
            // 获取结果
            long lResult = left.join();
            long rRight = right.join();
            sum = lResult + rRight;
        }
        return sum;
    }

    public static void main(String[] args) {
        long s = System.currentTimeMillis();
        ForkJoinPool pool = ForkJoinPool.commonPool();
        CountTask countTask = new CountTask(1,100000000);   // 参数为起始值与结束值
        Future<Long> result = pool.submit(countTask);
        // 如果任务完成
        if(!((ForkJoinTask<Long>) result).isCompletedAbnormally()) {
            try {
                // 获取任务结果
                System.out.println("fork/join计算为:" + result.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        System.out.println("fork/join计算花费时间:" + (System.currentTimeMillis() - s) + "ms");

        s = System.currentTimeMillis();
        long sum = 0;
        for(int i = 1; i <= 100000000 ; i++) {
            sum += i;
        }
        System.out.println("计算结果:" + sum);
        System.out.println("普通计算花费时间:" + (System.currentTimeMillis() - s) + "ms");
    }
}

fork/join рассчитывается как: 5000000050000000 Время расчета fork/join: 53 мс Результат расчета: 5000000050000000 Нормальное время расчета: 55 мс

Три способа отправки задач в ForkJoinPool:

  1. execute(): выполняется асинхронно без возврата.
  2. invoke(): выполняется синхронно.После вызова вам нужно дождаться завершения задачи, прежде чем выполнять следующий код.
  3. submit(): асинхронное выполнение, когда вызывается метод get, он блокируется, и после завершения будет возвращен будущий объект для проверки состояния и текущих результатов.
ForkJoinPool commonPool = ForkJoinPool.commonPool(); 

Предоставление ссылки на общий пул с использованием предопределенного общего пула снижает потребление ресурсов, поскольку это не позволяет каждой задаче создавать отдельный пул потоков.

Проверить статус выполнения задачи

  • Независимо от того, как завершается задача, метод isDone() возвращает значение true;
  • Если завершение задачи не отменено или возникает исключение, метод isCompletedNormally() возвращает значение true;
  • Метод isCancelled() возвращает true, если задача отменена;
  • Метод isCompletedAbnormally() возвращает true, если задача была отменена или возникла исключительная ситуация.

Обработка исключений

ForkJoinTask может генерировать исключения во время выполнения, но мы не можем перехватывать исключения непосредственно в основном потоке, поэтому ForkJoinTask предоставляет метод isCompletedAbnormally() для проверки того, вызвала ли задача исключение или была ли она отменена, и может передать метод ForkJoinTask. метод получает исключение. Используйте следующий код:

if(task.isCompletedAbnormally()) {
    System.out.println(task.getException());
}

Метод getException возвращает объект Throwable или CancellationException, если задача отменена. Возвращает null, если задача не была завершена или не было создано исключение.

Отличие от ExecutorService

Fork/Join использует «режим кражи работы», при выполнении новой задачи он может разделить ее на более мелкие задачи, добавить небольшую задачу в очередь потоков, а затем украсть одну из случайного потока и поместить ее. Он присоединяется к своей собственной очереди .

Например, на двух процессорах есть разные задачи.В это время A была выполнена, а B имеет задачи, ожидающие выполнения.В это время A украдет задачу в конце команды B и добавит ее в свою Для традиционных потоков ForkJoin более эффективно использует ресурсы ЦП!

Принцип реализации

ForkJoinPool состоит из массива ForkJoinTask и массива ForkJoinWorkerThread, массив ForkJoinTask отвечает за хранение задач, отправленных программой в ForkJoinPool, а массив ForkJoinWorkerThread отвечает за выполнение этих задач.

Принцип реализации метода форка ForkJoinTask. Когда мы вызываем метод fork для ForkJoinTask, программа вызывает метод pushTask для ForkJoinWorkerThread для асинхронного выполнения задачи, а затем немедленно возвращает результат. код показывает, как показано ниже:

public final ForkJoinTask fork() {
        ((ForkJoinWorkerThread) Thread.currentThread())
            .pushTask(this);
        return this;
}

Метод pushTask сохраняет текущую задачу в очереди массива ForkJoinTask. Затем вызовите метод signalWork() в ForkJoinPool, чтобы активировать его, или создайте рабочий поток для выполнения задачи. код показывает, как показано ниже:

final void pushTask(ForkJoinTask t) {
        ForkJoinTask[] q; int s, m;
        if ((q = queue) != null) {    // ignore if queue removed
            long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
            UNSAFE.putOrderedObject(q, u, t);
            queueTop = s + 1;         // or use putOrderedInt
            if ((s -= queueBase) <= 2)
                pool.signalWork();
	else if (s == m)
                growQueue();
        }
    }

Во-первых, он вызывает метод doJoin() и использует метод doJoin() для получения статуса текущей задачи, чтобы определить, какой результат следует вернуть.Есть четыре статуса задачи: завершено (НОРМАЛЬНО), отменено (ОТМЕНЕНО), сигнал ( СИГНАЛ) и появится ИСКЛЮЧИТЕЛЬНЫЙ.

  • Если статус задачи завершен, результат задачи возвращается напрямую.
  • Если состояние задачи отменено, возникает исключение CancellationException.
  • Если состояние задачи состоит в том, чтобы вызвать исключение, соответствующее исключение создается напрямую.

Давайте еще раз проанализируем код реализации метода doJoin():

private int doJoin() {
        Thread t; 
    	ForkJoinWorkerThread w; 
    	int s; 
    	boolean completed;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
            if ((s = status) < 0)
 				return s;
            if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
                try {
                    completed = exec();
                } catch (Throwable rex) {
                    return setExceptionalCompletion(rex);
                }
                if (completed)
                    return setCompletion(NORMAL);
            }
            return w.joinTask(this);
        }
        else
            return externalAwaitDone();
    }

В методе doJoin() сначала проверьте статус задачи, чтобы увидеть, была ли задача выполнена. Если она завершена, она возвращается к статусу задачи напрямую. Если она не завершена, задача удаляется из задачи. массив и выполняется. Если задача выполнена успешно, установите статус задачи на НОРМАЛЬНЫЙ.Если возникает исключение, запишите исключение и установите статус задачи на ИСКЛЮЧИТЕЛЬНЫЙ.

Добро пожаловать, чтобы обратить внимание на мою официальную учетную запись и получать последние статьи как можно скорее ~ Выполните поиск в официальной учетной записи: Code Cafe или отсканируйте QR-код ниже:

img