skip to content

mergeMap vs exhaustMap vs switchMap vs concatMap

 

A source that emits at 5ms, 10ms, and 20ms will be *Mapped (via mergeMap, exhaustMap, switchMap, or concatMap) to a timer(0, 3) that is limited to 3 emissions.
Also see the dedicated playgrounds for mergeMap, switchMap, concatMap, and exhaustMap.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
const { rxObserver, palette } = require('api/v0.3');
const { from, timer, pipe } = require('rxjs');
const { zip, take, map, mergeMap, switchMap, concatMap, exhaustMap, tap, delayWhen } = require('rxjs/operators');


// source$ emits the values 5, 10 and 20, each one delayed by its own value (5ms, 10ms, 20ms)
const sourceDelays = [5, 10, 20];
const palette$ = from(palette);
const source$ = fromDelayed(sourceDelays).pipe(
  // pair every delayed value with a palette entry and wrap both in a Marble
  zip(palette$, Marble)
);

// target$ is the inner stream every source$ value gets mapped to:
// timer(0, 3), cut off after three emissions
const target$ = timer(0, 3).pipe(
  take(3)
);

// Shared projection: each source$ marble becomes a run of target$
// whose values are painted in that marble's color.
const toColoredTarget = ({ color }) => target$.pipe(colorize(color));

// The projection is identical everywhere; only the flattening operator differs.
const mergeMap$ = source$.pipe(mergeMap(toColoredTarget));

const exhaustMap$ = source$.pipe(exhaustMap(toColoredTarget));

const switchMap$ = source$.pipe(switchMap(toColoredTarget));

const concatMap$ = source$.pipe(concatMap(toColoredTarget));


// visualization: one [stream, label] row per timeline, subscribed top to bottom
const timelines = [
  [source$, '[source$] A stream that emits at [5ms, 10ms, 20ms]'],
  [
    target$.pipe(colorize('#ff5073')),
    '[target$] will be mapped to a timer that emits at [N+0ms, N+3ms, N+6ms]',
  ],
  [mergeMap$, 'mergeMap'],
  [exhaustMap$, 'exhaustMap'],
  [switchMap$, 'switchMap'],
  [concatMap$, 'concatMap'],
];

for (const [stream$, label] of timelines) {
  stream$.subscribe(rxObserver(label));
}


// helpers
/**
 * Builds a pipeable operator that wraps every emitted value in a Marble
 * carrying the given color.
 *
 * @param {*} color - color handed to Marble for each emitted value
 * @returns the result of pipe(map(...)), ready to be passed to .pipe()
 */
function colorize(color) {
  const toMarble = (value) => Marble(value, color);
  return pipe(map(toMarble));
}

/**
 * Creates a colored Marble: a plain object that carries a color and
 * reports the wrapped value through valueOf().
 *
 * @param {*} value - value returned by the marble's valueOf()
 * @param {*} color - stored as-is on the marble's `color` property
 * @returns {{ valueOf: function(): *, color: * }} the marble object
 */
function Marble(value, color) {
  return {
    valueOf() {
      return value;
    },
    color,
  };
}

/**
 * Like `from`, but every item is held back by a timer started with the
 * item's own value (so 5 is delayed by timer(5), 10 by timer(10), ...).
 *
 * @param arr - the items to emit; each one is also passed to timer() as its delay
 * @returns the stream of delayed items
 */
function fromDelayed (arr) {
  const holdByOwnValue = delayWhen((item) => timer(item));
  return from(arr).pipe(holdByOwnValue);
}

0ms
[source$] A stream that emits at [5ms, 10ms, 20ms] (start … complete): 5, 10, 20
[target$] will be mapped to a timer that emits at [N+0ms, N+3ms, N+6ms] (start … complete): 0, 1, 2
mergeMap (start … complete): 0, 1, 0, 2, 1, 2, 0, 1, 2
exhaustMap (start … complete): 0, 1, 2, 0, 1, 2
switchMap (start … complete): 0, 1, 0, 1, 2, 0, 1, 2
concatMap (start … complete): 0, 1, 2, 0, 1, 2, 0, 1, 2