Expand GristSockets tests, simplify GristClientSocket switching

pull/859/head
Jonathan Perret 2 months ago
parent 1961b2c3ee
commit adcdd5f3f3

@ -1,18 +1,19 @@
import WS from 'ws';
import {Socket as EIOSocket} from 'engine.io-client';
import { Socket as EIOSocket } from 'engine.io-client';
export interface GristClientSocketOptions {
headers?: Record<string, string>;
}
export class GristClientSocket {
// Exactly one of _wsSocket and _eioSocket will be set at any one time.
private _wsSocket: WS.WebSocket | WebSocket | undefined;
private _eioSocket: EIOSocket | undefined;
// Set to true when the connection process is complete, either succesfully or
// after the WebSocket and polling transports have both failed. Events from
// the underlying socket are not forwarded to the client until that point.
private _openDone: boolean = false;
// Set to true if a WebSocket connection (in _wsSocket) was succesfully
// established. Errors from the underlying WebSocket are not forwarded to
// the client until that point, in case we end up downgrading to Engine.IO.
private _wsConnected: boolean = false;
private _messageHandler: null | ((data: string) => void);
private _openHandler: null | (() => void);
@ -65,7 +66,7 @@ export class GristClientSocket {
}
private _createWSSocket() {
if(typeof WebSocket !== 'undefined') {
if (typeof WebSocket !== 'undefined') {
this._wsSocket = new WebSocket(this._url);
} else {
this._wsSocket = new WS(this._url, undefined, this._options);
@ -87,30 +88,34 @@ export class GristClientSocket {
}
private _onWSMessage(event: WS.MessageEvent | MessageEvent<any>) {
if (this._openDone) {
// event.data is guaranteed to be a string here because we only send text frames.
// https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/message_event#event_properties
this._messageHandler?.(event.data);
}
// event.data is guaranteed to be a string here because we only send text frames.
// https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/message_event#event_properties
this._messageHandler?.(event.data);
}
private _onWSOpen() {
// The connection was established successfully. Any future events can now
// be forwarded to the client.
this._openDone = true;
this._wsConnected = true;
this._openHandler?.();
}
private _onWSError(ev: Event) {
// The first connection attempt failed. Trigger an attempt with another transport.
this._destroyWSSocket();
this._createEIOSocket();
if (!this._wsConnected) {
// The WebSocket connection attempt failed. Switch to Engine.IO.
this._destroyWSSocket();
this._createEIOSocket();
return;
}
// WebSocket error events are deliberately void of information,
// see https://websockets.spec.whatwg.org/#eventdef-websocket-error,
// so we ignore the incoming event.
this._errorHandler?.(new Error("WebSocket error"));
}
private _onWSClose() {
if (this._openDone) {
this._closeHandler?.();
}
this._closeHandler?.();
}
private _createEIOSocket() {
@ -137,11 +142,8 @@ export class GristClientSocket {
this._openHandler?.();
}
private _onEIOError(ev: any) {
// We will make no further attempt to connect. Any future events can now
// be forwarded to the client.
this._openDone = true;
this._errorHandler?.(ev);
private _onEIOError(err: string | Error) {
this._errorHandler?.(typeof err === "string" ? new Error(err) : err);
}
private _onEIOClose() {

@ -8,151 +8,163 @@ import httpProxy from 'http-proxy';
describe(`GristSockets`, function () {
beforeEach(async function () {
await startSocketServer();
});
afterEach(async function () {
await stopSocketServer();
});
let server: http.Server | null;
let serverPort: number;
let wsAddress: string;
let socketServer: GristSocketServer | null;
async function startSocketServer() {
server = http.createServer((req, res) => res.writeHead(404).end());
socketServer = new GristSocketServer(server);
await listenPromise(server.listen(0, 'localhost'));
serverPort = (server.address() as AddressInfo).port;
wsAddress = 'ws://localhost:' + serverPort;
}
for (const webSocketsSupported of [true, false]) {
describe(`when the networks ${webSocketsSupported ? "supports" : "does not support"} WebSockets`, function () {
let server: http.Server | null;
let serverPort: number;
let socketServer: GristSocketServer | null;
let proxy: httpProxy | null;
let proxyServer: http.Server | null;
let proxyPort: number;
let wsAddress: string;
beforeEach(async function () {
await startSocketServer();
await startProxyServer();
});
afterEach(async function () {
await stopProxyServer();
await stopSocketServer();
});
async function startSocketServer() {
server = http.createServer((req, res) => res.writeHead(404).end());
socketServer = new GristSocketServer(server);
await listenPromise(server.listen(0, 'localhost'));
serverPort = (server.address() as AddressInfo).port;
}
async function stopSocketServer() {
await fromCallback(cb => socketServer?.close(cb));
await fromCallback(cb => { server?.close(); server?.closeAllConnections(); server?.on("close", cb); });
socketServer = server = null;
}
// Start an HTTP proxy that supports WebSockets or not
async function startProxyServer() {
proxy = httpProxy.createProxy({
target: `http://localhost:${serverPort}`,
ws: webSocketsSupported,
timeout: 1000,
});
proxy.on('error', () => { });
proxyServer = http.createServer();
if (webSocketsSupported) {
// prevent non-WebSocket requests
proxyServer.on('request', (req, res) => res.writeHead(404).end());
// proxy WebSocket requests
proxyServer.on('upgrade', (req, socket, head) => proxy!.ws(req, socket, head));
} else {
// proxy non-WebSocket requests
proxyServer.on('request', (req, res) => proxy!.web(req, res));
// don't leave WebSocket connection attempts hanging
proxyServer.on('upgrade', (req, socket, head) => socket.destroy());
}
async function stopSocketServer() {
//await delay(90_000);
await fromCallback(cb => socketServer?.close(cb));
await fromCallback(cb => { server?.close(); server?.closeAllConnections(); server?.on("close", cb); });
socketServer = server = null;
}
await listenPromise(proxyServer.listen(0, 'localhost'));
proxyPort = (proxyServer.address() as AddressInfo).port;
wsAddress = `ws://localhost:${proxyPort}`;
}
function getMessages(ws: GristClientSocket, count: number): Promise<string[]> {
return new Promise((resolve, reject) => {
const messages: string[] = [];
ws.onerror = (err) => {
ws.onerror = ws.onmessage = null;
reject(err);
};
ws.onmessage = (data: string) => {
messages.push(data);
if (messages.length >= count) {
ws.onerror = ws.onmessage = null;
resolve(messages);
async function stopProxyServer() {
if (proxy) {
proxy.close();
proxy = null;
}
};
});
}
if (proxyServer) {
const server = proxyServer;
await fromCallback(cb => { server.close(cb); server.closeAllConnections(); });
}
proxyServer = null;
}
function getMessages(ws: GristClientSocket, count: number): Promise<string[]> {
return new Promise((resolve, reject) => {
const messages: string[] = [];
ws.onerror = (err) => {
ws.onerror = ws.onmessage = null;
reject(err);
};
ws.onmessage = (data: string) => {
messages.push(data);
if (messages.length >= count) {
ws.onerror = ws.onmessage = null;
resolve(messages);
}
};
});
}
/**
* Returns a promise for the connected websocket.
*/
function connectClient(url: string): Promise<GristClientSocket> {
const socket = new GristClientSocket(url);
return new Promise<GristClientSocket>((resolve, reject) => {
socket.onopen = () => {
socket.onerror = null;
resolve(socket);
};
socket.onerror = (err) => {
socket.onopen = null;
reject(err);
};
});
}
it("should expose initial request", async function () {
const connectionPromise = new Promise<http.IncomingMessage>((resolve) => {
socketServer!.onconnection = (socket, req) => {
resolve(req);
};
});
const clientWs = new GristClientSocket(wsAddress + "/path?query=value", {
headers: { "cookie": "session=1234" }
});
const req = await connectionPromise;
clientWs.close();
// Engine.IO may append extra query parameters, so we check only the start of the URL
assert.match(req.url!, /^\/path\?query=value/);
assert.equal(req.headers.cookie, "session=1234");
});
it("should receive and send messages", async function () {
socketServer!.onconnection = (socket, req) => {
socket.onmessage = (data) => {
socket.send("hello, " + data);
};
};
const clientWs = await connectClient(wsAddress);
clientWs.send("world");
assert.deepEqual(await getMessages(clientWs, 1), ["hello, world"]);
clientWs.close();
});
it("should invoke send callbacks", async function () {
const connectionPromise = new Promise<void>((resolve) => {
socketServer!.onconnection = (socket, req) => {
socket.send("hello", () => resolve());
};
});
const clientWs = await connectClient(wsAddress);
await connectionPromise;
clientWs.close();
});
it("should emit close event for client", async function () {
const clientWs = await connectClient(wsAddress);
const closePromise = new Promise<void>(resolve => {
clientWs.onclose = resolve;
});
clientWs.close();
await closePromise;
});
/**
* Returns a promise for the connected websocket.
*/
function connectClient(url: string): Promise<GristClientSocket> {
const ws = new GristClientSocket(url);
return new Promise<GristClientSocket>((resolve, reject) => {
ws.onopen = () => {
ws.onerror = null;
resolve(ws);
};
ws.onerror = (err) => {
ws.onopen = null;
reject(err);
};
});
}
it("should expose initial request", async function () {
const connectionPromise = new Promise<http.IncomingMessage>((resolve) => {
socketServer!.onconnection = (socket, req) => {
resolve(req);
};
});
new GristClientSocket(wsAddress + "/path?query=value", {
headers: { "cookie": "session=1234" }
});
const req = await connectionPromise;
// Engine.IO may append extra query parameters, so we check only the start of the URL
assert.match(req.url!, /^\/path\?query=value/);
assert.equal(req.headers.cookie, "session=1234");
});
it("should receive and send messages", async function () {
socketServer!.onconnection = (socket, req) => {
socket.onmessage = (data) => {
socket.send("hello, " + data);
socket.close();
};
};
const clientWs = await connectClient(wsAddress);
clientWs.send("world");
assert.deepEqual(await getMessages(clientWs, 1), ["hello, world"]);
});
it("should invoke send callbacks", async function () {
const connectionPromise = new Promise<void>((resolve) => {
socketServer!.onconnection = (socket, req) => {
socket.send("hello", () => resolve());
};
});
await connectClient(wsAddress);
await connectionPromise;
});
let proxy: httpProxy | null;
let proxyServer: http.Server | null;
let proxyPort: number;
// Start an HTTP proxy that does not support WebSockets
async function startProxyServer() {
proxy = httpProxy.createProxy({
target: `http://localhost:${serverPort}`,
ws: false,
timeout: 1000,
});
proxy.on('error', () => { });
proxyServer = http.createServer(proxy.web.bind(proxy));
proxyServer.on('upgrade', (req, socket) => socket.destroy());
await listenPromise(proxyServer.listen(0, 'localhost'));
proxyPort = (proxyServer.address() as AddressInfo).port;
}
async function stopProxyServer() {
proxy?.close();
await fromCallback(cb => { proxyServer?.close(cb); proxyServer?.closeAllConnections(); });
proxyServer = proxy = null;
}
beforeEach(async function () {
await startProxyServer();
});
afterEach(async function () {
await stopProxyServer();
});
describe("GristClientSocket", function () {
it("can fall back to polling", async function () {
socketServer!.onconnection = (socket, req) => {
socket.onmessage = (data) => {
socket.send("hello, " + data);
};
};
const clientWs = await connectClient(`ws://localhost:${proxyPort}`);
clientWs.send("world");
assert.deepEqual(await getMessages(clientWs, 1), ["hello, world"]);
clientWs.close();
});
});
});
Loading…
Cancel
Save