Используйте новую функцию Java8 parallelStream с осторожностью

интервью

1. Введение

Прежде чем говорить о parallelStream, убедитесь, что понимаете Stream и его основные операции.

Рекомендую всем прочитать предыдущую статьюОсвободите свои руки, у Stream действительно есть эта волна операций бога

2. Что такое ParallelStream

Упомянутый выше поток Java8 Stream сериализуется во время выполнения. Если выполнение задачи занимает много времени, вы можете использовать «братский поток» Stream ParallelStream.

Чтобы не вводить в заблуждение, использование параллелизма не требует много времени и может быть разумно использовано в соответствии с различными бизнес-сценариями.

parallelStream — это параллельный поток, что означает параллельную обработку при обработке задач, что неразрывно связано с параллельным программированием.

Предпосылка состоит в том, что аппаратное обеспечение поддерживает это.Если есть одноядерный процессор, будет только параллельная обработка, а не параллелизм.

Эта статья предназначена главным образом для того, чтобы объяснить, что одна из возможных точек зрения ParallelStream — превратиться в мину.

Будут ли параллельные потоки, используемые бизнесом в проекте, действительно обрабатываться параллельно?

3. Как использовать ParallelStream

ParallelStream в использовании ничем не отличается от Stream. По сути, он возвращает поток, но когда выполняется базовая обработкаСудя по условиям параллельный он или последовательный

Параллельные потоки выполняются не по исходной последовательной траектории, аслучайное исполнение,Конечно, для этого вывода forEach также можно выполнить последовательную сериализацию, но это не является предметом рассмотрения в этой статье.

4. ForkJoinPool

Я считаю, что если вы действительно использовали параллельные потоки в своем проекте, вы знаете ForkJoinPool.

Верно, нижний слой параллельного потока — это используемый ForkJoinPool,Алгоритм кражи работы Пул потоков

Преимущество ForkJoinPool в том, чтоВоспользуйтесь преимуществами нескольких процессоров, разделить задачу на несколько «маленьких задач» и поместить несколько «маленьких задач» на несколько ядер процессора для параллельного выполнения; когда несколько «маленьких задач» завершены, объединить результаты выполнения

5. Подводные камни параллельных потоков

5.1 Вопросы безопасности потоков

Пока он обрабатывается параллельно, если операции в процессе генерируют состояние гонки, возникнут проблемы с безопасностью потоков.

Вот пример, иллюстрирующий конкретную проблему

public static void main(String[] args) {
    List<Integer> integerList = Lists.newArrayList();
    List<String> strList = Lists.newArrayList();

    int practicalSize = 1000000;

    for (int i = 0; i < practicalSize; i++) {
        strList.add(String.valueOf(i));
    }

    strList.parallelStream().forEach(each -> {
        integerList.add(Integer.parseInt(each));
    });

    log.info("  >>> integerList 预计长度 :: {}", practicalSize);
    log.info("  >>> integerList 实际长度 :: {}", integerList.size());
}
/**
 * >>> integerList 预估长度 :: 1000000
 * >>> integerList 实际长度 :: 211195
 */

Процесс работы вышеуказанной программы описывается следующим образом:

1. Создал два списка типа String и Integer.

2. Вставить 1000000 записей в strList

3. Отформатируйте данные в strList как Integer, используя параллельный поток, и добавьте в integerList.

4. Выведите ожидаемую длину и фактическую длину integerList.

При нормальных обстоятельствах мы надеемся, что integerList в конечном итоге выведет 1000000.

Но поскольку обработка параллельных потоков является многопоточной операцией, это сделает поток ArrayList небезопасным.

Фактическая длина в примере не фиксирована и зависит от конкретной скорости обработки ЦП.

Решение

Если в проекте есть необходимость в приведенном выше коде, вы можете использоватьКласс Vector, пакет Collections, класс JUC

Поскольку используется параллельная обработка, определенные требования к производительности все же есть, поэтому данный контейнер смещен в сторону JUC.

5.2 Будет ли это параллельно при любых обстоятельствах?

Этот вопрос, который находится в центре внимания этой статьи, представляет собой небольшой блокнот для заметок.

Во-первых, давайте перечислим API, которые могут вызывать параллельные потоки.

public static void main(String[] args) {
    List<String> stringList = Lists.newArrayList();
    stringList.parallelStream();
    stringList.stream().parallel();
    Stream.of(stringList).parallel();
    ...
}

