A RxJS Scheduler defines the execution context for Observables, i.e. configures when a subscription starts and when notifications are delivered. Most of you will not be concerned with Schedulers when using RxJS. Normally the default parameter is fine. It still makes sense to understand the concept though.
RxJS Scheduler explained 🎓
Schedulers consist of three components:
- 📜 Data structure that stores and queues the values based on priority or other criteria
- 📨 Execution context controls where and when a task is executed, e.g. immediately, in setTimeout, process tick or animation frame
- 🕐 Clock a virtual notion of “time” that the tasks can be scheduled on
There are the following schedulers available in RxJS:
- null – notifications are passed synchronously and recursively
- queueScheduler – schedules on a queue in the current event frame
- asapScheduler – schedules on the micro task queue (the same queue used for promises)
- asyncScheduler – uses setInterval
- animationFrameScheduler – considers the browser content repaint
Let’s have a look at asyncScheduler. We can pipe an Observable through a Scheduler by using the observeOn function. This ensures that the notifications are scheduled by the asyncScheduler:
import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';
const observable = new Observable((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}).pipe(
observeOn(asyncScheduler)
);
console.log('before subscribe');
observable.subscribe({
next(x) {
console.log('got value ' + x)
},
error(err) {
console.error('error: ' + err);
},
complete() {
console.log('done');
}
});
console.log('after subscribe');
Because asyncScheduler internally uses JavaScripts setTimeout and setInterval functions the values are arriving later than the last console.log statement:
before subscribe
after subscribe
got value 1
got value 2
got value 3
done
If you are using the queueScheduler in above example then you are receiving the output in the following order:
before subscribe
got value 1
got value 2
got value 3
done
after subscribe
What problems do RxJS Schedulers solve? 🚧
Schedulers ensure that notifications are delivered in the intended order and point in time.
Smooth animations in the browser
When style values of browser elements are controlled through observables it makes sense to pipe through the animationFrameScheduler.
import { interval, animationFrameScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';
const progressBarDiv = new HTMLDivElement();
interval(10)
.pipe(observeOn(animationFrameScheduler))
.subscribe(newHeight => {
progressBarDiv.style.height = newHeight + 'px';
});
The result is a smooth transformation of the div’s height:
Expression changed after it has been checked (Angular)
If you ever worked with Angular you probably have seen the ExpressionChangedAfterItHasBeenCheckedError problem when using Observables. Scheduling the involved Observable’s notification with the asyncScheduler is solving the error. The expression is evaluated at the right point in time.
Operator behaviour
A lot of operator have default schedulers set, e.g.:
export function debounceTime<T>(dueTime: number, scheduler: SchedulerLike = asyncScheduler)
// from the debounceTime implementation
const targetTime = lastTime! + dueTime;
const now = scheduler.now();
if (now < targetTime) {
// ...
It is to ensure that the values are delivered in the right way for the specific operator. In an operator’s implementation the scheduler ensures a sense of time, e.g. scheduler.now().
How to test Schedulers 🚦
Well, we are not really interested in testing the schedulers themselves, but their influence on the notification’s delivery. Of course marbles testing gives us a way to test for time and order of values. Especially interesting is the testing of the animationFrameScheduler. Marbles testing offers us a method animate that accepts a marbles string. This string (painting in the example below) defines when the browser repainting happens. In our example the pairs a, b and c, d are delivered together because of the repainting timing:
it('should test the delivery', () => {
testScheduler.run((helpers) => {
const { hot, expectObservable, animate } = helpers;
let myService = {
data: () => hot('-a-b-c-d', { a: 'a', b: 'b', c: 'c', d: 'd'}),
};
const sub = '^-------!';
const output = ' ----(ab)(cd)';
const painting = ' ----x---x';
const observable$ = myService.data();
animate(painting);
expectObservable(observable$.pipe(
observeOn(animationFrameScheduler),
), sub).toBe(output);
});
});
Exercise for Schedulers 💪
Which scheduler can I use to deliver the ‘1’ first? So, which scheduler should be used for ‘otherScheduler’?
asyncScheduler.schedule(() => console.log('2'));
otherScheduler.schedule(() => console.log('1'));
// goal:
// "1"
// "2"
Find the solution on GitHub.