前端analysis | What,Why,Who,When,Where,How

《angular8》- Rxjs 操作符实践指南

2020-07-17

  • 本文目的在于介绍rxjs operators,总结下个人code实践和理解,以备以后个人查阅,code 采用ts+rollup打包方式
  • 具体可以移步下方传送通道

Rxjs 入门

传送门

操作符实战

1.工具方法型

count

  • 统计总数
    1
    2
    3
    4
    5
    6
    7
    8
    import { range } from 'rxjs';
    import { count } from 'rxjs/operators';

    const numbers = range(1, 7);
    const result = numbers.pipe(count(i => i % 2 === 1));
    result.subscribe(x => console.log(x));
    // Results in:
    // 4

reduce

  • 累计
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    import { fromEvent, interval } from 'rxjs';
    import { reduce, takeUntil, mapTo } from 'rxjs/operators';

    const clicksInFiveSeconds = fromEvent(document, 'click').pipe(
    takeUntil(interval(5000)),
    );
    const ones = clicksInFiveSeconds.pipe(mapTo(1));
    const seed = 0;
    const count = ones.pipe(reduce((acc, one) => acc + one, seed));
    count.subscribe(x => console.log(x));

max\min

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
  import { of,merge } from 'rxjs';
import { max,min,tap } from 'rxjs/operators';

const obs$ = of(5, 4, 7, 2, 8);
merge(
obs$.pipe(max()),
obs$.pipe(min()),
).pipe(tap((val) => {
console.log("result....",val);
})).subscribe(console.log);

//output
result.... 8
8
result.... 2
2

tap

日志输出
of 是一个个输出的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
  import { of,merge } from 'rxjs';
import { max,min,tap } from 'rxjs/operators';

const obs$ = of(5, 4, 7, 2, 8);
obs$.pipe(tap({
next:(val) => {
console.log("val",val);
},
error:() => {

},
complete:() => {

}
})).subscribe(console.log)

//output
val 5
5
val 4
4
val 7
7
val 2
2

repeat

重复 === 多次订阅

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import { tap } from 'rxjs/operators';
// RxJS v6+
import { repeat, delay } from 'rxjs/operators';
import { of } from 'rxjs';

const delayedThing = of('delayed value').pipe(
tap(() => {
console.log("time..1.",new Date().toLocaleTimeString());
}),
delay(2000)
);

delayedThing
.pipe(
tap(() => {
console.log("time...2",new Date().toLocaleTimeString());
}),
repeat(3)
)
.subscribe(console.log);

//output
time..1. 4:42:45 PM
time...2 4:42:47 PM
delayed value
time..1. 4:42:47 PM
time...2 4:42:49 PM
delayed value
time..1. 4:42:49 PM
time...2 4:42:51 PM
delayed value

subscribeOn, observeOn

  • 调整执行时机,
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    import { of, merge } from 'rxjs';

    const a = of(1, 2, 3, 4);
    const b = of(5, 6, 7, 8, 9);
    merge(a, b).subscribe(console.log);
    // 1 2 3 4 5 6 7 8 9


    import { of, merge, asyncScheduler } from 'rxjs';
    import { subscribeOn } from 'rxjs/operators';

    const a = of(1, 2, 3, 4).pipe(subscribeOn(asyncScheduler));
    const b = of(5, 6, 7, 8, 9);
    merge(a, b).subscribe(console.log);
    //5 6 7 8 9 1 2 3 4


    import { interval } from 'rxjs';
    import { observeOn } from 'rxjs/operators';

    const intervals = interval(10); // Intervals are scheduled
    // with async scheduler by default...
    intervals.pipe(
    observeOn(animationFrameScheduler), // ...but we will observe on animationFrame
    ) // scheduler to ensure smooth animation.
    .subscribe(val => {
    someDiv.style.height = val + 'px';
    });

materialize

  • 用默认对象包箱, dematerialize 开箱
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    import { of } from 'rxjs';
    import { materialize, map } from 'rxjs/operators';

    const letters = of('a', 'b', '13', 'd');
    const upperCase = letters.pipe(map(x => x.toUpperCase()));
    const materialized = upperCase.pipe(materialize());
    materialized.subscribe(x => console.log(x));

    Notification { kind: 'N', value: 'A', error: undefined, hasValue: true }
    Notification { kind: 'N', value: 'B', error: undefined, hasValue: true }
    Notification { kind: 'N', value: '13', error: undefined, hasValue: true }
    Notification { kind: 'N', value: 'D', error: undefined, hasValue: true }
    Notification { kind: 'C', value: undefined, error: undefined, hasValue: false }

