RxJS Mastery – #22 forkJoin

The RxJS forkJoin operator is another join creation operator. It emits at most once: when all input Observables have completed, it emits the last value of each of them. We can pass either an array or a dictionary of Observables to forkJoin.

RxJS forkJoin explained 🎓

Let’s have a look at the following example with two input Observables. The first one emits three values while the second one only emits two. The number of emissions doesn’t matter, as long as each input Observable emits at least one value and completes.

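import { forkJoin, of } from 'rxjs';

// Pass the inputs as an array; the variadic form forkJoin(a$, b$) is deprecated.
forkJoin([
    of(1, 2, 3),
    of('a', 'b'),
]).subscribe(console.log);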
// [3, 'b']

The code above emits the following:

[3, 'b']

The last values are emitted in an array, in the order of the input Observables. The Observable created by forkJoin then completes as well.

The following picture also illustrates the emission. The last value of each of the three input Observables ends up in the output. Intermediate values are “lost”. Please also note that all input Observables are subscribed to at the same time, right at the beginning.
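
To make the "last value wins" behaviour concrete, here is a minimal sketch that drives forkJoin by hand with two Subjects. The names numbers$, letters$, emitted and emittedBeforeLastCompletion are made up for this illustration and are not part of the lesson's code.

```typescript
import { Subject, forkJoin } from 'rxjs';

// Hypothetical sources for this sketch; Subjects let us push values manually.
const numbers$ = new Subject<number>();
const letters$ = new Subject<string>();

const emitted: Array<[number, string]> = [];
let completed = false;

// forkJoin subscribes to both Subjects as soon as we subscribe to it.
forkJoin([numbers$, letters$]).subscribe({
    next: (value) => emitted.push(value),
    complete: () => { completed = true; },
});

numbers$.next(1);
letters$.next('a');
numbers$.next(2);      // 2 replaces 1 as the last value of numbers$
numbers$.complete();   // letters$ is still open, so nothing is emitted yet
letters$.next('b');    // 'b' replaces 'a' as the last value of letters$

const emittedBeforeLastCompletion = emitted.length;

letters$.complete();   // all inputs are done: forkJoin emits once and completes

console.log(emittedBeforeLastCompletion, emitted, completed);
```

Nothing is emitted until the last input completes. The intermediate values 1 and 'a' never reach the subscriber.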

What problems does the RxJS forkJoin operator solve? 🚧

The forkJoin operator can be used whenever you are only interested in the last value emitted by each of multiple Observables. For example, you could use forkJoin to trigger multiple HTTP requests or other asynchronous operations. Only once all of them have completed does forkJoin emit (and complete).

Let’s take this as a simplified example:

forkJoin({
    post: of({ title: 'RxJS forkJoin', content: 'forkJoin is used to combine...'}),
    likes: of(99),
}).subscribe(console.log);

We execute two requests for a post and its likes on a webpage. The corresponding requests (here simplified by of) are passed to forkJoin in a dictionary. forkJoin subscribes to all of them, so the requests run in parallel, and waits until all have completed.
The dictionary input also gives us a nicely structured output. The results are mapped onto the keys:

{
  post: { 
    title: 'RxJS forkJoin', 
    content: 'forkJoin is used to combine...' 
  },
  likes: 99
}
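
Because the result keeps the keys of the input dictionary, it can be destructured directly in a downstream operator. The following is a minimal sketch under stated assumptions: the Post interface and the functions loadPost and loadLikes are hypothetical stand-ins for real HTTP calls and are not part of the original example.

```typescript
import { Observable, forkJoin, of } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical shape of a post, for illustration only.
interface Post {
    title: string;
    content: string;
}

// Hypothetical stand-ins for HTTP requests; of() emits once and completes.
const loadPost = (): Observable<Post> =>
    of({ title: 'RxJS forkJoin', content: 'forkJoin is used to combine...' });
const loadLikes = (): Observable<number> => of(99);

// The emitted object has the same keys as the input dictionary.
const summary$ = forkJoin({ post: loadPost(), likes: loadLikes() }).pipe(
    map(({ post, likes }) => `${post.title} (${likes} likes)`),
);

let summary = '';
summary$.subscribe((text) => { summary = text; });

console.log(summary);
```

Since of emits synchronously, summary is already set when console.log runs. With real requests the value would only arrive after the slowest request has finished.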

It’s important to ensure that all Observables passed into forkJoin eventually complete. Otherwise forkJoin never emits and “runs” forever.
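
The following sketch illustrates this completion requirement with three cases. It is an illustration only: endless$, log and the record helper are hypothetical names introduced here, and concat(of(1, 2), NEVER) merely stands in for any long-lived stream that emits but never completes.

```typescript
import { EMPTY, NEVER, concat, forkJoin, of } from 'rxjs';
import { take } from 'rxjs/operators';

const log: string[] = [];

// Hypothetical helper for this sketch: records what a forkJoin result does.
const record = (label: string) => ({
    next: (value: unknown) => log.push(`${label} next ${JSON.stringify(value)}`),
    complete: () => log.push(`${label} complete`),
});

// Stand-in for a long-lived stream: emits 1 and 2, then never completes.
const endless$ = concat(of(1, 2), NEVER);

// 1) One input never completes: forkJoin neither emits nor completes.
forkJoin([of('a'), endless$]).subscribe(record('endless'));

// 2) Limiting the endless input with take(1) makes it complete after one value.
forkJoin([of('a'), endless$.pipe(take(1))]).subscribe(record('take'));

// 3) One input completes without emitting: forkJoin completes without a value.
forkJoin([of('a'), EMPTY]).subscribe(record('empty'));

console.log(log);
```

Case 3 is easy to overlook: an input that completes without ever emitting makes forkJoin complete without emitting too, so a next handler alone would never be called.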

How to test the forkJoin operator 🚦

Code around forkJoin can be tested like most other RxJS code. With RxJS marbles we can precisely specify both the timing of the emission (using ms or s) and the completion (using the pipe character):

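import { forkJoin, interval, of } from 'rxjs';
import { take } from 'rxjs/operators';

// testScheduler is assumed to be a TestScheduler instance created in the test setup.
it('marbles testing', () => {
    testScheduler.run((helpers) => {
        const { expectObservable } = helpers;

        const forkJoinResult$ = forkJoin([
            interval(1000).pipe(take(3)),
            of('a', 'b'),
        ]);

        // After 3 seconds: one emission and the completion in the same frame.
        expectObservable(forkJoinResult$).toBe(
            '3s (a|)',
            { a: [2, 'b'] }
        );
    });
});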

In the example above, the second input Observable, which is based on of, emits and completes immediately. Because the first input Observable takes a bit longer, the result of forkJoin is only emitted later. After 3 seconds, interval has emitted its third value and completes. This is the signal for forkJoin to emit the collected values and the complete notification at the same time.
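
The test relies on a testScheduler instance that is not shown in the snippet. As a minimal sketch, assuming no test framework is available, it could be created as follows. The JSON-based comparison is an assumption made for this sketch; in a Jest or Jasmine setup you would typically delegate to expect(actual).toEqual(expected) instead.

```typescript
import { forkJoin, interval, of } from 'rxjs';
import { take } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';

// Assumption: a framework-free deep-equality check based on JSON.stringify.
const testScheduler = new TestScheduler((actual, expected) => {
    const actualJson = JSON.stringify(actual);
    const expectedJson = JSON.stringify(expected);
    if (actualJson !== expectedJson) {
        throw new Error(`Expected ${expectedJson} but got ${actualJson}`);
    }
});

let ran = false;

// run() executes the callback in virtual time and checks expectations at the end.
testScheduler.run(({ expectObservable }) => {
    const forkJoinResult$ = forkJoin([
        interval(1000).pipe(take(3)),
        of('a', 'b'),
    ]);

    expectObservable(forkJoinResult$).toBe('3s (a|)', { a: [2, 'b'] });
    ran = true;
});

console.log('marble expectation checked:', ran);
```

Inside run(), time-based creators such as interval use virtual time, so the three seconds pass instantly. If the marble expectation does not match, run() throws when it flushes.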

Exercise for the forkJoin operator 💪

What happens if an input Observable of forkJoin runs into an error? Demonstrate in a test what happens in such a case. You could, for example, use the following situation:

// cold comes from the helpers passed to testScheduler.run
const forkJoinResult$ = forkJoin([
    interval(1000).pipe(take(3)),
    cold('#'),
    of(3, 4),
]);

As always the code examples and exercise solution can be found on GitHub.