RxJS Mastery – #28 concatMap

The RxJS concatMap operator is one of the transformation operators that also flattens Observables. Flattening means that it subscribes to each inner Observable and passes the bare values it emits on to the output. There is also the concatMapTo operator, but it is only available until v8 of the RxJS library. Starting with RxJS v9 you should use concatMap(() => result) to achieve the same.
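To make "flattening" concrete, here is a minimal sketch that contrasts plain map with concatMap and shows the concatMap(() => result) form mentioned above. The names nested, flat, ticks and constant$ are made up for illustration, and the operators are imported from 'rxjs/operators', which assumes RxJS v6 or v7.

```typescript
import { Observable, of } from 'rxjs';
import { concatMap, map } from 'rxjs/operators';

// map alone does not flatten: the observer receives the inner Observables themselves.
const nested: Observable<string>[] = [];
of(1, 2)
    .pipe(map((v: number) => of(`value ${v}`)))
    .subscribe((inner$) => nested.push(inner$));

// concatMap subscribes to each inner Observable and forwards its bare values.
const flat: string[] = [];
of(1, 2)
    .pipe(concatMap((v: number) => of(`value ${v}`)))
    .subscribe((value) => flat.push(value));

// Replacement for concatMapTo: ignore the source value and always return the same Observable.
const constant$ = of('tick'); // hypothetical inner Observable
const ticks: string[] = [];
of(1, 2)
    .pipe(concatMap(() => constant$))
    .subscribe((value) => ticks.push(value));

console.log(flat);  // [ 'value 1', 'value 2' ]
console.log(ticks); // [ 'tick', 'tick' ]
```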

RxJS concatMap works sequentially

As a transformation operator, concatMap is usually applied in pipe(). That means we invoke the pipe function on an Observable and pass the transformation operators as its arguments:

const result$ = of(1,2,3).pipe(
    concatMap((v: number) => of(`value ${v}`))
);

result$.subscribe(console.log);
// value 1
// value 2
// value 3

The important aspect of flattening operators like concatMap is that they can deal with inner Observables. Here the inner Observable is of(`value ${v}`). In RxJS, “concat” means that something happens sequentially. In this case concatMap subscribes to the inner Observables sequentially: it does not subscribe to the next inner Observable until the previous one completes.

Therefore the behaviour is as follows:

  • The subscription on the result$ triggers the subscription on of(1,2,3) and those values are emitted.
  • concatMap gets its first inner Observable by mapping the number 1. It maps to of(`value 1`), subscribes to it and waits until it completes.
  • When an inner Observable emits a value, that value is passed down to the observer. In the example above, the inner Observable completes right after emitting one value.
  • Once the first inner Observable has completed, concatMap subscribes to the second one. If the previous inner Observable had not completed yet, the next source value would wait in the queue.
  • Once the source Observable, i.e. of(1,2,3), and all inner Observables have completed, concatMap and therefore result$ complete as well.
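The queueing described in the steps above is hard to see with of(), because every inner Observable completes immediately. The following sketch uses a Subject as the first inner Observable, so that we decide when it completes. The names log, first$ and the snapshot variables are assumptions for illustration only.

```typescript
import { Subject, of } from 'rxjs';
import { concatMap } from 'rxjs/operators';

const log: string[] = [];
const first$ = new Subject<string>(); // inner Observable that we complete by hand

const result$ = of(1, 2).pipe(
    concatMap((v: number) => {
        log.push(`mapping ${v}`);
        return v === 1 ? first$ : of('second inner value');
    })
);

result$.subscribe({
    next: (value) => log.push(value),
    complete: () => log.push('complete'),
});

// The source has already emitted 1 and 2, but only 1 has been mapped so far.
const afterSubscribe = [...log];

first$.next('first inner value');
first$.complete(); // only now does concatMap map 2 and subscribe to the second inner Observable

console.log(afterSubscribe); // [ 'mapping 1' ]
console.log(log);
// [ 'mapping 1', 'first inner value', 'mapping 2', 'second inner value', 'complete' ]
```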

To summarise, concatMap is a combination of map (to an inner Observable) and concat (working sequentially on those inner Observables).
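As a rough illustration of that summary, the sketch below runs the same projection once through concatMap and once through map followed by concatAll. The helper project and the two result arrays are hypothetical names. For this synchronous example both pipelines produce the same output. One difference to keep in mind is that map calls the projection as soon as a source value arrives, while concatMap only calls it when the value's turn comes.

```typescript
import { of } from 'rxjs';
import { concatAll, concatMap, map } from 'rxjs/operators';

// Hypothetical projection from a plain value to an inner Observable.
const project = (v: number) => of(`value ${v}`);

const viaConcatMap: string[] = [];
of(1, 2, 3)
    .pipe(concatMap(project))
    .subscribe((value) => viaConcatMap.push(value));

const viaMapAndConcatAll: string[] = [];
of(1, 2, 3)
    .pipe(map(project), concatAll())
    .subscribe((value) => viaMapAndConcatAll.push(value));

console.log(viaConcatMap);       // [ 'value 1', 'value 2', 'value 3' ]
console.log(viaMapAndConcatAll); // [ 'value 1', 'value 2', 'value 3' ]
```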

Use RxJS concatMap to work sequentially on async tasks

There are other transformation operators similar to concatMap, e.g. mergeMap. The concat variant should be used whenever the inner Observables must be processed in sequence (as shown above). Usually plain values are mapped to such inner streams. In the example below there is a stream of IDs, of(1,2,3). For each ID we want to execute an HTTP request that updates the record belonging to that ID. It’s important that the HTTP requests happen in order, meaning that the request for record 1 finishes before the request for record 2 starts. However, as soon as a record is updated, we want to see that in the final output. concatMap fits those requirements exactly:

it('works on async task in sequential fashion', () => {
    // Stand-in for the HTTP request that updates the record with the given ID
    const update = (id: number): Observable<string> => of(id + ' updated');

    const result$ = of(1,2,3).pipe(
        concatMap((id: number) => update(id))
    );

    result$.subscribe(console.log);
    // 1 updated
    // 2 updated
    // 3 updated
});
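Because of() completes synchronously, the example above cannot show the difference from mergeMap. The sketch below replaces update with a hypothetical fake backend: createFakeBackend, started and finish are invented for illustration and stand in for real HTTP requests whose responses we trigger by hand.

```typescript
import { Observable, Subject, defer, of } from 'rxjs';
import { concatMap, mergeMap } from 'rxjs/operators';

// Hypothetical backend: a request "starts" on subscription and finishes when finish(id) is called.
function createFakeBackend() {
    const started: number[] = [];
    const pending = new Map<number, Subject<string>>();
    const update = (id: number): Observable<string> =>
        defer(() => {
            started.push(id);
            const response$ = new Subject<string>();
            pending.set(id, response$);
            return response$;
        });
    const finish = (id: number): void => {
        const response$ = pending.get(id);
        if (response$) {
            response$.next(`${id} updated`);
            response$.complete();
        }
    };
    return { started, update, finish };
}

// mergeMap starts all three requests right away.
const parallel = createFakeBackend();
of(1, 2, 3).pipe(mergeMap((id) => parallel.update(id))).subscribe();
const startedWithMergeMap = [...parallel.started];

// concatMap starts the next request only after the previous one has completed.
const sequential = createFakeBackend();
const output: string[] = [];
of(1, 2, 3)
    .pipe(concatMap((id) => sequential.update(id)))
    .subscribe((value) => output.push(value));
const startedBeforeFirstResponse = [...sequential.started];
sequential.finish(1);
const startedAfterFirstResponse = [...sequential.started];
sequential.finish(2);
sequential.finish(3);

console.log(startedWithMergeMap);        // [ 1, 2, 3 ]
console.log(startedBeforeFirstResponse); // [ 1 ]
console.log(startedAfterFirstResponse);  // [ 1, 2 ]
console.log(output);                     // [ '1 updated', '2 updated', '3 updated' ]
```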

Marbles in sequence for the testing of concatMap

As always, testing with RxJS marbles is quite straightforward. In the case of concatMap the sequence can be visualised nicely by the marble diagram:

it('marbles testing', () => {
    testScheduler.run((helpers) => {
        const { expectObservable } = helpers;

        const result$ = interval(1000).pipe(
            take(2),
            concatMap((v: number) => interval(1).pipe(take(2)))
        );

        expectObservable(result$).toBe(
            '1001ms ab 998ms c(d|)',
            { a: 0, b: 1, c: 0, d: 1}
        );
    });
});

Thanks to interval(1000), the source Observable emits every second. Each emission triggers a subscription to the inner Observable interval(1).pipe(take(2)), which emits 0 after 1ms and 1 after 2ms. So 0 and 1 are emitted at 1001ms and 1002ms, and the same happens again at 2001ms and 2002ms. The output completes at the same time as the last emission, shown by the pipe | inside the group (d|) in the marble diagram.
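The test above relies on a testScheduler that is created elsewhere. The following sketch shows one way it might be set up without a test framework. The JSON.stringify comparison is only a stand-in for the framework's deep-equality assertion, and the cold streams and the helper inner are made up for illustration. It also contrasts the marble output of concatMap with that of mergeMap when the source emits faster than the inner Observables complete.

```typescript
import { concatMap, mergeMap } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';

// Assumption: a plain JSON comparison replaces e.g. expect(actual).toEqual(expected).
const testScheduler = new TestScheduler((actual, expected) => {
    if (JSON.stringify(actual) !== JSON.stringify(expected)) {
        throw new Error(`Expected ${JSON.stringify(expected)} but got ${JSON.stringify(actual)}`);
    }
});

testScheduler.run(({ cold, expectObservable }) => {
    // Each inner stream emits after 2 frames and completes after 5 frames.
    const inner = (v: string) => cold('--x--|', { x: `${v}!` });
    const source$ = cold('a-b|');

    // concatMap: the inner stream for b starts only when the one for a completes (frame 5).
    expectObservable(source$.pipe(concatMap(inner))).toBe(
        '--x----y--|',
        { x: 'a!', y: 'b!' }
    );

    // mergeMap: the inner stream for b starts immediately (frame 2), so the streams overlap.
    expectObservable(source$.pipe(mergeMap(inner))).toBe(
        '--x-y--|',
        { x: 'a!', y: 'b!' }
    );
});
```

If an expectation does not match, testScheduler.run throws, so the snippet doubles as a small self-check.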

Exercise for the concatMap operator 💪

Implement concatMap by reusing mergeMap. You don’t have to consider the resultSelector parameter that would transform the result.

As always the code examples and exercise solution can be found on GitHub.