RxJS Mastery – #68 timeInterval, timestamp, timeout

RxJS offers several time-related operators in the utility category. In this article, we look at timeInterval, timestamp, timeout, and the deprecated timeoutWith.

RxJS timeInterval measures the time between two values

The timeInterval operator emits an object containing not only the value but also the time that has passed since the previous value was emitted (for the first value, since subscription). The easiest example is probably the one based on interval.

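import { interval, take, timeInterval } from 'rxjs'; // RxJS 7.2+

// done is assumed to be the completion callback of the surrounding test
interval(1000)
    .pipe(take(4), timeInterval())
    .subscribe({ next: x => console.log(x), complete: done });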

// TimeInterval { value: 0, interval: 1004 }
// TimeInterval { value: 1, interval: 1047 }
// TimeInterval { value: 2, interval: 1010 }
// TimeInterval { value: 3, interval: 1006 }

Be aware that real timers are not exact, so the measured interval is not always going to be exactly 1000 ms. With RxJS marble tests, we can show the behavior deterministically. You might notice that the complete notification is not measured; no interval is emitted for it:

const timeMeasured$ = cold('100ms a 200ms b 300ms c 400ms |').pipe(
    timeInterval(),
    map((v: TimeInterval<string>) => v.interval),
);

expectObservable(timeMeasured$).toBe(
    '100ms a 200ms b 300ms c 400ms |',
    {
        a: 100,
        b: 201,
        c: 301,
    }
);

Also, in marble syntax each emission itself takes one frame (1 ms). Therefore the measured time between values is 201 ms and 301 ms rather than 200 ms and 300 ms.
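The frame arithmetic behind these numbers can be reproduced without RxJS. The following is a minimal sketch in plain TypeScript, not the TestScheduler's actual parser: the helper names marbleToFrames and gapsBetween are made up for illustration, and only the "Nms" time progression, single-character values, and "|" are handled.

```typescript
// Hypothetical helpers, not part of RxJS: a tiny subset of marble syntax
// ("Nms" time progression, single-character values, "|" for completion).
interface MarbleEvent {
    symbol: string; // value character, or '|' for completion
    frame: number;  // virtual time in ms at which it happens
}

function marbleToFrames(marbles: string): MarbleEvent[] {
    const events: MarbleEvent[] = [];
    let frame = 0;
    for (const token of marbles.split(' ')) {
        if (token === '') {
            continue; // ignore repeated spaces
        }
        const progression = /^(\d+)ms$/.exec(token);
        if (progression !== null) {
            frame += Number(progression[1]); // "200ms" advances time by 200 frames
        } else {
            events.push({ symbol: token, frame });
            frame += 1; // every value or notification occupies one frame
        }
    }
    return events;
}

// Time since the previous value (or since subscription at frame 0).
// The completion notification is skipped, as in the test above.
function gapsBetween(events: MarbleEvent[]): number[] {
    const gaps: number[] = [];
    let previous = 0;
    for (const ev of events) {
        if (ev.symbol === '|') {
            continue;
        }
        gaps.push(ev.frame - previous);
        previous = ev.frame;
    }
    return gaps;
}

const sourceEvents = marbleToFrames('100ms a 200ms b 300ms c 400ms |');
console.log(sourceEvents.map(ev => ev.symbol + '@' + ev.frame).join(' ')); // a@100 b@301 c@602 |@1003
console.log(gapsBetween(sourceEvents).join(', ')); // 100, 201, 301
```

The extra frame per emission is what turns the 200 ms and 300 ms pauses of the marble string into measured intervals of 201 ms and 301 ms.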

RxJS timestamp attaches the datetime to an emission

The timestamp operator simply reads the current time, by default with Date.now(), and attaches it to the emission. The emitted value therefore looks like this: { value: T, timestamp: number }.

interval(1000)
    .pipe(take(4), timestamp())
    .subscribe({ next: x => console.log(x), complete: done});

// { value: 1, timestamp: 1702807174323 }
// { value: 2, timestamp: 1702807175327 }
// { value: 3, timestamp: 1702807176335 }

In the output you see the value alongside the Unix timestamp in milliseconds (the listing leaves out the first emission, which carries the value 0). Again, we can have a look at the situation when using the TestScheduler and marbles:

createTestScheduler().run((helpers) => {
    const { expectObservable, cold } = helpers;

    const timeStamped$ = cold('100ms a 200ms b 300ms c 400ms |').pipe(
        timestamp(),
    );

    expectObservable(timeStamped$).toBe(
        '100ms a 200ms b 300ms c 400ms |',
        {
            a: { value: 'a', timestamp: 100 },
            b: { value: 'b', timestamp: 301 },
            c: { value: 'c', timestamp: 602 },
        }
    );
});

Thanks to the TestScheduler, time is virtual here and not based on the real clock behind Date.now(). Time starts at 0 and adds up nicely.
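The idea of swapping the clock can be sketched without RxJS. The snippet below is an illustration only: Clock, VirtualClock, and stampValue are hypothetical names, and the code mimics the shape of a timestamped emission rather than reproducing the operator's implementation.

```typescript
// Hypothetical types and helpers, not part of RxJS.
interface Clock {
    now(): number;
}

interface Stamped<T> {
    value: T;
    timestamp: number;
}

// Real clock: milliseconds since the Unix epoch.
const systemClock: Clock = { now: () => Date.now() };

// Virtual clock: starts at 0 and only moves when told to.
class VirtualClock implements Clock {
    private current = 0;

    now(): number {
        return this.current;
    }

    advance(ms: number): void {
        this.current += ms;
    }
}

// Attach the clock's current time to a value.
function stampValue<T>(value: T, clock: Clock = systemClock): Stamped<T> {
    return { value, timestamp: clock.now() };
}

// Replay '100ms a 200ms b 300ms c' by hand; each emission takes one frame.
const virtualClock = new VirtualClock();
virtualClock.advance(100);
const stampedA = stampValue('a', virtualClock);
virtualClock.advance(1 + 200);
const stampedB = stampValue('b', virtualClock);
virtualClock.advance(1 + 300);
const stampedC = stampValue('c', virtualClock);

console.log(stampedA.timestamp, stampedB.timestamp, stampedC.timestamp); // 100 301 602
```

With the virtual clock the timestamps are fully reproducible, which is why the marble test can assert exact numbers instead of values like 1702807174323.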

RxJS timeout errors if values do not arrive in time

The timeout operator lets us throw an error if too much time passes before a value is emitted. In the example below, we specify a timeout of 250 ms. The value c would only arrive 300 ms after the value b, so it has no chance of being emitted. Instead, a TimeoutError is thrown.

const timeout$ = cold('100ms a 200ms b 300ms c 400ms |').pipe(
    timeout(250),
);

expectObservable(timeout$).toBe(
    '100ms a 200ms b 249ms #',
    {
        a: 'a',
        b: 'b',
    },
    new TimeoutError(),
);
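Why the expected marbles say 249ms and not 250ms can be checked with a little arithmetic. The sketch below is plain TypeScript with a made-up helper, firstTimeoutFrame, and it rests on simplifying assumptions: the timer restarts on every value, the first value is subject to the same limit, and only a gap strictly longer than the limit counts as a timeout. Ties and a late completion are not modelled.

```typescript
// Hypothetical helper, not part of RxJS. Given the frames at which values
// arrive, return the frame at which a per-value timeout of `each` ms would
// fire, or null if every value arrives in time.
function firstTimeoutFrame(valueFrames: number[], each: number): number | null {
    let timerStart = 0; // subscription happens at frame 0
    for (const frame of valueFrames) {
        if (frame - timerStart > each) {
            return timerStart + each; // timer expires before this value arrives
        }
        timerStart = frame; // value arrived in time, restart the timer
    }
    return null;
}

// a, b, c arrive at frames 100, 301, 602 in the marble source.
const timeoutAt = firstTimeoutFrame([100, 301, 602], 250);
console.log(timeoutAt); // 551

// '100ms a 200ms b 249ms #': a and b occupy one frame each.
const errorFrameInMarbles = 100 + 1 + 200 + 1 + 249;
console.log(errorFrameInMarbles); // 551
```

The timer is restarted when b is emitted at frame 301, so it expires at frame 551. Because b itself occupies one frame in the diagram, only 249 ms remain to be written before the error.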

The timeoutWith operator is deprecated; instead, the timeout operator itself accepts a with property in its TimeoutConfig. It lets us specify what should happen when the timeout occurs. For example, we can throw a custom error instead of the TimeoutError.

class CustomTimeoutError extends Error {
    constructor() {
        super('It was too slow');
        this.name = 'CustomTimeoutError';
    }
}

createTestScheduler().run((helpers) => {
    const { expectObservable, cold } = helpers;

    const timeout$ = cold('100ms a 200ms b 300ms c 400ms |').pipe(
        timeout({
            each: 250,
            with: () => throwError(() => new CustomTimeoutError())
        }),
    );

    expectObservable(timeout$).toBe(
        '100ms a 200ms b 249ms #',
        {
            a: 'a',
            b: 'b',
        },
        new CustomTimeoutError(),
    );
});

Note that we have to pass a TimeoutConfig object in this case, and the timeout value is defined in its each property.

Exercise for the RxJS timeout operator 💪

The with property of the timeout operator not only lets us specify a custom error, it also enables us to switch to a different Observable entirely. That way we can switch to a faster Observable on timeout. This is exactly the task for you: when the timeout occurs, replace the source with an Observable that fulfills the marble diagram below:

const timeout$ = cold('100ms a 200ms b 300ms c 400ms |').pipe(
    timeout({
        each: 250,
        with: // your code
    }),
);

expectObservable(timeout$).toBe(
    '100ms a 200ms b 249ms c 100ms |',
    {
        a: 'a',
        b: 'b',
        c: 'c'
    },
);

This post is part of the RxJS mastery series. As always the code examples can be found on GitHub.