Автор: No Dishwashing Studio - Marklux
Источник:Marklux's Pub
Все права принадлежат автору, при перепечатке указывать источник
Начало работы с реактивным программированием (RxJava)
задний план
С течением времени область программирования продолжает внедрять новые технологии, чтобы попытаться решить существующие проблемы. ** Реактивное программирование (потоковое программирование) ** - очень популярное решение в последние годы. Если мы обратим внимание на тенденции отрасли, вы обнаружили, что большинство языков и фреймворков начали поддерживать эту модель программирования:
- Java 8 => Введение шаблонов Stream, Observable и Observer
- Spring 5 => Внедрить WebFlux, нижний слой полностью использует реактивную модель.
- RxJava => Долгое время занимал первое место в самом популярном проекте github в прошлом году
Ожидается, что в будущем реактивное программирование будет широко применяться в области разработки, и Alibaba также начала соответствующие преобразования.В настоящее время архитектура приложений Taobao встала на путь обновления потоковой архитектуры.В качестве разработки, Парадигма адаптивного программирования Все еще необходимо освоить, далее будет дано краткое введение в связанные концепции и основные идеи программирования на основе RxJava 2.0 (на основе первых трех глав краткого изложения книги «Изучение Rx Java»)
Основная мысль
Существуют разные мнения по поводу определения реактивного программирования (Reactive Programming, далее RP). Википедия определяет его как парадигму программирования, ReactiveX определяет его как усовершенствование шаблона проектирования (шаблон наблюдателя), а некоторые Дэниелы считают, что RP — это просто сборка различных колес... О RP Природа RP может быть кратко обсуждена на конец статьи, но с точки зрения обучения, я думаю, лучше всего рассматривать RP как идею программирования, ориентированную на события и потоки, следующим образом:
Java уважает идею программирования ООП, поэтому для Java-программистов программа представляет собой комбинацию различных объектов, а программирование — это процесс управления состоянием и поведением объектов.
RP пропагандирует идею потокового программирования, поэтому для разработчиков, будь то события или данные, все будет представлено в виде потоков. Эти потоки иногда параллельны, а иногда пересекаются. Программирование — это процесс наблюдения и регулирования эти потоки.
из реального мира
В реальном мире существует очевидное физическое явление, т.все в движении (изменение), будь то движение транспорта, погода, люди или даже камень, он будет двигаться вместе с вращением Земли. Движения разных объектов могут не мешать друг другу, например транспортные средства и пешеходы, движущиеся по разным дорогам, или могут быть перекрестки, например пешеходы и транспортные средства встречаются на одном перекрестке.
Возвращаясь к нашему программированию, мы часто абстрагируем программу на несколько процессов. Как и реальный мир, эти процессы также меняются и движутся. Иногда они могут быть параллельными, а иногда имеют зависимости (предварительные и постусловия). традиционной модели программирования, мы часто используем многопоточные или асинхронные методы для обработки этих процессов, но таким образомне натуральный.
Поэтому RP абстрагируется и сэмплируется из реального мира, разделяя программу на следующие три компонента:
- Наблюдаемые: то, что можно наблюдать, то есть события и потоки данных.
- Наблюдатель: вещи, которые наблюдают за потоками
- Оператор: оператор, вещи, которые соединяют и фильтруют потоки и т. д.
Давайте представим эти три понятия на минимальном примере:
Observable<String> myStrings =
Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon");
myStrings.map(s -> s.length())
.subscribe(s -> System.out.println(s));
В приведенном выше примереmyStrings
Наблюдаемый,map(s -> s.length())
является Оператором,subscribe(s -> System.out.println(s))
является Наблюдателем.
Этот пример сначала возьмет длину нескольких строк, а затем выведет их одну за другой.
Observable
Observable — это просто поток, а в RxJava всего три основных API:
-
onNext()
: передать объект -
onComplete()
: "сигнал" завершения -
onError()
: передать ошибку
В основном все потоки являются оболочкой этих трех методов.
Создайте
-
использовать
Observable.create()
Observable<String> myStrings = Observable.create(emitter -> { emitter.onNext("apple"); emitter.onNext("bear"); emitter.onNext("change"); emitter.onComplete(); });
-
использовать
Observable.just()
Observable<String> myStrings = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon");
Обратите внимание, что количество элементов, созданных этим методом, должно быть ограничено.
-
Создано из других источников данных, таких как
Observable.fromIterable()
,Observable.range()
Hot & Cold Observables
Данные, создаваемые потоковой передачей Cold, статичны, как компакт-диск. Независимо от того, когда и кто их слушает, можно услышать весь контент.
Observable<String> source =
Observable.just("Alpha","Beta","Gamma","Delta","Epsilon");
//first observer
source.subscribe(s -> System.out.println("Observer 1 Received: " + s));
//second observer
source.subscribe(s -> System.out.println("Observer 2 Received: " + s));
Вышеуказанные два подписанных зарегистрированных наблюдателя получат один и тот же поток.
Данные, генерируемые потоком Hot, являются динамическими, как и радиостанция, если время трансляции пропущено, прошлые данные будут недоступны. Listener требуется для непосредственного создания Hot stream.Приведен официальный пример JavaFx, но он не подходит для размещения здесь.
На самом деле поток Cold обычно преобразуется в поток Hot с помощью ConnectableObservable:
ConnectableObservable<String> source =
Observable.just("Alpha","Beta","Gamma","Delta","Epsilon")
.publish();
//Set up observer 1
source.subscribe(s -> System.out.println("Observer 1: " + s));
//Set up observer 2
source.map(String::length)
.subscribe(i -> System.out.println("Observer 2: " + i));
//Fire!
source.connect();
пройти черезpublish()
метод созданияConnectableObservable
, то черезconnect()
Метод запускает передачу потока, в это время источник передает все данные двум наблюдателям. После этого, если вы зарегистрируете новый Observer для источника, вы не получите никаких данных, потому что Hot stream не позволяет использовать данные повторно.
Observers
Сам наблюдатель представляет собой относительно простую структуру, а основную функцию выполняет сам вызывающий объект.
может быть достигнуто путемObserver
Способ создания интерфейса для создания наблюдателя, конечно, более распространенный случай — это создание лямбда-выражения наблюдателем, как и в случае, только что использованном, здесь подробно не описанном.
Зарегистрироваться можно, позвонив в Observerbale.subscribe
метод.
Operators
Только создание потоков и наблюдателей не дает большого эффекта, большую часть времени нам нужно выполнять различные операции над потоками через операторы, чтобы сделать RP осмысленным.
RxJava предоставляет очень богатый набор операторов, грубо разделенных на следующие категории:
- Операторы Create, используемые для создания потоков, мы только что использовали некоторые из них, такие как Create, Just, Range и Interval.
- Операторы преобразования, используемые для преобразования потока в другую форму, например Buffer (упаковка и преобразование элементов потока в коллекцию), Map (выполнение функции над каждым элементом потока), Window (преобразование элементов потока в сборник) Разбиваем на разные окна и потом запускаем)
- Оператор фильтра, отфильтруйте часть данных в потоке, чтобы получить указанный элемент данных, например фильтр, первый, отличительный
- Оператор комбинации для объединения нескольких потоков в один, например And, Then, Merge, Zip.
- Операторы условного/арифметического/агрегирования/преобразования... играют роль в различных операциях
- Пользовательские операторы, созданные самими пользователями
Если вы подробно расскажете о каждом операторе, вы можете почти написать книгу, в которой показаны богатые функции, предоставляемые операторами. Только некоторые иобратное давлениесвязанные операторы.
обратное давление
Так называемое обратное давление относится кАсинхронная средаКогда скорость производителя больше, чем скорость потребителя, потребитель, в свою очередь, контролирует скорость производителя (также называемую противодавлением), что является методом управления потоком, который может использовать ресурсы более эффективно и предотвращать неправильную лавину.
Для реализации стратегии противодавления можно использовать следующие операторы.
-
Класс дросселирования
Отрегулируйте скорость, с которой Observable отправляет сообщения через операторов, например, с помощью
sample()
периодически сэмплировать и выдавать последние данные или использоватьthrottleWithTimeout()
для удаления просроченных данных. Но при этом часть данных будет удалена, и конечный потребитель не получит полного сообщения. -
Буфер и классы окон и буферов
использовать буферизацию
buffer()
и окноwindow()
Временно сохраняйте сообщения, которые отправляются слишком быстро, и освобождайте их, когда скорость отправки сообщений замедляется. Это в основном используется в сценариях, где Observable излучает с неравномерной скоростью.
В дополнение к использованию операторов есть еще один способ достижения противодавления — реактивная тяга.
Позвонив в Observerrequest(n)
метод, потребитель может контролировать производство производителя, то есть производитель генерирует данные только тогда, когда потребитель запрашивает их, и когда потребитель заканчивает потреблять данные, он запрашивает новые данные.Скорость производства слишком высока, так как показано ниже
Но для этого требуется, чтобы наблюдаемый восходящий поток мог ответить на запрос запроса.В настоящее время для управления поведением наблюдаемого можно использовать несколько операторов:
-
onBackPressureBuffer
Создайте кеш для данных, отправленных Observable.Сгенерированные данные сначала помещаются в кеш.Каждый раз, когда приходит запрос, соответствующее количество событий берется из кеша и возвращается.
-
onBackPressureDrop
Наблюдатель получает указание отбросить следующий раз, пока подписчик снова не вызовет метод request(n) и не отправит его n раз после времени вызова подписчика.
Стратегия противодействия — область, достойная глубокого изучения и обсуждения.Динамическое ограничение тока, разомкнутая цепьМожно, и из-за восприятия обратного давления приложение имеет возможность делатьДинамическое масштабирование.
Мышление: зачем вам RP
Основные понятия RP почти введены, теперь нужно подумать о том, зачем нужен RP, как применять RP на стороне сервера и какие преимущества он может дать.
Прежде всего, что касается природы RP, я думаю, что это колесо асинхронного программирования.Использование API режима наблюдателя делает процесс асинхронного программирования более понятным и простым.Это похоже на использование CSP для упрощения параллельных операций.Нижняя часть Слой фактически представляет собой инкапсуляцию предшествующего уровня техники.
Итак, вопрос в том, зачем нам инкапсулировать асинхронное программирование, какие преимущества мы можем извлечь из использования асинхронного программирования, и чтобы решить эту проблему, нам нужно вернуться к истокам.
Если вы хотите спросить, что является самым большим узким местом производительности сервера, ответ должен быть IO, потому что самая трудоемкая часть обработки запроса — это ожидание ввода-вывода, а ожидание приведет к блокировке, поэтому, если вы хотите улучшить производительность, вы не можете записать блокирующий код.
Как сделать, чтобы код не блокировался? Что касается Java-сервера, то традиционные методы обработки — это не что иное, как следующие два:
- Используйте Thread для запуска бизнес-кода и кода ввода-вывода в разных потоках. Но вы должны столкнуться с проблемами параллелизма (конкуренцией за ресурсы), и, согласно предыдущему анализу планирования потоков Java, мы знаем, что это неэффективно для использования ресурсов ЦП (переключение контекста потребляет много ресурсов).
- Используя асинхронные обратные вызовы, для его реализации можно использовать Callback или Future, но логику планирования нужно реализовывать самостоятельно, в то же время код, написанный в режиме Callback, не прост для понимания, и здесь может быть Callbcak Hell.
Итак, в конце концов, чтобы устранить узкое место в производительности, RP предлагает следующее решение:
Он обеспечивает превосходную структуру асинхронной обработки и упрощает процесс написания асинхронного кода, в конечном итоге достигая цели уменьшения блокировки и повышения производительности.