expand
Recursively maps each emitted value to a new stream and feeds that stream's emissions back through the same mapping:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
const { rxObserver } = require('api/v0.3');
const { of, timer, EMPTY } = require('rxjs');
const { expand, take, mapTo } = require('rxjs/operators');
// T serves two purposes in this demo: it is the value at which the recursion
// stops, and it is the delay/period (in ms) handed to `timer`.
const T = 5;

/**
 * Projection used by `expand`.
 *
 * While `value` is below T, returns a timer-driven stream that emits
 * `value + 1` exactly `value + 1` times. Otherwise returns EMPTY, which
 * ends that branch of the recursion.
 */
const spawnNext = value => {
  if (value < T) {
    const next = value + 1;
    // `take` caps the number of ticks; `mapTo` replaces each tick with `next`.
    return timer(T, T).pipe(take(next), mapTo(next));
  }
  return EMPTY;
};

// Seed with 1; `expand` applies `spawnNext` to the seed and to every value
// emitted by the streams it spawns.
const expanded$ = of(1).pipe(expand(spawnNext));

expanded$.subscribe(rxObserver());