rxjs

作者:分分快三计划

订阅管理

如果没有及时退订可能会引发内存泄露,我们需要通过退订去释放资源。

1)命令式管理

const subscription = source$.subscribe(observer)
// later...
subscription.unsubscribe()

上面的管理方式,数量很少时还好,如果数量较多,将会显得十分笨拙。

2) 声明式管理

const kill1 = fromEvent(button, 'click')
const kill2 = getStreamOfRouteChanges()
const kill3 = new Subject()

const merged$ = mege(
    source1.pipe(takeUntil(kill1)),
    source2.pipe(takeUntil(kill2)),
    source3.pipe(takeUntil(kill3))
)

const sub = merged$.subscribe(observer)
// later...
sub.unsubscribe()

// 或者发出任意结束的事件
kill3.next(true)

通过 takeUntil、map 或者其他操作符组合进行管理。这样更不容易漏掉某个退订,订阅也减少了。

3)让框架或者某些类库去处理

比如 Angular 中的 async pipe,当 unmount 时会自动退订,也不用写订阅。

Observable as generalizations of functions

与主流相反,Observable不像EventEmitters,也不像Promise。在某些情况下,Observable的行为可能像EventEmitters,比如当使用RxJS的Subjects进行多途径传播时,但是大部分的情况它们都是不一样的。

考虑下面的情况:

function foo(){
  console.log('Hello');
  return 42;
}

var x = foo.call();  //  same as foo()
console.log(x);
var y = foo.call();  //  same as foo()
console.log(y)

我们期望出现下面的结果:

"Hello"
42
"Hello"
42

当使用Observables时:

var foo = Rx.Observable.create(function (observer){
  console.log('Hello');
  observer.next(42);
});

foo.subscribe(function (x){
  console.log(x);
});
foo.subscribe(function (y){
  console.log(y);
})

它们有着同样地输出:

"Hello"
42
"Hello"
42

之所以出现这种情况是因为function和Observables都是延迟(lazy)计算的。如果你不调用function,console.log('Hello')这段代码是不会执行的。Observables是同样的,如果你不执行(subscribe)它,代码也不会执行。“calling”和"subscribing"都是一个独立的操作:两个function分别导致两个结果,两个Observale subscribes trigger也会分别导致两个结果。这与EventEmitters截然相反,EventEmitters会共享结果,并且它执行的时候也不会顾忌到底是否有subscribers存在,Observables不会是共享结果,并且也是延迟执行。

Observable (可观察对象)

Observables 是多个值的惰性推送集合

Observable是RxJS的核心概念之一.它实际上就是可以被外界观察的一个对象.当本身的状态发生变化时,就会将其变化推送给外界观察它的对象,也就是 观察者对象.同时因为Observables是多个值的惰性推送集合所以只有当使用一个观察者对象去订阅了它之后.它才会同步或异步地返回零到(有可能的)无限多个值.下面是使用RxJS创建一个Observable的方式

var observable = Rx.Observable.create(function subscribe(observer) {
  var id = setInterval(() => {
    observer.next('hi')
  }, 1000);
});

上面实例创建了一个 Observable,它每隔一秒会向观察者发送字符串 'hi'.

操作符和数组方法

Observable 的操作符和数组的方法有相似之处,但是也有很大的不同,体现在以下两点:

  1. 延迟运算
  2. 渐进式取值

延迟运算,我们之前有讲到过,就是只有订阅后才会开始对元素进行运算。

因为 Observable 是时间上的集合,操作符不是像数组方法那样运算完所有元素再返回交给下一个方法,而是一个元素一直运算到底,就像管道中的水流一样,先发出的数据先经过操作符的运算。

Subscribing一个Observable像调用一个函数一样,当一个数据被传递时提供一个回调

这个addEventListener/removeEventListener这样的API完全不一样。observable.subscribe作为一个给定的观察者,在Observable中并没有像listener一样被注册。Observable甚至不需要维护一系列的Observers。

Schedulers (调度器)

什么是调度器? - 调度器控制着何时启动 subscription 和何时发送通知。它由三部分组成:

  • 调度器是一种数据结构。 它知道如何根据优先级或其他标准来存储任务和将任务进行排序。
  • 调度器是执行上下文。 它表示在何时何地执行任务(举例来说,立即的,或另一种回调函数机制(比如 setTimeout 或 process.nextTick),或动画帧)。
  • 调度器有一个(虚拟的)时钟。 调度器功能通过它的 getter 方法 now() 提供了“时间”的概念。在具体调度器上安排的任务将严格遵循该时钟所表示的时间。
    调度器可以让你规定 Observable 在什么样的执行上下文中发送通知给它的观察者。

轮询中的错误处理

interval(10000).pipe(
  switchMap(() => from(axios.get(url))),
  catchError(err => EMPTY)
).subscribe(data => render(data))

上面的代码,每隔 10s 去发送一个请求,当某个请求返回出错时,返回空的 Observable 而不渲染数据。这样处理貌似正确,但是实际上某个请求出错时,整个 Observable 终结了,因此轮询就结束了。为了保持轮询,我们需要进行隔离,把错误处理移到 switchMap 内部进行处理。

interval(10000).pipe(
  switchMap(() => from(axios.get(url)).pipe(
    catchError(err => EMPTY)
  ))
).subscribe(data => render(data))

创建一个Observables

Rx.Observable.create是Observable构造器的一个别名,他需要一个参数:一个subscribe函数

下面的例子创建一个Observable,它的作用是每秒钟输出字符串'hi':

var observable = Rx.Observable.create(function subscrite(observer){
  var id = setInterval(() => {
    observer.next('hi')
  }, 1000);
});

Observables可以使用create创建,但是我们经常使用creation operators,诸如from,interval等。

在上面的例子中,subscribe函数是描述Observable最重要的一部分,让我们来看看subscribing是什么意思。

Observer (观察者)

什么是观察者? - 观察者是由 Observable 发送的值的消费者。观察者只是一组回调函数的集合,每个回调函数对应一种 Observable 发送的通知类型:next、error 和 complete 。

简单来说,Observer就是使用Observable发送出来值的一个方法集合.当一个Observable发送出来值之后由Observer来决定如何的去使用它.而使用的方式就是通过回调函数.将Observable发送出来的值作为参数传入其中.让后在内部去使用.同时根据Observable发送出来的值不同.其调用的回调函数也不同.分别有next(下一步),error(报错),complete(结束).下面是使用Observer的方法:

observable.subscribe(observer);

要使用观察者,需要把它提供给 Observable 的 subscribe 方法

异步 API:

异步编程时不仅要面对这些问题,还有下面这些使用方式各异的 API:

  • DOM Events
  • XMLHttpRequest
  • fetch
  • WebSocket
  • Service Worker
  • setTimeout
  • setInterval
  • requestAnimationFrame

而如果使用 RxJS,可以用统一的 API 来进行处理,而且借助 RxJS 各种强大的操作符,我们可以更简单地实现我们的需求。

Observables可以同步或异步地传递一个值

Observable和function的不同是什么呢?随之时间的流逝,Observables可以“返回”多个值,函数是不可以的。你不可以这么做:

function foo(){
  console.log('Hello');
  return 42;
  return 100;  //  不会执行到这儿
}

函数只能返回一次,Observables可以做到返回多次:

var foo = Rx.Observable.create(function (observer){
  console.log('Hello');
  observer.next(42);
  observer.next(100);  //  "return another value"
  observer.next(200);  //  "return" yet another
});

console.log('before');
foo.subscribe(function (x){
  console.log(x);
});
console.log('after');

同步输出:

"before"
"Hello"
42
100
200
"after"

你也可以异步返回:

var foo = Rx.Observable.create(function (observer){
  console.log('Hello');
  observer.next(42);
  observer.next(100);
  observer.next(200);
  setTimeout(() => {
    observer.next(300);  //  异步
  }, 1000);
});

console.log('before');
foo.subscribe(function(x){
  console.log(x);
});
console.log('after');

输出:

"before"
"Hello"
42
100
200
"after"
300

结论:

  • func.call()表示“同步给我一个数据”
  • observable.subscribe()表示“给我任何数量的值,同步或者异步”
RxJS核心概念
  • Observable (可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。
  • Observer(观察者): 一个回调函数的集合,它知道如何去监听由 Observable 提供的值。
  • Subscription (订阅): 表示 Observable 的执行,主要用于取消 Observable 的执行。

  • Operators (操作符): 采用函数式编程风格的纯函数 (pure function),使用像 map、filter、
    concat、flatMap 等这样的操作符来处理集合。

  • Subject (主体): 相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式。
  • Schedulers (调度器): 用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如 setTimeout 或 requestAnimationFrame 或其他。

创建 Observable

要创建一个 Observable,只要给 new Observable 传递一个接收 observer 参数的回调函数,在这个函数中去定义如何发送数据。

import { Observable } from 'rxjs';

const source$ = new Observable(observer => {
  observer.next(1)
  observer.next(2)
  observer.next(3)
})

const observer = {
  next : item => console.log(item)
}

console.log('start')
source$.subscribe(observer)
console.log('end')

上面的代码通过 new Observable 创建了一个 Observable,调用它的 subscribe 方法进行订阅,执行结果为依次输出 'start',1,2,3,'end'。

下面我们再看一个异步的例子:

import { Observable } from 'rxjs';

const source$ = new Observable(observer => {
  let number = 1
  setInterval(() => {
    observer.next(number  )
  }, 1000)
})

const observer = {
  next : item => console.log(item)
}

console.log('start')
source$.subscribe(observer)
console.log('end')

先输出 ’start' 、'end',然后每隔 1000 ms 输出一个递增的数字。

通过这两个小例子,我们知道 RxJS 既能处理同步的行为,也能处理异步的。

subscribing to Observables

Observable的observable可以被订阅(subscribed),就像这样:

observable.subscribe(x => console.log(x));

observable.scribe和Observable.create(function subscribe(observer) {...})中的subscribe有着相同的名字并不是巧合。在库中,它们是不同的,但是在实际的用途中你可以在逻辑上把他们想成相同的。

同样的Observable被多个Observers监听时,它们是不共享的。

本文结构:

from 方法

上面的代码用 from 则是这样:

import {from} from 'rxjs'
const source$ = from([1, 2, 3])

from 可以将可遍历的对象(iterable)转化为一个 Observable,字符串也部署有 iterator 接口,所以也支持。

from 还可以根据 promise 创建一个 Observable。我们用 fetch 或者 axios 等类库发送的请求都是一个 promise 对象,我们可以使用 from 将其处理为一个 Observable 对象。

Multicasted Observables

一个"multicasted Observable"的实现是通过Subject的多个订阅(subscribers)来实现的,然而一个"unicast Observable"仅仅只通知一个单一的Observer。

在后台,multicast是这样操作的:Observers订阅(subscribe)一个相关的Subject,Subject订阅一个Observable源。

var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);

// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({
  next: (v) => console.log('observerAa: '   v)
});
muticasted.subscribe({
  next: (v) => console.log('observerB: '   v)
});

// This is, under the hood, `source.subscribe(subject)`:
muticasted.connect();
纯净性

先看反面例子:

var count = 0;
var button = document.querySelector('button');
button.addEventListener('click', () => console.log(`Clicked ${  count} times`));

count作为一个全局变量,污染了全局环境,把应用状态搞的一团糟

下面是正面例子:

var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
  .scan(count => count   1, 0)
  .subscribe(count => console.log(`Clicked ${count} times`));

scan 操作符的工作原理与数组的 reduce 类似。
每次回调函数运行后的返回值会作为下次回调函数运行时的参数.

RxJS 要点

RxJS 有一个核心和三个重点,一个核心是 Observable 再加上相关的 Operators,三个重点分别是 Observer、Subject、Schedulers。

Pull和Push

PullPush是关于数据提供者和数据消费者交互的两个不同的协议。

什么是Pull?在Pull系统中,当Consumer收到Producer的数据时,它会自己判断是否接收该数据,Producer自己并不知道数据将交给哪个Consumer。

所有的JavaScript函数都是一个Pull系统。函数是一个数据提供者,调用函数的代码是一个consuming(消费者),它将函数返回的值"pulling"出来。

ES2015介绍了generator functions and iterators (function*),它们是另外一种Pull系统。iterator.next() 是Consumer,它从iterator(Producer)中"pulling"出多个值

Producer Consumer
Pull 被动:当需要时产生数据 主动:决定是否接收数据
Push 主动:自己决定将数据传给谁 被动:响应式接收数据

什么是Push?在Push系统中,Producer决定将数据发往哪个Consumer。Consumer并不知道它自己的值来自哪个Producer

Promise是最常见的一个Push系统。一个Promise(Producer)分发一个结果值给注册的接口(Consumer),与函数不同的是,Promise当遇到值被"push"给callback时,他会保证它传递的对象是正确的。