timestamp

  • 添加时间戳
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
     import { of } from 'rxjs';
    import { materialize, map, timestamp, tap } from 'rxjs/operators';

    const letters = of('a', 'b', '13', 'd');

    const times = letters.pipe(timestamp());
    times.subscribe(res => {
    console.log("res...",res)
    });

    //output
    res... Timestamp { value: 'a', timestamp: 1594074567694 }
    res... Timestamp { value: 'b', timestamp: 1594074567700 }
    res... Timestamp { value: '13', timestamp: 1594074567700 }
    res... Timestamp { value: 'd', timestamp: 1594074567700 }

toArray

最终结果toArray,取决于source是一个个产生的,map,filter,interval

1
2
3
4
5
6
7
8
9
10
11
12
import { interval } from 'rxjs';
import { toArray, take } from 'rxjs/operators';

const source = interval(1000);
const example = source.pipe(
take(10),
toArray()
);

const subscribe = example.subscribe(val => console.log(val));

// output: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

结合 filter、map

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import { from } from 'rxjs'
import { filter , tap, toArray, map } from 'rxjs/operators'

const arr = [{
a:1,
b:2,
c:3
},{
a:2,
b:4,
c:5,
},{
a:7,
b:10,
c:10
}];

from(arr).pipe(
map((item) => {
return {
a:item.a,
b:item.b * 2,
c: item.c
}
}),
tap((val) => {
console.log("map result ....",val);
}),
toArray()
).subscribe(console.log)


from(arr).pipe(
filter( (item) => {
return item.a > 1;
}),
tap( (val) => {
console.log("the result is ",val);
}),
toArray(),
).subscribe(console.log)

延迟类

delay

  • 延迟执行,但是忽略error ?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import { of } from 'rxjs';
import { tap, delay } from 'rxjs/operators';

const obs$ = of([1,2]);
obs$.pipe(tap(res => {
console.log("get value from of....",new Date().toLocaleTimeString());
}),delay(2000),tap(() => { # 延迟执行,也可以变成随机延迟。 delayWhen(event => interval(Math.random() * 5000))
console.log("get value from of....",new Date().toLocaleTimeString());
})).subscribe(res => {
console.log("of res...;.",res);
});

//output
get value from of.... 7:52:27 AM
get value from of.... 7:52:29 AM
of res...;. [ 1, 2 ]

delayWhen

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import { interval, timer } from 'rxjs';
import { delayWhen } from 'rxjs/operators';

const message = interval(1000);
const delayForFiveSeconds = () => timer(5000);
const delayWhenExample = message.pipe(delayWhen(delayForFiveSeconds));
const subscribe = delayWhenExample.subscribe(val => console.log(val));

//output
5s延迟....

0
1
2

timeout、timeInterval

  • timeInterval 输出对象,timeout输出值
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
     import { timeInterval, timeout } from "rxjs/operators";
    import { interval } from "rxjs";

    const seconds = interval(1000);

    seconds.pipe(timeInterval())
    .subscribe(
    value => {
    console.log("time.....");
    console.log(value)
    },
    err => console.log(err),
    );

    seconds.pipe(timeout(1100)) # 900小于间隔1000,会忽略,不输出值
    .subscribe(
    value => {
    console.log("out.....");
    console.log(value)
    },
    err => console.log(err),
    );

    //output
    time.....
    TimeInterval { value: 0, interval: 1007 }
    out.....
    0
    time.....
    TimeInterval { value: 1, interval: 1005 }
    out.....
    1

timeoutWith

1
2
3
4
5
6
7
8
9
import { interval } from 'rxjs';
import { timeoutWith } from 'rxjs/operators'
import { of } from 'rxjs';

const first$ = interval(3000);
const second$ = of('go to the default');
first$.pipe(timeoutWith(2000,second$)).subscribe(console.log) # 2s内必须获取到数据,否则走默认值
//output
go to the default

interval

异步产生数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

const numbers = interval(1000);

const takeFourNumbers = numbers.pipe(take(4));

takeFourNumbers.subscribe(x => console.log('Next: ', x));

// Logs:
// Next: 0
// Next: 1
// Next: 2
// Next: 3

timer

1
2
3
4
import { timer } from 'rxjs';

