RxJS 是 Javascript 世界中实现响应式编程工具库,它使得在 JS 中处理复杂的数据流变得简单和易于维护。

响应式编程反应式编程(英语:Reactive programming)是一种面向数据和变化传播的声明式编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。 - wikipedia

RxJS 例子 🌰

发布订阅 Observable & subscribe

RxJS 最核心的能力就是 软件设计中的 发布/订阅模式. 它通过 Observablesubscribe实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import { Observable } from 'rxjs';
// 记录时间
const now = new Date().getTime();

// 创建流
const stream$ = new Observable((subscriber) => {
setTimeout(() => {
subscriber.next([1, 2, 3]); // 发布
}, 500);
setTimeout(() => {
subscriber.next({ a: 1000 }); // 发布
}, 1000);
setTimeout(() => {
subscriber.next('end'); // 发布
}, 3000);
setTimeout(() => {
subscriber.complete(); // 完成
}, 4000);
});

// 启动流。 订阅
const subscription1 = stream$.subscribe({
complete: () => console.log('done'),
next: (v) => console.log(new Date().getTime() - now, 'ms stream1', v),
error: () => console.log('error'),
});

// 延时1s后,启动流
setTimeout(() => {
const subscription2 = stream$.subscribe({
next: (v) => console.log(new Date().getTime() - now, 'ms stream2', v),
});
}, 1000);

结果:

1
2
3
4
5
6
7
502 ms stream1 [ 1, 2, 3 ]
1008 ms stream1 { a: 1000 }
1512 ms stream2 [ 1, 2, 3 ]
2013 ms stream2 { a: 1000 }
3006 ms stream1 end
done
4010 ms stream2 end

数据流的处理 operators

RxJS 提供各种操作符(operators)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta http-equiv="X-UA-Compatible" content="IE=edge" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>RxJS</title>
</head>
<body>
<button id="btn">按 钮</button>
<script src="https://cdn.bootcdn.net/ajax/libs/rxjs/6.6.3/rxjs.umd.js"></script>
<script>
let { fromEvent } = window.rxjs;
let { throttleTime, scan } = window.rxjs.operators;
let btn = document.getElementById('btn');
// fromEvent 创建一个 Observable
fromEvent(btn, 'click')
.pipe(
throttleTime(1000), // 防抖1秒钟
scan((count) => count + 1, 0) // 累积计算
)
.subscribe((count) => console.log(`click ${count} times`)); //订阅并且将处理后的数据传递给 subscribe
</script>
</body>
</html>

Subject 监听多路事件和发布

Subject类似于 EventEmitter可以将监听多个事件和广播事件触发。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import { Subject } from 'rxjs';

// 创建subject
const subject = new Subject();

// 订阅一个observer
subject.subscribe((v) => console.log('stream 1', v));
// 再订阅一个observer
subject.subscribe((v) => console.log('stream 2', v));
// 延时1s再订阅一个observer
setTimeout(() => {
subject.subscribe((v) => console.log('stream 3', v));
}, 1000);
// 产生数据1
subject.next(1);
// 产生数据2
subject.next(2);
// 延时3s产生数据3
setTimeout(() => {
subject.next(3);
}, 3000);

RxJS 核心概念

RxJS 结合了 观察者模式、迭代器模式、使用集合的函数编程,以满足一种理想方式来挂历事件序列所需要的一切。

