Оригинальная ссылка:blog.angular в деталях.com/learn-to-co…
Эта статьяКитайское сообщество RxJSПереведенные статьи, если нужно перепечатать, просьба указывать источник, спасибо за сотрудничество!
Если вы хотите пойти с нами, переводя на более качественную статью RxJS, посвященную вам, пожалуйста, нажмите на【здесь】
При разработке приложений значительной сложности обычно используется более одного источника данных. Этими источниками данных могут быть несколько внешних точек данных, таких как Firebase, или несколько компонентов пользовательского интерфейса, с которыми взаимодействует пользователь. Составление последовательностей — это метод, который позволяет создавать сложные запросы к нескольким источникам данных путем объединения этих связанных нескольких потоков данных в один поток данных. RxJS предоставляет множество операторов, которые помогут вам с этой задачей, в этой статье мы представим некоторые из наиболее часто используемых операторов.
В процессе написания этой статьи я спроектировал и создал несколько интуитивно понятных диаграмм потока данных, чтобы лучше показать разницу между всеми операторами, что сделало меня почти профессиональным аниматором на полставки. Однако все диаграммы встроены в эту статью в виде анимированных GIF-файлов, поэтому их загрузка занимает некоторое время. Пожалуйста, также будьте терпеливы.
В коде, представленном в этой статье, я буду использовать оператор pipeable, если вы с ним не знакомы, вы можетенажмите здесь, чтобы посмотреть. Я бы также использовал обычайstream
Оператор, который будет первым аргументом, что и имя подписного потока непрерывно сгенерировано асинхронно отправленным значением.
Ниже приведено описание типов диаграмм, используемых в этой статье:
Объединение нескольких потоков одновременно
Первый оператор, который мы введем, этоmerge. Этот оператор может объединять несколько потоков, а затем одновременно выдавать все значения в каждом входном потоке. Как только значения создаются во входном потоке, эти значения выдаются как часть потока результатов. Этот процесс часто упоминается в документации как выравнивание.
Когда все входные потоки будут завершены, результирующий поток будет завершен.Если какой-либо входной поток сообщит об ошибке, результирующий поток сообщит об ошибке. Если входной поток не завершен, результирующий поток не будет завершен.
Если вас интересуют только все значения из нескольких потоков, как если бы они были созданы одним потоком, и вам не важен порядок, в котором выдаются значения, используйтеmerge
оператор.
В анимации ниже вы можете увидетьmerge
оператор объединяет два потокаA
а такжеB
, каждый из двух потоков создает по 3 значения, которые передаются в результирующий поток каждый раз при генерации значения.
Вот пример кода для сопровождения анимации:
const a = stream('a', 200, 3, 'partial');
const b = stream('b', 200, 3, 'partial');
merge(a, b).subscribe(fullObserver('merge'));
// 还可以使用实例操作符
// a.pipe(merge(b)).subscribe(fullObserver('merge'));
Редактируемая онлайн-демонстрация stackblitz:combining-sequences-merge.stackblitz.io
Последовательное объединение нескольких потоков
Далее будет представлен операторconcat. Он подписывается на каждый входной поток по порядку и выдает в нем все значения, в каждый момент времени существует только одна подписка. Только когда текущий входной поток будет завершен, он подпишется на следующий входной поток и передаст его значение в результирующий поток.
Когда все входные потоки будут завершены, результирующий поток будет завершен.Если какой-либо входной поток сообщит об ошибке, результирующий поток сообщит об ошибке. Если входной поток не завершится, результирующий поток не завершится, а это означает, что на некоторые потоки никогда не будет подписки.
Если порядок, в котором выдаются значения, важен, и вы хотите, чтобы первый входной поток, переданный оператору, выдавал значения первым, используйтеconcat
оператор. Например, есть два потока: один извлекает значения из кеша, а другой извлекает значения с удаленного сервера. Если вы хотите объединить их и убедиться, что значение в кеше выдается первым, вы можете использоватьconcat
.
На анимации ниже вы можете увидетьconcat
оператор объединяет два потокаA
а такжеB
, каждый из двух потоков генерирует 3 значения, сначалаA
Испущенное значение передается в поток результатов, а затемB
.
Вот пример кода для сопровождения анимации:
const a = stream('a', 200, 3, 'partial');
const b = stream('b', 200, 3, 'partial');
concat(a, b).subscribe(fullObserver('concat'));
// 还可以使用实例操作符
// a.pipe(concat(b)).subscribe(fullObserver(‘concat’));
Редактируемая онлайн-демонстрация stackblitz:concat.stackblitz.io
Пусть несколько потоков конкурируют
Операторы, которые будут представлены далееraceВводит довольно интересное понятие. Он выполняет любую комбинацию потоков сам по себе, но выбирает первый поток, который дает значение. Как только первый поток выдает значение, другие потоки отписываются и полностью игнорируются.
Когда выбранный поток завершен, результирующий поток также завершается.Если выбранный поток сообщает об ошибке, результирующий поток также сообщает об ошибке. Аналогично, если выбранный поток не завершится, результирующий поток никогда не завершится.
Этот оператор полезен, когда есть несколько потоков, предоставляющих значения, например, у продукта есть серверы по всему миру, но из-за сетевых условий задержка непредсказуема и сильно варьируется. использоватьrace
Если это так, вы можете отправить один и тот же запрос в несколько источников данных, а затем использовать результат первого ответного запроса.
На анимации ниже вы можете увидетьrace
оператор объединяет два потокаA
а такжеB
, каждый из двух потоков выдает по 3 значения, но толькоA
испускается, потому что он выдал значение первым.
const a = intervalProducer(‘a’, 200, 3, ‘partial’);
const b = intervalProducer(‘b’, 500, 3, ‘partial’);
race(a, b).subscribe(fullObserver(‘race’));
// 还可以使用实例操作符
// a.pipe(race(b)).subscribe(fullObserver(‘race’));
Редактируемая онлайн-демонстрация stackblitz:combining-sequences-race-b-is-ignored.stackblitz.io
Используйте наблюдаемые более высокого порядка для объединения неизвестного количества потоков
Введенные ранее операторы, как статические, так и инстансированные, можно использовать только для объединения известного количества потоков, но что, если вы не знаете, какие потоки объединять? На самом деле это очень распространенный асинхронный сценарий. Например, сетевой запрос инициирует некоторые другие запросы на основе результата возвращаемого значения.
RxJS предоставляет несколько операторов для получения потоков из потоков, также известных как наблюдаемые объекты более высокого порядка. Эти операторы примут значение внутреннего потока и будут работать в соответствии с правилами композиции, описанными в предыдущей главе.
Эти операторы также потерпят неудачу, если произойдет сбой каких-либо внутренних потоков, и они могут использовать только операторы экземпляра. Теперь давайте представим их по одному.
MergeAll
Этот оператор объединяет значения, испускаемые всеми внутренними потоками, как если быmerge
оператор, является параллельным.
На анимации ниже вы можете увидеть поток более высокого порядка.H
, который генерирует два внутренних потокаA
а такжеB
.mergeAll
Оператор будет объединять значения из двух потоков, и значение будет передаваться в результирующий поток всякий раз, когда значение испускается.
Вот пример кода для сопровождения анимации:
const a = stream(‘a’, 200, 3);
const b = stream(‘b’, 200, 3);
const h = interval(100).pipe(take(2), map(i => [a, b][i]));
h.pipe(mergeAll()).subscribe(fullObserver(‘mergeAll’));
Редактируемая онлайн-демонстрация stackblitz:merge-all.stackblitz.io.
ConcatAll
Этот оператор будет объединять значения, испускаемые всеми внутренними потоками, как если быconcat
оператора, который подключен по порядку.
На анимации ниже вы можете увидеть поток более высокого порядка.H
, который генерирует два внутренних потокаA
а такжеB
.concatAll
оператор первый из потокаA
значение из потока, а затемB
принимает значение и передает все значения в поток результатов.
Вот пример кода для сопровождения анимации:
const a = stream(‘a’, 200, 3);
const b = stream(‘b’, 200, 3);
const h = interval(100).pipe(take(2), map(i => [a, b][i]));
h.pipe(concatAll()).subscribe(fullObserver(‘concatAll’));
Редактируемая онлайн-демонстрация stackblitz:concat-all.stackblitz.io.
SwitchAll
Иногда полученное значение из всего внутреннего потока не то, что мы хотим. В некоторых сценариях нас может интересовать только значение последнего внутреннего потока. Хороший пример — поиск. Когда пользователь вводит ключевое слово, он отправляет запрос на сервер, поскольку запрос является асинхронным, результат возвращает наблюдаемый запрос. Что произойдет, если пользователь обновит поле поиска по ключевому слову до того, как будут возвращены результаты запроса? Будет выдан второй запрос, сейчас отправил два запроса на сервер. Однако результаты первого поиска пользователя уже не волнуют. Причем, если первый результат поиска, если позже, чем второй результат поиска, то (Примечание переводчика: например, сервер дважды раздает Запрос не на один и тот же узел), то пользователь увидит результат будет с первого раза, это заставляет людей чувствовать себя обеспокоенными. Мы не хотим, чтобы это произошло, поэтомуswitchAll
Куда приходят операторы. Он подпишется только на последний внутренний поток и проигнорирует (Примечание переводчика: игнорировать = отказаться от подписки) предыдущий внутренний поток.
На анимации ниже вы можете увидеть поток более высокого порядка.H
, который генерирует два внутренних потокаA
а такжеB
.switchAll
оператор первый из потокаA
Возьмем значение, когда текущийB
При отключении конвекцииA
подпишись, потом из стримаB
принимает значение и передает значение в поток результатов.
Вот пример кода для сопровождения анимации:
const a = stream(‘a’, 200, 3);
const b = stream(‘b’, 200, 3);
const h = interval(100).pipe(take(2), map(i => [a, b][i]));
h.pipe(switchAll()).subscribe(fullObserver(‘switchAll’));
Редактируемая онлайн-демонстрация stackblitz:switch-all.stackblitz.io.
concatMap, mergeMap и switchMap
Интересно то, что операторы карты concatMap, mergeMap и switchMap используются гораздо чаще, чем соответствующие им операторы, работающие с наблюдаемыми более высокого порядка.concatAll
,mergeAll
а такжеswitchAll
. Но если подумать, то они почти одинаковы. все*Map
Операторы фактически генерируют наблюдаемые более высокого порядка в два этапа, сначала отображают их на наблюдаемые более высокого порядка, а затем обрабатывают внутренний поток, сгенерированный наблюдаемыми более высокого порядка, посредством соответствующей комбинационной логики.
Давайте посмотрим на предыдущийmeregeAll
Пример кода для оператора:
const a = stream('a', 200, 3);
const b = stream('b', 200, 3);
const h = interval(100).pipe(take(2), map(i => [a, b][i]));
h.pipe(mergeAll()).subscribe(fullObserver('mergeAll'));
map
оператор генерирует наблюдаемые более высокого порядка, затемmergeAll
Оператор объединяет значения этих внутренних потоков, используяmergeMap
можно легко заменитьmap
а такжеmergeAll
,так:
const a = stream('a', 200, 3);
const b = stream('b', 200, 3);
const h = interval(100).pipe(take(2), mergeMap(i => [a, b][i]));
h.subscribe(fullObserver('mergeMap'));
Результат обоих фрагментов кода абсолютно одинаков.concatMap
а такжеswitchMap
То же самое, проверьте сами.
Объединяйте потоки в паре
Все предыдущие операторы позволяли нам сглаживать несколько потоков и передавать значения из этих потоков в результирующий поток без изменений, как если бы все значения поступали из одного и того же потока. Набор операторов, которые мы рассмотрим далее, принимает несколько потоков в качестве входных данных, но отличается тем, что они объединяют значения из каждого потока и создают одно комбинированное значение в качестве значения в результирующем потоке.
Каждый оператор необязательно принимает в качестве конечного аргумента проекционную функцию, определяющую, как объединяются значения в результирующем потоке. В примерах в этой статье я буду использовать функцию приведения по умолчанию, которая просто объединяет значения с запятыми в качестве разделителей. В конце этого раздела я покажу, как использовать пользовательскую функцию проекции.
CombineLatest
Первый введенный операторcombineLatest
. Используйте его, чтобы взять последние значения из нескольких входных потоков и преобразовать эти значения в одно значение для передачи в поток результатов. RxJS кэширует последнее значение в каждом входном потоке, и только когда все входные потоки выдают хотя бы одно значение, он будет использовать функцию проекции (брать последнее значение из предыдущего кэша) для вычисления значения результата, а затем передавать поток результатов в вычисленное значение.Результирующее значение выдается.
Когда все входные потоки будут завершены, результирующий поток будет завершен.Если какой-либо входной поток сообщит об ошибке, результирующий поток сообщит об ошибке. Если входной поток не завершен, результирующий поток не будет завершен. Другими словами, если какой-либо входной поток завершается без выдачи значения, то результирующий поток также завершится и не выдаст никакого значения при завершении, потому что нет способа получить значение из завершенного входного потока и поместить его в поток результатов. Кроме того, если входной поток не выдает значение и не завершается, тоcombineLatest
никогда не выдаст значение и не завершится по той же причине, что и выше, он будет ждать, пока все входные потоки не выдадут значение.
Этот оператор полезен, если вам нужно оценить некоторую комбинацию состояний, и снова, когда одно из государств меняется. Самый простой пример - система мониторинга. Каждый сервис может быть представлен потоком, который возвращает логическое значение для определения того, доступна ли услуга. Состояние мониторинга будет зеленым, когда все услуги доступны, поэтому функция ряд должна просто выполнять логическую и операцию.
На анимации ниже вы можете увидетьcombineLatest
оператор объединяет два потокаA
а такжеB
. Как только все входные потоки выдали хотя бы одно значение, результирующий поток выдаст комбинированные значения:
Вот пример кода для сопровождения анимации:
const a = stream('a', 200, 3, 'partial');
const b = stream('b', 500, 3, 'partial');
combineLatest(a, b).subscribe(fullObserver('latest'));
Редактируемая онлайн-демонстрация stackblitz:combine-latest.stackblitz.io.
Zip
zip
Способ слияния операторов тоже очень интересный, механика чем-то похожа на молнии на одежде или сумках. Он объединяет соответствующие значения из двух или более входных потоков в кортеж (пару в случае двух входных потоков). Он ожидает, пока все входные потоки выдадут соответствующие значения, затем использует функцию проецирования для преобразования их в одно значение и выдает их в потоке результатов. Поток результатов выдает значения только тогда, когда соответствующие новые значения собираются вместе из каждого входного потока, поэтому, если один из входных потоков выдает значения быстрее, чем другой, скорость, с которой выдается значение результата, будет определяется разницей между двумя входными потоками, тем медленнее решение.
Когда любой входной поток завершен, поток результатов будет завершен после отправки значения от других входных потоков. Если какой-либо входной поток никогда не завершен, поток результатов никогда не будет завершен. Если какой-либо входной поток неверный, поток результатов также сообщит об ошибке.
использоватьzip
Оператор можно реализовать очень просто, используя таймер для генерации потока значений диапазона. Вот базовый пример, где функция приведения используется для возвратаrange
Значение в потоке:
zip(range(3, 5), interval(500), v => v).subscribe();
На анимации ниже вы можете увидетьzip
оператор объединяет два потокаA
а такжеB
. Как только соответствующие значения объединены в пары, результирующий поток выдает объединенное значение:
const a = stream('a', 200, 3, 'partial');
const b = stream('b', 500, 3, 'partial');
zip(a, b).subscribe(fullObserver('zip'));
Редактируемая онлайн-демонстрация stackblitz:zip.stackblitz.io.
forkJoin
Иногда есть набор потоков, но вас интересует только последнее значение в каждом потоке. Обычно эти потоки также имеют только одно значение. Например, вы хотите сделать несколько сетевых запросов и выполнить действие только тогда, когда все запросы вернули результаты. Функция этого оператора такая же, какPromise.allаналогичный. Однако, если в потоке более одного значения, все значения, кроме последнего, будут проигнорированы.
Поток результатов выдает уникальное значение только тогда, когда все входные потоки завершены. Если какой-либо входной поток не завершен, результирующий поток никогда не будет завершен.Если какой-либо входной поток сообщит об ошибке, результирующий поток также сообщит об ошибке.
На анимации ниже вы можете увидетьforkJoin
оператор объединяет два потокаA
а такжеB
.当所有输入流都完成后,结果流将每个输入流中的最后一个值组合起来并发出:
Вот пример кода для сопровождения анимации:
const a = stream('a', 200, 3, 'partial');
const b = stream('b', 500, 3, 'partial');
forkJoin(a, b).subscribe(fullObserver('forkJoin'));
Редактируемая онлайн-демонстрация stackblitz:fork-join.stackblitz.io.
WithLatestFrom
Последний оператор, представленный в этой статье,withLatestFrom
. Этот оператор можно использовать, когда есть основной поток и также требуются последние значения других потоков.withLatestFrom
а такжеcombineLatest
чем-то похож, разница в том, чтоcombineLatest
заключается в том, что когда любой входной поток выдает значение, результирующий поток выдает новое значение, иwithLatestFrom
Разве что только когда основной поток выдает значение, результирующий поток выдает новое значение.
так какcombineLatest
,withLatestFrom
Будет ждать, пока каждый входной поток выдаст хотя бы одно значение, и когда основной поток завершится, возможно, что результирующий поток никогда не выдаст значение после завершения. Если основной поток не завершен, поток результатов никогда не будет завершен, и если какой-либо входной поток сообщит об ошибке, поток результатов также сообщит об ошибке.
На анимации ниже вы можете увидетьwithLatestFrom
оператор объединяет два потокаA
а такжеB
,B
является основным потоком. каждый разB
При передаче нового значения результирующий поток будет использоватьA
чтобы выдать комбинированное значение с последним значением в:
Вот пример кода для сопровождения анимации:
const a = stream('a', 3000, 3, 'partial');
const b = stream('b', 500, 3, 'partial');
b.pipe(withLatestFrom(a)).subscribe(fullObserver('latest'));
Редактируемая онлайн-демонстрация stackblitz:with-latest-from.stackblitz.io.
Функция проекции
Как упоминалось в начале этого раздела, все операторы, которые объединяют потоки путем спаривания, могут принимать дополнительную функцию проецирования. Эта функция определяет, как преобразуется результирующее значение. Используя функцию проекции, вы можете выбрать, выдавать ли только значения из определенного входного потока или каким-либо образом объединять значения:
// 返回第二个流中的值
zip(s1, s2, s3, (v1, v2, v3) => v2)
// 使用 - 作为分隔符来连接值
zip(s1, s2, s3, (v1, v2, v3) => `${v1}-${v2}-${v3}`)
// 返回单个布尔值
zip(s1, s2, s3, (v1, v2, v3) => v1 && v2 && v3)
Если вы хотите увидеть все анимации в одном месте, см.Pierre Criulanscyэтоgist.