RxJS介绍了Observables,它是一个新的Push系统。Observable是一个提供多值的Producer,将它们"pushing"给Observers(Consumer)

  • Function:计算并同步调用一个值
  • generator:计算并同步调用多个值
  • Promise:计算后可能(不可能)返回一个值
  • Observable:计算然后同步或异步返回一个或多个值
Operators (操作符)

操作符是 Observable 类型上的方法,比如.map(...)、.filter(...)、.merge(...),等等。当操作符被调用时,它们不会改变已经存在的Observable实例。相反,它们返回一个新的 Observable ,它的 subscription 逻辑基于第一个 Observable

操作符是函数,它基于当前的 Observable 创建一个新的 Observable。这是一个无副作用的操作:前面的 Observable 保持不变。

就本质上而言Operators就是一个纯粹的函数.它可以接收一个 Observable 作为输入.并在经过内部的一系列处理后返回一个新的Observable作为输出.流向下一个操作.

empty、throwError、never

empty 是创建一个立即完结的 Observable,throwError 是创建一个抛出错误的 Observable,never 则是创建一个什么也不做的 Observable(不完结、不吐出数据、不抛出错误)。这三个操作符单独用时没有什么意义,主要用来与其他操作符进行组合。目前官方不推荐使用 empty 和 never 方法,而是推荐使用常量 EMPTY 和 NEVER(注意不是方法,已经是一个 Observable 对象了)。

第一个例子

正常注册一个事件监听函数:

var button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));

使用RxJS,你可以创建一个observable来代替:

var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
  .subscrible(() => console.log('Clicked!'));
  • 什么是RxJS
  • RxJS有什么特点
  • RxJS核心概念


不要 Rx 一切

不要过度使用 Rx,它比较适合以下场景:

  • 组合事件时
  • 增加延迟和控制频率
  • 组合异步任务
  • 需要取消时

简单的应用并不需要 RxJS。

什么是Operators?

Opeartors是Obsrevable的方法,就像map(),filter(),merge()等。当它被调用时,它们并不改变已经存在的Observable,而是返回一个基于第一个Observable上新的Observable。

一个Operator本质上是一个纯函数,它接收一个Observable,基于其上返回一个新的Observable。在下面的例子中,我们创建了一个自定义的operator方法:

function multiplayByTen(input){
  var output = Rx.Observable.create(function subscribe(observer){
    input.subscribe({
      next: v => observer.next(10 * v),
      error: err => observer.error(err),
      complete: () => observer.complete()
    });
  });
return output;
}

var input = Rx.Observable.from([1, 2, 3 ,4]);
var output = multiplayByTen(input);
output.subscribe(x => console.log(x));

输出为:

10
20
30
40

注意订阅(subscribe)的输出将导致输入的Observable可观测的变化。我们称这个为"operator subscription chain"。

什么是RxJS

在javaScript中,我们可能会经常接触到类似于回调函数、Promise、Gender、async函数等异步编程方式,虽然以上的方式各有各的特点,但是我们需要更加强大的特性和更加优雅的写法.因此RxJS便是我们更好的选择.

Rx.JS是英文 Reactive Extensions for JavaScript 的缩写.翻译成中文就是: JavaScript的响应式扩展.其主要的功能就是利用响应式编程的模式来实现JavaScript的异步式编程.

一个小的练习

本文中的例子基本来自30 天精通 RxJS,使用 RxJS v6 版本进行重写。

页面上有一个 p 标签存放一个状态,初始为 0,有两个按钮,一个按钮点击后这个状态增加 1,另一个按钮点击后这个状态减少 1。

<button id="addButton">Add</button>
<button id="minusButton">Minus</button>
<p id="state"></p>

这两个按钮的点击事件我们都可以建立响应式数据流,可以使用 mapTo(1) 和 mapTo(-1) 分别表示点击后增加 1 和减少 1。我们可以使用 EMPTY 创建一个空的数据流来表示这个状态,用 startWith 设定初始值。然后 merge 这两个点击的数据流,但是这还有一个问题,点击事件的数据流需要与表示状态的数据流进行逻辑计算,发出最终的状态,我们才能去订阅这个最终的数据流来更改页面的显示。而这种累计计算的方法,可以用 scan 操作符来实现。最终实现如下:

import { fromEvent, EMPTY, merge } from 'rxjs'
import { mapTo, startWith, scan } from 'rxjs/operators'

const addButton = document.getElementById('addButton')
const minusButton = document.getElementById('minusButton')
const state = document.getElementById('state')

const addClick$ = fromEvent(addButton, 'click').pipe(mapTo(1))
const minusClick$ = fromEvent(minusButton, 'click').pipe(mapTo(-1))

merge(
  EMPTY.pipe(startWith(0)),
  addClick$, 
  minusClick$)
.pipe(
  scan((origin, next) => origin   next)
).subscribe(item => {
  state.textContent = item
})

查看演示

Instance opeartors versus static operators

什么是instance operator?最常见的情况是当你引用一个opeartors时,我们假定实现了一个operator,它是Observable实例的一个方法。例如,如果multiplayByTen operator变成一个官方的operator,它看起来会是这样:

Rx.Observable.prototype.multiplyByTen = function multiplyByTen(){
  var input = this;
  return Rx.subscrible.create(function subscribe(observer){
    input.subccribe({
      next: (v) => observer.next(10 * v),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
  });
}

Instance operators是一个实例运算符,我们使用它来推断可观测的输入。

注意,input observable不再是一个函数参数:

var observable = Rx.Observable.from([1, 2, 3, 4]).multiplyByTen();
observable.subscribe(x => console.log(x));

什么是static operator?除了instance operators之外,static operators是直接附加在Observable类上的方法。一个static operator使用内部的this进行操作,但是并不完全依赖它的参数。

static operators是附着在Observable类上的纯函数,通常用于创建Observable

最常见的static operators类型是Create Operators,他不是将一个Observable改变成另外一个Observable,它们简单的获得一个non-Observable参数,比如number,然后create一个新的Observable。

一个典型的例子是使用interval函数。它获取一个数值(不是一个Observable)作为输入参数,然后输出一个Observable:

var observable = Rx.Observable.interval(1000 /* number of milliseconds */)

创建一个creation operaor的另外一个例子是create,就是我们之前一直在使用的例子。 all static creation operators here

然而,static operators也许和普通的creation性质不同。一些Combination Operator也许是静态的,比如merge、combineLatest、concat等。将这些作为静态是有意义的,因为它们把multiple Observales作为输入,不只是一个,比如:

var observable1 = Rx.Observable.interval(1000);
var observable2 = Rx.Observable.interval(400);

var merged = Rx.Observable.merge(observable1, observable2);
Subscription (订阅)

什么是 Subscription? Subscription 是表示可清理资源的对象,通常是 Observable 的执行。Subscription 有一个重要的方法,即 unsubscribe,它不需要任何参数,只是用来清理由 Subscription 占用的资源。在上一个版本的 RxJS 中,Subscription 叫做 "Disposable" (可清理对象)。
  Subscription(订阅)是使用observable.subscribe()创建一个观察者对象时.所返回的一个对象.它主要就是使用unsubscribe() 函数主动关闭Observer对Observable的监听订阅.其使用方法如下:

var observable = Rx.Observable.interval(1000);
var subscription = observable.subscribe(x => console.log(x));
// 稍后:
// 这会取消正在进行中的 Observable 执行
// Observable 执行是通过使用观察者调用 subscribe 方法启动的
subscription.unsubscribe();

of 方法

之前我们写的这种形式:

const source$ = new Observable(observer => {
  observer.next(1)
  observer.next(2)
  observer.next(3)
  observer.complete()
})

使用 of 方法将会非常简洁:

import {of} from 'rxjs'
const source$ = of(1, 2, 3)

Scheduler

什么是Scheduler?当一个subscription开始工作或者notifications被传递,scheduler就会开始调图。它包含三个组件。

  • 一个Scheduler是一个数据结构(data structure)。它知道如何基于优先级或者其它标准进行存储,执行队列任务
  • 一个Scheduler是一个执行上下文(execution context)。它表示task在哪个地方,什么时候执行()
  • 一个Scheduler是一个(虚拟(virtual))时钟。它基于scheduler上的getter方法now(),提出了一个"时间(time)"的概念。任务被安排在一个特殊的调度器中,它会遵守给它的时间。

看下面例子中,我们使用之前已经写过的例子,同步传递数值1、2、 3,然后使用observerOn操作符来指定异步调度:

var observable = Rx.Observable.create(function (observer) {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
})
.observerOn(Rx.Scheduler.async);

console.log('just before subscribe');
observable.subscribe({
    next: x => console.log('got value '   x),
    error: err => console.error('something wrong occurred: '   err),
    complete: () => console.log('done')
});
console.log('just after subscribe');

输出:

just before subscribe
just after subscribe
got value 1
got value 2
got value 3
done

注意got value这个语句实在 just after subscribe只有打印输出的,这和我们看到的代码顺序不一致。这时因为 observerOn(Rx.Scheduler.async)在Observable.create和最后一个Observer之间引入了一个代理的Observer。让我们重新为一些标识符取名,以便让他们之间有着明显的差别:

var observable = Rx.Observable.create(function (proxyObserver) {
    proxyObserver.next(1);
    proxyObserver.next(2);
    proxyObserver.next(3);
    proxyObserver.complete();
})
    .observeOn(Rx.Scheduler.async);

var finalObserver = {
    next: x => console.log('got value '   x),
    error: err => console.error('something wrong occurred: '   err),
    complete: () => console.log('done')
};

console.log('just before subscribe');
observable.subscribe(finalObserver);
console.log('just after subscribe');

proxyObserver在observeOn(Rx.Scheduler.async)中创建,它的next(val)方法大概像下面这样:

var proxyObserver = {
  next: (val) => {
    Rx.Scheduler.async.schedule(
      (x) => finalObserver.next(x),
      0 /* delay */,
      val /* will be the x for the function above */
    );
  },

  // ...
}

这有点儿像setTimeout或者setInterval是异步调度操作,即使给定的delay为0。按照惯例,在JS中,setTimeout(fn, 0)知道运行fn函数的时机最早是下一次循环队列初。这也说明了为什么 got value 1是最后运行的。

可以给Scheduler的schedule传递一个延时(delay)参数,它可以让Scheduler内部的时钟去延时到指定时间。Scheduler的时钟和真实的时钟没有任何关系。它更类似于延时,而不是运行指定的时间。

流动性 (Flow)

反面例子:

var count = 0;
var rate = 1000;
var lastClick = Date.now() - rate;
var button = document.querySelector('button');
button.addEventListener('click', () => {
  if (Date.now() - lastClick >= rate) {
    console.log(`Clicked ${  count} times`);
    lastClick = Date.now();
  }
});   //实现控制一秒钟内最多点击一次

正面教材:

var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
  .throttleTime(1000)
  .scan(count => count   1, 0)
  .subscribe(count => console.log(`Clicked ${count} times`));

这下觉得RxJS 用起来挺清爽的吧

其他一些操作符

1) repeat

repeat 用来重复上游 Observable

2)pluck 类似 lodash 的方法 pluck,提取对象的嵌套属性的值。

const click$ = fromEvent(document, 'click')
const tagName$ = click$.pipe(pluck('target', 'tagName'))
tagName$.subscribe(x => console.log(x))

等价于:

click$.pipe(map(e => e.target.tagName))

3)toArray

将发出的数据汇聚为数组

interval(1000).pipe(
  take(3),
  toArray()
).subscribe(x => console.log(x))
// [0, 1, 2]

4)partition

将上游的 Observable 分为两个,一个 Observable 的数据是符合判定的数据,另一个时不符合判定的数据。

const part$ = interval(1000).pipe(
  take(6),
  partition(x => x % 2 === 0)
)

part$[0].subscribe(x => console.log(x)) // 0, 2, 4
part$[1].subscribe(x => console.log(x)) // 1, 3, 5

5) 更多操作符

RxJS 中的操作符非常多,这里只介绍了一部分,更多请查看官网 API。

Subscribing一个Observable就像调用一个函数一样

一些人要求Observables是异步的,这是不正确的。看下面这个例子:

console.log('before');
console.log(foo.call());
console.log('after');

你将会看到这样的输出:

"before"
"Hello"
42
"after"

使用Observables

console.log('before');
foo.subscribe(function(x) {
  console.log(x);
});
console.log('after');

输出是:

"before"
"Hello"
42
"after"

这证明了foo的订阅是一个完完全全的异步,就像一个函数一样。

RxJS有什么特点

根据官方文档的介绍:

先写个简单的例子,注册事件监听器(事件代理):

var button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));

咱们用RxJS来实现这个功能(必须要引入Rxjs):

