skip to content

forEach

 

The forEach function lets us iterate over stream emissions. It takes a function to handle the stream's next events and returns a Promise that resolves when the stream completes and rejects when the stream errors.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
const { rxObserver } = require('api/v0.3');
const { interval } = require('rxjs');
const { take } = require('rxjs/operators');


// The observer feeds the marble diagram renderer.
const observer = rxObserver();

// Emit a counter every 5ms and stop after the first ten values.
const firstTen = interval(5).pipe(take(10));

// forEach subscribes, passes each emitted value to observer.next,
// and hands back a promise describing how the stream ended.
const finished = firstTen.forEach(observer.next);

// The promise resolves when the Observable completes
// and rejects when the Observable errors.
finished.then(observer.complete, observer.error);
0msstart00 11 22 33 44 55 66 77 88 99

Here's a more convenient use case, using the async/await approach:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
const { rxObserver } = require('api/v0.3');
const { interval } = require('rxjs');
const { take } = require('rxjs/operators');


doAsyncWork();

/**
 * Iterates over a short interval stream with async/await and reports
 * every outcome (values, error, completion) to the marble diagram observer.
 *
 * @returns {Promise<void>} settles once the stream has ended and the
 *   observer has been notified.
 */
async function doAsyncWork() {
  // observer is needed to draw marble diagram
  const observer = rxObserver();

  try {
    // create observable and iterate over it;
    // the awaited promise rejects if the Observable errors
    await interval(5).pipe(
      take(10)
    )
      .forEach(observer.next);
  } catch (err) {
    // forward the stream error to the observer instead of leaving
    // the promise returned by doAsyncWork() rejected and unhandled
    observer.error(err);
    return;
  }

  // indicate that it's completed (kept outside the try block so a failure
  // in complete() itself is not reported as a stream error)
  observer.complete();
}
0msstart00 11 22 33 44 55 66 77 88 99