RxJS Mastery – #56 mergeAll

RxJS mergeAll

The RxJS mergeAll operator flattens higher-order Observable. It considers all values in the order they occur, independent of the inner Observable.

RxJS mergeAll just flattens

While concatAll and exhaustAll have some special semantics, mergeAll is rather simple and therefore also often used.

Below there is a source observable that emits the values a, b, c, and d twice each in an inner Observable.

const indexedValues = ['a', 'b', 'c', 'd'];

const source$ = interval(100).pipe(
    take(4),
    map((outerV) => interval(100).pipe(
        map(_ => indexedValues[outerV]),
        take(2)),
    )
);
const result$ = source$.pipe(mergeAll());

expectObservable(result$).toBe(
    '200ms a 99ms (ab) 96ms (bc) 96ms (cd) 96ms (d|)',
    { a: 'a', b: 'b', c: 'c', d: 'd' }
);

If those values are piped through the mergeAll operator, they are delivered concurrently. That means that two values can also occur in the same time frame (see ab, bc and cd).

MergeAll has a concurrency parameter

MergeAll has a single parameter defined as concurrent: number = Infinity. This parameter controls the maximum number of inner Observables being subscribed to concurrently. By default, there is no restriction. On the other end, a concurrent parameter of 1 is basically concatMap because then all the inner Observables are taken into account sequentially. The parameter can be helpful to not subscribe to too many things and keep the application’s performance on a good level.

Exercise for the RxJS mergeAll operator 💪

In the below examples mergeAll is used to combine news and ads from various sources. This is one of the most common use cases of mergeAll, i.e. flattening short-lived Observables into a single output stream. Your task is to define the marbles timing for the feed$ where all the sources are merged:

const { expectObservable, cold } = helpers;

const fetchNews = (source): Observable<string> => cold('99ms r|', { r: 'News from ' + source });
const fetchAds = (type): Observable<string> => cold('10ms r|', { r: 'Ad for ' + type });

const newsDef = { type: 'news', value: 'Source'};
const adsDef = { type: 'ads', value: 'Product'};

const source$ = cold('nan|', { n: newsDef, a: adsDef }).pipe(
    map(def => def.type === 'news' ? fetchNews(def.value) : fetchAds(def.value)),
);
const feed$ = source$.pipe(mergeAll());

expectObservable(feed$).toBe(
    'define the output timings',
    { a: 'Ad for Product', n: 'News from Source' }
);

This post is part of the RxJS mastery series. As always the code examples can be found on GitHub.