var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
  .subscribe(() => console.log('Clicked!'));

以上代码大家应该是能看懂的,大概解释一下. Rx.Observable.fromEvent()相当于创建了一个可观察对象Observable,也就是监听的代理对象,subscribe是这个对象的一个方法,该方法返回这个监听的事件,subscribe方法的参数是对观察对象返回值做出下一步操作(回调函数).

接下来介绍RxJS的特点:

Hot Observable 和 Cold Observable

先思考一下下面的例子结果是什么?

const source$ = interval(1000).pipe(
  take(3)
)

source$.subscribe(x => console.log('Observer 1: '   x))

setTimeout(() => {
  source$.subscribe(x => console.log('Observer 2: '   x))
}, 1000)

你可能会以为 Observer 2 一秒后才订阅,错过了数据 0,因此只会输出 1 和 2,但实际上会先输出 0。为什么如此呢?这就涉及到对已错过数据的两种处理策略。

  1. 错过的就让它过去,只要订阅之后生产的数据就好
  2. 不能错过,订阅之前生产的数据也要

第一种策略类似于直播,第二种和点播相似。使用第一种策略的 Observable 叫做 Cold Observable,因为每次都要重新生产数据,是 “冷”的,需要重新发动。第二种,因为一直在生产数据,只要使用后面的数据就可以了,所以叫 Hot Observable。

RxJS 中如 interval、range 这些方法产生的 Observable 都是 Cold Observable,产生 Hot Observable 的是由 Promise、Event 这些转化而来的 Observable,它们的数据源都在外部,和 Observer 无关。

前面我们提到 Observable 都是 lazy evaluation 的,数据管道内的逻辑只有订阅后才会执行,但是 Cold Observable 相对更 lazy 一些。Cold Observable 如果没有订阅者连数据都不会产生,对于 Hot Observable,数据仍会产生,但是不会进入管道处理。

Hot Observable 是多播,对于 Cold Observable,每次订阅都重新生产了一份数据流,所以不是多播。下面的例子更加明显,两个订阅者有很大的概率会接收到不同的数据。

const source$ = interval(1000).pipe(
  map(x => Math.floor(Math.random() * 10)),
  take(3)
)

source$.subscribe(x => console.log('Observer 1: '   x))

setTimeout(() => {
  source$.subscribe(x => console.log('Observer 2: '   x))
}, 1000)

如果想要实现多播,就要使用 RxJS 中 Subject。

介绍

RxJS是一个异步编程的库,同时它通过observable序列来实现基于事件的编程。它提供了一个核心的类型:Observable,几个辅助类型(Observer,Schedulers,Subjects),受到Array的扩展操作(map,filter,reduce,every等等)启发,允许直接处理异步事件的集合。

ReactiveX结合了Observer模式、Iterator模式和函数式编程和集合来构建一个管理事件序列的理想方式。

在RxJS中管理异步事件的基本概念如下:

  • Observable:代表了一个调用未来值或事件的集合的概念
  • Observer:代表了一个知道如何监听Observable传递过来的值的回调集合
  • Subscription:代表了一个可执行的Observable,主要是用于取消执行
  • Operators:是一个纯函数,允许处理集合与函数式编程风格的操作,比如map、filter、concat、flatMap等
  • Subject:相当于一个EventEmitter,它的唯一的方法是广播一个值或事件给多个Observer
  • Schedulers:是一个集中式调度程序来控制并发性,允许我们在setTimeout或者requestAnimationFrame上进行协调计算
Subject (主体)

什么是 Subject? - RxJS Subject 是一种特殊类型的 Observable,它允许将值多播给多个观察者,所以 Subject 是多播的,而普通的Observables是单播的(每个已订阅的观察者都拥有 Observable 的独立执行)。

   `Subject` 像是 `Observalbe`,但是可以多播给多个观察者。`Subject` 还像是` EventEmitters`,维护着多个监听器的注册表。

每一个Subject都同时是一个ObservableObserver.对于Subject你可以使用subscribe方法并指定一个观察者.也可以调用next(v)、error(e)complete()来处理接受道到值.示例如下:

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: '   v)
});
subject.subscribe({
  next: (v) => console.log('observerB: '   v)
});

subject.next(1);
subject.next(2);

在上面的示例中,我们为 Subject 添加了两个观察者,然后给 Subject 提供一些值

RxJS 与 Async Iterator

Async Iterator 提案已经进入了 ES2018,可以认为是 iterator 的异步版本。在 Symbol 上部署了 asyncIterator 的接口,不过它的 next 方法返回的是 { value, done } 对象的 Promise 版本。可以使用 for-await-of 进行迭代:

for await (const line of readLines(filePath)) {
  console.log(line)
}

使用 Async Iterator 我们可以很容易实现类似 RxJS 操作符的功能:

const map = async function*(fn) {
  for await(const value of this) yield fn(value)
}

其他如 fromEvent 等也比较容易实现。Async Iterator 扩展库 axax 的一个例子:

import { fromEvent } from "axax/es5/fromEvent";

const clicks = fromEvent(document, 'click');

for await (const click of clicks) {
    console.log('a button was clicked');
}

下面是 Benjamin Gruenbaum 用 Async Iterator 实现 AutoComplete 的一个例子:

let tooSoon = false, last;
for await (const {target: {value}} of fromEvent(el, "keyup")) {
  if(!value || tooSoon) continue;
  if(value === last) continue;
  last = value;
  yield await fetch("/autocomplete/"   value); // misses `last` 
  tooSoon = true;
  delay(500).then(() => tooSoon = false);
}

Async Iterator 相比 RxJS,没有那么多概念,上手快,也比较容易扩展实现那些操作符。

从数据消费者的角度上看,RxJS 是 push stream,由生产者把数据推送过来,Async Iterator 是 pull stream,是自己去拉取数据。

纯粹

使得RxJS变得如此强大的原因是它使用了纯函数,这意味着你的代码很少会发生错误。

正常你不会创建一个纯函数,代码的其他部分可能扰乱你的状态。

var count = 0;
var button = document.querySelector('button');
button.addEventListener('click', () => console.log(`Clicked $(  count) times`));

RxJS将隔离你的状态

var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
  .scan(count => count   1, 0)
  .subscribe(count => console.log(`Clicked ${count} items`));

scan操作符类似于arrays的reduce操作符。它需要一个回调函数作为一个参数,函数返回的值将作为下次调用时的参数。

参考链接

博客:30 天精通 RxJS

书:深入浅出RxJS

视频:RxJS 5 Thinking Reactively | Ben Lesh

Reference counting

手动的调用connect()来处理Subscription是很麻烦的。通常,我们希望当第一个Observer到达时,能够自动connect,当最后一个Observer被移除时,自动取消shared execution。

看看下面这些订阅发生时的列表:

  1. 第一个Observer订阅multicasted Observable
  2. multicasted observable连接
  3. next value 0被传递给第一个Observer
  4. 第二个Observer订阅multicasted Observable
  5. next value 1被传递给第一个Observer
  6. next value 1被传递给第二个Observer
  7. 第一个Observer解除监听
  8. next value2被传递给第二个Observer
  9. 第二个Observer解除监听
  10. 与multicasted observable连接的Observable解除连接

看下面的代码:

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
var subscription1, subscription2, subscriptionConnect;

subscription1 = multicasted.subscribe({
  next: (v) => console.log('observerA: '   v)
});
// We should call `connect()` here, because the first
// subscriber to `multicasted` is interested in consuming values
subscriptionConnect = multicasted.connect();

setTimeout(() => {
  subscription2 = multicasted.subscribe({
    next: v => console.log('observerB: '   v)
  });
}, 600);

setTimeout(() => {
  subscrption1.unscribe();
}, 1200);

// We should unsubscribe the shared Observable execution here,
// because `multicasted` would have no more subscribers after this
setTimeout(() => {
  subscription2.unsubscribe();
  subscriptionConnect.unsubscribe(); // for the shared Observable execution
}, 2000);

如果我们希望避免一遍遍地调用connect(),我们可以使用ConnectableObservable的refCount()方法(reference counting),它返回一个Observable来跟踪有多少个订阅者(subscribers)。当订阅者从0增加到1时,它将自动调用connect(),只有当订阅者从1变为0时,它才会disconnect。

看下面的例子:

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscrption1, subscription2, subscriptionConnect;


// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
  next: (v) => console.log('observerA: '   v);
});

setTimeout(() => {
  console.log('observerB subscribed');
  subscription2 = refCounted.subscribe({
    next: (v) => console.log('observerA: '   v)
  });
}, 600);

setTimeout(() => {
  console.log('observerA unsubscribed');
  subscription1.unsubscribe();
}, 1200);

// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
  console.log('observerB unsubscribed');
  subscription2.unsubscribe();
}, 2000);

执行结果:

observerA subscribed
observerA: 0
observerB subscribed
observerA: 1
observerB: 1
observerA unsubscribed
observerB: 2
observerB unsubscribed

refCount()方法仅存在ConnectableObservable中,它返回一个Observable,而不是另外的ConnectableObservable。

操作符

在 RxJS 中,操作符是用来处理数据流的。我们往往需要对数据流做一系列处理,才交给 Observer,这时一个操作符就像一个管道一样,数据进入管道,完成处理,流出管道。

import { interval } from 'rxjs';
import { map } from 'rxjs/operators'

const source$ = interval(1000).pipe(
  map(x => x * x)
)

source$.subscribe(x => console.log(x))

interval 操作符创造了一个数据流,interval(1000) 会产生一个每隔 1000 ms 就发出一个从 0 开始递增的数据。map 操作符和数组的 map 方法类似,可以对数据流进行处理。具体见演示地址。

这个 map 和数组的 map 方法会产生新的数组类似,它会产生新的 Observable。每一个操作符都会产生一个新的 Observable,不会对上游的 Observable 做任何修改,这完全符合函数式编程“数据不可变”的要求。

上面的 pipe 方法就是数据管道,会对数据流进行处理,上面的例子只有一个 map 操作符进行处理,可以添加更多的操作符作为参数。

operators的分类

参考官网:Categories of operators

简单拖拽

页面上有一个 id 为 drag 的 div:

<div id="drag"></div>

页面 css:

html, body {
  height: 100%;
  background-color: tomato;
  position: relative;
}

#drag {
  position: absolute;
  width: 100px;
  height: 100px;
  background-color: #fff;
  cursor: all-scroll;
}

要实现的功能如下:

  1. 当在这个 div 上按下鼠标左键(mousedown)时,开始监听鼠标移动(mousemove)位置
  2. 当鼠标松开(mouseup)时,结束监听鼠标移动
  3. 当鼠标移动被监听时,更新 div 样式来实现拖拽效果

实现思路:

  1. 我们可以使用 fromEvent 去转化 DOM 事件

    const mouseDown$ = fromEvent(eleDrag, 'mousedown')
    const mouseMove$ = fromEvent(eleBody, 'mousemove')
    const mouseUp$ = fromEvent(eleBody, 'mouseup')
    
  2. 对于鼠标按下这个数据流,每次鼠标按下事件发生时都转成鼠标移动的数据流

    mouseDown$.pipe(
      map(mouseDownEvent => mouseMove$)
    )
    
  3. 鼠标松开时,结束监听鼠标移动,我们可以用 takeUntil 表示这个逻辑

    mouseDown$.pipe(
      map(mouseDownEvent => mouseMove$.pipe(
        takeUntil(mouseUp$)
      ))
    )
    
  4. 上面的 map 操作符内将每次 mousedown 映射为一个 Observable,形成了高阶 Observable,我们需要用 concatlAll 压平,map 和 concatAll 连用,可以用更简洁的 concatMap

    mouseDown$.pipe(
      concatMap(mouseDownEvent => mouseMove$.pipe(
        takeUntil(mouseUp$)
      ))
    )
    
  5. 订阅这个 mousemove 数据流更新 div 位置。我们可以获取 mousemove event 中的 clientX 和 clientY,减去初始鼠标按下时鼠标相对 div 元素的值来得到最终 div 的绝对位置的 left 和 top。也可以使用 withLatestFrom 操作符,见 demo。

    mouseDown$.pipe(
      concatMap(mouseDownEvent => mouseMove$.pipe(
        map(mouseMoveEvent => ({
          left: mouseMoveEvent.clientX - mouseDownEvent.offsetX,
          top: mouseMoveEvent.clientY - mouseDownEvent.offsetY
        })),
        takeUntil(mouseUp$)
      ))
    ).subscribe(position => {
      eleDrag.style.left = position.left   'px'
      eleDrag.style.top = position.top   'px'
    })
    

这里是一个更复杂一些的例子,当页面滑动到视频出页面时视频 fixed 定位,这是可以拖拽移动视频位置。通过 getValidValue 对视频拖拽的位置进行了一个限制。

Observer

