Поток в Java8 такой мощный, знаете, в чем его принцип?

Java

Нет публики:Маленькое кофейное шоу Java,Веб-сайт:javaxks.com

Автор: Years Enron, ссылка: elsef.com/2019/09/16/Принципиальный анализ Stream в Java8

API Java 8 добавляет новую абстракцию под названием Stream, которая позволяет обрабатывать данные декларативным способом.

Потоки обеспечивают высокоуровневую абстракцию над операциями над наборами и представлениями Java интуитивно понятным способом, аналогичным запросу данных из базы данных с помощью операторов SQL.

Stream API может значительно повысить производительность Java-программистов, позволяя программистам писать эффективный, чистый и лаконичный код.

В этой статье будет проанализирован принцип реализации Stream.

Состав и характеристики Stream

Поток представляет собой очередь элементов из источника данных и поддерживает операции агрегации:

  • Элементы — это объекты определенного типа, образующие очередь. Потоки в Java не хранят и не управляют такими элементами, как коллекции, а вычисляют по запросу.
  • Источником потока источника данных может быть коллекция, массив, канал ввода-вывода, генератор и т. д.
  • Операции агрегирования аналогичны операторам SQL, таким как фильтрация, сопоставление, сокращение, поиск, сопоставление, сортировка и т. д.

Собрание различных предыдущих операций, ручья работа Есть две основные характеристики:

  • Конвейерная обработка: Промежуточные операции возвращают сам объект потока. Таким образом, несколько операций могут быть объединены в конвейер, как в свободном стиле. Это позволяет оптимизировать такие операции, как оценка лени и короткое замыкание.
  • Внутренняя итерация: ранее обход коллекции выполнялся с помощью Iterator или For-Each, который явно выполняет итерацию за пределами коллекции, что называется внешней итерацией. Поток обеспечивает внутренний способ итерации, реализованный через шаблон посетителя (Visitor).

В отличие от итераторов, потоки могут работать параллельно, тогда как итераторы могут работать только императивно и последовательно. Как следует из названия, при использовании последовательного режима для перемещения каждый элемент считывается до того, как будет считан следующий элемент. При использовании параллельного обхода данные разбиваются на несколько сегментов, каждый из которых обрабатывается в отдельном потоке, а результаты выводятся вместе.

Параллельная работа Stream основана на инфраструктуре Fork/Join (JSR166y), представленной в Java7 для разделения задач и ускорения обработки. Эволюция параллельного API Java в основном выглядит следующим образом:

1.0-1.4 中的 java.lang.Thread  
5.0 中的 java.util.concurrent
6.0 中的 Phasers 等
7.0 中的 Fork/Join 框架
8.0 中的 Lambda

Stream имеет возможности параллельной обработки, и процесс обработки будет разделять и властвовать, то есть большая задача делится на несколько маленьких задач, а это значит, что каждая задача является операцией:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
       .forEach(out::println); 

Видно, что простая линия кода помогает нам реализовать функцию вывода элементов в наборе параллельно, но поскольку порядок параллельного исполнения неконтролируемо, результатом каждого исполнения не обязательно одинаково.

Если это должно быть то же самое, вы можете использовать метод forEachOrdered для выполнения операции завершения:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
       .forEachOrdered(out::println);  

Здесь возникает вопрос, если результат нужно упорядочить, противоречит ли это нашему первоначальному замыслу параллельного выполнения? Да, в этом сценарии очевидно, что нет необходимости использовать параллельные потоки, и его можно выполнить напрямую с последовательными потоками, иначе производительность может быть хуже, потому что все параллельные результаты принудительно сортируются в конце.

Хорошо, давайте сначала представим соответствующие знания интерфейса Stream.

Интерфейс BaseStream

Родительским интерфейсом Stream является BaseStream, который представляет собой интерфейс верхнего уровня, реализуемый всеми потоками и определяемый следующим образом:

public interface BaseStream<T, S extends BaseStream<T, S>>
        extends AutoCloseable {
    Iterator<T> iterator();

    Spliterator<T> spliterator();

    boolean isParallel();

    S sequential();

    S parallel();

    S unordered();

    S onClose(Runnable closeHandler);

    void close();
}

Среди них T — тип элементов в потоке, S — класс реализации BaseStream, элементы в нем — тоже T и S — тоже он сам:

S extends BaseStream<T, S>

У вас немного кружится голова?

На самом деле это легко понять.Давайте посмотрим на использование S в интерфейсе: например, sequence() и parallel(), эти два метода возвращают экземпляры S, что означает, что они соответственно поддерживают сериализацию текущего потока. Или работать параллельно и возвращать «измененный» объект потока.

