浏览新版,请访问 RxJS Observable

在介绍 Observable 之前,我们要先了解两个设计模式:

  • Observer Pattern - (观察者模式)
  • Iterator Pattern - (迭代器模式)

这两个模式是 Observable 的基础,下面我们先来介绍一下 Observer Pattern。

Observer Pattern

观察者模式定义

观察者模式是软件设计模式的一种。在此种模式中,一个目标对象管理所有相依于它的观察者对象,并且在它本身的状态改变时主动发出通知。这通常透过呼叫各观察者所提供的方法来实现。此种模式通常被用来实时事件处理系统。 — 维基百科

观察者模式又叫发布订阅模式(Publish/Subscribe),它定义了一种一对多的关系,让多个观察者对象同时监听某一个主题对象,这个主题对象的状态发生变化时就会通知所有的观察者对象,使得它们能够自动更新自己。

我们可以使用日常生活中,期刊订阅的例子来形象地解释一下上面的概念。期刊订阅包含两个主要的角色:期刊出版方和订阅者,他们之间的关系如下:

  • 期刊出版方 - 负责期刊的出版和发行工作
  • 订阅者 - 只需执行订阅操作,新版的期刊发布后,就会主动收到通知,如果取消订阅,以后就不会再收到通知

在观察者模式中也有两个主要角色:Subject (主题) 和 Observer (观察者) 。它们分别对应例子中的期刊出版方和订阅者。接下来我们来看张图,从而加深对上面概念的理解。

观察者模式优缺点

观察者模式的优点:

  • 支持简单的广播通信,自动通知所有已经订阅过的对象
  • 目标对象与观察者之间的抽象耦合关系能够单独扩展以及重用

观察者模式的缺点:

  • 如果一个被观察者对象有很多的直接和间接的观察者的话,将所有的观察者都通知到会花费很多时间
  • 如果在观察者和观察目标之间有循环依赖的话,观察目标会触发它们之间进行循环调用,可能导致系统崩溃

观察者模式的应用

在前端领域,观察者模式被广泛地使用。最常见的例子就是为 DOM 对象添加事件监听,具体示例如下:

<button id="btn">确认</button>function clickHandler(event) {console.log('用户已点击确认按钮!');
}
document.getElementById("btn").addEventListener('click', clickHandler);

上面代码中,我们通过 addEventListener API 监听 button 对象上的点击事件,当用户点击按钮时,会自动执行我们的 clickHandler 函数。

观察者模式实战

Subject 类定义:

class Subject {constructor() {this.observerCollection = [];}registerObserver(observer) {this.observerCollection.push(observer);}unregisterObserver(observer) {let index = this.observerCollection.indexOf(observer);if(index >= 0) this.observerCollection.splice(index, 1);}notifyObservers() {this.observerCollection.forEach((observer)=>observer.notify());}
}

Observer 类定义:

class Observer {constructor(name) {this.name = name;}notify() {console.log(`${this.name} has been notified.`);}
}

使用示例:

let subject = new Subject(); // 创建主题对象let observer1 = new Observer('semlinker'); // 创建观察者A - 'semlinker'
let observer2 = new Observer('lolo'); // 创建观察者B - 'lolo'subject.registerObserver(observer1); // 注册观察者A
subject.registerObserver(observer2); // 注册观察者Bsubject.notifyObservers(); // 通知观察者subject.unregisterObserver(observer1); // 移除观察者Asubject.notifyObservers(); // 验证是否成功移除

以上代码成功运行后控制台的输出结果:

semlinker has been notified. # 输出一次
2(unknown) lolo has been notified. # 输出两次

需要注意的是,在观察者模式中,通常情况下调用注册观察者后,会返回一个函数,用于移除监听,有兴趣的读者,可以自己尝试一下。(备注:在 Angular 1.x 中调用 $scope.$on() 方法后,就会返回一个函数,用于移除监听)

Iterator Pattern

迭代器模式定义