什么是Observer?一个Observer是Observable传递过来的数据的customer。Observers是一个简单的一些列的回调,next、error、和 complete用来传递数据。下面的例子展现了一个典型的Observer对象:

var observer = {
  next: x => console.log('Observable got a next value: '   x),
  error: err => console.log('Observable got and error: '   err),
  complete: () => console.log('Observable got a complete notification')
};

为了使用Observalbe,提供了一个subscribe:

observable.subscribe(observer)

你也可以提供部分回调:

var observer = {
  next: x => console.log('Observer got a next value: '   x),
  error: err => console.error('Observer got an error: '   err),
};

当你订阅(subscribing)一个Observable时,你也许仅仅只提供一个函数作为参数:

observable.subscribe(x => console.log('Observer got a next value: '   x));

在observable.subscribe的内部,他将使用第一个回调创建一个Observer对象作为一个next handler。所有的callback类型都可能被提供:

observable.subscribe(
  x => console.log('Observer got a next value: '   x),
  err => console.error('Observer got an error: '   err),
  () => console.log('Observer got a complete notification')
);

Subject 的错误处理

在 RxJS 5 中,如果 Subject 的某个下游数据流产生了错误异常,而又没有被 Observer 处理,那这个 Subject 的其他 Observer 都会失败。但是在 RxJS 6 中不会如此。

在 v6 的这个例子 中,ObserverA 没有对错误进行处理,但是并不影响 ObserverB,而在 v5 这个demo中因为 ObserverA 没有对错误进行处理,使得 ObserverB 终止了。很明显 v6 的这种处理更符合直觉。

Marble diagrams

为了解释operators是如何工作的,光是文本解释是不够的。许多operators和时间有关,它们可能会延迟执行,例如,throttle等。图标往往能够比文字更多表达清楚。Marble Diagrams能够可视化的表现出operators是如何工作的,包括输入的Observable(s),operator和它的参数,以及输出的Observable

在一个marble diagram中,随着时间的流逝,它会描述值("marbles")在Observable execution上传递。

你可以在下面看到marble diagram的解析:

图片 1

Paste_Image.png

  • 时间从左往右流动,代表input Observable的execution
  • 这些代表Observable传递传来的值
  • 这个竖线表示"complete" notification,它表明Observable已经成功完成了。
  • 这个方框表示input Observable的operator(上图)产生出的output Observable(下图)。方框内的文字表示转变的属性。
  • 这个Observable是调用operator产生的
  • 这个X代表output Observable发出的错误,说明因为某些原因而异常终止。

在这个网站的站点,我们会广泛的使用marble diagrams去解释operators是如何工作的。它们也许在其他的地方也很有用,比如单元测试等。

什么是 RxJS

我们都知道 JS 是什么,那么什么是 Rx 呢?Rx 是 Reactive Extension(也叫 ReactiveX)的简称,指的是实践响应式编程的一套工具,Rx 官网首页的介绍是一套通过可监听流来做异步编程的 API(An API for asynchronous programming with observable streams)。

Rx 最早是由微软开发的 LinQ 扩展出来的开源项目,之后由开源社区维护,有多种语言的实现,如 Java 的 RxJava,Python 的 RxPY 等,而 RxJS 就是 Rx 的 JavaScript 语言实现。

Flow

RxJS有一系列的操作符来帮你监控事件将如何流动。

这是一个每秒最多点击一次的程序:

var count = 0;
var rate = 1000;
var lastClick = Date.now() - rate;
var button = document.querySelector('button');
button.addEventListener('click', () => {
  if (Date.now() - lastClick >= rate){
    console.log(`Clicked ${  count} times`);
    lastClick = Date.now();
  }
});

使用RxJS:

var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
  .throttleTime(1000)
  .scan(count => count   1, 0)
  .subscribe(count => console.log(`Clicked ${count} times`));

另外的控制符还有:filter, delay, debounceTime, take, takeUntil, distinct, distinctUntilChanged等。

弹珠图

弹珠图(Marble diagrams)就是用图例形象地表示 Observable 和各种操作符的一种方法。

用 - 表示一小段时间,X 代表有错误发生, | 表示结束,() 表示同步发生。

上面的例子可以如下表示:

source: -----0-----1-----2-----3--...
        map(x => x * x)
newest: -----0-----1-----4-----9--...

具体关于弹珠图的使用可以查看这个网站。

BehaviorSubject

Subjects的一种变形是BehaviorSubject,它有一个"the current value" 的概念。它存储了consumer最后一次执行时的value,每当一个Observer订阅时,它都会立即从BehaviorSubject接收一个"current value"。

例子:

var subject = new Rx.BehaviorSubject(0);  //  0 is the inital value

subject.subscribe({
  next: v => console.log('observerA: '   v)
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: v = console.log('observerB: '   v)
});

subject.next(3);

输出:

observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

RxJS 使用

RxJS 仓库现在移到了 ReactiveX 组织下,最新的大版本为 6,与之前的版本相比有许多破坏性变更,请注意。

RxJS 的 import 路径有以下 5 种:

  1. 创建 Observable 的方法、types、schedulers 和一些工具方法

    import { Observable, Subject, asapScheduler, pipe, of, from, interval, merge, fromEvent, SubscriptionLike, PartialObserver } from 'rxjs';

  2. 操作符 operators

    import { map, filter, scan } from 'rxjs/operators';

  3. webSocket

    import { webSocket } from 'rxjs/webSocket';

  4. ajax

    import { ajax } from 'rxjs/ajax';

  5. 测试

    import { TestScheduler } from 'rxjs/testing';

本文所有 demo 均在 v6.2.1 中测试过

Observable

Observables是一个延迟Push(关于Push的概念见后面)操作数据的集合。它们遵从下表:

Single Multiple
Pull Function Iterator
Push Promise Observable

举个例子。下面是一个Observable,当执行subscribed,它将会立即push 1、 2、 3(同步),然后过去一秒后push 4

var observable = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000); 
});

为了调用Observable,然后看这些值,我们需要对这些数据进行订阅(subscribe)

var observable = Rx.Observable.create(function (observer){
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  })
});

console.log('just before subscribe');
observerble.subscribe({
  next: x => console.log(`got value`   x),
  error: err => console.error('somthing wrong occurred: '  err),
  complete: () => console.log('done')
});
console.log('just after subscribe');

执行结果如下:

just before subscribe
got value 1
got value 2
got value 3
just after sbuscribe
got value 4
done

RxJS 的两种编程思想

RxJS 引入了两种重要的编程思想:函数式编程和响应式编程。

函数式编程(Functional Programming,简称 FP)是一种编程范式,强调使用函数来思考问题、编写代码。

In computer science, functional programming is a programming paradigm—a style of building the structure and elements of computer programs—that treats computation as the evaluation of mathematical functions and avoids changing-state and mutable data.

函数式编程的主要设计点在于避免使用状态和可变的数据,即 stateless and immutable。

函数式编程对函数的使用有一些特殊要求:

  • 声明式(Declarative)
  • 纯函数(Pure Function)
  • 数据不可变性(Immutability)

声明式的函数,让开发者只需要表达”想要做什么”,而不需要表达“怎么去做”。

纯函数指的是执行结果由输入参数决定,参数相同时结果相同,不受其他数据影响,并且不会带来副作用(Side Effect)的函数。副作用指的是函数做了和本身运算返回值没有关系的事情,如修改外部变量或传入的参数对象,甚至是执行 console.log 都算是 Side Effect。前端中常见的副作用有发送 http 请求、操作 DOM、调用 alert 或者 confirm 函数等。满足纯函数的特性也叫做引用透明度(Referential Transparency)。

数据不可变就是指这个数据一旦产生,它的值就永远不会变。JavaScript 中字符串类型和数字类型就是不可改变的,而对象基本都是可变的,可能会带来各种副作用。现在有各种库可以实现 Immutable 特性,如 immutable.js 和 immer.js

中文维基上说响应式编程(Reactive Programming)是一种面向数据流(stream)和变化传播的编程范式。个人的理解是对数据流进行编程的一种编程范式,使用各种函数创建、组合、过滤数据流,然后通过监听这个数据流来响应它的变化。响应式编程抽象出了流这个概念,提高了代码的抽象级别,我们不用去关心大量的实现细节,而专注于对数据流的操作。

响应式流可以认为是随着时间发出的一系列元素。响应式和观察者模式有点相似,订阅者订阅后,发布者吐出数据时,订阅者会响应式进行处理。实际上Rx 组合了观察者模式(Observer pattern )、迭代器模式(Iterator pattern)和函数式编程。

RxJS 是上面两种编程思想的结合,但是对于它是不是函数响应式编程(FRP)有比较大的争议,因为它虽然既是函数式又是响应式但是不符合早期 FRP 的定义。

Executing observables

代码Observable.create(function subscribe(observer) {...})代表了一个"Observable execution",它将仅仅在每个Observer的subscribes的延迟计算中。随着时间的推移,将产生多个结果,同步或者异步。

Observable可以传递的有三种类型:

  • "Next" notification:传递一个数值,诸如Number、String、Object等
  • “Error” notification:传递一个js异常
  • "Complete" notification:什么值都不传递

Next notifications是最重要的也是最常见的类型:它们表示一个实际数据被送到Observer。在Observable Execute执行期间Error和Complete最多会发生一次。

下面的语法是在Observable Grammar or Contract中最好的表达:

next*(error|complete)?

在一个Observable Execute中,0或多个Next notifications可能被传递。如果有error或者Complete被传递,剩下的next将不会被传递。

下面是Observable execute传递3个Next notifications的例子:

var observable = Rx.Observable.create(function subscribe(observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
})

下面的例子中,Next notification 4不会被传递:

var observable = Rx.Observable.create(function subscribe(observer){
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
  observer.next(4);  //  不会被执行
})

用tru/catch代码快包裹起来是个好主意:

var observable = Rx.Observable.create(function subscribe(observer) {
  try {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
  } catch (err) {
    observer.error(err); // delivers an error if it caught one
  }
});

什么是 Observable

个人认为在文档中说的 Observable 更确切的说法是 Observable Stream,也就是 Rx 的响应式数据流。

在 RxJS 中 Observable 是可被观察者,观察者则是 Observer,它们通过 Observable 的 subscribe 方法进行关联。

前面提到了 RxJS 结合了观察者模式和迭代器模式。

对于观察者模式,我们其实比较熟悉了,比如各种 DOM 事件的监听,也是观察者模式的一种实践。核心就是发布者发布事件,观察者选择时机去订阅(subscribe)事件。

在 ES6 中,Array、String 等可遍历的数据结构原生部署了迭代器(Iterator )接口。

const numbers = [1, 2, 3]
const iterator = numbers[Symbol.iterator]()
iterator.next() // {value: 1, done: false}
iterator.next() // {value: 2, done: false}
iterator.next() // {value: 3, done: false}
iterator.next() // {value: undefined, done: true}

观察者模式和迭代器模式的相同之处是两者都是渐进式使用数据的,只不过从数据使用者的角度来说,观察者模式数据是推送(push)过来的,而迭代器模式是自己去拉取(pull)的。Rx 中的数据是 Observable 推送的,观察者不需要主动去拉取。

Observable 与 Array 相当类似,都可以看作是 Collection,只不过 Observable 是 a collection of items over time,是随时间发出的一序列元素,所以下面我们会看到 Observable 的一些操作符与 Array 的方法极其相似。

解析一个Observable

Observables使用Rx.Observable.create或者一个构造器创建(create),使用Observer来监听(subscribed),执行(execute)是通过投递一个next/error/complete来通知其他的Observer,然后按照各自的意愿(disposed)来执行。在一个Observable实例中,这四个方面都是通过编码实现的,但是这些可能与其他的类型相关,比如Obsrever和Subscription。

Observable的核心点:

  • Creating Observables
  • Subscribing to Observables
  • Executing the Observable
  • Disposing Observables

多播

前面的例子都是只有一个订阅者的情况,实际上当然可以有多个订阅者,这就是多播(multicast),即一个数据流的内容被多个 Observable 订阅。

Opeartors

RxJS最有用的一部分就是operators,即使Observable是最基础的。Operators最基本的要点是允许复杂的代码变得简单化。

fromEvent 方法

用 DOM 事件创建 Observable,第一个参数为 DOM 对象,第二个参数为事件名称。具体示例见前面 RxJS 入门章节的一个简单例子。

ReplaySubject

功能和它的名字一样:

var subject = new Rx.ReplaySubject(3);  // buffer 3 values for new subscribers

