Разговор о реактивном программировании Spring Reactor

задняя часть Spring Flux функциональное программирование

предисловие

иметь дело сВ среде с высокой степенью параллелизмасерверного программирования Microsoft предлагает реализациюАсинхронное программированиеСхема -Reactive Programming,Китайское имяреактивное программирование. Впоследствии этому примеру быстро последовали и другие технологии, такие какES6пройти черезPromiseБыл введен аналогичный способ асинхронного программирования.JavaОбщество тоже не отстаетNetflixиTypeSafeКомпания предоставляетRxJavaиAkka Streamтехнология, пустьJavaПлатформы также имеют фреймворки, которые позволяют использовать реактивное программирование.

текст

функциональное программирование

Функциональное программирование — это способ программирования, который рассматривает вычисления компьютера как вычисление функции. Наиболее важными основами языков функционального программирования являютсялямбда-исчисление (lambda calculus), а функция λ-исчисления может принимать функцию в видеВходные параметры)ивывод (возвращаемое значение).lambdaПары выражений уже знакомы большинству программистов.jdk8а такжеes6импортируютсяlambda.

Особенности функционального программирования

  • ленивое вычисление
  • Функции - это "граждане первого сорта"
  • Используйте только выражения, а не операторы
  • никаких побочных эффектов

реактивное программирование

реактивное программирование(reactive programming)основан напоток данных (data stream)иизменить перевод (propagation of change)издекларативный (declarative)парадигма программирования.

Особенности реактивного программирования

1. Событийный

вуправляемый событиямиприложений взаимодействие между компонентами осуществляется через слабую связьрежиссер (production)ипотребитель (consumption)быть реализованным. Эти событияасинхронныйинеблокирующийотправлено и получено.

управляемый событиямисистема опирается натолкающий режимвместорежим вытягиванияилиголосование,СейчасрежиссерОн отправляет данные, когда есть сообщениепотребитель, не расходуя ресурсы: пустьпотребительнепрерывноголосованиеилиожидание данных.

2. Ответ в режиме реального времени

После того, как программа начнет выполнение, она должнабыстрыйвернуться в хранилищеконтекст результата, передать конкретную реализациюфоновая нить. После завершения отложенной обработки асинхронноистинное возвращаемое значениеупаковано здеськонтекствместоблокироватьвыполнение программы. Реагирование в режиме реального времени черезасинхронныйПрограммирование, например: после инициации звонков аналогично быстрому возвратуjava8серединаCompletableFutureобъект.

3. Механизм эластичности

управляемый событиямислабо связанныйПредоставляет компоненты, которые могут быть обнаружены при сбоеполностью изолированконтекстуальный сценарий, какинкапсуляция сообщения, отправляемые нижестоящим компонентам. могут быть запрограммированы в конкретныхпроверить на ошибки, такие как: получена ли она, является ли полученная команда исполняемой и т. д., и решить, как реагировать.

Введение в реактор

ReactorрамкаPivotalна основеReactive Programmingидея реализована. это соответствуетReactive StreamsСпецификация (Reactive StreamsОтNetflix,TypeSafe,Pivotalи другие компании) технологии. его имя имеетреакторсмысл, отражающий мощныйпредставление.

1. Reactive Programming

Reactive Programming, Китайское имяреактивное программирование.Reactive Programmingэтонеблокирующий,поток данных, управляемый событиямипрограмма развития, использующаяфункциональное программированиеКонцепция работы с потоком данных, данные в одной части системы будут автоматически обновлять другие части после изменения данных, а стоимость чрезвычайно низкая.

Впервые он был предложен Microsoft и внедрен в платформу .NET, а ES6 также представила аналогичные технологии. На платформе Java одним из первых пользователей технологии реактивного программирования является платформа RxJava с открытым исходным кодом от Netflix. Hystrix разработан на основе RxJava.

Реактивное программирование на самом деле не таинственно, благодаря знакомомушаблон итератораПо сравнению вы можете понять его основную идею:

