RxJS 是 Javascript 世界中实现响应式编程工具库,它使得在 JS 中处理复杂的数据流变得简单和易于维护。
响应式编程 或反应式编程 (英语:Reactive programming)是一种面向数据流 和变化传播的声明式 编程范式 。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。 - wikipedia
RxJS 例子 🌰 发布订阅 Observable & subscribe RxJS 最核心的能力就是 软件设计中的 发布/订阅模式 . 它通过 Observable
和subscribe
实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import { Observable } from 'rxjs' ;const now = new Date ().getTime();const stream$ = new Observable((subscriber ) => { setTimeout (() => { subscriber.next([1 , 2 , 3 ]); }, 500 ); setTimeout (() => { subscriber.next({ a : 1000 }); }, 1000 ); setTimeout (() => { subscriber.next('end' ); }, 3000 ); setTimeout (() => { subscriber.complete(); }, 4000 ); }); const subscription1 = stream$.subscribe({ complete : () => console .log('done' ), next : (v ) => console .log(new Date ().getTime() - now, 'ms stream1' , v), error : () => console .log('error' ), }); setTimeout (() => { const subscription2 = stream$.subscribe({ next : (v ) => console .log(new Date ().getTime() - now, 'ms stream2' , v), }); }, 1000 );
结果:
1 2 3 4 5 6 7 502 ms stream1 [ 1, 2, 3 ] 1008 ms stream1 { a: 1000 } 1512 ms stream2 [ 1, 2, 3 ] 2013 ms stream2 { a: 1000 } 3006 ms stream1 end done 4010 ms stream2 end
数据流的处理 operators RxJS 提供各种操作符(operators)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 <!DOCTYPE html > <html lang ="en" > <head > <meta charset ="UTF-8" /> <meta http-equiv ="X-UA-Compatible" content ="IE=edge" /> <meta name ="viewport" content ="width=device-width, initial-scale=1.0" /> <title > RxJS</title > </head > <body > <button id ="btn" > 按 钮</button > <script src ="https://cdn.bootcdn.net/ajax/libs/rxjs/6.6.3/rxjs.umd.js" > </script > <script > let { fromEvent } = window .rxjs; let { throttleTime, scan } = window .rxjs.operators; let btn = document .getElementById('btn' ); fromEvent(btn, 'click' ) .pipe( throttleTime(1000 ), scan((count ) => count + 1 , 0 ) ) .subscribe((count ) => console .log(`click ${count} times` )); </script > </body > </html >
Subject 监听多路事件和发布 Subject
类似于 EventEmitter
可以将监听多个事件和广播事件触发。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import { Subject } from 'rxjs' ;const subject = new Subject();subject.subscribe((v ) => console .log('stream 1' , v)); subject.subscribe((v ) => console .log('stream 2' , v)); setTimeout (() => { subject.subscribe((v ) => console .log('stream 3' , v)); }, 1000 ); subject.next(1 ); subject.next(2 ); setTimeout (() => { subject.next(3 ); }, 3000 );
RxJS 核心概念 RxJS 结合了 观察者模式、迭代器模式、使用集合的函数编程,以满足一种理想方式来挂历事件序列所需要的一切。
RxJS 解决异步事件管理的基本概念:
Observable (可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。
Observer (观察者): 一个回调函数的集合,它知道如何去监听由 Observable 提供的值。
Subscription (订阅): 表示 Observable 的执行,主要用于取消 Observable 的执行。
Operators (操作符): 采用函数式编程风格的纯函数 (pure function),使用像 map
、filter
、concat
、flatMap
等这样的操作符来处理集合。
Subject (主体): 相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式。
Schedulers (调度器): 用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如 setTimeout
或 requestAnimationFrame
或其他。
1 2 3 4 5 6 7 8 9 10 11 import { Observable } from 'rxjs' ;const observable = new Observable((subscriber ) => { subscriber.next(1 ); subscriber.next(2 ); subscriber.next(3 ); setTimeout (() => { subscriber.next(4 ); subscriber.complete(); }, 1000 ); });
Observable
可以接受一个 Function
用来发布数据。实例化Observable
可以获得observable
.它可以用来订阅observable
,用来处理Observable
发布的数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import { Observable } from 'rxjs' ;const observable = new Observable((subscriber ) => { console .log('Hello' ); subscriber.next(42 ); subscriber.next(100 ); subscriber.next(200 ); setTimeout (() => { subscriber.next(300 ); }, 1000 ); }); console .log('before' );observable.subscribe((x ) => { console .log(x); }); console .log('after' );
observable.subscribe
的调用可以获取到 Observable
中推动的数据及时它是异步的。
Observable 四个核心概念
创建 Observables - const observable = new Observable(subscriber => {...});
订阅 Observables - observable.subscribe(...)
执行 Observables - new Observable(function subscribe(subscriber) {...})
中 subscriber
支持next\error\complete
三种类型执行方式
清理 Observables - observable.subscribe(...)
执行后获取一个Subscription
,可以用它清理掉Observable
subscription.unsubscribe()
除了用new Observable
可以创建observable
之外,Rxjs 也提供了其它创建observable
的方式:
1 2 3 4 5 6 import { from } from 'rxjs' ;const observable = from ([10 , 20 , 30 ]);const subscription = observable.subscribe((x ) => console .log(x));subscription.unsubscribe();
Observer
是Observable
发送值的消费方。Observer
有三种形式:一个函数,一个{next:Function,error:Function,complete:Function}
的对象,传递三个函数作为参数,参数顺序是:next、error、complete
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 import { Observable } from 'rxjs' ;const Stream$ = new Observable((subscribe ) => { setTimeout (() => { subscribe.next([1 , 2 , 3 ]); }); subscribe.next({ a : 100 }); }); const obsever1 = { complete : () => console .log('obsever1 - done' ), next : (v ) => console .log('obsever1' , v), error : () => console .error('obsever1' , 'error' ), }; const Subscription1 = Stream$.subscribe(obsever);const Subscription2 = Stream$.subscribe(function observer2 (v ) { console .log('obsever2' , v); }); const Subscription2 = Stream$.subscribe( function observer3_next (v ) { console .log('obsever2' , v); }, function observer3_error (v ) { console .error('obsever2' , v); }, function observer3_complete (v ) { console .log('obsever3 - done' ); } );
next\error\complete
并不需要全部定义,可以定义一部分。
尽管 RxJS 的根基是 Observable,但最有用的还是它的操作符 。操作符是允许复杂的异步代码以声明式的方式进行轻松组合的基础代码单元。
operators 操作符就是一个函数,它有两种类型:
Pipeable Operators (管道操作符): observable
实例可以使用这样的语法:observableInstance.pipe(operator())
传递一个Observable
并返回一个新的Observable
。
操作符本质上是一个纯函数 (pure function),它接收一个 Observable 作为输入,并生成一个新的 Observable 作为输出。订阅输出 Observable 同样会订阅输入 Observable 。
Creation Operators (创造操作符):可以称为独立函数来创建新的 Observable。
Piping Pipeable Operators 是无害的纯函数,借助pipe()
可以实现,多个管道操作符串行处理。
1 obs.pipe(op1(), op2(), op3(), op3());
你可以通过op()(obs)
使用管道运算符,但并不推荐这样使用,因为可读性不好:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import { of } from 'rxjs' ;import { map } from 'rxjs/operators' ;map((x: number ) => x * x)(of (1 , 2 , 3 )).subscribe((v ) => console .log(`value1: ${v} ` ) ); of (1 , 2 , 3 ) .pipe(map((v ) => v + v)) .subscribe((v ) => console .log(`value2: ${v} ` ));
Creation Operators 什么是创作运算符?与管道运算符不同,创建运算符是可用于创建Observable
对象的函数.具有一些常见的预定义行为或通过加入其他 Observable.
创建运算符的典型示例是间隔函数。它以数字(不是 Observable)作为输入参数,并产生 Observable 作为输出:
1 2 3 import { interval } from 'rxjs' ;const observable = interval(1000 );
操作符分类 操作符有着不同的用途,它们可作如下分类:创建、转换、过滤、组合、错误处理、工具,等等。在下面的列表中,你可以按分类组织好的所有操作符。
创建操作符
连接创建操作符 这些是 Observable 创建运算符,还具有联接功能-发出多个源 Observable 的值。
转换操作符
过滤操作符
连接操作符
多广播操作符
错误捕获操作符
工具操作符
条件和布尔操作符
数学和聚合操作符
例子 🌰 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 <!DOCTYPE html > <html lang ="en" > <head > <meta charset ="UTF-8" /> <meta http-equiv ="X-UA-Compatible" content ="IE=edge" /> <meta name ="viewport" content ="width=device-width, initial-scale=1.0" /> <title > RxJS</title > </head > <body > <div > click count: <span id ="count" > </span > </div > <div > <button id ="button-plus" > +1</button > <button id ="button-minus" > -1</button > </div > <script src ="https://cdn.bootcdn.net/ajax/libs/rxjs/6.6.3/rxjs.umd.js" > </script > <script > let { fromEvent, operators } = window .rxjs; let { mapTo, scan, merge } = window .rxjs.operators; console .log('operators:' , operators); console .log('window.rxjs:' , window .rxjs); fromEvent(document .getElementById('button-plus' ), 'click' ) .pipe( mapTo(1 ), merge( fromEvent(document .getElementById('button-minus' ), 'click' ).pipe( mapTo(-1 ) ) ), scan((total, now ) => total + now) ) .subscribe((value ) => { document .getElementById('count' ).innerText = value; }); </script > </body > </html >
Subscription
- 是代表一次性资源的对象,通常是Observable
的执行。Subscription
拥有一种重要的方法unsubscribe()
,即取消订阅,该方法不带任何参数,而只是处置该订阅所拥有的资源。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import { interval } from 'rxjs' ;const observable = interval(1000 );const subscription = observable.subscribe((x ) => console .log(x));setTimeout (() => { subscription.unsubscribe(); }, 5000 );
Subscription 还可以合在一起,这样一个 Subscription 调用 unsubscribe()
方法,可能会有多个 Subscription 取消订阅 。你可以通过把一个 Subscription 添加到另一个上面来做这件事:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import { interval } from 'rxjs' ;const observable1 = interval(200 );const observable2 = interval(300 );const subscription = observable1.subscribe((v ) => console .log(`observable1 ${v} ` ) ); const childSubscription = observable2.subscribe((v ) => console .log(`observable2 ${v} ` ) ); subscription.add(childSubscription); setTimeout (() => { subscription.unsubscribe(); }, 1500 );
Subject
是一种特殊类型的Observable
,它允许将值多播给多个观察者,所以 Subject
是多播的,而普通的 Observables
是单播的(每个已订阅的观察者都拥有 Observable
的独立执行)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import { Subject } from 'rxjs' ;const subject = new Subject();subject.subscribe((v ) => console .log('stream 1' , v)); subject.subscribe((v ) => console .log('stream 2' , v)); setTimeout (() => { subject.subscribe((v ) => console .log('stream 3' , v)); }, 1000 ); subject.next(1 ); subject.next(2 ); setTimeout (() => { subject.next(3 ); }, 3000 );
单播: 每个普通的 Observables 实例都只能被一个观察者订阅,当它被其他观察者订阅的时候会产生一个新的实例。也就是普通 Observables 被不同的观察者订阅的时候,会有多个实例,不管观察者是从何时开始订阅,每个实例都是从头开始把值发给对应的观察者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import { Observable } from 'rxjs' ;const observable = new Observable((observer ) => { observer.next(Math .random()); setTimeout (() => observer.next(Math .random()), 300 ); }); observable.subscribe((val ) => console .log(`订阅1: ${val} ` )); observable.subscribe((val ) => console .log(`订阅2: ${val} ` )); setTimeout ( () => observable.subscribe((val ) => console .log(`订阅3:${val} ` )), 500 );
多播: Subject
是多播,则多个订阅都是一个实例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import { Subject } from 'rxjs' ;const subject = new Subject();subject.subscribe((val ) => console .log(`订阅1: ${val} ` )); subject.subscribe((val ) => console .log(`订阅2: ${val} ` )); setTimeout (() => { subject.subscribe((val ) => console .log(`订阅3: ${val} ` )); }, 200 ); subject.next(Math .random()); setTimeout (() => { subject.next(Math .random()); }, 300 );
每个 Subject 都是 Observable 。 - 对于 Subject,你可以提供一个观察者并使用 subscribe
方法,就可以开始正常接收值。从观察者的角度而言,它无法判断 Observable 执行是来自普通的 Observable 还是 Subject 。
在 Subject 的内部,subscribe
不会调用发送值的新执行。它只是将给定的观察者注册到观察者列表中,类似于其他库或语言中的 addListener
的工作方式。
每个 Subject 都是 Observer。 - Subject 是一个有如下方法的对象: next(v)
、error(e)
和 complete()
。要给 Subject 提供新值,只要调用 next(theValue)
,它会将值多播给已注册监听该 Subject 的观察者们。
1 2 3 4 5 6 7 8 9 10 11 var subject = new Rx.Subject();subject.subscribe({ next : (v ) => console .log('observerA: ' + v), }); subject.subscribe({ next : (v ) => console .log('observerB: ' + v), }); subject.next(1 ); subject.next(2 );
因为 Subject 是观察者,这也就在意味着你可以把 Subject 作为参数传给任何 Observable 的 subscribe
方法,如下面的示例所展示的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import { from , Subject } from 'rxjs' ;const Subject$ = new Subject();Subject$.subscribe({ next : (v ) => console .log('observerA: ' + v) }); Subject$.subscribe({ next : (v ) => console .log('observerB: ' + v) }); const observable = from ([1 , 2 , 3 , 4 ]);observable.subscribe(Subject$);
还有一些特殊类型的 Subject
:BehaviorSubject
、ReplaySubject
和 AsyncSubject
。
BehaviorSubject Subject 的其中一个变体就是 BehaviorSubject
,它有一个“当前值”的概念。它保存了发送给消费者的最新值。并且当有新的观察者订阅时,会立即从 BehaviorSubject
那接收到“当前值”。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import { BehaviorSubject } from 'rxjs' ;const subject = new BehaviorSubject(0 ); subject.subscribe({ next : (v ) => console .log(`observerA: ${v} ` ), }); subject.next(1 ); subject.next(2 ); subject.subscribe({ next : (v ) => console .log(`observerB: ${v} ` ), }); subject.next(3 );
ReplaySubject ReplaySubject
类似于 BehaviorSubject
,它可以发送旧值给新的订阅者,但它还可以记录 Observable 执行的一部分。
ReplaySubject
记录 Observable 执行中的多个值并将其回放给新的订阅者。
当创建 ReplaySubject
时,你可以指定回放多少个值:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import { ReplaySubject } from 'rxjs' ;const subject = new ReplaySubject(3 ); subject.subscribe({ next : (v ) => console .log(`observerA: ${v} ` ), }); subject.next(1 ); subject.next(2 ); subject.next(3 ); subject.next(4 ); subject.subscribe({ next : (v ) => console .log(`observerB: ${v} ` ), }); subject.next(5 );
除了缓冲数量,你还可以指定 window time (以毫秒为单位)来确定多久之前的值可以记录。在下面的示例中,我们使用了较大的缓存数量100
,但 window time 参数只设置了500
毫秒。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import { ReplaySubject } from 'rxjs' ;const subject = new ReplaySubject(100 , 500 );subject.subscribe({ next : (v ) => console .log(`observerA: ${v} ` ), }); let i = 1 ;setInterval (() => subject.next(i++), 200 );setTimeout (() => { subject.subscribe({ next : (v ) => console .log(`observerB: ${v} ` ), }); }, 1000 );
AsyncSubject AsyncSubject 是另一个 Subject 变体,只有当 Observable 执行完成时(执行 complete()
),它才会将执行的最后一个值发送给观察者。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import { AsyncSubject } from 'rxjs' ;const subject = new AsyncSubject();subject.subscribe({ next : (v ) => console .log(`observerA: ${v} ` ), }); subject.next(1 ); subject.next(2 ); subject.next(3 ); subject.next(4 ); subject.subscribe({ next : (v ) => console .log(`observerB: ${v} ` ), }); subject.next(5 ); subject.complete();
AsyncSubject 和 last()
操作符类似,因为它也是等待 complete
通知,以发送一个单个值。
Scheduler
控制着何时启动subscription
和何时发送通知。它由三部分组成:
调度器是一种数据结构 。它知道如何根据优先级或其他标准来存储任务和将任务进行排序。
调度器是执行上下文 。 它表示在何时何地执行任务(举例来说,立即的,或另一种回调函数机制(比如 setTimeout 或 process.nextTick),或动画帧)。
调度器有一个(虚拟的)时钟。 调度器功能通过它的 getter 方法 now() 提供了”时间”的概念。在具体调度器上安排的任务将严格遵循该时钟所表示的时间。
调度器可以让你规定 Observable 在什么样的执行上下文中发送通知给它的观察者。
在之前的例子中我们是用Obervable
同步的输出值。下面通过observeOn
指的async
调度器去传递这些值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 import { Observable, asyncScheduler } from 'rxjs' ;import { observeOn } from 'rxjs/operators' ;const observable = new Observable((observer ) => { observer.next(1 ); observer.next(2 ); observer.next(3 ); observer.complete(); }).pipe( observeOn(asyncScheduler) ); console .log('just before subscribe' );observable.subscribe({ next (x ) { console .log('got value ' + x); }, error (err ) { console .error('something wrong occurred: ' + err); }, complete ( ) { console .log('done' ); }, }); console .log('just after subscribe' );
Schedulers(调度器) 类型 async
调度器是 RxJS 提供的内部调度器之一。
调度器
目的
null
不传递任何调度器的话,会以同步递归的方式发送通知。用于定时操作或尾递归操作。
queueScheduler
当前事件帧中的队列调度(蹦床调度器)。用于迭代操作。
asapScheduler
Schedules 是在微任务队列上, 它就像运行在 promises 队列上.在当前执行任务之后再下一个任务之前执行。它就像一个异步执行。
asyncScheduler
Schedules 与 setInterval
一起使用.基础的调度器
animationFrameScheduler
Schedules 会在浏览器下一次渲染之前执行. 它可以用来绘制流程的动画。
使用 Schedulers(调度器) 你可能在你的 RxJS 代码中已经使用过调度器了,只是没有明确地指明要使用的调度器的类型。这是因为所有的 Observable 操作符处理并发性都有可选的调度器。如果没有提供调度器的话,RxJS 会通过使用最小并发原则选择一个默认调度器。这意味着引入满足操作符需要的最小并发量的调度器会被选择。例如,对于返回有限和少量消息的 observable 的操作符,RxJS 不使用调度器,即 null
或 undefined
。对于返回潜在大量的或无限数量的消息的操作符,使用 queueScheduler
调度器。对于使用定时器的操作符,使用 asyncScheduler
调度器。
静态创建操作符通常可以接收调度器作为参数。 举例来说,from(array, scheduler)
可以让你指定调度器,当发送从 array
转换的每个通知的时候使用。调度器通常作为操作符的最后一个参数。静态创建操作符接收调度器参数:
bindCallback
bindNodeCallback
combineLatest
concat
empty
from
fromPromise
interval
merge
of
range
throw
timer
使用 subscribeOn
来调度 subscribe()
调用在什么样的上下文中执行。 默认情况下,Observable 的 subscribe()
调用会立即同步地执行。然而,你可能会延迟或安排在给定的调度器上执行实际的 subscription ,使用实例操作符 subscribeOn(scheduler)
,其中 scheduler
是你提供的参数。
使用 observeOn
来调度发送通知的的上下文。 正如我们在上面的示例中所看到的,实例操作符 observeOn(scheduler)
在源 Observable 和目标观察者之间引入了一个中介观察者,中介负责调度,它使用给定的 scheduler
来调用目标观察者。
bufferTime
、debounceTime
、delay
、auditTime
、sampleTime
、throttleTime
、timeInterval
、timeout
、timeoutWith
、windowTime
这样时间相关的操作符全部接收调度器作为最后的参数,并且默认的操作是在 asyncScheduler
调度器上。
其他接收调度器作为参数的实例操作符:cache
、combineLatest
、concat
、expand
、merge
、publishReplay
、startWith
。
注意,cache
和 publishReplay
都接收调度器是因为它们使用了 ReplaySubject 。ReplaySubjects 的构造函数接收一个可选的调度器作为最后的参数,因为 ReplaySubject 可能会处理时间,这只在调度器的上下文中才有意义。默认情况下,ReplaySubject 使用 queue
调度器来提供时钟。
RxJS 业务上的实践 页面元素拖拽 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 <!DOCTYPE html > <html lang ="en" > <head > <meta charset ="UTF-8" /> <meta http-equiv ="X-UA-Compatible" content ="IE=edge" /> <meta name ="viewport" content ="width=device-width, initial-scale=1.0" /> <title > RxJS 元素拖拽</title > <style > html , body { height : 100% ; background-color : tomato; position : relative; } #drag { position : absolute; display : inline-block; width : 100px ; height : 100px ; background-color : #fff ; cursor : all-scroll; } </style > </head > <body > <div id ="drag" > </div > <script src ="https://cdn.bootcdn.net/ajax/libs/rxjs/6.6.3/rxjs.umd.js" > </script > <script > const { fromEvent, operators } = window .rxjs; const { flatMap, map, takeUntil } = operators; let dragDOM = document .getElementById('drag' ); let body = document .body; const mouseDown = fromEvent(dragDOM, 'mousedown' ); const mouseUp = fromEvent(body, 'mouseup' ); const mouseMove = fromEvent(body, 'mousemove' ); mouseDown .pipe( flatMap((event ) => mouseMove.pipe(takeUntil(mouseUp))), map((event ) => ({ x : event.clientX, y : event.clientY })) ) .subscribe((pos ) => { dragDOM.style.left = `${pos.x} px` ; dragDOM.style.top = `${pos.y} px` ; }); </script > </body > </html >
Refer To https://rxjs.dev/guide/overview
https://zhuanlan.zhihu.com/p/274469124
https://zhuanlan.zhihu.com/p/270609123
https://cisy.me/rxjs/
https://zhuanlan.zhihu.com/p/34357403
https://www.cnblogs.com/star91/p/rxjs-ru-men.html
https://zhuanlan.zhihu.com/p/146795979
https://zhuanlan.zhihu.com/p/34357403