WebSocketSubject
WebSocket
The WebSocket protocol enables a single connection with full-duplex communication. WebSocket connections are ideal for streaming data in real-time.
Some example use cases for a WebSocket may include:
- Chat application
- Financial application
- News application
- Sports game application
- Game application
At this point you may be thinking:
WebSockets + RxJs sound like a perfect match
RxJS is indeed well suited for streaming data from a WebSocket into an application.
webSocket() Function
To create a new WebSocketSubject we will use the webSocket() function.
The function signature is as follows:
webSocket<T>(urlConfigOrSource: string | WebSocketSubjectConfig<T>): WebSocketSubject<T>
Let's break this down:
- First, we can specify the generic type Tfor the next notification values.
- The function accepts either the stringURL for connecting to a WebSocket or aWebSocketSubjectConfigobject.
- Finally, the function returns an instance of the WebSocketSubject.
WebSocketSubject
The WebSocketSubject enables us to communicate with the WebSocket:
- An attempt to open a connection occurs upon subscribing to the WebSocketSubjectunless a connection is already open. Additional Observers to theWebSocketSubjectinstance will result in a multicast of notifications from the single connection.
- The connection is closed when the number of Observers goes from 1to0.
- All Observers receive a next notification for messages received from the server.
By default, messages are deserialized via JSON.parse().
- All Observers receive a complete notification when the connection is closed.
- All Observers receive an error notification when an error occurs.
- Emitting a next notification using the next()method on theWebSocketSubjectinstance sends a message to the server.
- Emitting a complete notification using the complete()method on theWebSocketSubjectinstance will close the connection.
- Emitting an error notification using the error()method on theWebSocketSubjectinstance will close the connection and send the status code to the server.
The WebSocketSubject also provides the ability for multiplexing.
This involves emulating multiple separate WebSocket connections through a single connection to improve performance.
Example
Let's look at an example to get started:
import { webSocket } from 'rxjs/webSocket';
/** Create a new WebSocketSubject using the webSocket operator. */
const webSocketSubject = webSocket<string>('wss://websocket-echo.com');
/** Send a message prior to opening the connection (subscribing). */
webSocketSubject.next('first');
/** Subscribe to the WebSocketSubject. */
webSocketSubject.subscribe({
  error: e => console.error(e),
  next: console.log,
  complete: () => console.log('complete')
});
/** Send additional messages after opening the connection. */
webSocketSubject.next('second');
webSocketSubject.next('third');
/** After a few seconds close the WebSocket connection. */
window.setTimeout(() => {
  webSocketSubject.complete();
}, 2000);
Let's review the code above:
- First, we import the webSocket()function from the "rxjs/webSocket" module.
- We then create a new WebSocketSubjectusing thewebSocket()function specifying the generic type ofstringfor the next notification values, and the connection URL.
- The "first" message is emitted using the next()method before opening the connecting by subscribing to theWebSocketSubject. All messages sent when the connection is closed are buffered until the connection is opened.
- Next, we subscribe to the WebSocketSubject, which will trigger an attempt to connect to the WebSocket server.
- Then, we send some additional messages, which will be immediately sent to the WebSocket server.
- Finally, after 2000 milliseconds we invoke the complete()method to close the connection to the server.
WebSocketSubjectConfig
The WebSocketSubjectConfig object enables us to provide additional configuration to the WebSocketSubject that is created using the webSocket() operator.
Let's look at a few of the configuration properties:
- urlis the only required property, which specifies the WebSocket connection URL.
- deserializerenables us to override the default behavior of parsing the JSON messages using- JSON.parse()received from the server.
- serializerenables us to create a custom serializer before sending a message to the server.
- openObserverenables us to provide an Observer whose- next()method is invoked when a connection is opened.
- closeObserverenables us to provide an Observer whose- next()method is invoked when a connection is closed.
Let's take a quick look at an example of using the openObserver and closeObserver configuration properties:
import { Subject } from 'rxjs';
import { webSocket } from 'rxjs/webSocket';
/** Observers the open event on the websocket. */
const open = new Subject();
/** Observes the close event on the websocket. */
const close = new Subject();
/** The websocket subject. */
const webSocketSubject = webSocket<string>({
  url: 'wss://websocket-echo.com',
  openObserver: open,
  closeObserver: close
});
/** Subscribe to open and close Observers. */
close.subscribe(console.log);
open.subscribe(console.log);
/** Subscribe to WebSocketSubject to open the connection. */
webSocketSubject.subscribe({
  next: console.log,
  error: (e) => console.error(e),
  complete: () => console.log('complete')
});
/** Send message. */
webSocketSubject.next('first');
/** After a few seconds close the WebSocket connection. */
window.setTimeout(() => {
  webSocketSubject.complete();
}, 2000);
In the example above we are not providing the WebSocketSubjectConfig object to the webSocket() operator.
Let's review the example above:
- First, we create two new Subjectinstances calledopenandclose. We'll use these as Observers to the open and close events for the WebSocket connection.
- We then create a new WebSocketSubjectinstance using thewebSocket()function providing theWebSocketSubjectConfigobject. In the config object we set theurlalong with theopenObserverandcloseObserverproperties. TheopenObserverandcloseObserverproperty values are set to theopenandcloseSubjects accordingly.
- Then, we subscribe to both the closeandopenSubjects and pass theconsole.log()function as the next notification callback function.
- Then, we subscribe to the webSocketSubjectand provide an Observer with properties for each notification type.
- Next, we send a message using the next()method on thewebSocketSubject. We should expect an echo message back from the server.
- Finally, after 2000 milliseconds we invoke the complete()method to close the connection to the server.