Multicast Operator
The multicast() Operator
In the previous section we learned how to manually subscribe a Subject to a source Observable in order to multicast to multiple Observers.
In most cases, when we need to multicast, we do not need to create and manage the Subject subscription ourselves.
What we really need is a multicasted Observable.
We'll use the multicast() or publish() operators to create multicasted Observables.
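As a rough sketch of how the two operators relate, publish() called without arguments behaves like multicast() with a plain Subject. The snippet below assumes an RxJS version (6 or 7) that exports both operators from 'rxjs/operators'; the variable names are illustrative only, and a synchronous of() source is used so the result is easy to inspect.

```typescript
import { ConnectableObservable, Subject, of } from 'rxjs';
import { multicast, publish } from 'rxjs/operators';

// Multicast through an explicitly created Subject.
const viaMulticast = of(1, 2, 3).pipe(
  multicast(new Subject<number>())
) as ConnectableObservable<number>;

// publish() creates the Subject for us.
const viaPublish = of(1, 2, 3).pipe(publish()) as ConnectableObservable<number>;

const multicastValues: number[] = [];
const publishValues: number[] = [];

// Nothing is emitted yet: a ConnectableObservable waits for connect().
viaMulticast.subscribe((value) => multicastValues.push(value));
viaPublish.subscribe((value) => publishValues.push(value));

// connect() starts each source; both arrays now hold 1, 2, 3.
viaMulticast.connect();
viaPublish.connect();

console.log(multicastValues, publishValues);
```

In both cases the Observers must subscribe before connect() is called, otherwise they miss the values that a synchronous source emits immediately.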
Example
Let's look at an example of using the multicast() operator to create a new ConnectableObservable:
import { ConnectableObservable, Observable, Subject } from 'rxjs';
import { multicast, takeUntil } from 'rxjs/operators';

/* Emits once to signal that the source should complete. */
const completeSubject = new Subject<void>();

/* Create a new observable, providing the observer. */
const observable = new Observable<number>((observer) => {
  console.log('%cNew subscription created', 'background: #222; color: #bada55');
  let i = 0;
  const interval = setInterval(() => {
    observer.next(i++);
  }, 1000);

  /* Teardown: stop the timer when the subscription ends. */
  return () => {
    clearInterval(interval);
  };
}).pipe(takeUntil(completeSubject));

const subject = new Subject<number>();
const multicasted = observable.pipe(
  multicast(subject)
) as ConnectableObservable<number>;

/* Both subscriptions share a single execution of the source through the subject. */
multicasted.subscribe((value) => console.log('First subscription', value));
multicasted.subscribe((value) => console.log('Second subscription', value));

/* Connect the subject to the observable. */
multicasted.connect();

/* Complete the observable after 5 seconds. */
setTimeout(() => {
  completeSubject.next();
  completeSubject.complete();
}, 5000);
Let's break this down:
- First, we define completeSubject, which we will use to trigger the completion of the observable using the takeUntil() operator.
- We create a new Observable that emits a number value every 1000 milliseconds.
- We then create a new Subject and pass this to the multicast() operator. We can also provide the multicast() operator with a factory function that returns a Subject.
- We subscribe multiple times to the ConnectableObservable that was created by the multicast() operator.
- We invoke the connect() method on the ConnectableObservable instance in order to subscribe the Subject to the source Observable. The connect() method returns an instance of a Subscription, which we can then use to disconnect the Subject from the source Observable.
- Finally, after 5000 milliseconds we complete the source Observable.
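To make the steps above more concrete, here is a minimal, dependency-free sketch of the idea behind multicast() and connect(). It is not the RxJS implementation: MiniSubject, Source and miniMulticast are hypothetical names invented for this illustration, and completion and error handling are left out. It only shows that the producer runs once, no matter how many Observers subscribe.

```typescript
// Hypothetical, simplified model of multicasting; not the RxJS source code.
type Listener<T> = (value: T) => void;
type Teardown = () => void;

// A tiny stand-in for Subject: it fans each value out to every listener.
class MiniSubject<T> {
  private listeners: Listener<T>[] = [];

  subscribe(listener: Listener<T>): Teardown {
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter((l) => l !== listener);
    };
  }

  next(value: T): void {
    // Iterate over a copy so unsubscribing during emission is safe.
    for (const listener of [...this.listeners]) {
      listener(value);
    }
  }
}

// A "cold" source: the producer function runs once per subscription.
type Source<T> = (emit: Listener<T>) => Teardown;

// Stand-in for multicast(): the source does not run until connect() is called.
function miniMulticast<T>(source: Source<T>, subject: MiniSubject<T>) {
  return {
    // Observers subscribe to the subject, never to the source directly.
    subscribe: (listener: Listener<T>): Teardown => subject.subscribe(listener),
    // connect() subscribes the subject to the source and returns a teardown.
    connect: (): Teardown => source((value) => subject.next(value)),
  };
}

let producerRuns = 0;
const source: Source<number> = (emit) => {
  producerRuns++;
  [1, 2, 3].forEach((value) => emit(value));
  return () => {};
};

const first: number[] = [];
const second: number[] = [];
const shared = miniMulticast(source, new MiniSubject<number>());

shared.subscribe((value) => first.push(value));
shared.subscribe((value) => second.push(value));

const disconnect = shared.connect();
disconnect();

// producerRuns is 1; first and second both hold 1, 2, 3.
console.log(producerRuns, first, second);
```

If each Observer subscribed to source directly, the producer would run twice. Routing both through one subject is what makes the execution shared.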