Multicast Operator
The connectable() function
In the previous section we learned how to manually use a source Observable with a Subject in order to multicast to multiple Observers.
In most cases when we need to multicast we do not necessarily need to create Subject.
What we really need is a multicasted Observable.
We'll use the connectable() function, or the connect() or share() operators, to create multicasted Observables.
Example
Let's look at an example of using the connectable() function to create a new connectable Observable:
import { connectable, Observable, Subject } from "rxjs";
import { takeUntil } from "rxjs/operators";
const completeSubject = new Subject<void>();
/* Create a new observable, providing the observer. */
const observable = new Observable<number>((observer) => {
console.log("%cNew subscription created", "background: #222; color: #bada55");
let i = 0;
const interval = setInterval(() => {
observer.next(i++);
}, 1000);
return () => {
clearInterval(interval);
};
}).pipe(takeUntil(completeSubject));
/* Create a connectable observable specifying the connector Subject. */
const subject = new Subject<number>();
const multicasted = connectable(observable, {
connector: () => subject
});
/* Each subscription receives a copy of Observer. */
multicasted.subscribe((value) => console.log("First subscription", value));
multicasted.subscribe((value) => console.log("Second subscription", value));
/* Connect the subject to the observabe. */
multicasted.connect();
/* Complete the observable after 5 seconds. */
setTimeout(() => {
completeSubject.next();
completeSubject.complete();
}, 5000);
Let's break this down:
- First, we define
completeSubject, which we will use to trigger the completion of theobservableusing thetakeUntil()operator. - We are creating a new
Observablethat emits anumbervalue every 1000 milliseconds. - We then create a new
Subject, which we'll use to explicitly provide as theconnectorto the connectable Observable. - We create the
multicastedconnectable Observable using theconnectable()function. Note, theConnectableConfigis optional, and by default, will create anew Subject<unknown>()internally. - We subscribe multiple times to the
multicastedObservable that was created by theconnectable()function. - We invoke the
connect()method on themulticastinstance in order to subscribe to theSubjectthat is multicasting the Observable. Theconnect()method returns the connection, which is an instance of aSubcription, which we can then use to unsubscribe from the multicasted Observable. - Finally, after 5000 milliseconds we complete the source Observable.