суть
- Reactor — это среда реактивной потоковой передачи, работающая поверх Java8, которая предоставляет набор API-интерфейсов в реактивном стиле.
- За исключением различий в отдельных API, его принцип очень похож на Rxjava
- Это реактивная среда четвертого поколения, поддерживающая слияние операций, аналогичная RxJava 2.
- Модель реактивного программирования Spring 5 в основном опирается на Reactor.
Обзор RxJava
Reactor
Это реактивная среда четвертого поколения, чем-то похожая на RxJava 2. Проект Reactor был запущен Pivotal и основан на спецификации Reactive Streaming, Java8 и глоссарии ReactiveX. Его дизайн является результатом совместных усилий Reactor 2 (последний основной выпуск) и основных участников RxJava.
В предыдущих статьях этой же серииРазрешение экземпляра RxJavaиТест RxJavaВ этом руководстве мы рассмотрели основы реактивного программирования: концепцию потока данных, класс Observable и его различные операции, а также создание статических и динамических объектов Observable с помощью фабричных методов.
Observable является источником событий, Observer предоставляет набор простых интерфейсов и использует события Observable, подписавшись на источник событий. Observable уведомляет Observer о поступлении события через onNext, за которым может следовать onError или onComplete, чтобы указать конец события.
RxJava предоставляет TestSubscriber для тестирования Observables, TestSubscriber — это специальный Observer, который можно использовать для подтверждения событий потока.
В этой статье мы будемReactor
иRxJava
Сравните их, включая их сходства и различия.
Типы реакторов
Реактор имеет два типа,Flux
иMono
.
- FluxПодобно Observable RaxJava, он может запускать ноль или более событий и завершать обработку или вызывать ошибки в зависимости от реальной ситуации.
- MonoЗапускает не более одного события, аналогично Single и Maybe в RxJava, поэтому Mono можно использовать для уведомления о завершении асинхронной задачи.
Из-за этих двух типов простых различий мы можем легко отличить тип ответа API: по возвращаемому типу мы можем узнать методзапустить и забытьилизапросить и ждать(Mono) или обработка потока (Flux), содержащего несколько элементов данных.
Flux
иMono
Некоторые операции используют эту функцию для преобразования между двумя типами. Например, вызов метода single() для Flux вернет Mono, а использование метода concatWith() для объединения двух Mono вместе приведет к Flux. Точно так же некоторые операции не имеют смысла в Mono (например, take(n) дает n>1), в то время как некоторые операции имеют смысл только в Mono (например, or(otherMono)).
Reactor
Один из принципов дизайнаЧтобы API оставался компактным, а разделение этих двух реактивных типов — компромисс между выразительностью и простотой использования API.
Использование реактивной потоковой передачи, построенной на Rx
в видеRxJava 实例解析
Как упоминалось здесь, с точки зрения концепции дизайна,RxJava
несколько похожеJava 8 Streams API
. иReactor
немного похожеRxJava
, но это отнюдь не просто совпадение. Этот дизайн должен предоставить собственный API реактивной потоковой передачи в стиле Rx для сложной асинхронной логики. Таким образом, Reactor основан на Reactive Streams и максимально близок к RxJava с точки зрения API.
Использование библиотек реактивных классов и реактивных потоков
Reactive Streams (далее RS) — это спецификация, обеспечивающая стандарт для асинхронной обработки потоков на основе неблокирующего противодавления. Это набор спецификаций, содержащий инструментарий TCK и четыре простых интерфейса (издатель, подписчик, подписка и процессор), которые будут интегрированы в Java 9.
RS в первую очередь занимается реактивным противодавлением (подробнее об этом позже) и взаимодействием между несколькими источниками реактивных событий. Он не предоставляет никаких методов действий, он заботится только о жизненном цикле потока.
Наиболее важным моментом, который отличает Reactor от других фреймворков, является RS. И Flux, и Mono являются реализациями RS для Publisher, и обе они имеют отзывчивое обратное давление.
существуетRxJava 1
Только несколько операций поддерживают обратное давление, Observable RxJava 1 не реализует ни один из типов в RS, но имеет несколько адаптеров типа RS. Возможно, RxJava 1 на самом деле предшествовала спецификации RS, и RxJava 1 выступала в качестве функционального работника во время разработки спецификации RS.
Таким образом, когда вы используете эти адаптеры Publisher, они не предоставляют вам никаких операций. Чтобы иметь возможность выполнять некоторые полезные операции, вам может понадобиться использовать Observable обратно, и на этот раз вам понадобится другой адаптер. Этот визуальный беспорядок может испортить удобочитаемость кода, особенно в такой среде, как Spring 5, которая становится еще более загроможденной, если вся структура построена поверх такого издателя.
Спецификация RS не поддерживает нулевые значения, поэтому помните об этом при переходе с RxJava 1 на Reactor или RxJava 2. Это еще больше беспокоит, если вы используете null для особых целей в своем коде.
RxJava 2
пришел после спецификации RS, поэтому он реализует Publisher непосредственно в типе Flowable. Однако, в дополнение к типу RS, RxJava 2 также сохраняет тип RxJava 1.наследиетипов (Observable, Completable и Single) и вводит некоторые другие необязательные типы — Maybe. Эти типы предлагают разную семантику, но не реализуют интерфейс RS, что является их недостатком. В отличие от RxJava 1, Observable RxJava 2 не поддерживает протокол обратного давления RxJava 2 (эта функция есть только у Flowable). Причина такого дизайна заключается в том, чтобы предоставить набор богатых и плавных API для некоторых сценариев, таких как события, генерируемые пользовательским интерфейсом, в таких сценариях обратное давление не требуется, и его невозможно использовать. Completable, Single и Maybe не нуждаются в поддержке обратного давления, но они также предоставляют богатый набор API и ничего не делают, пока на них не подписаны.
В реактивном мире Reactor становится компактнее, и его типы Mono и Flux реализуют Publisher, и оба поддерживают обратное давление. Хотя использование Mono в качестве издателя связано с некоторыми дополнительными накладными расходами, другие преимущества Mono компенсируют его недостатки. В следующих разделах мы увидим, что означает обратное давление для Mono.
По сравнению с RxJava API похож, но не совпадает.
Глоссарий операций для ReactiveX и RxJava иногда очень сложен для понимания, а некоторые операции имеют запутанные названия по историческим причинам. Reactor старается сделать API как можно более компактным и пытается выбрать лучшее имя при именовании API, но в целом эти два API по-прежнему выглядят очень похожими. В последней итерации RxJava 2 RxJava 2 заимствует некоторую терминологию у Reactor, что свидетельствует о возможном более тесном сотрудничестве между двумя проектами. Некоторые операции и концепции всегда сначала появляются в одном из проектов, затем заимствуются друг у друга и в конечном итоге пронизывают оба проекта одновременно.
Например, во Flux также есть обычный метод just factory (хотя вариантов всего два: с одним аргументом или с аргументом переменной длины). Однако существует множество вариантов метода from, наиболее заметным из которых является fromIterable. Конечно, Flux также включает в себя эти обычные операции: map, merge, concat, flatMap, take и т. д.
Reactor изменил сбивающую с толку операцию mb в RxJava на более уместно выглядящую firstEmitting. Кроме того, toList был переименован в collectList для согласованности API. На самом деле, все операции, начинающиеся с collect, будут агрегировать значения в коллекцию определенного типа, но создавать только Mono для каждой коллекции. Хотя все операции, начинающиеся с to, зарезервированы для преобразования типов, преобразованный тип можно использовать в нереактивном программировании, таком как toFuture().
Причина, по которой Reactor может быть настолько скудным с точки зрения инициализации класса и использования ресурсов, заключается в его функции слияния: Reactor может объединять несколько последовательных операций (например, двойной вызов concatWith) в одну операцию, так что внутренний класс этой операции может быть инициализирован. только один раз (т.е. макрослияние). Эта функция включает в себя оптимизацию на основе источника данных, которая компенсирует некоторые дополнительные накладные расходы, связанные с реализацией Publisher в Mono. Он также может совместно использовать ресурсы (так называемое микрослияние) между несколькими связанными операциями, такими как внутренние очереди. Эти функции делают Reactor бескомпромиссной реактивной средой четвертого поколения, но это выходит за рамки этой статьи.
Давайте рассмотрим несколько операций Reactor.
Некоторые рабочие примеры
(Этот раздел содержит некоторые фрагменты кода, мы рекомендуем вам запустить их и подробно изучить Reactor. Поэтому вам нужно открыть IDE и создать тестовый проект, чтобы добавить Reactor к зависимостям.)
Для Maven в pom.xml можно добавить следующие зависимости:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.0.3.RELEASE</version>
</dependency>
Для Gradle используйте Reactor в качестве зависимости, например:
dependencies {
compile "io.projectreactor:reactor-core:3.0.3.RELEASE"
}
Давайте перепишем примеры из предыдущих статей этой же серии!
Создание Observable чем-то похоже на RxJava, в Reactor его можно создать с помощью фабричных методов just(T...) и fromIterator(Iterable). Метод just запускает список целиком, тогда как метод fromIterable запускает каждый элемент в списке по отдельности:
public class ReactorSnippets {
private static List<String> words = Arrays.asList(
"the",
"quick",
"brown",
"fox",
"jumped",
"over",
"the",
"lazy",
"dog"
);
@Test
public void simpleCreation() {
Flux<String> fewWords = Flux.just("Hello", "World");
Flux<String> manyWords = Flux.fromIterable(words);
fewWords.subscribe(System.out::println);
System.out.println();
manyWords.subscribe(System.out::println);
}
}
Как и в RxJava, приведенный выше код напечатает:
Hello
World
the
quick
brown
fox
jumped
over
the
lazy
dog
Чтобы напечатать каждую букву в предложении, нам также нужен метод flatMap (такой же, как в RxJava), но в Reactor мы используем fromArray вместо from. Затем мы отфильтруем повторяющиеся буквы с помощью отличительных и отсортируем их с помощью sort . Наконец, мы используем zipWith и диапазон для вывода порядка каждой буквы:
@Test
public void findingMissingLetter() {
Flux<String> manyLetters = Flux
.fromIterable(words)
.flatMap(word -> Flux.fromArray(word.split("")))
.distinct()
.sort()
.zipWith(Flux.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d. %s", count, string));
manyLetters.subscribe(System.out::println);
}
Мы легко можем видеть, что s опущено:
1. a
2. b
...
18. r
19. t
20. u
...
25. z
Мы можем исправить это, исправив массив слов, но мы также можем использовать concat/concatWith и Mono, чтобы вручную добавить «s» к букве Flux:
@Test
public void restoringMissingLetter() {
Mono<String> missing = Mono.just("s");
Flux<String> allLetters = Flux
.fromIterable(words)
.flatMap(word -> Flux.fromArray(word.split("")))
.concatWith(missing)
.distinct()
.sort()
.zipWith(Flux.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d. %s", count, string));
allLetters.subscribe(System.out::println);
}
Таким образом, после дедупликации и сортировки добавляется недостающая буква s:
1. a
2. b
...
18. r
19. s
20. t
...
26. z
В предыдущей статье упоминалось сходство между Rx и Streams API, но на самом деле Reactor будет просто отправлять события данных, как это делает Java Steams, когда данные готовы (см. ниже о обратном давлении). Просто подписка на источник события в основном потоке не может выполнять более сложные асинхронные операции, главным образом потому, что после завершения подписки управление немедленно вернется в основной поток, и вся программа завершится. Например:
@Test
public void shortCircuit() {
Flux<String> helloPauseWorld =
Mono.just("Hello")
.concatWith(Mono.just("world")
.delaySubscriptionMillis(500));
helloPauseWorld.subscribe(System.out::println);
}
Этот модульный тест распечатаетHello, но не могу распечататьworld, потому что программа преждевременно завершает работу. При выполнении простых тестов вы обычно попадаете в ловушку, если просто пишете такой простой основной класс. В качестве исправления вы можете создать объект CountDownLatch и вызвать метод countDown в подписчике (включая onError и onComplete). Но это делает его менее отзывчивым, не так ли? (Что, если вы забыли вызвать метод countDown и произошла ошибка?)
Второй способ решить эту проблему — перейти в неотзывчивый режим с некоторыми действиями. toItetable и toStream генерируют блокирующие экземпляры. Мы используем toStream в нашем примере:
@Test
public void blocks() {
Flux<String> helloPauseWorld =
Mono.just("Hello")
.concatWith(Mono.just("world")
.delaySubscriptionMillis(500));
helloPauseWorld.toStream()
.forEach(System.out::println);
}
Как и следовало ожидать, после распечаткиHelloПосле этого идет короткая пауза, затем распечатывается "мир" и он выходит. Ранее мы также упоминали, что операция amb в RxJava была переименована в firstEmitting в Reactor (как следует из ее названия: выберите первый Flux для запуска). В приведенном ниже примере мы создадим Mono с задержкой 450 мс и Flux, запускающий события с интервалом в 400 мс. При их объединении с помощью firstEmitting() берется последний Flux, потому что первое значение Flux предшествует значению Mono:
@Test
public void firstEmitting() {
Mono<String> a = Mono.just("oops I'm late")
.delaySubscriptionMillis(450);
Flux<String> b = Flux.just("let's get", "the party", "started")
.delayMillis(400);
Flux.firstEmitting(a, b)
.toIterable()
.forEach(System.out::println);
}
Этот модульный тест печатает все части предложения с интервалом в 400 мс между ними.
В этот момент вы можете подумать, а что, если бы я написал тесты, которые использовали интервалы 4000 мс вместо 400 мс? Вы не хотите ждать 4 секунды в модульном тесте! Позже мы увидим, что Reactor предоставляет некоторые инструменты тестирования, которые могут очень хорошо решить эту проблему.
Теперь, когда мы сравнили некоторые распространенные операции Reactor на примере, давайте вернемся назад и посмотрим на другие аспекты платформы, которые отличаются.
На основе Java 8
Reactor выбрал Java 8 в качестве основы для работы, а не любую предыдущую версию, что снова совпадает с его целью по упрощению API: RxJava выбрала Java 6, и без пакета java.util.function в Java 6 RxJava не может воспользоваться преимуществами классы Functino и Consumer в этом пакете, поэтому он должен создать множество классов, таких как Func1, Func2, Action0, Action1. RxJava 2 отображает эти классы в java.util.function аналогично Reactor 2, поскольку он также должен поддерживать Java 7.
Reactor API также использует некоторые новые типы, представленные в Java 8. Поскольку большинство операций, основанных на времени, связаны с периодами времени (например, тайм-ауты, интервалы, задержки и т. д.), класс Duration в Java 8 используется напрямую.
Java 8 Stream API и CompletableFuture можно легко конвертировать в Flux/Mono и обратно. Итак, в общем, должны ли мы конвертировать Stream в Flux? неуверенный. Хотя накладные расходы на инкапсуляцию операций ввода-вывода и операций с памятью с помощью Flux или Mono незначительны, Stream сам по себе не вызывает больших задержек, поэтому напрямую использовать Stream API не проблема. В приведенном выше случае в RxJava 2 вам нужно использовать Observable, потому что Observable не поддерживает обратное давление, поэтому, как только вы подпишетесь на него, он станет источником push-уведомлений. Reactor основан на Java 8, поэтому в большинстве случаев достаточно Stream API. Обратите внимание, что хотя фабричные шаблоны Flux и Mono также поддерживают простые типы, их основная цель — включение объектов в потоки более высокого уровня. В общем, при применении реактивных шаблонов к существующему коду вы не хотите преобразовывать такие методы, как "long getCount()" в "Mono getCount()".
Об обратном давлении
Противодавление является одной из основных (если вообще существует) проблем спецификации RS и Reactor. Принцип обратного давления заключается в том, что в сценарии нажима производитель производит быстрее, чем потребитель потребляет, и потребитель сигнализирует производителю: «Эй, помедленнее, я не могу с этим справиться». скорость, с которой генерируются данные, вместо того, чтобы отбрасывать данные или продолжать генерировать данные с риском каскадных ошибок.
Вы можете задаться вопросом, зачем противодавление необходимо и в Mono: какой потребитель будет подавлен одним событием триггера? Ответ: «Такого потребителя быть не должно». Однако между тем, как работают Mono и CompletableFuture, все же есть одно ключевое различие. Последнее только подталкивает: если вы держите ссылку на Future, то уже выполняется асинхронная задача. С другой стороны, Flux или Mono с обратным давлением инициируют отложенную итерацию pull-push:
- Задержка связана с тем, что ничего не происходит, пока не будет вызван метод subscribe().
- Вытягивание связано с тем, что при подписке и выполнении запроса подписчик сигнализирует восходящему потоку о готовности получить следующий блок данных.
- Затем производитель отправляет данные потребителю, что находится в рамках запроса потребителя.
Для Mono метод subscribe() похож на кнопку, нажатие на которую означает, что я готов получать данные. У Flux есть аналогичная кнопка, но это метод request(n), который представляет собой обобщенное использование subscribe().
Моно как издатель, который часто представляет собой ресурсоемкую задачу (с точки зрения ввода-вывода, задержки и т. д.), является ключом к пониманию обратного давления: если вы не подписываетесь на него, вы ничего за него не платите. Поскольку Mono часто объединяется в реактивную цепочку вместе с Flux с обратным давлением, результаты из нескольких асинхронных источников данных потенциально могут быть объединены, и эта возможность запуска по запросу является ключом к предотвращению блокировки.
Мы можем использовать обратное давление, чтобы различать различные сценарии использования Mono.По сравнению с приведенными выше примерами, у Mono есть еще один распространенный сценарий использования: асинхронное агрегирование данных Flux в Mono. reduce и hasElement могут потреблять каждый элемент в Flux, а затем агрегировать данные в той или иной форме (соответственно, результат вызова функции reduce и логическое значение) и выставлять данные как Mono. В этом случае противодавите восходящему потоку с помощью Long.MAX_VALUE, и восходящий поток будет работать как полный толчок.
Еще одна интересная тема, связанная с обратным давлением, — это то, как оно ограничивает количество объектов потока, хранящихся в памяти. Как издатель, очень вероятно, что источник данных медленно генерирует данные, а запросы от подчиненных превышают доступные элементы данных. В этом случае весь поток естественным образом переходит в режим push, когда потребители уведомляются о поступлении новых данных. Когда наступают пики добычи или в случае увеличения производительности, весь поток возвращается в режим вытягивания. В приведенных выше двух случаях в памяти будет храниться не более N элементов данных (количество данных, запрашиваемых функцией request()).
Вы можете сделать более точный расчет использования памяти, объединив N элементов данных с памятью W, которую должен потреблять каждый элемент: таким образом вы можете рассчитать объем памяти, который будет потреблять не более W*N. Фактически, в большинстве случаев Reactor оптимизируется на основе N: создайте внутреннюю очередь по мере необходимости и примените стратегию предварительной выборки, которая каждый раз автоматически запрашивает 75% данных.
Операции Reactor иногда меняют сигналы противодавления в зависимости от семантики, которую они представляют, и ожиданий вызывающего. Например, для операции buffer(10): нисходящий поток запрашивает N элементов данных, и эта операция запросит 10N данных у восходящего потока, чтобы буфер мог быть заполнен и подписчикам было предоставлено достаточно данных. Это часто называют «активным противодавлением», и разработчики могут воспользоваться этой функцией, например, в сценариях микропакетов, явно указав Reactor, как переключаться с источника ввода на место назначения вывода.
Отношения с Весной
Reactor является основой всей экосистемы Spring, в частности Spring 5 (через Spring Web Reactive) и Spring Data "kay" (соответствует spring-data-commons 2.0).
Реактивные версии этих двух проектов очень полезны, поэтому мы можем разрабатывать полностью реактивные веб-приложения: асинхронно обрабатывать запрос на всем пути к базе данных и, наконец, асинхронно возвращать результат. Поэтому приложения Spring могут использовать ресурсы более эффективно, избегая выделения отдельного потока для каждого запроса и ожидания блокировки ввода-вывода.
Reactor будет использоваться для внутренних компонентов реактивного ядра будущих приложений Spring и API-интерфейсов, предоставляемых этими компонентами Spring. В целом, они могут работать с RS Publisher, но большую часть времени им приходится иметь дело с Flux/Mono и использовать богатые возможности Reactor. Конечно, вы также можете выбрать другие реактивные фреймворки самостоятельно.Reactor предоставляет интерфейсы-ловушки, которые можно использовать для адаптации к другим типам Reactor, типам RxJava и даже простым типам RS.
В настоящее время вы можете испытать Spring Web Reactive с Spring Boot 2.0.0.BUILD-SNAPSHOT и зависимостями spring-boot-starter-web-reactive (вы можете создать такой проект на start.spring.io):
<dependency>
<groupId>org.springframework.boot.experimental</groupId>
<artifactId>spring-boot-starter-web-reactive</artifactId>
</dependency>
Вы можете написать свой @Controller как обычно, но просто сделайте Spring MVC низкоуровневым реактивным и замените большинство контрактов Spring MVC реактивными неблокирующими. Реактивный слой работает на Tomcat 8.5 по умолчанию, но вы также можете использовать Undertow или Netty.
{% asset_img 1.jpg %}
Кроме того, хотя Spring API основан на типах Reactor, в модуле Spring Web Reactive для запросов и ответов доступны различные реактивные типы:
- Mono: как и @RequestBody, сущность запроса T будет десериализована асинхронно, а последующая обработка может быть связана с Mono. В качестве возвращаемого типа каждый раз, когда Mono выдает значение, T асинхронно сериализуется и отправляется обратно клиенту. Вы можете взять запрос Mono в качестве параметра и вернуть параметризованный процесс ассоциации в качестве результата Mono.
- Flux: используется в сценариях потоковой передачи (в качестве входного потока, используемого @RequestBody и Server Sent Events с возвращаемым типом Flux).
- Single/Observable: соответствует Mono и Flux соответственно, но переключится обратно на RxJava.
- Mono как возвращаемый тип: обработка запроса завершается, когда заканчивается Mono.
- Нереактивные возвращаемые типы (void и T): на данный момент ваш метод @Controller является синхронным, но он должен быть неблокирующим (переходная обработка). Обработка запроса завершается, когда завершается выполнение метода, а возвращенный T асинхронно сериализуется и отправляется обратно клиенту.
Вот пример использования Spring Web Reactive:
@Controller
public class ExampleController {
private final MyReactiveLibrary reactiveLibrary;
public ExampleController(@Autowired MyReactiveLibrary reactiveLibrary) {
this.reactiveLibrary = reactiveLibrary;
}
@RequestMapping("hello/{who}")
@ResponseBody
public Mono<String> hello(@PathVariable String who) {
return Mono.just(who)
.map(w -> "Hello " + w + "!");
}
@RequestMapping(value = "heyMister", method = RequestMethod.POST)
@ResponseBody
public Flux<String> hey(@RequestBody Mono<Sir> body) {
return Mono.just("Hey mister ")
.concatWith(body
.flatMap(sir -> Flux.fromArray(sir.getLastName().split("")))
.map(String::toUpperCase)
.take(1)
).concatWith(Mono.just(". how are you?"));
}
}
Первая конечная точка содержит переменную пути, которая преобразуется в Mono и сопоставляется с приветствием, возвращаемым клиенту.
Запрос GET к /hello/SImon получит текстовый ответ «Привет, Саймон!».
Вторая конечная точка немного сложнее: она асинхронно получает сериализованный объект Sir (класс со свойствами firstName и lastName) и использует метод flatMap для сопоставления его с потоком писем, содержащим все буквы lastName. Затем он берет первую букву в потоке, преобразует ее в верхний регистр и объединяет с приветствием.
Итак, отправьте объект JSON в /heyMister
{
"firstName": "Paul",
"lastName": "tEsT"
}
вернет строку «Привет, мистер Т. Как дела?».
Reactive Spring Data в настоящее время находится в разработке как часть выпуска Kay с кодом в ветке spring-data-commons 2.0.x. Доступен промежуточный релиз:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-releasetrain</artifactId>
<version>Kay-M1</version>
<scope>import</scope>
<type>pom</type>
</dependency>
</dependencies>
</dependencyManagement>
Затем просто добавьте зависимость Spring Data Commons (она автоматически получит номер версии из приведенной выше спецификации):
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-commons</artifactId>
</dependency>
Поддержка ответов Spring Data в основном в новом интерфейсе ReactiveCrudRepository, который расширяет репозиторий. Этот интерфейс предоставляет метод crUD, используя ввод и возвращаемое значение типа Reactor. Существует также версия RXJAVA 1, которая называется RxJava1crudRepository. Для получения сущности через ID в CRUDRepository можно вызвать метод "T Findone(ID)", а "Mono Findone(ID)" и "Observable Findone(ID)" вызываются отдельно в ReactiveCrudRepository и RXJava1crudRepository. Есть и другие варианты, которые получают MONO/SINGLE в качестве параметра, предоставляют Key асинхронно и объединяют результат на этой основе.
Предполагая реактивное внутреннее хранилище (или имитацию bean-компонента ReactiveCrudRepository), следующие контроллеры будут реактивными спереди назад:
@Controller
public class DataExampleController {
private final ReactiveCrudRepository<Sir, String> reactiveRepository;
public DataExampleController(
@Autowired ReactiveCrudRepository<Sir, String> repo) {
this.reactiveRepository = repo;
}
@RequestMapping("data/{who}")
@ResponseBody
public Mono<ResponseEntity<Sir>> hello(@PathVariable String who) {
return reactiveRepository.findOne(who)
.map(ResponseEntity::ok)
.defaultIfEmpty(ResponseEntity.status(404)
.body(null));
}
}
Обратите внимание на весь процесс: мы асинхронно извлекаем объект и сопоставляем его с ResponseEntity, получая Mono, который можно сразу же вернуть. Если репозиторий Spring Data не может найти данные для этого ключа, он вернет пустой Mono. Мы используем defaultIfEmpty для явного возврата 404.
Испытательный реактор
Тест RxJavaВ этой статье упоминается, как тестировать Observables. Как мы видим, RxJava предоставляет TestScheduler, который мы можем использовать с операциями RxJava, принимающими параметр Scheduler, и TestScheduler запустит виртуальные часы для этих операций. RxJava также предоставляет класс TestSubscriber, который можно использовать для ожидания завершения выполнения Observable, а также для проверки каждого события (значение onNext и его номер, срабатывание onError и т. д.). В RxJava 2 TestSubscriber является подписчиком RS, вы можете использовать его для тестирования Reactor Flux и Mono!
В Reactor две вышеупомянутые широко используемые функции объединены в класс StepVerifier. StepVerifier можно получить в модуле реактора-теста репозитория реактора-аддонов. При создании экземпляра Publisher вызовите метод StepVerifier.create, чтобы инициализировать StepVerifier. Если вы хотите использовать виртуальные часы, вы можете вызвать метод StepVerifier.withVirtualTime, который принимает поставщика в качестве параметра. Он разработан таким образом, потому что он сначала гарантирует создание объекта VirtualTimeScheduler и передачу его в качестве планировщика по умолчанию для устаревших операций. StepVerifier настроит Flux/Mono, созданный в Supplier, для преобразования операций, основанных на времени, в «операции с виртуальным временем». Затем вы можете написать все варианты использования, которые вы ожидаете: каким должен быть следующий элемент, если произойдет ошибка, должен ли он двигаться вперед во времени и т. д. Вы можете выполнять более сложные взаимодействия с этими значениями (как если бы использовали структуру утверждений), используя другие методы, такие как сопоставление событий с предикатами или использование событий onNext. Любая ошибка AssertionError, выброшенная где-либо, будет отражена в конечном результате. Наконец, проверьте свой вариант использования, вызвав verify() , который подписывается на предопределенный источник событий с помощью методов StepVerifier.create или StepVerifier.withVirtualTime.
Давайте рассмотрим несколько простых примеров, чтобы проиллюстрировать, как работает StepVerifier. Сначала добавьте зависимость в POM:
<dependency>
<groupId>io.projectreactor.addons</groupId>
<artifactId>reactor-test</artifactId>
<version>3.0.3.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.5.2</version>
<scope>test</scope>
</dependency>
Предположим, у вас есть класс с именем MyReactiveLibrary, и вы хотите протестировать поток, сгенерированный этим классом:
@Component
public class MyReactiveLibrary {
public Flux<String> alphabet5(char from) {
return Flux.range((int) from, 5)
.map(i -> "" + (char) i.intValue());
}
public Mono<String> withDelay(String value, int delaySeconds) {
return Mono.just(value)
.delaySubscription(Duration.ofSeconds(delaySeconds));
}
}
Первый метод вернет 5 букв после данной буквы. Второй метод возвращает Flux, который запускает заданное значение с заданным интервалом, где интервал измеряется в секундах. Первый тест состоит в том, чтобы убедиться, что вывод вызова Alphabet5 с x ограничен x, y, z. Использование StepVerifier выглядит так:
@Test
public void testAlphabet5LimitsToZ() {
MyReactiveLibrary library = new MyReactiveLibrary();
StepVerifier.create(library.alphabet5('x'))
.expectNext("x", "y", "z")
.expectComplete()
.verify();
}
Второй тест гарантирует, что каждое значение, возвращаемое Alphabet5, является алфавитом. Здесь мы используем структуру утверждений AssertJ:
@Test
public void testAlphabet5LastItemIsAlphabeticalChar() {
MyReactiveLibrary library = new MyReactiveLibrary();
StepVerifier.create(library.alphabet5('x'))
.consumeNextWith(c -> assertThat(c)
.as("first is alphabetic").matches("[a-z]"))
.consumeNextWith(c -> assertThat(c)
.as("second is alphabetic").matches("[a-z]"))
.consumeNextWith(c -> assertThat(c)
.as("third is alphabetic").matches("[a-z]"))
.consumeNextWith(c -> assertThat(c)
.as("fourth is alphabetic").matches("[a-z]"))
.expectComplete()
.verify();
}
В результате эти испытания провалились. Давайте проверим вывод StepVirifier, чтобы увидеть, сможем ли мы найти ошибку:
java.lang.AssertionError: expected: onComplete(); actual: onNext({)
java.lang.AssertionError: [fourth is alphabetic]
Expecting:
"{"
to match pattern:
"[a-z]"
Похоже, наш метод не останавливается на z и продолжает выдавать символы ASCII. Мы можем исправить эту ошибку, добавив .take(Math.min(5,'z'-from+1)), или передать Math.min в качестве второго параметра диапазона.
Последний тест, который нам нужно сделать, — это использовать виртуальные часы: мы используем конструктор withVirtualTime для проверки задержки метода, фактически не ожидая заданного времени:
@Test
public void testWithDelay() {
MyReactiveLibrary library = new MyReactiveLibrary();
Duration testDuration =
StepVerifier.withVirtualTime(() -> library.withDelay("foo", 30))
.expectSubscription()
.thenAwait(Duration.ofSeconds(10))
.expectNoEvent(Duration.ofSeconds(10))
.thenAwait(Duration.ofSeconds(10))
.expectNext("foo")
.expectComplete()
.verify();
System.out.println(testDuration.toMillis() + "ms");
}
Этот тестовый пример проверяет FLUX, который будет задержан на 30 секунд: не произойдет в течение 30 секунд после подписки, а затем завершится событие OnNext ("foo").
System.out распечатает, сколько времени потребовалось для проверки, в недавнем тесте это заняло 8 мс.
Методы thenAwait и expectNoEvent по-прежнему можно использовать, если вызывается метод create конструктора, но они будут заблокированы на указанное время.
пользовательский динамический источник
существуетРазрешение экземпляра RxJavaДинамические и статические Observable, упомянутые в статье, также применимы к Reactor.
Если вы хотите создать собственный Flux, вам нужно использовать Reactor FluxSink. Этот класс позаботится обо всем асинхронном для вас, вам просто нужно сосредоточиться на запуске событий.
FluxSink, полученный в результате обратного вызова с помощью Flux.create, можно использовать для последующих событий запуска. Этот пользовательский Flux является статическим, чтобы сделать его динамическим, используйте методы publish() и connect(). Основываясь на примере из предыдущей статьи, мы можем почти дословно перевести его в версию Reactor:
SomeFeed<PriceTick> feed = new SomeFeed<>();
Flux<PriceTick> flux =
Flux.create(emitter ->
{
SomeListener listener = new SomeListener() {
@Override
public void priceTick(PriceTick event) {
emitter.next(event);
if (event.isLast()) {
emitter.complete();
}
}
@Override
public void error(Throwable e) {
emitter.error(e);
}};
feed.register(listener);
}, FluxSink.OverflowStrategy.BUFFER);
ConnectableFlux<PriceTick> hot = flux.publish();
Перед подключением к динамическому Flux можно сделать две подписки: одна подписка будет печатать детали каждого тика, а другая подписка будет распечатывать инструмент:
hot.subscribe(priceTick -> System.out.printf("%s %4s %6.2f%n", priceTick
.getDate(), priceTick.getInstrument(), priceTick.getPrice()));
hot.subscribe(priceTick -> System.out.println(priceTick.getInstrument()));
Затем мы подключаемся к динамическому Flux и позволяем ему работать в течение 5 секунд, прежде чем программа завершится:
hot.connect();
Thread.sleep(5000);
(Обратите внимание, что если изменится метод isLast() в PriceTick, сама лента также прекратится).
FluxSink использует isCancelled(), чтобы проверить, была ли отменена подписка нижестоящего уровня. Вы также можете получить количество запросов с помощью requiredFromDownstream() , что полезно при использовании стратегии обратного давления. Наконец, вы можете освободить все используемые ресурсы с помощью метода setCancellation.
Обратите внимание, что FluxSink использует противодавление, поэтому вы должны предоставить OverflowStrategy для явной обработки противодавления. Это эквивалентно использованию операций onBackpressureXXX (например, FluxSink.OverflowStrategy.BUFFER эквивалентно .onBackpressureBuffer() ), которые переопределяют сигналы противодавления от нисходящего потока.
в заключении
В этой статье мы узнали о Reactor, реактивной среде четвертого поколения, работающей на Java 8 и основанной на спецификации Rx и спецификации Reactive Streams. Мы покажем, как концепции дизайна RxJava можно применить к Reactor, хотя между ними есть некоторые различия в дизайне API. Мы также показали, как Reactor стал основой Spring 5, и предоставили некоторые ресурсы, связанные с тестированием Publisher, Flux и Mono.
Добро пожаловать в мой личный блог
Обратите внимание на общедоступный номер:ЯВА 9:30 класс, вот группа отличных программистов, присоединяйтесь к нам, чтобы обсудить технологии и вместе добиться прогресса! Ответьте на «Информация», чтобы получить самую свежую информацию об индустрии 2T!