const numbers = timer(3000, 1000);
numbers.subscribe(x => console.log(x));

2.创造型

of - 单一输出

同步产生数据

1
2
3
4
5
6
7
8
9
10
import { of } from 'rxjs';

of([1,2,3])
.subscribe(
next => console.log('next:', next),
err => console.log('error:', err),
() => console.log('the end'),
);
// result:
// 'next: [1,2,3]'

from - 拆分输出

同步产生数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import { from, asyncScheduler } from 'rxjs';

console.log('start');

const array = [10, 20, 30];
const result = from(array, asyncScheduler);

result.subscribe(x => console.log(x));

console.log('end');

// Logs:
// start
// end
// 10
// 20
// 30

ajax

异步产生数据

  • ajax
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    import { ajax } from 'rxjs/ajax';
    import { map, catchError } from 'rxjs/operators';
    import { of } from 'rxjs';

    const obs$ = ajax(`https://api.github.com/users?per_page=5`).pipe(
    map(userResponse => console.log('users: ', userResponse)),
    catchError(error => {
    console.log('error: ', error);
    return of(error);
    })
    );
  • getJson
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    import { ajax } from 'rxjs/ajax';
    import { map, catchError } from 'rxjs/operators';
    import { of } from 'rxjs';

    const obs$ = ajax.getJSON(`https://api.github.com/users?per_page=5`).pipe(
    map(userResponse => console.log('users: ', userResponse)),
    catchError(error => {
    console.log('error: ', error);
    return of(error);
    })
    );
  • 类jquery写法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    import { ajax } from 'rxjs/ajax';
    import { of } from 'rxjs';

    const users = ajax({
    url: 'https://httpbin.org/delay/2',
    method: 'POST',
    headers: {
    'Content-Type': 'application/json',
    'rxjs-custom-header': 'Rxjs'
    },
    body: {
    rxjs: 'Hello World!'
    }
    }).pipe(
    map(response => console.log('response: ', response)),
    catchError(error => {
    console.log('error: ', error);
    return of(error);
    })
    );

3.转换型

mergeMap = map + mergeAll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import { of, interval } from 'rxjs';
import { mergeMap, map } from 'rxjs/operators';

const letters = of('a', 'b', 'c');
const result = letters.pipe(
mergeMap(x => interval(1000).pipe(map(i => x+i))),
);
result.subscribe(x => console.log(x));

//output
a0
b0
c0
a1
b1
c1

concatMap = map + concatAll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import { of } from 'rxjs';
import { concatMap, delay, mergeMap } from 'rxjs/operators';

const source = of(2000, 1000);
// map value from source into inner observable, when complete emit result and move to next
const example = source.pipe(
concatMap(val => of(`Delayed by: ${val}ms`).pipe(delay(val)))
);
const subscribe = example.subscribe(val =>
console.log(`With concatMap: ${val}`)
);

const mergeMapExample = source
.pipe(
// just so we can log this after the first example has run
delay(5000),
mergeMap(val => of(`Delayed by: ${val}ms`).pipe(delay(val)))
)
.subscribe(val => console.log(`With mergeMap: ${val}`));

//output
// With concatMap: Delayed by: 2000ms
// With concatMap: Delayed by: 1000ms
// With mergeMap: Delayed by: 1000ms 1s延迟短,提前输出
// With mergeMap: Delayed by: 2000ms


switchMap = map + switchAll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// RxJS v6+
import { timer, interval, of } from 'rxjs';
import { switchMap, tap } from 'rxjs/operators';


console.log("time 0....",new Date().toLocaleTimeString());
timer(1000,4000).pipe( //延迟一段时间,产生自增的整数
tap(() => {
console.log("time 1....",new Date().toLocaleTimeString());
}),
switchMap(
_ => interval(1000).pipe(tap((rs)=> {console.log('inner value....',rs)})), //特定间隔,产生递增整数
(outerValue, innerValue, outerIndex, innerIndex) => ({
outerValue,
innerValue,
outerIndex,
innerIndex
})
)
).subscribe((res) => {
console.log("final res....",res);
});

//output
time 0.... 5:21:59 PM
time 1.... 5:22:00 PM
inner value.... 0
final res.... { outerValue: 0, innerValue: 0, outerIndex: 0, innerIndex: 0 }
inner value.... 1
final res.... { outerValue: 0, innerValue: 1, outerIndex: 0, innerIndex: 1 }
inner value.... 2
final res.... { outerValue: 0, innerValue: 2, outerIndex: 0, innerIndex: 2 }
time 1.... 5:22:04 PM
inner value.... 0
final res.... { outerValue: 1, innerValue: 0, outerIndex: 1, innerIndex: 0 }