Если он параллельный, он должен включать разделение текущего потока, то есть разделение потока на несколько подпотоков, причем подпоток должен быть того же типа, что и родительский поток. Подпотоки могут продолжать разделять подпотоки и так далее...

Другими словами, S здесь является классом реализации BaseStream, который также является потоком, таким как Stream, IntStream, LongStream и т. д.

Потоковый интерфейс

Давайте взглянем на объявление интерфейса Stream:

public interface Stream<T> extends BaseStream<T, Stream<T>> 

Это нетрудно понять применительно к приведенному выше объяснению: то есть Stream может продолжать разбиваться на Stream, и мы можем подтвердить это через некоторые его методы:

    Stream<T> filter(Predicate<? super T> predicate);
    <R> Stream<R> map(Function<? super T, ? extends R> mapper);
    <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);
    Stream<T> sorted();
    Stream<T> peek(Consumer<? super T> action);
    Stream<T> limit(long maxSize);
    Stream<T> skip(long n);
    ...

Это промежуточные операции, которые работают с потоками, и их возвращаемым результатом должен быть сам объект потока.

операция закрытия потока

BaseStream реализует интерфейс AutoCloseable, то есть метод close() будет вызываться при закрытии потока. В то же время BaseStream также предоставляет нам метод onClose():

/** * Returns an equivalent stream with an additional close handler. Close * handlers are run when the {@link #close()} method * is called on the stream, and are executed in the order they were * added. All close handlers are run, even if earlier close handlers throw * exceptions. If any close handler throws an exception, the first * exception thrown will be relayed to the caller of {@code close()}, with * any remaining exceptions added to that exception as suppressed exceptions * (unless one of the remaining exceptions is the same exception as the * first exception, since an exception cannot suppress itself.) May * return itself. * * <p>This is an <a href="package-summary.html#StreamOps">intermediate * operation</a>. * * @param closeHandler A task to execute when the stream is closed * @return a stream with a handler that is run if the stream is closed */
S onClose(Runnable closeHandler);

Когда вызывается интерфейс close() AutoCloseable, будет вызываться метод onClose() объекта потока, но следует отметить несколько моментов:

  • Метод onClose() возвращает сам объект потока, что означает, что объект может вызываться несколько раз.
  • Если вызывается несколько методов onClose(), он срабатывает в том порядке, в котором они вызываются, но если у метода есть исключение, вверх будет выброшено только первое исключение.
  • Исключение, созданное предыдущим методом onClose(), не повлияет на использование последующих методов onClose().
  • Если несколько методов onClose() генерируют исключения, будет отображаться только стек первого исключения, тогда как другие исключения будут сжаты, и будет отображаться только часть информации.

Параллельные и последовательные потоки

Интерфейс BaseStream предоставляет два метода, параллельный поток и последовательный поток.Эти два метода могут вызываться произвольно несколько раз или смешиваться, но в конечном итоге превалирует только возвращаемый результат последнего вызова метода.

Обратитесь к описанию метода parallel():

Returns an equivalent stream that is parallel. May return itself, either because the stream was already parallel, or because the underlying stream state was modified to be parallel.

Таким образом, вызов одного и того же метода несколько раз не создает новый поток, а напрямую повторно использует текущий объект потока.

В следующем примере преобладает последний вызов parallel(), и сумма, наконец, вычисляется параллельно:

stream.parallel()
   .filter(...)
   .sequential()
   .map(...)
   .parallel()
   .sum();

Человек, стоящий за ParallelStream: ForkJoinPool

Платформа ForkJoin — это новая функция JDK 7. Как и ThreadPoolExecutor, она также реализует интерфейсы Executor и ExecutorService. Он использует «бесконечную очередь» для сохранения задач, которые необходимо выполнить, и количество потоков передается через конструктор.Если желаемое количество потоков не передается в конструктор, количество ЦП, доступных на текущем будет установлено значение по умолчанию для количества потоков.

ForkJoinPool в основном используется для решения проблем с использованием алгоритма «разделяй и властвуй», типичных приложений, таких как алгоритм быстрой сортировки. Дело в том, что ForkJoinPool необходимо использовать относительно небольшое количество потоков для обработки большого количества задач. Например, для сортировки 10 миллионов элементов данных задача будет разделена на две задачи сортировки по 5 миллионов и задачу слияния двух наборов данных по 5 миллионов. По аналогии, такой же процесс сегментации будет выполнен для 5 миллионов данных, и в конце будет установлен порог, указывающий, когда размер данных велик, и такой процесс сегментации будет остановлен. Например, когда количество элементов меньше 10, он перестанет разбиваться и вместо этого будет использовать сортировку вставками для их сортировки. Так что в итоге все задачи в сумме составят около 2 000 000+.

