RxJS Mastery – #21 concat

RxJS concat operator lesson title

The RxJS concat operator is a join creation operator. Use it whenever you deal with multiple Observables and you want to have them processed in sequence. A new subscription will not happen until the previous subscribed Observable has completed.

RxJS concat explained 🎓

The concat operator accepts multiple Observables as parameter.

it('should emit in order', () => {
    concat(
        of(1, 2, 3),
        of('a', 'b'),
    ).subscribe({ next: console.log });
});

Above code emits the following:

1
2
3
a
b

As said the concat works on Observables in sequence. The following drawing also illustrates that:

RxJs concat

The first Observable that provided the values a, b and c is already completed and concat did notify its observer with next notifications about those values. Currently concat is subscribed to the second Observable that is emitting the values 1 and 2. The last Observable passed to concat in above case would emit x, y and z. But this Observable is currently “sleeping”. The concat operator did not subscribe yet to it. Therefore this last input Observable also didn’t start its work yet.

What problems does the RxJS concat operator solve? 🚧

As discussed above the concat operator should be used if you want a sequential subscription to input Observables. If you want concurrent subscriptions you would use the merge operator. The nature of concat for example helps if you want to execute http requests in sequence. Given that the http requests are represented by Observables:

it('should execute "requests" in order', () => {
    const updateRequest$ = of("updated 1");
    const fetchRequest$ = of([
         {"id": 1, "value": true},
         {"id": 2, "value": false}
    ]);

    concat(
        updateRequest$,
        fetchRequest$,
    ).subscribe({ next: console.log });
    // updated 1
    // [ { id: 1, value: true }, { id: 2, value: false } ]
});

How to test the concat operator🚦

Nothing very special to pay attention to when testing code around concat. Nevertheless we can have a look at the following example:

it('marbles testing', () => {
    testScheduler.run((helpers) => {
        const { expectObservable, cold } = helpers;

        const concatResult$ = concat(
            interval(1000).pipe(take(3)),
            of(3, 4),
        );

        expectObservable(concatResult$).toBe(
            '1s a 999ms b 999ms (cde|)',
            { a: 0, b: 1, c: 2, d: 3, e: 4}
        );

    });
});

Here we concat two input Observables. One is based on interval that emits every 1s and is limited to 3 values by the take operator. The second input Observable is based on the of operator which emits immediately.
Hence, the marble diagram for this test specifies the 1s waiting time or the 999ms and, e.g. “b” which also sums up to 1s. On the other hand when c (or value 2) is emitted the first Observable completes. The concat operator then immediately goes over to the next Observable. Because the next Observable is based on of the emission of 2, 3 and 4 happens at the same time. And also at the same time the whole Observable created by concat completes.

Exercise for the concat operator 💪

What happens if an input Observable for concat is running into an error? Are the later values omitted? Demonstrate in a test what happens in such a case. You could for example use the following situation:

const concatResult$ = concat(
    interval(1000).pipe(take(3)),
    cold('#'),
    of(3, 4),
);

As always the code examples and exercise solution can be found on GitHub.