Учебные заметки по реактивному программированию Reactor

Java

Начните с реактивного программирования

Реактивное программирование — это стиль асинхронного программирования, ориентированный на потоки данных и распространение изменений. Это означает, что он может выражать статические (например, массивы) или динамические (например, источники событий) потоки данных в существующих языках программирования.

В реактивном программировании Microsoft сделала первый шаг, создав библиотеку Reactive Extensions (Rx) в экосистеме .NET. Затем RxJava реализует реактивное программирование на JVM. Позже на платформе JVM появился набор стандартных спецификаций реактивного программирования, который определяет ряд стандартных интерфейсов и спецификаций взаимодействия. И интегрирован в Java 9 (класс Flow).

Реактивное программирование часто используется как расширение «шаблона проектирования Observer» в объектно-ориентированном программировании. Реактивные потоки также имеют что-то общее с «шаблоном проектирования Iterator», потому что существует аналог Iterable-Iterator. Основное отличие состоит в том, что итераторы основаны на подходе «вытягивания», тогда как реактивные потоки основаны на подходе «проталкивания».

  • Итератор — это «императивная» парадигма программирования, даже несмотря на то, что метод доступа к элементу является исключительной ответственностью Iterable. Дело в том, что разработчик сам решает, когда выполнять next() для получения элемента.
  • В реактивном потоке соответствующая роль — Издатель-Подписчик, но при поступлении нового значения издатель (Publisher) в свою очередь уведомляет подписчика (Subscriber).Этот режим «push» является реактивным.

Кроме того, операции с передаваемыми данными выражаются декларативно, а не императивно: разработчики определяют логику обработки потока данных, описывая «поток управления».

В дополнение к отправке данных хорошо определены обработка ошибок и сигналы завершения. Издатель может отправлять новые значения своему подписчику (вызывая метод onNext), а также отправлять сигналы ошибки (вызывая метод onError) и завершения (вызывая метод onComplete). Сигналы ошибки и завершения могут завершать реактивные потоки. Его можно описать следующим выражением:

onNext x 0..N [onError | onComplete]

Этот подход очень гибкий и может обрабатываться со значениями/без значений или с n значениями (включая потоки с бесконечными значениями, такие как непрерывный обратный отсчет часов).

Вышеприведенное исходит изпроект реактор.IO/docs/core/th…перевести

Reactive Streams

Reactive Streams — это набор стандартных спецификаций реактивного программирования, упомянутых выше. Он состоит из четырех основных понятий:

  • Издатель сообщения: существует только один интерфейс подписки, который вызывается подписчиком для подписки на сообщение издателя. Издатель отправляет сообщение подписчику после того, как подписчик вызывает запрос.
    public interface Publisher<T> {
        public void subscribe(Subscriber<? super T> s);
    }
    
  • Подписчик: Подписчик включает в себя четыре интерфейса, все из которых запускаются и вызываются издателем. onSubscribe сообщает подписчику, что подписка прошла успешно, и возвращает подписку; через подписку подписчик может указать издателю отправить указанное количество сообщений (запрос выполнен); onNext заключается в том, что когда у издателя есть сообщение, интерфейс подписчик вызывается для достижения цели публикации сообщения; onError уведомляет подписчика о том, что в издателе произошла ошибка; onComplete уведомляет подписчика о том, что сообщение было отправлено.
    public interface Subscriber<T> {
        public void onSubscribe(Subscription s);
        public void onNext(T t);
        public void onError(Throwable t);
        public void onComplete();
    }
    
  • Подписка: включает два интерфейса, запрашивающих n сообщений и отменяющих подписку.
    public interface Subscription {
        // request(n)用来发起请求数据,其中n表示请求数据的数量,它必须大于0,
        // 否则会抛出IllegalArgumentException,并触发onError,request的调用会
        // 累加,如果没有终止,最后会触发相应次数的onNext方法.
        public void request(long n);
        // cancel相当于取消订阅,调用之后,后续不会再收到订阅,onError 和 
        // onComplete也不会被触发
        public void cancel();
    }
    
  • Процессор: процессор наследует и подписчика, и издателя; он представляет собой этап обработки.
    public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }
    

