RxJS Mastery – #23 merge

RxJS merge operator lesson title

The RxJS merge operator is another join creation operator. It does exactly what the name suggests: just merging multiple input streams into one (without any transformation). The Observable created by merge only completes once all input Observables have completed.

RxJS merge explained 🎓

Let’s merge two input Observables that output a value 1s each:

const mergeResult$ = merge(
    interval(1000).pipe(take(3)),
    interval(1000).pipe(take(2)),
);

mergeResult$.subscribe(console.log);
// 0, 0, 1, 1, 2

The first input Observable is emitting 3 values while the second one only emits 2 values. In the Observable created by the merge operator the values are just emitted without any transformation or special preference. So, it’s basically like a funnel:

rxjs merge operator

Should one of the input Observables emit an error then the final Observable also emits the error.

What problems does the RxJS merge operator solve? 🚧

The merge operator allows you to turn multiple Observables into one. The resulting Observable that is created by the merge operator makes it look like the values are just coming from a single source.

You use merge if the order in which the values are emitted are not relevant for you. On the opposite if you wanted to have each Observable combined in order then concat would be the better choice. If it’s important to get at least one value of each input combineLatest is your friend. Finally, you can use forkJoin if only the last emitted value of each input Observable matters. So, merge really doesn’t care about such things and just takes everything it gets and forwards. Merge is simple.

How to test the merge operator🚦

As almost always also code around the merge operator is easy to test with RxJS marbles. For below example please just be aware that every character in the diagram takes also 1ms. Hence after “1s (aa)” not 1s is gone, but actually 1.004s. The 4 characters of “(aa)” stand for 4ms. The emission of a happens on 1s though.
So, after the initial 1.004 we wait another 996ms to get to 2s. At 2s the second value of each input stream is emitted, and so on.

it('marbles testing', () => {
    testScheduler.run((helpers) => {
        const { expectObservable } = helpers;

        const mergeResult$ = merge(
            interval(1000).pipe(take(3)),
            interval(1000).pipe(take(2)),
        );

        expectObservable(mergeResult$).toBe(
            '1s (aa) 996ms (bb) 996ms (c|)',
            { a: 0, b: 1, c: 2}
        )
    });
});

Exercise for the merge operator 💪

What happens if an error is emitted by one of two input Observables after 1.5s? How can you test that elegantly with RxJS marbles?

const mergeResult$ = merge(
    interval(1000).pipe(take(3)),
    cold('1.5s #')
);

As always the code examples and exercise solution can be found on GitHub.