迭代器(Iterator)模式,又叫做游标(Cursor)模式。它提供一种方法顺序访问一个聚合对象中的各个元素,而又不需要暴露该对象的内部表示。迭代器模式可以把迭代的过程从业务逻辑中分离出来,在使用迭代器模式之后,即使不关心对象的内部构造,也可以按顺序访问其中的每个元素。

迭代器模式的优缺点

迭代器模式的优点:

  • 简化了遍历方式,对于对象集合的遍历,还是比较麻烦的,对于数组或者有序列表,我们尚可以通过游标取得,但用户需要在对集合了解的前提下,自行遍历对象,但是对于 hash 表来说,用户遍历起来就比较麻烦。而引入迭代器方法后,用户用起来就简单的多了。
  • 封装性良好,用户只需要得到迭代器就可以遍历,而不用去关心遍历算法。

迭代器模式的缺点:

  • 遍历过程是一个单向且不可逆的遍历

ECMAScript 迭代器

在 ECMAScript 中 Iterator 最早其实是要采用类似 Python 的 Iterator 规范,就是 Iterator 在没有元素之后,执行 next 会直接抛出错误;但后来经过一段时间讨论后,决定采更 functional 的做法,改成在取得最后一个元素之后执行 next 永远都回传 { done: true, value: undefined }

一个迭代器对象 ,知道如何每次访问集合中的一项, 并记录它的当前在序列中所在的位置。在 JavaScript 中迭代器是一个对象,它提供了一个 next() 方法,返回序列中的下一项。这个方法返回包含 donevalue 两个属性的对象。对象的取值如下:

  • 在最后一个元素前:{ done: false, value: elementValue }
  • 在最后一个元素后:{ done: true, value: undefined }

详细信息可以参考 - 可迭代协议和迭代器协议

ES 5 迭代器

接下来我们来创建一个 makeIterator 函数,该函数的参数类型是数组,当调用该函数后,返回一个包含 next() 方法的 Iterator 对象, 其中 next() 方法是用来获取容器对象中下一个元素。具体示例如下:

function makeIterator(array){var nextIndex = 0;return {next: function(){return nextIndex < array.length ?{value: array[nextIndex++], done: false} :{done: true};}}
}

一旦初始化, next() 方法可以用来依次访问可迭代对象中的元素:

var it = makeIterator(['yo', 'ya']);
console.log(it.next().value); // 'yo'
console.log(it.next().value); // 'ya'
console.log(it.next().done);  // true

ES 6 迭代器

在 ES 6 中我们可以通过 Symbol.iterator 来创建可迭代对象的内部迭代器,具体示例如下:

let arr = ['a', 'b', 'c'];
let iter = arr[Symbol.iterator]();

调用 next() 方法来获取数组中的元素:

> iter.next()
{ value: 'a', done: false }
> iter.next()
{ value: 'b', done: false }
> iter.next()
{ value: 'c', done: false }
> iter.next()
{ value: undefined, done: true }

ES 6 中可迭代的对象:

  • Arrays
  • Strings
  • Maps
  • Sets
  • DOM data structures (work in progress)

Observable

RxJS 是基于观察者模式和迭代器模式以函数式编程思维来实现的。RxJS 中含有两个基本概念:Observables 与 Observer。Observables 作为被观察者,是一个值或事件的流集合;而 Observer 则作为观察者,根据 Observables 进行处理。

Observables 与 Observer 之间的订阅发布关系(观察者模式) 如下:

  • 订阅:Observer 通过 Observable 提供的 subscribe() 方法订阅 Observable。
  • 发布:Observable 通过回调 next 方法向 Observer 发布事件。

Proposal Observable

  • Proposal Observable
  • Proposal Observable Implementations

    • RxJS 5
    • zen-observable
  • Observables for ECMAScript

自定义 Observable

如果你想真正了解 Observable,最好的方式就是自己写一个。其实 Observable 就是一个函数,它接受一个 Observer 作为参数然后返回另一个函数。

