Учебник по RxJS

внешний интерфейс JavaScript Promise RxJS

начиная


основная концепция:

  • Наблюдаемый (наблюдаемый объект):Представляет концепцию, которая представляет собой вызываемую коллекцию будущих значений или событий.
  • Наблюдатель:Набор функций обратного вызова, которые знают, как прослушивать значения, предоставляемые Observable.
  • **Подписка (подписка): ** представляет выполнение Observable, которое в основном используется для отмены выполнения Observable.
  • Оператор:Чистые функции в стиле функционального программирования с использованием таких операторов, как map, filter, concat, flatMap и т. д. для работы с коллекциями.
  • Тема:Эквивалентен EventEmitter и является единственным способом мультиплексирования значения или события для нескольких наблюдателей.
  • Планировщики:: используется для управления параллелизмом и является централизованным планировщиком, который позволяет нам координировать, когда происходят вычисления, такие как setTimeout или requestAnimationFrame или другие.

Наблюдаемый (наблюдаемый объект)


НаблюдаемыеЛенивое нажатие нескольких значенийсобирать. Он заполняет пробелы в таблице ниже:

одно значение несколько значений
Вытащить Function
толкать Promise

Тянуть против Толкать

Pull и Push — это два разных протокола, которые описывают, как производители данных взаимодействуют с потребителями данных.

Вытащить?Потребитель сам решает, когда получать данные от производителя, сам производитель не знает, когда данные доставляются потребителю.

Каждая функция Javascript — это вытягивающая система. Производитель функциональных данных, которые потребляет код, вызывающий функцию, путем получения единственного возвращаемого значения из вызова функции.

режиссер потребитель
Вытащить Пассивный: выдает данные по запросу.
толкать Активный: генерируйте данные в своем собственном темпе.

толкать?Производитель сам решает, когда отправлять данные потребителю. Потребители сами не знают, когда принимать данные

Обещания являются наиболее распространенным типом push-системы. Промисы (производители) передают разрешенное значение зарегистрированным функциям обратного вызова (потребителям), но, в отличие от функций, промисы решают, когда «протолкнуть» значение в функцию обратного вызова.

RxJS представляет Observables, новую систему push-уведомлений javascript. Observable — это несколько производителей значений и отправляет значения наблюдателям (потребителям)

  • Functionэто ленивая операция вычисления, которая синхронно возвращает одно значение при вызове
  • Generatorэто операция ленивой оценки, которая синхронно возвращает от нуля до бесконечного числа значений при вызове
  • Promiseэто операция, которая в конечном итоге может вернуть значение
  • Observable— это операция ленивой оценки, которая может возвращать от нуля до бесконечного числа значений с момента ее вызова или асинхронно.

Анатомия наблюдаемого

  • Создать наблюдаемый
  • Подпишись на Наблюдательный
  • Выполнить наблюдаемое
  • очистить

Создать наблюдаемый

RX.Observable.create — это псевдоним конструктора Observable, который принимает один параметр — функцию подписки.

// 生产者
var observable = RX.Observable.create(function subscribe(observer){
	var id = setInterval(()=>{
		observer.next('hi')
	},1000);
})

Наблюдаемые объекты можно создавать с помощью create, но обычно мы используем так называемые операторы создания, такие как of, from, interval и т. д.

Подпишись на Наблюдательный

// 观察者
observable.subscribe(x=>console.log(x))

Это указывает на то, что вызовы подписки не распределяются между несколькими наблюдателями одного и того же Observable.Каждый вызов observable.subscribe запускает независимую настройку для данного наблюдателя. Observable даже не поддерживает дополнительный список наблюдателей.

Выполнить наблюдаемое

Код... в Observable.create(function subscribe(observer) {...}) означает "Observable executes", что является ленивой операцией, которая выполняется только после подписки каждого наблюдателя. Выполнение дает несколько значений с течением времени, синхронно или асинхронно.

Наблюдаемые исполнения могут передавать три типа значений:

  • Уведомление «Далее»: отправьте значение, например число, строку, объект и т. д.
  • Уведомление об ошибке: отправить сообщение об ошибке или исключении JavaScript.
  • Уведомление «Завершено»: больше никаких значений отправляться не будет.

