forEach

 

The forEach method lets us iterate over stream emissions. It takes a function to handle the stream's next events and returns a Promise that resolves when the stream completes and rejects when it errors.

const { rxObserver } = require('api/v0.3');
const { interval } = require('rxjs');
const { take } = require('rxjs/operators');


// the marble diagram is drawn from whatever this observer receives
const observer = rxObserver();

// source stream: interval(5), limited to its first 10 values
const firstTen$ = interval(5).pipe(take(10));

// forEach subscribes to the stream and hands back a promise
const done = firstTen$.forEach(observer.next);

// the promise resolves on Observable complete
//         and rejects on Observable error
done.then(observer.complete, observer.error);

Here's a more convenient use case, using the async / await approach:

const { rxObserver } = require('api/v0.3');
const { interval } = require('rxjs');
const { take } = require('rxjs/operators');


// start the async routine; the function declaration below is hoisted,
// so it can be called before it appears in the file.
// The returned promise is intentionally not awaited here.
doAsyncWork();

/**
 * Iterates over the first 10 values of `interval(5)` using async / await
 * and forwards every notification to the marble-diagram observer.
 *
 * @returns {Promise<void>} Settles once the stream has completed or failed.
 *   A stream error is forwarded to `observer.error` rather than rejecting
 *   the returned promise.
 */
async function doAsyncWork() {
  // observer is needed to draw marble diagram
  const observer = rxObserver();

  try {
    // forEach returns a promise, so iteration over the stream can be awaited
    await interval(5)
      .pipe(take(10))
      .forEach(observer.next);
  } catch (err) {
    // the promise rejects on Observable error: report it to the observer
    // instead of leaving an unhandled rejection behind
    observer.error(err);
    return;
  }

  // indicate that it's completed
  observer.complete();
}