Суть проблемы в том, что задача может быть выполнена только тогда, когда все ее подзадачи выполнены, представьте себе процесс сортировки слиянием.

Таким образом, при использовании ThreadPoolExecutor возникает проблема с разделяй и властвуй, потому что потоки в ThreadPoolExecutor не могут добавить еще одну задачу в очередь задач и дождаться завершения задачи, прежде чем продолжить. При использовании ForkJoinPool поток может создать новую задачу и приостановить текущую задачу, в это время поток может выбрать подзадачи из очереди для выполнения.

Так в чем же разница в производительности при использовании ThreadPoolExecutor или ForkJoinPool?

Во-первых, с помощью ForkJoinPool можно использовать ограниченное количество потоков для выполнения очень большого количества задач с «отношением родитель-потомок», например, используя 4 потока для выполнения более 2 миллионов задач. При использовании ThreadPoolExecutor завершение невозможно, т.к. Thread в ThreadPoolExecutor не может выбрать сначала выполнение подзадач, а когда нужно выполнить 2 миллиона задач с отношениями родитель-потомок, требуется еще и 2 миллиона потоков, что явно невыполнимо .

Принцип кражи работы:

- (1) Каждый рабочий поток имеет свою собственную рабочую очередь WorkQueue; - (2) Это двусторонняя очередь из очереди, которая является частной для потока; - (3) Подзадача fork в ForkJoinTask будет помещена в выполняемую Руководитель рабочего потока задачи, рабочий поток будет обрабатывать задачи в рабочей очереди в порядке LIFO, то есть в порядке стека; - (4) Чтобы максимально использовать ЦП, бездействующий поток будет удален из очереди других потоков "Кража" задач для выполнения; - (5) но кража задач из хвоста рабочей очереди, чтобы уменьшить конкуренцию с потоком, которому принадлежит очередь; - (6) двойное - завершенные операции очереди: push()/pop() работает только в Called в рабочем потоке своего владельца, poll() вызывается, когда другие потоки крадут задачи; - (7) Когда остается только последняя задача, конкуренция все равно будет , что достигается через CAS;

Взгляд на ParallelStream глазами ForkJoinPool

Java 8 добавляет общий пул потоков в ForkJoinPool для обработки задач, которые явно не передаются в какой-либо пул потоков. Это статический элемент типа ForkJoinPool, который имеет количество потоков по умолчанию, равное количеству ЦП на работающей машине. Автоматическое распараллеливание происходит при вызове нового метода, добавленного в класс Arrays. Например, параллельная быстрая сортировка, используемая для сортировки массива, используется для параллельного обхода элементов массива. Автоматическое распараллеливание также используется в недавно добавленном Stream API Java 8.

Например, следующий код используется для перебора элементов в списке и выполнения требуемой операции:

List<UserInfo> userInfoList =
        DaoContainers.getUserInfoDAO().queryAllByList(new UserInfoModel());
userInfoList.parallelStream().forEach(RedisUserApi::setUserIdUserInfo);

Операции над элементами в списке выполняются параллельно. Метод forEach создает задачу для операции вычисления каждого элемента, которая обрабатывается commonPool в упомянутом выше ForkJoinPool. Вышеупомянутая логика параллельных вычислений, конечно, также может быть реализована с помощью ThreadPoolExecutor, но с точки зрения удобочитаемости кода и объема кода использование ForkJoinPool значительно лучше.

Для количества потоков в общем пуле потоков ForkJoinPool обычно можно использовать значение по умолчанию, то есть количество процессоров исполняемого компьютера. Вы также можете настроить количество потоков ForkJoinPool, задав системное свойство: -Djava.util.concurrent .ForkJoinPool.common.parallelism=N (N — количество потоков).

Стоит отметить, что текущий исполняемый поток также будет использоваться для выполнения задачи, поэтому конечное количество потоков равно N+1, а 1 — это текущий основной поток.

Здесь есть проблема: если вы используете блокирующие операции, такие как ввод-вывод, при выполнении параллельных потоков, это, вероятно, вызовет некоторые проблемы:

public static String query(String question) {
  List<String> engines = new ArrayList<String>();
  engines.add("http://www.google.com/?q=");
  engines.add("http://duckduckgo.com/?q=");
  engines.add("http://www.bing.com/search?q=");
   
  // get element as soon as it is available
  Optional<String> result = engines.stream().parallel().map((base) - {
    String url = base + question;
    // open connection and fetch the result
    return WS.url(url).get();
  }).findAny();
  return result.get();
}

