RxJS Mastery – How to build streams

I have often seen people struggling when using RxJS. They are frustrated because they try too much at once, do not have proper tests, or have difficulties understanding error messages. How can you do RxJS development effectively and efficiently? I give my view on that in this article. Done right, RxJS development is easy because the library offers a lot of solutions for the usually difficult asynchronous programming.

How to develop RxJS code

Mental modal for RxJS

When working with RxJS it is important to have the right ideas about it. Seeing your code as a collection of streams of values can help. On that value stream, you can apply simple operators. Each stream you are dealing with can start, error out, or complete.

Controlled step-by-step approach with verification

Streams can be complex. That means they can have branches, multiple values arriving at different times, or nested structures. And that is why one has to build them step-by-step and include some verification into the process:

  • You can effectively build streams by using TDD (Test-Driven Development)
  • Keep Observables as flat as possible (avoid nesting where possible)
  • Use Typescript properly to achieve additional safety and add more clarity inside streams
  • Understand compiler and error messages when something goes wrong

Use reactive code only when necessary

You do not need to cover everything in RxJS. If something is not asynchronous, RxJS is the wrong choice. Keep your streams small and understandable and implement synchronous parts of your application with the right pattern, i.e. in synchronous code. Asynchronous code is hard enough on its own, despite RxJS.

Example case

Let us illustrate the above considerations in an example. In our demo case, we would like to load a Post entity based on a user action. To load that we also need to pass the user ID which is also offered as Observable. To spice things up, we also would like to load the post’s comments after the post is loaded.

Setup testing properly

The start is a simple test with an implementation you could choose to load the post. I have seen code similar to the following one many times. And I have seen developers struggling with it, being frustrated, and in the end, they blame RxJS.

it('does not work', () => {
    const action$: Observable<LoadPostsAction> = of({ postId: 1 });
    const postService = new PostService();

    action$.pipe(
        map(action => action.postId),
        concatMap((postId) => postService.loadPost(postId)),
    ).subscribe({
        next: result => {
            expect(result).toEqual({id: 1, content: 'content of post 2'});
        },
    });
});

If we run that test, all looks good:

All looks fine in the test.

The above code looks perfectly fine at first sight. But it is not! The test is green although the assertion is wrong (“post 2” although we would get “post 1” back in the result). Time for our first tip:

Tip #1: Use TDD when developing RxJS streams and let the test first fail.

This means that in our case we first need a proper test setup. When testing asynchronous code in Jest (and alternatives), we need a callback. This is usually defined as “done” and we are then invoking the done() callback after asserting the outcome.

it('does not work', (done) => {
    const action$: Observable<LoadPostsAction> = of({ postId: 1 });
    const postService = new PostService();

    action$.pipe(
        map(action => action.postId),
        concatMap((postId) => postService.loadPost(postId)),
    ).subscribe({
        next: result => {
            expect(result).toEqual({id: 1, content: 'content of post 2'});
            done();
        },
    });
});

We have a failing test and are good to go and start extending our stream:

Of course, there are other testing approaches, e.g. marbles. But I don’t want to open that topic here. You can read my blog post about different RxJS testing approaches if you are interested. Anyway, we need to be sure that we are testing the right thing and that is why our test should fail first, independent of the testing approach!

Have the right mental model

Before we move on let us spend some time to see how you as a developer can think about RxJS streams. The mental model I propose asks us to think about the values and how they stream through our program. We can illustrate this as follows:

Values are green, source Observables blue (they are not the only Observables), and operators are indicated as rounded rectangles. Arrows show the flow of values.

Tip #2: Remember that Observables are just values over time and those values flow through your program.

Now let us extend the functionality safely.

Keep Observables as flat as possible

Let us now tackle the next step of our functionality and load posts by user. For that, we need to take the current user ID into account. RxJS would allow nesting Observables. What you do then is branching of streams. But besides that, you also make code harder to read and harder to change. So it is not advisable to implement the new functionality like this (switchMap is nested inside concatMap):

action$.pipe(
    map(action => action.postId),
    concatMap((postId) => {
        return user.getCurrentUserId$().pipe(
            switchMap((userId) => postService.loadPost(postId, userId))
        );
    })
).subscribe({
    next: result => {
        expect(result).toEqual({id: 1, content: 'content of post 1'});
        done();
    },
});

The user.getCurrentUserId$() Observable is used inside concatMap. This also means that we need to flatten the Observable of loadPost (more towards that in the next sub-section). A flat and clean structure is easier to handle:

