Разделяй и властвуй Платформа Fork-Join

Java

Что такое форк-джойн

Платформа Fork/Join — это платформа, предоставляемая Java 7 для параллельного выполнения задач, которая делит большую задачу на несколько небольших задач и, наконец, суммирует результаты каждой маленькой задачи для получения результатов большой задачи. Метод разработки также называется программированием «разделяй и властвуй». Программирование по принципу «разделяй и властвуй» может значительно использовать ресурсы процессора и повысить эффективность выполнения задач, а также является передовой технологией, связанной с многопоточностью.

Каковы проблемы с традиционным программированием «разделяй и властвуй»?

Выше был введен принцип «разделяй и властвуй», который заключается в том, чтобы разбивать большие задачи на маленькие задачи для выполнения. Кажется, что добиться этого не сложно! Зачем делать новый фреймворк специально?

Давайте сначала посмотрим, как использовать обычный пул потоков без использования инфраструктуры Fork-Join.

  1. Мы отправляем большую задачу в пул потоков и указываем порог разделения задачи.
  2. Поток в пуле потоков (при условии, что это поток A) выполняет большую задачу, и обнаруживается, что размер большой задачи превышает пороговое значение, поэтому она делится на две подзадачи, и для этого вызывается функция submit(). отправить его в пул потоков, и будет получено будущее возвращенной подзадачи.
  3. Поток A вызывает метод get() возвращенного Future для блокировки и ожидания результата выполнения подзадачи.
  4. Остальные потоки в пуле (кроме потока А, который заблокирован) выполняют две подзадачи, а затем определяют, превышает ли размер подзадачи пороговое значение, если превышает, продолжаем нарезку согласно шагу 2, иначе вычисляем и возвращаем результат.

Хз, вроде все нормально. Действительно? Не забывайте, что поток каждой задачи резки (например, поток A) блокируется до тех пор, пока его подзадачи не будут завершены, и может продолжать выполняться. Если задача слишком велика и ее нужно сократить несколько раз, тогда несколько потоков будут заблокированы, и производительность быстро снизится. Что еще хуже, если ваш пул потоков имеет верхний предел количества потоков, очень вероятно, что все потоки в пуле будут заблокированы, и пул потоков не сможет выполнять задачи.

Проблема блокировки, когда общий пул потоков реализует разделяй и властвуй

public class CountTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ForkJoinPool forkJoinPool = new ForkJoinPool();
        //创建一个计算任务,计算 由1加到12
        CountTask countTask = new CountTask(1, 12);
        Future<Integer> future = forkJoinPool.submit(countTask);
        System.out.println("最终的计算结果:" + future.get());
    }
}

class CountTask extends RecursiveTask<Integer> {

    private static final int THRESHOLD = 2;
    private int start;
    private int end;


    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        boolean canCompute = (end - start) <= THRESHOLD;

        //任务已经足够小,可以直接计算,并返回结果
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            System.out.println("执行计算任务,计算    " + start + "到 " + end + "的和  ,结果是:" + sum + "   执行此任务的线程:" + Thread.currentThread().getName());
            return sum;
        } else { //任务过大,需要切割
            System.out.println("任务过大,切割的任务:  " + start + "加到 " + end + "的和       执行此任务的线程:" + Thread.currentThread().getName());
            int middle = (start + end) / 2;
            //切割成两个子任务
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end);
            //执行子任务
            leftTask.fork();
            rightTask.fork();
            //等待子任务的完成,并获取执行结果
            invokeAll(leftTask,rightTask);
//            int leftResult = leftTask.join();
//            int rightResult = rightTask.join();
            //合并子任务
//            sum = leftResult + rightResult;
//            return sum;
            return leftTask.join()+rightTask.join();
        }

    }
}

результат операции:

切割的任务:1加到10   执行此任务的线程是 pool-1-thread-1
切割的任务:1加到5   执行此任务的线程是 pool-1-thread-2
切割的任务:6加到10   执行此任务的线程是 pool-1-thread-3

В пуле всего три потока, после трехкратного разделения задач все потоки в пуле блокируются, не могут выполнять какие-либо задачи и остаются зависшими.

Для решения этой проблемы появляются алгоритмы кражи рабочих мест.

алгоритм кражи работы

Для вышеуказанной проблемы инфраструктура Fork-Join использует алгоритм «кражи работы». Алгоритм кражи работы — это когда поток крадет задачи из других очередей для выполнения. Объяснение алгоритма кражи работы в The Art of Java Concurrent Programming:

Каковы преимущества использования алгоритма кражи работы? Если нам нужно выполнить относительно большую задачу, мы можем разделить задачу на несколько независимых подзадач.Чтобы уменьшить конкуренцию между потоками, мы помещаем эти подзадачи в разные очереди, и назначаем их каждой очереди.Создаем отдельный поток для выполнения задачи в очереди, а поток соответствует очереди один к одному.Например, поток A отвечает за обработку задач в очереди A. Однако некоторые потоки сначала завершат задачи в своих очередях, в то время как в очередях, соответствующих другим потокам, все еще есть задачи, ожидающие обработки. Вместо ожидания поток, завершивший свою работу, может помочь другим потокам, поэтому он крадет задачу из очереди других потоков для выполнения. В это время они будут обращаться к одной и той же очереди, поэтому, чтобы уменьшить конкуренцию между крадущим потоком задачи и украденным потоком задачи, обычно используется двусторонняя очередь, и украденный поток задачи всегда будет брать задачу из голову двусторонней очереди и выполнить ее.Поток, крадущий задачу, всегда берет задачу из хвоста очереди и выполняет ее.

Фреймворк Fork-Join использует алгоритм кражи работы.

  1. Пул потоков инфраструктуры Fork-Join Задачи ForkJoinPool делятся на «внешние задачи» и «внутренние задачи».
  2. «Внешние задачи» помещаются в глобальную очередь ForkJoinPool;
  3. Каждый поток в пуле ForkJoinPool поддерживает внутреннюю очередь для «внутренних задач».
  4. Подзадачи, полученные задачей нарезания резьбы, будут помещены во внутреннюю очередь как «внутренние задачи».
  5. Когда этот поток хочет получить результат вычисления подзадачи, он сначала определяет, была ли подзадача завершена. Если он не завершен, он определяет, была ли подзадача «украдена» другими потоками. будут выполнены другие задачи во "внутренней очереди" этого потока, или сканирование других очередей задач, воровство задач, если подзадача не украдена, она будет выполнена этим потоком.
  6. Наконец, когда поток завершил свою «внутреннюю задачу» и находится в состоянии ожидания, он будет сканировать другие очереди задач и забирать задачи.

Преимущества алгоритмов кражи работы

Преимущества алгоритма кражи работы в структуре Fork-Join можно резюмировать в следующих двух моментах:

  1. Поток не будет заблокирован в ожидании или приостановлен, потому что он ожидает завершения подзадачи или нет внутренней задачи для выполнения, но будет сканировать все очереди, забирать задачи и будет приостановлен до тех пор, пока все очереди не опустеют.
  2. Инфраструктура Fork-Join может обеспечить хорошую параллельную производительность в многопроцессорной среде. В случае использования общего пула потоков, когда ЦП больше не является узким местом производительности, несколько потоков могут выполняться параллельно, но производительность не может быть улучшена из-за взаимоисключающего доступа к очереди задач. Платформа Fork-Join поддерживает внутреннюю очередь задач и глобальную очередь задач для каждого потока, а очереди задач являются двунаправленными очередями, которые могут получать задачи с обоих концов, что значительно снижает вероятность конкуренции.Повышение параллельной производительности.

Введение в использование фреймворка Fork-Join

Fork/Join имеет три основных класса:

  1. ForkJoinPool: пул потоков для выполнения задач, наследующий класс AbstractExecutorService.
  2. ForkJoinWorkerThread: рабочий поток, выполняющий задачу (то есть поток в пуле потоков ForkJoinPool). Каждый поток поддерживает внутреннюю очередь для «внутренних задач». Унаследован от класса Thread.
  3. ForkJoinTask: абстрактный класс задачи для ForkJoinPool. Реализует интерфейс будущего

Поскольку ForkJoinTask более сложен и имеет много абстрактных методов, при повседневном использовании ForkJoinTask обычно не наследуется для реализации пользовательских задач, но наследуются два подкласса ForkJoinTask для реализации метода calculate():

RecursiveTask: 子任务带返回结果时使用
RecursiveAction: 子任务不带返回结果时使用

Режим реализации метода вычисления обычно следующий:

if 任务足够小
    直接返回结果
else
    分割成N个子任务
    依次调用每个子任务的fork方法执行子任务
    依次调用每个子任务的join方法合并执行结果

Демонстрация примера Fork-Join

Вычислите результат 1+2+....+12.

