RxJS Mastery – #2 Observables

RxJS Observables provide us with a stream of immutable values. We can subscribe to an Observable to observe these values, and we should unsubscribe as soon as we no longer need them. I like the following definition:

Definition
An Observable is a representation of any set of values over any amount of time.

rxjs.dev

Observables explained 🎓

Let’s take the example from the Intro where we were listening on mouse clicks:

import { fromEvent, Observable } from 'rxjs';

const mouseEventObservable: Observable<MouseEvent> =
    fromEvent<MouseEvent>(document, 'click');

The fromEvent function creates an Observable. Because Observable<T> is a generic type, we can also specify the type of values that the stream delivers, provided you are working with TypeScript, of course. In our case the stream contains values of type MouseEvent.

So, what exactly is an Observable? Conceptually, an Observable is just a function: it takes an Observer and pushes values to it. The following Observable emits the values 1, 2, 3 and then completes.

const source = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

One can then subscribe to this source and observe the values:

source.subscribe({
  next: console.log,
  complete: () => console.log('done'),
});

Here we are subscribing to the source with an Observer that handles its notifications. First, we console.log each value we receive when the Observable emits a next notification. Second, we log ‘done’ as soon as the Observable completes. Besides next and complete, there is a third notification, error, which is delivered when a problem occurs in the Observable.
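To make the "just a function" idea concrete, here is a minimal sketch that does not use RxJS at all. The names SimpleObserver, SimpleObservable and simpleSource are made up for this illustration, and the sketch deliberately leaves out everything a real library adds on top (teardown, error handling, operators).

```typescript
// Hand-rolled sketch, NOT the RxJS implementation. All names are hypothetical.
interface SimpleObserver<T> {
  next: (value: T) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
}

// In this sketch an "observable" is nothing more than a function that
// accepts an observer and pushes notifications into it.
type SimpleObservable<T> = (observer: SimpleObserver<T>) => void;

const simpleSource: SimpleObservable<number> = (observer) => {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete?.();
};

// "Subscribing" is simply calling the function with an observer.
const received: string[] = [];
simpleSource({
  next: (value) => received.push(`next:${value}`),
  complete: () => received.push('complete'),
});

console.log(received); // ['next:1', 'next:2', 'next:3', 'complete']
```

Nothing in this sketch stops simpleSource from calling next after complete, which is exactly the kind of guarantee a library like RxJS adds.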

Why RxJS?

So, why are we using RxJS and not just plain functions? This is because libraries like RxJS give us some guarantees around Observables. This is basically the contract when using Observables and contains the following points:

  • After an Observable completes, it emits no more values
  • After an error, the stream is terminated and emits no more values
  • The Observer that subscribes to an Observable is able to cancel the subscription and therefore also the async task behind the Observable. This is the teardown.
  • The Observable can emit zero to infinitely many next notifications, but error or complete is delivered at most once, and only one of the two. This can be expressed as a regex: next*(error|complete)?

Consequently, if we are using RxJS we do not have to enforce these rules ourselves.
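The contract above can be observed with a deliberately misbehaving Observable. This is a small sketch; misbehaving$ and log are names chosen for the illustration only.

```typescript
import { Observable } from 'rxjs';

// This producer tries to break the contract on purpose.
const misbehaving$ = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.complete();
  subscriber.next(2); // ignored: the stream has already completed
  subscriber.error(new Error('too late')); // ignored as well
});

const log: string[] = [];
misbehaving$.subscribe({
  next: (value) => log.push(`next:${value}`),
  error: () => log.push('error'),
  complete: () => log.push('complete'),
});

console.log(log); // ['next:1', 'complete']
```

The Observer only sees next*(error|complete)?, no matter what the producer attempts afterwards.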

Lastly, the Observable starts its work only when we subscribe to it and not when we create the Observable. Most probably you have run into such situations when using RxJS. For example you could have been wondering why an HTTP request behind an Observable was not executed. Hence, if you want the Observable to do its async task, subscribe to it!
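A short sketch of this laziness, using a counter instead of a real HTTP request (lazy$ and started are illustrative names):

```typescript
import { Observable } from 'rxjs';

let started = 0;

// Creating the Observable does not run the function passed to the constructor.
const lazy$ = new Observable<string>((subscriber) => {
  started++;
  subscriber.next('work done');
  subscriber.complete();
});

const startedBeforeSubscribe = started; // still 0, nothing has run yet

// Every subscription runs the function again.
lazy$.subscribe();
lazy$.subscribe();

console.log(startedBeforeSubscribe, started); // 0 2
```

The second number also shows that each subscription triggers the work again, which matters for side effects such as HTTP requests.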

Hot vs. cold Observable

If you are dealing with Observables you will sooner or later hear about hot or cold Observables. And often developers are confused about this distinction. We have a cold case when the Observable itself creates the producer, i.e. the source of values:

const cold = new Observable((observer) => {
  const producer = new Producer();
  // observer listens on producer
});

On the other hand we have a hot Observable if the producer is created outside the Observable.

const producer = new Producer();
const hot = new Observable((observer) => {
  // observer listens on producer
});

In the end Observables are just functions that connect an Observer to a producer.

A cold Observable creates the producer, activates it and starts listening. So, we have a unicast, i.e. one producer delivering values for one Observer. If a second Observer subscribes to the cold Observable it gets its own producer.

A hot Observable shares a reference to a producer. Upon subscription it starts listening for new values on the shared producer. So, usually this is a multicast case where multiple Observers can use the same producer.

Therefore we note: a hot Observable should be used when we don’t want to create a producer multiple times. For example we might be interested in not opening a websocket for each Observer.
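The difference becomes visible once we count producers. The following sketch assumes a tiny, hypothetical Producer class (standing in for something like a WebSocket) that tracks how many instances were created; it is not part of RxJS.

```typescript
import { Observable } from 'rxjs';

// Hypothetical producer for illustration: counts its instances and
// pushes values to registered listeners.
class Producer {
  static created = 0;
  private listeners = new Set<(value: number) => void>();

  constructor() {
    Producer.created++;
  }

  // Registers a listener and returns a function that removes it again.
  listen(listener: (value: number) => void): () => void {
    this.listeners.add(listener);
    return () => this.listeners.delete(listener);
  }

  produce(value: number): void {
    this.listeners.forEach((listener) => listener(value));
  }
}

// Cold: the producer is created inside, once per subscription (unicast).
const cold$ = new Observable<number>((subscriber) => {
  const producer = new Producer();
  const stop = producer.listen((value) => subscriber.next(value));
  producer.produce(1);
  return stop;
});

const coldValues: number[] = [];
cold$.subscribe((value) => coldValues.push(value));
cold$.subscribe((value) => coldValues.push(value));
const producersAfterCold = Producer.created; // 2: one producer per Observer

// Hot: the producer lives outside and is shared (multicast).
const sharedProducer = new Producer();
const hot$ = new Observable<number>((subscriber) =>
  sharedProducer.listen((value) => subscriber.next(value)),
);

const hotValues: string[] = [];
hot$.subscribe((value) => hotValues.push(`A:${value}`));
hot$.subscribe((value) => hotValues.push(`B:${value}`));
sharedProducer.produce(42);
const producersAfterHot = Producer.created; // 3: only the shared one was added

console.log(coldValues, hotValues); // [1, 1] ['A:42', 'B:42']
```

Two cold subscriptions cost two producers, while the two hot subscriptions share a single one and both receive the same value.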

What problems do Observables solve? 🚧

Primarily, Observables wrap async data sources (but Observables do not necessarily have to be asynchronous!). The first example in this article, i.e. fromEvent, is a wrapper around click events. If you are using Angular, HTTP responses are handled as Observables. Furthermore, we can read the filesystem by using Observables or work with WebSockets.

Let’s create a custom Observable for a random number generator:

export const randomNumberGenerator$ = new Observable<number>((subscriber) => {
    const interval = setInterval(() => {
        const randomNumber = Math.floor(Math.random() * 100);
        subscriber.next(randomNumber);
    }, 1000);
});

Our Observable emits numbers, so we type it as Observable<number>. We pass the constructor a function that is executed for every subscriber. In our case the Observable produces a random number every second by using JavaScript's setInterval. The subscriber is informed about each random number as soon as it is generated, because its next() method is invoked with randomNumber.

But the random number generator above is never going to stop. This is where the teardown functionality of Observables comes into play.

export const randomNumberGenerator$ = new Observable<number>((subscriber) => {
    const interval = setInterval(() => {
        const randomNumber = Math.floor(Math.random() * 100);
        subscriber.next(randomNumber);
    }, 1000);

    return () => {
        clearInterval(interval);
    }
});

At the end of the function we are returning another function. In that function we are cleaning up the “async” resource (in this case the interval is cleared).

subscriber.complete
Why do we not call subscriber.complete() in this teardown function? Because if an Observer unsubscribes from our Observable, the Observer already knows that no more values are coming. subscriber.complete() is reserved for regular completion of an Observable (see the Observable at the beginning of the article that emits 1, 2, 3 and completes).
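We can check this behaviour with a small sketch: unsubscribing runs the teardown function but does not notify the Observer of completion. ticking$ and events are names used only for this illustration.

```typescript
import { Observable } from 'rxjs';

const events: string[] = [];

const ticking$ = new Observable<number>((subscriber) => {
  const interval = setInterval(() => subscriber.next(Math.random()), 1000);

  // Teardown: runs when the Observer unsubscribes.
  return () => {
    clearInterval(interval);
    events.push('teardown');
  };
});

const subscription = ticking$.subscribe({
  next: () => events.push('next'),
  complete: () => events.push('complete'),
});

// Unsubscribe right away, before the first tick.
subscription.unsubscribe();

console.log(events); // ['teardown']
console.log(subscription.closed); // true
```

The complete callback never fires here, because the stream did not finish on its own; the Observer simply stopped listening.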

This is our custom observable. Let’s see how we can test it.

How to test Observables 🚦

Normally you would test RxJS Observables with rxjs-marbles. But here it is a bit difficult because of the random output. You will see rxjs-marbles in action in later lessons. This time our test looks like this:

describe('Random Number Generator', () => {

    it('should output random numbers until unsubscribed', (done) => {
        const randomNumbers: number[] = [];
        const randomNumberSubscription = randomNumberGenerator$.subscribe({
            next(x) { randomNumbers.push(x); },
        });

        setTimeout(() => {
            randomNumberSubscription.unsubscribe();
            console.log(randomNumbers);
            expect(randomNumbers.length).toEqual(10);
            done();
        }, 10_500);
    }, 15_000); // per-test timeout: the 5 s default of Jest and Jasmine is shorter than the 10.5 s wait
});

We are just checking that the Observable emitted 10 numbers within 10.5 seconds. Sources can be found on GitHub.

Exercises for Observables 💪

Here comes your task. Create a function that returns an Observable for an HTTP GET request:

// interface
httpGet<T>(url: string): Observable<T>

// usage
const response$ = httpGet<ResponseType>('someUrl');

Please find a possible minimal solution on my GitHub repository.
And for those that want to do more: Create an Observable that emits the online status if it changes:

Observable<'online' | 'offline'> 
// using the DOM events 'online', 'offline'
// only emitting on changes

The solution can be found here. Of course using fromEvent is also a good approach.

In the next article we are covering Subjects, a special kind of Observable.