Этот пример типичен, давайте разберем его:

  • Эта операция параллельных потоковых вычислений будет выполняться совместно основным потоком и JVM по умолчанию ForkJoinPool.commonPool().
  • Map — это метод блокировки, которому необходимо получить доступ к интерфейсу HTTP и получить ответ, поэтому любой рабочий поток будет блокироваться и ждать результата, когда он будет выполняться здесь.
  • Поэтому, когда в это время метод расчета вызывается в режиме параллельного потока в других местах, на него будет воздействовать метод, который блокируется и ожидает здесь.
  • Текущая реализация ForkJoinPool не рассматривает компенсацию ожидания рабочих потоков, которые заблокированы в ожидании вновь порожденного потока, поэтому в конечном итоге потоки в ForkJoinPool.commonPool() будут находиться в режиме ожидания и блокировать ожидание.

?Как мы знаем из анализа ситуации выше, выполнение лямбда не происходит мгновенно, все программы, использующие параллельные потоки, могут стать источником блокирующих программ, а другие части программы не смогут получить к ним доступ в процессе выполнения .workers, что означает, что любая программа, использующая параллельные потоки, станет непредсказуемой и потенциально опасной, когда что-то еще займет общий ForkJoinPool.

резюме:

当需要处理递归分治算法时,考虑使用 ForkJoinPool。
仔细设置不再进行任务划分的阈值,这个阈值对性能有影响。
Java 8 中的一些特性会使用到 ForkJoinPool 中的通用线程池。在某些场合下,需要调整该线程池的默认的线程数量
lambda 应该尽量避免副作用,也就是说,避免突变基于堆的状态以及任何 IO
lambda 应该互不干扰,也就是说避免修改数据源(因为这可能带来线程安全的问题)
避免访问在流操作生命周期内可能会改变的状态

Производительность параллельных потоков

На производительность инфраструктуры параллельной потоковой передачи влияют следующие факторы:

  • Размер данных: если данные достаточно велики и время обработки каждого конвейера достаточно велико, параллелизм имеет смысл;
  • Структура исходных данных: каждая операция конвейера основана на исходном источнике данных, обычно коллекции, и разделение различных источников данных коллекции потребует определенного объема;
  • Бокс: обработка типов-примитивов выполняется быстрее, чем типов-боксов;
  • Количество ядер: по умолчанию, чем больше ядер, тем больше потоков запускается в базовом пуле потоков fork/join;
  • Накладные расходы на единицу обработки: чем больше времени затрачивается на каждый элемент в потоке, тем более очевидным является улучшение производительности за счет параллельных операций;

Исходные структуры данных делятся на следующие 3 группы:

  • Хорошая производительность: ArrayList, array или IntStream.range (данные поддерживают случайное чтение и могут быть легко разделены произвольно)
  • Общая производительность: HashSet, TreeSet (данные сложно разложить честно, большинство из них тоже возможно)
  • Низкая производительность: LinkedList (необходимо пройти по связанному списку, сложно разложить пополам), Stream.iterate и BufferedReader.lines (неизвестная длина, сложно разложить)

Примечание. Следующие части взяты из: За кулисами Streams, кстати, спасибо автору Брайану Гетцу, написание слишком ясное.

NQ-модель

Последними двумя факторами, которые следует учитывать, чтобы определить, обеспечит ли параллелизм ускорение, являются объем доступных данных и объем вычислений, выполняемых для каждого элемента данных.

В нашем первоначальном описании параллельной декомпозиции мы приняли концепцию разделения источника до тех пор, пока сегмент не станет достаточно маленьким, чтобы последовательный подход к решению задачи на этом сегменте был более эффективным. Размер сегмента должен зависеть от решаемой задачи и от того, сколько работы выполняет каждый элемент. Например, вычисление длины строки требует гораздо меньше усилий, чем вычисление хэша SHA-1 строки. Чем больше работы выполняется для каждого элемента, тем ниже порог «достаточно большой, чтобы воспользоваться преимуществами параллелизма». Точно так же, чем больше у вас данных, тем больше сегментов вы можете разделить, не нарушая порог «слишком маленький».

Простая, но полезная модель параллельной производительности — это модель NQ, где N — количество элементов данных, а Q — объем работы, выполняемой для каждого элемента. Чем больше произведение N*Q, тем больше вероятность, что вы получите параллельное ускорение. Для задач с очень малым Q, таких как суммирование чисел, обычно может потребоваться N > 10 000 для ускорения; по мере увеличения Q размер данных, необходимых для получения ускорения, уменьшается.