RxJS 解决异步事件管理的基本概念:

  • Observable (可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。
  • Observer (观察者): 一个回调函数的集合,它知道如何去监听由 Observable 提供的值。
  • Subscription (订阅): 表示 Observable 的执行,主要用于取消 Observable 的执行。
  • Operators (操作符): 采用函数式编程风格的纯函数 (pure function),使用像 mapfilterconcatflatMap 等这样的操作符来处理集合。
  • Subject (主体): 相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式。
  • Schedulers (调度器): 用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如 setTimeoutrequestAnimationFrame 或其他。

Observable

1
2
3
4
5
6
7
8
9
10
11
import { Observable } from 'rxjs';

const observable = new Observable((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
});

Observable 可以接受一个 Function用来发布数据。实例化Observable可以获得observable.它可以用来订阅observable,用来处理Observable发布的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import { Observable } from 'rxjs';

const observable = new Observable((subscriber) => {
console.log('Hello');
subscriber.next(42);
subscriber.next(100);
subscriber.next(200);
setTimeout(() => {
subscriber.next(300); // happens asynchronously
}, 1000);
});

console.log('before');
observable.subscribe((x) => {
console.log(x);
});
console.log('after');

/* 输出
"before"
"Hello"
42
100
200
"after"
300
*/

observable.subscribe的调用可以获取到 Observable中推动的数据及时它是异步的。

Observable 四个核心概念

  • 创建 Observables - const observable = new Observable(subscriber => {...});
  • 订阅 Observables - observable.subscribe(...)
  • 执行 Observables - new Observable(function subscribe(subscriber) {...})subscriber支持next\error\complete三种类型执行方式
  • 清理 Observables - observable.subscribe(...) 执行后获取一个Subscription,可以用它清理掉Observable subscription.unsubscribe()

除了用new Observable可以创建observable之外,Rxjs 也提供了其它创建observable的方式:

1
2
3
4
5
6
import { from } from 'rxjs';

const observable = from([10, 20, 30]);
const subscription = observable.subscribe((x) => console.log(x));
// Later:
subscription.unsubscribe();

Observer

ObserverObservable发送值的消费方。Observer有三种形式:一个函数,一个{next:Function,error:Function,complete:Function}的对象,传递三个函数作为参数,参数顺序是:next、error、complete

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import { Observable } from 'rxjs';

// 创建一个 流
const Stream$ = new Observable((subscribe) => {
// subscribe除了 next方法可以调用,也还可以调用 complete,error方法。
setTimeout(() => {
subscribe.next([1, 2, 3]);
});
subscribe.next({ a: 100 });
});

const obsever1 = {
complete: () => console.log('obsever1 - done'),
next: (v) => console.log('obsever1', v),
error: () => console.error('obsever1', 'error'),
};

// 启动流
const Subscription1 = Stream$.subscribe(obsever);
const Subscription2 = Stream$.subscribe(function observer2(v) {
console.log('obsever2', v);
});

const Subscription2 = Stream$.subscribe(
function observer3_next(v) {
console.log('obsever2', v);
},
function observer3_error(v) {
console.error('obsever2', v);
},
function observer3_complete(v) {
console.log('obsever3 - done');
}
);

next\error\complete并不需要全部定义,可以定义一部分。

operators

尽管 RxJS 的根基是 Observable,但最有用的还是它的操作符。操作符是允许复杂的异步代码以声明式的方式进行轻松组合的基础代码单元。

operators 操作符就是一个函数,它有两种类型:

  • Pipeable Operators (管道操作符): observable实例可以使用这样的语法:observableInstance.pipe(operator()) 传递一个Observable并返回一个新的Observable

    操作符本质上是一个纯函数 (pure function),它接收一个 Observable 作为输入,并生成一个新的 Observable 作为输出。订阅输出 Observable 同样会订阅输入 Observable 。

  • Creation Operators (创造操作符):可以称为独立函数来创建新的 Observable。

Piping

Pipeable Operators 是无害的纯函数,借助pipe()可以实现,多个管道操作符串行处理。

1
obs.pipe(op1(), op2(), op3(), op3());

你可以通过op()(obs)使用管道运算符,但并不推荐这样使用,因为可读性不好:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import { of } from 'rxjs';
import { map } from 'rxjs/operators';

// op()(obs) 模式
// map是管道操作符。 of返回一个 observable
map((x: number) => x * x)(of(1, 2, 3)).subscribe((v) =>
console.log(`value1: ${v}`)
);

// obs.pipe( op1(),op2(),op3(),op3()) 模式
of(1, 2, 3)
.pipe(map((v) => v + v))
.subscribe((v) => console.log(`value2: ${v}`));

/** 输出
value1: 1
value1: 4
value1: 9

value2: 2
value2: 4
value2: 6
*/

Creation Operators

什么是创作运算符?与管道运算符不同,创建运算符是可用于创建Observable对象的函数.具有一些常见的预定义行为或通过加入其他 Observable.

创建运算符的典型示例是间隔函数。它以数字(不是 Observable)作为输入参数,并产生 Observable 作为输出:

1
2
3
import { interval } from 'rxjs';

const observable = interval(1000 /* number of milliseconds */);

操作符分类

操作符有着不同的用途,它们可作如下分类:创建、转换、过滤、组合、错误处理、工具,等等。在下面的列表中,你可以按分类组织好的所有操作符。

创建操作符
连接创建操作符

这些是 Observable 创建运算符,还具有联接功能-发出多个源 Observable 的值。

转换操作符
过滤操作符
连接操作符
多广播操作符
错误捕获操作符
工具操作符
条件和布尔操作符
数学和聚合操作符

例子 🌰

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta http-equiv="X-UA-Compatible" content="IE=edge" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>RxJS</title>
</head>
<body>
<div>click count: <span id="count"></span></div>
<div>
<button id="button-plus">+1</button>
<button id="button-minus">-1</button>
</div>
<script src="https://cdn.bootcdn.net/ajax/libs/rxjs/6.6.3/rxjs.umd.js"></script>
<script>
let { fromEvent, operators } = window.rxjs;
let { mapTo, scan, merge } = window.rxjs.operators;
console.log('operators:', operators);
console.log('window.rxjs:', window.rxjs);
fromEvent(document.getElementById('button-plus'), 'click')
.pipe(
mapTo(1),
merge(
fromEvent(document.getElementById('button-minus'), 'click').pipe(
mapTo(-1)
)
),
scan((total, now) => total + now)
)
.subscribe((value) => {
document.getElementById('count').innerText = value;
});
</script>
</body>
</html>

Subscription

Subscription - 是代表一次性资源的对象,通常是Observable的执行。Subscription拥有一种重要的方法unsubscribe(),即取消订阅,该方法不带任何参数,而只是处置该订阅所拥有的资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import { interval } from 'rxjs';

const observable = interval(1000);
const subscription = observable.subscribe((x) => console.log(x));
// Later:
// This cancels the ongoing Observable execution which
// was started by calling subscribe with an Observer.
setTimeout(() => {
subscription.unsubscribe();
}, 5000);

/** 输出
0
1
2
3
*/

Subscription 还可以合在一起,这样一个 Subscription 调用 unsubscribe() 方法,可能会有多个 Subscription 取消订阅 。你可以通过把一个 Subscription 添加到另一个上面来做这件事:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import { interval } from 'rxjs';

const observable1 = interval(200);
const observable2 = interval(300);

const subscription = observable1.subscribe((v) =>
console.log(`observable1 ${v}`)
);
const childSubscription = observable2.subscribe((v) =>
console.log(`observable2 ${v}`)
);

subscription.add(childSubscription);

setTimeout(() => {
subscription.unsubscribe();
}, 1500);

/** 输出
observable1 0
observable2 0
observable1 1
observable1 2
observable2 1
observable1 3
observable2 2
observable1 4
observable2 3
observable1 5
observable1 6
*/

Subject

Subject 是一种特殊类型的Observable,它允许将值多播给多个观察者,所以 Subject 是多播的,而普通的 Observables 是单播的(每个已订阅的观察者都拥有 Observable 的独立执行)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import { Subject } from 'rxjs';

// 创建subject
const subject = new Subject();

// 订阅一个observer
subject.subscribe((v) => console.log('stream 1', v));
// 再订阅一个observer
subject.subscribe((v) => console.log('stream 2', v));
// 延时1s再订阅一个observer
setTimeout(() => {
subject.subscribe((v) => console.log('stream 3', v));
}, 1000);
// 产生数据1
subject.next(1);
// 产生数据2
subject.next(2);
// 延时3s产生数据3
setTimeout(() => {
subject.next(3);
}, 3000);
/**
stream 1 1
stream 2 1
stream 1 2
stream 2 2
stream 1 3
stream 2 3
stream 3 3
*/

单播和多播

单播: 每个普通的 Observables 实例都只能被一个观察者订阅,当它被其他观察者订阅的时候会产生一个新的实例。也就是普通 Observables 被不同的观察者订阅的时候,会有多个实例,不管观察者是从何时开始订阅,每个实例都是从头开始把值发给对应的观察者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import { Observable } from 'rxjs';

// Observable是单播,每一个订阅是独立
const observable = new Observable((observer) => {
observer.next(Math.random());
setTimeout(() => observer.next(Math.random()), 300);
});

observable.subscribe((val) => console.log(`订阅1: ${val}`));

observable.subscribe((val) => console.log(`订阅2: ${val}`));

setTimeout(
() => observable.subscribe((val) => console.log(`订阅3:${val}`)),
500
);

/**
订阅1: 0.6689612996581182
订阅2: 0.28043048188854236
订阅1: 0.1297750238807054
订阅2: 0.17389658580744594
订阅3:0.6927734300935287
订阅3:0.39659821631152314
*/

多播: Subject是多播,则多个订阅都是一个实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import { Subject } from 'rxjs';

const subject = new Subject();

subject.subscribe((val) => console.log(`订阅1: ${val}`));
subject.subscribe((val) => console.log(`订阅2: ${val}`));

setTimeout(() => {
subject.subscribe((val) => console.log(`订阅3: ${val}`));
}, 200);

subject.next(Math.random());
setTimeout(() => {
subject.next(Math.random());
}, 300);

/**
订阅1: 0.05834565385958235
订阅2: 0.05834565385958235
订阅1: 0.7996036293548809
订阅2: 0.7996036293548809
订阅3: 0.7996036293548809
*/

每个 Subject 都是 Observable 。 - 对于 Subject,你可以提供一个观察者并使用 subscribe 方法,就可以开始正常接收值。从观察者的角度而言,它无法判断 Observable 执行是来自普通的 Observable 还是 Subject 。

在 Subject 的内部,subscribe 不会调用发送值的新执行。它只是将给定的观察者注册到观察者列表中,类似于其他库或语言中的 addListener 的工作方式。

每个 Subject 都是 Observer。 - Subject 是一个有如下方法的对象: next(v)error(e)complete() 。要给 Subject 提供新值,只要调用 next(theValue),它会将值多播给已注册监听该 Subject 的观察者们。

1
2
3
4
5
6
7
8
9
10
11
var subject = new Rx.Subject();

subject.subscribe({
next: (v) => console.log('observerA: ' + v),
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v),
});

