RxJS Mastery – #37 switchScan

The RxJS switchScan operator applies a reducer function to each value emitted by the source. This is similar to scan. But in the switchScan case the reducer function itself works with Observables. So, this is another one of those flattening operators that deals with higher-order Observables. And similar to switchMap only the most recent Observable returned by the reducer function is merged into the output stream (or outer Observable).

switchScan<T, R, O extends ObservableInput<any>>(accumulator: (acc: R, value: T, index: number) => O, seed: R): OperatorFunction<T, ObservedValueOf<O>>

If the seed value is not defined the first emitted value is taken as “seed”.

Sum the numbers from the source

Similar to scan the switchScan operator can keep a certain state thanks to its reducer function. The reducer function here returns an Observable though, i.e. of(acc + curr).

of(1,2,3,4).pipe(
    switchScan((acc, curr) => of(acc + curr), 0)
).subscribe(console.log);

// 1
// 3
// 6
// 10

The most recent value from that Observable is taken to be merged into the outer Observable. In above case the of() only returns one value. Hence, the switch logic isn’t really coming into play. Let’s see another example in the next section.

RxJS switchScan emits values only from the most recently returned Observable

So, let’s see the effect of the switch and construct an example that checks on the most recent returned Observable. This actually took me some time until I found a more or less good example. It’s also not so easy to understand on first sight. Hence, we have to go through it step by step.

Our async functionality – an upload

Our example will use an upload functionality that is defined below as Observable. It takes some resource IDs as input and starts the upload for those IDs. The more IDs it gets the longer the upload takes. An upload of 4 IDs is an exception though because in that case the upload is very quick (deliberately to demonstrate one point).

const upload = (ids: number[]) => new Observable(observer => {
    observer.next(`START upload ${ids.join(',')}`);

    // more ids take longer to upload, but exactly 4 elements are very quick ;)
    setTimeout(() => {
        observer.next(`FINISHED upload of ${ids.join(',')}`);
        observer.complete();
    }, ids.length === 4 ? 1 : ids.length * 100);

    return () => {
        console.log(`TEARDOWN upload of ${ids.join(',')}`);
    }
});

The Observable sends messages to the Observer when:

  • the upload starts
  • shortly before the upload finishes
  • the Observable is teared down (which happens in case of cancellation or completion)

The usage of switchScan in this upload example

Actually, our target is to upload. Furthermore we should create a journal to document what happens with all the uploads. During our demo we get 4 upload requests with different sets of IDs. Those requests arrive every 5ms.

interval(5).pipe(
    map(n => {
        if (n == 0) { return [1,2] }
        else if (n == 1) { return [1,2,3] }
        else if (n == 2) { return [1,2,3,4] }
        else if (n == 3) { return [1,2,3,4,5] }
    }),
    take(4),
    filter(Boolean),
    switchScan((acc, curr) => upload(curr).pipe(
        map(result => `${acc} ${result}`)
    ),
'')
).subscribe(console.log);

The first map is taking care of the different inputs we’re getting. It’s basically like that for demo purposes. But in a real application that could be user actions. Those inputs (or user actions) change very quickly until the user is happy with uploading [1,2,3,4,5]. The switchScan operator itself executes the upload and updates the journal by appending the newest upload result to the accumulated value.

The output of the upload journal

This is the final output of above example showing which uploads were started and finished:

// START upload 1,2
// TEARDOWN upload of 1,2
// START upload 1,2 START upload 1,2,3
// TEARDOWN upload of 1,2,3
// START upload 1,2 START upload 1,2,3 START upload 1,2,3,4
// START upload 1,2 START upload 1,2,3 FINISHED upload of 1,2,3,4
// TEARDOWN upload of 1,2,3,4
// START upload 1,2 START upload 1,2,3 FINISHED upload of 1,2,3,4 START upload 1,2,3,4,5
// START upload 1,2 START upload 1,2,3 FINISHED upload of 1,2,3,4 FINISHED upload of 1,2,3,4,5
// TEARDOWN upload of 1,2,3,4,5

Let’s break down why:

  • The upload for 1,2 is started but is cancelled because another upload (for 1,2,3) arrives and the inner Observable is switched. The Observable is cancelled (teardown).
  • 1,2,3 suffers the same fate and is cancelled because the upload request for 1,2,3,4 arrives. The Observable is cancelled (teardown).
  • Because the upload service is very fast for uploads of 4 IDs this upload goes through before the next one arrives. Hence, we’re seeing the “FINISHED upload of 1,2,3,4”. Here the Observable completes.
  • The next upload for 1,2,3,4,5 is going through as well because it’s the last one. Therefore the second last message shows “START upload 1,2 START upload 1,2,3 FINISHED upload of 1,2,3,4 FINISHED upload of 1,2,3,4,5”. To summarise, 4 uploads were started, 2 uploads were cancelled while 2 others were finished.
  • Finally the teardown happens because the Observable completes (again)

Compared to switchScan, the mergeScan operator in above case would finish all 4 upload attempts.

Exercise for the RxJS scan operator 💪

Find other good examples for the usage of switchScan.

As always the code examples can be found on GitHub.