событие Iterable (pull) Observable (push)
получить данные T next() onNext(T)
найти исключение throws Exception onError(Exception)
обработка завершена hasNext() onCompleted()

в таблице вышеObservableЭтот столбец представляетреактивное программированиеизAPIспособ использования. это на самом делеШаблон наблюдателярасширение .

еслишаблон итераторарассматривается какрежим вытягивания,ТотШаблон наблюдателяявляетсятолкающий режим.

  1. Подписан (Publisher)Активно отправлять данные вподписчик (Subscriber),курокonNext()метод. Два других метода запускаются при исключении и завершении.

  2. Подписан (Publisher)Если возникает исключение, инициироватьподписчик (Subscriber)изonError()метод для перехвата исключений.

  3. Подписан (Publisher)Срабатывает один раз при каждом нажатииonNext()метод. Когда все толчки завершены без исключения,onCompleted()метод будетв концеЗапустить один раз.

еслиPublisherПубликация новостей слишком быстро, болееSubscriberскорость обработки, что делать? ЭтоBackpressureпроисхождение.Reactive ProgrammingФреймворк должен обеспечиватьМеханизм противодавления, так чтоSubscriberв состоянии контролироватьИспользование сообщенийскорость.

2. Reactive Streams

существуетJavaна платформе,Netflix(развитыйRxJava),TypeSafe(развитыйScala,Akka),Pivatol(развитыйSpring,Reactor) вместе разрабатывают термин, называемыйReactive StreamsПроект (спецификация), используемый для разработки спецификаций и интерфейсов, связанных с реактивным программированием.

Reactive StreamsОн состоит из следующих компонентов:

  • диктор: опубликовать элемент для подписчиков
  • подписчик: потребительский элемент
  • подписка: В издателе при создании подписки она будет доступна подписчику.
  • процессор: Обработка данных между издателями и подписчиками.

Его основными интерфейсами являются эти три:

  • Publisher
  • Subscriber
  • Subcription

в,Subcriberкоторый включает в себя приведенную выше таблицуonNext,onError,onCompletedэти три метода. заReactive Streams, просто нужно понять его идеи, в том числе основные идеи иBackpressureПросто дождитесь мысли.

3. Основные модули Reactor

ReactorФреймворк в основном состоит из двух основных модулей:

  • reactor-core
  • reactor-ipc

Первый в основном отвечает заReactive ProgrammingСвязанныйосновной APIреализации, последний отвечает заВысокопроизводительная сетевая связьреализация в настоящее время основана наNettyосуществленный.

4. Классы активной зоны реактора

существуетReactorСуществует не так много часто используемых классов, в основном следующие два:

  • Mono

MonoДостигнутоorg.reactivestreams.Publisherинтерфейс, представляющий0прибыть1элементдиктор.

  • Flux

Fluxтакже достигнутоorg.reactivestreams.Publisherинтерфейс, представляющий0прибытьNИздатель элемента.

  • Scheduler

Представляет планировщик, который управляет реактивными потоками за кулисами, обычно реализуемый различными пулами потоков.

5. WebFlux

Spring 5введено на основеNettyвместоServletвысокой производительностиWebРамка -Spring WebFlux, но не используется так же, как традиционныйServletизSpring MVCКакая большая разница.

WebFluxсерединаMVCПример интерфейса:

@RequestMapping("/webflux")
@RestController
public class WebFluxTestController {
    @GetMapping("/mono")
    public Mono<Foobar> foobar() {
        return Mono.just(new Foobar());
    }
}

Самое большое изменение заключается в том, что возвращаемое значение изменяется сFoobarПредставленный объект становитсяMono<Foobar>илиFlux<Foobar>.

6. Реактивные потоки, Reactor и WebFlux

описано вышереактивное программированиенекоторые концепции. Возможно, читатели увидят здесь небольшой беспорядок, разберутся в отношениях между тремя:

  1. Reactive Streamsэто реактивное программированиестандартныйиСпецификация;
  2. Reactorоснован наReactive Streamsнаборсреда реактивного программирования;
  3. WebFluxотReactorна основе реализацииWebполесреда реактивного программирования.