exhaustMap = map + exhaustAll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// RxJS v6+
import { interval } from 'rxjs';
import { exhaustMap, tap, take } from 'rxjs/operators';

const firstInterval = interval(1000).pipe(take(10));
const secondInterval = interval(1000).pipe(take(2));

const exhaustSub = firstInterval
.pipe(
exhaustMap(f => {
console.log(`Emission Corrected of first interval: ${f}`);
return secondInterval; //激活第二个,会忽略第一个
})
)
.subscribe(val => console.log(val));

//output
Emission Corrected of first interval: 0
0
1
Emission Corrected of first interval: 2
0
1
Emission Corrected of first interval: 4
0
1
Emission Corrected of first interval: 6
0
1
Emission Corrected of first interval: 8
0
1

mapTo

  • 修改值
    1
    2
    3
    4
    5
    6
    import { fromEvent } from 'rxjs';
    import { mapTo } from 'rxjs/operators';

    const clicks = fromEvent(document, 'click');
    const greetings = clicks.pipe(mapTo('Hi'));
    greetings.subscribe(x => console.log(x));

map

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import { combineLatest, of } from 'rxjs';
import { map, tap } from 'rxjs/operators';

const weight = of(70, 72, 76, 79, 75);
const height = of(1.76, 1.77, 1.78,1.8);
const bmi = combineLatest(weight, height).pipe(
tap(([w,h]) => {
console.log(`w:${w},h:${h}`);
}),
map(([w, h]) => w / (h * h)),
);
bmi.subscribe(x => console.log('BMI is ' + x));

// With output to console:
w:75,h:1.76
BMI is 24.212293388429753
w:75,h:1.77
BMI is 23.93948099205209
w:75,h:1.78
BMI is 23.671253629592222
w:75,h:1.8
BMI is 23.148148148148145

4.联合型

combineLatest

适用场景: 依赖多个输入,产出新的结论

数据整合交给下游
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
 import { timer, combineLatest } from 'rxjs';

// timerOne emits first value at 1s, then once every 4s
const timerOne$ = timer(1000, 4000);
// timerTwo emits first value at 2s, then once every 4s
const timerTwo$ = timer(2000, 4000);
// timerThree emits first value at 3s, then once every 4s
const timerThree$ = timer(3000, 4000);

// when one timer emits, emit the latest values from each timer as an array
combineLatest(timerOne$, timerTwo$, timerThree$).subscribe(
([timerValOne, timerValTwo, timerValThree]) => {
/*
Example:
timerThree first tick: 'Timer One Latest: 0, Timer Two Latest: 0, Timer Three Latest: 0
timerOne second tick: 'Timer One Latest: 1, Timer Two Latest: 0, Timer Three Latest: 0
timerTwo second tick: 'Timer One Latest: 1, Timer Two Latest: 1, Timer Three Latest: 0
*/
console.log(
`Timer One Latest: ${timerValOne},
Timer Two Latest: ${timerValTwo},
Timer Three Latest: ${timerValThree}`
);
}
);
整合好数据,交给下游

入参observabel数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 // RxJS v6+
import { timer, combineLatest } from 'rxjs';

const timerOne$ = timer(1000, 4000);
const timerTwo$ = timer(2000, 4000);
const timerThree$ = timer(3000, 4000);

combineLatest(
timerOne$,
timerTwo$,
timerThree$,
// combineLatest also takes an optional projection function
(one, two, three) => {
return `Timer One (Proj) Latest: ${one},
Timer Two (Proj) Latest: ${two},
Timer Three (Proj) Latest: ${three}`;
}
).subscribe(console.log);

concat - 队列形式

适用于: 先到先得,队列形式数据处理
返回Observable

前一个不结束,后一个永无机会
1
2
3
4
5
6
7
// RxJS v6+
import { interval, of, concat } from 'rxjs';

// when source never completes, any subsequent observables never run
concat(interval(1000), of('This', 'Never', 'Runs'))
// log: 1,2,3,4.....
.subscribe(console.log);
倒计时
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// RxJS v6+
import { concat, empty } from 'rxjs';
import { delay, startWith } from 'rxjs/operators';