С помощью вышеупомянутых четырех основных концепций и связанных с ними функций Reactive Streams создает рамочное соглашение для реактивных потоков, которое не имеет конкретной реализации. Проще говоря, он предоставляет только общее и подходящее решение, и каждый может реализовать его в соответствии с этой спецификацией.

Существует три основных библиотеки классов Java Reactive Programming, а именно Akka-Streams, RxJava и Project Reactor. Spring 5 начал поддерживать реактивное программирование, в котором внизу используется Project Reactor. Эта статья в основном предназначена для изучения и обобщения некоторых моментов в Project Reactor.

Project Reactor

Project Reactor — это реактивная библиотека на основе Java 8, реализующая спецификацию Reactive Streams.

Reactor представляет реактивные классы Flux и Mono, которые реализуют Publisher, а также широкие возможности манипуляций. Объект Flux представляет реактивную последовательность из 0..N элементов, а объект Mono представляет результат из нуля или одного (0..1) элемента.

Флюс и моно

Flux — это производитель, то есть издатель, о котором мы упоминали выше, который представляет собой асинхронную последовательность, содержащую элементы 0-N. Mono можно рассматривать как частный случай Flux, представляющий элементы 0-1. Если вам не нужно производить никаких Элементы, которым просто нужен сигнал для выполнения задачи, могут использовать Mono.

Flux — асинхронная последовательность 0-N элементов

Flux

Давайте сначала посмотрим на эту картинку, которая прямо вставлена ​​из официальной документации. Чтобы проиллюстрировать эту картину, давайте сначала сосредоточимся на нескольких моментах:

  • Ось временных рядов слева направо
  • 1-6 - это элементы Flux, включенные
  • Логотип вертикальной линии после успешного выполнения 6 вышеперечисленных действий
  • 1-3 ниже представляют результат преобразования
  • ❌ Указывает, что произошла ошибка, соответствующая выполнению onError
  • оператор : оператор, декларативный ассемблируемый реактивный метод, собранная цепочка называется «цепочкой операций»

В целом, Flux генерирует метаданные и получает результат преобразования через серию операторских операций.Обычный успех — onCompleted, а ошибка — onError. См. небольшой пример ниже:

Flux.just("glmapper","leishu").subscribe(new Subscriber<String>() {
    @Override
    public void onSubscribe(Subscription subscription) {
        // subscription 表示订阅关系
        System.out.println("onSubscribe,"+ subscription.getClass());
        // subscription 通过 request 来触发 onNext
        subscription.request(2);
    }
    @Override
    public void onNext(String s) {
        System.out.println("currrent value is = " + s);
    }
    @Override
    public void onError(Throwable throwable) {
        System.out.println("it's error.");
    }
    @Override
    public void onComplete() {
        System.out.println("it's completed.");
    }
});

Результаты:

onSubscribe,class reactor.core.publisher.StrictSubscriber
currrent value is = glmapper
currrent value is = leishu
it's completed.

Если мы не выполним запрос в методе onSubscribe, ничего не последует. См. ниже о запросе.

Flux — это стандартный издатель, способный выдавать от 0 до N элементов, который завершается сигналом «ошибка» или «завершение». Следовательно, результатом Flux может быть значение, завершение или ошибка. Как указано в спецификации Reactive Streaming, эти три типа сигналов транслируются как направленные в нисходящем направлении.onNext,onCompleteиonErrorметод.

Моно - асинхронный результат 0-1

Mono

Эта картинка тоже из официальной документации Отличие от Flux выше в том, что Mono может генерировать максимум один элемент.

Mono.just("glmapper").subscribe(System.out::println);

резюме