subject.next(1);
subject.next(2);

因为 Subject 是观察者,这也就在意味着你可以把 Subject 作为参数传给任何 Observable 的 subscribe 方法,如下面的示例所展示的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import { from, Subject } from 'rxjs';

const Subject$ = new Subject();

Subject$.subscribe({ next: (v) => console.log('observerA: ' + v) });
Subject$.subscribe({ next: (v) => console.log('observerB: ' + v) });

const observable = from([1, 2, 3, 4]);
observable.subscribe(Subject$);

/**输出
observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
observerA: 4
observerB: 4
*/

还有一些特殊类型的 SubjectBehaviorSubjectReplaySubjectAsyncSubject

BehaviorSubject

Subject 的其中一个变体就是 BehaviorSubject,它有一个“当前值”的概念。它保存了发送给消费者的最新值。并且当有新的观察者订阅时,会立即从 BehaviorSubject 那接收到“当前值”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(0); // 0 is the initial value

subject.subscribe({
next: (v) => console.log(`observerA: ${v}`),
});

subject.next(1);
subject.next(2);

subject.subscribe({
next: (v) => console.log(`observerB: ${v}`),
});

subject.next(3);
/** 输出
observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
*/

ReplaySubject

ReplaySubject 类似于 BehaviorSubject,它可以发送旧值给新的订阅者,但它还可以记录 Observable 执行的一部分。

