Support HTTP long polling as an alternative to WebSockets (#859)
The motivation for supporting an alternative to WebSockets is that while all browsers supported by Grist offer native WebSocket support, some networking environments do not allow WebSocket traffic. Engine.IO is used as the underlying implementation of HTTP long polling. The Grist client will first attempt a regular WebSocket connection, using the same protocol and endpoints as before, but fall back to long polling using Engine.IO if the WebSocket connection fails. Include these changes: - CORS websocket requests are now rejected as a stronger security measure. This shouldn’t affect anything in practice; but previously it could be possible to make unauthenticated websocket requests from another origin. - GRIST_HOST variable no longer affects CORS responses (also should not affect anything in practice, as it wasn't serving a useful purpose)pull/920/head
parent
b4c2562029
commit
96b652fb52
@ -0,0 +1,152 @@
|
||||
import WS from 'ws';
|
||||
import { Socket as EIOSocket } from 'engine.io-client';
|
||||
|
||||
export interface GristClientSocketOptions {
|
||||
headers?: Record<string, string>;
|
||||
}
|
||||
|
||||
export class GristClientSocket {
|
||||
// Exactly one of _wsSocket and _eioSocket will be set at any one time.
|
||||
private _wsSocket: WS.WebSocket | WebSocket | undefined;
|
||||
private _eioSocket: EIOSocket | undefined;
|
||||
|
||||
// Set to true if a WebSocket connection (in _wsSocket) was succesfully
|
||||
// established. Errors from the underlying WebSocket are not forwarded to
|
||||
// the client until that point, in case we end up downgrading to Engine.IO.
|
||||
private _wsConnected: boolean = false;
|
||||
|
||||
private _messageHandler: null | ((data: string) => void);
|
||||
private _openHandler: null | (() => void);
|
||||
private _errorHandler: null | ((err: Error) => void);
|
||||
private _closeHandler: null | (() => void);
|
||||
|
||||
constructor(private _url: string, private _options?: GristClientSocketOptions) {
|
||||
this._createWSSocket();
|
||||
}
|
||||
|
||||
public set onmessage(cb: null | ((data: string) => void)) {
|
||||
this._messageHandler = cb;
|
||||
}
|
||||
|
||||
public set onopen(cb: null | (() => void)) {
|
||||
this._openHandler = cb;
|
||||
}
|
||||
|
||||
public set onerror(cb: null | ((err: Error) => void)) {
|
||||
this._errorHandler = cb;
|
||||
}
|
||||
|
||||
public set onclose(cb: null | (() => void)) {
|
||||
this._closeHandler = cb;
|
||||
}
|
||||
|
||||
public close() {
|
||||
if (this._wsSocket) {
|
||||
this._wsSocket.close();
|
||||
} else {
|
||||
this._eioSocket!.close();
|
||||
}
|
||||
}
|
||||
|
||||
public send(data: string) {
|
||||
if (this._wsSocket) {
|
||||
this._wsSocket.send(data);
|
||||
} else {
|
||||
this._eioSocket!.send(data);
|
||||
}
|
||||
}
|
||||
|
||||
// pause() and resume() are used for tests and assume a WS.WebSocket transport
|
||||
public pause() {
|
||||
(this._wsSocket as WS.WebSocket)?.pause();
|
||||
}
|
||||
|
||||
public resume() {
|
||||
(this._wsSocket as WS.WebSocket)?.resume();
|
||||
}
|
||||
|
||||
private _createWSSocket() {
|
||||
if (typeof WebSocket !== 'undefined') {
|
||||
this._wsSocket = new WebSocket(this._url);
|
||||
} else {
|
||||
this._wsSocket = new WS(this._url, undefined, this._options);
|
||||
}
|
||||
this._wsSocket.onmessage = this._onWSMessage.bind(this);
|
||||
this._wsSocket.onopen = this._onWSOpen.bind(this);
|
||||
this._wsSocket.onerror = this._onWSError.bind(this);
|
||||
this._wsSocket.onclose = this._onWSClose.bind(this);
|
||||
}
|
||||
|
||||
private _destroyWSSocket() {
|
||||
if (this._wsSocket) {
|
||||
this._wsSocket.onmessage = null;
|
||||
this._wsSocket.onopen = null;
|
||||
this._wsSocket.onerror = null;
|
||||
this._wsSocket.onclose = null;
|
||||
this._wsSocket = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
private _onWSMessage(event: WS.MessageEvent | MessageEvent<any>) {
|
||||
// event.data is guaranteed to be a string here because we only send text frames.
|
||||
// https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/message_event#event_properties
|
||||
this._messageHandler?.(event.data);
|
||||
}
|
||||
|
||||
private _onWSOpen() {
|
||||
// The connection was established successfully. Any future events can now
|
||||
// be forwarded to the client.
|
||||
this._wsConnected = true;
|
||||
this._openHandler?.();
|
||||
}
|
||||
|
||||
private _onWSError(ev: Event) {
|
||||
if (!this._wsConnected) {
|
||||
// The WebSocket connection attempt failed. Switch to Engine.IO.
|
||||
this._destroyWSSocket();
|
||||
this._createEIOSocket();
|
||||
return;
|
||||
}
|
||||
|
||||
// WebSocket error events are deliberately void of information,
|
||||
// see https://websockets.spec.whatwg.org/#eventdef-websocket-error,
|
||||
// so we ignore the incoming event.
|
||||
this._errorHandler?.(new Error("WebSocket error"));
|
||||
}
|
||||
|
||||
private _onWSClose() {
|
||||
this._closeHandler?.();
|
||||
}
|
||||
|
||||
private _createEIOSocket() {
|
||||
this._eioSocket = new EIOSocket(this._url, {
|
||||
path: new URL(this._url).pathname,
|
||||
addTrailingSlash: false,
|
||||
transports: ['polling'],
|
||||
upgrade: false,
|
||||
extraHeaders: this._options?.headers,
|
||||
withCredentials: true,
|
||||
});
|
||||
|
||||
this._eioSocket.on('message', this._onEIOMessage.bind(this));
|
||||
this._eioSocket.on('open', this._onEIOOpen.bind(this));
|
||||
this._eioSocket.on('error', this._onEIOError.bind(this));
|
||||
this._eioSocket.on('close', this._onEIOClose.bind(this));
|
||||
}
|
||||
|
||||
private _onEIOMessage(data: string) {
|
||||
this._messageHandler?.(data);
|
||||
}
|
||||
|
||||
private _onEIOOpen() {
|
||||
this._openHandler?.();
|
||||
}
|
||||
|
||||
private _onEIOError(err: string | Error) {
|
||||
this._errorHandler?.(typeof err === "string" ? new Error(err) : err);
|
||||
}
|
||||
|
||||
private _onEIOClose() {
|
||||
this._closeHandler?.();
|
||||
}
|
||||
}
|
@ -0,0 +1,138 @@
|
||||
import * as WS from 'ws';
|
||||
import * as EIO from 'engine.io';
|
||||
|
||||
export abstract class GristServerSocket {
|
||||
public abstract set onerror(handler: (err: Error) => void);
|
||||
public abstract set onclose(handler: () => void);
|
||||
public abstract set onmessage(handler: (data: string) => void);
|
||||
public abstract removeAllListeners(): void;
|
||||
public abstract close(): void;
|
||||
public abstract terminate(): void;
|
||||
public abstract get isOpen(): boolean;
|
||||
public abstract send(data: string, cb?: (err?: Error) => void): void;
|
||||
}
|
||||
|
||||
export class GristServerSocketEIO extends GristServerSocket {
|
||||
private _eventHandlers: Array<{ event: string, handler: (...args: any[]) => void }> = [];
|
||||
private _messageCounter = 0;
|
||||
|
||||
// Engine.IO only invokes send() callbacks on success. We keep a map of
|
||||
// send callbacks for messages in flight so that we can invoke them for
|
||||
// any messages still unsent upon receiving a "close" event.
|
||||
private _messageCallbacks: Map<number, (err: Error) => void> = new Map();
|
||||
|
||||
constructor(private _socket: EIO.Socket) { super(); }
|
||||
|
||||
public set onerror(handler: (err: Error) => void) {
|
||||
// Note that as far as I can tell, Engine.IO sockets never emit "error"
|
||||
// but instead include error information in the "close" event.
|
||||
this._socket.on('error', handler);
|
||||
this._eventHandlers.push({ event: 'error', handler });
|
||||
}
|
||||
|
||||
public set onclose(handler: () => void) {
|
||||
const wrappedHandler = (reason: string, description: any) => {
|
||||
// In practice, when available, description has more error details,
|
||||
// possibly in the form of an Error object.
|
||||
const maybeErr = description ?? reason;
|
||||
const err = maybeErr instanceof Error ? maybeErr : new Error(maybeErr);
|
||||
for (const cb of this._messageCallbacks.values()) {
|
||||
cb(err);
|
||||
}
|
||||
this._messageCallbacks.clear();
|
||||
|
||||
handler();
|
||||
};
|
||||
this._socket.on('close', wrappedHandler);
|
||||
this._eventHandlers.push({ event: 'close', handler: wrappedHandler });
|
||||
}
|
||||
|
||||
public set onmessage(handler: (data: string) => void) {
|
||||
const wrappedHandler = (msg: Buffer) => {
|
||||
handler(msg.toString());
|
||||
};
|
||||
this._socket.on('message', wrappedHandler);
|
||||
this._eventHandlers.push({ event: 'message', handler: wrappedHandler });
|
||||
}
|
||||
|
||||
public removeAllListeners() {
|
||||
for (const { event, handler } of this._eventHandlers) {
|
||||
this._socket.off(event, handler);
|
||||
}
|
||||
this._eventHandlers = [];
|
||||
}
|
||||
|
||||
public close() {
|
||||
this._socket.close();
|
||||
}
|
||||
|
||||
// Terminates the connection without waiting for the client to close its own side.
|
||||
public terminate() {
|
||||
// Trigger a normal close. For the polling transport, this is sufficient and instantaneous.
|
||||
this._socket.close(/* discard */ true);
|
||||
}
|
||||
|
||||
public get isOpen() {
|
||||
return this._socket.readyState === 'open';
|
||||
}
|
||||
|
||||
public send(data: string, cb?: (err?: Error) => void) {
|
||||
const msgNum = this._messageCounter++;
|
||||
if (cb) {
|
||||
this._messageCallbacks.set(msgNum, cb);
|
||||
}
|
||||
this._socket.send(data, {}, () => {
|
||||
if (cb && this._messageCallbacks.delete(msgNum)) {
|
||||
// send was successful: pass no Error to callback
|
||||
cb();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
export class GristServerSocketWS extends GristServerSocket {
|
||||
private _eventHandlers: Array<{ event: string, handler: (...args: any[]) => void }> = [];
|
||||
|
||||
constructor(private _ws: WS.WebSocket) { super(); }
|
||||
|
||||
public set onerror(handler: (err: Error) => void) {
|
||||
this._ws.on('error', handler);
|
||||
this._eventHandlers.push({ event: 'error', handler });
|
||||
}
|
||||
|
||||
public set onclose(handler: () => void) {
|
||||
this._ws.on('close', handler);
|
||||
this._eventHandlers.push({ event: 'close', handler });
|
||||
}
|
||||
|
||||
public set onmessage(handler: (data: string) => void) {
|
||||
const wrappedHandler = (msg: Buffer) => handler(msg.toString());
|
||||
this._ws.on('message', wrappedHandler);
|
||||
this._eventHandlers.push({ event: 'message', handler: wrappedHandler });
|
||||
}
|
||||
|
||||
public removeAllListeners() {
|
||||
// Avoiding websocket.removeAllListeners() because WS.Server registers listeners
|
||||
// internally for websockets it keeps track of, and we should not accidentally remove those.
|
||||
for (const { event, handler } of this._eventHandlers) {
|
||||
this._ws.off(event, handler);
|
||||
}
|
||||
this._eventHandlers = [];
|
||||
}
|
||||
|
||||
public close() {
|
||||
this._ws.close();
|
||||
}
|
||||
|
||||
public terminate() {
|
||||
this._ws.terminate();
|
||||
}
|
||||
|
||||
public get isOpen() {
|
||||
return this._ws.readyState === WS.OPEN;
|
||||
}
|
||||
|
||||
public send(data: string, cb?: (err?: Error) => void) {
|
||||
this._ws.send(data, cb);
|
||||
}
|
||||
}
|
@ -0,0 +1,111 @@
|
||||
import * as http from 'http';
|
||||
import * as WS from 'ws';
|
||||
import * as EIO from 'engine.io';
|
||||
import {GristServerSocket, GristServerSocketEIO, GristServerSocketWS} from './GristServerSocket';
|
||||
import * as net from 'net';
|
||||
|
||||
const MAX_PAYLOAD = 100e6;
|
||||
|
||||
export interface GristSocketServerOptions {
|
||||
verifyClient?: (request: http.IncomingMessage) => Promise<boolean>;
|
||||
}
|
||||
|
||||
export class GristSocketServer {
|
||||
private _wsServer: WS.Server;
|
||||
private _eioServer: EIO.Server;
|
||||
private _connectionHandler: (socket: GristServerSocket, req: http.IncomingMessage) => void;
|
||||
|
||||
constructor(server: http.Server, private _options?: GristSocketServerOptions) {
|
||||
this._wsServer = new WS.Server({ noServer: true, maxPayload: MAX_PAYLOAD });
|
||||
|
||||
this._eioServer = new EIO.Server({
|
||||
// We only use Engine.IO for its polling transport,
|
||||
// so we disable the built-in Engine.IO upgrade mechanism.
|
||||
allowUpgrades: false,
|
||||
transports: ['polling'],
|
||||
maxHttpBufferSize: MAX_PAYLOAD,
|
||||
cors: {
|
||||
// This will cause Engine.IO to reflect any client-provided Origin into
|
||||
// the Access-Control-Allow-Origin header, essentially disabling the
|
||||
// protection offered by the Same-Origin Policy. This sounds insecure
|
||||
// but is actually the security model of native WebSockets (they are
|
||||
// not covered by SOP; any webpage can open a WebSocket connecting to
|
||||
// any other domain, including the target domain's cookies; it is up to
|
||||
// the receiving server to check the request's Origin header). Since
|
||||
// the connection attempt is validated in `verifyClient` later,
|
||||
// it is safe to let any client attempt a connection here.
|
||||
origin: true,
|
||||
// We need to allow the client to send its cookies. See above for the
|
||||
// reasoning on why it is safe to do so.
|
||||
credentials: true,
|
||||
methods: ["GET", "POST"],
|
||||
},
|
||||
});
|
||||
|
||||
this._eioServer.on('connection', this._onEIOConnection.bind(this));
|
||||
|
||||
this._attach(server);
|
||||
}
|
||||
|
||||
public set onconnection(handler: (socket: GristServerSocket, req: http.IncomingMessage) => void) {
|
||||
this._connectionHandler = handler;
|
||||
}
|
||||
|
||||
public close(cb: (...args: any[]) => void) {
|
||||
this._eioServer.close();
|
||||
|
||||
// Terminate all clients. WS.Server used to do it automatically in close() but no
|
||||
// longer does (see https://github.com/websockets/ws/pull/1904#discussion_r668844565).
|
||||
for (const ws of this._wsServer.clients) {
|
||||
ws.terminate();
|
||||
}
|
||||
this._wsServer.close(cb);
|
||||
}
|
||||
|
||||
private _attach(server: http.Server) {
|
||||
// Forward all WebSocket upgrade requests to WS
|
||||
server.on('upgrade', async (request, socket, head) => {
|
||||
if (this._options?.verifyClient && !await this._options.verifyClient(request)) {
|
||||
// Because we are handling an "upgrade" event, we don't have access to
|
||||
// a "response" object, just the raw socket. We can still construct
|
||||
// a well-formed HTTP error response.
|
||||
socket.write('HTTP/1.1 403 Forbidden\r\n\r\n');
|
||||
socket.destroy();
|
||||
return;
|
||||
}
|
||||
this._wsServer.handleUpgrade(request, socket as net.Socket, head, (client) => {
|
||||
this._connectionHandler?.(new GristServerSocketWS(client), request);
|
||||
});
|
||||
});
|
||||
|
||||
// At this point an Express app is installed as the handler for the server's
|
||||
// "request" event. We need to install our own listener instead, to intercept
|
||||
// requests that are meant for the Engine.IO polling implementation.
|
||||
const listeners = [...server.listeners("request")];
|
||||
server.removeAllListeners("request");
|
||||
server.on("request", async (req, res) => {
|
||||
// Intercept requests that have transport=polling in their querystring
|
||||
if (/[&?]transport=polling(&|$)/.test(req.url ?? '')) {
|
||||
if (this._options?.verifyClient && !await this._options.verifyClient(req)) {
|
||||
res.writeHead(403).end();
|
||||
return;
|
||||
}
|
||||
|
||||
this._eioServer.handleRequest(req, res);
|
||||
} else {
|
||||
// Otherwise fallback to the pre-existing listener(s)
|
||||
for (const listener of listeners) {
|
||||
listener.call(server, req, res);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
server.on("close", this.close.bind(this));
|
||||
}
|
||||
|
||||
private _onEIOConnection(socket: EIO.Socket) {
|
||||
const req = socket.request;
|
||||
(socket as any).request = null; // Free initial request as recommended in the Engine.IO documentation
|
||||
this._connectionHandler?.(new GristServerSocketEIO(socket), req);
|
||||
}
|
||||
}
|
@ -0,0 +1,170 @@
|
||||
import { assert } from 'chai';
|
||||
import * as http from 'http';
|
||||
import { GristClientSocket } from 'app/client/components/GristClientSocket';
|
||||
import { GristSocketServer } from 'app/server/lib/GristSocketServer';
|
||||
import { fromCallback, listenPromise } from 'app/server/lib/serverUtils';
|
||||
import { AddressInfo } from 'net';
|
||||
import httpProxy from 'http-proxy';
|
||||
|
||||
describe(`GristSockets`, function () {
|
||||
|
||||
for (const webSocketsSupported of [true, false]) {
|
||||
describe(`when the networks ${webSocketsSupported ? "supports" : "does not support"} WebSockets`, function () {
|
||||
|
||||
let server: http.Server | null;
|
||||
let serverPort: number;
|
||||
let socketServer: GristSocketServer | null;
|
||||
let proxy: httpProxy | null;
|
||||
let proxyServer: http.Server | null;
|
||||
let proxyPort: number;
|
||||
let wsAddress: string;
|
||||
|
||||
beforeEach(async function () {
|
||||
await startSocketServer();
|
||||
await startProxyServer();
|
||||
});
|
||||
|
||||
afterEach(async function () {
|
||||
await stopProxyServer();
|
||||
await stopSocketServer();
|
||||
});
|
||||
|
||||
async function startSocketServer() {
|
||||
server = http.createServer((req, res) => res.writeHead(404).end());
|
||||
socketServer = new GristSocketServer(server);
|
||||
await listenPromise(server.listen(0, 'localhost'));
|
||||
serverPort = (server.address() as AddressInfo).port;
|
||||
}
|
||||
|
||||
async function stopSocketServer() {
|
||||
await fromCallback(cb => socketServer?.close(cb));
|
||||
await fromCallback(cb => { server?.close(); server?.closeAllConnections(); server?.on("close", cb); });
|
||||
socketServer = server = null;
|
||||
}
|
||||
|
||||
// Start an HTTP proxy that supports WebSockets or not
|
||||
async function startProxyServer() {
|
||||
proxy = httpProxy.createProxy({
|
||||
target: `http://localhost:${serverPort}`,
|
||||
ws: webSocketsSupported,
|
||||
timeout: 1000,
|
||||
});
|
||||
proxy.on('error', () => { });
|
||||
proxyServer = http.createServer();
|
||||
|
||||
if (webSocketsSupported) {
|
||||
// prevent non-WebSocket requests
|
||||
proxyServer.on('request', (req, res) => res.writeHead(404).end());
|
||||
// proxy WebSocket requests
|
||||
proxyServer.on('upgrade', (req, socket, head) => proxy!.ws(req, socket, head));
|
||||
} else {
|
||||
// proxy non-WebSocket requests
|
||||
proxyServer.on('request', (req, res) => proxy!.web(req, res));
|
||||
// don't leave WebSocket connection attempts hanging
|
||||
proxyServer.on('upgrade', (req, socket, head) => socket.destroy());
|
||||
}
|
||||
|
||||
await listenPromise(proxyServer.listen(0, 'localhost'));
|
||||
proxyPort = (proxyServer.address() as AddressInfo).port;
|
||||
wsAddress = `ws://localhost:${proxyPort}`;
|
||||
}
|
||||
|
||||
async function stopProxyServer() {
|
||||
if (proxy) {
|
||||
proxy.close();
|
||||
proxy = null;
|
||||
}
|
||||
if (proxyServer) {
|
||||
const server = proxyServer;
|
||||
await fromCallback(cb => { server.close(cb); server.closeAllConnections(); });
|
||||
}
|
||||
proxyServer = null;
|
||||
}
|
||||
|
||||
function getMessages(ws: GristClientSocket, count: number): Promise<string[]> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const messages: string[] = [];
|
||||
ws.onerror = (err) => {
|
||||
ws.onerror = ws.onmessage = null;
|
||||
reject(err);
|
||||
};
|
||||
ws.onmessage = (data: string) => {
|
||||
messages.push(data);
|
||||
if (messages.length >= count) {
|
||||
ws.onerror = ws.onmessage = null;
|
||||
resolve(messages);
|
||||
}
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a promise for the connected websocket.
|
||||
*/
|
||||
function connectClient(url: string): Promise<GristClientSocket> {
|
||||
const socket = new GristClientSocket(url);
|
||||
return new Promise<GristClientSocket>((resolve, reject) => {
|
||||
socket.onopen = () => {
|
||||
socket.onerror = null;
|
||||
resolve(socket);
|
||||
};
|
||||
socket.onerror = (err) => {
|
||||
socket.onopen = null;
|
||||
reject(err);
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
it("should expose initial request", async function () {
|
||||
const connectionPromise = new Promise<http.IncomingMessage>((resolve) => {
|
||||
socketServer!.onconnection = (socket, req) => {
|
||||
resolve(req);
|
||||
};
|
||||
});
|
||||
const clientWs = new GristClientSocket(wsAddress + "/path?query=value", {
|
||||
headers: { "cookie": "session=1234" }
|
||||
});
|
||||
const req = await connectionPromise;
|
||||
clientWs.close();
|
||||
|
||||
// Engine.IO may append extra query parameters, so we check only the start of the URL
|
||||
assert.match(req.url!, /^\/path\?query=value/);
|
||||
|
||||
assert.equal(req.headers.cookie, "session=1234");
|
||||
});
|
||||
|
||||
it("should receive and send messages", async function () {
|
||||
socketServer!.onconnection = (socket, req) => {
|
||||
socket.onmessage = (data) => {
|
||||
socket.send("hello, " + data);
|
||||
};
|
||||
};
|
||||
const clientWs = await connectClient(wsAddress);
|
||||
clientWs.send("world");
|
||||
assert.deepEqual(await getMessages(clientWs, 1), ["hello, world"]);
|
||||
clientWs.close();
|
||||
});
|
||||
|
||||
it("should invoke send callbacks", async function () {
|
||||
const connectionPromise = new Promise<void>((resolve) => {
|
||||
socketServer!.onconnection = (socket, req) => {
|
||||
socket.send("hello", () => resolve());
|
||||
};
|
||||
});
|
||||
const clientWs = await connectClient(wsAddress);
|
||||
await connectionPromise;
|
||||
clientWs.close();
|
||||
});
|
||||
|
||||
it("should emit close event for client", async function () {
|
||||
const clientWs = await connectClient(wsAddress);
|
||||
const closePromise = new Promise<void>(resolve => {
|
||||
clientWs.onclose = resolve;
|
||||
});
|
||||
clientWs.close();
|
||||
await closePromise;
|
||||
});
|
||||
|
||||
});
|
||||
}
|
||||
});
|
Loading…
Reference in new issue