Из приведенных выше двух небольших фрагментов кода наиболее интуитивно понятно, что Flux эквивалентен List, а Mono эквивалентен Optional. На самом деле, все результаты в программировании могут быть представлены списком, но когда возвращается только один результат или не возвращается ни одного результата, может быть более точным использовать Необязательный.

Необязательные связанные понятия можно искать самостоятельно в jdk.

Кроме того, и Mono, и Flux предоставляют некоторые фабричные методы для создания связанных экземпляров, которые кратко перечислены здесь:

// 可以指定序列中包含的全部元素。创建出来的 Flux 
// 序列在发布这些元素之后会自动结束。
Flux.just("glmapper", "leishu");
// 从一个Iterable 对象中创建 Flux 对象,当然还可以是数组、Stream对象等
Flux.fromIterable(Arrays.asList("glmapper","leishu"));
// 创建一个只包含错误消息的序列。
Flux.error(new IllegalStateException());
// 创建一个包含了从 0 开始递增的 Long 对象的序列。其中包含的元素按照指定的间
// 隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间。
Flux.interval(Duration.ofMillis(100)).take(10);
// 创建一个不包含任何消息通知的序列。
Flux.never();
// 创建一个不包含任何元素,只发布结束消息的序列。
Flux.empty(); 
// 创建包含从 start 起始的 count 个数量的 Integer 对象的序列
Flux.range(int start, int count);
// Mono 同上
Mono.empty();
Mono.never();
Mono.just("glmapper");
Mono.error(new IllegalStateException());

Вышеупомянутые статические методы подходят для простой генерации последовательности.Когда генерация последовательности требует сложной логики, следует использовать метод generate() или create().

некоторые концепции

  • Оператор: оператор представляет собой серию удобных функциональных операций, которые можно вызывать в цепочке. Все вызовы функций в основном являются операторами Reactor, такими как just, map, flatMap, filter и т. д.
  • Процессор: как видно из определения интерфейса процессора, он является одновременно и подписчиком, и издателем; процессор зажат между первым издателем и последним подписчиком для обработки данных. Немного похоже на карту, фильтр и другие методы в потоке. В частности, в потоке данных обработчик подписывается на издателя в качестве подписчика для получения данных и принимает подписки других подписчиков в качестве издателя.После получения данных от издателя, на которого он подписан, он выполняет некоторую обработку, а затем пересылает его Подписчику, который подписывается на него.
  • противодавление: противодавление. Тем, кто разбирается в MQ, должно быть ясно.Отставание сообщений, как правило, находится на стороне потребителя, то есть сторона производителя отвечает только за производство и не заботится о потребительской способности стороны потребителя, которая приводит к отставанию по давлению со стороны потребителя, что является положительным моментом. Исходя из приведенного выше понимания Reactor, подписчик активно запрашивает издателя, так что, когда скорость потребления на стороне потребителя не такая высокая, как у производителя, эти сообщения все еще задерживаются на стороне производителя; это преимущество заключается в том, что производитель может сделать соответствующие запросы в соответствии с реальной ситуацией, чтобы настроить скорость, с которой создаются сообщения.
  • Горячее VS холодное: ссылкаHot VS Cold

основной процесс вызова

Основной процесс вызова Reactor можно условно разделить на несколько этапов на рисунке.

  • Объявления: используете ли вы просто или любой другой способ создания реактивного потока, этот процесс можно назвать объявлениями, потому что на этом этапе код фактически не будет выполняться.
  • подписка: когда вызывается подписка, весь процесс выполнения переходит в фазу подписки.После серии вызовов действие подписки будет делегировано конкретному потоку для реализации.
  • onSubscribe: Фаза onSubscribe относится к фазе, в которой последовательно вызывается метод Subscriber#onSubscribe. На этом этапе каждый подписчик узнает, что метод подписки запущен, и вскоре начнется реальный процесс обработки.
  • Фаза request: onSubscribe — это способ выразить действие подписки, сообщая каждому подписчику, что он готов начать обработку данных. Когда последний подписчик готов к обработке данных, он вызывает метод запроса подписки для запроса данных.
  • onNext: Выполните реальную оперативную обработку данных, вызвав метод onNext подписчика.
  • onComplete: успешное состояние терминала, дальнейшие события отправляться не будут.
  • onError: конечное состояние ошибки (то же самое, что и onComplete, когда это происходит, следующее выполнение не будет продолжено).