action$.pipe(
    map(action => action.postId),
    withLatestFrom(user.getCurrentUserId$()),
    concatMap((postId) => postService.loadPost(postId, null)),
).subscribe({
    next: result => {
        expect(result).toEqual({id: 1, content: 'content of post 1'});
        done();
    },
});

Whenever the action$ delivers a new value we want to combine it with the latest value of the current user’s ID. You see that we kept the null for userId in the loadPost method. This is because we want to first see what the compiler tells us (to showcase the importance of error messages).

Understand error messages

Error messages around RxJS can be hard to understand in the beginning. The last code example from above delivers the following on the postId parameter inside the loadPost method call:

TS2345: Argument of type '[number, number]' is not assignable to parameter of type 'number'.

The method expects a number but receives [number, number]. We have to understand pipe here. Pipe is accepting operators as UnaryFunctions and forwards the output of one operator to the subsequent one. This passed output is one value, object, or array. Calling withLatestFrom changes the values in the stream and transforms number to [number, number], representing the postId and userId in an array. Long story short, to fix the above TS2345 you have to adapt the code to consider the new input to the operator:

action$.pipe(
    map(action => action.postId),
    withLatestFrom(user.getCurrentUserId$()),
    concatMap(([postId, userId]) => postService.loadPost(postId, userId)),
).subscribe({
    next: result => {
        expect(result).toEqual({id: 1, content: 'content of post 1'});
        done();
    },
});

And please don’t use (postId, userId). Something like this is not a UnaryFunction and is not going to work! This error message was not directly provided by RxJS but often occurs during RxJS development.

Another error message one often encounters is about higher-order Observables. Let us go back to the nested example from above. It is easy to miss the higher-order mapping operator, i.e. switchMap, and just use map in place of it:

action$.pipe(
    map(action => action.postId),
    concatMap((postId) => {
        return user.getCurrentUserId$().pipe(
            map((userId) => postService.loadPost(postId, userId))
        );
    })
).subscribe({
    next: result => {
        expect(result).toEqual({id: 1, content: 'content of post 1'});
        done();
    },
});

All looks good at first sight, but the test signals us:

We are expecting an object with fields like content and id. But what we are getting is not such an Object, but an Observable. Whenever you see this, you should check if you need to flatten another Observable. In our case, switchMap flattens an Observable.

Tip #3: Try to understand each error message. They look cryptic and chaotic at times but are really not that hard to understand once you read them carefully. Go through them from top to bottom and focus on the bottom as the root cause is usually hidden there. During development with RxJS errors can happen and you should not guess, but read, and understand.

Update to our value stream model:

Use types

As soon as values become more complex it can help to use types explicitly. True, the Typescript compiler can still help us and infer some types. Nevertheless specifying them explicitly can make it easier for us developers to understand. Extending by types will change our code to:

action$.pipe(
    map((action: LoadPostAction) => action.postId),
    withLatestFrom(user.getCurrentUserId$()),
    concatMap(([postId, userId]: [number, number]) => postService.loadPost(postId, userId)),
).subscribe({
    next: (result: Post) => {
        expect(result).toEqual({id: 1, content: 'content of post 1'});
        done();
    },
});

This makes extensions safer. For example, if we map the response of the post service to the content only (return string instead of Post), we would immediately notice because we also typed the result in the subscribe’s next handler.
The first one here looks okay to the compiler. Although we are passing string (the type of content) instead of Post.

action$.pipe(
    map((action: LoadPostAction) => action.postId),
    withLatestFrom(user.getCurrentUserId$()),
    concatMap(([postId, userId]: [number, number]) => postService.loadPost(postId, userId)),
    map((response: Post) => response.content),
).subscribe({
    next: (result) => {
        expect(result).toEqual({id: 1, content: 'content of post 1'});
        done();
    },
});

As soon as the next handler does have a type for its parameter, the compiler complains:

If you used RxJS before you probably have seen such error messages already. Important here is to understand that the error message is also considering the nesting. That is why we should always go to the innermost part of the message because that is usually the part the easiest to understand. In this case, “Type ‘string’ is not assignable to type ‘Post'”. You see that TypeScript informs us quite well about the error. We expect a Post, but we are getting something of type string. The DX around error messages could be improved though.

Tip #4: some extra types can help to make your code more robust and understand errors earlier. The compiler is a cheap and fast way to “test”, please use it!

Of course, there are cases where explicit types make no sense and we should really on the type inference instead.

Conclusion

I hope those hints give you a better understanding and you can optimize your approach to RxJS development. Having the right mental model about your stream helps to understand what the operators do and what is happening to the values flowing through your program over time.