Вот пример Observable, который отправляет три уведомления «Далее», за которыми следует уведомление «Завершено»:

var observable = Rx.Observable.create(function subscribe(observer) {
	observer.next(1);
	observer.next(2);
	observer.next(3);
	observer.complete();
});

Observable строго подчиняется своей спецификации, поэтому следующий код не отправляет уведомление «Далее» 4:

var observable = Rx.Observable.create(function subscribe(observer) {
	observer.next(1);
	observer.next(2);
	observer.next(3);
	observer.complete();
	observer.next(4); // 因为违反规约,所以不会发送
});

Хорошая идея заключать любой код в подписке с блоком try/catch, который отправляет уведомление «Ошибка» при обнаружении исключения:

var observable = Rx.Observable.create(function subscribe(observer) {
	try {
		observer.next(1);
		observer.next(2);
		observer.next(3);
		observer.complete();
	} catch (err) {
		observer.error(err); // 如果捕获到异常会发送一个错误
	}
});

очистить

Поскольку каждое выполнение является эксклюзивным для соответствующего наблюдателя, после того, как наблюдатель завершит получение значений, у него должен быть способ остановить выполнение, чтобы избежать напрасной траты вычислительной мощности или ресурсов памяти.

При вызове observable.subscribe наблюдатель будет присоединен к вновь созданному исполнению Observable. Этот вызов также возвращает объект Subscription:

var subscription = observable.subscribe(x => console.log(x));

Подписка представляет собой незавершенное выполнение и имеет минимальный API, позволяющий отменить выполнение. Дополнительные сведения о подписках см. в разделе Типы подписок. Используя subscribe.unsubscribe(), вы можете отменить текущее выполнение:

var observable = Rx.Observable.from([10, 20, 30]);
var subscription = observable.subscribe(x => console.log(x));
// 稍后:
subscription.unsubscribe();

Когда вы подписываетесь на Observable, вы получаете подписку, которая представляет текущее выполнение. Выполнение можно отменить, просто вызвав метод unsubscribe().

Наблюдатель


Что такое наблюдатель? — Наблюдатели — это потребители значений, испускаемых Observable. Наблюдатель — это просто набор функций обратного вызова, каждая из которых соответствует типу уведомления, отправляемому Observable: следующее, ошибка и завершение. Следующий пример является типичным объектом-наблюдателем:

var observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};
observable.subscribe(observer);

Наблюдатель — это просто объект с тремя функциями обратного вызова, по одной для каждого типа уведомления, отправляемого Observable.

Подписка


Что такое подписка? - Подписка — это объект, представляющий очищаемый ресурс, обычно исполнение Observable. В Subscription есть важный метод unsubscribe, который не требует никаких параметров и используется просто для очистки ресурсов, занятых Subscription. В последней версии RxJS Подписки назывались «Одноразовые» (одноразовые объекты).

var observable = Rx.Observable.interval(1000);
var subscription = observable.subscribe(x => console.log(x));
// 稍后:
// 这会取消正在进行中的 Observable 执行
// Observable 执行是通过使用观察者调用 subscribe 方法启动的
subscription.unsubscribe();
//Subscription 还可以合在一起,这样一个 Subscription 调用 unsubscribe() 方法,可能会有多个 Subscription 取消订阅 。你可以通过把一个 Subscription 添加到另一个上面来做这件事:
var observable1 = Rx.Observable.interval(400);
var observable2 = Rx.Observable.interval(300);

var subscription = observable1.subscribe(x => console.log('first: ' + x));
var childSubscription = observable2.subscribe(x => console.log('second: ' + x));

subscription.add(childSubscription);

setTimeout(() => {
// subscription 和 childSubscription 都会取消订阅
subscription.unsubscribe();
}, 1000);

Подписки также имеют метод remove(otherSubscription), который отменяет добавленную дочернюю подписку.

Тема (тема)


Что такое предмет? — RxJS Subject — это особый тип Observable, который позволяет мультикастировать значения нескольким наблюдателям, поэтомуТема многоадресная,а такжеОбычные наблюдаемые являются одноадресными(каждый подписанный наблюдатель имеет независимое выполнение Observable).

