Параллельные потоки Java8: быстрое исполнение!

Java

До Java 7, если мы хотели обрабатывать коллекцию параллельно, нам требовались следующие шаги:

1. Вручную разделить на разделы

2. Создайте тему для каждой части

3. Объединяйте, когда это уместно

А также необходимо обратить внимание на изменение общих переменных между несколькими потоками. А Java8 предоставляет нам параллельные потоки, которые могут включать параллельный режим одним кликом. Разве это не круто? Посмотрим.

параллельный поток

Распознавать и открывать параллельные потоки

Что такое параллельная потоковая передача: Параллельная потоковая передача — это поток, который делит содержимое потока на несколько фрагментов данных и обрабатывает каждый фрагмент данных отдельно с помощью разных потоков. Например, есть такое требование:

Существует коллекция списка, и каждый объект Apple в списке имеет только вес. Мы также знаем, что цена единицы Apple составляет 5 юаней / кг. Теперь нам нужно рассчитать цену единицы каждого яблока. Традиционный путь выглядит следующим образом :

List<Apple> appleList = new ArrayList<>(); // 假装数据是从库里查出来的

for (Apple apple : appleList) {
  apple.setPrice(5.0 * apple.getWeight() / 1000);
}

Мы завершаем вычисление цены каждого яблока, просматривая объекты яблок в списке через итератор. Временная сложность этого алгоритма равна O(list.size()). По мере увеличения размера списка затраты времени также будут увеличиваться линейно. параллельный поток

Это время можно значительно сократить. Способ параллельной обработки коллекции следующий:

appleList.parallelStream().forEach(apple -> apple.setPrice(5.0 * apple.getWeight() / 1000));

Отличие от обычных потоков заключается в вызываемом здесь методе parallelStream(). Конечно, также возможно преобразовать обычный поток в параллельный поток с помощью stream.parallel() . Параллельные потоки также могут быть преобразованы в последовательные потоки с помощью метода sequence().

Но будьте осторожны: параллельные и последовательные преобразования потоков не вносят реальных изменений в сами потоки, а просто помечают их. И выполнять несколько параллельных/последовательных преобразований потока в конвейере, которые вступают в силу при последнем вызове метода.

Параллельные потоки настолько удобны, откуда берутся их потоки? Сколько их там? Как настроить?

Пул потоков ForkJoinPool по умолчанию используется внутри параллельного потока. Количество потоков по умолчанию — это количество ядер процессора, и настройте свойства ядра системы:

java.util.concurrent.ForkJoinPool.common.parallelism может изменить размер пула потоков. Но значение является глобальной переменной. Его изменение влияет на все параллельные потоки. В настоящее время нет возможности настроить выделенное количество потоков на поток. Вообще говоря, количество ядер процессора — хороший выбор.

Проверка производительности параллельных потоков

Чтобы проще было протестировать производительность, мы позволяем потоку спать в течение 1 с после каждого расчета цены яблока, указывая, что другие операции, связанные с вводом-выводом, выполнялись в течение этого периода, и выводим время, затрачиваемое на выполнение программы, и время- потребление последовательного выполнения:

public static void main(String[] args) throws InterruptedException {
    List<Apple> appleList = initAppleList();

    Date begin = new Date();
    for (Apple apple : appleList) {
        apple.setPrice(5.0 * apple.getWeight() / 1000);
        Thread.sleep(1000);
    }
    Date end = new Date();
    log.info("苹果数量:{}个, 耗时:{}s", appleList.size(), (end.getTime() - begin.getTime()) /1000);
}

Параллельная версия

List<Apple> appleList = initAppleList();

Date begin = new Date();
appleList.parallelStream().forEach(apple ->
     {
         apple.setPrice(5.0 * apple.getWeight() / 1000);
         try {
             Thread.sleep(1000);
          }catch (InterruptedException e) {
                                           e.printStackTrace();
          }
      });
Date end = new Date();
log.info("苹果数量:{}个, 耗时:{}s", appleList.size(), (end.getTime() - begin.getTime()) /1000);

кропотливый

В соответствии с нашим прогнозом, мой компьютер представляет собой четырехъядерный процессор I5.После включения распараллеливания каждый из четырех процессоров выполняет поток, и задача выполняется за последние 1 с!

Можно ли случайно использовать параллельные потоки?

Разделяемость влияет на скорость потока

Через приведенный выше тест некоторые люди легко придут к выводу: параллельный поток очень быстр, мы можем полностью отказаться от внешней итерации foreach/fori/iter и использовать внутреннюю итерацию, предоставляемую Stream.

Это действительно так? Действительно ли параллельные потоки настолько совершенны? Ответ, конечно, нет. Вы можете скопировать приведенный ниже код и протестировать его на своем компьютере. После тестирования можно обнаружить, что параллельные потоки не всегда являются самым быстрым способом обработки.

1. Для первых n чисел, обрабатываемых итерационным методом, независимо от того, параллельный он или нет, он всегда медленнее, чем цикл.Под непараллельным вариантом можно понимать медлительность, вызванную тем, что потоковая операция больше склоняюсь к нижнему слою без петли. Почему распараллеливаемая версия работает медленно? Здесь следует отметить два момента:

  • генерация итерации представляет собой объект в штучной упаковке, вы должны распаковать в цифровую сумму, чтобы

  • Нам сложно разделить итерацию на несколько независимых блоков для параллельного выполнения.

Этот вопрос интересен, и мы должны понимать, что некоторые потоковые операции легче распараллелить, чем другие. Для итерации каждое применение этой функции зависит от результата предыдущего приложения.

Таким образом, в этом случае мы не только не можем эффективно разделить поток на небольшие фрагменты для обработки. Наоборот, это еще и увеличивает стоимость из-за распараллеливания.