На самом деле, бизнес-разработчики при написании реактивного кода обычно касаются толькоPublisherЭтот интерфейс соответствуетReactorявляетсяMonoиFlux.

заSubscriberиSubcriptionЭти два интерфейса,ReactorЕсть и соответствующие реализации. все этоSpring WebFluxиSpring Data Reactiveиспользуется такой каркас. еслиНе разрабатывать промежуточное ПО, разработчики не будут подвергаться воздействию.

Начало работы с Reactor

СледующийReactorсерединаMonoиFluxИспользование основных методов в этих двух классах.

в видеJava 8представилStreamТакой же,ReactorСпособ использования в основном разделен на три шага:

  • Создание начальной стадии
  • Промежуточный этап обработки
  • конечное потребление

Просто создание и потребление можно было бы сделать с помощью чего-то вродеSpring 5Это делается фреймворком (например,WebFluxсерединаWebClientперечислитьHTTPинтерфейс, возвращаемое значениеMono). Но нам все еще нужно базовое понимание того, как развиваются эти этапы.

1. Создайте Mono и Flux (начальный этап)

использоватьReactorНачало программирования должно заключаться в созданииMonoилиFlux. Иногда нам не нужно создавать свои собственные, а реализовать напримерWebFluxсерединаWebClientилиSpring Data Reactiveполучить одинMonoилиFlux.

  • Используйте WebFlux WebClient для вызова интерфейса HTTP
WebClient webClient = WebClient.create("http://localhost:8080");
public Mono<User> findById(Long userId) {
    return webClient
            .get()
            .uri("/users/" + userId)
            .accept(MediaType.APPLICATION_JSON)
            .exchange()
            .flatMap(cr -> cr.bodyToMono(User.class));
}
  • Запросить пользователя с помощью ReactiveMongoRepository
public interface UserRepository extends ReactiveMongoRepository<User, Long> {
    Mono<User> findByUsername(String username);
}

Но иногда нам также необходимо активно создаватьMonoилиFlux.

распространенный способ создания

Mono<String> helloWorld = Mono.just("Hello World");
Flux<String> fewWords = Flux.just("Hello", "World");
Flux<String> manyWords = Flux.fromIterable(words);

Когда используется этот метод создания? Обычно используется после серииТип без ввода-выводаПосле операции получается объект. Далее мы будем использовать этот объект на основеReactorпровестивысокая производительностьизIOПри работе ранее полученный объект может быть преобразован таким образом вMonoилиFlux.

Как создается искусство

Вышеупомянутое черезСинхронный вызовРезультат создается изMonoилиFlux, но иногда необходимо начать сНет ReactiveизАсинхронный вызовРезультат созданMonoилиFlux.

если этоасинхронный методвернутьCompletableFuture, что может быть основано на этомCompletableFutureСоздаватьMono:

Mono.fromFuture(completableFuture);

если этоАсинхронный вызовне вернетсяCompletableFuture, есть свояметод обратного вызова, как создатьMonoШерстяная ткань? можно использоватьstatic <T> Mono<T> create(Consumer<MonoSink<T>> callback)метод:

Mono.create(sink -> {
    ListenableFuture<ResponseEntity<String>> entity = asyncRestTemplate.getForEntity(url, String.class);
    entity.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
        @Override
        public void onSuccess(ResponseEntity<String> result) {
            sink.success(result.getBody());
        }

        @Override
        public void onFailure(Throwable ex) {
            sink.error(ex);
        }
    });
});

в настоящее время используетWebFluxпосле,AsyncRestTemplateЭто устарело, это просто для демонстрации.

2. Работа с Mono и Flux (промежуточный этап)

промежуточный этапMonoиFluxметоды в основномfilter,map,flatMap,then,zip,reduceЖдать. Как пользоваться этими методами иStreamМетод аналогичен.

Вот несколькоReactorЧтобы разработать практические задачи проекта, помогите понять сценарии использования этих методов:

Вопрос 1: Когда использовать карту, flatMap и затем