ReplaySubject 记录 Observable 执行中的多个值并将其回放给新的订阅者。

当创建 ReplaySubject 时,你可以指定回放多少个值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(3); // buffer 3 values for new subscribers

subject.subscribe({
next: (v) => console.log(`observerA: ${v}`),
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
next: (v) => console.log(`observerB: ${v}`),
});

subject.next(5);

// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5

除了缓冲数量,你还可以指定 window time (以毫秒为单位)来确定多久之前的值可以记录。在下面的示例中,我们使用了较大的缓存数量100,但 window time 参数只设置了500毫秒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(100, 500 /* windowTime */);

subject.subscribe({
next: (v) => console.log(`observerA: ${v}`),
});

let i = 1;
setInterval(() => subject.next(i++), 200);

setTimeout(() => {
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`),
});
}, 1000);

// Logs
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerA: 5
// observerB: 3
// observerB: 4
// observerB: 5
// observerA: 6
// observerB: 6
// ...

AsyncSubject

AsyncSubject 是另一个 Subject 变体,只有当 Observable 执行完成时(执行 complete()),它才会将执行的最后一个值发送给观察者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();

subject.subscribe({
next: (v) => console.log(`observerA: ${v}`),
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
next: (v) => console.log(`observerB: ${v}`),
});

subject.next(5);
subject.complete();

// Logs:
// observerA: 5
// observerB: 5

AsyncSubject 和 last() 操作符类似,因为它也是等待 complete 通知,以发送一个单个值。

Scheduler(调度器)

Scheduler 控制着何时启动subscription和何时发送通知。它由三部分组成:

  • 调度器是一种数据结构。它知道如何根据优先级或其他标准来存储任务和将任务进行排序。
  • 调度器是执行上下文。 它表示在何时何地执行任务(举例来说,立即的,或另一种回调函数机制(比如 setTimeout 或 process.nextTick),或动画帧)。
  • 调度器有一个(虚拟的)时钟。 调度器功能通过它的 getter 方法 now() 提供了”时间”的概念。在具体调度器上安排的任务将严格遵循该时钟所表示的时间。

调度器可以让你规定 Observable 在什么样的执行上下文中发送通知给它的观察者。

在之前的例子中我们是用Obervable同步的输出值。下面通过observeOn指的async 调度器去传递这些值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';

const observable = new Observable((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}).pipe(
observeOn(asyncScheduler) //如果没有这个,则 just after subscribe 最后输出
);

console.log('just before subscribe');
observable.subscribe({
next(x) {
console.log('got value ' + x);
},
error(err) {
console.error('something wrong occurred: ' + err);
},
complete() {
console.log('done');
},
});
console.log('just after subscribe');

/** 输出
just before subscribe
just after subscribe
got value 1
got value 2
got value 3
done
*/

Schedulers(调度器) 类型

async调度器是 RxJS 提供的内部调度器之一。

调度器 目的
null 不传递任何调度器的话,会以同步递归的方式发送通知。用于定时操作或尾递归操作。
queueScheduler 当前事件帧中的队列调度(蹦床调度器)。用于迭代操作。
asapScheduler Schedules 是在微任务队列上, 它就像运行在 promises 队列上.在当前执行任务之后再下一个任务之前执行。它就像一个异步执行。
asyncScheduler Schedules 与 setInterval 一起使用.基础的调度器
animationFrameScheduler Schedules 会在浏览器下一次渲染之前执行. 它可以用来绘制流程的动画。

使用 Schedulers(调度器)

你可能在你的 RxJS 代码中已经使用过调度器了,只是没有明确地指明要使用的调度器的类型。这是因为所有的 Observable 操作符处理并发性都有可选的调度器。如果没有提供调度器的话,RxJS 会通过使用最小并发原则选择一个默认调度器。这意味着引入满足操作符需要的最小并发量的调度器会被选择。例如,对于返回有限和少量消息的 observable 的操作符,RxJS 不使用调度器,即 nullundefined 。对于返回潜在大量的或无限数量的消息的操作符,使用 queueScheduler 调度器。对于使用定时器的操作符,使用 asyncScheduler 调度器。

静态创建操作符通常可以接收调度器作为参数。 举例来说,from(array, scheduler) 可以让你指定调度器,当发送从 array 转换的每个通知的时候使用。调度器通常作为操作符的最后一个参数。静态创建操作符接收调度器参数:

  • bindCallback
  • bindNodeCallback
  • combineLatest
  • concat
  • empty
  • from
  • fromPromise
  • interval
  • merge
  • of
  • range
  • throw
  • timer

使用 subscribeOn 来调度 subscribe() 调用在什么样的上下文中执行。 默认情况下,Observable 的 subscribe() 调用会立即同步地执行。然而,你可能会延迟或安排在给定的调度器上执行实际的 subscription ,使用实例操作符 subscribeOn(scheduler),其中 scheduler 是你提供的参数。

使用 observeOn 来调度发送通知的的上下文。 正如我们在上面的示例中所看到的,实例操作符 observeOn(scheduler) 在源 Observable 和目标观察者之间引入了一个中介观察者,中介负责调度,它使用给定的 scheduler 来调用目标观察者。

bufferTimedebounceTimedelayauditTimesampleTimethrottleTimetimeIntervaltimeouttimeoutWithwindowTime 这样时间相关的操作符全部接收调度器作为最后的参数,并且默认的操作是在 asyncScheduler 调度器上。

其他接收调度器作为参数的实例操作符:cachecombineLatestconcatexpandmergepublishReplaystartWith

注意,cachepublishReplay 都接收调度器是因为它们使用了 ReplaySubject 。ReplaySubjects 的构造函数接收一个可选的调度器作为最后的参数,因为 ReplaySubject 可能会处理时间,这只在调度器的上下文中才有意义。默认情况下,ReplaySubject 使用 queue 调度器来提供时钟。

RxJS 业务上的实践

页面元素拖拽

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta http-equiv="X-UA-Compatible" content="IE=edge" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>RxJS 元素拖拽</title>
<style>
html,
body {
height: 100%;
background-color: tomato;
position: relative;
}

#drag {
position: absolute;
display: inline-block;
width: 100px;
height: 100px;
background-color: #fff;
cursor: all-scroll;
}
</style>
</head>
<body>
<div id="drag"></div>
<script src="https://cdn.bootcdn.net/ajax/libs/rxjs/6.6.3/rxjs.umd.js"></script>
<script>
const { fromEvent, operators } = window.rxjs;
const { flatMap, map, takeUntil } = operators;
let dragDOM = document.getElementById('drag');
let body = document.body;

const mouseDown = fromEvent(dragDOM, 'mousedown');
const mouseUp = fromEvent(body, 'mouseup');
const mouseMove = fromEvent(body, 'mousemove');
mouseDown
.pipe(
flatMap((event) => mouseMove.pipe(takeUntil(mouseUp))),
map((event) => ({ x: event.clientX, y: event.clientY }))
)
.subscribe((pos) => {
dragDOM.style.left = `${pos.x}px`;
dragDOM.style.top = `${pos.y}px`;
});
</script>
</body>
</html>

Refer To

https://rxjs.dev/guide/overview

https://zhuanlan.zhihu.com/p/274469124

https://zhuanlan.zhihu.com/p/270609123

https://cisy.me/rxjs/

https://zhuanlan.zhihu.com/p/34357403

https://www.cnblogs.com/star91/p/rxjs-ru-men.html

https://zhuanlan.zhihu.com/p/146795979

https://zhuanlan.zhihu.com/p/34357403