Хотя методы вызова API различаются, нижний уровень — этоДля флага parallel в AbstractPipeline установлено значение true.

public final S parallel() {
   sourceStage.parallel = true;
   return (S) this;
}

Напрашивается вопрос, называя эти три разных параллельных потоковых API,Использует ли нижний слой тот же ForkJoinPool?

Сначала давайте посмотрим, как инициализируется ForkJoinPool.

В параллельном потоке используется свойство статической переменной внутри ForkJoinPool.

static final ForkJoinPool common;

public static ForkJoinPool commonPool() {
    // assert common != null : "static init error";
    return common;
}

Статический блок ForkJoinPool отвечает за инициализацию общего

static {
    // initialize field offsets for CAS etc
    try {
        U = sun.misc.Unsafe.getUnsafe();
        Class<?> k = ForkJoinPool.class;
        CTL = U.objectFieldOffset
                (k.getDeclaredField("ctl"));
        RUNSTATE = U.objectFieldOffset
                (k.getDeclaredField("runState"));
        STEALCOUNTER = U.objectFieldOffset
                (k.getDeclaredField("stealCounter"));
        Class<?> tk = Thread.class;
        PARKBLOCKER = U.objectFieldOffset
                (tk.getDeclaredField("parkBlocker"));
        Class<?> wk = WorkQueue.class;
        QTOP = U.objectFieldOffset
                (wk.getDeclaredField("top"));
        QLOCK = U.objectFieldOffset
                (wk.getDeclaredField("qlock"));
        QSCANSTATE = U.objectFieldOffset
                (wk.getDeclaredField("scanState"));
        QPARKER = U.objectFieldOffset
                (wk.getDeclaredField("parker"));
        QCURRENTSTEAL = U.objectFieldOffset
                (wk.getDeclaredField("currentSteal"));
        QCURRENTJOIN = U.objectFieldOffset
                (wk.getDeclaredField("currentJoin"));
        Class<?> ak = ForkJoinTask[].class;
        ABASE = U.arrayBaseOffset(ak);
        int scale = U.arrayIndexScale(ak);
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
    } catch (Exception e) {
        throw new Error(e);
    }

    commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
    defaultForkJoinWorkerThreadFactory =
            new ForkJoinPool.DefaultForkJoinWorkerThreadFactory();
    modifyThreadPermission = new RuntimePermission("modifyThread");

   // 创建ForkJoinPool
    common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {
                public ForkJoinPool run() {
                    return makeCommonPool();
                }
            });
    int par = common.config & SMASK; // report 1 even if threads disabled
    commonParallelism = par > 0 ? par : 1;
}

Как видно из следующего кода инициализации, параллелизм, threadFactory и exceptionHandler могут быть инициализированы и персонализированы.

private static ForkJoinPool makeCommonPool() {
    int parallelism = -1;
    ForkJoinPool.ForkJoinWorkerThreadFactory factory = null;
    Thread.UncaughtExceptionHandler handler = null;
    try {  // ignore exceptions in accessing/parsing properties
        String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
        String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
        String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
        if (pp != null)
            parallelism = Integer.parseInt(pp);
        if (fp != null)
            factory = ((ForkJoinPool.ForkJoinWorkerThreadFactory) ClassLoader.
                    getSystemClassLoader().loadClass(fp).newInstance());
        if (hp != null)
            handler = ((Thread.UncaughtExceptionHandler) ClassLoader.
                    getSystemClassLoader().loadClass(hp).newInstance());
    } catch (Exception ignore) {
    }
    if (factory == null) {
        if (System.getSecurityManager() == null)
            factory = defaultForkJoinWorkerThreadFactory;
        else // use security-managed default
            factory = new ForkJoinPool.InnocuousForkJoinWorkerThreadFactory();
    }
    if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
        parallelism = 1;
    if (parallelism > MAX_CAP)
        parallelism = MAX_CAP;
    return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
            "ForkJoinPool.commonPool-worker-");
}

Общее количество потоков внутри экземпляра ForkJoinPool, созданного с помощью параллелизма, по умолчанию равно:Количество ядер ЦП текущей запущенной среды - 1

Это очень важно и во многом связано с тем, как работают параллельные потоки, как описано ниже.

Когда ты увидишь это, твои друзья должны понять

Параллельные потоки, используемые в программе, используют статические переменные, общие для ForkJoinPool.

Продолжайте рассматривать вопросы, поднятые в этом разделе, здесь Код, использующий параллельные потоки в проекте, действительно может достичь параллелизма?