В этом параграфе будут задействованы следующие классы и методы:

  • метод:Mono.map()
  • метод:Mono.flatMap()
  • метод:Mono.then()
  • своего рода:Function

существуетMonoиFlux средний срок, есть три несколько похожих метода:map(),flatMap()иthen(). Эти три метода используются очень часто.

  • традиционное императивное программирование
Object result1 = doStep1(params);
Object result2 = doStep2(result1);
Object result3 = doStep3(result2);
  • Соответствующее реактивное программирование
Mono.just(params)
    .flatMap(v -> doStep1(v))
    .flatMap(v -> doStep2(v))
    .flatMap(v -> doStep3(v));

Это видно из сравнения двух приведенных выше кодов.flatMap()Роль метода в этом,map()иthen()Методы работают аналогично. Но в чем разница между этими методами? Давайте сначала посмотрим на сигнатуры этих трех методов (начиная сMonoпример):

  • flatMap(Function<? super T, ? extends Mono<? extends R>> transformer)
  • map(Function<? super T, ? extends R> mapper)
  • then(Mono other)
then()

then()Кажется, это означает следующий шаг, но это означает только следующий шаг в последовательности выполнения и не означает, что следующий шаг зависит от предыдущего шага.then()Параметр метода - это простоMono, результат выполнения предыдущего шага не может быть принят. иflatMap()иmap()ПараметрыFunction, входной параметр — результат выполнения предыдущего шага.

плоская карта () и карта ()

flatMap()иmap()Разница в том, чтоflatMap()входные параметры вFunctionТребование возвращаемого значенияMonoобъект иmapВходFunctionПросто попросите вернутьнормальный объект. Часто необходимо звонить в бизнес-процессыWebClientилиReactiveXxxRepositoryметоды ввозвращаемое значениеобаMono(илиFlux). Итак, чтобы объединить эти вызовы в одно целоецепной вызов, вы должны использоватьflatMap(), вместоmap().

Вопрос 2: Как реализовать параллельное выполнение

В этом параграфе будут задействованы следующие классы и методы:

  • метод:Mono.zip()
  • своего рода:Tuple2
  • своего рода:BiFunction

параллельное выполнениеявляется общим требованием.Reactive ProgrammingХотяАсинхронное программированиепуть, ноасинхронныйэто не значит, чтоПараллелизм и параллелизмиз.

существуеттрадиционное императивное программированиесередина,параллельное выполнениечерезПул потоковдобавлятьFutureспособ реализован.

Future<Result1> result1Future = threadPoolExecutor.submit(() -> doStep1(params));
Future<Result2> result2Future = threadPoolExecutor.submit(() -> doStep2(params));
// Retrive result
Result1 result1 = result1Future.get();
Result2 result2 = result2Future.get();
// Do merge;
return mergeResult;

Хотя приведенный выше код реализуетАсинхронный вызов,ноFuture.get()путьблокироватьиз. в настоящее время используетReactorразработан спараллелизмсцена казниреактивный код, вышеуказанный метод не может быть использован.

следует использоватьMonoиFluxсерединаzip()способMonoНапример, код выглядит следующим образом:

Mono<CustomType1> item1Mono = ...;
Mono<CustomType2> item2Mono = ...;
Mono.zip(items -> {
    CustomType1 item1 = CustomType1.class.cast(items[0]);
    CustomType2 item2 = CustomType2.class.cast(items[1]);
    // Do merge
    return mergeResult;
}, item1Mono, item2Mono);

В приведенном выше коде он производитitem1Monoиitem2MonoПроцесспараллельноиз. Например, вызовHTTPинтерфейс при выполнениизапрос к базе данныхработать. Это ускорит выполнение программы.

Но есть проблема с приведенным выше кодом, т.е.zip()надо сделать методпринуждение. в то время как актерский составнебезопасныйиз. к счастьюzip()метод существуетмножественные перегрузкиформа. Помимо самой основной формы, существует множествобезопасность типаформа:

static <T1, T2> Mono<Tuple2<T1, T2>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2);
static <T1, T2, O> Mono<O> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, BiFunction<? super T1, ? super T2, ? extends O> combinator); 
static <T1, T2, T3> Mono<Tuple3<T1, T2, T3>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3);

не более чем7Операция слияния каждого элемента имеетбезопасность типаизzip()метод является необязательным. Возьмите комбинацию двух элементов в качестве примера, чтобы представить метод использования:

Mono.zip(item1Mono, item2Mono).map(tuple -> {
    CustomType1 item1 = tuple.getT1();
    CustomType2 item2 = tuple.getT2();
    // Do merge
    return mergeResult;
});

В приведенном выше кодеmap()Параметр методаTuple2, представляющийдвоичный массив, и соответственноTuple3,Tuple4Ждать.

Для одновременного выполнения двух элементов также можно передатьzip(Mono<? extends T1> p1, Mono<? extends T2> p2, BiFunction<? super T1, ? super T2, ? extends O> combinator)метод напрямую объединяет результаты. метод заключается в том, чтобы пройтиBiFunctionвыполнитьАлгоритм слияния.

Проблема 3: Агрегация после цикла сбора

В этом параграфе будут задействованы следующие классы и методы:

  • метод:Flux.fromIterable()
  • метод:Flux.reduce()
  • своего рода:BiFunction

Другой немного более сложный сценарий для объекта, тип которого является коллекцией (List,Set), а затем обработать исходный объект. использоватьшаблон итератораКод легко написать:

List<SubData> subDataList = data.getSubDataList();
for (SubData item : subDataList) {
    // Do something on data and item
}
// Do something on data

когда мы используемReactiveКогда код стиля реализует описанную выше логику, все не так просто. будет использоваться здесьFluxизreduce()метод.reduce()Сигнатура метода следующая:

  • <A> Mono<A> reduce(A initial, BiFunction<A, ? super T, A> accumulator);

Как можно видеть,reduce()Функция метода заключается в преобразованииFlux полимеризацияв одинMono.

  • первый параметр: возвращаемое значениеMonoсредний элементПервоначальный значение.

  • второй параметр: ЯвляетсяBiFunction, реализоватьАгрегатная операциялогика. Для общих параметров<A, ? super T, A>середина:

    • ПервыйA: означает каждый разАгрегатная операцияПослетип результата, действует какBiFunction.apply()методпервая запись;
    • секунда? super T: указывает тип каждого элемента в коллекции, который действует какBiFunction.apply()методвторая запись;
    • ТретийA: представляет агрегатную операциюрезультат, действует какBiFunction.apply()методвозвращаемое значение.

Давайте рассмотрим пример:

Data initData = ...;
List<SubData> list = ...;
Flux.fromIterable(list)
    .reduce(initData, (data, itemInList) -> {
        // Do something on data and itemInList
        return data;
    });

В приведенном выше примере кодаinitDataиdataтого же типа. После выполнения приведенного выше кодаreduce()метод вернетMono<Data>.

3. Потребляйте Mono и Flux (конечная стадия)

прямое потреблениеMonoилиFluxСпособ - позвонитьsubscribe()метод. если вWebFluxРазработка в интерфейсе, возврат напрямуюMonoили Флюс.WebFluxФреймворк завершит финалResponseВыход работает.

резюме

В этой статье представлены некоторые концепции реактивного программирования иSpring ReactorОсновное использование фреймворка также знакомит с тем, как использоватьReactorРешите несколько более сложных задач.ReactorсуществуетSpring 5Применений много, и я поделюсь с вами позже.Spring ReactorБлог боевой серии.


Добро пожаловать в технический публичный аккаунт: Zero One Technology Stack

零壹技术栈

Эта учетная запись будет продолжать делиться сухими товарами серверных технологий, включая основы виртуальных машин, многопоточное программирование, высокопроизводительные фреймворки, асинхронное ПО, промежуточное ПО для кэширования и обмена сообщениями, распределенные и микросервисы, материалы для обучения архитектуре и расширенные учебные материалы и статьи.