обработка сообщений

Когда вам нужно обрабатывать сообщения во Flux или Mono, вы можете добавить соответствующую логику подписки через метод subscribe. При вызове метода subscribe вы можете указать тип сообщения, которое необходимо обработать. Вы можете обрабатывать только обычные сообщения, содержащиеся в нем, или вы можете обрабатывать как сообщения об ошибках, так и сообщения о завершении.

Обработка обычных сообщений и сообщений об ошибках с помощью метода subscribe()

 Flux.just(1, 2)
    .concatWith(Mono.error(new IllegalStateException()))
    .subscribe(System.out::println, System.err::println);

результат:

1
2
java.lang.IllegalStateException

Обычная обработка сообщений относительно проста. При возникновении ошибки существует несколько различных стратегий ее обработки:

  • Вернуть значение по умолчанию с помощью метода onErrorReturn()

    Flux.just(1, 2)
        .concatWith(Mono.error(new IllegalStateException()))
        .onErrorReturn(0)
        .subscribe(System.out::println);
    

    результат:

    1
    2
    0
    
  • Используйте метод onErrorResume() для выбора потока элементов, которые будут использоваться в соответствии с различными типами исключений.

     Flux.just(1, 2)
            .concatWith(Mono.error(new IllegalArgumentException()))
            .onErrorResume(e -> {
                if (e instanceof IllegalStateException) {
                    return Mono.just(0);
                } else if (e instanceof IllegalArgumentException) {
                    return Mono.just(-1);
                }
                return Mono.empty();
                }).subscribe(System.out::println);
    

    результат:

    1
    2
    -1
    
  • Повторная попытка выполняется с помощью оператора повторной попытки, а действие повторной попытки достигается путем повторной подписки на последовательность. Количество повторных попыток может быть указано при использовании оператора повторной попытки.

    Flux.just(1, 2)
        .concatWith(Mono.error(new IllegalStateException()))
        .retry(1)
        .subscribe(System.out::println);
    

    результат:

    1
    2
    1
    2
    Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException
    Caused by: java.lang.IllegalStateException
    	at com.glmapper.bridge.boot.reactor.SimpleTest.testFluxSub(SimpleTest.java:75)
    	at com.glmapper.bridge.boot.reactor.SimpleTest.main(SimpleTest.java:23)
    

Планировщик Планировщик

