предисловие
иметь дело сВ среде с высокой степенью параллелизмасерверного программирования Microsoft предлагает реализациюАсинхронное программированиеСхема -Reactive Programming
,Китайское имяреактивное программирование. Впоследствии этому примеру быстро последовали и другие технологии, такие какES6
пройти черезPromise
Был введен аналогичный способ асинхронного программирования.Java
Общество тоже не отстаетNetflix
иTypeSafe
Компания предоставляетRxJava
иAkka Stream
технология, пустьJava
Платформы также имеют фреймворки, которые позволяют использовать реактивное программирование.
текст
функциональное программирование
Функциональное программирование — это способ программирования, который рассматривает вычисления компьютера как вычисление функции. Наиболее важными основами языков функционального программирования являютсялямбда-исчисление (lambda calculus)
, а функция λ-исчисления может принимать функцию в видеВходные параметры)ивывод (возвращаемое значение).lambda
Пары выражений уже знакомы большинству программистов.jdk8
а такжеes6
импортируютсяlambda
.
Особенности функционального программирования
- ленивое вычисление
- Функции - это "граждане первого сорта"
- Используйте только выражения, а не операторы
- никаких побочных эффектов
реактивное программирование
реактивное программирование(reactive programming)
основан напоток данных (data stream)
иизменить перевод (propagation of change)
издекларативный (declarative)
парадигма программирования.
Особенности реактивного программирования
1. Событийный
вуправляемый событиямиприложений взаимодействие между компонентами осуществляется через слабую связьрежиссер (production)
ипотребитель (consumption)
быть реализованным. Эти событияасинхронныйинеблокирующийотправлено и получено.
управляемый событиямисистема опирается натолкающий режимвместорежим вытягиванияилиголосование,СейчасрежиссерОн отправляет данные, когда есть сообщениепотребитель, не расходуя ресурсы: пустьпотребительнепрерывноголосованиеилиожидание данных.
2. Ответ в режиме реального времени
После того, как программа начнет выполнение, она должнабыстрыйвернуться в хранилищеконтекст результата, передать конкретную реализациюфоновая нить. После завершения отложенной обработки асинхронноистинное возвращаемое значениеупаковано здеськонтекствместоблокироватьвыполнение программы. Реагирование в режиме реального времени черезасинхронныйПрограммирование, например: после инициации звонков аналогично быстрому возвратуjava8
серединаCompletableFuture
объект.
3. Механизм эластичности
управляемый событиямислабо связанныйПредоставляет компоненты, которые могут быть обнаружены при сбоеполностью изолированконтекстуальный сценарий, какинкапсуляция сообщения, отправляемые нижестоящим компонентам. могут быть запрограммированы в конкретныхпроверить на ошибки, такие как: получена ли она, является ли полученная команда исполняемой и т. д., и решить, как реагировать.
Введение в реактор
Reactor
рамкаPivotal
на основеReactive Programming
идея реализована. это соответствуетReactive Streams
Спецификация (Reactive Streams
ОтNetflix
,TypeSafe
,Pivotal
и другие компании) технологии. его имя имеетреакторсмысл, отражающий мощныйпредставление.
1. Reactive Programming
Reactive Programming
, Китайское имяреактивное программирование.Reactive Programming
этонеблокирующий,поток данных, управляемый событиямипрограмма развития, использующаяфункциональное программированиеКонцепция работы с потоком данных, данные в одной части системы будут автоматически обновлять другие части после изменения данных, а стоимость чрезвычайно низкая.
Впервые он был предложен Microsoft и внедрен в платформу .NET, а ES6 также представила аналогичные технологии. На платформе Java одним из первых пользователей технологии реактивного программирования является платформа RxJava с открытым исходным кодом от Netflix. Hystrix разработан на основе RxJava.
Реактивное программирование на самом деле не таинственно, благодаря знакомомушаблон итератораПо сравнению вы можете понять его основную идею:
событие | Iterable (pull) | Observable (push) |
---|---|---|
получить данные | T next() | onNext(T) |
найти исключение | throws Exception | onError(Exception) |
обработка завершена | hasNext() | onCompleted() |
в таблице вышеObservable
Этот столбец представляетреактивное программированиеизAPI
способ использования. это на самом делеШаблон наблюдателярасширение .
еслишаблон итераторарассматривается какрежим вытягивания,ТотШаблон наблюдателяявляетсятолкающий режим.
-
Подписан
(Publisher)
Активно отправлять данные вподписчик(Subscriber)
,курокonNext()
метод. Два других метода запускаются при исключении и завершении. -
Подписан
(Publisher)
Если возникает исключение, инициироватьподписчик(Subscriber)
изonError()
метод для перехвата исключений. -
Подписан
(Publisher)
Срабатывает один раз при каждом нажатииonNext()
метод. Когда все толчки завершены без исключения,onCompleted()
метод будетв концеЗапустить один раз.
еслиPublisher
Публикация новостей слишком быстро, болееSubscriber
скорость обработки, что делать? ЭтоBackpressure
происхождение.Reactive Programming
Фреймворк должен обеспечиватьМеханизм противодавления, так чтоSubscriber
в состоянии контролироватьИспользование сообщенийскорость.
2. Reactive Streams
существуетJava
на платформе,Netflix
(развитыйRxJava
),TypeSafe
(развитыйScala
,Akka
),Pivatol
(развитыйSpring
,Reactor
) вместе разрабатывают термин, называемыйReactive Streams
Проект (спецификация), используемый для разработки спецификаций и интерфейсов, связанных с реактивным программированием.
Reactive Streams
Он состоит из следующих компонентов:
- диктор: опубликовать элемент для подписчиков
- подписчик: потребительский элемент
- подписка: В издателе при создании подписки она будет доступна подписчику.
- процессор: Обработка данных между издателями и подписчиками.
Его основными интерфейсами являются эти три:
- Publisher
- Subscriber
- Subcription
в,Subcriber
который включает в себя приведенную выше таблицуonNext
,onError
,onCompleted
эти три метода. заReactive Streams
, просто нужно понять его идеи, в том числе основные идеи иBackpressure
Просто дождитесь мысли.
3. Основные модули Reactor
Reactor
Фреймворк в основном состоит из двух основных модулей:
- reactor-core
- reactor-ipc
Первый в основном отвечает заReactive Programming
Связанныйосновной API
реализации, последний отвечает заВысокопроизводительная сетевая связьреализация в настоящее время основана наNetty
осуществленный.
4. Классы активной зоны реактора
существуетReactor
Существует не так много часто используемых классов, в основном следующие два:
- Mono
Mono
Достигнутоorg.reactivestreams.Publisher
интерфейс, представляющий0
прибыть1
элементдиктор.
- Flux
Flux
также достигнутоorg.reactivestreams.Publisher
интерфейс, представляющий0
прибытьN
Издатель элемента.
- Scheduler
Представляет планировщик, который управляет реактивными потоками за кулисами, обычно реализуемый различными пулами потоков.
5. WebFlux
Spring 5
введено на основеNetty
вместоServlet
высокой производительностиWeb
Рамка -Spring WebFlux
, но не используется так же, как традиционныйServlet
изSpring MVC
Какая большая разница.
WebFlux
серединаMVC
Пример интерфейса:
@RequestMapping("/webflux")
@RestController
public class WebFluxTestController {
@GetMapping("/mono")
public Mono<Foobar> foobar() {
return Mono.just(new Foobar());
}
}
Самое большое изменение заключается в том, что возвращаемое значение изменяется сFoobar
Представленный объект становитсяMono<Foobar>
илиFlux<Foobar>
.
6. Реактивные потоки, Reactor и WebFlux
описано вышереактивное программированиенекоторые концепции. Возможно, читатели увидят здесь небольшой беспорядок, разберутся в отношениях между тремя:
-
Reactive Streams
это реактивное программированиестандартныйиСпецификация; -
Reactor
основан наReactive Streams
наборсреда реактивного программирования; -
WebFlux
отReactor
на основе реализацииWeb
полесреда реактивного программирования.
На самом деле, бизнес-разработчики при написании реактивного кода обычно касаются толькоPublisher
Этот интерфейс соответствуетReactor
являетсяMono
иFlux
.
заSubscriber
иSubcription
Эти два интерфейса,Reactor
Есть и соответствующие реализации. все этоSpring WebFlux
иSpring Data Reactive
используется такой каркас. еслиНе разрабатывать промежуточное ПО, разработчики не будут подвергаться воздействию.
Начало работы с Reactor
СледующийReactor
серединаMono
иFlux
Использование основных методов в этих двух классах.
в видеJava 8
представилStream
Такой же,Reactor
Способ использования в основном разделен на три шага:
- Создание начальной стадии
- Промежуточный этап обработки
- конечное потребление
Просто создание и потребление можно было бы сделать с помощью чего-то вродеSpring 5
Это делается фреймворком (например,WebFlux
серединаWebClient
перечислитьHTTP
интерфейс, возвращаемое значениеMono
). Но нам все еще нужно базовое понимание того, как развиваются эти этапы.
1. Создайте Mono и Flux (начальный этап)
использоватьReactor
Начало программирования должно заключаться в созданииMono
илиFlux
. Иногда нам не нужно создавать свои собственные, а реализовать напримерWebFlux
серединаWebClient
илиSpring Data Reactive
получить одинMono
илиFlux
.
- Используйте WebFlux WebClient для вызова интерфейса HTTP
WebClient webClient = WebClient.create("http://localhost:8080");
public Mono<User> findById(Long userId) {
return webClient
.get()
.uri("/users/" + userId)
.accept(MediaType.APPLICATION_JSON)
.exchange()
.flatMap(cr -> cr.bodyToMono(User.class));
}
- Запросить пользователя с помощью ReactiveMongoRepository
public interface UserRepository extends ReactiveMongoRepository<User, Long> {
Mono<User> findByUsername(String username);
}
Но иногда нам также необходимо активно создаватьMono
илиFlux
.
распространенный способ создания
Mono<String> helloWorld = Mono.just("Hello World");
Flux<String> fewWords = Flux.just("Hello", "World");
Flux<String> manyWords = Flux.fromIterable(words);
Когда используется этот метод создания? Обычно используется после серииТип без ввода-выводаПосле операции получается объект. Далее мы будем использовать этот объект на основеReactor
провестивысокая производительностьизIO
При работе ранее полученный объект может быть преобразован таким образом вMono
илиFlux
.
Как создается искусство
Вышеупомянутое черезСинхронный вызовРезультат создается изMono
илиFlux
, но иногда необходимо начать сНет Reactive
изАсинхронный вызовРезультат созданMono
илиFlux
.
если этоасинхронный методвернутьCompletableFuture
, что может быть основано на этомCompletableFuture
СоздаватьMono
:
Mono.fromFuture(completableFuture);
если этоАсинхронный вызовне вернетсяCompletableFuture
, есть свояметод обратного вызова, как создатьMono
Шерстяная ткань? можно использоватьstatic <T> Mono<T> create(Consumer<MonoSink<T>> callback)
метод:
Mono.create(sink -> {
ListenableFuture<ResponseEntity<String>> entity = asyncRestTemplate.getForEntity(url, String.class);
entity.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
@Override
public void onSuccess(ResponseEntity<String> result) {
sink.success(result.getBody());
}
@Override
public void onFailure(Throwable ex) {
sink.error(ex);
}
});
});
в настоящее время используетWebFlux
после,AsyncRestTemplate
Это устарело, это просто для демонстрации.
2. Работа с Mono и Flux (промежуточный этап)
промежуточный этапMono
иFlux
методы в основномfilter
,map
,flatMap
,then
,zip
,reduce
Ждать. Как пользоваться этими методами иStream
Метод аналогичен.
Вот несколькоReactor
Чтобы разработать практические задачи проекта, помогите понять сценарии использования этих методов:
Вопрос 1: Когда использовать карту, flatMap и затем
В этом параграфе будут задействованы следующие классы и методы:
-
метод:
Mono.map()
-
метод:
Mono.flatMap()
-
метод:
Mono.then()
-
своего рода:
Function
существуетMono
иFlux
средний срок, есть три несколько похожих метода:map()
,flatMap()
иthen()
. Эти три метода используются очень часто.
- традиционное императивное программирование
Object result1 = doStep1(params);
Object result2 = doStep2(result1);
Object result3 = doStep3(result2);
- Соответствующее реактивное программирование
Mono.just(params)
.flatMap(v -> doStep1(v))
.flatMap(v -> doStep2(v))
.flatMap(v -> doStep3(v));
Это видно из сравнения двух приведенных выше кодов.flatMap()
Роль метода в этом,map()
иthen()
Методы работают аналогично. Но в чем разница между этими методами? Давайте сначала посмотрим на сигнатуры этих трех методов (начиная сMono
пример):
- flatMap(Function<? super T, ? extends Mono<? extends R>> transformer)
- map(Function<? super T, ? extends R> mapper)
- then(Mono other)
then()
then()
Кажется, это означает следующий шаг, но это означает только следующий шаг в последовательности выполнения и не означает, что следующий шаг зависит от предыдущего шага.then()
Параметр метода - это простоMono
, результат выполнения предыдущего шага не может быть принят. иflatMap()
иmap()
ПараметрыFunction
, входной параметр — результат выполнения предыдущего шага.
плоская карта () и карта ()
flatMap()
иmap()
Разница в том, чтоflatMap()
входные параметры вFunction
Требование возвращаемого значенияMono
объект иmap
ВходFunction
Просто попросите вернутьнормальный объект. Часто необходимо звонить в бизнес-процессыWebClient
илиReactiveXxxRepository
методы ввозвращаемое значениеобаMono
(илиFlux
). Итак, чтобы объединить эти вызовы в одно целоецепной вызов, вы должны использоватьflatMap()
, вместоmap()
.
Вопрос 2: Как реализовать параллельное выполнение
В этом параграфе будут задействованы следующие классы и методы:
-
метод:
Mono.zip()
-
своего рода:
Tuple2
-
своего рода:
BiFunction
параллельное выполнениеявляется общим требованием.Reactive Programming
ХотяАсинхронное программированиепуть, ноасинхронныйэто не значит, чтоПараллелизм и параллелизмиз.
существуеттрадиционное императивное программированиесередина,параллельное выполнениечерезПул потоковдобавлятьFuture
способ реализован.
Future<Result1> result1Future = threadPoolExecutor.submit(() -> doStep1(params));
Future<Result2> result2Future = threadPoolExecutor.submit(() -> doStep2(params));
// Retrive result
Result1 result1 = result1Future.get();
Result2 result2 = result2Future.get();
// Do merge;
return mergeResult;
Хотя приведенный выше код реализуетАсинхронный вызов,ноFuture.get()
путьблокироватьиз. в настоящее время используетReactor
разработан спараллелизмсцена казниреактивный код, вышеуказанный метод не может быть использован.
следует использоватьMono
иFlux
серединаzip()
способMono
Например, код выглядит следующим образом:
Mono<CustomType1> item1Mono = ...;
Mono<CustomType2> item2Mono = ...;
Mono.zip(items -> {
CustomType1 item1 = CustomType1.class.cast(items[0]);
CustomType2 item2 = CustomType2.class.cast(items[1]);
// Do merge
return mergeResult;
}, item1Mono, item2Mono);
В приведенном выше коде он производитitem1Mono
иitem2Mono
Процесспараллельноиз. Например, вызовHTTP
интерфейс при выполнениизапрос к базе данныхработать. Это ускорит выполнение программы.
Но есть проблема с приведенным выше кодом, т.е.zip()
надо сделать методпринуждение. в то время как актерский составнебезопасныйиз. к счастьюzip()
метод существуетмножественные перегрузкиформа. Помимо самой основной формы, существует множествобезопасность типаформа:
static <T1, T2> Mono<Tuple2<T1, T2>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2);
static <T1, T2, O> Mono<O> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, BiFunction<? super T1, ? super T2, ? extends O> combinator);
static <T1, T2, T3> Mono<Tuple3<T1, T2, T3>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3);
не более чем7
Операция слияния каждого элемента имеетбезопасность типаизzip()
метод является необязательным. Возьмите комбинацию двух элементов в качестве примера, чтобы представить метод использования:
Mono.zip(item1Mono, item2Mono).map(tuple -> {
CustomType1 item1 = tuple.getT1();
CustomType2 item2 = tuple.getT2();
// Do merge
return mergeResult;
});
В приведенном выше кодеmap()
Параметр методаTuple2
, представляющийдвоичный массив, и соответственноTuple3
,Tuple4
Ждать.
Для одновременного выполнения двух элементов также можно передатьzip(Mono<? extends T1> p1, Mono<? extends T2> p2, BiFunction<? super T1, ? super T2, ? extends O> combinator)
метод напрямую объединяет результаты. метод заключается в том, чтобы пройтиBiFunction
выполнитьАлгоритм слияния.
Проблема 3: Агрегация после цикла сбора
В этом параграфе будут задействованы следующие классы и методы:
-
метод:
Flux.fromIterable()
-
метод:
Flux.reduce()
-
своего рода:
BiFunction
Другой немного более сложный сценарий для объекта, тип которого является коллекцией (List
,Set
), а затем обработать исходный объект. использоватьшаблон итератораКод легко написать:
List<SubData> subDataList = data.getSubDataList();
for (SubData item : subDataList) {
// Do something on data and item
}
// Do something on data
когда мы используемReactive
Когда код стиля реализует описанную выше логику, все не так просто. будет использоваться здесьFlux
изreduce()
метод.reduce()
Сигнатура метода следующая:
<A> Mono<A> reduce(A initial, BiFunction<A, ? super T, A> accumulator);
Как можно видеть,reduce()
Функция метода заключается в преобразованииFlux
полимеризацияв одинMono
.
-
первый параметр: возвращаемое значение
Mono
средний элементПервоначальный значение. -
второй параметр: Является
BiFunction
, реализоватьАгрегатная операциялогика. Для общих параметров<A, ? super T, A>
середина:- Первый
A
: означает каждый разАгрегатная операцияПослетип результата, действует какBiFunction.apply()
методпервая запись; - секунда
? super T
: указывает тип каждого элемента в коллекции, который действует какBiFunction.apply()
методвторая запись; - Третий
A
: представляет агрегатную операциюрезультат, действует какBiFunction.apply()
методвозвращаемое значение.
- Первый
Давайте рассмотрим пример:
Data initData = ...;
List<SubData> list = ...;
Flux.fromIterable(list)
.reduce(initData, (data, itemInList) -> {
// Do something on data and itemInList
return data;
});
В приведенном выше примере кодаinitData
иdata
того же типа. После выполнения приведенного выше кодаreduce()
метод вернетMono<Data>
.
3. Потребляйте Mono и Flux (конечная стадия)
прямое потреблениеMono
илиFlux
Способ - позвонитьsubscribe()
метод. если вWebFlux
Разработка в интерфейсе, возврат напрямуюMono
или Флюс.WebFlux
Фреймворк завершит финалResponse
Выход работает.
резюме
В этой статье представлены некоторые концепции реактивного программирования иSpring Reactor
Основное использование фреймворка также знакомит с тем, как использоватьReactor
Решите несколько более сложных задач.Reactor
существуетSpring 5
Применений много, и я поделюсь с вами позже.Spring Reactor
Блог боевой серии.
Добро пожаловать в технический публичный аккаунт: Zero One Technology Stack
Эта учетная запись будет продолжать делиться сухими товарами серверных технологий, включая основы виртуальных машин, многопоточное программирование, высокопроизводительные фреймворки, асинхронное ПО, промежуточное ПО для кэширования и обмена сообщениями, распределенные и микросервисы, материалы для обучения архитектуре и расширенные учебные материалы и статьи.