// elems
const userMessage = document.getElementById('message');
// helper
const delayedMessage = (message, delayedTime = 1000) => {
return empty().pipe(startWith(message), delay(delayedTime));
};

concat(
delayedMessage('Get Ready!'),
delayedMessage(3),
delayedMessage(2),
delayedMessage(1),
delayedMessage('Go!'),
delayedMessage('', 2000)
).subscribe((message: any) => (userMessage.innerHTML = message));

merge - 先到先输出

先到先输出,不论书写位置,一个个输出,非数组形式
返回Observable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import { of, merge, concat } from 'rxjs';
import { mapTo, delay, concatAll, mergeAll } from 'rxjs/operators';

//emit one item
const example = of(null);

merge(
example.pipe(mapTo('Hello --- 1')),
example.pipe(mapTo('World1!--- 1'),delay(1300)),
example.pipe(mapTo('Goodbye --- 1'),delay(500)),
example.pipe(mapTo('World!2 -- 1'),delay(300))
).subscribe(val => console.log(val));;

//output
Hello --- 1
World!2 -- 1
Goodbye --- 1
World1!--- 1

exhaust

返回Observable

withLatestFrom

适用于: 多个输入源,但是只有一个主导
返回 OperatorFunction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import { timeInterval, timeout, withLatestFrom } from "rxjs/operators";
import { interval, of } from "rxjs";

const seconds = interval(1000);

const first = interval(500);

const obs$ = first.pipe(withLatestFrom(seconds));
obs$.subscribe(res => {
console.log("res...",res);
});

//output
res... [ 1, 0 ]
res... [ 2, 0 ]
res... [ 3, 1 ]
res... [ 4, 1 ]
res... [ 5, 2 ]
res... [ 6, 2 ]
res... [ 7, 3 ]
res... [ 8, 3 ]
res... [ 9, 4 ]

concatAll

返回 OperatorFunction

mergeAll

返回 OperatorFunction

exhaustAll

返回 OperatorFunction

switchAll

返回 OperatorFunction

forkJoin - 只取最终值

入参observabel数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import { forkJoin, of, timer } from 'rxjs';

const observable = forkJoin({
foo: of(1, 2, 3, 4),
bar: Promise.resolve(8),
baz: timer(4000),
});
observable.subscribe({
next: value => console.log(value),
complete: () => console.log('This is how it ends!'),
});

// Logs:
// { foo: 4, bar: 8, baz: 0 } after 4 seconds
// "This is how it ends!" immediately after

startWith

适用于: 添加特定的数据

添加单个前置数据 - hello world
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// RxJS v6+
import { startWith, scan } from 'rxjs/operators';
import { of } from 'rxjs';

//emit ('World!', 'Goodbye', 'World!')
const source = of('World!', 'Goodbye', 'World!');
//start with 'Hello', concat current string to previous
const example = source.pipe(
startWith('Hello'),
scan((acc, curr) => `${acc} ${curr}`)
);
/*
output:
"Hello"
"Hello World!"
"Hello World! Goodbye"
"Hello World! Goodbye World!"
*/
const subscribe = example.subscribe(val => console.log(val));
添加多个前置数据
1
2
3
4
5
6
7
8
9
10
// RxJS v6+
import { startWith } from 'rxjs/operators';
import { interval } from 'rxjs';

//emit values in sequence every 1s
const source = interval(1000);
//start with -3, -2, -1
const example = source.pipe(startWith(-3, -2, -1));
//output: -3, -2, -1, 0, 1, 2....
const subscribe = example.subscribe(val => console.log(val));

5.过滤型

filter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import { from } from 'rxjs';
import { filter } from 'rxjs/operators';

const source = from([
{ name: 'Joe', age: 31 },
{ name: 'Bob', age: 25 }
]);
const example = source.pipe(filter((person,index) => {
const res = person.age >= 30;
console.log(`person info.....`,person,index);
return res;
} ));
const subscribe = example.subscribe(val => console.log(`final result Over 30: ${val.name}`));

//output
person info..... { name: 'Joe', age: 31 } 0
final result Over 30: Joe
person info..... { name: 'Bob', age: 25 } 1

6.条件判定

every

  • 每一个都需要满足条件才true
    1
    2
    3
    4
    5
    6
    7
    import { of } from 'rxjs';
    import { every } from 'rxjs/operators';

    of(1, 2, 3, 4, 5, 6).pipe(
    every(x => x < 5),
    )
    .subscribe(x => console.log(x)); // -> false