Subject похож на Observable, но может передаваться нескольким наблюдателям. Субъекты также похожи на EventEmitters, поддерживая реестр нескольких слушателей.

Каждый субъект является наблюдаемым— Для Subject можно предоставить наблюдателя и использовать метод подписки, а можно начать нормально получать значения. С точки зрения наблюдателя, он не может сказать, выполняется ли Observable из обычного Observable или из Subject.

Внутри субъекта подписка не вызывает нового выполнения, которое отправляет значение. Он просто регистрирует данного наблюдателя в списке наблюдателей, аналогично тому, как работает addListener в других библиотеках или языках.

Каждый Субъект является наблюдателем.-Subject — это объект со следующими методами: next(v), error(e) и complete(). Чтобы предоставить субъекту новое значение, просто вызовите next(theValue), который передаст значение наблюдателям, зарегистрированным для прослушивания субъекта.

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(1);
subject.next(2);

Поскольку Subject является наблюдателем, это также означает, что вы можете передать Subject в качестве параметра любому методу подписки Observable, как показано в следующем примере:

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

var observable = Rx.Observable.from([1, 2, 3]);

observable.subscribe(subject); // 你可以提供一个 Subject 进行订阅

// 结果
observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

Используя описанный выше метод, мы просто преобразуем одноадресное выполнение Observable в многоадресное через Subject . Это также показывает, что субъекты — единственный способ разделить выполнение любого наблюдаемого между несколькими наблюдателями.

Существуют также специальные типы субъектов: BehaviorSubject, ReplaySubject и AsyncSubject.

Многоадресные наблюдаемые

«Многоадресный наблюдаемый» отправляет уведомления через субъект, у которого может быть несколько подписчиков, тогда как обычный «одноадресный наблюдаемый» отправляет уведомления только одному наблюдателю.

Многоадресная рассылка Observable под капотом использует Subject, чтобы несколько наблюдателей могли видеть выполнение одного и того же Observable.

Под капотом оператор многоадресной рассылки работает следующим образом: Observers подписываются на нижележащий Subject, который, в свою очередь, подписывается на исходный Observable. Следующий пример похож на предыдущий пример с использованием observable.subscribe(subject):

var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);

// 在底层使用了 `subject.subscribe({...})`:
multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

// 在底层使用了 `source.subscribe(subject)`:
multicasted.connect();

Оператор многоадресной рассылки возвращает Observable, который выглядит как обычный Observable, но действует как субъект при подписке. Что возвращает многоадресная рассылка, так это ConnectableObservable, который является просто Observable с методом connect().

Метод connect() очень важен, он определяет, когда начинать общее выполнение Observable. Поскольку метод connect() выполняет source.subscribe(subject) под капотом, он возвращает Subscription, от которой вы можете отказаться, чтобы отменить совместное выполнение Observable.

подсчет ссылок

Вызов вручную Connect () и обработки подписки часто слишком громоздки. Как правило, мы хотим автоматически подключаться, когда приходит первый наблюдатель, и мы хотим автоматически отменить общее выполнение, когда последний наблюдатель отписывается.

Рассмотрим следующий пример. В следующем списке показано, что происходит с подписками:

  1. Первый наблюдатель подписывается на многоадресную рассылку Observable.
  2. Multicast Observable подключен
  3. следующее значение 0 отправляется первому наблюдателю
  4. Второй наблюдатель подписывается на мультикаст Observable
  5. следующее значение 1 отправляется первому наблюдателю
  6. следующее значение 1 отправляется второму наблюдателю
  7. Первый наблюдатель отписывается от многоадресной рассылки Observable
  8. следующее значение 2 отправляется второму наблюдателю
  9. Второй наблюдатель отписывается от многоадресной рассылки Observable
  10. Соединение с многоадресным Observable было прервано (базовая операция заключается в отмене подписки)

Для этого вам нужно явно вызвать connect(), код выглядит следующим образом:

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
var subscription1, subscription2, subscriptionConnect;

subscription1 = multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
// 这里我们应该调用 `connect()`,因为 `multicasted` 的第一个
// 订阅者关心消费值
subscriptionConnect = multicasted.connect();

