WebSocketSubject
WebSocket
The WebSocket protocol enables a single connection with full-duplex communication. WebSocket connections are ideal for streaming data in real-time.
Some example use cases for a WebSocket may include:
- Chat application
- Financial application
- News application
- Sports game application
- Game application
At this point you may be thinking:
WebSockets + RxJS sound like a perfect match
RxJS is indeed well suited for streaming data from a WebSocket into an application.
webSocket() Function
To create a new WebSocketSubject we will use the webSocket() function.
The function signature is as follows:
webSocket<T>(urlConfigOrSource: string | WebSocketSubjectConfig<T>): WebSocketSubject<T>
Let's break this down:
- First, we can specify the generic type T for the next notification values.
- The function accepts either the string URL for connecting to a WebSocket or a WebSocketSubjectConfig object.
- Finally, the function returns an instance of the WebSocketSubject.
WebSocketSubject
The WebSocketSubject enables us to communicate with the WebSocket:
- An attempt to open a connection occurs upon subscribing to the WebSocketSubject unless a connection is already open. Additional Observers to the WebSocketSubject instance will result in a multicast of notifications from the single connection.
- The connection is closed when the number of Observers goes from 1 to 0.
- All Observers receive a next notification for messages received from the server. By default, messages are deserialized via JSON.parse().
- All Observers receive a complete notification when the connection is closed.
- All Observers receive an error notification when an error occurs.
- Emitting a next notification using the next() method on the WebSocketSubject instance sends a message to the server.
- Emitting a complete notification using the complete() method on the WebSocketSubject instance will close the connection.
- Emitting an error notification using the error() method on the WebSocketSubject instance will close the connection and send the status code to the server.
The WebSocketSubject also supports multiplexing.
This involves emulating multiple separate WebSocket connections through a single connection to improve performance.
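To make multiplexing more concrete, here is a minimal sketch of the three pieces that the multiplex() method on a WebSocketSubject expects: a function producing the subscribe message, a function producing the unsubscribe message, and a filter that picks out the messages belonging to one logical channel. The ChannelMessage shape, the 'news' channel name, and the subscribe/unsubscribe payloads are assumptions made for illustration; a real server defines its own protocol. The filtering is simulated on a plain array so the sketch runs without a server.

```typescript
/** Hypothetical envelope: every message names the channel it belongs to. */
interface ChannelMessage {
  channel: string;
  payload: string;
}

/** Message sent to the server when the multiplexed Observable is subscribed to. */
const subscribeToNews = () => ({ subscribe: 'news' });

/** Message sent to the server when the multiplexed Observable is unsubscribed. */
const unsubscribeFromNews = () => ({ unsubscribe: 'news' });

/** Filter selecting which incoming messages belong to this channel. */
const isNews = (message: ChannelMessage): boolean => message.channel === 'news';

// With a WebSocketSubject<ChannelMessage> named `subject`, these would be used as:
// const news$ = subject.multiplex(subscribeToNews, unsubscribeFromNews, isNews);

// Simulate the filtering step on a mixed stream of incoming messages.
const incoming: ChannelMessage[] = [
  { channel: 'news', payload: 'Markets open' },
  { channel: 'sports', payload: 'Final score' },
  { channel: 'news', payload: 'Weather update' },
];
const newsOnly: string[] = incoming.filter(isNews).map((m) => m.payload);
```

Each multiplexed Observable sees only the messages its filter accepts, while all of them share the one underlying connection.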
Example
Let's look at an example to get started:
import { webSocket } from 'rxjs/webSocket';

/** Create a new WebSocketSubject using the webSocket() function. */
const webSocketSubject = webSocket<string>('wss://websocket-echo.com');

/** Send a message prior to opening the connection (subscribing). */
webSocketSubject.next('first');

/** Subscribe to the WebSocketSubject. */
webSocketSubject.subscribe({
  error: e => console.error(e),
  next: console.log,
  complete: () => console.log('complete')
});

/** Send additional messages after opening the connection. */
webSocketSubject.next('second');
webSocketSubject.next('third');

/** After a few seconds close the WebSocket connection. */
window.setTimeout(() => {
  webSocketSubject.complete();
}, 2000);
Let's review the code above:
- First, we import the webSocket() function from the "rxjs/webSocket" module.
- We then create a new WebSocketSubject using the webSocket() function, specifying the generic type of string for the next notification values, and the connection URL.
- The "first" message is emitted using the next() method before opening the connection by subscribing to the WebSocketSubject. All messages sent while the connection is closed are buffered until the connection is opened.
- Next, we subscribe to the WebSocketSubject, which will trigger an attempt to connect to the WebSocket server.
- Then, we send some additional messages, which are sent to the WebSocket server as soon as the connection is open.
- Finally, after 2000 milliseconds we invoke the complete() method to close the connection to the server.
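The buffering described above can be pictured with a small model. This is a simplified sketch of the idea only, not RxJS's actual implementation: the BufferedSender class and its transmit callback (standing in for the socket's send call) are hypothetical names introduced for illustration.

```typescript
/** Simplified model of send-buffering; not RxJS's actual implementation. */
class BufferedSender<T> {
  private queue: T[] = [];
  private isOpen = false;
  private transmit: (message: T) => void;

  /** `transmit` is a hypothetical callback standing in for the socket's send call. */
  constructor(transmit: (message: T) => void) {
    this.transmit = transmit;
  }

  /** Queue the message while closed; send it straight away once open. */
  next(message: T): void {
    if (this.isOpen) {
      this.transmit(message);
    } else {
      this.queue.push(message);
    }
  }

  /** Called when the connection opens: flush queued messages in order. */
  open(): void {
    this.isOpen = true;
    for (const message of this.queue) {
      this.transmit(message);
    }
    this.queue = [];
  }
}

// Record everything that would have been sent over the socket.
const sent: string[] = [];
const sender = new BufferedSender<string>((message) => {
  sent.push(message);
});

sender.next('first'); // buffered: the connection is not open yet
const sentBeforeOpen = sent.length;
sender.open(); // the connection opens and 'first' is flushed
sender.next('second'); // the connection is open, so this is sent directly
```

In this model nothing reaches the socket until open() runs, which mirrors why the "first" message in the example is not lost even though it is emitted before subscribing.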
WebSocketSubjectConfig
The WebSocketSubjectConfig object enables us to provide additional configuration to the WebSocketSubject that is created using the webSocket() function.
Let's look at a few of the configuration properties:
- url is the only required property, which specifies the WebSocket connection URL.
- deserializer enables us to override the default behavior of parsing messages received from the server using JSON.parse().
- serializer enables us to provide a custom serializer that is applied to each message before it is sent to the server.
- openObserver enables us to provide an Observer whose next() method is invoked when a connection is opened.
- closeObserver enables us to provide an Observer whose next() method is invoked when a connection is closed.
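As an illustration of the serializer and deserializer properties, here is a minimal sketch assuming a plain-text protocol in which messages travel as "user:text" strings instead of JSON. The ChatMessage type, the IncomingEvent stand-in, the function names, and the wire format are all hypothetical. The deserializer reads only the data property of the event it is given.

```typescript
/** Hypothetical message shape, used only for illustration. */
interface ChatMessage {
  user: string;
  text: string;
}

/** Minimal stand-in for the event passed to a deserializer: only `data` is read. */
interface IncomingEvent {
  data: unknown;
}

/** Serializer: turns an outgoing value into the string sent over the socket. */
const serializeChat = (message: ChatMessage): string =>
  `${message.user}:${message.text}`;

/** Deserializer: turns an incoming event into the value emitted to Observers. */
const deserializeChat = (event: IncomingEvent): ChatMessage => {
  const raw = String(event.data);
  const separator = raw.indexOf(':');
  if (separator === -1) {
    throw new Error(`Malformed message: ${raw}`);
  }
  // Split on the first colon only, so the text itself may contain colons.
  return { user: raw.slice(0, separator), text: raw.slice(separator + 1) };
};

// These would be supplied through the config object, for example:
// webSocket<ChatMessage>({ url, serializer: serializeChat, deserializer: deserializeChat });

// Round-trip a value through both functions without a server.
const wire = serializeChat({ user: 'ada', text: 'hello: world' });
const roundTrip = deserializeChat({ data: wire });
```

Keeping the two functions symmetric, so that one undoes the other, makes them easy to check in isolation before wiring them into the config.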
Let's take a quick look at an example of using the openObserver and closeObserver configuration properties:
import { Subject } from 'rxjs';
import { webSocket } from 'rxjs/webSocket';

/** Observes the open event on the websocket. */
const open = new Subject();

/** Observes the close event on the websocket. */
const close = new Subject();

/** The websocket subject. */
const webSocketSubject = webSocket<string>({
  url: 'wss://websocket-echo.com',
  openObserver: open,
  closeObserver: close
});

/** Subscribe to open and close Observers. */
close.subscribe(console.log);
open.subscribe(console.log);

/** Subscribe to WebSocketSubject to open the connection. */
webSocketSubject.subscribe({
  next: console.log,
  error: (e) => console.error(e),
  complete: () => console.log('complete')
});

/** Send message. */
webSocketSubject.next('first');

/** After a few seconds close the WebSocket connection. */
window.setTimeout(() => {
  webSocketSubject.complete();
}, 2000);
In the example above we are now providing the WebSocketSubjectConfig object to the webSocket() function.
Let's review the example above:
- First, we create two new Subject instances called open and close. We'll use these as Observers to the open and close events for the WebSocket connection.
- We then create a new WebSocketSubject instance using the webSocket() function, providing the WebSocketSubjectConfig object. In the config object we set the url along with the openObserver and closeObserver properties. The openObserver and closeObserver property values are set to the open and close Subjects respectively.
- Then, we subscribe to both the close and open Subjects and pass the console.log() function as the next notification callback function.
- Then, we subscribe to the webSocketSubject and provide an Observer with properties for each notification type.
- Next, we send a message using the next() method on the webSocketSubject. We should expect an echo message back from the server.
- Finally, after 2000 milliseconds we invoke the complete() method to close the connection to the server.