- 本文目的在于介绍rxjs operators,总结下个人code实践和理解,以备以后个人查阅,code 采用ts+rollup打包方式
- 具体可以移步下方传送通道
Rxjs 入门
操作符实战
1.工具方法型
count
- 统计总数
1
2
3
4
5
6
7
8import { range } from 'rxjs';
import { count } from 'rxjs/operators';
const numbers = range(1, 7);
const result = numbers.pipe(count(i => i % 2 === 1));
result.subscribe(x => console.log(x));
// Results in:
// 4
reduce
- 累计
1
2
3
4
5
6
7
8
9
10import { fromEvent, interval } from 'rxjs';
import { reduce, takeUntil, mapTo } from 'rxjs/operators';
const clicksInFiveSeconds = fromEvent(document, 'click').pipe(
takeUntil(interval(5000)),
);
const ones = clicksInFiveSeconds.pipe(mapTo(1));
const seed = 0;
const count = ones.pipe(reduce((acc, one) => acc + one, seed));
count.subscribe(x => console.log(x));
max\min
1 | import { of,merge } from 'rxjs'; |
tap
日志输出
of 是一个个输出的
1 | import { of,merge } from 'rxjs'; |
repeat
重复 === 多次订阅
1 | import { tap } from 'rxjs/operators'; |
subscribeOn, observeOn
- 调整执行时机,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28import { of, merge } from 'rxjs';
const a = of(1, 2, 3, 4);
const b = of(5, 6, 7, 8, 9);
merge(a, b).subscribe(console.log);
// 1 2 3 4 5 6 7 8 9
import { of, merge, asyncScheduler } from 'rxjs';
import { subscribeOn } from 'rxjs/operators';
const a = of(1, 2, 3, 4).pipe(subscribeOn(asyncScheduler));
const b = of(5, 6, 7, 8, 9);
merge(a, b).subscribe(console.log);
//5 6 7 8 9 1 2 3 4
import { interval } from 'rxjs';
import { observeOn } from 'rxjs/operators';
const intervals = interval(10); // Intervals are scheduled
// with async scheduler by default...
intervals.pipe(
observeOn(animationFrameScheduler), // ...but we will observe on animationFrame
) // scheduler to ensure smooth animation.
.subscribe(val => {
someDiv.style.height = val + 'px';
});
materialize
- 用默认对象包箱, dematerialize 开箱
1
2
3
4
5
6
7
8
9
10
11
12
13import { of } from 'rxjs';
import { materialize, map } from 'rxjs/operators';
const letters = of('a', 'b', '13', 'd');
const upperCase = letters.pipe(map(x => x.toUpperCase()));
const materialized = upperCase.pipe(materialize());
materialized.subscribe(x => console.log(x));
Notification { kind: 'N', value: 'A', error: undefined, hasValue: true }
Notification { kind: 'N', value: 'B', error: undefined, hasValue: true }
Notification { kind: 'N', value: '13', error: undefined, hasValue: true }
Notification { kind: 'N', value: 'D', error: undefined, hasValue: true }
Notification { kind: 'C', value: undefined, error: undefined, hasValue: false }
timestamp
- 添加时间戳
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15import { of } from 'rxjs';
import { materialize, map, timestamp, tap } from 'rxjs/operators';
const letters = of('a', 'b', '13', 'd');
const times = letters.pipe(timestamp());
times.subscribe(res => {
console.log("res...",res)
});
//output
res... Timestamp { value: 'a', timestamp: 1594074567694 }
res... Timestamp { value: 'b', timestamp: 1594074567700 }
res... Timestamp { value: '13', timestamp: 1594074567700 }
res... Timestamp { value: 'd', timestamp: 1594074567700 }
toArray
最终结果toArray,取决于source是一个个产生的,map,filter,interval
1 | import { interval } from 'rxjs'; |
结合 filter、map
1 | import { from } from 'rxjs' |
延迟类
delay
- 延迟执行,但是忽略error ?
1 | import { of } from 'rxjs'; |
delayWhen
1 | import { interval, timer } from 'rxjs'; |
timeout、timeInterval
- timeInterval 输出对象,timeout输出值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33import { timeInterval, timeout } from "rxjs/operators";
import { interval } from "rxjs";
const seconds = interval(1000);
seconds.pipe(timeInterval())
.subscribe(
value => {
console.log("time.....");
console.log(value)
},
err => console.log(err),
);
seconds.pipe(timeout(1100)) # 900小于间隔1000,会忽略,不输出值
.subscribe(
value => {
console.log("out.....");
console.log(value)
},
err => console.log(err),
);
//output
time.....
TimeInterval { value: 0, interval: 1007 }
out.....
0
time.....
TimeInterval { value: 1, interval: 1005 }
out.....
1
timeoutWith
1 | import { interval } from 'rxjs'; |
interval
异步产生数据
1 | import { interval } from 'rxjs'; |
timer
1 | import { timer } from 'rxjs'; |
2.创造型
of - 单一输出
同步产生数据
1
2
3
4
5
6
7
8
9
10 import { of } from 'rxjs';
of([1,2,3])
.subscribe(
next => console.log('next:', next),
err => console.log('error:', err),
() => console.log('the end'),
);
// result:
// 'next: [1,2,3]'
from - 拆分输出
同步产生数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 import { from, asyncScheduler } from 'rxjs';
console.log('start');
const array = [10, 20, 30];
const result = from(array, asyncScheduler);
result.subscribe(x => console.log(x));
console.log('end');
// Logs:
// start
// end
// 10
// 20
// 30
ajax
异步产生数据
- ajax
1
2
3
4
5
6
7
8
9
10
11import { ajax } from 'rxjs/ajax';
import { map, catchError } from 'rxjs/operators';
import { of } from 'rxjs';
const obs$ = ajax(`https://api.github.com/users?per_page=5`).pipe(
map(userResponse => console.log('users: ', userResponse)),
catchError(error => {
console.log('error: ', error);
return of(error);
})
); - getJson
1
2
3
4
5
6
7
8
9
10
11import { ajax } from 'rxjs/ajax';
import { map, catchError } from 'rxjs/operators';
import { of } from 'rxjs';
const obs$ = ajax.getJSON(`https://api.github.com/users?per_page=5`).pipe(
map(userResponse => console.log('users: ', userResponse)),
catchError(error => {
console.log('error: ', error);
return of(error);
})
); - 类jquery写法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20import { ajax } from 'rxjs/ajax';
import { of } from 'rxjs';
const users = ajax({
url: 'https://httpbin.org/delay/2',
method: 'POST',
headers: {
'Content-Type': 'application/json',
'rxjs-custom-header': 'Rxjs'
},
body: {
rxjs: 'Hello World!'
}
}).pipe(
map(response => console.log('response: ', response)),
catchError(error => {
console.log('error: ', error);
return of(error);
})
);
3.转换型
mergeMap = map + mergeAll
1 | import { of, interval } from 'rxjs'; |
concatMap = map + concatAll
1 | import { of } from 'rxjs'; |
switchMap = map + switchAll
1 | // RxJS v6+ |
exhaustMap = map + exhaustAll
1 | // RxJS v6+ |
mapTo
- 修改值
1
2
3
4
5
6import { fromEvent } from 'rxjs';
import { mapTo } from 'rxjs/operators';
const clicks = fromEvent(document, 'click');
const greetings = clicks.pipe(mapTo('Hi'));
greetings.subscribe(x => console.log(x));
map
1 | import { combineLatest, of } from 'rxjs'; |
4.联合型
combineLatest
适用场景: 依赖多个输入,产出新的结论
数据整合交给下游
1 | import { timer, combineLatest } from 'rxjs'; |
整合好数据,交给下游
入参observabel数组
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 // RxJS v6+
import { timer, combineLatest } from 'rxjs';
const timerOne$ = timer(1000, 4000);
const timerTwo$ = timer(2000, 4000);
const timerThree$ = timer(3000, 4000);
combineLatest(
timerOne$,
timerTwo$,
timerThree$,
// combineLatest also takes an optional projection function
(one, two, three) => {
return `Timer One (Proj) Latest: ${one},
Timer Two (Proj) Latest: ${two},
Timer Three (Proj) Latest: ${three}`;
}
).subscribe(console.log);
concat - 队列形式
适用于: 先到先得,队列形式数据处理
返回Observable
前一个不结束,后一个永无机会
1 | // RxJS v6+ |
倒计时
1 | // RxJS v6+ |
merge - 先到先输出
先到先输出,不论书写位置,一个个输出,非数组形式
返回Observable
1 | import { of, merge, concat } from 'rxjs'; |
exhaust
返回Observable
withLatestFrom
适用于: 多个输入源,但是只有一个主导
返回 OperatorFunction
1 | import { timeInterval, timeout, withLatestFrom } from "rxjs/operators"; |
concatAll
返回 OperatorFunction
mergeAll
返回 OperatorFunction
exhaustAll
返回 OperatorFunction
switchAll
返回 OperatorFunction
forkJoin - 只取最终值
入参observabel数组
1 | import { forkJoin, of, timer } from 'rxjs'; |
startWith
适用于: 添加特定的数据
添加单个前置数据 - hello world
1 | // RxJS v6+ |
添加多个前置数据
1 | // RxJS v6+ |
5.过滤型
filter
1 | import { from } from 'rxjs'; |
6.条件判定
every
- 每一个都需要满足条件才true
1
2
3
4
5
6
7import { of } from 'rxjs';
import { every } from 'rxjs/operators';
of(1, 2, 3, 4, 5, 6).pipe(
every(x => x < 5),
)
.subscribe(x => console.log(x)); // -> false
find、findIndex
- 找到第一个满足条件的就行
1
2
3
4
5
6import { fromEvent } from 'rxjs';
import { find } from 'rxjs/operators';
const clicks = fromEvent(document, 'click');
const result = clicks.pipe(find(ev => ev.target.tagName === 'DIV'));
result.subscribe(x => console.log(x));
isEmpty
- 判断Observable是否为空
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17import { Subject } from 'rxjs';
import { isEmpty } from 'rxjs/operators';
const source = new Subject<string>();
const result = source.pipe(isEmpty());
source.subscribe(x => console.log(x));
result.subscribe(x => console.log(x));
source.next('a');
source.next('b');
source.next('c');
source.complete();
// Results in:
// a
// false
// b
// c
iif
1 | import { iif, of, interval } from 'rxjs'; |
defaultIfEmpty
1 | import { defaultIfEmpty } from 'rxjs/operators'; |
7.异常处理
catchError
单一处理
1 | import { throwError, of } from 'rxjs'; |
整合其他operator
1 | import { throwError, fromEvent, of } from 'rxjs'; |
retry
设置重试次数
1 | // RxJS v6+ |
retryWhen
delayWhen
1 | // RxJS v6+ |
8.自定义类型
参考
Rxjs
若你觉得我的文章对你有帮助,欢迎点击上方按钮对我打赏