The RxJS connect operator multicasts a source Observable. Note that the older multicasting operators such as multicast and publish were deprecated in RxJS 7 and are slated for removal in v8.
Multicasting in RxJS
First, we need to understand what multicasting in RxJS is and which problem it solves. By default, Observables are cold and unicast. This means that the operation behind an Observable (the producer function, usually async) is executed on subscription. The crucial part is that this operation is executed again for every subscription.
Let’s construct a simple Observable to demonstrate it:
import { Observable } from 'rxjs';
const heavyAsyncTask = () => {
console.log('heavy async task triggered');
return 'result';
};
const observable = new Observable((subscriber) => {
const result = heavyAsyncTask();
subscriber.next(result);
});
observable.subscribe({ next: r => console.log('subscriber 1 got: ' + r)});
observable.subscribe({ next: r => console.log('subscriber 2 got: ' + r)});
Our Observable wraps the "heavy" task, which here merely logs something to the console and returns the fixed string 'result'. As you might have guessed, the log output of the above code is:
heavy async task triggered
subscriber 1 got: result
heavy async task triggered
subscriber 2 got: result
This means the heavy async task is triggered for every subscription. That can be fine in some cases but is unwanted in others. That is why multicasting is needed: it lets multiple observers share a single invocation of the producer function.
RxJS Multicasting through an explicit Subject
RxJS Subjects allow multicasting. Let us modify the above example to only execute the heavy async task once:
import { Observable, Subject } from 'rxjs';
const observable = new Observable((subscriber) => {
const result = heavyAsyncTask();
subscriber.next(result);
});
const subject$ = new Subject();
subject$.subscribe({ next: r => console.log('subscriber 1 got: ' + r)});
subject$.subscribe({ next: r => console.log('subscriber 2 got: ' + r)});
// important to subscribe after the subscriptions to subject$
observable.subscribe(subject$);
Because a Subject is both an Observable and an Observer, we can place it between the subscribers and the original Observable. This time the output looks like this:
heavy async task triggered
subscriber 1 got: result
subscriber 2 got: result
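The ordering comment in the code above matters because a plain Subject does not replay earlier values. The following is a minimal sketch of what happens to an observer that subscribes too late, assuming a synchronous producer like the one above. The names `producer$`, `relay$`, `early` and `late` are made up for this illustration.

```typescript
import { Observable, Subject } from 'rxjs';

// Same shape as the producer above: emits one value synchronously on subscribe.
const producer$ = new Observable<string>((subscriber) => {
  subscriber.next('result');
});

const relay$ = new Subject<string>();
const early: string[] = [];
const late: string[] = [];

// Registered before the Subject is connected to the producer.
relay$.subscribe((value) => early.push(value));

// Connecting runs the producer once and pushes 'result' through the Subject.
producer$.subscribe(relay$);

// Registered after the value has already passed through: nothing is replayed.
relay$.subscribe((value) => late.push(value));

console.log(early); // [ 'result' ]
console.log(late);  // []
```

The late observer stays subscribed and would receive future values, but it never sees the one that was emitted before it joined.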
RxJS connect
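In the previous section we saw that multicasting is easy to implement with a Subject. RxJS also offers the connect operator, which does the same without the manual wiring: it creates the Subject for us and subscribes to the source only after all consumers inside the selector function have been set up.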
import { Observable, merge } from 'rxjs';
import { connect, filter, map } from 'rxjs/operators';
const heavyAsyncTask = (input: number): number => {
console.log('heavy async task triggered');
return input;
};
const observable$ = new Observable<number>((subscriber) => {
subscriber.next(heavyAsyncTask(1));
subscriber.next(heavyAsyncTask(2));
subscriber.next(heavyAsyncTask(3));
});
observable$.pipe(
connect(shared$ => merge(
shared$.pipe(map(n => `all ${ n }`)),
shared$.pipe(filter(n => n % 2 === 0), map(n => `even ${ n }`)),
shared$.pipe(filter(n => n % 2 === 1), map(n => `odd ${ n }`))
))
).subscribe(console.log);
This produces the following output:
heavy async task triggered
all 1
odd 1
heavy async task triggered
all 2
even 2
heavy async task triggered
all 3
odd 3
Notice that the heavy async task runs only three times, once per source value, even though three derived streams consume the source and six results are emitted in total.
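For comparison, here is a sketch that builds the same three branches once without and once with connect. Without connect, every branch subscribes to the cold source on its own. The counter `taskRuns` and the helper `branches` are hypothetical additions that only serve to make the difference measurable.

```typescript
import { Observable, merge } from 'rxjs';
import { connect, filter, map } from 'rxjs/operators';

let taskRuns = 0;
const heavyTask = (input: number): number => {
  taskRuns++; // count invocations instead of logging them
  return input;
};

const numbers$ = new Observable<number>((subscriber) => {
  subscriber.next(heavyTask(1));
  subscriber.next(heavyTask(2));
  subscriber.next(heavyTask(3));
  subscriber.complete();
});

// The three derived streams from the example above, built from any source.
const branches = (source$: Observable<number>) => merge(
  source$.pipe(map(n => `all ${n}`)),
  source$.pipe(filter(n => n % 2 === 0), map(n => `even ${n}`)),
  source$.pipe(filter(n => n % 2 === 1), map(n => `odd ${n}`)),
);

// Without connect: each of the three branches subscribes to numbers$ itself.
branches(numbers$).subscribe();
const runsWithoutConnect = taskRuns;

// With connect: the branches share a single subscription to numbers$.
taskRuns = 0;
numbers$.pipe(connect(branches)).subscribe();
const runsWithConnect = taskRuns;

console.log(runsWithoutConnect); // 9
console.log(runsWithConnect);    // 3
```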
RxJS share multicasts
The RxJS share operator returns a new Observable that multicasts its source. As long as there is at least one subscriber, data is emitted. Once all Observers have unsubscribed, share also unsubscribes internally from the Observable "before" share, i.e. from its source.
import { interval } from 'rxjs';
import { share, take, tap } from 'rxjs/operators';
const source$ = interval(1000).pipe(
tap(x => console.log('The heavy task calculated', x)),
take(3),
share(),
);
source$.subscribe({ next: r => console.log('subscriber 1 got: ' + r)});
source$.subscribe({ next: r => console.log('subscriber 2 got: ' + r), complete: done});
The above code with share produces the following output:
The heavy task calculated 0
subscriber 1 got: 0
subscriber 2 got: 0
The heavy task calculated 1
subscriber 1 got: 1
subscriber 2 got: 1
The heavy task calculated 2
subscriber 1 got: 2
subscriber 2 got: 2
Without share, the heavy task would be executed for each subscriber.
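The subscription counting described at the start of this section can be made visible with a producer that records when it is started and torn down. This is a minimal sketch relying on the default behavior of share; the `events` array and the variable names are illustrative only.

```typescript
import { Observable } from 'rxjs';
import { share } from 'rxjs/operators';

const events: string[] = [];

const shared$ = new Observable<number>(() => {
  events.push('producer started');
  // Teardown logic: runs when share unsubscribes from this source.
  return () => {
    events.push('producer stopped');
  };
}).pipe(share());

const first = shared$.subscribe();
const second = shared$.subscribe(); // joins the running producer, no second start

first.unsubscribe(); // one subscriber is left, so the producer keeps running
const afterFirstUnsubscribe = [...events];

second.unsubscribe(); // last subscriber gone: share unsubscribes from the source
const afterSecondUnsubscribe = [...events];

console.log(afterFirstUnsubscribe);  // [ 'producer started' ]
console.log(afterSecondUnsubscribe); // [ 'producer started', 'producer stopped' ]
```

A subscription made after this point would start the producer again, because with its default settings share resets once the subscriber count drops to zero.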
RxJS shareReplay
This operator is a wrapper around share for a specific case: when sharing, we also have to take care of late subscribers. This is quite a common use case. You have an Observable and add subscribers after the heavy operation behind it, e.g., an HTTP request, has already run.
import { interval } from 'rxjs';
import { shareReplay, take, tap } from 'rxjs/operators';
const source$ = interval(1000).pipe(
tap(x => console.log('The heavy task calculated', x)),
take(3),
shareReplay(),
);
source$.subscribe({ next: r => console.log('subscriber 1 got: ' + r)});
setTimeout(() => {
source$.subscribe({ next: r => console.log('subscriber 2 got: ' + r), complete: done});
}, 2500);
Subscriber 2 joins the party after 2.5 s. Thanks to shareReplay, all previously emitted values are delivered to the late subscriber 2 as well.
The heavy task calculated 0
subscriber 1 got: 0
The heavy task calculated 1
subscriber 1 got: 1
subscriber 2 got: 0
subscriber 2 got: 1
The heavy task calculated 2
subscriber 1 got: 2
subscriber 2 got: 2
If we used share instead of shareReplay in the above example, the second subscriber would only see the value 2, but not 0 or 1.
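shareReplay also accepts a configuration object. The sketch below assumes the `{ bufferSize, refCount }` form and limits the replay buffer to the most recent value. It uses a synchronous source instead of interval so that no timers are involved; the `computations` counter and the array names are illustrative.

```typescript
import { of } from 'rxjs';
import { shareReplay, tap } from 'rxjs/operators';

let computations = 0;

const latest$ = of(1, 2, 3).pipe(
  tap(() => computations++), // stands in for the heavy task
  // Keep only the last value for late subscribers.
  shareReplay({ bufferSize: 1, refCount: false }),
);

const firstSeen: number[] = [];
const lateSeen: number[] = [];

// The first subscriber triggers the source and sees every value.
latest$.subscribe((value) => firstSeen.push(value));

// The source has already completed here; only the buffered value is replayed.
latest$.subscribe((value) => lateSeen.push(value));

console.log(firstSeen);    // [ 1, 2, 3 ]
console.log(lateSeen);     // [ 3 ]
console.log(computations); // 3
```

The source is not run a second time for the late subscriber, which is what distinguishes this from a plain share on a source that has already completed.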
Exercise for the RxJS multicast operators 💪
Your task today is to implement a caching mechanism for an Observable that caches responses for 10 seconds. You might want to investigate windowTime. You can use the following code as a base; what is missing is the implementation of createRefetchable.
const cachedRefetchableSource$ = createRefetchable(of('response'));
cachedRefetchableSource$.subscribe({ next: r => console.log('subscriber 1 got: ' + r)});
setTimeout(() => {
cachedRefetchableSource$.subscribe({ next: r => console.log('subscriber 2 got: ' + r)});
}, 5_000);
setTimeout(() => {
cachedRefetchableSource$.subscribe({ next: r => console.log('subscriber 3 got: ' + r), complete: done});
}, 15_000);
Subscriber 2 should get the cached version, while subscriber 3 should get a fresh response again.
This post is part of the RxJS mastery series. As always, the code examples can be found on GitHub.