Сначала разместите тестовый код здесь, и заинтересованные друзья смогут попробовать его локально.

public static void main(String[] args) throws InterruptedException {
    System.out.println(String.format("  >>> 电脑 CPU 并行处理线程数 :: %s, 并行流公共线程池内线程数 :: %s",
            Runtime.getRuntime().availableProcessors(),
            ForkJoinPool.commonPool().getParallelism()));
    List<String> stringList = Lists.newArrayList();
    List<String> stringList2 = Lists.newArrayList();
    for (int i = 0; i < 13; i++) stringList.add(String.valueOf(i));
    for (int i = 0; i < 3; i++) stringList2.add(String.valueOf(i));

    new Thread(() -> {
        stringList.parallelStream().forEach(each -> {
            System.out.println(Thread.currentThread().getName() + " 开始执行 :: " + each);
            try {
                Thread.sleep(6000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }, "子线程-1").start();

    Thread.sleep(1500);

    new Thread(() -> {
        stringList2.parallelStream().forEach(each -> {
            System.out.println(Thread.currentThread().getName() + " :: " + each);
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

    }, "子线程-2").start();
}

Для имитации формального сценария использования в проекте тестовый код описывается следующим образом:

1. «Подпоток-1» и «Подпоток-2» соответственно представляют два разных бизнеса в проекте с использованием параллельных потоков.

2. Сервер может гарантировать только 12 одновременных потоков, а параллелизм в общем ForkJoinPool равен 11 при инициализации.

3. Работа с "подпотоком-1" требует много времени. С учетом потоков выполнения и потоков в пуле потоков одновременно может выполняться 12 задач.

4. Если «подпоток-1» запускает все пулы потоков параллельно, каков результат того, что «подпоток-2» снова запускает параллельный поток?

Запустите тестовую программу, чтобы увидеть, что произойдет

Вот описание процесса запуска диаграммы:

1. Вы видите, что поток, отправивший задачу, также участвовал в выполнении задачи.

2. Поскольку наш публичный параллельный пул ForkJoinPool равен 11, а потоков, отправляющих задачи, всего 12, но нашему «подпотоку-1» необходимо выполнить в общей сложности 13 задач.

3. Задача в «подпотоке-1» переводит поток в спящий режим, а задача симуляции занимает много времени, поэтому «подпоток-1» запускает общедоступный пул потоков и оставляет задачу на том же уровне. время

4. Поскольку «подпоток-1» выполняет множество задач, поэтому, когда выполняется «подпоток-2»,Параллельная обработка невозможна, может выполняться только тем потоком, который отправляет задачу

5. После завершения выполнения 12 задач «подпотока-1» оставшаяся задача продолжит выполняться.

краткое изложение проблемы

Из приведенной выше тестовой программы мы знаем, что когда параллельный поток используется в проекте для реального выполнения,не обязательно параллельно

Потому что, если задачи других параллельных потоков в проекте занимают много времени на выполнение, они займут соответствующие ресурсы, так что задачи в конечном итоге будут выполняться через основной поток.

Поэтому перед использованием параллельных потоков мы должны рассмотреть следующие вопросы:

1. Действительно ли бизнес-сценарий требует параллельной обработки?

2. Являются ли задачи параллельной обработки относительно независимыми?

3. Зависит ли параллельная обработка от порядка выполнения задач?

Для этих трех проблем, если бизнес может соответствовать сценариям использования и иметь соответствующие решения, параллелизм действительно может значительно улучшить производительность.

6. Резюме ParallelStream

В статье в основном описывается, что такое ParallelStream

Потоковая обработка, обеспечивающая параллельные вычисления

Какая технология используется в основе ParallelStream для получения параллельных вычислений

ForkJoinPool, параллельная возможность по умолчанию Runtime.getRuntime(). AvailableProcessors() - 1, которую можно переопределить, указав параметры

Некоторые проблемы с параллельными потоками также можно назвать проблемами параллельного программирования.

Вопросы безопасности потоков и подходит ли сценарий для параллельной обработки

В целом, параллельная обработка все еще может использоваться в подходящих сценариях.

постскриптум

В связи с ограниченным уровнем автора, надеюсь на обратную связь и исправление ошибок в статье, спасибо 🙏

Любовь моих друзей - самая большая поддержка для меня 😆 , надеюсь каждый сможетСтавьте лайки, комментируйте, смотрите Санлиан!

В этой статье используетсяmdniceнабор текста