Автор: Xianyu Technology - Куньмин
RxJava — это среда реализации реактивного программирования на Java, основанная на событиях библиотека кода, предоставляющая мощные и элегантные асинхронные вызывающие программы. За последние 18 лет проект обновления архитектуры приложений, инициированный технологическим отделом Taobao, надеется улучшить общую производительность системы и использование машинных ресурсов, уменьшить задержку в сети и повторно использовать ресурсы за счет преобразования реактивной архитектуры и полной асинхронности. Быстрые инновации обеспечивают гибкую архитектурную поддержку. Основные ссылки Xianyu, такие как пакетное обновление товаров, пакетный запрос заказов и т. д., используют возможности асинхронного программирования RxJava.
Тем не менее, RxJava легко начать и сложно освоить, и в нем случайно встречается много подводных камней. Сегодня давайте рассмотрим использование, основные принципы и меры предосторожности RxJava.
1. Прежде чем начать
Давайте сначала рассмотрим болевые точки кода обратного вызова, который мы написали перед использованием RxJava.
Когда нашему приложению необходимо обрабатывать пользовательские события и асинхронные вызовы, с увеличением сложности потоковых событий и логики обработки сложность реализации кода резко возрастает. Например, иногда нам нужно иметь дело с комбинацией нескольких потоков событий, обрабатывать исключения или тайм-ауты потоков событий и выполнять работу по очистке после окончания потока событий.Если нам нужно реализовать это с нуля, мы должны тщательно обрабатывать обратные вызовы. , мониторинг, параллелизм и многие другие каверзные вопросы.
Существует также проблема, называемая «ад обратных вызовов», которая описывает нечитаемый код.
Code 1.1
// 示例引自callbackhell.com
fs.readdir(source, function (err, files) {
if (err) {
console.log('Error finding files: ' + err)
} else {
files.forEach(function (filename, fileIndex) {
console.log(filename)
gm(source + filename).size(function (err, values) {
if (err) {
console.log('Error identifying file size: ' + err)
} else {
console.log(filename + ' : ' + values)
aspect = (values.width / values.height)
widths.forEach(function (width, widthIndex) {
height = Math.round(width / aspect)
console.log('resizing ' + filename + 'to ' + height + 'x' + height)
this.resize(width, height).write(dest + 'w' + width + '_' + filename, function(err) {
if (err) console.log('Error writing file: ' + err)
})
}.bind(this))
}
})
})
}
})
В приведенном выше js-коде есть два очевидных слота: 1. Много }) появляется в конце кода из-за входящих послойных callback-методов 2. Порядок написания кода противоположен порядку выполнения кода : функция обратного вызова появится первой. Код в предыдущей строке выполняется первым.
И если мы будем использовать RxJava, мы легко справимся с обратными вызовами, исключениями и т. д.
2. Представьте RxJava
Предположим, теперь мы хотим получить список пользователей асинхронно, а затем обработать результат, например отобразить в UI или записать в кеш, код после того, как мы используем RxJava, выглядит следующим образом:
Code 2.1
Observable<Object> observable = Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(@NotNull ObservableEmitter<Object> emitter) throws Exception {
System.out.println(Thread.currentThread().getName() + "----TestRx.subscribe");
List<UserDo> result = userService.getAllUser();
for (UserDo st : result) {emitter.onNext(st);}
}
});
Observable<String> map = observable.map(s -> s.toString());
// 创建订阅关系
map.subscribe(o -> System.out.println(Thread.currentThread().getName() + "----sub1 = " + o)/*更新到ui*/);
map.subscribe(o -> System.out.println(Thread.currentThread().getName() + "----sub2 = " + o)/*写缓存*/,
e-> System.out.println("e = " + e)),
()->System.out.println("finish")));
userService.getAllUser() — это обычный синхронный метод, но мы оборачиваем его в Observable, и когда возвращается результат, отправляем пользователей слушателям по одному. Первый слушатель обновляет пользовательский интерфейс, а второй слушатель записывает данные в кеш. И когда возникает исключение вверх по течению, печать; когда поток событий заканчивается, печать завершена.
Кроме того, вы можете легко настроить тайм-аут восходящего потока, пул потоков вызовов, резервные результаты и т. д., это очень мощно?
Следует отметить, что код RxJava выглядит как приведенный выше пример, который прост в использовании и очень читабелен, но если он не полностью понят, легко могут возникнуть неожиданные ошибки: новички могут подумать, что в приведенном выше коде после возвращается список пользователей, каждый элемент отправляется асинхронно двум нижестоящим наблюдателям, которые печатают результаты в своих собственных потоках. Но это не так: userService.getAllUser() будет вызываться дважды (метод getAllUser() будет вызываться снова всякий раз, когда устанавливается отношение подписки), и после запроса списка пользователей он будет вызыватьсяСинхронизироватьнаправляется двум наблюдателям, наблюдатели такжеСинхронизироватьраспечатать каждый элемент. то есть sub1=user1, sub1=user2, sub1=user3, sub2=user1, sub2=user2, sub2=user3.
Видно, если нет другой конфигурации, RxJavaПо умолчанию используется синхронная блокировка.из! ! ! Итак, как мы используем его асинхронные неблокирующие возможности, давайте посмотрим вниз.
Code 2.2
Observable
.fromCallable(() -> {
System.out.println(Thread.currentThread().getName() + "----observable fromCallable");
Thread.sleep(1000); // imitate expensive computation
return "event";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.map(i->{
System.out.println(Thread.currentThread().getName() + "----observable map");
return i;
})
.observeOn(Schedulers.newThread())
.subscribe(str -> System.out.println(Thread.currentThread().getName() + "----inputStr=" + str));
System.out.println(Thread.currentThread().getName() + "----end");
Thread.sleep(2000); // <--- wait for the flow to finish. In RxJava the default Schedulers run on daemon threads
Мы используем Observable.fromCallable(), чтобы заменить метод Observable.create самого низкого уровня в code2.1 для создания Observable (т. е. наблюдаемого). Метод fromCallable создает ленивый Observable, и входящий код выполняется только тогда, когда его кто-то прослушивает. (Мы поговорим об этом позже, просто чтобы показать, что существует много способов создать Observable).
Затем укажите пул потоков, выполняемый наблюдателем, через subscribeOn(Schedulers.io()). наблюдатьOn(Schedulers.single()) указывает пул потоков, выполняемый нижестоящим наблюдателем (метод карты на самом деле является наблюдателем). Метод карты, как и многие потоковые API, преобразует каждый элемент восходящего потока в другой. Наконец, текущий нижестоящий наблюдатель формируется черезObservOn(Schedulers.newThread()), то есть пул потоков, выполняемый наблюдателем (лямбда-метод), переданный при последней подписке.
После выполнения приведенного выше кода из напечатанного имени потока видно, что наблюдаемый, карта и наблюдатель — все это разные потоки, и последний «конец» основного потока будет выполняться первым, то есть асинхронный несинхронный поток. блокировка реализована.
3. Как использовать
Эта статья не является интерфейсным документом RxJava, она не будет подробно знакомить с каждым API, но кратко описывает некоторые общие или специальные API для дальнейшего развития возможностей RxJava.
3.1 Основные компоненты
Основной принцип RxJava на самом деле очень прост. Сопоставимо с шаблоном Observer.ObservableЭто наблюдаемое, которое производит данные как источник данных.ObserverЭто наблюдатель, который использует восходящие источники данных.
Каждый Observable может зарегистрировать несколько наблюдателей. Но по умолчанию метод подписки Observable будет вызываться всякий раз, когда происходит регистрация. Если вы хотите создать его только один раз, вы можете вызвать метод Observable.cached.
Существует также несколько вариантов наблюдаемого Observable, например Single и Flowable. Single представляет источник данных, который создает только один элемент. Flowable — это источник данных, поддерживающий противодавление. Благодаря конструкции обратного давления нижестоящий слушатель может передавать информацию вышестоящему, что позволяет реализовать функцию управления скоростью отправки.
Observable и Observer упаковываются слой за слоем через шаблон декоратора для последовательного соединения. API-интерфейсы преобразования, такие как карта и т. д., создадут новый ObservableMap (базовый слой из Observable), оборачивая исходный Observable в качестве источника, и когда он фактически выполняется, сначала выполняется операция преобразования, а затем отправляется в нижестоящий наблюдатель.
SchedulerЭто класс поддержки, предоставляемый RxJava для многопоточного выполнения.Он может обернуть логику выполнения производителя или потребителя в Worker и отправить его в общедоступный пул потоков, предоставляемый фреймворком, например Schedulers.io(), Schedulers. .newThread() Подождите. Для простоты понимания планировщики можно сравнить с пулами потоков, а рабочие процессы — с потоками в пулах потоков. Асинхронная неблокировка может быть достигнута путем указания потоков, выполняемых наблюдателем и наблюдателем, через Observable.subscribeOn и Observable.observeOn соответственно.
Схема базовой архитектуры RxJava выглядит следующим образом:
3.2 Конверсионный API
•
map: см. код 2.2, преобразование один к одному, как и многие потоковые API, преобразует каждый элемент восходящего потока в другой элемент.
•
flatMap: преобразование «один ко многим», преобразует каждый элемент восходящего потока в 0 или более элементов. Аналогия Java8: Stream.flatMap возвращает поток, а Observerable.flatMap возвращает Observerable. Обратите внимание, что этот метод очень мощный, и многие нижние уровни API основаны на этом методе. А поскольку множество наблюдателей, возвращаемых flatMap, не зависят друг от друга, на основе этой функции можно добиться параллелизма.
3.3 API композиции
•
слияние: объединение двух потоков событий в один временной поток.Последовательность объединенных потоков событий согласуется с временной последовательностью прибытия элементов в двух восходящих потоках.
• zip: получать каждый элемент из нескольких восходящих потоков один за другим, объединять их один к одному и отправлять вниз по течению после преобразования. Например, см. код 3.1.
code 3.1
//第一个流每1秒输出一个偶数
Observable<Long> even = Observable.interval(1000, TimeUnit.MILLISECONDS).map(i -> i * 2L);
//第二个流每3秒输出一个奇数
Observable<Long> odd = Observable.interval(3000, TimeUnit.MILLISECONDS).map(i -> i * 2L + 1);
//zip也可以传入多个流,这里只传入了两个
Observable.zip(even, odd, (e, o) -> e + "," + o).forEach(x -> {
System.out.println("observer = " + x);
});
/* 输出如下,可以看到,当某个流有元素到来时,会等待其他所有流都有元素到达时,才会合并处理然后发给下游
observer = 0,1
observer = 2,3
observer = 4,5
observer = 6,7
...
*/
Код кода 3.1 выглядит нормально, два потока выполняются одновременно, и, наконец, zip используется для ожидания их результатов. Но он скрывает очень важную проблему: RxJava по умолчанию является синхронной и блокирующей! ! Когда мы хотим следовать описанному выше методу для одновременной отправки нескольких запросов и, наконец, использовать zip для отслеживания всех результатов, легко обнаружить странное явление.В коде кода 3.2 код ob2 всегда выполняется после ob1. Не то, чтобы два запроса были выполнены одновременно, как мы ожидали. Вы также можете увидеть напечатанное имя потока, что два сингла выполняются последовательно в одном потоке!
code 3.2
// Single是只返回一个元素的Observable的实现类
Single<String> ob1 = Single.fromCallable(() -> {
System.out.println(Thread.currentThread().getName() + "----observable 1");
TimeUnit.SECONDS.sleep(3);
return userService.queryById(1).getName();
});
Single<String> ob2 = Single.fromCallable(() -> {
System.out.println(Thread.currentThread().getName() + "----observable 2");
TimeUnit.SECONDS.sleep(1);
return userService.queryById(1).getName();
});
String s = Single.zip(ob1, ob2,
(e, o) -> {System.out.println(e + "++++" + o);
Так почему же два потока кода 3.1 могут выполняться одновременно? Читая исходный код, вы можете обнаружить, что реализация zip на самом деле заключается в подписке сначала на первый поток, а затем на второй поток, поэтому по умолчанию, конечно же, используется последовательное выполнение. Но поток, созданный Observable.interval, будет отправлен в пул потоков, предоставленный Schedulers.computation() по умолчанию. О пуле потоков мы расскажем позже в этой статье.
3.4 Создать API
• Создать: создать самые оригинальные и подписки, другие методы создаются на основе этого
code 3.3
// 返回的子类是ObservableCreate
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("event");
emitter.onNext("event2");
emitter.onComplete();
}
});
// 订阅observable
observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println(Thread.currentThread().getName() + " ,TestRx.onSubscribe");
}
@Override
public void onNext(String s) {
System.out.println(Thread.currentThread().getName() + " ,s = " + s);
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {
System.out.println(Thread.currentThread().getName() + " ,TestRx.onComplete");
}
});
•just : Observable.just("e1","e2"); просто создает Observable, который испускает указанные n элементов. •interval: в коде 3.1 приведен пример создания Observable, который непрерывно генерирует элементы с определенным интервалом По умолчанию он выполняется в пуле потоков, предоставленном Schedulers.comutation() • defer: генерирует Observable, который создается лениво. Небольшой поворот: хотя наблюдаемые объекты, созданные Observable.create, задерживаются, данные будут генерироваться только тогда, когда кто-то подпишется. Но метод создания Observable выполняется сразу. Метод Observable.defer начнет создавать Observable только тогда, когда кто-то подпишется. Например, код Code3.4
public String myFun() {
String now = new Date().toString();
System.out.println("myFun = " + now);
return now;
}
public void testDefer(){
// 该代码会立即执行myFun()
Observable<String> ob1 = Observable.just(myFun());
// 该代码会在产生订阅时,才会调用myFun(), 可类比Java8的Supplier接口
Observable<String> ob2 = Observable.defer(() -> Observable.just(myFun()) );
}
• fromCallable : Создает отложенно созданный Observable, упрощенный метод отсрочки. Observable.fromCallable(() -> myFun()) эквивалентно Observable.defer(() -> Observable.just(myFun()) );
4. Обоснование
Код RxJava является воплощением шаблона наблюдателя + шаблона декоратора.
4.1 Observable.create
См. код 3.3, метод create получает объект интерфейса ObserverableOnSubscribe, мы определяем код для отправки элемента, а метод create возвращает объект типа ObserverableCreate (унаследованный от абстрактного класса Observerable). Следуйте исходному коду метода create и напрямую возвращайте новый ObserverableCreate, который упаковывает исходный объект, то есть входящий ObserverableOnSubscribe.
code4.1
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
//onAssembly默认直接返回ObservableCreate
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
Метод Create такой же простой, просто помните, что он возвращает Observer, который упаковывает исходный код.
4.2 Observerable.subscribe(observer)
Посмотрите, что происходит, когда отношение подписки (observalbe.subscribe) создается в коде 3.3:
code4.2
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) {... } catch (Throwable e) {... }
}
Observable — это абстрактный класс, который определяет окончательный метод подписки, который в конечном итоге вызовет subscribeActual(observer), а subscribeActual — это метод, реализованный подклассами.Естественно, нам нужно посмотреть на метод, реализованный ObserverableCreate.
code4.3
//ObserverableCreate实现的subscribeActual方法
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent); //source是ObservableOnSubscribe,即我们写的生产元素的代码
} catch (Throwable ex) {...}
}
1. Оберните наблюдателя-наблюдателя в CreateEmitter. 2. Вызвать метод onSubscribe наблюдателя и передать эмиттер. 3. Вызовите метод подписки источника (т. е. интерфейса производственного кода) и передайте эмиттеру.
На втором шаге напрямую вызывается написанный нами метод onSubscribe потребителя, что хорошо понятно, то есть метод обратного вызова для создания отношения подписки.
Основное внимание уделяется третьему шагу, source.subscribe(parent); этот родитель является эмиттером, обертывающим наблюдателя. Помните, что источник — это код, который мы написали для отправки события. Который вручную вызывает emitter.onNext() для отправки данных. Итак, что мы делаем CreateEmitter.onNext()?
code4.4
public void onNext(T t) {
if (t == null) {...}
if (!isDisposed()) { observer.onNext(t); }
}
!isDisposed() определяет, что если отношения подписки не были отменены, вызовитеObserver.onNext(t); этот наблюдатель является написанным нами потребителем.В коде 3.3 мы переписали его метод onNext для печати полученного элемента.
Вышеизложенное является самым основным принципом RxJava.На самом деле, логика очень проста.При создании отношения подписки непосредственно вызывается код производственной логики, а затем вызываетсяObserver.onNextв onNext производственной логики. Временная диаграмма выглядит следующим образом.
Очевидно, самый основной принцип заключается в том, чтобы полностью отделить отношения от асинхронных обратных вызовов и многопоточности.
4.2 Observable.map
Посмотрите, что делает API конвертации с простейшим методом карты.
Как и в коде 2.1, вызов метода map и передача функции преобразования могут преобразовать вышестоящий элемент в другой тип элемента один к одному.
code4.5
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
code4.5 является последним методом карты, определенным Observable.Можно видеть, что метод карты оборачивает это (исходный наблюдатель) и преобразователь функции преобразования в ObservableMap (ObservableMap также наследует Observable), а затем возвращает этот ObservableMap (onAssembly делает ничего по умолчанию).
Поскольку обслуживать также наблюдаемый, поэтому его метод подписки будет вызываться, когда слои создают подписчик, определяется следующий метод подписки.
code4.6
//ObservableMap的subscribeActual
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
Вы можете видеть, что в subscribeActual ObservableMap исходный наблюдатель t и функция функции преобразования заключены в новый наблюдатель MapObserver, и он подписан на источник наблюдателя.
Мы знаем, что при отправке данных будет вызываться onNext наблюдателя, поэтому смотрим на метод onNext MapObserver
code4.7
@Override
public void onNext(T t) {
if (done) {return; }
if (sourceMode != NONE) { actual.onNext(null);return;}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {...}
actual.onNext(v);
}
В коде 4.7 вы можете видеть, что mapper.apply(t) применяет средство отображения функции преобразования к каждому элементу t, получает v после преобразования и, наконец, вызывает fact.onNext(v) для отправки v нижестоящему наблюдателю. code4 t) передается при создании MapObserver в версии .6.
Подводя итог принципам преобразования API, таких как карта:
1. Метод map возвращает ObservableMap, который заключает в себе исходный наблюдатель t и функцию функции преобразования. 2. ObservableMap наследуется от AbstractObservableWithUpstream (который наследуется от Observable). subscribeActual4 класса реализации. Создайте MapObserver (оборачивающий исходный наблюдатель) в ObservableMap.subscribeActual и подпишитесь на исходный Observable5. Когда onNext вызывается для отправки данных, сначала примените операцию преобразования, а затем вызовите onNext исходного наблюдателя , который передается нижестоящему наблюдателю
4.3 Планирование потоков
Пример планирования потоков приведен в коде 2.2. subscribeOn(Schedulers.io()) указывает пул потоков, который должен выполняться наблюдателем. наблюдатьOn(Schedulers.single()) указывает пул потоков для выполнения нижестоящего наблюдателя. После вышеизложенного изучения естественно понять, что принцип заключается в том, чтобы использовать режим декоратора, чтобы слой за слоем обернуть Observable и Observer и кинуть их в пул потоков для выполнения. В качестве примера возьмемObservOn(), см. код 4.8.
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
//observeOn(Scheduler) 返回ObservableObserveOn(继承自Observable)
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
// Observable的subscribe方法最终会调用到ObservableObserveOn.subscribeActual方法
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
//创建一个ObserveOnObserver包装了原观察者、worker,把它订阅到source(原observable)
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
1.observeOn(Scheduler) возвращает ObservableObserveOn2.ObservableObserveOn наследуется от Observable3, поэтому метод подписки в конечном итоге вызовет метод subscribeActual, переопределенный ObservableObserveOn
По логике Observer, при отправке данных будет вызываться метод onNext, поэтому смотрим на метод onNext ObserveOnObserver:
code4.9
public void onNext(T t) {
if (done) { return; }
if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t);}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this); //this是ObserveOnObserver,他同样实现了Runable
}
}
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal(); //最终会调用actual.onNext(v) , 即调用被封装的下游观察者,v是emmiter
}
}
1. Когда onNext вызывается в финальном коде производителя, будет вызываться метод schedule 2. В методе schedule он отправляет себя (ObserveOnObserver) в пул потоков 3. Метод run вызывает onNext (emmiter)
Видно, что механизм планирования потоков RxJava заключается в отправке кода onNext (emmiter) отправляющего элемента в пул потоков для выполнения черезObservOn (Scheduler).
5. Будьте осторожны
Наконец, дайте несколько мер предосторожности, которые мы суммировали в разработке, чтобы никто не наступил на яму.
5.1 Применимые сценарии
Не все операции ввода-вывода и асинхронные обратные вызовы нужно решать с помощью RxJava, например, если мы представляем собой только комбинацию одного или двух вызовов службы RPC, или каждый запрос представляет собой независимую логику обработки, то внедрение RxJava не принесет многого. доход. Несколько наиболее подходящих сценариев приведены ниже.
• Обработка событий пользовательского интерфейса. • Асинхронный ответ и обработка результатов ввода-вывода. • События или данные передаются производителями вне их контроля. • Объединение полученных событий.
Ниже приведен сценарий использования данных пакетной добавки для продуктов Xianyu:
Исходная информация: Алгоритм рекомендует некоторые продукты пользователя. В настоящее время существует только основная информация, и необходимо вызвать несколько бизнес-интерфейсов, чтобы дополнить дополнительную бизнес-информацию о пользователе и продукте, такую как аватар пользователя, ссылка на видео о продукте, первое изображение продукта и т. д. И в соответствии с различными типами товаров заполните различную вертикальную бизнес-информацию.
Трудности: 1. Множественные интерфейсы имеют переднюю и заднюю зависимости или даже перекрестные зависимости 2. Каждый интерфейс может таймаутить или сообщать об ошибке, что в свою очередь влияет на последующую логику 3. В соответствии с характеристиками разных зависимых интерфейсов необходимо контролировать тайм-аут и откат отдельно. Весь интерфейс также должен установить общий тайм-аут и откат.
Решение. Если это просто асинхронный запрос с несколькими независимыми интерфейсами, то CompletableFuture можно использовать полностью. Но из-за его недружественной поддержки комбинации, тайм-аута и отката он не подходит для этого сценария. В итоге мы использовали RxJava для его реализации. Ниже приведена грубая логика кода. HsfInvoker в коде — это класс инструмента в Ali, который преобразует обычный интерфейс HSF в интерфейс Rx.По умолчанию он работает в отдельном пуле потоков, поэтому он может реализовывать параллельные вызовы.
// 查找当前用户的所有商品
Single<List<IdleItemDO>> userItemsFlow =
HSFInvoker.invoke(() -> idleItemReadService.queryUserItems(userId, userItemsQueryParameter))
.timeout(300, TimeUnit.MILLISECONDS)
.onErrorReturnItem(errorRes)
.map(res -> {
if (!res.isSuccess()) {
return emptyList;
}
return res.getResult();
})
.singleOrError();
//补充商品,依赖userItemsFlow
Single<List<FilledItemInfo>> fillInfoFlow =
userItemsFlow.flatMap(userItems -> {
if (userItems.isEmpty()) {
return Single.just(emptyList);
}
Single<List<FilledItemInfo>> extraInfo =
Flowable.fromIterable(userItems)
.flatMap(item -> {
//查找商品extendsDo
Flowable<Optional<ItemExtendsDO>> itemFlow =
HSFInvoker.invoke(() -> newItemReadService.query(item.getItemId(), new ItemQueryParameter()))
.timeout(300, TimeUnit.MILLISECONDS)
.onErrorReturnItem(errorRes)
.map(res -> Optional.ofNullable(res.getData()));
//视频url
Single<String> injectFillVideoFlow =
HSFInvoker.invoke(() -> videoFillManager.getVideoUrl(item))
.timeout(100, TimeUnit.MILLISECONDS)
.onErrorReturnItem(fallbackUrl);
//填充首图
Single<Map<Long, FrontCoverPageDO>> frontPageFlow =
itemFlow.flatMap(item -> {
...
return frontCoverPageManager.rxGetFrontCoverPageWithTpp(item.id);
})
.timeout(200, TimeUnit.MILLISECONDS)
.onErrorReturnItem(fallbackPage);
return Single.zip(itemFlow, injectFillVideoFlow, frontPageFlow, (a, b, c) -> fillInfo(item, a, b, c));
})
.toList(); //转成商品List
return extraInfo;
});
//头像信息
Single<Avater> userAvaterFlow =
userAvaterFlow = userInfoManager.rxGetUserAvaters(userId).timeout(150, TimeUnit.MILLISECONDS).singleOrError().onErrorReturnItem(fallbackAvater);
//组合用户头像和商品信息,一并返回
return Single.zip(fillInfoFlow, userAvaterFlow,(info,avater) -> fillResult(info,avater))
.timeout(300, TimeUnit.MILLISECONDS)
.onErrorReturn(t -> errorResult)
.blockingGet(); //最后阻塞式的返回
Видно, что с введением RxJava стало удобнее поддерживать управление тайм-аутом, итоговую стратегию, обратный вызов запроса и комбинирование результатов.
5.2 Пул потоков планировщика
RxJava2 имеет несколько встроенных реализаций планировщика, но мы рекомендуем использовать Schedulers.from(executor) для указания пула потоков, что позволяет избежать использования общедоступного пула потоков по умолчанию, предоставляемого платформой, предотвращая блокировку выполнения одной длинной задачей других потоков. , или создание слишком большого количества потоков. Потоки вызывают неработоспособность.
5.3 CompletableFuture
Когда наша логика относительно проста и мы хотим асинхронно вызывать только одну или две службы RPC, мы можем полностью рассмотреть возможность использования реализации CompletableFuture, предоставляемой Java 8. По сравнению с Future она выполняется асинхронно и также может реализовывать простую комбинационную логику.
5.4 Параллелизм
Один Observable всегда выполняется последовательно, и onNext() не может быть вызван одновременно.
code5.1
Observable.create(emitter->{
new Thread(()->emitter.onNext("a1")).start();
new Thread(()->emitter.onNext("a2")).start();
})
Однако каждый Observable может выполняться независимо и одновременно.
code5.2
Observable ob1 = Observable.create(e->new Thread(()->e.onNext("a1")).start());
Observable ob2 = Observable.create(e->new Thread(()->e.onNext("a2")).start());
Observable ob3 = Observable.merge(ob1,ob2);
Два потока, ob1 и ob2, объединены в ob3, и каждый поток независим. (Здесь следует отметить, что эти два потока могут выполняться одновременно, и есть условие, что их отправляющий код выполняется в разных потоках, как и в примерах в code3.1 и code3.2, хотя два потока независимы, но Если он не передается в разные потоки, он все равно выполняется последовательно).
5.5 Противодавление
В RxJava 2.x противодавление поддерживает только тип Flowable. Конечно, проблемы, которые может решить Observable, могут быть решены и с помощью Flowable. Однако дополнительная логика, которую он добавляет для поддержки противодавления, заставляет Flowable работать намного медленнее, чем Observable, поэтому рекомендуется использовать Flowable только при работе со сценариями противодавления. Если можно определить, что восходящий и нисходящий потоки работают в одном и том же потоке или что восходящий и нисходящий потоки работают в разных потоках, а нисходящий поток обрабатывает данные быстрее, чем восходящий поток выдает данные, не будет проблемы обратного давления и не будет нужно использовать Flowable. Что касается использования Flowable, то из-за недостатка места в этой статье описываться не будет.
5.6 Тайм-аут
Настоятельно рекомендуется установить время ожидания для асинхронных вызовов, а также использовать методы timeout и onErrorReturn для установки нижней логики таймаута, иначе запрос всегда будет занимать поток Observable, и при поступлении большого количества запросов, это также вызовет OOM.
6. Заключение
В настоящее время многие бизнес-сценарии Xianyu используют RxJava для асинхронной обработки, что значительно снижает затраты разработчиков на асинхронную разработку. В то же время он имеет хорошую производительность при комбинации ответов на несколько запросов и одновременной обработке. Встроенная логика тайм-аута и восходящая стратегия могут обеспечить надежность пакетной обработки бизнес-данных, что является надежной поддержкой для бесперебойной работы пользователей.