ReplaySubject
ReplaySubject
The ReplaySubject class extends the Subject class, which extends the Observable class.
The ReplaySubject emits all or a specified number of past next notifications, and all future next notifications to an Observer upon subscribing until the Observer unsubscribes or the Observable is complete or errors.
As the name suggests, the notifications are "replayed" to an Observer no matter when the Observer subscribes.
The first optional argument to the constructor() function is bufferSize.
This specifies the maximum number next notification values that will be emitted immediately to an Observer upon subscribing.
The second optional argument to the constructor() function is windowTime.
This specifies the maximum number of milliseconds that each next notification value is available to be emitted immediately to on Observer upon subscribing.
Example
Let's look at an example:
import { ReplaySubject } from 'rxjs';
/* Create an instance of ReplaySubject. */
const replaySubject = new ReplaySubject<number>();
/* Subscribe to subject. */
replaySubject.subscribe({
next: (value) => console.log('before:', value),
error: (error) => console.error('before', error),
complete: () => console.log('complete before')
});
/* Emit some values. */
replaySubject.next(1);
replaySubject.next(2);
replaySubject.next(3);
/* Subscribe late to subject. */
replaySubject.subscribe({
next: (value) => console.log('after:', value),
error: (error) => console.error('after:', error),
complete: () => console.log('complete after')
});
/* Complete the observable stream. */
replaySubject.complete();
Let's review the code above:
- First, we create a new
ReplaySubjectinstance and specify the generic type. - Then, we subscribe to the
replaySubjectbefore any next notifications are emitted. - We emit three values using the
next()method. - Then, we subscribe again to the
replaySubjectafter the next notifications have been emitted. - Finally, we
complete()the Observable.
In this example the first Observer will receive the next notifications as they are emitted. However, as soon as the second Observer subscribes, the Observer receives all past next notifications.
Finally, both Observers receive the completion notification when we invoke complete().
Example with bufferSize Set
Now, let's take the previous example and specify a buffer size of 2.
import { ReplaySubject } from 'rxjs';
/* Create an instance of ReplaySubject. */
const replaySubject = new ReplaySubject<number>(2);
/* Subscribe to subject. */
replaySubject.subscribe({
next: (value) => console.log('before:', value),
error: (error) => console.error('before', error),
complete: () => console.log('complete before')
});
/* Emit some values. */
replaySubject.next(1);
replaySubject.next(2);
replaySubject.next(3);
/* Subscribe late to subject. */
replaySubject.subscribe({
next: (value) => console.log('after:', value),
error: (error) => console.error('after:', error),
complete: () => console.log('complete after')
});
/* Complete the observable stream. */
replaySubject.complete();
In the example above, the late Observer should only receive the last 2 next notification upon subscribing.