Первое, что нужно учитывать при использовании фреймворка Fork/Join, это как разделить задачу.Если мы хотим, чтобы каждая подзадача выполняла сложение не более двух чисел, то мы устанавливаем порог сегментации равным 2, так как это сложение 12 числа. В то же время обратите внимание на имя потока, выполняющего задачу, чтобы понять реализацию алгоритма кражи работы.

public class CountTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ForkJoinPool forkJoinPool = new ForkJoinPool();
        //创建一个计算任务,计算 由1加到12
        CountTask countTask = new CountTask(1, 12);
        Future<Integer> future = forkJoinPool.submit(countTask);
        System.out.println("最终的计算结果:" + future.get());
    }
}

class CountTask extends RecursiveTask<Integer> {

    private static final int THRESHOLD = 2;
    private int start;
    private int end;


    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        boolean canCompute = (end - start) <= THRESHOLD;

        //任务已经足够小,可以直接计算,并返回结果
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            System.out.println("执行计算任务,计算    " + start + "到 " + end + "的和  ,结果是:" + sum + "   执行此任务的线程:" + Thread.currentThread().getName());

        } else { //任务过大,需要切割
            System.out.println("任务过大,切割的任务:  " + start + "加到 " + end + "的和       执行此任务的线程:" + Thread.currentThread().getName());
            int middle = (start + end) / 2;
            //切割成两个子任务
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end);
            //执行子任务
            leftTask.fork();
            rightTask.fork();
            //等待子任务的完成,并获取执行结果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
            //合并子任务
            sum = leftResult + rightResult;
        }
        return sum;
    }
}

результат операции:

任务过大,切割的任务: 1加到 12的和 执行此任务的线程:ForkJoinPool-1-worker-1
任务过大,切割的任务: 7加到 12的和 执行此任务的线程:ForkJoinPool-1-worker-3
任务过大,切割的任务: 1加到 6的和 执行此任务的线程:ForkJoinPool-1-worker-2
执行计算任务,计算 7到 9的和 ,结果是:24 执行此任务的线程:ForkJoinPool-1-worker-3
执行计算任务,计算 1到 3的和 ,结果是:6 执行此任务的线程:ForkJoinPool-1-worker-1
执行计算任务,计算 4到 6的和 ,结果是:15 执行此任务的线程:ForkJoinPool-1-worker-1
执行计算任务,计算 10到 12的和 ,结果是:33 执行此任务的线程:ForkJoinPool-1-worker-3
最终的计算结果:78

Из результатов видно, что представленная вычислительная задача выполняется потоком 1, причем поток 1 впервые разрезается на две подзадачи «7 по 12» и «1 по 6», и эти две подзадачи подаются. Затем эти две задачи украдены потоком 2 и потоком 3. Во внутренней очереди потока 1 больше нет задач. В это время поток 2 и поток 3 также выполняют отсечение задачи и отправляют две подзадачи соответственно, поэтому поток 1 также ворует задачи (здесь все украдены у потока 2. ) Подзадачи).

Демонстрация RecursiveAction

Пройдите по указанному каталогу (включая подкаталоги), чтобы найти файл указанного типа.

public class FindDirsFiles extends RecursiveAction{

    /**
     * 当前任务需要搜寻的目录
     */
    private File path;

    public FindDirsFiles(File path) {
        this.path = path;
    }

    public static void main(String [] args){
        try {
            // 用一个 ForkJoinPool 实例调度总任务
            ForkJoinPool pool = new ForkJoinPool();
            FindDirsFiles task = new FindDirsFiles(new File("D:/"));

            //异步调用
            pool.execute(task);

            System.out.println("Task is Running......");
            Thread.sleep(1);
            int otherWork = 0;
            for(int i=0;i<1000000;i++){
                otherWork = otherWork+i;
            }
            System.out.println("Main Thread done sth......,otherWork=" + otherWork);
            //阻塞的方法
            task.join();
            System.out.println("Task end");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

	@Override
	protected void compute() {
		
		List<FindDirsFiles> subTasks = new ArrayList<>();
		
		File[] files = path.listFiles();
		if(files!=null) {
			for(File file:files) {
				if(file.isDirectory()) {
					subTasks.add(new FindDirsFiles(file));
				}else {
					//遇到文件,检查
					if(file.getAbsolutePath().endsWith("txt")) {
						System.out.println("文件:"+file.getAbsolutePath());
					}
				}
			}
			if(!subTasks.isEmpty()) {
                for (FindDirsFiles subTask : invokeAll(subTasks)) {
                    //等待子任务执行完成
                    subTask.join();
                }
			}
		}


		
	}
}