2. Для метода LongStream.rangeClosed() второй болевой точки итерации не существует. Он генерирует значения базовых типов без операций распаковки и может напрямую разбивать число 1 - n, которое нужно сгенерировать, на 1 - n/4, 1n/4 - 2n/4, ... 3n/4 - n таких четырех части. Таким образом, rangeClosed() в параллельном режиме выполняется быстрее, чем итерация вне цикла for:

package lambdasinaction.chap7;

import java.util.stream.*;

public class ParallelStreams {

    public static long iterativeSum(long n) {
        long result = 0;
        for (long i = 0; i <= n; i++) {
            result += i;
        }
        return result;
    }

    public static long sequentialSum(long n) {
        return Stream.iterate(1L, i -> i + 1).limit(n).reduce(Long::sum).get();
    }

    public static long parallelSum(long n) {
        return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(Long::sum).get();
    }

    public static long rangedSum(long n) {
        return LongStream.rangeClosed(1, n).reduce(Long::sum).getAsLong();
    }

    public static long parallelRangedSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().reduce(Long::sum).getAsLong();
    }
}

package lambdasinaction.chap7;

import java.util.concurrent.*;
import java.util.function.*;

public class ParallelStreamsHarness {

    public static final ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool();

    public static void main(String[] args) {
        System.out.println("Iterative Sum done in: " + measurePerf(ParallelStreams::iterativeSum, 10_000_000L) + " msecs");
        System.out.println("Sequential Sum done in: " + measurePerf(ParallelStreams::sequentialSum, 10_000_000L) + " msecs");
        System.out.println("Parallel forkJoinSum done in: " + measurePerf(ParallelStreams::parallelSum, 10_000_000L) + " msecs" );
        System.out.println("Range forkJoinSum done in: " + measurePerf(ParallelStreams::rangedSum, 10_000_000L) + " msecs");
        System.out.println("Parallel range forkJoinSum done in: " + measurePerf(ParallelStreams::parallelRangedSum, 10_000_000L) + " msecs" );
    }

    public static <T, R> long measurePerf(Function<T, R> f, T input) {
        long fastest = Long.MAX_VALUE;
        for (int i = 0; i < 10; i++) {
            long start = System.nanoTime();
            R result = f.apply(input);
            long duration = (System.nanoTime() - start) / 1_000_000;
            System.out.println("Result: " + result);
            if (duration < fastest) fastest = duration;
        }
        return fastest;
    }
}

Проблема модификации общих переменных

Хотя параллельные потоки легко реализуют многопоточность, это все же не решает проблему модификации общих переменных в многопоточности. В следующем коде есть общая переменная total, которая использует последовательный и параллельный потоки для вычисления суммы первых n натуральных чисел:

public static long sideEffectSum(long n) {
    Accumulator accumulator = new Accumulator();
    LongStream.rangeClosed(1, n).forEach(accumulator::add);
    return accumulator.total;
}

public static long sideEffectParallelSum(long n) {
    Accumulator accumulator = new Accumulator();
    LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
    return accumulator.total;
}

public static class Accumulator {
    private long total = 0;

    public void add(long value) {
        total += value;
    }
}

Результат каждого вывода последовательного выполнения: 50000005000000, в то время как результаты параллельного выполнения варьируются.

Это связано с тем, что при каждом обращении к totle будет происходить гонка данных, о причинах которой вы можете прочитать в блоге о volatile. Поэтому не рекомендуется использовать параллельные потоки, когда в коде есть операции, изменяющие общие переменные.

Примечания по использованию параллельных потоков

При использовании параллельных потоков необходимо обратить внимание на следующие моменты:

Попробуйте использовать необработанные потоки данных, такие как LongStream/IntStream/DoubleStream, вместо Stream для обработки чисел, чтобы избежать дополнительных накладных расходов на частую распаковку.

Чтобы учесть общую вычислительную стоимость конвейера операций потока, пусть N будет общим количеством задач для работы, а Q — временем на операцию. N * Q — общее время операции, и большее значение Q означает большую вероятность выгоды от использования параллельных потоков.

Например, несколько типов ресурсов отправляются из внешнего интерфейса и должны храниться в базе данных. Каждый ресурс соответствует отдельной таблице. Мы можем считать, что количество типов равно N, а сетевое время, затраченное на хранение базы данных + время, затраченное на вставку, равно Q.

В целом, время работы сети относительно велико. Поэтому эта операция больше подходит для параллельной обработки. Конечно, когда количество типов превышает количество ядер, повышение производительности этой операции будет в определенной степени обесценено. Лучшие методы оптимизации будут представлены вам в будущих блогах.

  • Параллельные потоки не рекомендуются для небольших объемов данных.

  • Потоковые данные, которые легко разбиваются на фрагменты, рекомендуются параллельные потоки

  • Ниже приведена разделяемая таблица производительности для некоторых распространенных фреймворков коллекций, соответствующих потокам.

Ниже приведена разделяемая таблица производительности для некоторых распространенных фреймворков коллекций, соответствующих потокам:

Кодировать слова непросто. Если вы чувствуете, что что-то приобретете после прочтения, вы можете просто щелкнуть по нему и позволить большему количеству людей увидеть его ~

Суммировать:

Редактор резюмирует вопросы интервью 2020. Модули, включенные в этот вопрос интервью, разделены на 19 модулей, а именно: основа Java, контейнер, многопоточность, отражение, копирование объекта, Java Web, исключение, сеть, шаблон проектирования, Spring / Spring MVC, Spring Boot/Spring Cloud, Hibernate, MyBatis, RabbitMQ, Kafka, Zookeeper, MySQL, Redis, JVM.

Обратите внимание на общедоступный номер: Программист Бай Наннан, получите указанную выше информацию.