RxJS Mastery – #27 buffer

By default, RxJS neither buffers nor throttles values. Five operators add buffering in different ways: buffer, bufferCount, bufferTime, bufferToggle and bufferWhen. They “hold back” values from the source and later deliver them to the observer together, as a single array.

Why use RxJS buffer operators?

The buffer operators are helpful when processing values in batches is better than handling each value separately. If an observer executes an expensive operation for every notification, buffering on the Observable’s side keeps the observer from being overwhelmed. This makes sense, for example, when the observer re-renders the DOM tree or sends HTTP requests.

buffer

The buffer operator accepts a single parameter, which is another Observable. Each time that Observable emits, the values collected so far are emitted as an array and a new buffer is started:

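// cold() and hot() are assumed to be the marble helpers of RxJS's TestScheduler.
// Each character is one time frame; '-' means nothing is emitted in that frame.
import { buffer } from 'rxjs/operators';

const events$ = cold('123456789');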
const notifier$ = cold('--a--a--a');
const buffered$ = events$.pipe(buffer(notifier$));

buffered$.subscribe(console.log);
// [1,2,3]
// [4,5,6]
// [7,8,9]

So the output is controlled entirely by the notifier$ Observable. If notifier$ emits while no values have been collected since the last emission, an empty array is emitted:

const events$ = cold('123---456');
const notifier$ = cold('--a--a--a');
const buffered$ = events$.pipe(buffer(notifier$));

buffered$.subscribe(console.log);
// [1,2,3]
// []
// [4,5,6]
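To make the mechanics of the two examples above explicit, here is a minimal sketch that models buffer on marble strings in plain TypeScript. It is not RxJS code: bufferModel is a hypothetical helper, and it assumes that a source value arriving in the same frame as a notifier emission is collected before the buffer is flushed, which matches the outputs shown above.

```typescript
// Plain-TypeScript model of buffer() over marble strings; not RxJS itself.
// Each character is one time frame; '-' means nothing happens in that frame.
function bufferModel(events: string, notifier: string): string[][] {
  const emitted: string[][] = [];
  let current: string[] = [];
  const frames = Math.max(events.length, notifier.length);

  for (let frame = 0; frame < frames; frame++) {
    // Assumption: the source value of a frame is collected before the notifier fires.
    const value = events.charAt(frame);
    if (value !== '' && value !== '-') {
      current.push(value);
    }

    const signal = notifier.charAt(frame);
    if (signal !== '' && signal !== '-') {
      emitted.push(current); // flushed even when nothing was collected
      current = [];
    }
  }
  return emitted;
}

console.log(JSON.stringify(bufferModel('123456789', '--a--a--a')));
// → [["1","2","3"],["4","5","6"],["7","8","9"]]
console.log(JSON.stringify(bufferModel('123---456', '--a--a--a')));
// → [["1","2","3"],[],["4","5","6"]]
```

The second call shows where the empty array comes from: the notifier fires in frame 5 although nothing has been collected since frame 2.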

bufferCount

The RxJS bufferCount operator is simple to understand in its basic variant. It waits until a given number of values has been collected and then emits them as the next notification:

const events$ = cold('123456789|');
const buffered$ = events$.pipe(bufferCount(2));

buffered$.subscribe(console.log);
// [1,2]
// [3,4]
// [5,6]
// [7,8]
// [9]

Be aware that the [9] above is only emitted because the events$ stream completed. Without completion, bufferCount would keep waiting for a second value and would never emit 9 in an array of its own.

bufferCount also accepts an optional second parameter:

bufferCount<T>(bufferSize: number, startBufferEvery: number = null)

If startBufferEvery is set to n, a new buffer is started every n values. bufferSize is not affected by that parameter, so buffers can overlap and a single value can be emitted in more than one array (bufferSize > startBufferEvery):

const events$ = cold('123456789|');
const buffered$ = events$.pipe(bufferCount(3, 2));

buffered$.subscribe(console.log);
// [1,2,3]
// [3,4,5]
// [5,6,7]
// [7,8,9]
// [9]

Or some values can be skipped (bufferSize < startBufferEvery):

const events$ = cold('123456789|');
const buffered$ = events$.pipe(bufferCount(2, 3));

buffered$.subscribe(console.log);
// [1,2]
// [4,5]
// [7,8]
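The interplay of bufferSize and startBufferEvery can be sketched in plain TypeScript over an array of values. This is only a model of the behaviour described above, not the library's implementation; bufferCountModel is a hypothetical name, and the end of the array stands in for the completion of the source.

```typescript
// Plain-TypeScript model of bufferCount(); not RxJS itself.
// The end of `values` plays the role of the source completing.
function bufferCountModel<T>(
  values: T[],
  bufferSize: number,
  startBufferEvery: number = bufferSize
): T[][] {
  const emitted: T[][] = [];
  let open: T[][] = [];

  values.forEach((value, index) => {
    // A new buffer is started every `startBufferEvery` values.
    if (index % startBufferEvery === 0) {
      open.push([]);
    }
    // Every open buffer receives the value, so buffers may overlap.
    for (const buffer of open) {
      buffer.push(value);
    }
    // Full buffers are emitted and closed.
    const stillOpen: T[][] = [];
    for (const buffer of open) {
      if (buffer.length === bufferSize) {
        emitted.push(buffer);
      } else {
        stillOpen.push(buffer);
      }
    }
    open = stillOpen;
  });

  // On completion, partially filled buffers are emitted as they are.
  emitted.push(...open);
  return emitted;
}

const source = [1, 2, 3, 4, 5, 6, 7, 8, 9];
console.log(JSON.stringify(bufferCountModel(source, 2)));
// → [[1,2],[3,4],[5,6],[7,8],[9]]
console.log(JSON.stringify(bufferCountModel(source, 3, 2)));
// → [[1,2,3],[3,4,5],[5,6,7],[7,8,9],[9]]
console.log(JSON.stringify(bufferCountModel(source, 2, 3)));
// → [[1,2],[4,5],[7,8]]
```

In the last call, the values 3, 6 and 9 arrive while no buffer is open, which is why they are skipped.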

bufferTime

The RxJS bufferTime operator buffers values for a given number of milliseconds. Imagine that the source emits one value every millisecond. If we pass 5 as bufferTimeSpan to bufferTime, each buffer that is emitted as a next notification contains up to 5 values:

const events$ = cold('123456789|');
const buffered$ = events$.pipe(bufferTime(5));

buffered$.subscribe(console.log);
// [1,2,3,4,5]
// [6,7,8,9]

In addition to bufferTimeSpan, the second parameter bufferCreationInterval defines how often a new buffer is opened, in milliseconds. A third parameter, maxBufferSize, limits the number of values collected in a buffer. A buffer that reaches this limit is emitted right away, and values that arrive while no buffer is open are dropped, which is why 3, 4, 5, 8 and 9 are missing from the output below:

const events$ = cold('123456789|');
const buffered$ = events$.pipe(bufferTime(5, 5, 2));

buffered$.subscribe(console.log);
// [1,2]
// [6,7]
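The following sketch models the two bufferTime examples on marble strings, with one frame standing for one millisecond. It is a simplified model rather than RxJS code: bufferTimeModel and OpenBuffer are hypothetical names, the model assumes that timers fire before a source value scheduled in the same frame, and it falls back to bufferTimeSpan when no creation interval is passed, which is a simplification of the real operator.

```typescript
// Plain-TypeScript model of bufferTime() over a marble string; not RxJS itself.
// One character is one millisecond; '-' is silence and '|' is completion.
interface OpenBuffer {
  openedAt: number;
  values: string[];
}

function bufferTimeModel(
  events: string,
  bufferTimeSpan: number,
  bufferCreationInterval: number = bufferTimeSpan, // simplification, see lead-in
  maxBufferSize: number = Infinity
): string[][] {
  const emitted: string[][] = [];
  let open: OpenBuffer[] = [];

  for (let frame = 0; frame < events.length; frame++) {
    // Assumption: timers run before the source value of the same frame.
    open.filter((b) => frame - b.openedAt >= bufferTimeSpan)
      .forEach((b) => emitted.push(b.values));
    open = open.filter((b) => frame - b.openedAt < bufferTimeSpan);
    if (frame % bufferCreationInterval === 0) {
      open.push({ openedAt: frame, values: [] });
    }

    const symbol = events.charAt(frame);
    if (symbol === '|') {
      // Completion flushes whatever is still open.
      open.forEach((b) => emitted.push(b.values));
      return emitted;
    }
    if (symbol !== '-') {
      open.forEach((b) => b.values.push(symbol));
      // Buffers that reached maxBufferSize are emitted immediately.
      open.filter((b) => b.values.length >= maxBufferSize)
        .forEach((b) => emitted.push(b.values));
      open = open.filter((b) => b.values.length < maxBufferSize);
    }
  }
  return emitted;
}

console.log(JSON.stringify(bufferTimeModel('123456789|', 5)));
// → [["1","2","3","4","5"],["6","7","8","9"]]
console.log(JSON.stringify(bufferTimeModel('123456789|', 5, 5, 2)));
// → [["1","2"],["6","7"]]
```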

bufferToggle

The RxJS bufferToggle operator is similar to buffer, but it lets you define both the start and the end of the buffering phase:

bufferToggle<T, O>(openings: ObservableInput<O>, closingSelector: (value: O) => ObservableInput<any>)

Each emission of the openings Observable opens a buffer. The Observable returned by closingSelector closes that buffer when it emits and thereby triggers the next notification.

In the example below, the openings Observable opens the buffer after 2 time frames, so the buffer starts with the value 3. closeBuffer$ is a hot Observable. Its timeline therefore starts at frame 0, regardless of when it is subscribed to, and it emits after 5 time frames. The emitted array contains the values 3, 4, 5 and 6.

const events$ = cold(    '123456789|');
const openBuffer$ = cold( '--o-----');
const closeBuffer$ = hot('-----c');
const buffered$ = events$.pipe(bufferToggle(openBuffer$, (o) => closeBuffer$));

buffered$.subscribe(console.log);
// [3,4,5,6]
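The open and close mechanics of this example can be modelled on marble strings as follows. The sketch only covers a hot closing Observable, whose frames are counted from the start of the timeline. bufferToggleModel is a hypothetical helper, and the ordering of events that share a frame is an assumption chosen to match the output above.

```typescript
// Plain-TypeScript model of bufferToggle() with a hot closing Observable; not RxJS itself.
// All three marble strings share one timeline that starts at frame 0.
function bufferToggleModel(
  events: string,
  openings: string,
  hotClosings: string
): string[][] {
  const emitted: string[][] = [];
  let open: string[][] = [];

  for (let frame = 0; frame < events.length; frame++) {
    const symbol = events.charAt(frame);
    if (symbol === '|') {
      // Assumption: buffers still open at completion are flushed.
      emitted.push(...open);
      return emitted;
    }

    // Assumption: an opening takes effect before the source value of the same frame.
    const opening = openings.charAt(frame);
    if (opening !== '' && opening !== '-') {
      open.push([]);
    }

    if (symbol !== '-') {
      for (const buffer of open) {
        buffer.push(symbol);
      }
    }

    // Assumption: a closing signal arrives after the source value of the same frame.
    const closing = hotClosings.charAt(frame);
    if (closing !== '' && closing !== '-') {
      emitted.push(...open); // the hot signal closes every buffer that is open
      open = [];
    }
  }
  return emitted;
}

console.log(JSON.stringify(bufferToggleModel('123456789|', '--o-----', '-----c')));
// → [["3","4","5","6"]]
```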

bufferWhen

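Finally, there is the RxJS bufferWhen operator. It accepts a factory function that returns a closing Observable. When that Observable emits, the current buffer is closed and emitted, a new buffer is started, and the factory is called again to obtain the closing Observable for the new buffer: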

const events$ = cold('123456|');
const closingSelector$ = cold('---a');
const buffered$ = events$.pipe(bufferWhen(() => closingSelector$));

buffered$.subscribe(console.log);
// [1,2,3]
// [4,5,6]
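As a rough model of this example, the sketch below treats the closing marble as a delay that is counted anew from the moment each buffer is opened, which is what calling the factory again for a cold Observable amounts to. bufferWhenModel is a hypothetical helper, not RxJS code. The order in which events of the same frame are handled (completion first, then the closing signal, then the source value) is an assumption chosen to reproduce the output above.

```typescript
// Plain-TypeScript model of bufferWhen() with a cold closing Observable; not RxJS itself.
// `closing` is a marble string such as '---a': it emits 3 frames after being subscribed to.
function bufferWhenModel(events: string, closing: string): string[][] {
  const delay = closing.search(/[^-]/); // frames until the closing Observable emits
  const emitted: string[][] = [];
  let buffer: string[] = [];
  let openedAt = 0;

  for (let frame = 0; frame < events.length; frame++) {
    const symbol = events.charAt(frame);

    // Assumption: completion wins over a closing signal in the same frame.
    if (symbol === '|') {
      emitted.push(buffer);
      return emitted;
    }

    // Assumption: the closing signal fires before the source value of the same frame.
    if (delay > 0 && frame - openedAt === delay) {
      emitted.push(buffer);
      buffer = [];
      openedAt = frame; // the factory is called again, so the delay restarts here
    }

    if (symbol !== '-') {
      buffer.push(symbol);
    }
  }
  return emitted;
}

console.log(JSON.stringify(bufferWhenModel('123456|', '---a')));
// → [["1","2","3"],["4","5","6"]]
```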

Exercise for the buffer operators 💪

Take the bufferToggle example above, where closeBuffer$ is a hot Observable, and replace that hot Observable with a cold one. What changes and why?

const events$ = cold(    '123456789|');
const openBuffer$ = cold( '--o-----');
const closeBuffer$ = cold('-----c');
const buffered$ = events$.pipe(bufferToggle(openBuffer$, (o) => closeBuffer$));

buffered$.subscribe(console.log);

As always, the code examples and the exercise solution can be found on GitHub.