setTimeout(() => {
  subscription2 = multicasted.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 600);

setTimeout(() => {
  subscription1.unsubscribe();
}, 1200);

// 这里我们应该取消共享的 Observable 执行的订阅,
// 因为此后 `multicasted` 将不再有订阅者
setTimeout(() => {
  subscription2.unsubscribe();
  subscriptionConnect.unsubscribe(); // 用于共享的 Observable 执行
}, 2000);

Вместо явного вызова connect() мы можем использовать метод ConnectableObservable refCount() (счетчик ссылок), который возвращает Observable, отслеживающий количество подписчиков. Когда количество подписчиков изменяется с 0 на 1, он вызывает функцию connect(), чтобы начать совместное выполнение. Когда количество подписчиков уменьшается с 1 до 0, он полностью отписывается, останавливая дальнейшее выполнение.

Эффект refCount заключается в том, что многоадресный Observable автоматически начнет выполнение, когда появится первый подписчик, и автоматически прекратит выполнение, когда последний подписчик уйдет.

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscription1, subscription2, subscriptionConnect;

// 这里其实调用了 `connect()`,
// 因为 `refCounted` 有了第一个订阅者
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

setTimeout(() => {
  console.log('observerB subscribed');
  subscription2 = refCounted.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 600);

setTimeout(() => {
  console.log('observerA unsubscribed');
  subscription1.unsubscribe();
}, 1200);

// 这里共享的 Observable 执行会停止,
// 因为此后 `refCounted` 将不再有订阅者
setTimeout(() => {
  console.log('observerB unsubscribed');
  subscription2.unsubscribe();
}, 2000);

// 执行结果:
observerA subscribed
observerA: 0
observerB subscribed
observerA: 1
observerB: 1
observerA unsubscribed
observerB: 2
observerB unsubscribed

refCount() существует только в ConnectableObservable, он возвращает Observable, а не другой ConnectableObservable.

BehaviorSubject

Одним из вариантов Subject является BehaviorSubject, который имеет понятие «текущее значение». Он содержит последнее значение, отправленное потребителю. А когда подписывается новый наблюдатель, от BehaviorSubject сразу же принимается «текущее значение».

BehaviorSubjects подходят для представления «значений во времени». Например, поток дней рождения — это Subject, а поток возрастов — BehaviorSubject.

В приведенном ниже примере BehaviorSubject инициализируется значением 0, которое получает 0 при подписке первого наблюдателя. Второй наблюдатель получает значение 2 при подписке, даже если он подписался после отправки значения 2.

var subject = new Rx.BehaviorSubject(0); // 0是初始值

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(3);

выход:

observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

ReplaySubject

ReplaySubject похож на BehaviorSubject тем, что может отправлять старые значения новым подписчикам, но также может записывать часть выполнения Observable.

ReplaySubject записывает несколько значений из выполнения Observable и воспроизводит их новым подписчикам.

var subject = new Rx.ReplaySubject(3); // 为新的订阅者缓冲3个值

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5); 

В дополнение к объему буфера вы также можете указать время окна (в миллисекундах), чтобы определить, как давно может быть записано значение. В приведенном ниже примере мы используем больший объем кэша, равный 100, но для параметра времени окна установлено значение только 500 миллисекунд.

var subject = new Rx.ReplaySubject(100, 500 /* windowTime */);

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

var i = 1;
setInterval(() => subject.next(i++), 200);

setTimeout(() => {
  subject.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 1000);

// 输出:
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
observerB: 3
observerB: 4
observerB: 5
observerA: 6
observerB: 6
...

AsyncSubject

AsyncSubject — это еще один вариант Subject, который будет отправлять последнее значение выполнения наблюдателю только после завершения выполнения Observable (выполнение complete()).

var subject = new Rx.AsyncSubject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);
subject.complete();

выход:

observerA: 5
observerB: 5

AsyncSubject похож на оператор last() в том, что он также ожидает полного уведомления для отправки одного значения.

Операторы

尽管 RxJS 的根基是 Observable,但最有用的还是它的操作符。 Операторы — это основные единицы кода, которые позволяют легко составлять сложный асинхронный код в декларативной манере.

Что такое оператор?

Операторы — это методы типа Observable, такие как .map(...), .filter(...), .merge(...) и т. д. Когда операторы вызываются, они не изменяют существующие экземпляры Observable. Вместо этого они возвращают новый Observable, логика подписки которого основана на первом Observable.

Операторы — это функции, которые создают новый Observable на основе текущего Observable. Это операция без побочных эффектов: предыдущий Observable остается неизменным.

Оператор — это, по сути, чистая функция, которая принимает Observable в качестве входных данных и создает новый Observable в качестве вывода. Подписка на выходной Observable также подписывается на входной Observable. В следующем примере мы создаем пользовательскую операторную функцию, которая умножает каждое значение, полученное из входных данных Observable, на 10:

function multiplyByTen(input) {
  var output = Rx.Observable.create(function subscribe(observer) {
    input.subscribe({
      next: (v) => observer.next(10 * v),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
  });
  return output;
}

var input = Rx.Observable.from([1, 2, 3, 4]);
var output = multiplyByTen(input);
output.subscribe(x => console.log(x));

выход:

10
20
30
40

Обратите внимание, что подписка на вывод приводит к подписке и на вход Observable. Мы называем это «цепочкой подписки оператора».

Операторы экземпляра против статических операторов

Что такое операторы экземпляра? - Обычно, когда мы говорим об операторах, мы имеем в виду операторы экземпляра, которые являются методами экземпляров Observable. Например, если бы приведенный выше оператор MultipleByTen был официальным оператором экземпляра, это выглядело бы примерно так:

Rx.Observable.prototype.multiplyByTen = function multiplyByTen() {
  var input = this;
  return Rx.Observable.create(function subscribe(observer) {
    input.subscribe({
      next: (v) => observer.next(10 * v),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
  });
}

Операторы экземпляра — это функции, которые используют ключевое слово this для ссылки на входной Observable.

Обратите внимание, что входной Observable больше не является параметром функции, теперь это объект this. Вот как мы можем использовать такой оператор экземпляра:

var observable = Rx.Observable.from([1, 2, 3, 4]).multiplyByTen();

observable.subscribe(x => console.log(x));

Что такое статический оператор?- В дополнение к операторам экземпляра есть также статические операторы, которые напрямую привязаны к классу Observable. Статический оператор не использует внутри себя ключевое слово this, а полностью зависит от своего аргумента.

Статические операторы — это чистые функции, прикрепленные к классу Observalbe, которые обычно используются для создания Observalbe с нуля.

Наиболее распространенным типом статического оператора является так называемый оператор создания. Вместо того, чтобы преобразовывать входной Observable в выходной Observable, они принимают только ненаблюдаемые аргументы, такие как числа, и создают новый Observable.

Типичным примером статического оператора является интервальная функция. Он принимает число (не наблюдаемое) в качестве параметра и выдает наблюдаемое в качестве вывода:

var observable = Rx.Observable.interval(1000 /* 毫秒数 */);

Другим примером оператора создания является create, который широко использовался в предыдущих примерах. Щелкните здесь, чтобы просмотреть список всех статических операторов.

Однако некоторые статические операторы могут отличаться от простого создания. Некоторые операторы комбинирования могут быть статическими, например, merge, combLatest, concat и т. д. Они имеют смысл как статические операторы, потому что они принимают несколько Observables в качестве входных данных, а не только один, например:

var observable1 = Rx.Observable.interval(1000);
var observable2 = Rx.Observable.interval(400);

var merged = Rx.Observable.merge(observable1, observable2);

Для объяснения того, как работают операторы, текстовых описаний часто бывает недостаточно. Многие операторы зависят от времени, и они могут по-разному задерживать, сэмплировать, дросселировать или устранять дребезг значений. Диаграммы обычно являются более подходящим инструментом. Мраморная диаграмма — это визуальное представление того, как работает оператор, состоящее из входных Observable(s) (входными могут быть несколько Observable s), оператора и его параметров, а также выходных Observable s.

В мраморном графе время течет вправо, и граф описывает, как значения («шарики») испускаются во время выполнения Observable.

Рассеченный мрамор можно увидеть на изображении ниже.

Alt text

На всей станции документации мы широко используем мраморные диаграммы, чтобы объяснить, как работают операторы. Они также могут быть очень полезны в других контекстах, например, на доске или даже в наших модульных тестах (например, в диаграммах ASCII).