find、findIndex

  • 找到第一个满足条件的就行
    1
    2
    3
    4
    5
    6
    import { fromEvent } from 'rxjs';
    import { find } from 'rxjs/operators';

    const clicks = fromEvent(document, 'click');
    const result = clicks.pipe(find(ev => ev.target.tagName === 'DIV'));
    result.subscribe(x => console.log(x));

isEmpty

  • 判断Observable是否为空
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    import { Subject } from 'rxjs';
    import { isEmpty } from 'rxjs/operators';

    const source = new Subject<string>();
    const result = source.pipe(isEmpty());
    source.subscribe(x => console.log(x));
    result.subscribe(x => console.log(x));
    source.next('a');
    source.next('b');
    source.next('c');
    source.complete();

    // Results in:
    // a
    // false
    // b
    // c

iif

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import { iif, of, interval } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

const r$ = of('R');
const x$ = of('X');

interval(1000)
.pipe(mergeMap(v => iif(() => v % 4 === 0, r$, x$)))
.subscribe(console.log);
//output
R
X
R
X

defaultIfEmpty

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import { defaultIfEmpty } from 'rxjs/operators';
import { empty } from 'rxjs';
import { of, merge } from 'rxjs';

const exampleOne = of().pipe(defaultIfEmpty('Observable.of() Empty!'));
const example = empty().pipe(defaultIfEmpty('Observable.empty()!'));

merge(
example,
exampleOne
).subscribe(console.log);

//output
Observable.empty()!
Observable.of() Empty!

7.异常处理

catchError

单一处理
1
2
3
4
5
6
7
8
import { throwError, of } from 'rxjs';
import { catchError } from 'rxjs/operators';
//emit error
const source = throwError('This is an error!');
//gracefully handle error, returning observable with error message
const example = source.pipe(catchError(val => of(`I caught: ${val}`)));
//output: 'I caught: This is an error'
const subscribe = example.subscribe(val => console.log(val));
整合其他operator
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import { throwError, fromEvent, of } from 'rxjs';
import {
catchError,
tap,
switchMap,
mergeMap,
concatMap,
exhaustMap
} from 'rxjs/operators';

const fakeRequest$ = of().pipe(
tap(_ => console.log('fakeRequest')),
throwError
);

const iWillContinueListening$ = fromEvent(
document.getElementById('continued'),
'click'
).pipe(
switchMap(_ => fakeRequest$.pipe(catchError(_ => of('keep on clicking!!!'))))
);

const iWillStopListening$ = fromEvent(
document.getElementById('stopped'),
'click'
).pipe(
switchMap(_ => fakeRequest$),
catchError(_ => of('no more requests!!!'))
);

iWillContinueListening$.subscribe(console.log);
iWillStopListening$.subscribe(console.log);

retry

设置重试次数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
 // RxJS v6+
import { interval, of, throwError } from 'rxjs';
import { mergeMap, retry } from 'rxjs/operators';

//emit value every 1s
const source = interval(1000);
const example = source.pipe(
mergeMap(val => {
//throw error for demonstration
if (val > 5) {
return throwError('Error!');
}
return of(val);
}),
//retry 2 times on error
retry(2)
);
/*
output:
0..1..2..3..4..5..
0..1..2..3..4..5..
0..1..2..3..4..5..
"Error!: Retried 2 times then quit!"
*/
const subscribe = example.subscribe({
next: val => console.log(val),
error: val => console.log(`${val}: Retried 2 times then quit!`)
});

retryWhen

delayWhen
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
 // RxJS v6+
import { timer, interval } from 'rxjs';
import { map, tap, retryWhen, delayWhen } from 'rxjs/operators';

//emit value every 1s
const source = interval(1000);
const example = source.pipe(
map(val => {
if (val > 5) {
//error will be picked up by retryWhen
throw val;
}
return val;
}),
retryWhen(errors =>
errors.pipe(
//log error message
tap(val => console.log(`Value ${val} was too high!`)),
//restart in 6 seconds
delayWhen(val => timer(val * 1000))
)
)
);
/*
output:
0
1
2
3
4
5
"Value 6 was too high!"
--Wait 6 seconds then repeat
*/
const subscribe = example.subscribe(val => console.log(val));

8.自定义类型

参考

Rxjs

使用支付宝打赏
使用微信打赏

若你觉得我的文章对你有帮助,欢迎点击上方按钮对我打赏