subject.subscribe({
  next: v => console.log('observerA: '   v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: v => console.log('observerB: '   v)
});

subject.next(5);

输出:

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5

你还可以指定一个以毫秒为单位的窗口事时间,除了buffer size之外,决定记录的值可以重复(时间内)。

var subject = new Rx.ReplaySubject(100, 500);

subject.subscribe({
  next: (v) => console.log('observerA: '   v)
});

var i = 1;
setInterval(() => subject.next(i  ), 200);

setTimeout(() => {
  subject.subscribe({
    next: v => console.log('observerB: '   v)
  });
}, 1000)

下面的输出中,第二个Observer在最后500ms内得到的数值为3、 4、 5:

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
observerB: 3
observerB: 4
observerB: 5
observerA: 6
observerB: 6
...

一个简单的例子

import { fromEvent } from 'rxjs';
import { take } from 'rxjs/operators';

const eleBtn = document.querySelector('#btn')
const click$ = fromEvent(eleBtn, 'click')

click$.pipe(take(1))
  .subscribe(e => {
    console.log('只可点击一次')
    eleBtn.setAttribute('disabled', '')
  })

这里演示了 RxJS 的大概用法,通过 fromEvent 将点击事件转换为 RxJS 的 Observable (响应式数据流),take(1) 表示只操作一次,观察者通过订阅(subscribe)来响应变化。具体 API 的使用会在后面讲到。

演示地址

代表流的变量用 $ 符号结尾,是 RxJS 中的一种惯例。

AsyncSubject

AsyncSubject表示只有最后一个Observable execution的值会被发送给observers,仅仅发生在执行完成时

var subject = new Rx.AsyncSubject();

subject.subscrbe({
  next: v => console.log('onbserverA: '   v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: '   v)
});

subject.next(5);
subject.complete();

输出:

observerA: 5
observerB: 5

AsyncSubject类似于一个last() operator,他等待complete通知来传递一个唯一的值。

tap 操作符

我们可以使用 tap 操作符来进行调试。

拦截源 Observable 的每一次发送,执行一个函数,返回源 Observable 的镜像 Observable。

这个 API 有助于我们对 Observable 的值进行验证(debug)和执行一个会带来副作用的函数,而不会影响源 Observable。如我们用鼠标进行 canvas 绘图,鼠标按下是开始画图,鼠标松开即停止。我们需要在 mousedown 的时候进行 moveTo,否则这次画的会和上次画的连在一起。我们应该把这个会带来副作用过程放在 tap 操作符的函数中,这样才不会影响原来的数据流。

tap 操作符和订阅并不相同,tap 返回的 Observable 如果没有被订阅,tap 中产生副作用的函数并不会执行。

你可以使用你的observables来转换值。

这是一个每次点击添加x坐标的程序:

var count = 0;
var rate = 1000;
var lastClick = Date.now() - rate;
var button = document.querySelector('button');
button.addEventListener('click', (event) => {
  if (Date.now() - lastClick >= rate){
    count  = event.clientX;
    console.log(count);
    lastClick = Date.now();
  }
})

使用Rxjs:

var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
  .throttleTime(1000)
  .map(event => event.clientX)
  .scan((count, clientX) => count   clientX, 0)
  .subscribe(count => console.log(count));

另外的producing操作符:pluck、pairwise、sample等

认识 RxJS

Scheduler类型

异步Scheduler只是RxJS提供的一种Scheduler。通过使用Scheduler的静态方法可以创建下面的类型

Scheduler Purpose
null 不使用Scheduler, notifications将会被同步和递归地交付给Observer。使用这个来进行常量操作或者尾部递归操作
Rx.Scheduler.queue Schedules on a queue in the current event frame (trampoline scheduler). Use this for iteration operations.
Rx.Scheduler.asap Schedules on the micro task queue, which uses the fastest transport mechanism available, either Node.js' process.nextTick() or Web Worker MessageChannel or setTimeout or others. Use this for asynchronous conversions.
Rx.Scheduler.async Schedules work with setInterval. Use this for time-based operations.

Promise

使用 Promise 可以减轻一些异步问题,如将回调函数变为串行的链式调用,统一同步和异步代码等,async/await 中也可以使用 try/catch 来捕获错误。但是对于复杂的场景,仍然难于处理。而且 Promise 还有其他的问题,一是只有一个结果,二是不可以取消。

Subscription

什么是Subscription?一个Subscription代表了一个一次性的资源,通常表示的是一个Observable execution。一个Subscription有一个重要的方法,unsubscribe,它不需要参数,仅仅是处理subscription的资源。在之前的RxJS版本中,Subscription被称作"Disposable"。

var observable = Rx.Observable.interval(1000);
var subscription = observable.subscribe(x => console.log(x));
// Later:
// This cancels the ongoing Observable execution which
// was started by calling subscribe with an Observer.
subscription.unsubscribe();

一个Subscription实质上是一个unsubscribe()函数,用来释放资源或者取消一个Observable executions。

Subscriptions也可以放在一起,这样会导致使用一个unsubscribe()将取消多个Observable executions。你可以这么做:

var observable1 = Rx.Observable.interval(400);
var observable2 = Rx.Observable.interval(300);

var subscription = observable1.subscribe(x => console.log('first: '   x));
var childSubscription = observable2.subscribe(x => console.log('second: '   x));

subscription.add(childSubscription);

setTimeout(() => {
  // Unsubscribes BOTH subscription and childSubscription
  subscription.unsubscribe();
}, 1000);

当执行时,我们将看到如下输出:

second: 0
first: 0
second: 1
first: 1
second: 2

Subscriptions有一个remove(otherSubscription)方法,用来移除关联的Subscirption

Scheduler

Scheduler(调度器)用于控制数据流中数据的推送节奏。

import { range, asapScheduler } from 'rxjs'

const source$ = range(1, 3, asapScheduler)

console.log('before subscribe')
source$.subscribe(x => console.log(x))
console.log('subscribed')

上面的代码,如果去掉 asapScheduler 参数,因为 range 是同步的,会先输出 1, 2, 3,再输出 'subscribed',但是加了以后就变成 先输出 'subscribed',改变了原来数据产生的方式。asap 是 as soon as possible 的缩写,同步任务完成后就会马上执行。

Scheduler 拥有一个虚拟时钟,如 interval 创建的数据流每隔一段时间要发出数据,由 Scheduler 提供时间来判断是否到了发送数据的时间。

选择一个operator

你需要为你的程序选择一个适当的operator吗?先从下面的列表选择一个:

  • 我已经有了一个Observable
  • 我想改变每个传递的值
    • 让它成为一个固定(constant)的值
      • 你应该使用mapTo
    • 通过公式计算出来的值
      • 你应该使用map
  • 我想选择每个传递值的属性
    • 你应该使用pluck
  • 我想查看每个被传递的值,但是不影响它们
    • 你应该使用do
  • 我想过滤某些值
    • 基于一个自定义的逻辑
      • 你应该使用filter

更多内容参考官网:Choose an operator

interval、timer

interval 和 JS 中的 setInterval 类似,参数为间隔时间,下面的代码每隔 1000 ms 会发出一个递增的整数。

interval(1000).subscribe(console.log)
// 0
// 1
// 2
// ...

timer 则可以接收两个参数,第一个参数为发出第一个值需要等待的时间,第二个参数为之后的间隔时间。第一个参数可以是数字,也可以是一个 Date 对象,第二个参数可省。

Subject

什么是Subject?一个RxJS Subject是一个特殊类型的Observable,它允许值可以多路广播给多个Observers。普通的Observables是单路广播(每个subscribed Observer拥有自己独立的Observable execution),Subjects是多路广播。

一个Subject像一个Observable,但是可以多路广播给Observers。Subjects像Eventmitters:它们维持许多注册过的监听器。

每个subject是一个Observable。给定一个Subject,你可以通过提供一个Observer来订阅(subscribe)它,然后开始正常的接收值。从Observer的角度来看,他不能告知Observer的Observable execution到底是来自一个不同的单路传播的Observable,还是来自Subject。

在Subject的内部,subscribe并没有调用一个新的execute去传递数据。它只是简单的注册Observers列表中的一个Observer,类似于addListener的使用。

每个subject是一个Observer。他是拥有next(v),error(e)和complete()方法的对象。为了给Subject一个新值,只需要调用 next(theValue),他讲多路传播给注册过的Observer。

在下面的例子中,我们在Subject中注册了两个Observers,我们传递一些值给Subject:

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: '   v)
});
subject.subscribe({
  next: (v) => console.log('observerB: '   v)
});

subject.next(1);
subject.next(2);

输出:

observerA: 1
observerB: 1
observerA: 2
observerB: 2

因为Subject同时也是一个Observer,这意味着你应该提供一个Subject作为Observable的subscribe的参数,像这样:

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: '   v)
});
subject.subscribe({
  next: (v) => console.log('observerB: '   v)
});

var observable = Rx.Observable.from([1, 2, 3]);

observable.subscribe(subject);  // You can subscribe providing a Subject

执行如下:

observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

在上面的处理中,我们本质上仅仅是通过Subject将一个单路广播的Observable execution变为多路广播的。这个演示展示了Subjects是怎样将单路广播变为多路广播的。

这里有几个特殊的Subject类型:BehaviorSubject、ReplaySubject和AsyncSubject。

Scheduler 实例

  • undefined/null:不指定 Scheduler,代表同步执行的 Scheduler
  • asap:尽快执行的 Scheduler
  • async:利用 setInterval 实现的 Scheduler
  • queue:利用队列实现的 Scheduler,用于迭代一个的大的集合的场景。
  • animationFrame:用于动画的 Scheduler

asap 会尽量使用 micro task,而 async 会使用 macro task。

处理(Disposing)Observable Executions

Observable Executing的个数可能是无限个,Observer中应该处理有限个next,所以我们需要一个API来停止execution。因为execution在每个Observer中都是独立的,一旦Observer完成接收值,它必须有一个方法来停止executing。

当 observable.subscribe 被调用,Observer将被附加到一个新创建的Observable execution中,这次调用将返回一个对象,即Subscription:

var subscription = observable.subscribe(x => console.log(x));

Subscription代表了一个进行中的executing,它有一个最小的API允许你取消execution。可以在这里阅读更多有关于 Subscription type here 的东西。使用 subscription.unsubscribe() 你可以取消正在进行的execution:

var observable = Rx.Observable.from([10, 20, 30]);
var subscription = observable.subscribe(x => console.log(x));
//  Later:
subscription.unsubscribe();

当我们使用create()创建一个Observable时,我们必须定义execution怎么处理资源。你可以通过返回一个自定义的 unsubscribe 函数来实现该步骤。

var observable = Rx.Observable.create(function subscribe(observer){
  var intervalID = setInterval(() => {
    observer.next('hi')
  });

  return function unsubscribe(){
    clearInterval(intervalID);
  }
})

然后这样来调用:

function subscribe(observer) {
  var intervalID = setInterval(() => {
    observer.next('hi');
  }, 1000);

  return function unsubscribe() {
    clearInterval(intervalID);
  };
}

var unsubscribe = subscribe({next: (x) => console.log(x)});

// Later:
unsubscribe(); // dispose the resources

dalay、delayWhen

用来延迟上游 Observable 数据的发出。

delay 可以接受一个数字(单位默认为 ms)或者 date 对象作为延迟控制。

const clicks = fromEvent(document, 'click')
const delayedClicks = clicks.pipe(delay(1000)) // 所有点击事件延迟 1 秒
delayedClicks.subscribe(x => console.log(x))

我们前面介绍过 bufferWhen,dalayWhen 也带有 when,在 RxJS 中,这种操作符它接收的参数都是 Observable Factory,即一个返回 Observable 对象的回调函数,用这个 Observable 来进行控制。

每个 click 都延迟 0 至 5 秒之间的任意一个时间:

const clicks = fromEvent(document, 'click')
const delayedClicks = clicks.pipe(
  delayWhen(event => interval(Math.random() * 5000)),
)
delayedClicks.subscribe(x => console.log(x))

使用Schedulers

见Using Schedulers

相关操作符

一些创建数据流的方法可以提供 Scheduler 参数,合并类操作符如 merge 也可以,在创建数据流后我们也可以使用操作符,使得产生的下游 Observable 推送数据的节奏由指定的 Scheduler 来控制。这个操作符就是 observeOn。

const tick$ = interval(10) // Intervals are scheduled with async scheduler by default...
tick$.pipe(
  observeOn(animationFrameScheduler)  // but we will observe on animationFrame scheduler to ensure smooth animation.
)
.subscribe(val => {
  someDiv.style.height = val   'px'
})

本来每 10 ms 就会发送一个数据,修改 Scheduler 为 animationFrame 后只有浏览器重绘才会发送数据更新样式。

我们还可以通过操作符 subscribeOn 控制订阅的时机。

const source$ = new Observable(observer => {
  console.log('on subscribe')
  observer.next(1)
  observer.next(2)
  observer.next(3)
  return () => {
    console.log('on unsubscribe')
  }
})

const tweaked$ = source$.pipe(subscribeOn(asapScheduler))

