Multicast Operator
The connectable() function
In the previous section we learned how to manually use a source Observable with a Subject
in order to multicast to multiple Observers.
In most cases when we need to multicast, we do not necessarily need to create a Subject.
What we really need is a multicasted Observable.
We'll use the connectable() function, or the connect() or share() operators, to create multicasted Observables.
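To make the motivation concrete, here is a minimal sketch that contrasts a plain Observable with one wrapped by connectable(). The names executions, plainA, plainB, sharedA and sharedB are illustrative assumptions, not part of the original example, and the source emits synchronously so the effect is easy to observe.

```typescript
import { connectable, Observable } from "rxjs";

// Counts how many times the subscribe function of the source runs.
let executions = 0;

const source = new Observable<number>((observer) => {
  executions++;
  observer.next(1);
  observer.next(2);
  observer.complete();
});

// Plain Observable: every subscribe() call runs the source again.
const plainA: number[] = [];
const plainB: number[] = [];
source.subscribe((value) => plainA.push(value));
source.subscribe((value) => plainB.push(value));
const executionsUnicast = executions; // 2

// Connectable Observable: the source runs once, when connect() is called.
executions = 0;
const multicasted = connectable(source);
const sharedA: number[] = [];
const sharedB: number[] = [];
multicasted.subscribe((value) => sharedA.push(value));
multicasted.subscribe((value) => sharedB.push(value));
multicasted.connect();
const executionsMulticast = executions; // 1
```

Both pairs of arrays end up holding the same values, but only the connectable version shares a single execution of the source between its subscribers.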
Example
Let's look at an example of using the connectable() function to create a new connectable Observable:
import { connectable, Observable, Subject } from "rxjs";
import { takeUntil } from "rxjs/operators";
const completeSubject = new Subject<void>();
/* Create a new observable, providing the observer. */
const observable = new Observable<number>((observer) => {
  console.log("%cNew subscription created", "background: #222; color: #bada55");
  let i = 0;
  const interval = setInterval(() => {
    observer.next(i++);
  }, 1000);
  return () => {
    clearInterval(interval);
  };
}).pipe(takeUntil(completeSubject));
/* Create a connectable observable specifying the connector Subject. */
const subject = new Subject<number>();
const multicasted = connectable(observable, {
  connector: () => subject
});
/* Each subscriber receives the same values, multicast through the Subject. */
multicasted.subscribe((value) => console.log("First subscription", value));
multicasted.subscribe((value) => console.log("Second subscription", value));
/* Connect the subject to the observable. */
multicasted.connect();
/* Complete the observable after 5 seconds. */
setTimeout(() => {
  completeSubject.next();
  completeSubject.complete();
}, 5000);
Let's break this down:
- First, we define completeSubject, which we will use to trigger the completion of the observable using the takeUntil() operator.
- We create a new Observable that emits a number value every 1000 milliseconds.
- We then create a new Subject, which we explicitly provide as the connector to the connectable Observable.
- We create the multicasted connectable Observable using the connectable() function. Note that the ConnectableConfig is optional; by default, a new Subject<unknown>() is created internally.
- We subscribe multiple times to the multicasted Observable that was created by the connectable() function.
- We invoke the connect() method on the multicasted instance in order to subscribe the Subject to the source Observable, which starts the multicast. The connect() method returns the connection, which is an instance of a Subscription, which we can then use to disconnect from the source Observable.
- Finally, after 5000 milliseconds we complete the source Observable.
, which we can then use to unsubscribe from the multicasted Observable. - Finally, after 5000 milliseconds we complete the source Observable.