commit
4736ca490c
@ -0,0 +1,152 @@
|
||||
import WS from 'ws';
|
||||
import { Socket as EIOSocket } from 'engine.io-client';
|
||||
|
||||
export interface GristClientSocketOptions {
|
||||
headers?: Record<string, string>;
|
||||
}
|
||||
|
||||
export class GristClientSocket {
|
||||
// Exactly one of _wsSocket and _eioSocket will be set at any one time.
|
||||
private _wsSocket: WS.WebSocket | WebSocket | undefined;
|
||||
private _eioSocket: EIOSocket | undefined;
|
||||
|
||||
// Set to true if a WebSocket connection (in _wsSocket) was succesfully
|
||||
// established. Errors from the underlying WebSocket are not forwarded to
|
||||
// the client until that point, in case we end up downgrading to Engine.IO.
|
||||
private _wsConnected: boolean = false;
|
||||
|
||||
private _messageHandler: null | ((data: string) => void);
|
||||
private _openHandler: null | (() => void);
|
||||
private _errorHandler: null | ((err: Error) => void);
|
||||
private _closeHandler: null | (() => void);
|
||||
|
||||
constructor(private _url: string, private _options?: GristClientSocketOptions) {
|
||||
this._createWSSocket();
|
||||
}
|
||||
|
||||
public set onmessage(cb: null | ((data: string) => void)) {
|
||||
this._messageHandler = cb;
|
||||
}
|
||||
|
||||
public set onopen(cb: null | (() => void)) {
|
||||
this._openHandler = cb;
|
||||
}
|
||||
|
||||
public set onerror(cb: null | ((err: Error) => void)) {
|
||||
this._errorHandler = cb;
|
||||
}
|
||||
|
||||
public set onclose(cb: null | (() => void)) {
|
||||
this._closeHandler = cb;
|
||||
}
|
||||
|
||||
public close() {
|
||||
if (this._wsSocket) {
|
||||
this._wsSocket.close();
|
||||
} else {
|
||||
this._eioSocket!.close();
|
||||
}
|
||||
}
|
||||
|
||||
public send(data: string) {
|
||||
if (this._wsSocket) {
|
||||
this._wsSocket.send(data);
|
||||
} else {
|
||||
this._eioSocket!.send(data);
|
||||
}
|
||||
}
|
||||
|
||||
// pause() and resume() are used for tests and assume a WS.WebSocket transport
|
||||
public pause() {
|
||||
(this._wsSocket as WS.WebSocket)?.pause();
|
||||
}
|
||||
|
||||
public resume() {
|
||||
(this._wsSocket as WS.WebSocket)?.resume();
|
||||
}
|
||||
|
||||
private _createWSSocket() {
|
||||
if (typeof WebSocket !== 'undefined') {
|
||||
this._wsSocket = new WebSocket(this._url);
|
||||
} else {
|
||||
this._wsSocket = new WS(this._url, undefined, this._options);
|
||||
}
|
||||
this._wsSocket.onmessage = this._onWSMessage.bind(this);
|
||||
this._wsSocket.onopen = this._onWSOpen.bind(this);
|
||||
this._wsSocket.onerror = this._onWSError.bind(this);
|
||||
this._wsSocket.onclose = this._onWSClose.bind(this);
|
||||
}
|
||||
|
||||
private _destroyWSSocket() {
|
||||
if (this._wsSocket) {
|
||||
this._wsSocket.onmessage = null;
|
||||
this._wsSocket.onopen = null;
|
||||
this._wsSocket.onerror = null;
|
||||
this._wsSocket.onclose = null;
|
||||
this._wsSocket = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
private _onWSMessage(event: WS.MessageEvent | MessageEvent<any>) {
|
||||
// event.data is guaranteed to be a string here because we only send text frames.
|
||||
// https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/message_event#event_properties
|
||||
this._messageHandler?.(event.data);
|
||||
}
|
||||
|
||||
private _onWSOpen() {
|
||||
// The connection was established successfully. Any future events can now
|
||||
// be forwarded to the client.
|
||||
this._wsConnected = true;
|
||||
this._openHandler?.();
|
||||
}
|
||||
|
||||
private _onWSError(ev: Event) {
|
||||
if (!this._wsConnected) {
|
||||
// The WebSocket connection attempt failed. Switch to Engine.IO.
|
||||
this._destroyWSSocket();
|
||||
this._createEIOSocket();
|
||||
return;
|
||||
}
|
||||
|
||||
// WebSocket error events are deliberately void of information,
|
||||
// see https://websockets.spec.whatwg.org/#eventdef-websocket-error,
|
||||
// so we ignore the incoming event.
|
||||
this._errorHandler?.(new Error("WebSocket error"));
|
||||
}
|
||||
|
||||
private _onWSClose() {
|
||||
this._closeHandler?.();
|
||||
}
|
||||
|
||||
private _createEIOSocket() {
|
||||
this._eioSocket = new EIOSocket(this._url, {
|
||||
path: new URL(this._url).pathname,
|
||||
addTrailingSlash: false,
|
||||
transports: ['polling'],
|
||||
upgrade: false,
|
||||
extraHeaders: this._options?.headers,
|
||||
withCredentials: true,
|
||||
});
|
||||
|
||||
this._eioSocket.on('message', this._onEIOMessage.bind(this));
|
||||
this._eioSocket.on('open', this._onEIOOpen.bind(this));
|
||||
this._eioSocket.on('error', this._onEIOError.bind(this));
|
||||
this._eioSocket.on('close', this._onEIOClose.bind(this));
|
||||
}
|
||||
|
||||
private _onEIOMessage(data: string) {
|
||||
this._messageHandler?.(data);
|
||||
}
|
||||
|
||||
private _onEIOOpen() {
|
||||
this._openHandler?.();
|
||||
}
|
||||
|
||||
private _onEIOError(err: string | Error) {
|
||||
this._errorHandler?.(typeof err === "string" ? new Error(err) : err);
|
||||
}
|
||||
|
||||
private _onEIOClose() {
|
||||
this._closeHandler?.();
|
||||
}
|
||||
}
|
@ -0,0 +1,138 @@
|
||||
import * as WS from 'ws';
|
||||
import * as EIO from 'engine.io';
|
||||
|
||||
export abstract class GristServerSocket {
|
||||
public abstract set onerror(handler: (err: Error) => void);
|
||||
public abstract set onclose(handler: () => void);
|
||||
public abstract set onmessage(handler: (data: string) => void);
|
||||
public abstract removeAllListeners(): void;
|
||||
public abstract close(): void;
|
||||
public abstract terminate(): void;
|
||||
public abstract get isOpen(): boolean;
|
||||
public abstract send(data: string, cb?: (err?: Error) => void): void;
|
||||
}
|
||||
|
||||
export class GristServerSocketEIO extends GristServerSocket {
|
||||
private _eventHandlers: Array<{ event: string, handler: (...args: any[]) => void }> = [];
|
||||
private _messageCounter = 0;
|
||||
|
||||
// Engine.IO only invokes send() callbacks on success. We keep a map of
|
||||
// send callbacks for messages in flight so that we can invoke them for
|
||||
// any messages still unsent upon receiving a "close" event.
|
||||
private _messageCallbacks: Map<number, (err: Error) => void> = new Map();
|
||||
|
||||
constructor(private _socket: EIO.Socket) { super(); }
|
||||
|
||||
public set onerror(handler: (err: Error) => void) {
|
||||
// Note that as far as I can tell, Engine.IO sockets never emit "error"
|
||||
// but instead include error information in the "close" event.
|
||||
this._socket.on('error', handler);
|
||||
this._eventHandlers.push({ event: 'error', handler });
|
||||
}
|
||||
|
||||
public set onclose(handler: () => void) {
|
||||
const wrappedHandler = (reason: string, description: any) => {
|
||||
// In practice, when available, description has more error details,
|
||||
// possibly in the form of an Error object.
|
||||
const maybeErr = description ?? reason;
|
||||
const err = maybeErr instanceof Error ? maybeErr : new Error(maybeErr);
|
||||
for (const cb of this._messageCallbacks.values()) {
|
||||
cb(err);
|
||||
}
|
||||
this._messageCallbacks.clear();
|
||||
|
||||
handler();
|
||||
};
|
||||
this._socket.on('close', wrappedHandler);
|
||||
this._eventHandlers.push({ event: 'close', handler: wrappedHandler });
|
||||
}
|
||||
|
||||
public set onmessage(handler: (data: string) => void) {
|
||||
const wrappedHandler = (msg: Buffer) => {
|
||||
handler(msg.toString());
|
||||
};
|
||||
this._socket.on('message', wrappedHandler);
|
||||
this._eventHandlers.push({ event: 'message', handler: wrappedHandler });
|
||||
}
|
||||
|
||||
public removeAllListeners() {
|
||||
for (const { event, handler } of this._eventHandlers) {
|
||||
this._socket.off(event, handler);
|
||||
}
|
||||
this._eventHandlers = [];
|
||||
}
|
||||
|
||||
public close() {
|
||||
this._socket.close();
|
||||
}
|
||||
|
||||
// Terminates the connection without waiting for the client to close its own side.
|
||||
public terminate() {
|
||||
// Trigger a normal close. For the polling transport, this is sufficient and instantaneous.
|
||||
this._socket.close(/* discard */ true);
|
||||
}
|
||||
|
||||
public get isOpen() {
|
||||
return this._socket.readyState === 'open';
|
||||
}
|
||||
|
||||
public send(data: string, cb?: (err?: Error) => void) {
|
||||
const msgNum = this._messageCounter++;
|
||||
if (cb) {
|
||||
this._messageCallbacks.set(msgNum, cb);
|
||||
}
|
||||
this._socket.send(data, {}, () => {
|
||||
if (cb && this._messageCallbacks.delete(msgNum)) {
|
||||
// send was successful: pass no Error to callback
|
||||
cb();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
export class GristServerSocketWS extends GristServerSocket {
|
||||
private _eventHandlers: Array<{ event: string, handler: (...args: any[]) => void }> = [];
|
||||
|
||||
constructor(private _ws: WS.WebSocket) { super(); }
|
||||
|
||||
public set onerror(handler: (err: Error) => void) {
|
||||
this._ws.on('error', handler);
|
||||
this._eventHandlers.push({ event: 'error', handler });
|
||||
}
|
||||
|
||||
public set onclose(handler: () => void) {
|
||||
this._ws.on('close', handler);
|
||||
this._eventHandlers.push({ event: 'close', handler });
|
||||
}
|
||||
|
||||
public set onmessage(handler: (data: string) => void) {
|
||||
const wrappedHandler = (msg: Buffer) => handler(msg.toString());
|
||||
this._ws.on('message', wrappedHandler);
|
||||
this._eventHandlers.push({ event: 'message', handler: wrappedHandler });
|
||||
}
|
||||
|
||||
public removeAllListeners() {
|
||||
// Avoiding websocket.removeAllListeners() because WS.Server registers listeners
|
||||
// internally for websockets it keeps track of, and we should not accidentally remove those.
|
||||
for (const { event, handler } of this._eventHandlers) {
|
||||
this._ws.off(event, handler);
|
||||
}
|
||||
this._eventHandlers = [];
|
||||
}
|
||||
|
||||
public close() {
|
||||
this._ws.close();
|
||||
}
|
||||
|
||||
public terminate() {
|
||||
this._ws.terminate();
|
||||
}
|
||||
|
||||
public get isOpen() {
|
||||
return this._ws.readyState === WS.OPEN;
|
||||
}
|
||||
|
||||
public send(data: string, cb?: (err?: Error) => void) {
|
||||
this._ws.send(data, cb);
|
||||
}
|
||||
}
|
@ -0,0 +1,111 @@
|
||||
import * as http from 'http';
|
||||
import * as WS from 'ws';
|
||||
import * as EIO from 'engine.io';
|
||||
import {GristServerSocket, GristServerSocketEIO, GristServerSocketWS} from './GristServerSocket';
|
||||
import * as net from 'net';
|
||||
|
||||
const MAX_PAYLOAD = 100e6;
|
||||
|
||||
export interface GristSocketServerOptions {
|
||||
verifyClient?: (request: http.IncomingMessage) => Promise<boolean>;
|
||||
}
|
||||
|
||||
export class GristSocketServer {
|
||||
private _wsServer: WS.Server;
|
||||
private _eioServer: EIO.Server;
|
||||
private _connectionHandler: (socket: GristServerSocket, req: http.IncomingMessage) => void;
|
||||
|
||||
constructor(server: http.Server, private _options?: GristSocketServerOptions) {
|
||||
this._wsServer = new WS.Server({ noServer: true, maxPayload: MAX_PAYLOAD });
|
||||
|
||||
this._eioServer = new EIO.Server({
|
||||
// We only use Engine.IO for its polling transport,
|
||||
// so we disable the built-in Engine.IO upgrade mechanism.
|
||||
allowUpgrades: false,
|
||||
transports: ['polling'],
|
||||
maxHttpBufferSize: MAX_PAYLOAD,
|
||||
cors: {
|
||||
// This will cause Engine.IO to reflect any client-provided Origin into
|
||||
// the Access-Control-Allow-Origin header, essentially disabling the
|
||||
// protection offered by the Same-Origin Policy. This sounds insecure
|
||||
// but is actually the security model of native WebSockets (they are
|
||||
// not covered by SOP; any webpage can open a WebSocket connecting to
|
||||
// any other domain, including the target domain's cookies; it is up to
|
||||
// the receiving server to check the request's Origin header). Since
|
||||
// the connection attempt is validated in `verifyClient` later,
|
||||
// it is safe to let any client attempt a connection here.
|
||||
origin: true,
|
||||
// We need to allow the client to send its cookies. See above for the
|
||||
// reasoning on why it is safe to do so.
|
||||
credentials: true,
|
||||
methods: ["GET", "POST"],
|
||||
},
|
||||
});
|
||||
|
||||
this._eioServer.on('connection', this._onEIOConnection.bind(this));
|
||||
|
||||
this._attach(server);
|
||||
}
|
||||
|
||||
public set onconnection(handler: (socket: GristServerSocket, req: http.IncomingMessage) => void) {
|
||||
this._connectionHandler = handler;
|
||||
}
|
||||
|
||||
public close(cb: (...args: any[]) => void) {
|
||||
this._eioServer.close();
|
||||
|
||||
// Terminate all clients. WS.Server used to do it automatically in close() but no
|
||||
// longer does (see https://github.com/websockets/ws/pull/1904#discussion_r668844565).
|
||||
for (const ws of this._wsServer.clients) {
|
||||
ws.terminate();
|
||||
}
|
||||
this._wsServer.close(cb);
|
||||
}
|
||||
|
||||
private _attach(server: http.Server) {
|
||||
// Forward all WebSocket upgrade requests to WS
|
||||
server.on('upgrade', async (request, socket, head) => {
|
||||
if (this._options?.verifyClient && !await this._options.verifyClient(request)) {
|
||||
// Because we are handling an "upgrade" event, we don't have access to
|
||||
// a "response" object, just the raw socket. We can still construct
|
||||
// a well-formed HTTP error response.
|
||||
socket.write('HTTP/1.1 403 Forbidden\r\n\r\n');
|
||||
socket.destroy();
|
||||
return;
|
||||
}
|
||||
this._wsServer.handleUpgrade(request, socket as net.Socket, head, (client) => {
|
||||
this._connectionHandler?.(new GristServerSocketWS(client), request);
|
||||
});
|
||||
});
|
||||
|
||||
// At this point an Express app is installed as the handler for the server's
|
||||
// "request" event. We need to install our own listener instead, to intercept
|
||||
// requests that are meant for the Engine.IO polling implementation.
|
||||
const listeners = [...server.listeners("request")];
|
||||
server.removeAllListeners("request");
|
||||
server.on("request", async (req, res) => {
|
||||
// Intercept requests that have transport=polling in their querystring
|
||||
if (/[&?]transport=polling(&|$)/.test(req.url ?? '')) {
|
||||
if (this._options?.verifyClient && !await this._options.verifyClient(req)) {
|
||||
res.writeHead(403).end();
|
||||
return;
|
||||
}
|
||||
|
||||
this._eioServer.handleRequest(req, res);
|
||||
} else {
|
||||
// Otherwise fallback to the pre-existing listener(s)
|
||||
for (const listener of listeners) {
|
||||
listener.call(server, req, res);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
server.on("close", this.close.bind(this));
|
||||
}
|
||||
|
||||
private _onEIOConnection(socket: EIO.Socket) {
|
||||
const req = socket.request;
|
||||
(socket as any).request = null; // Free initial request as recommended in the Engine.IO documentation
|
||||
this._connectionHandler?.(new GristServerSocketEIO(socket), req);
|
||||
}
|
||||
}
|
@ -0,0 +1,170 @@
|
||||
import { assert } from 'chai';
|
||||
import * as http from 'http';
|
||||
import { GristClientSocket } from 'app/client/components/GristClientSocket';
|
||||
import { GristSocketServer } from 'app/server/lib/GristSocketServer';
|
||||
import { fromCallback, listenPromise } from 'app/server/lib/serverUtils';
|
||||
import { AddressInfo } from 'net';
|
||||
import httpProxy from 'http-proxy';
|
||||
|
||||
describe(`GristSockets`, function () {
|
||||
|
||||
for (const webSocketsSupported of [true, false]) {
|
||||
describe(`when the networks ${webSocketsSupported ? "supports" : "does not support"} WebSockets`, function () {
|
||||
|
||||
let server: http.Server | null;
|
||||
let serverPort: number;
|
||||
let socketServer: GristSocketServer | null;
|
||||
let proxy: httpProxy | null;
|
||||
let proxyServer: http.Server | null;
|
||||
let proxyPort: number;
|
||||
let wsAddress: string;
|
||||
|
||||
beforeEach(async function () {
|
||||
await startSocketServer();
|
||||
await startProxyServer();
|
||||
});
|
||||
|
||||
afterEach(async function () {
|
||||
await stopProxyServer();
|
||||
await stopSocketServer();
|
||||
});
|
||||
|
||||
async function startSocketServer() {
|
||||
server = http.createServer((req, res) => res.writeHead(404).end());
|
||||
socketServer = new GristSocketServer(server);
|
||||
await listenPromise(server.listen(0, 'localhost'));
|
||||
serverPort = (server.address() as AddressInfo).port;
|
||||
}
|
||||
|
||||
async function stopSocketServer() {
|
||||
await fromCallback(cb => socketServer?.close(cb));
|
||||
await fromCallback(cb => { server?.close(); server?.closeAllConnections(); server?.on("close", cb); });
|
||||
socketServer = server = null;
|
||||
}
|
||||
|
||||
// Start an HTTP proxy that supports WebSockets or not
|
||||
async function startProxyServer() {
|
||||
proxy = httpProxy.createProxy({
|
||||
target: `http://localhost:${serverPort}`,
|
||||
ws: webSocketsSupported,
|
||||
timeout: 1000,
|
||||
});
|
||||
proxy.on('error', () => { });
|
||||
proxyServer = http.createServer();
|
||||
|
||||
if (webSocketsSupported) {
|
||||
// prevent non-WebSocket requests
|
||||
proxyServer.on('request', (req, res) => res.writeHead(404).end());
|
||||
// proxy WebSocket requests
|
||||
proxyServer.on('upgrade', (req, socket, head) => proxy!.ws(req, socket, head));
|
||||
} else {
|
||||
// proxy non-WebSocket requests
|
||||
proxyServer.on('request', (req, res) => proxy!.web(req, res));
|
||||
// don't leave WebSocket connection attempts hanging
|
||||
proxyServer.on('upgrade', (req, socket, head) => socket.destroy());
|
||||
}
|
||||
|
||||
await listenPromise(proxyServer.listen(0, 'localhost'));
|
||||
proxyPort = (proxyServer.address() as AddressInfo).port;
|
||||
wsAddress = `ws://localhost:${proxyPort}`;
|
||||
}
|
||||
|
||||
async function stopProxyServer() {
|
||||
if (proxy) {
|
||||
proxy.close();
|
||||
proxy = null;
|
||||
}
|
||||
if (proxyServer) {
|
||||
const server = proxyServer;
|
||||
await fromCallback(cb => { server.close(cb); server.closeAllConnections(); });
|
||||
}
|
||||
proxyServer = null;
|
||||
}
|
||||
|
||||
function getMessages(ws: GristClientSocket, count: number): Promise<string[]> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const messages: string[] = [];
|
||||
ws.onerror = (err) => {
|
||||
ws.onerror = ws.onmessage = null;
|
||||
reject(err);
|
||||
};
|
||||
ws.onmessage = (data: string) => {
|
||||
messages.push(data);
|
||||
if (messages.length >= count) {
|
||||
ws.onerror = ws.onmessage = null;
|
||||
resolve(messages);
|
||||
}
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a promise for the connected websocket.
|
||||
*/
|
||||
function connectClient(url: string): Promise<GristClientSocket> {
|
||||
const socket = new GristClientSocket(url);
|
||||
return new Promise<GristClientSocket>((resolve, reject) => {
|
||||
socket.onopen = () => {
|
||||
socket.onerror = null;
|
||||
resolve(socket);
|
||||
};
|
||||
socket.onerror = (err) => {
|
||||
socket.onopen = null;
|
||||
reject(err);
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
it("should expose initial request", async function () {
|
||||
const connectionPromise = new Promise<http.IncomingMessage>((resolve) => {
|
||||
socketServer!.onconnection = (socket, req) => {
|
||||
resolve(req);
|
||||
};
|
||||
});
|
||||
const clientWs = new GristClientSocket(wsAddress + "/path?query=value", {
|
||||
headers: { "cookie": "session=1234" }
|
||||
});
|
||||
const req = await connectionPromise;
|
||||
clientWs.close();
|
||||
|
||||
// Engine.IO may append extra query parameters, so we check only the start of the URL
|
||||
assert.match(req.url!, /^\/path\?query=value/);
|
||||
|
||||
assert.equal(req.headers.cookie, "session=1234");
|
||||
});
|
||||
|
||||
it("should receive and send messages", async function () {
|
||||
socketServer!.onconnection = (socket, req) => {
|
||||
socket.onmessage = (data) => {
|
||||
socket.send("hello, " + data);
|
||||
};
|
||||
};
|
||||
const clientWs = await connectClient(wsAddress);
|
||||
clientWs.send("world");
|
||||
assert.deepEqual(await getMessages(clientWs, 1), ["hello, world"]);
|
||||
clientWs.close();
|
||||
});
|
||||
|
||||
it("should invoke send callbacks", async function () {
|
||||
const connectionPromise = new Promise<void>((resolve) => {
|
||||
socketServer!.onconnection = (socket, req) => {
|
||||
socket.send("hello", () => resolve());
|
||||
};
|
||||
});
|
||||
const clientWs = await connectClient(wsAddress);
|
||||
await connectionPromise;
|
||||
clientWs.close();
|
||||
});
|
||||
|
||||
it("should emit close event for client", async function () {
|
||||
const clientWs = await connectClient(wsAddress);
|
||||
const closePromise = new Promise<void>(resolve => {
|
||||
clientWs.onclose = resolve;
|
||||
});
|
||||
clientWs.close();
|
||||
await closePromise;
|
||||
});
|
||||
|
||||
});
|
||||
}
|
||||
});
|
Loading…
Reference in new issue