Анализ принципа потока Java 8

Java

Говоря о Java 8, мы знаем, что одним из основных изменений в Java 8 является расширение функционального программирования, а Stream API является главным героем функционального программирования.Stream API — это стиль обработки потоковых данных, то есть обрабатываемые данные обрабатывается как поток., который передается в конвейере, и данные обрабатываются в каждом узле конвейера, например фильтрация, сортировка, преобразование и т. д.

Во-первых, давайте рассмотрим пример использования Stream API, конкретный код выглядит следующим образом:

code1 Stream example

Это очень простой пример использования Stream.После того, как мы отфильтровали пустые строки, мы преобразовали их в тип int и вычислили максимальное значение.Это включает в себя три операции: filter, mapToInt и sum. Я полагаю, что у большинства людей при первом использовании Stream API возникнет вопрос: как реализован Stream, выполняет ли он итерацию каждый раз при вызове функции? Ответ однозначно нет, потому что, если бы действительно выполнялась итерация на вызов функции, эффективность была бы неприемлемой, и Stream не был бы так популярен.

По сути, внутренний Stream реализуется через конвейер (Pipeline).Основная идея заключается в том, чтобы выполнять как можно больше операций по конвейеру во время итерации, чтобы избежать многократных итераций. Чтобы иметь более четкое представление о работе Stream, мы суммируем все операции Stream.

Как видно из приведенной выше таблицы, Stream делит все операции на две категории: промежуточные операции и операции завершения. Среди них промежуточные операции делятся на операции без сохранения состояния и операции с отслеживанием состояния, а операции завершения делятся на операции без короткого замыкания и операции короткого замыкания.Смысл этих операций следующий:

** 1. Промежуточная операция: ** Промежуточная операция — это своего рода метка, только конечная операция запускает фактический расчет.

  • нет статуса: означает, что на обработку элемента не влияет предыдущий элемент;
  • **С отслеживанием состояния:** промежуточные операции с отслеживанием состояния должны ждать, пока все элементы будут обработаны, прежде чем станет известен окончательный результат. Например, сортировка — это операция с отслеживанием состояния, и результат сортировки нельзя определить, пока не будут прочитаны все элементы.

** 2. Операция завершения: ** Как следует из названия, это операция для получения окончательного результата расчета.

  • **Операция короткого замыкания: ** означает, что результат может быть возвращен без обработки всех элементов;
  • **Операция без короткого замыкания:** означает, что все элементы должны быть обработаны для получения конечного результата.

Поточное конвейерное решение

Из приведенного выше введения мы узнали, что Stream записывает только при выполнении промежуточных операций.Когда пользователь вызывает операцию завершения, все записанные операции будут выполняться по конвейеру за одну итерацию. В рамках этого направления мысли необходимо решить несколько вопросов:

  1. Как записываются действия пользователя?
  2. Как складываются операции?
  3. Как проходит операция после наложения?

1. Как записать операцию

Изображение 1-1

Что касается записи операции, этап (операция) используется много раз в комментариях к исходному коду JDK для идентификации каждой операции пользователя, и обычно операция Stream требует функции обратного вызова, поэтому полная операция определяется данными. источник, операция, Представлена ​​тройка функций обратного вызова. В конкретной реализации экземпляр ReferencePipeline используется для представления экземпляров Head, StatelessOp и StatefulOp на рис. 1-1. Далее давайте взглянем на исходный код нескольких распространенных методов Stream.

code2 Collection.Stream()

code3 StreamSupport.stream()

code4 ReferencePipeline.map()

Как видно из приведенного выше исходного кода, когда мы вызываем метод stream(), мы в конечном итоге создаем экземпляр Head для представления заголовка операции потока.При вызове метода map() экземпляр промежуточной операции без сохранения состояния StatelessOp будет создан, а также будут вызываться другие операции.Соответствующий метод также сгенерирует экземпляр ReferencePipeline, который здесь не указан. После того, как пользователь вызовет ряд операций, в конечном итоге будет сформирован двусвязный список, как показано на следующем рисунке:

Рисунок 1-2

2. Как наложить операцию

Выше мы объяснили, что Stream записывает операции по этапу, но этап сохраняет только текущую операцию, он не знает, как управлять следующим этапом и какие операции требуются. Поэтому, если вы хотите его выполнить, вам нужен какой-то протокол для связывания каждого этапа. JDK реализован с использованием интерфейса Slink.Интерфейс Slink определяет четыре метода: begin(), end(), CancelRequested() и accept(), как показано в следующей таблице.

Оглядываясь назад на метод code3 ReferencePipeline.map(), мы обнаружим, что при создании экземпляра ReferencePipeline нам нужно переписать метод opWrapSink для создания соответствующего экземпляра Sink. И, прочитав исходный код, вы обнаружите, что обычные операции создают экземпляр ChainedReference. Мы можем посмотреть на реализацию исходного кода абстрактного класса code5 ChainedReference, потому что ChainedReference является лишь абстрактной реализацией и не несет в себе характеристик конкретных операций, поэтому может лучше отражать концепцию дизайна автора.

При просмотре исходного кода можно обнаружить, что ChainedReference будет удерживать Slink следующей операции и вызывать соответствующий метод Slink следующей операции при вызове методов begin, end, cancelRequested, чтобы достичь эффекта суперпозиции. .

code5 ChainedReference

3. Как выполнить операцию после наложения

Sink отлично инкапсулирует каждый шаг операции Stream и предоставляет режим [обработка->пересылка] для наложения операций. Эта серия передач была включена, и последний шаг — повернуть шестерни, чтобы начать выполнение. Что инициирует эту цепочку операций? Возможно, вы уже подумали, что первоначальная сила запуска — это терминальная операция.Как только терминальная операция будет вызвана, будет запущено выполнение всего конвейера.

Других операций после конечной операции быть не может, поэтому конечная операция не создаст новую стадию пайплайна (Stage) Интуитивно понятно, что связанный список пайплайна в дальнейшем не будет расширяться. Конечная операция создаст приемник, который обертывает свою собственную операцию, которая также является последним приемником в конвейере.Этот приемник должен только обрабатывать данные и не должен передавать результат в нижестоящий приемник (поскольку нисходящего потока нет). . Для модели [обработка->пересылка] приемника приемник, завершающий операцию, является выходом из цепочки вызовов.

Давайте рассмотрим, как восходящий приемник находит нисходящий приемник. Необязательным решением является установка поля Sink в PipelineHelper, поиск нижестоящей стадии в конвейере и доступ к полю Sink. Но разработчик библиотеки классов Stream этого не сделал, а задал метод Sink AbstractPipeline.opWrapSink(int flags, Sink downstream) для получения Sink, функция этого метода — вернуть новую операцию, содержащую текущую Stage и может передать результат нижестоящему объекту Sink. Зачем создавать новый объект, а не возвращать поле Sink? Это связано с тем, что использование opWrapSink() может объединить текущую операцию с нижестоящим приемником (параметр нисходящего потока выше) в новый приемник. Просто представьте, что пока вы начинаете с последнего этапа конвейера и продолжаете вызывать метод opWrapSink() предыдущего этапа до самого начала (исключая этап 0, поскольку этап 0 представляет собой источник данных и не содержит операций), вы может получить представление обо всех операциях на конвейере.Sink, выраженный в коде, выглядит так:

code6 AbstractPipeline.wrapSink

Теперь все операции от начала до конца пайплайна упакованы в сток.Выполнение этого стока эквивалентно выполнению всего пайплайна.Код выполнения стока такой:

code7 AbstractPipeline.copyInto

Вышеприведенный код сначала вызывает метод wrapSink.begin(), чтобы сообщить приемнику о поступлении данных, затем вызывает метод spliterator.forEachRemaining() для перебора данных и, наконец, вызывает методwrappedSink.end(), чтобы уведомить приемник. Сообщите, что обработка данных завершена. Логика такая ясная.

Автор: Хуан Жунпэн