console.log('before subscribe')
tweaked$.subscribe(x => console.log(x))
console.log('subscribed')
// before subscribe
// subscribed
// on subscribe
// 1
// 2
// 3

通过 subscribeOn(asapScheduler),我们把订阅时间推迟到尽快执行。

RxJS 最经典的例子——AutoComplete

有一个用于搜索的 input,当输入时自动发送 ajax,并在下方显示结果列表,然后可以选择结果,这就是我们常见的 AutoComplete 效果。要实现这个效果有很多细节要考虑,如防止 race condition 和优化请求次数。

<div class="autocomplete">
    <input class="input" type="search" id="search" autocomplete="off">
    <ul id="suggest-list" class="suggest"></ul>
</div>

先获取两个 DOM 元素:

const input = document.querySelector('#search');
const suggestList = document.querySelector('#suggest-list');

我们先将输入框的 input 的事件转化为 Observable。

const input$ = fromEvent(input, 'input');

然后我们根据输入的值去发送 ajax 请求,由于我们是要获取最新的值而丢弃之前 ajax 返回的值,我们应该使用 switchMap 操作符。通过使用这个操作符,我们解决了 race condition 问题。

input$.pipe(
  switchMap(e => from(getSuggestList(e.target.value)))
)

getSuggestList 是一个发送 ajax 请求的方法,返回 promise,我们使用 from 来将其转化为 Observable。

为了优化请求,首先 e.target.value 是空字符串时不应该发送请求,然后可以使用 debounceTime 减少触发频率,也可以使用 distinctUntilChanged 操作符来表示只有与上次不同时才去发送请求。我们还可以在 API 失败时重试 3 次。

input$.pipe(
  filter(e => e.target.value.length > 1),
  debounceTime(300),
  distinctUntilChanged(),
    switchMap(
      e => from(getSuggestList(e.target.value)).pipe(retry(3))
    )
  )

然后我们去订阅渲染就可以了。

对于结果列表上的点击事件,比较简单,具体见demo。

退订(unsubscribe)

观察者想退订,只要调用订阅返回的对象的 unsubscribe 方法,这样观察者就再也不会接受到 Observable 的信息了。

const source$ = new Observable(observer => {
  let number = 1
  setInterval(() => {
    observer.next(number  )
  }, 1000)
})

const observer = {
  next : item => console.log(item)
}

const subscription = source$.subscribe(observer)

setTimeout(() => {
  subscription.unsubscribe()
}, 5000)

debounceTime、throttleTime

类似 lodash 的 debounce 和 throttle,用来降低事件的触发频率。

我们做搜索时,常常要对输入进行 debounce 来减少请求频率。

fromEvent(document.querySelector('#searchInput'), 'input').pipe(
  debounceTime(300),
  map(e => e.target.value)
).subscribe(
  input => document.querySelector('#text').textContent = input
  // 发送请求
)

RxJS 的业务实践

可以看看徐飞的相关思考:流动的数据——使用 RxJS 构造复杂单页应用的数据逻辑

操作符

操作符其实看作是处理数据流的管道,每个操作符实现了针对某个小的具体应用问题的功能,RxJS 编程最大的难点其实就是如何去组合这些操作符从而解决我们的问题。

在 RxJS 中,有各种各样的操作符,有转化类、过滤类、合并类、多播类、错误处理类、辅助工具类等等。一般不需要自己去实现操作符,但是我们需要知道操作符是一个函数,实现的时候必须考虑以下功能:

  1. 返回一个全新的 Observable 对象
  2. 对上游和下游的订阅和退订处理
  3. 处理异常情况
  4. 及时释放资源

异常错误处理

异常处理的难点:

  1. try/catch 只支持同步
  2. 回调函数容易形成回调地狱,而且每个回调函数的最开始都要判断是否存在错误
  3. Promise 不能重试,而且不强制异常被捕获

对错误处理的处理可以分为两类,即恢复(recover)和重试(retry)。

恢复是虽然发生了错误但是让程序继续运行下去。重试,是认为这个错误是临时的,重试尝试发生错误的操作。实际中往往配合使用,因为一般重试是由次数限制的,当尝试超过这个限制时,我们应该使用恢复的方法让程序继续下去。

1)catchError

catchError 用来在管道中捕获上游传递过来的错误。

interval(1000).pipe(
  take(6),
  map(x => {
    if (x === 4) {
      throw new Error('unlucky number 4')
    } else {
      return x
    }
  }),
  catchError(err => of(8))
).subscribe(x => console.log(x))
// 0
// 1
// 2
// 3
// 8

catchError 中的回调函数返回了一个 Observable,当捕获到上游的错误时,调用这个函数,返回的 Observable 中发出的数据会传递给下游。因此上面当 x 为4 时发生了错误,会用 8 来替换。

catchError 中的回调函数除了接收错误对象为参数外,还有第二个参数 caught$ 表示上游的 Observable 对象。如果回调函数返回这个 Observable 对象,就会进行重试。

interval(1000).pipe(
  take(6),
  map(x => {
    if (x === 4) {
      throw new Error('unlucky number 4')
    } else {
      return x
    }
  }),
  catchError((err, caught$) => caught$),
  take(20)
).subscribe(x => console.log(x))

这个代码会依次输出 5 次 0, 1, 2, 3。

2)retry

retry 可以接收一个整数作为参数,表示重试次数,如果是负数或者没有传参,会无限次重试。重试实际上就是退订再重新订阅。

interval(1000).pipe(
      take(6),
      map(x => {
        if (x === 4) {
          throw new Error('unlucky number 4')
        } else {
          return x
        }
      }),
      retry(5) // 重试 5 次
    ).subscribe(x => console.log(x))

在实际开发中,如果是代码原因造成的错误,重试没有意义,如果是因为外部资源导致的异常错误适合重试,如用户网络或者服务器偶尔不稳定的时候。

3)retryWhen

和前面带 when 的操作符一样,retryWhen 操作符接收一个返回 Observable 的回调函数,用这个 Observable 来控制重试的节奏。当这个 Observable 发出一个数据时就会进行一次重试,它完结时 retryWhen 返回的 Observable 也立即完结。

interval(1000).pipe(
  take(6),
  map(x => {
    if (x === 4) {
      throw new Error('unlucky number 4')
    } else {
      return x
    }
  }),
  retryWhen(err$ => err$.pipe(
    delay(1000),
    take(5))
  ) // 延迟 1 秒后重试,重试 5 次
).subscribe(x => console.log(x))

retryWhen 的可定制性非常高,不仅可以实现延迟定制,还可以实现 retry 的控制重试次数。在实践中,这种重试频率固定的方法还不够好,如果之前的重试失败,之后重试成功的几率也不高。Angular 官网介绍了一个 Exponential backoff 的方法。将每次重试的延迟时间控制为指数级增长。

import { pipe, range, timer, zip } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retryWhen, map, mergeMap } from 'rxjs/operators';

function backoff(maxTries, ms) {
 return pipe(
   retryWhen(attempts => range(1, maxTries)
     .pipe(
       zip(attempts, (i) => i),
       map(i => i * i),
       mergeMap(i =>  timer(i * ms))
     )
   )
 );
}

ajax('/api/endpoint')
  .pipe(backoff(3, 250))
  .subscribe(data => handleData(data));

function handleData(data) {
  // ...
}

4)finalize

返回上游数据流的镜像 Observable,当上游的 Observable 完结或出错时调用传给它的函数,不影响数据流。

interval(1000).pipe(
  take(6),
  map(x => {
    if (x === 4) {
      throw new Error('unlucky number 4')
    } else {
      return x
    }
  }),
  finalize(() => console.log('finally'))
).subscribe(x => console.log('a'))

几个类似数组方法的基础操作符

map、filter 和数组的 map、filter 方法类似,scan 则是和 reduce 方法类似,mapTo 是将所有发出的数据映射到一个给定的值。

import {mapTo} from 'rxjs/operators'

fromEvent(document, 'click').pipe(
  mapTo('Hi')
).subscribe(x => console.log(x))

每次点击页面时都会输出 Hi。

distinct、distinctUntilChanged

distinct 操作符可以用来去重,将上游重复的数据过滤掉。

of(1, 1, 2, 2, 2, 1, 2, 3, 4, 3, 2, 1).pipe(
  zip(interval(1000)),
  map(arr => arr[0]),
  distinct()
).subscribe(x => console.log(x))

上面的代码只会输出 1, 2, 3, 4

distinct 操作符还可以接收一个 keySelector 的函数作为参数,这是官网的一个 typescript 的例子:

interface Person {
  age: number,
  name: string
}

of<Person>(
  { age: 4, name: 'Foo' },
  { age: 7, name: 'Bar' },
  { age: 5, name: 'Foo' },
).pipe(
  distinct((p: Person) => p.name),
).subscribe(x => console.log(x))

// { age: 4, name: 'Foo' }
// { age: 7, name: 'Bar' }

distinctUntilChanged 也是过滤重复数据,但是只会与上一次发出的元素比较。这个操作符比 distinct 更常用。distinct 要与之前发出的不重复的值进行比较,因此要在内部存储这些值,要小心内存泄漏,而 distinctUntilChanged 只用保存上一个的值。

RxJS 的一些实践

观察者 Observer

观察者 Observer 是一个有三个方法的对象:

  • next: 当 Observable 发出新的值时被调用,接收这个值作为参数
  • complete:当 Observable 完结,没有更多数据时被调用。complete 之后,next 方法无效
  • error:当 Observable 内部发生错误时被调用,之后不会调用 complete,next 方法无效

    const source$ = new Observable(observer => {
      observer.next(1)
      observer.next(2)
      observer.complete()
      observer.next(3)
    })
    
    const observer = {
      next: item => console.log(item),
      complete: () => console.log('complete')
    }
    
    source$.subscribe(observer)
    

上面的代码会输出 1,2,'complete',而不会输出 3。

const source$ = new Observable(observer => {
  try {
    observer.next(1)
    observer.next(2)
    throw new Error('there is an exception')
    observer.complete()
  } catch (e) {
    observer.error(e)
  }
})

const observer = {
  next: item => console.log(item),
  error: e => console.log(e),
  complete: () => console.log('complete')
}

source$.subscribe(observer)

注意 error 之后不会再调用 complete。

Observer 还有简单形式,即不用构建一个对象,而是直接把函数作为 subscribe 方法的参数。

source$.subscribe(
  item => console.log(item),
  e => console.log(e),
  () => console.log('complete')
)

参数依次为 next 、error、complete,后面两个参数可以省略。

defer

defer 创建的 Observable 只有在订阅时才会去创建我们真正想要操作的 Observable。defer 延迟了创建 Observable,而又有一个 Observable 方便我们去订阅,这样也就推迟了占用资源。

defer(() => ajax(ajaxUrl))

只有订阅了才会去发送 ajax 请求。

异步常见的问题

  • 回调地狱(Callback Hell)
  • 竞态条件(Race Condition)
  • 内存泄漏(Memory Leak)
  • 管理复杂状态(Manage Complex States)
  • 错误处理(Exception Handling)

回调地狱就是指层层嵌套的回调函数,造成代码难以理解,并且难以协调组织复杂的操作。

竞态条件出现的原因是无法保证异步操作的完成会和他们开始时的顺序一样,因此最终结果不可控。比如常见的 AutoComplete 效果,每次输入后向后端发送请求获取结果展示在搜索框下面,由于网络、后端数据查询等原因有可能出现最后发送的请求比之前的请求更快地完成了,这时最终展现的并不是最后那个请求的结果,而这并不是我们所希望的。

这里说的内存泄漏指的是单页应用切换页面时由于忘记在合适的时机移除监听事件造成的内存泄漏。

异步带来了状态的改变,可能会使状态管理变得非常复杂,尤其是某个状态有多个来源时,比如有些应用,一开始有一个默认值,再通过 AJAX 获取初始状态,存储在 localStorage,之后通过 WebSocket 获取更新。这时查询状态可能是同步或者异步的,状态的变更可能是主动获取也可能是被动推送的,如果还有各种排序、筛选,状态管理将会更加复杂。

JavaScript 中的 try/catch 只能捕获同步的错误,异步的错误不易处理。

创建 Observable

创建 Observable 的这些方法就是用来创建 Observable 数据流的,注意和操作符不同,它们是从 rxjs 中导入的,而不是 rxjs/operators

缓存

把上游的多个数据缓存起来,当时机合适时再把汇聚的数据传给下游。

1)buffer、bufferTime、bufferCount、bufferWhen、bufferToggle

对于 buffer 这一组操作符,数据汇聚的形式就是数组。

buffer 接收一个 Observable 作为 notifier,当 notifier 发出数据时,将 缓存的数据传给下游。

