An RxJS Subject is a special type of Observable. Unlike a plain Observable, which is unicast, a Subject allows multicasting to many Observers.
- Unicast: each subscribed Observer owns an independent execution of the Observable
- Multicast: multiple Observers see the same execution of the Observable
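To make the two terms tangible, here is a minimal sketch. The counter `executions` and the log arrays are names made up for this illustration; they only record what each Observer sees.

```typescript
import { Observable, Subject } from 'rxjs';

// Unicast: the subscribe function runs once per subscriber.
let executions = 0;
const unicast$ = new Observable<number>((subscriber) => {
  executions += 1;
  subscriber.next(executions);
  subscriber.complete();
});

const unicastLog: string[] = [];
unicast$.subscribe((v) => unicastLog.push(`A: ${v}`));
unicast$.subscribe((v) => unicastLog.push(`B: ${v}`));
// executions is now 2 and unicastLog is ['A: 1', 'B: 2']

// Multicast: a single next() call reaches every registered Observer.
const multicast$ = new Subject<number>();
const multicastLog: string[] = [];
multicast$.subscribe((v) => multicastLog.push(`A: ${v}`));
multicast$.subscribe((v) => multicastLog.push(`B: ${v}`));
multicast$.next(1);
// multicastLog is now ['A: 1', 'B: 1']
```

Each subscriber of the plain Observable triggers its own execution, while both subscribers of the Subject see the very same value.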
Simply put, we can say the following about Subjects:
Definition
An RxJS Subject is an Observable and an Observer at the same time.
RxJS Subjects explained 🎓
With a plain Observable, every subscription starts a new execution. Subscribing to a Subject, in contrast, only registers the new Observer in the Subject's internal list of Observers. Taking the definition above into account:
- A Subject is an Observable: one can subscribe to it to receive the values. The Observer cannot tell whether it is subscribed to a Subject or a plain Observable.
- A Subject is an Observer: it is an object with the three methods next(), error(), and complete(), so it can receive values.
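The two roles can be sketched without RxJS at all. The following toy class is not how RxJS implements Subject; `MiniObserver` and `MiniSubject` are hypothetical names, and the sketch leaves out many details of the real library. It only shows the idea of an Observer list that `subscribe()` extends and `next()` walks through.

```typescript
// Hypothetical, simplified Observer shape (the real RxJS types are richer).
interface MiniObserver<T> {
  next: (value: T) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
}

class MiniSubject<T> {
  private observers: MiniObserver<T>[] = [];
  private stopped = false;

  // Observable side: subscribing only registers the Observer in the list.
  subscribe(observer: MiniObserver<T>): { unsubscribe: () => void } {
    if (!this.stopped) {
      this.observers.push(observer);
    }
    return {
      unsubscribe: () => {
        this.observers = this.observers.filter((o) => o !== observer);
      },
    };
  }

  // Observer side: every notification is forwarded to all registered Observers.
  next(value: T): void {
    if (this.stopped) return;
    // Iterate over a copy so that unsubscribing during delivery is safe.
    for (const o of [...this.observers]) o.next(value);
  }

  error(err: unknown): void {
    if (this.stopped) return;
    this.stopped = true;
    for (const o of [...this.observers]) o.error?.(err);
    this.observers = [];
  }

  complete(): void {
    if (this.stopped) return;
    this.stopped = true;
    for (const o of [...this.observers]) o.complete?.();
    this.observers = [];
  }
}

const miniLog: string[] = [];
const mini = new MiniSubject<number>();
const miniSubA = mini.subscribe({ next: (v) => miniLog.push(`A: ${v}`) });
mini.next(1);
mini.subscribe({ next: (v) => miniLog.push(`B: ${v}`) });
mini.next(2);
miniSubA.unsubscribe();
mini.next(3);
// miniLog is now ['A: 1', 'A: 2', 'B: 2', 'B: 3']
```

The only state is the list of Observers, which is exactly what distinguishes this class from a plain function that starts a fresh execution per subscriber.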
Hence, we can use a Subject by calling its next() method, which in turn notifies every Observer about the new value. Let’s see what this looks like in code:
import { Subject } from 'rxjs';
describe('Subject', () => {
it('should inform all subscribers', () => {
const subject$: Subject<number> = new Subject<number>();
subject$.subscribe({ next: (v) => console.log(`A: ${v}`)});
subject$.next(1);
subject$.subscribe({ next: (v) => console.log(`B: ${v}`)});
subject$.next(2);
});
});
As soon as an Observer subscribes to a Subject, it receives the values passed by subsequent next() notifications on the Subject. This is also illustrated by the output of the test above:
A: 1
A: 2
B: 2
Observer B only receives the value 2 because its subscription happened later. So, the values that an Observer receives depend on the point in time at which it subscribes. This is also illustrated by the next example:
import { Subject } from 'rxjs';
describe('Subject', () => {
it('should inform all subscribers', () => {
const subject$: Subject<number> = new Subject<number>();
const subA = subject$.subscribe({ next: (v) => console.log(`A: ${v}`)});
subject$.next(1);
subject$.subscribe({ next: (v) => console.log(`B: ${v}`)});
subject$.next(2);
subA.unsubscribe();
subject$.subscribe({ next: (v) => console.log(`C: ${v}`)});
subject$.next(3);
});
});
Observer A receives only the values 1 and 2 because it unsubscribes afterwards; the Subject nevertheless keeps emitting to the remaining Observers. Observer B subscribes after the first value and therefore receives 2 and 3. Observer C subscribes only after two values have been emitted and hence receives only the third value, 3:
A: 1
A: 2
B: 2
B: 3
C: 3
There are special variants of Subjects which are presented next. All serve specific use cases.
BehaviorSubject
If you use a BehaviorSubject the Observer receives a value directly after subscribing. This is either the initial value or the last value that was emitted:
import { BehaviorSubject } from 'rxjs';
describe('BehaviorSubject', () => {
it('should inform all subscribers (also about the last value)', () => {
const subject$: BehaviorSubject<number> = new BehaviorSubject<number>(0);
subject$.subscribe({ next: (v) => console.log(`A: ${v}`)});
subject$.next(1);
subject$.next(2);
subject$.subscribe({ next: (v) => console.log(`B: ${v}`)});
subject$.next(3);
// A: 0
// A: 1
// A: 2
// B: 2
// A: 3
// B: 3
});
});
ReplaySubject
A ReplaySubject is similar to a BehaviorSubject in that it also replays old values to new Observers. Here, however, you can specify how many old values are replayed and, optionally, how far back in time they may have been emitted.
import { ReplaySubject } from 'rxjs';
describe('ReplaySubject', () => {
it('should inform all subscribers', () => {
const subject$: ReplaySubject<number> = new ReplaySubject<number>(2);
subject$.next(1);
subject$.next(2);
subject$.next(3);
subject$.subscribe({next: (v) => console.log(`A: ${v}`)});
subject$.next(4);
// A: 2
// A: 3
// A: 4
});
});
In the next example we replay at most the last 10 values, but only those that were emitted within the last 200 ms:
it('should inform all subscribers time based', (done) => {
const subject$: ReplaySubject<number> = new ReplaySubject<number>(10, 200);
let value = 0;
const interval = setInterval(() => subject$.next(value++), 100);
setTimeout(() => {
subject$.subscribe({ next: (v) => console.log(`A: ${v}`)});
}, 450);
setTimeout(() => {
clearInterval(interval);
subject$.unsubscribe();
done();
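}, 1000);
// Replayed on subscription at 450 ms (emitted within the previous 200 ms):
// A: 2
// A: 3
// Live values afterwards, until the interval is cleared:
// A: 4
// A: 5
// A: 6
// A: 7
// ...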
});
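Because the test above depends on real timers, its exact output can vary slightly from run to run. The following sketch shows the same time window deterministically. It assumes RxJS 7, where the third constructor argument of ReplaySubject only needs a now() method; `fakeNow` and `clock` are hypothetical helpers invented for this illustration.

```typescript
import { ReplaySubject } from 'rxjs';

// Hypothetical manual clock; the ReplaySubject asks it for the current time.
let fakeNow = 0;
const clock = { now: () => fakeNow };

// Replay at most 10 values that are no older than 200 ms.
const replay$ = new ReplaySubject<number>(10, 200, clock);

// Emit 0, 1, 2, 3 at t = 100, 200, 300, 400 ms.
for (let value = 0; value < 4; value++) {
  fakeNow = (value + 1) * 100;
  replay$.next(value);
}

// Subscribe at t = 450 ms: only values emitted after t = 250 ms are replayed.
fakeNow = 450;
const replayed: number[] = [];
replay$.subscribe((v) => replayed.push(v));
// replayed is now [2, 3]
```

The values 0 and 1 were emitted more than 200 ms before the subscription and have therefore been dropped from the buffer.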
AsyncSubject
An AsyncSubject delivers at most one value: the last one it received, and only once the execution completes:
import { AsyncSubject } from 'rxjs';
describe('AsyncSubject', () => {
it('should inform the subscriber only with the last value before completion', () => {
const subject$: AsyncSubject<number> = new AsyncSubject<number>();
subject$.subscribe({ next: (v) => console.log(`A: ${v}`)});
subject$.next(1);
subject$.next(2);
subject$.subscribe({ next: (v) => console.log(`B: ${v}`)});
subject$.next(3);
subject$.complete();
// A: 3
// B: 3
});
});
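Two edge cases are worth a quick sketch: an Observer that subscribes after completion, and an AsyncSubject that completes without ever receiving a value. The log array and the labels `late` and `empty` are made up for this illustration.

```typescript
import { AsyncSubject } from 'rxjs';

const asyncLog: string[] = [];

// A late subscriber still receives the final value after completion.
const finished$ = new AsyncSubject<number>();
finished$.next(1);
finished$.next(2);
finished$.complete();
finished$.subscribe({
  next: (v) => asyncLog.push(`late: ${v}`),
  complete: () => asyncLog.push('late: complete'),
});

// Completing without any value emits nothing except the completion itself.
const empty$ = new AsyncSubject<number>();
empty$.subscribe({
  next: (v) => asyncLog.push(`empty: ${v}`),
  complete: () => asyncLog.push('empty: complete'),
});
empty$.complete();

// asyncLog is now ['late: 2', 'late: complete', 'empty: complete']
```

This is why the wording "at most one value" fits: without a preceding next() call there is simply nothing to deliver.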
These were Subject, BehaviorSubject, ReplaySubject, and AsyncSubject in a quick walkthrough.
Observable vs. Subject
By now you should have noticed the following difference between Observable and Subject: an Observable is just a function, while a Subject has state, because it keeps a list of its Observers.
As always the sources can be found on GitHub.
What problems do RxJS Subjects solve? 🚧
First of all, do not use a Subject if you only need the functionality of an Observable. From an abstract point of view, RxJS Subjects allow multicasting, as already explained in the intro. How can we use that in practice?
Sharing Observables
We can send the values of an Observable through a Subject because a Subject is also an Observer. And then we can subscribe to this Subject to make sure that every Observer gets the same values.
it('allows to share Observables', (done) => {
const numbers$ = randomNumberGenerator$;
const subject = new Subject();
subject.subscribe({ next: (val) => console.log(`A: ${val}`) });
subject.subscribe({ next: (val) => console.log(`B: ${val}`) });
const sub = numbers$.subscribe(subject);
// A: 14
// B: 14
// A: 92
// B: 92
setTimeout(() => {
sub.unsubscribe();
done();
}, 2_500);
});
Simple state management
We can offer a simple state management service by using a BehaviorSubject:
import { BehaviorSubject, Observable } from 'rxjs';
interface User {
firstName: string;
lastName: string;
}
export class UserStateService {
constructor() {}
// The state is empty until a user is set, so null is part of the type.
private initialState: User | null = null;
private userState = new BehaviorSubject<User | null>(this.initialState);
getUser(): Observable<User | null> {
return this.userState.asObservable();
}
setUser(value: User): void {
this.userState.next(value);
}
reset(): void {
this.userState.next(this.initialState);
}
}
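A short usage sketch may help here. To keep the snippet runnable on its own, the service is repeated in condensed form with `User | null` as the state type; the sample user and the helper `describeUser` are made up for this illustration.

```typescript
import { BehaviorSubject, Observable } from 'rxjs';

interface User {
  firstName: string;
  lastName: string;
}

// Condensed copy of the service, repeated only to make the sketch self-contained.
class UserStateService {
  private readonly userState = new BehaviorSubject<User | null>(null);

  getUser(): Observable<User | null> {
    return this.userState.asObservable();
  }
  setUser(value: User): void {
    this.userState.next(value);
  }
  reset(): void {
    this.userState.next(null);
  }
}

// Hypothetical helper that turns the state into a printable string.
const describeUser = (u: User | null): string =>
  u ? `${u.firstName} ${u.lastName}` : 'no user';

const service = new UserStateService();
const seen: string[] = [];

service.getUser().subscribe((u) => seen.push(`A: ${describeUser(u)}`));
service.setUser({ firstName: 'Ada', lastName: 'Lovelace' });
// A late subscriber immediately receives the current user.
service.getUser().subscribe((u) => seen.push(`B: ${describeUser(u)}`));
service.reset();

// seen is now:
// ['A: no user', 'A: Ada Lovelace', 'B: Ada Lovelace', 'A: no user', 'B: no user']
```

Exposing the state through asObservable() keeps next() private to the service, so consumers can read the state but only change it through setUser() and reset().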
Replaying values on multiple Observers
With a ReplaySubject you have many possibilities. I used it once to synchronise multiple columns with a similar accordion in each column. Opening one accordion element in column A would open the element at the same position in column B. If a third column C is added dynamically the “clicks” are replayed on this third column C because the ReplaySubject knows what happened.
How to test Subjects 🚦
When I first encountered rxjs-marbles I was excited. A colleague at work showed it to me and I immediately tried to apply it. Subjects, and of course also Observables, can be tested with marble diagrams. Throughout this series we will see different tests. For today I will focus on a simple test concerning this code from the top:
const subject$: Subject<number> = new Subject<number>();
subject$.subscribe({ next: (v) => console.log(`A: ${v}`)});
subject$.next(1);
subject$.subscribe({ next: (v) => console.log(`B: ${v}`)});
subject$.next(2);
In a test we want to verify that A receives the values 1 and 2 while B receives only the value 2. We do not test the code above directly; instead, we create a hot observable and subscribe to it at different points in time:
import { Subject } from 'rxjs';
import { TestScheduler } from 'rxjs/testing';
const testScheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
describe('marbles testing', () => {
it('should emit 1 and 2 to the first observer and only 2 to the second', () => {
testScheduler.run((helpers) => {
const { cold, hot, expectObservable } = helpers;
const subject$: Subject<number> = hot('-ab--', {a: 1, b: 2});
expectObservable(subject$, '^----').toEqual(cold('-xy', {x: 1, y: 2}));
expectObservable(subject$, '--^--').toEqual(cold('y', {y: 2}));
});
});
});
There are more ways to test different aspects of an RxJS Subject. For example, the subscriptions themselves can be tested as well. And of course there are also libraries available for different test frameworks. Here I used the RxJS testing functions directly.
Exercise for Subject 💪
Implement an undo solution using Subjects. You should be able to undo the last 5 actions, but only actions that are no older than 10 seconds. Below you find the ActionType and possible interfaces:
type ActionType = 'COPY' | 'CUT' | 'PASTE' | 'DELETE';
Redo.addAction('CUT');
Redo.redo() // returns at most the last 5 actions to be replayed
Please find a possible solution on GitHub.