The RxJS window operators come in five different variants: window, windowCount, windowTime, windowToggle and windowWhen. The operators offer buffering similar to RxJS buffer but they emit a nested Observable instead of an array. Buffer also comes with the same five variants as window.
RxJS window
Window accepts a single parameter which is another Observable. This Observable’s emission determine when a new nested Observable is branched out of the source Observable.
window<T>(windowBoundaries: Observable<any>): OperatorFunction<T, Observable<T>>
Let’s see in an easy example how this works. We take an interval as source Observable and branch out a nested Observable based on another interval Observable. Because the output value of window is an Observable we need to subscribe to when receiving the value:
interval(100).pipe(
take(5),
window(interval(310)),
).subscribe({
next: nestedObservable => nestedObservable.subscribe(console)
});
// 0
// 1
// 2
// 3
// 4
You don’t see anything special, do you? Well, the nested Observables are subscribed to and those values are logged to the console. If we add a bit more logging we see what happens:
interval(100).pipe(
take(5),
window(interval(310)),
).subscribe({
next: nestedObservable => nestedObservable.subscribe({
next: console.log,
complete: () => console.log('inner complete'),
}),
});
// 0
// 1
// inner complete
// 2
// 3
// 4
// inner complete
Two nested (inner) Observables are created and subscribed to. Whenever a nested Observables completes we log a message. Hence, we see that the first branched out Observables emits the values 0 and 1 before it completes. The second one delivers the values 2, 3 and 4 and also completes.
Attention: Normally you would not subscribe to the nested Observable in the way shown above. It’s important to keep the reactivity and avoid nested subscriptions. For example the mergeAll operator could flatten the Observables in such cases.
And interestingly the RxJS window operator doesn’t actually return an Observable, but a Subject. This is due to performance reasons, see here for details: https://github.com/ReactiveX/rxjs/pull/2408.
RxJS window vs. RxJS buffer
The main difference between the two is that window notifies immediately when a new buffer is created. Because in that case a new Observable is emitted. Buffer collects the values internally and emits them as array once the conditions for emission of that buffer are fulfilled.
It also allows you to stay reactive and use further RxJS operators on the output compared to the array emitted by buffer.
Other RxJS window variants
Here you won’t find any details about the rest of the operators. They work the same way as buffer. Important as always with window is the emission of a nested Observable instead of an array. Otherwise the semantics and concepts are the same as for buffer.
Exercise for the RxJS window operator 💪
Implement buffer based on window. Of course you can simplify a few things around it and start with:
const myBuffer = function<T>(closingNotifier: Observable<any>): OperatorFunction<T, unknown> {
return pipe(
// your code based on window
)
};
As always the code examples can be found on GitHub.