Multicast Operator

The connectable() function

In the previous section we learned how to manually use a source Observable with a Subject in order to multicast to multiple Observers. In most cases when we need to multicast, we do not need to create and wire up a Subject ourselves. What we really need is a multicasted Observable.

We'll use the connectable() function, or the connect() or share() operators, to create multicasted Observables.
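As a quick point of comparison, here is a minimal sketch of the share() operator, which multicasts without an explicit connect() call. The names source, shared, and sourceSubscriptions are hypothetical and only serve this illustration; the counter shows that the source's subscribe logic runs once even though there are two subscribers.

```typescript
import { Observable } from "rxjs";
import { share } from "rxjs/operators";

// Hypothetical counter: how many times the source's subscribe logic runs.
let sourceSubscriptions = 0;

const source = new Observable<number>((observer) => {
  sourceSubscriptions++;
  // Emit asynchronously so both subscribers are attached before the value arrives.
  const id = setTimeout(() => {
    observer.next(42);
    observer.complete();
  }, 0);
  return () => clearTimeout(id);
});

// share() subscribes to the source when the first subscriber arrives
// and reuses that single subscription for later subscribers.
const shared = source.pipe(share());

shared.subscribe((value) => console.log("First subscription", value));
shared.subscribe((value) => console.log("Second subscription", value));

// Only one subscription to the source was created.
console.log("Source subscriptions:", sourceSubscriptions);
```

With share() the connection is managed for us; connectable() instead leaves the moment of connection under our control.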

Example

Let's look at an example of using the connectable() function to create a new connectable Observable:

import { connectable, Observable, Subject } from "rxjs";
import { takeUntil } from "rxjs/operators";

const completeSubject = new Subject<void>();

/* Create a new observable, providing the observer. */
const observable = new Observable<number>((observer) => {
  console.log("%cNew subscription created", "background: #222; color: #bada55");

  let i = 0;
  const interval = setInterval(() => {
    observer.next(i++);
  }, 1000);

  return () => {
    clearInterval(interval);
  };
}).pipe(takeUntil(completeSubject));

/* Create a connectable observable specifying the connector Subject. */
const subject = new Subject<number>();
const multicasted = connectable(observable, {
  connector: () => subject
});

/* Both subscriptions share one source execution and receive the same values. */
multicasted.subscribe((value) => console.log("First subscription", value));
multicasted.subscribe((value) => console.log("Second subscription", value));

/* Connect the subject to the source observable, which starts the emissions. */
multicasted.connect();

/* Complete the observable after 5 seconds. */
setTimeout(() => {
  completeSubject.next();
  completeSubject.complete();
}, 5000);

Let's break this down:

  • First, we define completeSubject, which we will use to trigger the completion of the observable using the takeUntil() operator.
  • Next, we create a new Observable that emits an incrementing number every 1000 milliseconds.
  • We then create a new Subject, which we explicitly provide as the connector for the connectable Observable.
  • We create the multicasted connectable Observable using the connectable() function. Note that the ConnectableConfig is optional; by default, connectable() creates a new Subject<unknown>() internally.
  • We subscribe multiple times to the multicasted Observable that was created by the connectable() function.
  • We invoke the connect() method on the multicasted instance in order to subscribe the Subject to the source Observable, which starts the emissions. The connect() method returns the connection, which is an instance of a Subscription that we can use to disconnect from the source Observable.
  • Finally, after 5000 milliseconds, completeSubject emits, which causes takeUntil() to complete the source Observable.
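
Two of the points above, the optional ConnectableConfig and the Subscription returned by connect(), can be sketched in isolation. This is a minimal illustration and not part of the original example: the source emits synchronously so the result is easy to follow, and the names teardownCalls, first, and second are hypothetical.

```typescript
import { connectable, Observable } from "rxjs";

// Hypothetical counter: how many times the source's teardown logic runs.
let teardownCalls = 0;

const source = new Observable<number>((observer) => {
  observer.next(1);
  observer.next(2);
  // No complete() here: the source stays open until the connection is closed.
  return () => {
    teardownCalls++;
  };
});

// No ConnectableConfig is passed, so connectable() creates its own Subject as the connector.
const multicasted = connectable(source);

const first: number[] = [];
const second: number[] = [];
multicasted.subscribe((value) => first.push(value));
multicasted.subscribe((value) => second.push(value));

// connect() subscribes the internal Subject to the source and returns the connection.
const connection = multicasted.connect();

// Unsubscribing the connection tears down the single subscription to the source.
connection.unsubscribe();

// Both arrays hold the same values, and the teardown ran once.
console.log(first, second, teardownCalls, connection.closed);
```

Because both subscribers are attached before connect() is called, neither misses a value, even though the source emits synchronously.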