Recap Observer

Observers

An Observer consumes the values emitted by an Observable. The Observer is an object with up to three callback properties, one for each notification type.

interface NextObserver<T> {
    closed?: boolean;
    next: (value: T) => void;
    error?: (err: any) => void;
    complete?: () => void;
}
interface ErrorObserver<T> {
    closed?: boolean;
    next?: (value: T) => void;
    error: (err: any) => void;
    complete?: () => void;
}
interface CompletionObserver<T> {
    closed?: boolean;
    next?: (value: T) => void;
    error?: (err: any) => void;
    complete: () => void;
}
declare type PartialObserver<T> = NextObserver<T> | ErrorObserver<T> | CompletionObserver<T>;

The Observer can optionally specify one or more properties corresponding to each notification type:

  • next: delivers the next emitted value. It can fire zero or more times.
  • error: delivered at most once, when an exception occurs. It terminates the stream immediately, and no complete notification follows.
  • complete: delivered at most once, when the stream finishes without an error.

Once the Observable has completed or errored, the Observer receives no further notifications of any type.
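The rule that nothing arrives after a terminal notification can be sketched without RxJS. The snippet below is a minimal illustration and not RxJS's implementation: ToyObserver and guard are hypothetical names introduced here, and the wrapper simply drops anything that arrives after error or complete.

```typescript
// Hypothetical stand-in for a partial observer: every callback is optional.
interface ToyObserver<T> {
  next?: (value: T) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
}

// Hypothetical helper: wraps an observer so that nothing is delivered
// after the first error or complete notification.
function guard<T>(observer: ToyObserver<T>): Required<ToyObserver<T>> {
  let stopped = false;
  return {
    next: (value) => {
      if (!stopped) observer.next?.(value);
    },
    error: (err) => {
      if (stopped) return;
      stopped = true;
      observer.error?.(err);
    },
    complete: () => {
      if (stopped) return;
      stopped = true;
      observer.complete?.();
    },
  };
}

const received: string[] = [];
const guarded = guard<number>({
  next: (value) => { received.push(`next:${value}`); },
  error: () => { received.push('error'); },
  complete: () => { received.push('complete'); },
});

guarded.next(1);
guarded.next(2);
guarded.complete();
guarded.next(3);                  // ignored: the stream already completed
guarded.error(new Error('late')); // ignored as well

console.log(received); // [ 'next:1', 'next:2', 'complete' ]
```

The third value and the late error never reach the observer because the wrapper stopped forwarding after complete.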

Finally, we should note that when subscribing to an Observable the PartialObserver is optional:

subscribe(observer?: PartialObserver<T>): Subscription;

We can also pass the callback functions for each notification type as arguments to the subscribe() method instead of a PartialObserver object:

subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription;
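To make the relationship between the two calling styles concrete, here is a small dependency-free sketch. It assumes a hypothetical helper named toObserver and a hypothetical ToyObserver type (neither is part of RxJS) and shows how positional callbacks and an observer object can describe the same thing.

```typescript
// Hypothetical stand-in for PartialObserver<T>; every callback is optional here.
interface ToyObserver<T> {
  next?: (value: T) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
}

// Hypothetical helper: accepts either calling style and returns one observer object.
function toObserver<T>(
  observerOrNext?: ToyObserver<T> | ((value: T) => void),
  error?: (err: unknown) => void,
  complete?: () => void
): ToyObserver<T> {
  if (typeof observerOrNext === 'function') {
    return { next: observerOrNext, error, complete };
  }
  return observerOrNext ?? {};
}

const log: string[] = [];

// Style 1: a partial observer object.
const fromObject = toObserver<number>({
  next: (value: number) => { log.push(`object next:${value}`); },
  complete: () => { log.push('object complete'); },
});

// Style 2: positional callbacks (the error callback is skipped with undefined).
const fromCallbacks = toObserver<number>(
  (value: number) => { log.push(`callback next:${value}`); },
  undefined,
  () => { log.push('callback complete'); }
);

for (const observer of [fromObject, fromCallbacks]) {
  observer.next?.(42);
  observer.complete?.();
}

console.log(log);
// [ 'object next:42', 'object complete', 'callback next:42', 'callback complete' ]
```

The object form names each callback, so skipping one does not require an undefined placeholder. RxJS 7 and later mark the multi-callback form as deprecated in favor of the observer object.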

Creation of an Observable

Looking at how a new Observable is created helps us understand the Observer.

import { Observable } from 'rxjs';

/* create a new observable, providing the observer. */
const observable: Observable<string> = new Observable(observer => {
  const interval = setInterval(() => {
    observer.next('Hello from Observableland!');
  }, 2000);

  // teardown
  return () => {
    clearInterval(interval);
  };
});

/* Subscribe to Notifications. */
observable.subscribe(value => console.log(value));

See example on codesandbox

Before we break this down it may be helpful to look at the constructor function signature:

constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic);

Let's break this down:

  • First, we import the Observable class.
  • We create a new Observable specifying the generic type of string.
  • The constructor accepts a subscribe function whose this execution context is the Observable. The function receives a Subscriber, which implements the Observer interface.
  • The next() method on the Observer emits a next notification to Observers.
  • The error() method on the Observer emits the error notification when an exception occurs.
  • The complete() method on the Observer emits the completion notification when the Observable stream is complete and will no longer emit an error or next notification.
  • Finally, the subscribe function returns a teardown function. The teardown function is invoked when the Observable completes, errors, or the Observer unsubscribes. This is the Observable's opportunity to release resources, such as the interval timer in the example above.
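The breakdown above can be sketched with a toy Observable that runs without RxJS. This is a simplified illustration under stated assumptions, not RxJS's implementation: ToyObservable, ToyObserver, and Teardown are hypothetical names, and the subscribe function here must always return a teardown. It shows the teardown running both when the producer completes and when the consumer unsubscribes.

```typescript
type Teardown = () => void;

// Hypothetical stand-in for a partial observer.
interface ToyObserver<T> {
  next?: (value: T) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
}

// Toy Observable: stores the subscribe function and runs it once per subscribe() call.
class ToyObservable<T> {
  private readonly subscribeFn: (subscriber: Required<ToyObserver<T>>) => Teardown;

  constructor(subscribeFn: (subscriber: Required<ToyObserver<T>>) => Teardown) {
    this.subscribeFn = subscribeFn;
  }

  subscribe(observer: ToyObserver<T>): { unsubscribe: () => void } {
    let closed = false;
    let teardown: Teardown | undefined;

    // Marks the subscription closed and runs the teardown at most once.
    const close = (): void => {
      closed = true;
      const fn = teardown;
      teardown = undefined;
      fn?.();
    };

    const subscriber: Required<ToyObserver<T>> = {
      next: (value) => { if (!closed) observer.next?.(value); },
      error: (err) => {
        if (closed) return;
        closed = true;
        observer.error?.(err);
        close();
      },
      complete: () => {
        if (closed) return;
        closed = true;
        observer.complete?.();
        close();
      },
    };

    teardown = this.subscribeFn(subscriber);
    // The producer may have finished synchronously, before the teardown existed.
    if (closed) close();

    return { unsubscribe: close };
  }
}

const events: string[] = [];

// Case 1: the producer completes, which triggers the teardown.
const greetings = new ToyObservable<string>((subscriber) => {
  subscriber.next('Hello from Observableland!');
  subscriber.complete();
  return () => { events.push('greetings teardown'); };
});
greetings.subscribe({
  next: (value) => { events.push(`next:${value}`); },
  complete: () => { events.push('complete'); },
});

// Case 2: the producer never completes; the consumer unsubscribes instead.
const endless = new ToyObservable<number>((subscriber) => {
  subscriber.next(1);
  return () => { events.push('endless teardown'); };
});
const subscription = endless.subscribe({
  next: (value) => { events.push(`next:${value}`); },
});
subscription.unsubscribe();
subscription.unsubscribe(); // second call is a no-op

console.log(events);
// [ 'next:Hello from Observableland!', 'complete', 'greetings teardown',
//   'next:1', 'endless teardown' ]
```

In both cases the teardown runs exactly once, which is the property that makes it a safe place to clear timers or close connections.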

Key Takeaways

  1. Observers have up to three methods: next(), error(), and complete().
  2. While not common when using RxJS, you can create an Observable directly by calling its constructor with new.