它的基本特征:

  • 是一个函数
  • 接受一个 Observer 对象 (包含 next、error、complete 方法的对象) 作为参数
  • 返回一个 unsubscribe 函数,用于取消订阅

它的作用:

作为生产者与观察者之间的桥梁,并返回一种方法来解除生产者与观察者之间的联系,其中观察者用于处理时间序列上数据流。接下来我们来看一下 Observable 的基础实现:

DataSource - 数据源

class DataSource {constructor() {let i = 0;this._id = setInterval(() => this.emit(i++), 200); // 创建定时器}emit(n) {const limit = 10;  // 设置数据上限值if (this.ondata) {this.ondata(n);}if (n === limit) {if (this.oncomplete) {this.oncomplete();}this.destroy();}}destroy() { // 清除定时器clearInterval(this._id);}
}

myObservable

function myObservable(observer) {let datasource = new DataSource(); // 创建数据源datasource.ondata = (e) => observer.next(e); // 处理数据流datasource.onerror = (err) => observer.error(err); // 处理异常datasource.oncomplete = () => observer.complete(); // 处理数据流终止return () => { // 返回一个函数用于,销毁数据源datasource.destroy();};
}

使用示例:

const unsub = myObservable({next(x) { console.log(x); },error(err) { console.error(err); },complete() { console.log('done')}
});/**
* 移除注释,可以测试取消订阅
*/
// setTimeout(unsub, 500); 

具体运行结果,可以查看线上示例。

SafeObserver - 更好的 Observer

上面的示例中,我们使用一个包含了 next、error、complete 方法的普通 JavaScript 对象来定义观察者。一个普通的 JavaScript 对象只是一个开始,在 RxJS 5 里面,为开发者提供了一些保障机制,来保证一个更安全的观察者。以下是一些比较重要的原则:

  • 传入的 Observer 对象可以不实现所有规定的方法 (next、error、complete 方法)
  • complete 或者 error 触发之后再调用 next 方法是没用的
  • 调用 unsubscribe 方法后,任何方法都不能再被调用了
  • completeerror 触发后,unsubscribe 也会自动调用
  • nextcompleteerror 出现异常时,unsubscribe 也会自动调用以保证资源不会浪费
  • nextcompleteerror是可选的。按需处理即可,不必全部处理

为了完成上述目标,我们得把传入的匿名 Observer 对象封装在一个 SafeObserver 里以提供上述保障。SafeObserver 的具体实现如下:

class SafeObserver {constructor(destination) {this.destination = destination;}next(value) {// 尚未取消订阅,且包含next方法if (!this.isUnsubscribed && this.destination.next) {try {this.destination.next(value);} catch (err) {// 出现异常时,取消订阅释放资源,再抛出异常this.unsubscribe();throw err;}}}error(err) {// 尚未取消订阅,且包含error方法if (!this.isUnsubscribed && this.destination.error) {try {this.destination.error(err);} catch (e2) {// 出现异常时,取消订阅释放资源,再抛出异常this.unsubscribe();throw e2;}this.unsubscribe();}}complete() {// 尚未取消订阅,且包含complete方法if (!this.isUnsubscribed && this.destination.complete) {try {this.destination.complete();} catch (err) {// 出现异常时,取消订阅释放资源,再抛出异常this.unsubscribe();throw err;}this.unsubscribe();}}unsubscribe() { // 用于取消订阅this.isUnsubscribed = true;if (this.unsub) {this.unsub();}}
}

myObservable - 使用 SafeObserver

function myObservable(observer) {const safeObserver = new SafeObserver(observer); // 创建SafeObserver对象const datasource = new DataSource(); // 创建数据源datasource.ondata = (e) => safeObserver.next(e);datasource.onerror = (err) => safeObserver.error(err);datasource.oncomplete = () => safeObserver.complete();safeObserver.unsub = () => { // 为SafeObserver对象添加unsub方法datasource.destroy();};// 绑定this上下文,并返回unsubscribe方法return safeObserver.unsubscribe.bind(safeObserver);
}

使用示例:

const unsub = myObservable({next(x) { console.log(x); },error(err) { console.error(err); },complete() { console.log('done')}
});

具体运行结果,可以查看线上示例。

Operators - 也是函数

Operator 是一个函数,它接收一个 Observable 对象,然后返回一个新的 Observable 对象。当我们订阅新返回的 Observable 对象时,它内部会自动订阅前一个 Observable 对象。接下来我们来实现常用的 map 操作符:

Observable 实现:

class Observable {constructor(_subscribe) {this._subscribe = _subscribe;}subscribe(observer) {const safeObserver = new SafeObserver(observer);safeObserver.unsub = this._subscribe(safeObserver);return safeObserver.unsubscribe.bind(safeObserver);}
}

map 操作符实现:

function map(source, project) {return new Observable((observer) => {const mapObserver = {next: (x) => observer.next(project(x)),error: (err) => observer.error(err),complete: () => observer.complete()};return source.subscribe(mapObserver);});
}

具体运行结果,可以查看线上示例。

改进 Observable - 支持 Operator 链式调用

如果把 Operator 都写成如上那种独立的函数,我们链式代码会逐渐变丑:

map(map(myObservable, (x) => x + 1), (x) => x + 2);

对于上面的代码,想象一下有 5、6 个嵌套着的 Operator,再加上更多、更复杂的参数,基本上就没法儿看了。

你也可以试下 Texas Toland 提议的简单版管道实现,合并压缩一个数组的Operator并生成一个最终的Observable,不过这意味着要写更复杂的 Operator,上代码:JSBin。其实写完后你会发现,代码也不怎么漂亮:

pipe(myObservable, map(x => x + 1), map(x => x + 2));

理想情况下,我们想将代码用更自然的方式链起来:

myObservable.map(x => x + 1).map(x => x + 2);

幸运的是,我们已经有了这样一个 Observable 类,我们可以基于 prototype 在不增加复杂度的情况下支持多 Operators 的链式结构,下面我们采用prototype方式再次实现一下 Observable

Observable.prototype.map = function (project) {return new Observable((observer) => {const mapObserver = {next: (x) => observer.next(project(x)),error: (err) => observer.error(err),complete: () => observer.complete()};return this.subscribe(mapObserver);});
};

现在我们终于有了一个还不错的实现。这样实现还有其他好处,例如:可以写子类继承 Observable 类,然后在子类中重写某些内容以优化程序。

接下来我们来总结一下该部分的内容:Observable 就是函数,它接受 Observer 作为参数,又返回一个函数。如果你也写了一个函数,接收一个 Observer 作为参数,又返回一个函数,那么,它是异步的、还是同步的 ?其实都不是,它就只是一个函数。任何函数的行为都依赖于它的具体实现,所以当你处理一个 Observable 时,就把它当成一个普通函数,里面没有什么黑魔法。当你要构建 Operator 链时,你需要做的其实就是生成一个函数将一堆 Observers 链接在一起,然后让真正的数据依次穿过它们。

Rx.Observable.create

var observable = Rx.Observable.create(function(observer) {observer.next('Semlinker'); // RxJS 4.x 以前的版本用 onNextobserver.next('Lolo');});// 订阅这个 Observable
observable.subscribe(function(value) {console.log(value);
});

以上代码运行后,控制台会依次输出 'Semlinker' 和 'Lolo' 两个字符串。

需要注意的是,很多人认为 RxJS 中的所有操作都是异步的,但其实这个观念是错的。RxJS 的核心特性是它的异步处理能力,但它也是可以用来处理同步的行为。具体示例如下:

var observable = Rx.Observable.create(function(observer) {observer.next('Semlinker'); // RxJS 4.x 以前的版本用 onNextobserver.next('Lolo');});console.log('start');
observable.subscribe(function(value) {console.log(value);
});
console.log('end');

以上代码运行后,控制台的输出结果:

start
Semlinker
Lolo
end

当然我们也可以用它处理异步行为:

var observable = Rx.Observable.create(function(observer) {observer.next('Semlinker'); // RxJS 4.x 以前的版本用 onNextobserver.next('Lolo');setTimeout(() => {observer.next('RxJS Observable');}, 300);})console.log('start');
observable.subscribe(function(value) {console.log(value);
});
console.log('end');

以上代码运行后,控制台的输出结果:

start
Semlinker
Lolo
end
RxJS Observable

从以上例子中,我们可以得出一个结论 - Observable 可以应用于同步和异步的场合。

Observable - Creation Operator

RxJS 中提供了很多操作符,用于创建 Observable 对象,常用的操作符如下:

  • create
  • of
  • from
  • fromEvent
  • fromPromise
  • empty
  • never
  • throw
  • interval
  • timer

上面的例子中,我们已经使用过了 create 操作符,接下来我们来看一下其它的操作符:

of

var source = Rx.Observable.of('Semlinker', 'Lolo');source.subscribe({next: function(value) {console.log(value);},complete: function() {console.log('complete!');},error: function(error) {console.log(error);}
});

以上代码运行后,控制台的输出结果:

Semlinker
Lolo
complete!

from

var arr = [1, 2, 3];
var source = Rx.Observable.from(arr); // 也支持字符串,如 "Angular 2 修仙之路"source.subscribe({next: function(value) {console.log(value);},complete: function() {console.log('complete!');},error: function(error) {console.log(error);}
});

以上代码运行后,控制台的输出结果:

1
2
3
complete!

fromEvent

Rx.Observable.fromEvent(document.querySelector('button'), 'click');

fromPromise

var source = Rx.Observable.fromPromise(new Promise((resolve, reject) => {setTimeout(() => {resolve('Hello RxJS!');},3000)
}));source.subscribe({next: function(value) {console.log(value);},complete: function() {console.log('complete!');},error: function(error) {console.log(error);}
});

以上代码运行后,控制台的输出结果:

Hello RxJS!
complete!

empty

var source = Rx.Observable.empty();source.subscribe({next: function(value) {console.log(value);},complete: function() {console.log('complete!');},error: function(error) {console.log(error);}
});

以上代码运行后,控制台的输出结果:

complete!

empty 操作符返回一个空的 Observable 对象,如果我们订阅该对象,它会立即返回 complete 信息。

never

var source = Rx.Observable.never();source.subscribe({next: function(value) {console.log(value);},complete: function() {console.log('complete!');},error: function(error) {console.log(error);}
});

never 操作符会返回一个无穷的 Observable,当我们订阅它后,什么事情都不会发生,它是一个一直存在却什么都不做的 Observable 对象。

throw

var source = Rx.Observable.throw('Oop!');source.subscribe({next: function(value) {console.log(value);},complete: function() {console.log('complete!');},error: function(error) {console.log('Throw Error: ' + error);}
});

以上代码运行后,控制台的输出结果:

Throw Error: Oop!

throw 操作如,只做一件事就是抛出异常。

interval

var source = Rx.Observable.interval(1000);source.subscribe({next: function(value) {console.log(value);},complete: function() {console.log('complete!');},error: function(error) {console.log('Throw Error: ' + error);}
});

以上代码运行后,控制台的输出结果:

0
1
2
...

interval 操作符支持一个数值类型的参数,用于表示定时的间隔。上面代码表示每隔 1s,会输出一个递增的值,初始值从 0 开始。

timer

var source = Rx.Observable.timer(1000, 5000);source.subscribe({next: function(value) {console.log(value);},complete: function() {console.log('complete!');},error: function(error) {console.log('Throw Error: ' + error);}
});

以上代码运行后,控制台的输出结果:

0 # 1s后
1 # 5s后
2 # 5s后
...

timer 操作符支持两个参数,第一个参数用于设定发送第一个值需等待的时间,第二个参数表示第一次发送后,发送其它值的间隔时间。此外,timer 操作符也可以只传递一个参数,具体如下:

var source = Rx.Observable.timer(1000);source.subscribe({next: function(value) {console.log(value);},complete: function() {console.log('complete!');},error: function(error) {console.log('Throw Error: ' + error);}
});

以上代码运行后,控制台的输出结果:

0
complete!

Subscription

有些时候对于一些 Observable 对象 (如通过 interval、timer 操作符创建的对象),当我们不需要的时候,要释放相关的资源,以避免资源浪费。针对这种情况,我们可以调用 Subscription 对象的 unsubscribe 方法来释放资源。具体示例如下:

var source = Rx.Observable.timer(1000, 1000);// 取得subscription对象
var subscription = source.subscribe({next: function(value) {console.log(value);},complete: function() {console.log('complete!');},error: function(error) {console.log('Throw Error: ' + error);}
});setTimeout(() => {subscription.unsubscribe();
}, 5000);

RxJS - Observer

Observer (观察者) 是一个包含三个方法的对象,每当 Observable 触发事件时,便会自动调用观察者的对应方法。

Observer 接口定义:

interface Observer<T> {closed?: boolean; // 标识是否已经取消对Observable对象的订阅next: (value: T) => void;error: (err: any) => void;complete: () => void;
}

Observer 中的三个方法的作用:

  • next - 每当 Observable 发送新值的时候,next 方法会被调用
  • error - 当 Observable 内发生错误时,error 方法就会被调用
  • complete - 当 Observable 数据终止后,complete 方法会被调用。在调用 complete 方法之后,next 方法就不会再次被调用

接下来我们来看个具体示例:

var observable = Rx.Observable.create(function(observer) {observer.next('Semlinker');observer.next('Lolo');observer.complete();observer.next('not work');});// 创建一个观察者
var observer = {next: function(value) {console.log(value);},error: function(error) {console.log(error);},complete: function() {console.log('complete');}
}// 订阅已创建的observable对象
observable.subscribe(observer);

以上代码运行后,控制台的输出结果:

Semlinker
Lolo
complete

上面的例子中,我们可以看出,complete 方法执行后,next 就会失效,所以不会输出 not work

另外观察者可以不用同时包含 next、complete、error 三种方法,它可以只包含一个 next 方法,具体如下:

var observer = {next: function(value) {console.log(value);}
};

有时候 Observable 可能是一个无限的序列,例如 click 事件,对于这种场景,complete 方法就永远不会被调用。

我们也可以在调用 Observable 对象的 subscribe 方法时,依次传入 next、error、complete 三个函数,来创建观察者:

observable.subscribe(value => { console.log(value); },error => { console.log('Error: ', error); },() => { console.log('complete'); }
);

Pull vs Push

Pull 和 Push 是数据生产者和数据的消费者两种不同的交流方式。

什么是Pull?

在 "拉" 体系中,数据的消费者决定何时从数据生产者那里获取数据,而生产者自身并不会意识到什么时候数据将会被发送给消费者。

每一个 JavaScript 函数都是一个 "拉" 体系,函数是数据的生产者,调用函数的代码通过 ''拉出" 一个单一的返回值来消费该数据。

const add = (a, b) => a + b;
let sum = add(3, 4);

ES6介绍了 iterator迭代器 和 Generator生成器 — 另一种 "拉" 体系,调用 iterator.next() 的代码是消费者,可从中拉取多个值

什么是Push?

在 "推" 体系中,数据的生产者决定何时发送数据给消费者,消费者不会在接收数据之前意识到它将要接收这个数据。

Promise(承诺) 是当今 JS 中最常见的 "推" 体系,一个Promise (数据的生产者)发送一个 resolved value (成功状态的值)来执行一个回调(数据消费者),但是不同于函数的地方的是:Promise 决定着何时数据才被推送至这个回调函数。

RxJS 引入了 Observables (可观察对象),一个全新的 "推" 体系。一个可观察对象是一个产生多值的生产者,当产生新数据的时候,会主动 "推送给" Observer (观察者)。

生产者 消费者
pull拉 被请求的时候产生数据 决定何时请求数据
push推 按自己的节奏生产数据 对接收的数据进行处理

接下来我们来看张图,从而加深对上面概念的理解:

Observable vs Promise

Observable(可观察对象)是基于推送(Push)运行时执行(lazy)的多值集合。

MagicQ 单值 多值
拉取(Pull) 函数 遍历器
推送(Push) Promise Observable
  • Promise

    • 返回单个值
    • 不可取消的
  • Observable

    • 随着时间的推移发出多个值
    • 可以取消的
    • 支持 map、filter、reduce 等操作符
    • 延迟执行,当订阅的时候才会开始执行

延迟计算 & 渐进式取值

延迟计算

所有的 Observable 对象一定会等到订阅后,才开始执行,如果没有订阅就不会执行。

var source = Rx.Observable.from([1,2,3,4,5]);
var example = source.map(x => x + 1);

上面的示例中,因为 example 对象还未被订阅,所以不会进行运算。这跟数组不一样,具体如下:

var source = [1,2,3,4,5];
var example = source.map(x => x + 1); 

以上代码运行后,example 中就包含已运算后的值。

渐进式取值

数组中的操作符如:filter、map 每次都会完整执行并返回一个新的数组,才会继续下一步运算。具体示例如下:

var source = [1,2,3,4,5];
var example = source.filter(x => x % 2 === 0) // [2, 4].map(x => x + 1) // [3, 5]

关于数组中的 map、filter 的详细信息,可以参考 - RxJS Functional Programming

为了更好地理解数组操作符的运算过程,我们可以参考下图:


查看原图

虽然 Observable 运算符每次都会返回一个新的 Observable 对象,但每个元素都是渐进式获取的,且每个元素都会经过操作符链的运算后才输出,而不会像数组那样,每个阶段都得完整运算。具体示例如下:

var source = Rx.Observable.from([1,2,3,4,5]);
var example = source.filter(x => x % 2 === 0).map(x => x + 1)example.subscribe(console.log);

以上代码的执行过程如下:

  • source 发出 1,执行 filter 过滤操作,返回 false,该值被过滤掉
  • source 发出 2,执行 filter 过滤操作,返回 true,该值被保留,接着执行 map 操作,值被处理成 3,最后通过 console.log 输出
  • source 发出 3,执行 filter 过滤操作,返回 false,该值被过滤掉
  • source 发出 4,执行 filter 过滤操作,返回 true,该值被保留,接着执行 map 操作,值被处理成 5,最后通过 console.log 输出
  • source 发出 5,执行 filter 过滤操作,返回 false,该值被过滤掉

为了更好地理解 Observable 操作符的运算过程,我们可以参考下图:


查看原图

学习资源

  • RxJS 中文文档
  • RxMarbles
  • RxJS Diagrams
  • rxjs-training
  • RxJS-Playground

参考资源

  • 观察者模式
  • MDN - 迭代器和生成器
  • 构建流式应用—RxJS详解
  • 让我们一起来学习RxJS
  • Learning Observable By Building Observable
  • 30天精通RxJS - 什么是Observable

Observable详解相关推荐

  1. RxJS 系列之二 - Observable 详解

    查看新版教程,请访问前端修仙之路 RxJS 系列目录 RxJS 系列之一 - Functional Programming 简介 RxJS 系列之二 - Observable 详解 (本文) RxJS ...

  2. java rx.observable_Rxjava2 Observable的条件操作符详解及实例

    简要: 需求了解: 在使用 Rxjava 开发中,经常有一些各种条件的操作 ,如比较两个 Observable 谁先发射了数据.跳过指定条件的 Observable 等一系列的条件操作需求,那么很幸运 ...

  3. java observable 使用_Rxjava2 Observable的辅助操作详解及实例(一)

    简要: 需求了解: Rxjava中有一些方便的辅助操作符,来更方便我们的函数式的编程.比如延迟.定时.指定操作的监听.数据类型转换等一系列的操作. 下面列出了一些用于Observable的辅助操作符: ...

  4. java rx.observable_Rxjava2 Observable的错误处理操作详解及实例

    简要: 需求了解: Rxjava 中当数据处理派发中发生了异常 ,观察者会接受到一个 Error 的通知,那如果不想发射这个异常的通知,自己处理掉呢?答案当然是可以的,在 Rxjava 中很多操作符可 ...

  5. java rx.observable_Rxjava2 Observable的数据变换详解及实例(二)

    1. Window 定期将来自原始Observable的数据分解为一个Observable窗口,发射这些窗口,而不是每次发射一项数据. Window 和 Buffer 类似,但不是发射来自原始Obse ...

  6. java rx.observable_Rxjava2 Observable的数据变换详解及实例(一)

    简要: 需求了解: 对于 Observable 发射的数据有的时候可能不满足我们的要求,或者需要转化为其他类型的数据,比如:缓存,数据类型转化,数据拦截等.此时可以使用 Rx 中的一些对于数据操作的操 ...

  7. RxJava操作符在android中的使用场景详解(一)

    转载请注明出处:http://www.wangxinarhat.com/2016/04/19/2016-04-19-rxjava-android-operate1/ 最近学习了RxJava在andro ...

  8. observeOn()与subscribeOn()的详解

    Rxjava 提供了subscribeOn()方法来用于每个observable对象的操作符在哪个线程上运行 Rxjava 提供了ObserveOn()方法来用于每个Subscriber(Observ ...

  9. RxJava flatMap操作符用法详解

    RxJava系列文章目录导读: 一.RxJava create操作符的用法和源码分析 二.RxJava map操作符用法详解 三.RxJava flatMap操作符用法详解 四.RxJava conc ...

最新文章

  1. 实例代码分享Python实现Linux监控
  2. React Axios 请求解决跨域问题
  3. android实现华为手机拍照上传_继续引领手机拍照 华为将带来液态镜头
  4. ruby环境sass编译中文出现Syntax error: Invalid GBK character错误解决方法
  5. 关于我的CSDN博客的一些要说的话
  6. Android退出程序(三)——Android事件总线
  7. 《C#高级编程》笔记系列第一弹-开篇
  8. C#基础 字符串读取/写入文本文件 代码示例
  9. python 邮件报警
  10. 1982:【19CSPJ普及组】数字游戏 scratch C++
  11. 常用的Regex验证方法
  12. webform CustomValidator
  13. atitit.导出excel的设计----查询结果 导出为excel的实现java .net php 总结
  14. python批量查询高德地图经纬度(支持xlxs)
  15. Win10开启卓越性能模式,比高性能更强
  16. 苹果mac怎么连接打印机? mac系统添加共享打印机的技巧
  17. c语言ab43错误的是,求助,AB+没法玩下去了,详情请看报错代码
  18. 世界杯2022赛程表
  19. 爱奇艺iOS稳定性测试实践
  20. 计算机网络---计算机网络体系结构与参考模型

热门文章

  1. 会“举一反三”的图像质量评价模型
  2. 【sdx12】QCA6174 WiFi 5G信道auto时屏蔽DFS信道,手动设置信道检测到雷达信号之后,跳转到指定信道方法
  3. 基于51单片机十字路口红绿灯管理系统
  4. sql server 2008 (3)
  5. 苹果充值常见的刷单手段和防范方法
  6. 中学教学参考杂志中学教学参考杂志社中学教学参考编辑部2022年第9期目录
  7. 工业级路由器和家用路由器的区别_企业级、工业级与家用级路由器区别
  8. 借助BetterZip自制压缩包测试小程序
  9. 华为荣耀20和x10比较_华为畅享20pro和荣耀x10哪个好?华为畅享20pro对比荣耀x10评测...
  10. NFC配置文件保存路径