interval(300).pipe(
  take(30),
  buffer(interval(1000))
).subscribe(
  x => console.log(x)
)
// [0, 1, 2]
// [3, 4, 5]
// [6, 7, 8]
// [9, 10, 11, 12]

bufferTime 是用时间来控制时机,上面可以改成 bufferTime(1000)

bufferCount 是用数量来控制时机,如 3 个一组,bufferCount(3)

bufferWhen 接收一个叫做 closeSelector 的参数,它应该返回一个 Observable。通过这个 Observable 来控制缓存。这个函数没有参数。下面的方法等价于前面的 buffer:

interval(300).pipe(
  take(30),
  bufferWhen(() => {
    return interval(1000)
  })
).subscribe(
  x => console.log(x)
)

bufferToggle 和 buffer 的不同是可以不断地控制缓存窗口的开和关,一个参数是一个 Observable,称为 opening,第二个参数是称为 closeSelector 的一个函数。这个函数的参数是 opening 产生的数据。前一个参数用来控制缓存的开始时间,后一个控制缓存的结束。与 bufferWhen 相比,它的 closeSelector 可以接收参数,控制性更强。

我们可以使用 buffer 来做事件的过滤,下面的代码只有 500ms 内连续点击两次以上才会输出 ‘success’ 。

fromEvent(document.querySelector('#btn'), 'click').pipe(
  bufferTime(500),
  filter(arr => arr.length >= 2)
).subscribe(
  x => console.log('success')
)

2)window、windowTime、windowCount、windowWhen、windowToggle

与前面的 buffer 类似,不过 window 缓存数据汇聚的形式是 Observable,因此形成了高阶 Observable。

为什么要使用 RxJS

RxJS 是一套处理异步编程的 API,那么我将从异步讲起。

前端编程中的异步有:事件(event)、AJAX、动画(animation)、定时器(timer)。

TestScheduler

RxJS 中有一个 用于测试的 TestScheduler,RxJS 的测试大家可以查看程墨的《深入浅出 RxJS》或者其他资料。

import { TestScheduler } from 'rxjs/testing'

pipeable 操作符

之前版本的 RxJS 各种操作符都挂载到了全局 Observable 对象上,可以这样链式调用:

source$.filter(x => x % 2 === 0).map(x => x * 2)

现在需要这样使用:

import {filter, map} from 'rxjs/operators'

source$.pipe(
  filter(x => x % 2 === 0),
  map(x => x * 2)
)

其实也很好理解,pipe 就是管道的意思,数据流通过操作符处理,流出然后交给下一个操作符。

多播操作符

前面我们写的 Subject 需要去订阅源数据流和被观察者订阅,写起来比较繁琐,我们可以借助操作符来实现。

1)multicast

使用方式如下,接收一个 subject 或者 subject factory。这个操作符返回了一个 connectable 的 Observable。等到执行 connect() 才会用真的 subject 订阅 source,并开始发送数据,如果没有 connect,Observable 是不会执行的。

const source = interval(1000).pipe(
  map(x => Math.floor(Math.random() * 10)),
  take(3),
  multicast(new Subject)
)

const observerA = {
  next: x => console.log('Observer A: '   x),
  error: null,
  complete: () => console.log('Observer A completed')
}
const observerB = {
  next: x => console.log('Observer B: '   x),
  error: null,
  complete: () => console.log('Observer B completed')
}

source.subscribe(observerA) // subject.subscribe(observerA)

source.connect() // source.subscribe(subject)

setTimeout(() => {
  source.subscribe(observerB) // subject.subscribe(observerB)
}, 1000)

2)refCount

上面使用了 multicast,但是还是有些麻烦,还需要去手动 connect。这时我们可以再搭配 refCount 操作符创建只要有订阅就会自动 connect 的 Observable。只需要去掉 connect 方法调用,在 multicast 后面再加一个 refCount 操作符。

multicast(new Subject),
refCount()

refCount 其实就是自动计数的意思,当 Observer 数量大于 1 时,subject 订阅上游数据流,减少为 0 时退订上游数据流。

3)multicast selector 参数

multicast 第一个参数除了是一个 subject,还可以是一个 subject factory,即返回 subject 的函数。这时使用了不同的中间人,每个观察者订阅时都重新生产数据,适用于退订了上游之后再次订阅的场景。

multicast 还可以接收可选的第二个参数,称为 selector 参数。它可以使用上游数据流任意多次,而不会重复订阅上游的数据。当使用了这个参数时,multicast 不会返回 connectable Observable,而是这个参数(回调函数)返回的 Observable。selecetor 回调函数有一个参数,通常叫做 shared,即 multicast 第一个参数所代表的 subject 对象。

const selector = shared => {
  return shared.pipe(concat(of('done')))
}
const source = interval(1000).pipe(
  take(3),
  multicast(new Subject, selector)
)

const observerA = {
  next: x => console.log('Observer A: '   x),
  error: null,
  complete: () => console.log('Observer A completed')
}
const observerB = {
  next: x => console.log('Observer B: '   x),
  error: null,
  complete: () => console.log('Observer B completed')
}

source.subscribe(observerA)
setTimeout(() => {
  source.subscribe(observerB)
}, 5000)
// Observer A: 0
// Observer A: 1
// Observer A: 2
// Observer A: done
// Observer A completed
// Observer B: done
// Observer B: completed

observerB 订阅时会调用 selector 函数,subject 即shared 已经完结,但是 concat 依然会在这个 Observable 后面加上 'done'。

可以利用 selector 处理 “三角关系”的数据流,如有一个 tick$ 数据流,对其进行 delay(500) 操作后的下游 delayTick$, 一个由它们合并得到的 mergeTick$,这时就形成了三角关系。delayTick$ 和 mergeTick$ 都订阅了 tick$。

const tick$ = interval(1000).pipe(
  take(1),
  tap(x => console.log('source: '   x))
)

const delayTick$ = tick$.pipe(
  delay(500)
)

const mergeTick$ = merge(tick$, delayTick$).subscribe(x => console.log('observer: '   x))
// source: 0
// observer: 0
// source: 0
// observer: 0

从上面的结果我们可以验证,tick$ 被订阅了两次。

我们可以使用 selector 函数来使其只订阅一次,将上面的过程移到 selector 函数内即可。

const source$ = interval(1000).pipe(
  take(1),
  tap(x => console.log('source: '   x))
)

const result$ = source$.pipe(
  multicast(new Subject(), shared => {
    const tick$ = shared
    const delayTick$ = tick$.pipe(delay(500))
    const mergeTick$ = merge(tick$, delayTick$)
    return mergeTick$
  })
)

result$.subscribe(x => console.log('observer: '   x))

这时只会输出一次 'source: 0'。

4)publish

publish 是 multicast 的一种简写方式,效果等同于如下:

function publish (selector) {
  if (selector) {
    return multicast(() => new Subject(), selector)
  } else {
    return multicast(new Subject())
  }
}

有上一节说到的 selector 函数时,等价于:

multicast(() => new Subject(), selector)

没有时,等价于:

multicast(new Subject())

5)share

share 是 multicast 和 refCount 的简写,share() 等同于在 pipe 中先调用了 multicast(() => new Subject()),再调用了 refCount()。

const source = interval(1000).pipe(
  take(3),
  share()
)

const observerA = {
  next: x => console.log('Observer A: '   x),
  error: null,
  complete: () => console.log('Observer A completed')
}
const observerB = {
  next: x => console.log('Observer B: '   x),
  error: null,
  complete: () => console.log('Observer B completed')
}

source.subscribe(observerA)
setTimeout(() => {
  source.subscribe(observerB)
}, 5000)
// Observer A: 0
// Observer A: 1
// Observer A: 2
// Observer A completed
// Observer B: 0
// Observer B: 1
// Observer B: 2
// Observer B completed

由于 share 是调用了 subject 工厂函数,而不是一个 subject 对象,因此 observerB 订阅时可以重新获取数据。

6)publishLast、publishBehavior、publishReplay

同前面的 publish,只不过使用的不是普通 Subject,而是对应的 AsyncSubject、BehaviorSubject、ReplaySubject。

RxJS 入门

RxJS 与前端框架结合

Angular 自身引用了 RxJS,如 http 和 animation 都使用了 Observable,状态管理可以使用 ngrx。

Vue 官方有与 RxJS 集成的 vue-rx。

React 可以通过 Subject 建立桥梁,Redux 也有与 RxJS 结合的中间件 Redux-Observable。

RxJS 的特点

  • 数据流抽象了很多现实问题
  • 擅长处理异步问题
  • 把复杂问题分解为简单问题的组合

前端中的 DOM 事件、WebSocket 推送消息、AJAX 请求资源、动画都可以看作是数据流。

RxJS 对数据采用“推”的方式,当一个数据产生时,会将其推送给对应的处理函数,这个处理函数不用关心数据时同步产生还是异步产生的,因此处理异步将会变得非常简单。

RxJS 中很多操作符,每个操作符都提供了一个小功能,学习 RxJS 最重要的就是学习如何组合操作符来解决复杂问题。

range

操作符 of 产生较少的数据时可以直接写如 of(1, 2, 3),但是如果是 100 个呢?这时我们可以使用 range 操作符。

range(1, 100) // 产生 1 到 100 的正整数

BehaviorSubject、ReplaySubject、AsyncSubject

1)BehaviorSubject

BehaviorSubject 需要在实例化时给定一个初始值,如果没有默认是 undefined,每次订阅时都会发出最新的状态,即使已经错过数据的发送时间。

const observerA = {
  next: x => console.log('Observer A: '   x)
}
const observerB = {
  next: x => console.log('Observer B: '   x)
}

const subject = new BehaviorSubject(0)

subject.subscribe(observerA) // Observer A: 0

subject.next(1) // Observer A: 1
subject.next(2) // Observer A: 2
subject.next(3) // Observer A: 3

setTimeout(() => {
  subject.subscribe(observerB) // Observer B: 3
}, 500)

observerB 已经错过流数据的发送时间,但是订阅时也能获取到最新数据 3。

BehaviorSubject 有点类似于状态,一开始可以提供初始状态,之后订阅都可以获取最新的状态。

2)ReplaySubject

ReplaySubject 表示重放,在新的观察者订阅时重新发送原来的数据,可以通过参数指定重放最后几个数据。

const observerA = {
  next: x => console.log('Observer A: '   x)
}
const observerB = {
  next: x => console.log('Observer B: '   x)
}

const subject = new ReplaySubject(2) // 重放最后两个

subject.subscribe(observerA)

subject.next(1) // Observer A: 1
subject.next(2) // Observer A: 2
subject.next(3) // Observer A: 3
subject.complete()

setTimeout(() => {
  subject.subscribe(observerB)
  // Observer B: 2
  // Observer B: 3
}, 500)

这里我们可以看到,即使 subject 完结后再去订阅依然可以重放最后两个数据。

ReplaySubject(1) 和前面的 BehaviorSubject 是不一样的,首先后者可以提供默认数据,而前者不行,其次前者在 subject 终结后再去订阅依然可以得到最近发出的数据而后者不行。

3)AsyncSubject

AsyncSubject 有点类似 operator last,会在 subject 完结后送出最后一个值。

const subject = new AsyncSubject()

subject.subscribe(observerA)

subject.next(1)
subject.next(2)
subject.next(3)
subject.complete()
// Observer A: 3
setTimeout(() => {
  subject.subscribe(observerB)
  // Observer B: 3
}, 500)

observerA 即使早就订阅了,但是并不会响应前面的 next,完结后才接收到最后一个值 3。

Subject

为了防止每次订阅都重新生产一份数据流,我们可以使用中间人,让这个中间人去订阅源数据流,观察者都去订阅这个中间人。这个中间人能去订阅数据流,所以是个 Observer,又能被观察者订阅,所以也是 Observable。我们可以自己实现一个这样的中间人:

const subject = {
  observers: [],
  subscribe: function (observer) {
    this.observers.push(observer)
  },
  next: function (value) {
    this.observers.forEach(o => o.next(value))
  },
  error: function (error) {
    this.observers.forEach(o => o.error(error))
  },
  complete: function () {
    this.observers.forEach(o => o.complete())
  }
}

这个 subject 拥有 Observer 的 next、error、complete 方法,每次被观察者订阅时都会在内部保存这个观察者。当接收到源数据流的数据时,会把数据发送给每一个观察者。

const source$ = interval(1000).pipe(
  map(x => Math.floor(Math.random() * 10)),
  take(3)
)

const observerA = {
  next: x => console.log('Observer A: '   x),
  error: null,
  complete: () => console.log('Observer A completed')
}
const observerB = {
  next: x => console.log('Observer B: '   x),
  error: null,
  complete: () => console.log('Observer B completed')
}

source$.subscribe(subject)
subject.subscribe(observerA)
setTimeout(() => {
  subject.subscribe(observerB)
}, 1000)

