import { Subject, AnonymousSubject } from '../../Subject';
import { Subscriber } from '../../Subscriber';
import { Observable } from '../../Observable';
import { Subscription } from '../../Subscription';
import { Operator } from '../../Operator';
import { ReplaySubject } from '../../ReplaySubject';
import { Observer, NextObserver } from '../../types';
/**
* WebSocketSubjectConfig is a plain Object that allows us to make our
* webSocket configurable.
*
* Provides flexibility to {@link webSocket}
*
* It defines a set of properties to provide custom behavior in specific
* moments of the socket's lifecycle. When the connection opens we can
* use `openObserver`, when the connection is closed `closeObserver`, if we
* are interested in listening for data coming from server: `deserializer`,
* which allows us to customize the deserialization strategy of data before passing it
* to the socket client. By default, `deserializer` is going to apply `JSON.parse` to each message coming
* from the Server.
*
* ## Examples
*
* **deserializer**, the default for this property is `JSON.parse` but since there are just two options
* for incoming data, either be text or binary data. We can apply a custom deserialization strategy
* or just simply skip the default behaviour.
*
* ```ts
* import { webSocket } from 'rxjs/webSocket';
*
* const wsSubject = webSocket({
* url: 'ws://localhost:8081',
* //Apply any transformation of your choice.
* deserializer: ({ data }) => data
* });
*
* wsSubject.subscribe(console.log);
*
* // Let's suppose we have this on the Server: ws.send('This is a msg from the server')
* //output
* //
* // This is a msg from the server
* ```
*
* **serializer** allows us to apply custom serialization strategy but for the outgoing messages.
*
* ```ts
* import { webSocket } from 'rxjs/webSocket';
*
* const wsSubject = webSocket({
* url: 'ws://localhost:8081',
* // Apply any transformation of your choice.
* serializer: msg => JSON.stringify({ channel: 'webDevelopment', msg: msg })
* });
*
* wsSubject.subscribe(() => subject.next('msg to the server'));
*
* // Let's suppose we have this on the Server:
* // ws.on('message', msg => console.log);
* // ws.send('This is a msg from the server');
* // output at server side:
* //
* // {"channel":"webDevelopment","msg":"msg to the server"}
* ```
*
* **closeObserver** allows us to set a custom error when an error raises up.
*
* ```ts
* import { webSocket } from 'rxjs/webSocket';
*
* const wsSubject = webSocket({
* url: 'ws://localhost:8081',
* closeObserver: {
* next() {
* const customError = { code: 6666, reason: 'Custom evil reason' }
* console.log(`code: ${ customError.code }, reason: ${ customError.reason }`);
* }
* }
* });
*
* // output
* // code: 6666, reason: Custom evil reason
* ```
*
* **openObserver**, Let's say we need to make some kind of init task before sending/receiving msgs to the
* webSocket or sending notification that the connection was successful, this is when
* openObserver is useful for.
*
* ```ts
* import { webSocket } from 'rxjs/webSocket';
*
* const wsSubject = webSocket({
* url: 'ws://localhost:8081',
* openObserver: {
* next: () => {
* console.log('Connection ok');
* }
* }
* });
*
* // output
* // Connection ok
* ```
*/
export interface WebSocketSubjectConfig {
/** The url of the socket server to connect to */
url: string;
/** The protocol to use to connect */
protocol?: string | Array;
/** @deprecated Will be removed in v8. Use {@link deserializer} instead. */
resultSelector?: (e: MessageEvent) => T;
/**
* A serializer used to create messages from passed values before the
* messages are sent to the server. Defaults to JSON.stringify.
*/
serializer?: (value: T) => WebSocketMessage;
/**
* A deserializer used for messages arriving on the socket from the
* server. Defaults to JSON.parse.
*/
deserializer?: (e: MessageEvent) => T;
/**
* An Observer that watches when open events occur on the underlying web socket.
*/
openObserver?: NextObserver;
/**
* An Observer that watches when close events occur on the underlying web socket
*/
closeObserver?: NextObserver;
/**
* An Observer that watches when a close is about to occur due to
* unsubscription.
*/
closingObserver?: NextObserver;
/**
* A WebSocket constructor to use. This is useful for situations like using a
* WebSocket impl in Node (WebSocket is a DOM API), or for mocking a WebSocket
* for testing purposes
*/
WebSocketCtor?: { new (url: string, protocols?: string | string[]): WebSocket };
/** Sets the `binaryType` property of the underlying WebSocket. */
binaryType?: 'blob' | 'arraybuffer';
}
const DEFAULT_WEBSOCKET_CONFIG: WebSocketSubjectConfig = {
url: '',
deserializer: (e: MessageEvent) => JSON.parse(e.data),
serializer: (value: any) => JSON.stringify(value),
};
const WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT =
'WebSocketSubject.error must be called with an object with an error code, and an optional reason: { code: number, reason: string }';
export type WebSocketMessage = string | ArrayBuffer | Blob | ArrayBufferView;
export class WebSocketSubject extends AnonymousSubject {
// @ts-ignore: Property has no initializer and is not definitely assigned
private _config: WebSocketSubjectConfig;
/** @internal */
// @ts-ignore: Property has no initializer and is not definitely assigned
_output: Subject;
private _socket: WebSocket | null = null;
constructor(urlConfigOrSource: string | WebSocketSubjectConfig | Observable, destination?: Observer) {
super();
if (urlConfigOrSource instanceof Observable) {
this.destination = destination;
this.source = urlConfigOrSource as Observable;
} else {
const config = (this._config = { ...DEFAULT_WEBSOCKET_CONFIG });
this._output = new Subject();
if (typeof urlConfigOrSource === 'string') {
config.url = urlConfigOrSource;
} else {
for (const key in urlConfigOrSource) {
if (urlConfigOrSource.hasOwnProperty(key)) {
(config as any)[key] = (urlConfigOrSource as any)[key];
}
}
}
if (!config.WebSocketCtor && WebSocket) {
config.WebSocketCtor = WebSocket;
} else if (!config.WebSocketCtor) {
throw new Error('no WebSocket constructor can be found');
}
this.destination = new ReplaySubject();
}
}
/** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */
lift(operator: Operator): WebSocketSubject {
const sock = new WebSocketSubject(this._config as WebSocketSubjectConfig, this.destination as any);
sock.operator = operator;
sock.source = this;
return sock;
}
private _resetState() {
this._socket = null;
if (!this.source) {
this.destination = new ReplaySubject();
}
this._output = new Subject();
}
/**
* Creates an {@link Observable}, that when subscribed to, sends a message,
* defined by the `subMsg` function, to the server over the socket to begin a
* subscription to data over that socket. Once data arrives, the
* `messageFilter` argument will be used to select the appropriate data for
* the resulting Observable. When finalization occurs, either due to
* unsubscription, completion, or error, a message defined by the `unsubMsg`
* argument will be sent to the server over the WebSocketSubject.
*
* @param subMsg A function to generate the subscription message to be sent to
* the server. This will still be processed by the serializer in the
* WebSocketSubject's config. (Which defaults to JSON serialization)
* @param unsubMsg A function to generate the unsubscription message to be
* sent to the server at finalization. This will still be processed by the
* serializer in the WebSocketSubject's config.
* @param messageFilter A predicate for selecting the appropriate messages
* from the server for the output stream.
*/
multiplex(subMsg: () => any, unsubMsg: () => any, messageFilter: (value: T) => boolean) {
const self = this;
return new Observable((observer: Observer) => {
try {
self.next(subMsg());
} catch (err) {
observer.error(err);
}
const subscription = self.subscribe({
next: (x) => {
try {
if (messageFilter(x)) {
observer.next(x);
}
} catch (err) {
observer.error(err);
}
},
error: (err) => observer.error(err),
complete: () => observer.complete(),
});
return () => {
try {
self.next(unsubMsg());
} catch (err) {
observer.error(err);
}
subscription.unsubscribe();
};
});
}
private _connectSocket() {
const { WebSocketCtor, protocol, url, binaryType } = this._config;
const observer = this._output;
let socket: WebSocket | null = null;
try {
socket = protocol ? new WebSocketCtor!(url, protocol) : new WebSocketCtor!(url);
this._socket = socket;
if (binaryType) {
this._socket.binaryType = binaryType;
}
} catch (e) {
observer.error(e);
return;
}
const subscription = new Subscription(() => {
this._socket = null;
if (socket && socket.readyState === 1) {
socket.close();
}
});
socket.onopen = (evt: Event) => {
const { _socket } = this;
if (!_socket) {
socket!.close();
this._resetState();
return;
}
const { openObserver } = this._config;
if (openObserver) {
openObserver.next(evt);
}
const queue = this.destination;
this.destination = Subscriber.create(
(x) => {
if (socket!.readyState === 1) {
try {
const { serializer } = this._config;
socket!.send(serializer!(x!));
} catch (e) {
this.destination!.error(e);
}
}
},
(err) => {
const { closingObserver } = this._config;
if (closingObserver) {
closingObserver.next(undefined);
}
if (err && err.code) {
socket!.close(err.code, err.reason);
} else {
observer.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT));
}
this._resetState();
},
() => {
const { closingObserver } = this._config;
if (closingObserver) {
closingObserver.next(undefined);
}
socket!.close();
this._resetState();
}
) as Subscriber;
if (queue && queue instanceof ReplaySubject) {
subscription.add((queue as ReplaySubject).subscribe(this.destination));
}
};
socket.onerror = (e: Event) => {
this._resetState();
observer.error(e);
};
socket.onclose = (e: CloseEvent) => {
if (socket === this._socket) {
this._resetState();
}
const { closeObserver } = this._config;
if (closeObserver) {
closeObserver.next(e);
}
if (e.wasClean) {
observer.complete();
} else {
observer.error(e);
}
};
socket.onmessage = (e: MessageEvent) => {
try {
const { deserializer } = this._config;
observer.next(deserializer!(e));
} catch (err) {
observer.error(err);
}
};
}
/** @internal */
protected _subscribe(subscriber: Subscriber): Subscription {
const { source } = this;
if (source) {
return source.subscribe(subscriber);
}
if (!this._socket) {
this._connectSocket();
}
this._output.subscribe(subscriber);
subscriber.add(() => {
const { _socket } = this;
if (this._output.observers.length === 0) {
if (_socket && (_socket.readyState === 1 || _socket.readyState === 0)) {
_socket.close();
}
this._resetState();
}
});
return subscriber;
}
unsubscribe() {
const { _socket } = this;
if (_socket && (_socket.readyState === 1 || _socket.readyState === 0)) {
_socket.close();
}
this._resetState();
super.unsubscribe();
}
}