В Reactor режим выполнения и процесс выполнения зависят от используемого планировщика.Планировщик — это абстрактный интерфейс с широким набором классов реализации.Статические методы, предоставляемые классом планировщиков, используются для достижения следующей среды выполнения:

  • Текущий поток (Schedulers.immediate())
     Schedulers.immediate().schedule(()->{
        System.out.println(Thread.currentThread().getName()+"-"+11);
     });
     
     // main-11
    
  • Многоразовый одиночный поток (Schedulers.single()). Обратите внимание, что этот метод предоставляет один и тот же поток для использования всеми вызывающими объектами до тех пор, пока Планировщик не станет устаревшим. Если вы хотите использовать выделенный поток, используйте Schedulers.newSingle() для каждого вызова.
    Schedulers.single().schedule(()->{
        System.out.println(Thread.currentThread().getName()+"-"+11);
    });
    
    // single-1-11
    
  • Эластичный пул потоков (Schedulers.elastic(). Он создает пул потоков по мере необходимости и повторно использует простаивающие потоки. Если пул потоков простаивает слишком долго (по умолчанию 60 с), он будет отброшен. Он подходит для ввода-вывода. Блокирующие сценарии Планировщики .elastic() позволяют легко назначить блокирующей задаче отдельный поток, чтобы она не мешала другим задачам и ресурсам.
    Schedulers.elastic().schedule(()->{
        System.out.println(Thread.currentThread().getName()+"-"+11);
    });
    
    // elastic-2-11
    
  • Пул потоков фиксированного размера (Schedulers.parallel()). Размер создаваемого пула потоков равен количеству процессоров.
    Schedulers.parallel().schedule(()->{
        System.out.println(Thread.currentThread().getName()+"-"+11);
    });
    
    // parallel-1-11
    
  • Создайте планировщик на основе существующего ExecutorService
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    Schedulers.fromExecutorService(executorService).schedule(()->{
        System.out.println(Thread.currentThread().getName()+"-"+11);
    });
            
    // pool-4-thread-1-11
    
  • Создание планировщиков на основе методов newXXX
    Schedulers.newElastic("test-elastic").schedule(()->{
        System.out.println(Thread.currentThread().getName()+"-"+11);
    });
    
    // test-elastic-4-11
    

Некоторые операторы будут использовать указанный планировщик по умолчанию (обычно позволяя разработчикам приспосабливаться к другим планировщикам).Например, Flux, сгенерированный фабричным методом Flux.interval(Duration.ofMillis(100)) по умолчанию помечается каждые 100 мс. это Schedulers.parallel(), и приведенный ниже код демонстрирует, как заменить его на Schedulers.single().

Flux<String> intervalResult = Flux.interval(Duration.ofMillis(100),
        Schedulers.newSingle("test"))
        .map(i -> Thread.currentThread().getName() +"@"+i);
        intervalResult.subscribe(System.out::println);

результат:

test-1@0
test-1@1
test-1@2
test-1@3
test-1@4
// 省略

публикация и подписка

Reactor предоставляет два способа настройки планировщика в реактивной цепочке: publishOn и subscribeOn. Оба они принимают планировщик в качестве параметра, что позволяет изменять планировщик. Но место публикации в цепочке имеет значение, а подписка не имеет значения.

  • publishOn используется как любой другой оператор в цепочке подписчиков. Он передает восходящий сигнал нисходящему и одновременно выполняет обратный вызов в определенном рабочем потоке указанного планировщика планировщика. Это изменит поток, в котором выполняются последующие операторы (пока в цепочке не появится следующая публикация).
  • subscribeOn используется для процесса подписки, действуя по восходящей цепочке подписки (издатель активируется только при подписке, а направление доставки подписки восходящее). Таким образом, независимо от того, где вы разместили subscribeOn в цепочке операций, это повлияет на контекст выполнения потока источника. Однако это не влияет на последующую публикацию, которая по-прежнему может переключать контекст выполнения потока оператора, следующего за ней.
Flux.create(sink -> {
        sink.next(Thread.currentThread().getName());
        sink.complete();
    })
    .publishOn(Schedulers.single())
    .map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
    .publishOn(Schedulers.elastic())
    .map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
    .subscribeOn(Schedulers.parallel())
    .toStream()
    .forEach(System.out::println);

результат:

[elastic-2] [single-1] parallel-1

В приведенном выше коде используется метод create() для создания нового объекта Flux, содержащего единственный элемент, который является именем текущего потока.

Затем есть две пары методов publishOn() и map(), роль которых заключается в том, чтобы сначала переключать планировщик во время выполнения, а затем добавлять имя текущего потока в качестве префикса.

Наконец, метод subscribeOn() используется для изменения метода выполнения при создании потока.

Имя самого внутреннего потока parallel-1 происходит от планировщика Schedulers.parallel(), используемого при создании элементов в потоке, имя среднего потока single-1 происходит от планировщика Schedulers.single() перед первой операцией сопоставления, имя самого внешнего потока Имя потока слоя elastic-2 происходит от планировщика Schedulers.elastic() перед второй операцией карты.

Сначала зайди сюда, а потом придумай остальное...

Ссылаться на