Многие препятствия для распараллеливания (такие как разделение затрат, объединение затрат или чувствительность к порядку) могут быть смягчены операциями с более высокой добротностью. Хотя результаты разделения функции LinkedList могут быть плохими, все же возможно получить параллельное ускорение, если у вас достаточно большое значение Q.

порядок встречи

Порядок встречи относится к тому, является ли порядок, в котором элементы распределяются источником, критическим для вычислений. Некоторые источники (например, наборы и карты на основе хэшей) не имеют значимого порядка встреч. Флаг потока ORDERED указывает, имеет ли поток осмысленный порядок встреч. Разделитель коллекции JDK устанавливает этот флаг в соответствии со спецификацией коллекции; некоторые промежуточные операции могут вводить ORDERED (sorted()) или очищать его (unordered()).

Если поток не соответствует порядку, большинство операций потока должны подчиняться этому порядку. При последовательном выполнении порядок встреч «автоматически сохраняется», потому что элементы обрабатываются естественным образом в том порядке, в котором они встречаются. Даже при параллельном выполнении многие операции (промежуточные операции без сохранения состояния и некоторые завершающие операции (например, reduce()))), соблюдая порядок встречи, не влекут за собой никаких реальных затрат. Но для других операций (промежуточных операций с сохранением состояния, операций завершения, семантика которых связана с порядком встречи, таких как findFirst() или forEachOrdered()), ответственность за соблюдение порядка встречи при параллельном выполнении может быть значительной. Если поток имеет определенный порядок встреч, который не имеет смысла для результата, то последовательное выполнение конвейеров, содержащих операции, чувствительные к порядку, можно ускорить, удалив флаг ORDERED с помощью операции unordered().

В качестве примера операции, чувствительной к порядку встреч, рассмотрим limit(), которая усекает поток до заданного размера. Реализовать limit() в последовательном выполнении просто: ведите счетчик количества просмотренных элементов, после этого отбрасывайте любые элементы. Но при параллельном выполнении реализовать limit() намного сложнее, нужно сохранить первые N элементов. Это требование значительно ограничивает возможности использования параллелизма: если ввод разделен на части, вы не будете знать, будет ли результат части включен в окончательный результат, пока не будут завершены все части, предшествующие этой части. В результате реализация обычно ошибочно выбирает не использовать все доступные ядра или кэшировать весь экспериментальный результат, пока не будет достигнута целевая длина.

Если поток не соответствует порядку, операция limit() может свободно выбирать любые N элементов, что делает выполнение намного более эффективным. Элементы могут быть отправлены вниз по течению, как только они станут известны, без какой-либо буферизации, и единственная координация, которую необходимо выполнить между потоками, — это отправить сигнал, чтобы убедиться, что длина целевого потока не превышена.

Другим менее распространенным примером последовательных затрат является сортировка. Операция sorted() обеспечивает стабильное упорядочение (те же элементы появляются в выходных данных в том же порядке, в котором они были введены во входные данные), если порядок встреч имеет смысл, тогда как для неупорядоченных потоков стабильность (со стоимостью) не требуется. Аналогичная ситуация и у Different(): если поток имеет порядок встреч, то для нескольких одинаковых входных элементов, Different() должен генерировать первый из них, а для неупорядоченных потоков он может генерировать любой элемент — опять же Гораздо более эффективная параллельная реализация может быть получен.

Аналогичная ситуация возникает при агрегировании с помощью collect() . Если операция collect(groupingBy()) выполняется для неупорядоченного потока, элементы, соответствующие любым ключам, должны быть предоставлены нижестоящему сборщику в том порядке, в котором они появляются во входных данных. Этот порядок обычно не имеет смысла для приложений, и никакой порядок не имеет смысла. В этих случаях может быть лучше выбрать параллельный сборщик (такой как groupingByConcurrent()), который игнорирует порядок встречи и позволяет всем потокам собираться непосредственно в общую параллельную структуру данных (такую ​​как ConcurrentHashMap), вместо того, чтобы каждый поток собирал в свою собственную промежуточную карту, а затем объединяет промежуточную карту (что может быть дорого).

Когда использовать параллельные потоки

Говорите так много, используйте заметки о параллельных потоках. parallelStream требует особого внимания, он не решает вопрос производительности под одну гребенку, напротив, при неправильном использовании может серьезно повлиять на производительность. Об этом вопросе я расскажу отдельно в другой статье.

References