RxJS Mastery – #20 combineLatest

RxJS combineLatest operator lesson title

The RxJS combineLatest operator is a join creation operator. That means it is an operator that emits values of multiple source Observables.

RxJS combineLatest explained 🎓

The parameter passed to the combineLatest() could be an array of Observable or also a dictionary of Observables.

combineLatest(arg: T): Observable<unknown>

Let’s have a look at a simple example where we pass an array of two Observables:

combineLatest(
    of(1, 2, 3),
    of('a', 'b'),
).subscribe({ next: console.log });
// [ 3, 'a' ]
// [ 3, 'b' ]

The following happens:

  • combineLatest internally subscribes to the passed Observables in order, i.e. first it subscribes to of(1,2,3) and then to of(‘a’, ‘b’)
  • Now it tries to combine the latest emitted values. of(1,2,3) emits immediately and hence the latest emitted value is 3. To be able to emit something combineLatest now needs a value from the second Observable. The first value that is arriving from the second Observable is ‘a’. The operator waited for all input Observables to emit at least once and can now emit [3, ‘a’]
  • The second input Observable, of(‘a’, ‘b’), didn’t emit the latest value yet. Therefore a second emission is happening and combineLatest returns an Observable [3, ‘b’].

So, combineLatest will not emit any value until each input Observable emits at least a value.

Let’s use RxJS marbles to understand exactly how the operator is working:

it('should emit the combined values', () => {
    testScheduler.run((helpers) => {
        const { expectObservable, cold } = helpers;

        expectObservable(combineLatest(
            cold(        '-q-----r|'),
            cold(        'xx----y|')
        ).pipe(
            tap(console.log))
        ).toBe('-(aa)-bc|',
            {
                     a: ['q', 'x'],
                     b: ['q', 'y'],
                     c: ['r', 'y'],
            }
        );
    });
});

Attention: (aa) emits twice the value a synchronously in the same frame and then advances virtual time by 4 frames. This is because ‘(aa)’.length === 4 (see also the paragraph about Marble Syntax here). Nevertheless we should see how the operator works:

  • Frame 1: not all input Observables have emitted a value yet. Only the second input emitted x. Hence, combineLatest cannot emit yet.
  • Frame 2: the first input Observable has also emitted it’s first value. Now combineLatest can emit the first combination which is [‘q’, ‘x’].
  • Frame 7: the second input Observable emits another value y. Therefore there is a new combination that can be emitted by combineLatest. It takes the latest value q from the first input, combines it with y, and emits [‘q’, ‘y’].
  • Frame 8: the first Observable emits another value r. Although the second Observable is complete the combineLatest operator still emits the latest combination: [‘r’, ‘y’].
  • Frame 9: the last “active” input Observable has also completed and that is why also the Observable returned by combineLatest completes.

The following drawing should also visualise and explain the operator well:

What problems does the RxJS combineLatest operator solve? 🚧

The RxJS combineLatest operator is used when you have multiple long running Observables that you want to combine. And importantly you should be aware that an emission happens every time an input Observable is emitting (see also the zip operator if you want a different behaviour).

We have a look at a simple example. What if you have 3 form fields and you want to consider all 3 values of them whenever one of the values would change? For below example we have 3 input Observables representing the values of the red, green, and blue colors. In our case we define them as BehaviorSubjects but they could also be Observables emitting values of form fields.

Every time one of the red, green or blue values changes we want to calculate the final rgb(r,g,b) value. In such a case we can use combineLatest:

const initialValue = 0;
const red$ = new BehaviorSubject<number>(initialValue);
const green$ = new BehaviorSubject<number>(initialValue);
const blue$ = new BehaviorSubject<number>(initialValue);

const rgb$ = combineLatest(red$, green$, blue$).pipe(
    map(([r, g, b]: [number, number, number]) => `rgb(${r},${g},${b})`)
);

rgb$.subscribe(result => console.log(result));

green$.next(100);
blue$.next(255);
green$.next(50);
red$.next(10);

This would deliver the following output:

rgb(0,0,0)
rgb(0,100,0)
rgb(0,100,255)
rgb(0,50,255)
rgb(10,50,255)

Like that we always get the final color based on the latest values of each color component.

How to test the combineLatest operator🚦

Code around combineLatest can be tested as shown above.

Exercise for the combineLatest operator 💪

What happens if one of many input Observables used in combineLatest is emitting an error? Write a rxjs marbles test to demonstrate how the output Observable is looking like. Are values emitted by other input Observables still considered or is the output Observable terminating with an error?

As always the code examples and exercise solution can be found on GitHub.