这时我们发现两个观察者接收到的是同一份数据,ObserverB 由于延迟一秒订阅,所以少接收到一个数据。将我们自己实现的 subject 换成 RxJS 中的 Subject,效果相同:

import { Subject } from 'rxjs'
const subject = new Subject()

从上面可以看到,Subject 和 Observable 有一个很大的不同:它内部保存有一个观察者列表。

前面的 subject 是在源数据流发出值时调用 next 方法,向订阅的观察者发送这个值,我们也可以手动调用 subject 的next 方法送出值:

const observerA = {
  next: x => console.log('Observer A: '   x)
}
const observerB = {
  next: x => console.log('Observer B: '   x)
}

const subject = new Subject()

subject.subscribe(observerA)
setTimeout(() => {
  subject.subscribe(observerB)
}, 500)

subject.next(1)
setTimeout(() => {
  subject.next(2)
}, 1000)

总结一下,Subject 既是 Observable 又是 Observer,它会对内部的 observers 清单进行组播(multicast)。

fromEventPattern 方法

将添加事件处理器、删除事件处理器的 API 转化为 Observable。

function addClickHandler (handler) {
  document.addEventListener('click', handler)
}

function removeClickHandler (handler) {
  document.removeEventListener('click', handler)
}

fromEventPattern(
  addClickHandler,
  removeClickHandler
).subscribe(x => console.log(x))

也可以是我们自己实现的和事件类似,拥有注册监听和移除监听的 API。

import { fromEventPattern } from 'rxjs'

class EventEmitter {
  constructor () {
    this.handlers = {}
  }
  on (eventName, handler) {
    if (!this.handlers[eventName]) {
      this.handlers[eventName] = []
    }
    if(typeof handler === 'function') {
        this.handlers[eventName].push(handler)
    } else {
        throw new Error('handler 不是函数!!!')
    }
  }
  off (eventName, handler) {
    this.handlers[eventName].splice(this.handlers[eventName].indexOf(handler), 1)
  }
  emit (eventName, ...args) {
    this.handlers[eventName].forEach(handler => {
      handler(...args)
    })
  }
}

const event = new EventEmitter()

const subscription = fromEventPattern(
  event.on.bind(event, 'say'), 
  event.off.bind(event, 'say')
).subscribe(x => console.log(x))

let timer = (() => {
  let number = 1
  return setInterval(() => {
    if (number === 5) {
      clearInterval(timer)
      timer = null
    }
    event.emit('say', number  )
  }, 1000)
})()

setTimeout(() => {
  subscription.unsubscribe()
}, 3000)

演示地址

一些过滤的操作符

  • take 是从数据流中选取最先发出的若干数据
  • takeLast 是从数据流中选取最后发出的若干数据
  • takeUntil 是从数据流中选取直到发生某种情况前发出的若干数据
  • first 是获得满足判断条件的第一个数据
  • last 是获得满足判断条件的最后一个数据
  • skip 是从数据流中忽略最先发出的若干数据
  • skipLast 是从数据流中忽略最后发出的若干数据

    import { interval } from 'rxjs';
    import { take } from 'rxjs/operators';
    
    interval(1000).pipe(
      take(3)
    ).subscribe(
      x => console.log(x),
      null,
      () => console.log('complete')
    )
    // 0
    // 1
    // 2
    // 'complete'
    

使用了 take(3),表示只取 3 个数据,Observable 就进入完结状态。

import { interval, fromEvent } from 'rxjs'
import { takeUntil } from 'rxjs/operators'

interval(1000).pipe(
  takeUntil(fromEvent(document.querySelector('#btn'), 'click'))
).subscribe(
  x => { document.querySelector('#time').textContent = x   1 },
  null,
  () => console.log('complete')
)

这里有一个 interval 创建的数据流一直在发出数据,直到当用户点击按钮时停止计时,见演示。

延迟执行(lazy evaluation)

我们传给 new Observable 的回调函数如果没有订阅是不会执行的,订阅一个 Observable 就像是执行一个函数,和下面的函数类似。这和我们常见的那种内部保存有观察者列表的观察者模式是不同的,Observable 内部没有这个观察者列表。

function subscribe (observer) {
  let number = 1
  setInterval(() => {
    observer.next(number  )
  }, 1000)
}

subscribe({
    next: item => console.log(item),
    error: e => console.log(e),
    complete: () => console.log('complete')
})

合并类操作符

合并类操作符用来将多个数据流合并。

1)concat、merge

concat、merge 都是用来把多个 Observable 合并成一个,但是 concat 要等上一个 Observable 对象 complete 之后才会去订阅第二个 Observable 对象获取数据并把数据传给下游,而 merge 时同时处理多个 Observable。使用方式如下:

import { interval } from 'rxjs'
import { merge, take } from 'rxjs/operators'

interval(500).pipe(
  take(3),
  merge(interval(300).pipe(take(6)))
).subscribe(x => console.log(x))

可以点此去比对效果,concat 的结果应该比较好理解,merge 借助弹珠图也比较好理解,它是在时间上对数据进行了合并。

source : ----0----1----2|
source2: --0--1--2--3--4--5|
            merge()
example: --0-01--21-3--(24)--5|

merge 的逻辑类似 OR,经常用来多个按钮有部分相同行为时的处理。

注意最新的官方文档和RxJS v5.x 到 6 的更新指南中指出不推荐使用 merge、concat、combineLatest、race、zip 这些操作符方法,而是推荐使用对应的静态方法。

将上面的 merge 改成从 rxjs 中导入,使用方式变成了合并多个 Observable,而不是一个 Observable 与其他 Observable 合并。

import { interval,merge } from 'rxjs'
import { take } from 'rxjs/operators'

merge(
  interval(500).pipe(take(3)),
  interval(300).pipe(take(6))
).subscribe(x => console.log(x))

2)concatAll、mergeAll、switchAll

用来将高阶的 Observable 对象压平成一阶的 Observable,和 loadash 中压平数组的 flatten 方法类似。concatAll 会对内部的 Observable 对象做 concat 操作,和 concat 操作符类似,如果前一个内部 Observable 没有完结,那么 concatAll 不会订阅下一个内部 Observable,mergeAll 则是同时处理。switchAll 比较特殊一些,它总是切换到最新的内部 Observable 对象获取数据。上游高阶 Observable 产生一个新的内部 Observable 时,switchAll 就会立即订阅最新的内部 Observable,退订之前的,这也就是 ‘switch’ 的含义。

import { interval } from 'rxjs';
import { map, switchAll, take } from 'rxjs/operators';

interval(1500).pipe(
  take(2),
  map(x => interval(1000).pipe(
    map(y => x   ':'   y), 
    take(2))
  ),
  switchAll()
).subscribe(console.log)

// 0:0
// 1:0
// 1:1

内部第一个 Observable 对象的第二个数据还没来得及发出,第二个 Observable 对象就产生了。

3)concatMap、mergeMap、switchMap

从上面的例子我们也可以看到高阶 Observable 常常是由 map 操作符将每个数据映射为 Observable 产生的,而我们订阅的时候需要将其压平为一阶 Observable,而就是要先使用 map 操作符再使用 concatAll 或 mergeAll 或 switchAll 这些操作符中的一个。RxJS 中提供了对应的更简洁的 API。使用的效果可以用下面的公式表示:

concatMap = map   concatAll
mergeMap = map   mergeAll
switchMap = map   switchAll

4)zip、combineLatest、withLatestFrom

zip 有拉链的意思,这个操作符和拉链的相似之处在于数据一定是一一对应的。

import { interval } from 'rxjs';
import { zip, take } from 'rxjs/operators';
const source$ = interval(500).pipe(take(3))
const newest$ = interval(300).pipe(take(6))

source$.pipe(
  zip(newest$, (x, y) => x   y)
).subscribe(x => console.log(x))
// 0
// 2
// 4

zip 是内部的 Observable 都发出相同顺序的数据后才交给下游处理,最后一个参数是可选的 resultSelector 参数,这个函数用来处理操作符的结果。上面的示例运行过程如下:

  1. newest 发出第一个值 0,但这时 source 还没有发出第一个值,所以不执行 resultSelector 函数也不会像下游发出数据
  2. source 发出第一个值 0,此时 newest 之前已发出了第一个值 0,执行 resultSelector 函数得到结果 0,发出这个结果
  3. newest 发出第二个值 1,但这时 source 还没有发出第二个值,所以不执行 resultSelector 函数也不会像下游发出数据
  4. newest 发出第三个值 2,但这时 source 还没有发出第三个值,所以不执行 resultSelector 函数也不会像下游发出数据
  5. source 发出第二个值 1,此时 newest 之前已发出了第一个值 1,执行 resultSelector 函数得到结果 2,发出这个结果
  6. newest 发出第四个值 3,但这时 source 还没有发出第四个值,所以不执行 resultSelector 函数也不会像下游发出数据
  7. source 发出第三个值 2,此时 newest 之前已发出了第一个值 2,执行 resultSelector 函数得到结果 4,发出这个结果
  8. source 完结,不可能再有对应的数据了,整个 Observable 完结

上面如果没有传递最后一个参数 resultSelector 函数,将会依次输出数组 [0, 0]、[1, 1]、[2, 2]。在更新指南中,官方指出不推荐使用 resultSelector 参数,将会在 v7 中移除。加上之前提到的推荐使用静态方法,这个示例应该改成这样:

import { interval, zip } from 'rxjs';
import { take, map } from 'rxjs/operators';

const source$ = interval(500).pipe(take(3))
const newest$ = interval(300).pipe(take(6))

const add = (x, y) => x   y

zip(source$, newest$).pipe(
  map(x => add(...x))
).subscribe(x => console.log(x))

使用 zip 当有数据流吐出数据很快,而有数据流发出值很慢时,要小心数据积压的问题。这时快的数据流已经发出了很多数据,由于对应的数据还没发出,RxJS 只能保存数据,快的数据流不断地发出数据,积压的数据越来越多,消耗的内存也会越来越大。

combineLatest 与 zip 不同,只要其他的 Observable 已经发出过值就行,顾名思义,就是与其他 Observable 最近发出的值结合。

import { interval, combineLatest } from 'rxjs';
import { take } from 'rxjs/operators';

const source$ = interval(500).pipe(take(3))
const newest$ = interval(300).pipe(take(6))

combineLatest(source$, newest$).subscribe(x => console.log(x))
// [0, 0]
// [0, 1]
// [0, 2]
// [1, 2]
// [1, 3]
// [2, 3]
// [2, 4]
// [2, 5]

withLatestFrom 没有静态方法,只有操作符方法,前面的方法所有 Observable 地位是平等的,而这个方法是使用这个操作符的 Observable 起到了主导作用,即只有它发出值才会进行合并产生数据发出给下游。

import { interval } from 'rxjs';
import { take, withLatestFrom } from 'rxjs/operators';

const source$ = interval(500).pipe(take(3))
const newest$ = interval(300).pipe(take(6))

source$.pipe(
  withLatestFrom(newest$)
).subscribe(x => console.log(x))
// [0, 0]
// [1, 2]
// [2, 4]
  1. source 发出 0 时,newest 最新发出的值为 0,结合为 [0, 0] 发出
  2. source 发出 1,此时 newest 最新发出的值为 2,结合为 [1, 2] 发出
  3. source 发出 2,此时 newest 最新发出的值为 4,结合为 [2, 4] 发出
  4. source 完结,整个 Observable 完结

5)startWith、forkJoin、race

startWith 是在 Observable 的一开始加入初始数据,同步立即发送,常用来提供初始状态。

import { fromEvent, from } from 'rxjs';
import { startWith, switchMap } from 'rxjs/operators';

const source$ = fromEvent(document.querySelector('#btn'), 'click')

let number = 0
const fakeRequest = x => {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve(number  )
    }, 1000)
  })
}

source$.pipe(
  startWith('initData'),
  switchMap(x => from(fakeRequest(x)))
).subscribe(x => document.querySelector('#number').textContent = x)

这里通过 startWith 操作符获取了页面的初始数据,之后通过点击按钮获取更新数据。

forkJoin 只有静态方法形式,类似 Promise.all ,它会等内部所有 Observable 都完结之后,将所有 Observable 对象最后发出来的最后一个数据合并成 Observable。

race 操作符产生的 Observable 会完全镜像最先吐出数据的 Observable。

const obs1 = interval(1000).pipe(mapTo('fast one'));
const obs2 = interval(3000).pipe(mapTo('medium one'));
const obs3 = interval(5000).pipe(mapTo('slow one'));

race(obs3, obs1, obs2)
.subscribe(
  winner => console.log(winner)
);

// result:
// a series of 'fast one'

本文由分分快三计划发布,转载请注明来源

关键词: 分分快三计划 JavaScript rxjs