diff --git a/app/client/components/Comm.ts b/app/client/components/Comm.ts index 30bcfa74..fe50123f 100644 --- a/app/client/components/Comm.ts +++ b/app/client/components/Comm.ts @@ -24,7 +24,8 @@ import {GristWSConnection} from 'app/client/components/GristWSConnection'; import * as dispose from 'app/client/lib/dispose'; -import {CommMessage, CommRequest, CommResponse, CommResponseError, ValidEvent} from 'app/common/CommTypes'; +import * as log from 'app/client/lib/log'; +import {CommRequest, CommResponse, CommResponseBase, CommResponseError, ValidEvent} from 'app/common/CommTypes'; import {UserAction} from 'app/common/DocActions'; import {DocListAPI, OpenLocalDocResult} from 'app/common/DocListAPI'; import {GristServerAPI} from 'app/common/GristServerAPI'; @@ -161,7 +162,7 @@ export class Comm extends dispose.Disposable implements GristServerAPI, DocListA public useDocConnection(docId: string): GristWSConnection { const connection = this._connection(docId); connection.useCount += 1; - console.log(`Comm.useDocConnection(${docId}): useCount now ${connection.useCount}`); + log.debug(`Comm.useDocConnection(${docId}): useCount now ${connection.useCount}`); return connection; } @@ -175,7 +176,7 @@ export class Comm extends dispose.Disposable implements GristServerAPI, DocListA const connection = this._connections.get(docId); if (connection) { connection.useCount -= 1; - console.log(`Comm.releaseDocConnection(${docId}): useCount now ${connection.useCount}`); + log.debug(`Comm.releaseDocConnection(${docId}): useCount now ${connection.useCount}`); // Dispose the connection if it is no longer in use (except in "classic grist"). if (!this._singleWorkerMode && connection.useCount <= 0) { this.stopListening(connection); @@ -249,7 +250,7 @@ export class Comm extends dispose.Disposable implements GristServerAPI, DocListA methodName: string, ...args: any[]): Promise { const connection = this._connection(docId); if (clientId !== null && clientId !== connection.clientId) { - console.log("Comm: Rejecting " + methodName + " for outdated clientId %s (current %s)", + log.warn("Comm: Rejecting " + methodName + " for outdated clientId %s (current %s)", clientId, connection.clientId); return Promise.reject(new Error('Comm: outdated session')); } @@ -258,7 +259,7 @@ export class Comm extends dispose.Disposable implements GristServerAPI, DocListA method: methodName, args }; - console.log("Comm request #" + request.reqId + " " + methodName, request.args); + log.debug("Comm request #" + request.reqId + " " + methodName, request.args); return new Promise((resolve, reject) => { const requestMsg = JSON.stringify(request); const sent = connection.send(requestMsg); @@ -304,7 +305,7 @@ export class Comm extends dispose.Disposable implements GristServerAPI, DocListA const error = "GristWSConnection disposed"; for (const [reqId, req] of this.pendingRequests) { if (reqMatchesConnection(req.docId, docId)) { - console.log(`Comm: Rejecting req #${reqId} ${req.methodName}: ${error}`); + log.warn(`Comm: Rejecting req #${reqId} ${req.methodName}: ${error}`); this.pendingRequests.delete(reqId); req.reject(new Error('Comm: ' + error)); } @@ -319,8 +320,7 @@ export class Comm extends dispose.Disposable implements GristServerAPI, DocListA * We should watch timeouts, and log something when there is no response for a while. * There is probably no need for callers to deal with timeouts. */ - private _onServerMessage(docId: string|null, - message: CommResponse | CommResponseError | CommMessage) { + private _onServerMessage(docId: string|null, message: CommResponseBase) { if ('reqId' in message) { const reqId = message.reqId; const r = this.pendingRequests.get(reqId); @@ -331,7 +331,7 @@ export class Comm extends dispose.Disposable implements GristServerAPI, DocListA // We should not let the user see the document any more. Let's reload the // page, reducing this to the problem of arriving at a document the user // doesn't have access to, which is already handled. - console.log(`Comm response #${reqId} ${r.methodName} issued AUTH_NO_VIEW - closing`); + log.warn(`Comm response #${reqId} ${r.methodName} issued AUTH_NO_VIEW - closing`); window.location.reload(); } if (isCommResponseError(message)) { @@ -345,19 +345,19 @@ export class Comm extends dispose.Disposable implements GristServerAPI, DocListA err.details = message.details; } err.shouldFork = message.shouldFork; - console.log(`Comm response #${reqId} ${r.methodName} ERROR:${code} ${message.error}` + log.warn(`Comm response #${reqId} ${r.methodName} ERROR:${code} ${message.error}` + (message.shouldFork ? ` (should fork)` : '')); this._reportError?.(err); r.reject(err); } else { - console.log(`Comm response #${reqId} ${r.methodName} OK`); + log.debug(`Comm response #${reqId} ${r.methodName} OK`); r.resolve(message.data); } } finally { this.pendingRequests.delete(reqId); } } else { - console.log("Comm: Response to unknown reqId " + reqId); + log.warn("Comm: Response to unknown reqId " + reqId); } } else { if (message.type === 'clientConnect') { @@ -372,10 +372,10 @@ export class Comm extends dispose.Disposable implements GristServerAPI, DocListA // Another asynchronous message that's not a response. Broadcast it as an event. if (ValidEvent.guard(message.type)) { - console.log("Comm: Triggering event " + message.type); + log.debug("Comm: Triggering event " + message.type); this.trigger(message.type, message); } else { - console.log("Comm: Server message of unknown type " + message.type); + log.warn("Comm: Server message of unknown type " + message.type); } } } @@ -395,7 +395,7 @@ export class Comm extends dispose.Disposable implements GristServerAPI, DocListA r.sent = connection.send(r.requestMsg); } if (error) { - console.log("Comm: Rejecting req #" + reqId + " " + r.methodName + ": " + error); + log.warn("Comm: Rejecting req #" + reqId + " " + r.methodName + ": " + error); r.reject(new Error('Comm: ' + error)); this.pendingRequests.delete(reqId); } diff --git a/app/client/components/GristWSConnection.ts b/app/client/components/GristWSConnection.ts index 82be5ac8..5410a9d6 100644 --- a/app/client/components/GristWSConnection.ts +++ b/app/client/components/GristWSConnection.ts @@ -1,6 +1,7 @@ import {get as getBrowserGlobals} from 'app/client/lib/browserGlobals'; import {guessTimezone} from 'app/client/lib/guessTimezone'; import {getWorker} from 'app/client/models/gristConfigCache'; +import {CommResponseBase} from 'app/common/CommTypes'; import * as gutil from 'app/common/gutil'; import {addOrgToPath, docUrl, getGristConfig} from 'app/common/urlUtils'; import {UserAPI, UserAPIImpl} from 'app/common/UserAPI'; @@ -121,6 +122,10 @@ export class GristWSConnection extends Disposable { private _wantReconnect: boolean = true; private _ws: WebSocket|null = null; + // The server sends incremental seqId numbers with each message on the connection, starting with + // 0. We keep track of them to allow for seamless reconnects. + private _lastReceivedSeqId: number|null = null; + constructor(private _settings: GristWSSettings = new GristWSSettingsBrowser()) { super(); this._clientCounter = _settings.advanceCounter(); @@ -204,45 +209,14 @@ export class GristWSConnection extends Disposable { * @event serverMessage Triggered when a message arrives from the server. Callbacks receive * the raw message data as an additional argument. */ - public onmessage(ev: any) { - this._log('GristWSConnection: onmessage (%d bytes)', ev.data.length); - this._scheduleHeartbeat(); - const message = JSON.parse(ev.data); - - // clientConnect is the first message from the server that sets the clientId. We only consider - // the connection established once we receive it. - if (message.type === 'clientConnect') { - if (this._established) { - this._log("GristWSConnection skipping duplicate 'clientConnect' message"); - return; - } - this._established = true; - // Add a flag to the message to indicate if the active session changed, and warrants a reload. - message.resetClientId = (message.clientId !== this._clientId && !this._firstConnect); - this._log(`GristWSConnection established: clientId ${message.clientId} counter ${this._clientCounter}` + - ` resetClientId ${message.resetClientId}`); - if (message.dup) { - this._warn("GristWSConnection missed initial 'clientConnect', processing its duplicate"); - } - if (message.clientId !== this._clientId) { - this._clientId = message.clientId; - if (this._settings) { - this._settings.updateClientId(this._assignmentId, message.clientId); - } - } - this._firstConnect = false; - this.trigger('connectState', true); - - // Process any missed messages. (Should only have any if resetClientId is false.) - for (const msg of message.missedMessages) { - this.trigger('serverMessage', JSON.parse(msg)); - } - } - if (!this._established) { - this._log("GristWSConnection not yet established; ignoring message", message); + public onmessage(ev: MessageEvent) { + if (!this._ws) { + // It's possible to receive a message after we disconnect, at least in tests (where + // WebSocket is a node library). Ignoring is easier than unsubscribing properly. return; } - this.trigger('serverMessage', message); + this._scheduleHeartbeat(); + this._processReceivedMessage(ev.data, true); } public send(message: any) { @@ -255,6 +229,70 @@ export class GristWSConnection extends Disposable { return true; } + private _processReceivedMessage(msgData: string, processClientConnect: boolean) { + this._log('GristWSConnection: onmessage (%d bytes)', msgData.length); + const message: CommResponseBase & {seqId: number} = JSON.parse(msgData); + + if (typeof message.seqId === 'number') { + // For sequenced messages (all except clientConnect), check that seqId is as expected, and + // update this._lastReceivedSeqId. + if (this._lastReceivedSeqId !== null && message.seqId !== this._lastReceivedSeqId + 1) { + this._log('GristWSConnection: unexpected seqId after %s: %s', this._lastReceivedSeqId, message.seqId); + this.disconnect(); + return; + } + this._lastReceivedSeqId = message.seqId; + } + + // clientConnect is the first message from the server that sets the clientId. We only consider + // the connection established once we receive it. + let needReload = false; + if ('type' in message && message.type === 'clientConnect' && processClientConnect) { + if (this._established) { + this._log("GristWSConnection skipping duplicate 'clientConnect' message"); + return; + } + this._established = true; + + // Update flag to indicate if the active session changed, and warrants a reload. (The server + // should be setting needReload too, so this shouldn't be strictly needed.) + if (message.clientId !== this._clientId && !this._firstConnect) { + message.needReload = true; + } + needReload = Boolean(message.needReload); + this._log(`GristWSConnection established: clientId ${message.clientId} counter ${this._clientCounter}` + + ` needReload ${needReload}`); + if (message.dup) { + this._warn("GristWSConnection missed initial 'clientConnect', processing its duplicate"); + } + if (message.clientId !== this._clientId) { + this._clientId = message.clientId; + this._settings.updateClientId(this._assignmentId, message.clientId); + } + this._firstConnect = false; + this.trigger('connectState', true); + + // Process any missed messages. (Should only have any if needReload is false.) + if (!needReload && message.missedMessages) { + for (const msg of message.missedMessages) { + this._processReceivedMessage(msg, false); + } + } + } + if (!this._established) { + this._log("GristWSConnection not yet established; ignoring message", message); + return; + } + + if (needReload) { + // If we are unable to resume this connection, disconnect to avoid accept more messages on + // this connection, they are likely to only cause errors. Elsewhere, the app will reload. + this._log('GristWSConnection: needReload'); + this.disconnect(); + } + this.trigger('serverMessage', message); + } + // unschedule any pending heartbeat message private _clearHeartbeat() { if (this._heartbeatTimeout) { @@ -309,18 +347,16 @@ export class GristWSConnection extends Disposable { this._ws.onmessage = this.onmessage.bind(this); - this._ws.onerror = (ev: Event) => { - this._log('GristWSConnection: onerror', ev); + this._ws.onerror = (ev: Event|ErrorEvent) => { + this._log('GristWSConnection: onerror', 'error' in ev ? String(ev.error) : ev); }; this._ws.onclose = () => { - if (this._settings) { - this._log('GristWSConnection: onclose'); - } if (this.isDisposed()) { return; } + this._log('GristWSConnection: onclose'); this._established = false; this._ws = null; this.trigger('connectState', false); @@ -344,6 +380,9 @@ export class GristWSConnection extends Disposable { url.searchParams.append('clientId', this._clientId || '0'); url.searchParams.append('counter', this._clientCounter); url.searchParams.append('newClient', String(isReconnecting ? 0 : 1)); + if (isReconnecting && this._lastReceivedSeqId !== null) { + url.searchParams.append('lastSeqId', String(this._lastReceivedSeqId)); + } url.searchParams.append('browserSettings', JSON.stringify({timezone})); url.searchParams.append('user', this._settings.getUserSelector()); return url.href; @@ -362,29 +401,8 @@ export class GristWSConnection extends Disposable { } } - // Log a message using the configured logger, or send it to console if no - // logger available. - private _log(...args: any[]): void { - if (!this._settings) { - // tslint:disable-next-line:no-console - console.warn('log called without settings in GristWSConnection'); - console.log(...args); // tslint:disable-line:no-console - } else { - this._settings.log(...args); - } - } - - // Log a warning using the configured logger, or send it to console if no - // logger available. - private _warn(...args: any[]): void { - if (!this._settings) { - // tslint:disable-next-line:no-console - console.warn('warn called without settings in GristWSConnection'); - console.warn(...args); // tslint:disable-line:no-console - } else { - this._settings.warn(...args); - } - } + private _log = (...args: any[]) => this._settings.log(...args); + private _warn = (...args: any[]) => this._settings.warn(...args); } Object.assign(GristWSConnection.prototype, BackboneEvents); diff --git a/app/client/ui/App.ts b/app/client/ui/App.ts index b7238ab9..06d9afad 100644 --- a/app/client/ui/App.ts +++ b/app/client/ui/App.ts @@ -134,7 +134,7 @@ export class App extends DisposableWithEvents { }, this, isHelpPaneVisible)); this.listenTo(this.comm, 'clientConnect', (message) => { - console.log(`App clientConnect event: resetClientId ${message.resetClientId} version ${message.serverVersion}`); + console.log(`App clientConnect event: needReload ${message.needReload} version ${message.serverVersion}`); this._settings(message.settings); if (message.serverVersion === 'dead' || (this._serverVersion && this._serverVersion !== message.serverVersion)) { console.log("Upgrading..."); @@ -142,9 +142,9 @@ export class App extends DisposableWithEvents { return this.reload(); } this._serverVersion = message.serverVersion; - // If the clientId changed, then we need to reload any open documents. We'll simply reload the - // active component of the App regardless of what it is. - if (message.resetClientId) { + // Reload any open documents if needed (if clientId changed, or client can't get all missed + // messages). We'll simply reload the active component of the App regardless of what it is. + if (message.needReload) { this.reloadPane(); } }); diff --git a/app/common/CommTypes.ts b/app/common/CommTypes.ts index fc925669..85a30560 100644 --- a/app/common/CommTypes.ts +++ b/app/common/CommTypes.ts @@ -53,6 +53,8 @@ export interface CommMessageBase { export type CommDocMessage = CommDocUserAction | CommDocUsage | CommDocShutdown | CommDocError; export type CommMessage = CommDocMessage | CommDocListAction | CommClientConnect; +export type CommResponseBase = CommResponse | CommResponseError | CommMessage; + export type CommDocEventType = CommDocMessage['type'] /** @@ -130,8 +132,11 @@ export interface CommClientConnect extends CommMessageBase { // the server. clientId: string; + // If set, the reconnecting client cannot be sent all missed messages, and needs to reload. + needReload?: boolean; + // Array of serialized messages missed from the server while disconnected. - missedMessages: string[]; + missedMessages?: string[]; // Which version the server reports for itself. serverVersion?: string; diff --git a/app/server/lib/Client.ts b/app/server/lib/Client.ts index 2ded558c..1ad4b0db 100644 --- a/app/server/lib/Client.ts +++ b/app/server/lib/Client.ts @@ -73,12 +73,12 @@ export class Client { private _session: ScopedSession|null = null; - private _log = new LogMethods('Client ', (s: null) => this.getLogMeta()); + private _log = new LogMethods('Client ', (extra?: object|null) => this.getLogMeta(extra || {})); // Maps docFDs to DocSession objects. private _docFDs: Array = []; - private _missedMessages: string[] = []; + private _missedMessages = new Map(); private _destroyTimer: NodeJS.Timer|null = null; private _destroyed: boolean = false; private _websocket: WebSocket|null; @@ -88,13 +88,15 @@ export class Client { private _userName: string|null = null; private _firstLoginAt: Date|null = null; private _isAnonymous: boolean = false; + private _nextSeqId: number = 0; // Next sequence-ID for messages sent to the client + + // Identifier for the current GristWSConnection object connected to this client. + private _counter: string|null = null; constructor( private _comm: Comm, private _methods: Map, private _locale: string, - // Identifier for the current GristWSConnection object connected to this client. - private _counter: string|null, ) { this.clientId = generateClientId(); } @@ -105,13 +107,14 @@ export class Client { return this._locale; } - public setConnection(websocket: WebSocket, browserSettings: BrowserSettings) { + public setConnection(websocket: WebSocket, counter: string|null, browserSettings: BrowserSettings) { this._websocket = websocket; + this._counter = counter; this.browserSettings = browserSettings; - websocket.on('error', this._onError.bind(this)); - websocket.on('close', this._onClose.bind(this)); - websocket.on('message', this._onMessage.bind(this)); + websocket.on('error', (err) => this._onError(err)); + websocket.on('close', () => this._onClose()); + websocket.on('message', (msg: string) => this._onMessage(msg)); } /** @@ -138,25 +141,10 @@ export class Client { this._docFDs[fd] = null; } - // Check that client still has access to all documents. Used to determine whether - // a Comm client can be safely reused after a reconnect. Without this check, the client - // would be reused even if access to a document has been lost (although an error would be - // issued later, on first use of the document). - public async isAuthorized(): Promise { - for (const docFD of this._docFDs) { - try { - if (docFD !== null) { await docFD.authorizer.assertAccess('viewers'); } - } catch (e) { - return false; - } - } - return true; - } - /** - * Closes all docs. + * Closes all docs. Returns the number of documents closed. */ - public closeAllDocs() { + public closeAllDocs(): number { let count = 0; for (let fd = 0; fd < this._docFDs.length; fd++) { const docSession = this._docFDs[fd]; @@ -168,7 +156,7 @@ export class Client { } this._docFDs[fd] = null; } - this._log.debug(null, "closeAllDocs() closed %d doc(s)", count); + return count; } public interruptConnection() { @@ -187,7 +175,8 @@ export class Client { return; } - const message: string = JSON.stringify(messageObj); + const seqId = this._nextSeqId++; + const message: string = JSON.stringify({...messageObj, seqId}); // Log something useful about the message being sent. if ('error' in messageObj && messageObj.error) { @@ -199,91 +188,125 @@ export class Client { try { await this._sendToWebsocket(message); } catch (err) { - // Sending failed. Presumably we should be getting onClose around now too. - // NOTE: if this handler is run after onClose, we could have messages end up out of order. - // Let's check to make sure. If this can happen, we need to refactor for correct ordering. - if (!this._websocket) { - this._log.error(null, "sendMessage: UNEXPECTED ORDER OF CALLBACKS"); - } - this._log.warn(null, "sendMessage: queuing after send error: %s", err.toString()); - this._missedMessages.push(message); + // Sending failed. Add the message to missedMessages. + this._log.warn(null, "sendMessage: queuing after send error:", err.toString()); + this._missedMessages.set(seqId, message); + + // NOTE: A successful send does NOT mean the message was received. For a better system, see + // https://docs.microsoft.com/en-us/azure/azure-web-pubsub/howto-develop-reliable-clients + // (keeping a copy of messages until acked). With our system, we are more likely to be + // lacking the needed messages on reconnect, and having to reset the client. } - } else if (this._missedMessages.length < clientMaxMissedMessages) { + } else if (this._missedMessages.size < clientMaxMissedMessages) { // Queue up the message. - this._missedMessages.push(message); + this._missedMessages.set(seqId, message); } else { // Too many messages queued. Boot the client now, to make it reset when/if it reconnects. - this._log.error(null, "sendMessage: too many messages queued; booting client"); - if (this._destroyTimer) { - clearTimeout(this._destroyTimer); - this._destroyTimer = null; - } + this._log.warn(null, "sendMessage: too many messages queued; booting client"); this.destroy(); } } /** - * Called from Comm.ts to prepare this Client object to accept a new connection that requests - * the same clientId. Returns whether this Client is available and ready for this connection. + * Called from Comm.ts to decide whether this Client is available to accept a new connection + * that requests the same clientId. */ - public async reconnect(counter: string|null, newClient: boolean): Promise { - // Refuse reconnect if another websocket is currently active. It may be a new browser tab, - // and will need its own Client object. - if (this._websocket) { return false; } - - // Don't reuse this Client object if it's no longer authorized to access the open documents. - if (!await this.isAuthorized()) { return false; } - - this._counter = counter; - - this._log.info(null, "existing client reconnected (%d missed messages)", this._missedMessages.length); - if (this._destroyTimer) { - this._log.warn(null, "clearing scheduled destruction"); - clearTimeout(this._destroyTimer); - this._destroyTimer = null; - } - if (newClient) { - // If newClient is set, then we assume that the browser client lost its state (e.g. - // reloaded the page), so we treat it as a disconnect followed by a new connection to the - // same state. At the moment, this only means that we close all docs. - if (this._missedMessages.length) { - this._log.warn(null, "clearing missed messages for new client"); - } - this._missedMessages.length = 0; - this.closeAllDocs(); - } - return true; + public canAcceptConnection(): boolean { + // Refuse reconnect if another websocket is currently active. It may be a new browser tab + // (which may reuse clientId from a copy of sessinStorage). It will need its own Client object. + return !this._websocket; } /** - * Send the initial 'clientConnect' message after receiving a connection. + * Complete initialization of a new connection, and send the initial 'clientConnect' message. + * See comments at the top of app/server/lib/Comm.ts for some relevant notes. */ - public async sendConnectMessage(parts: Partial): Promise { - this._log.debug(null, `sending clientConnect with ${this._missedMessages.length} missed messages`); + public async sendConnectMessage( + newClient: boolean, reuseClient: boolean, lastSeqId: number|null, parts: Partial + ): Promise { + if (this._destroyTimer) { + clearTimeout(this._destroyTimer); + this._destroyTimer = null; + } + + let missedMessages: string[]|undefined = undefined; + let seamlessReconnect = false; + if (!newClient && reuseClient && await this._isAuthorized()) { + // Websocket-level reconnect: existing browser tab reconnected to an existing Client object. + // We also check that the Client is still authorized to access all open docs. If not, we'll + // close the docs and tell the Client to reload the app. + missedMessages = this.getMissedMessages(lastSeqId); + if (missedMessages) { + // We have all the needed messages (possibly an empty array); can do a seamless reconnect. + seamlessReconnect = true; + } + } + + // We collected any missed messages we need; clear the stored map of them. + this._missedMessages.clear(); + + let docsClosed: number|null = null; + if (!seamlessReconnect) { + // The browser client can't recover from missed messages and will need to reopen docs. Close + // all docs we kept open. If it's a new Client object, this is a no-op. + docsClosed = this.closeAllDocs(); + } + + // An existing browser client that can't recover, or that connected to a new Client object, + // will need to reopen docs. Tell it to reload. + const needReload = !newClient && !seamlessReconnect; + + this._log.debug({newClient, needReload, docsClosed, missedMessages: missedMessages?.length}, + 'sending clientConnect'); + // Don't use sendMessage here, since we don't want to queue up this message on failure. const clientConnectMsg: CommClientConnect = { ...parts, type: 'clientConnect', clientId: this.clientId, - missedMessages: this._missedMessages.slice(0), profile: this._profile, + missedMessages, + needReload, }; - // If reconnecting a client with missed messages, clear them now. - this._missedMessages.length = 0; - await this._sendToWebsocket(JSON.stringify(clientConnectMsg)); - // A heavy-handed fix to T396, since 'clientConnect' is sometimes not seen in the browser, - // (seemingly when the 'message' event is triggered before 'open' on the native WebSocket.) - // See also my report at https://stackoverflow.com/a/48411315/328565 - await delay(250); + try { + await this._sendToWebsocket(JSON.stringify(clientConnectMsg)); - if (this._destroyed || this._websocket?.readyState !== WebSocket.OPEN) { - this._log.debug(null, `websocket closed right after clientConnect`); - } else { - await this._sendToWebsocket(JSON.stringify({...clientConnectMsg, dup: true})); + if (needReload) { + // If the client should reload, close the socket without waiting. This connection should + // not be used anyway, and we want it released by the time the new connection comes in. + this._websocket?.close(); + return; + } + + // A heavy-handed fix to T396, since 'clientConnect' is sometimes not seen in the browser, + // (seemingly when the 'message' event is triggered before 'open' on the native WebSocket.) + // See also my report at https://stackoverflow.com/a/48411315/328565 + await delay(250); + + if (!this._destroyed && this._websocket?.readyState === WebSocket.OPEN) { + await this._sendToWebsocket(JSON.stringify({...clientConnectMsg, dup: true})); + } + } catch (err) { + // It's possible that the connection was closed while we were preparing this response. + // We just warn, and let _onClose() take care of cleanup. + this._log.warn(null, "failed to prepare or send clientConnect:", err.toString()); } } + // Get messages in order of their key in the _missedMessages map. + public getMissedMessages(lastSeqId: number|null): string[]|undefined { + const result: string[] = []; + if (lastSeqId !== null) { + for (let i = lastSeqId + 1; i < this._nextSeqId; i++) { + const m = this._missedMessages.get(i); + if (m === undefined) { return; } + result.push(m); + } + } + return result; + } + // Assigns the given ScopedSession to the client. public setSession(session: ScopedSession): void { this._session = session; @@ -302,11 +325,13 @@ export class Client { * object and clientId. */ public destroy() { - this._log.info(null, "client gone"); - this.closeAllDocs(); + const docsClosed = this.closeAllDocs(); + this._log.info({docsClosed}, "client gone"); if (this._destroyTimer) { clearTimeout(this._destroyTimer); + this._destroyTimer = null; } + this._missedMessages.clear(); this._comm.removeClient(this); this._destroyed = true; } @@ -379,8 +404,7 @@ export class Client { throw new ApiError(this._profile ? `user not known: ${this._profile.email}` : 'user not set', 403); } - public getLogMeta() { - const meta: {[key: string]: any} = {}; + public getLogMeta(meta: {[key: string]: any} = {}) { if (this._profile) { meta.email = this._profile.email; } // We assume the _userId has already been cached, which will be true always (for all practical // purposes) because it's set when the Authorizer checks this client. @@ -455,6 +479,21 @@ export class Client { undefined; } + // Check that client still has access to all documents. Used to determine whether + // a Comm client can be safely reused after a reconnect. Without this check, the client + // would be reused even if access to a document has been lost (although an error would be + // issued later, on first use of the document). + private async _isAuthorized(): Promise { + for (const docFD of this._docFDs) { + try { + if (docFD !== null) { await docFD.authorizer.assertAccess('viewers'); } + } catch (e) { + return false; + } + } + return true; + } + // Returns the next unused docFD number. private _getNextDocFD(): number { let fd = 0; @@ -478,7 +517,6 @@ export class Client { * Processes the closing of a websocket. */ private _onClose() { - this._log.info(null, "onClose"); this._websocket?.removeAllListeners(); // Remove all references to the websocket. @@ -490,7 +528,7 @@ export class Client { this._log.warn(null, "clearing previously scheduled destruction"); clearTimeout(this._destroyTimer); } - this._log.warn(null, "will discard client in %s sec", Deps.clientRemovalTimeoutMs / 1000); + this._log.info(null, "websocket closed; will discard client in %s sec", Deps.clientRemovalTimeoutMs / 1000); this._destroyTimer = setTimeout(() => this.destroy(), Deps.clientRemovalTimeoutMs); } } diff --git a/app/server/lib/Comm.ts b/app/server/lib/Comm.ts index 0fc5896f..a00f499d 100644 --- a/app/server/lib/Comm.ts +++ b/app/server/lib/Comm.ts @@ -14,6 +14,22 @@ * browser window, and should persist across brief disconnects. A Client has a 'clientId' * property, which uniquely identifies a client within the currently running server. Method * registered with Comm always receive a Client object as the first argument. + * + * NOTES: + * + * The communication setup involves primarily the modules app/server/lib/{Comm,Client}.ts, and + * app/client/components/{Comm,GristWSConnection}.ts. In particular, these implement reconnect + * logic, which is particularly confusing as done here because it combines two layers: + * + * - Websocket-level reconnects, where an existing browser tab may reconnect and attempt to + * restore state seamlessly by recovering any missed messages. + * + * - Application-level reconnects, where even in case of a failed websocket-level reconnect (e.g. + * a reloaded browser tab, or existing tab that can't recover missed messages), the tab may + * connect to existing state. This matters for undo/redo history (to allow a user to undo after + * reloading a browser tab), but the only thing this relies on is preserving the clientId. + * + * In other words, there is an opportunity for untangling and simplifying. */ import {EventEmitter} from 'events'; @@ -185,7 +201,6 @@ export class Comm extends EventEmitter { * Processes a new websocket connection, and associates the websocket and a Client object. */ private async _onWebSocketConnection(websocket: WebSocket, req: http.IncomingMessage) { - log.info("Comm: Got WebSocket connection: %s", req.url); if (this._options.hosts) { // DocWorker ID (/dw/) and version tag (/v/) may be present in this request but are not // needed. addOrgInfo assumes req.url starts with /o/ if present. @@ -197,37 +212,37 @@ export class Comm extends EventEmitter { // Parse the cookie in the request to get the sessionId. const sessionId = this.sessions.getSessionIdFromRequest(req); - const params = new URL(req.url!, `http://${req.headers.host}`).searchParams; + const params = new URL(req.url!, `ws://${req.headers.host}`).searchParams; const existingClientId = params.get('clientId'); const browserSettings = safeJsonParse(params.get('browserSettings') || '', {}); - const newClient = (params.get('newClient') === '1'); + const newClient = (params.get('newClient') !== '0'); // Treat omitted as new, for the sake of tests. + const lastSeqIdStr = params.get('lastSeqId'); + const lastSeqId = lastSeqIdStr ? parseInt(lastSeqIdStr) : null; const counter = params.get('counter'); const userSelector = params.get('user') || ''; + const scopedSession = this.getOrCreateSession(sessionId!, req as RequestWithOrg, userSelector); + const profile = await this._getSessionProfile(scopedSession, req); + // Associate an ID with each websocket, reusing the supplied one if it's valid. let client: Client|undefined = this._clients.get(existingClientId!); - if (!client || !await client.reconnect(counter, newClient)) { - client = new Client(this, this._methods, localeFromRequest(req), counter); + let reuseClient = true; + if (!client?.canAcceptConnection()) { + reuseClient = false; + client = new Client(this, this._methods, localeFromRequest(req)); this._clients.set(client.clientId, client); } - // Add a Session object to the client. - log.info(`Comm ${client}: using session ${sessionId}`); - const scopedSession = this.getOrCreateSession(sessionId!, req as RequestWithOrg, userSelector); - client.setSession(scopedSession); - // Associate the client with this websocket. - client.setConnection(websocket, browserSettings); + log.rawInfo('Comm: Got Websocket connection', {...client.getLogMeta(), urlPath: req.url, reuseClient}); - const profile = await this._getSessionProfile(scopedSession, req); + client.setSession(scopedSession); // Add a Session object to the client. client.setOrg((req as RequestWithOrg).org || ""); client.setProfile(profile); + client.setConnection(websocket, counter, browserSettings); - client.sendConnectMessage({ + await client.sendConnectMessage(newClient, reuseClient, lastSeqId, { serverVersion: this._serverVersion || version.gitcommit, settings: this._options.settings, - }) - .catch(err => { - log.error(`Comm ${client}: failed to prepare or send clientConnect:`, err); }); } diff --git a/app/server/lib/serverUtils.ts b/app/server/lib/serverUtils.ts index 7f5742ca..a67bb127 100644 --- a/app/server/lib/serverUtils.ts +++ b/app/server/lib/serverUtils.ts @@ -31,7 +31,7 @@ export function fromCallback(nodeFunc: NodeCallbackFunc): Promise { * @param {Number} optCount: Number of ports to check, defaults to 200. * @returns Promise: Promise for an available port. */ -export function getAvailablePort(firstPort: number = 8000, optCount: number = 200) { +export function getAvailablePort(firstPort: number = 8000, optCount: number = 200): Promise { const lastPort = firstPort + optCount - 1; function checkNext(port: number): Promise { if (port > lastPort) { diff --git a/test/server/Comm.ts b/test/server/Comm.ts new file mode 100644 index 00000000..651a8630 --- /dev/null +++ b/test/server/Comm.ts @@ -0,0 +1,547 @@ +import {Events as BackboneEvents} from 'backbone'; +import {promisifyAll} from 'bluebird'; +import {assert} from 'chai'; +import * as http from 'http'; +import {AddressInfo, Server, Socket} from 'net'; +import * as sinon from 'sinon'; +import * as WebSocket from 'ws'; +import * as path from 'path'; +import * as tmp from 'tmp'; + +import {GristWSConnection, GristWSSettings} from 'app/client/components/GristWSConnection'; +import {Comm as ClientComm} from 'app/client/components/Comm'; +import * as log from 'app/client/lib/log'; +import {Comm, sendDocMessage} from 'app/server/lib/Comm'; +import {Client, ClientMethod} from 'app/server/lib/Client'; +import {CommClientConnect} from 'app/common/CommTypes'; +import {delay} from 'app/common/delay'; +import {isLongerThan} from 'app/common/gutil'; +import {fromCallback, getAvailablePort} from 'app/server/lib/serverUtils'; +import {Sessions} from 'app/server/lib/Sessions'; +import * as testUtils from 'test/server/testUtils'; +import * as session from '@gristlabs/express-session'; + +const SQLiteStore = require('@gristlabs/connect-sqlite3')(session); +promisifyAll(SQLiteStore.prototype); + +describe('Comm', function() { + + testUtils.setTmpLogLevel(process.env.VERBOSE ? 'debug' : 'warn'); + + // Allow test cases to register afterEach callbacks here for easier cleanup. + const cleanup: Array<() => void> = []; + + let server: http.Server; + let sessions: Sessions; + let comm: Comm|null = null; + const sandbox = sinon.createSandbox(); + + before(async function() { + const sessionDB = tmp.fileSync(); + const sessionStore = new SQLiteStore({ + dir: path.dirname(sessionDB.name), + db: path.basename(sessionDB.name), + table: 'sessions' + }); + // Random string to use for the test session secret. + const sessionSecret = 'xkwriagasaqystubgkkbwhqtyyncwqjemyncnmetjpkiwtfzvllejpfneldmoyri'; + sessions = new Sessions(sessionSecret, sessionStore); + }); + + function startComm(methods: {[name: string]: ClientMethod}) { + server = http.createServer(); + comm = new Comm(server, {sessions}); + comm.registerMethods(methods); + return fromCallback(cb => server.listen(0, 'localhost', cb)); + } + + async function stopComm() { + comm?.destroyAllClients(); + return fromCallback(cb => server.close(cb)); + } + + const assortedMethods: {[name: string]: ClientMethod} = { + methodSync: async function(client, x, y) { + return {x: x, y: y, name: "methodSync"}; + }, + methodError: async function(client, x, y) { + throw new Error("fake error"); + }, + methodAsync: async function(client, x, y) { + await delay(20); + return {x: x, y: y, name: "methodAsync"}; + }, + methodSend: async function(client, docFD) { + sendDocMessage(client, docFD, "fooType" as any, "foo"); + sendDocMessage(client, docFD, "barType" as any, "bar"); + } + }; + + beforeEach(function() { + // Silence console messages from client-side Comm.ts. + if (!process.env.VERBOSE) { + sandbox.stub(log, 'debug'); + } + }); + + afterEach(async function() { + // Run the cleanup callbacks registered in cleanup(). + await Promise.all(cleanup.splice(0).map(callback => callback())); + + sandbox.restore(); + }); + + function getMessages(ws: WebSocket, count: number): Promise { + return new Promise((resolve, reject) => { + const messages: object[] = []; + ws.on('error', reject); + ws.on('message', (msg: string) => { + messages.push(JSON.parse(msg)); + if (messages.length >= count) { + resolve(messages); + ws.removeListener('error', reject); + ws.removeAllListeners('message'); + } + }); + }); + } + + /** + * Returns a promise for the connected websocket. + */ + function connect() { + const ws = new WebSocket('ws://localhost:' + (server.address() as AddressInfo).port); + return new Promise((resolve, reject) => { + ws.on('open', () => resolve(ws)); + ws.on('error', reject); + }); + } + + describe("server methods", function() { + let ws: WebSocket; + beforeEach(async function() { + await startComm(assortedMethods); + ws = await connect(); + await getMessages(ws, 1); // consume a clientConnect message + }); + + afterEach(async function() { + await stopComm(); + }); + + it("should return data for valid calls", async function() { + ws.send(JSON.stringify({reqId: 10, method: "methodSync", args: ["hello", "world"]})); + const messages = await getMessages(ws, 1); + const resp = messages[0]; + assert.equal(resp.reqId, 10, `Messages received instead: ${JSON.stringify(messages)}`); + assert.deepEqual(resp.data, {x: "hello", y: "world", name: "methodSync"}); + }); + + it("should work for async calls", async function() { + ws.send(JSON.stringify({reqId: 20, method: "methodAsync", args: ["hello", "world"]})); + const messages = await getMessages(ws, 1); + const resp = messages[0]; + assert.equal(resp.reqId, 20); + assert.deepEqual(resp.data, {x: "hello", y: "world", name: "methodAsync"}); + }); + + it("should work for out-of-order calls", async function() { + ws.send(JSON.stringify({reqId: 30, method: "methodAsync", args: [1, 2]})); + ws.send(JSON.stringify({reqId: 31, method: "methodSync", args: [3, 4]})); + const messages = await getMessages(ws, 2); + assert.equal(messages[0].reqId, 31); + assert.deepEqual(messages[0].data, {x: 3, y: 4, name: "methodSync"}); + assert.equal(messages[1].reqId, 30); + assert.deepEqual(messages[1].data, {x: 1, y: 2, name: "methodAsync"}); + }); + + it("should return error when a call fails", async function() { + const logMessages = await testUtils.captureLog('warn', async () => { + ws.send(JSON.stringify({reqId: 40, method: "methodError", args: ["hello"]})); + const messages = await getMessages(ws, 1); + const resp = messages[0]; + assert.equal(resp.reqId, 40); + assert.equal(resp.data, undefined); + assert(resp.error.indexOf('fake error') >= 0); + }); + testUtils.assertMatchArray(logMessages, [ + /^warn: Client.* Error: fake error[^]+at methodError/, + /^warn: Client.* responding to .* ERROR fake error/, + ]); + }); + + it("should return error for unknown methods", async function() { + const logMessages = await testUtils.captureLog('warn', async () => { + ws.send(JSON.stringify({reqId: 50, method: "someUnknownMethod", args: []})); + const messages = await getMessages(ws, 1); + const resp = messages[0]; + assert.equal(resp.reqId, 50); + assert.equal(resp.data, undefined); + assert(resp.error.indexOf('Unknown method') >= 0); + }); + testUtils.assertMatchArray(logMessages, [ + /^warn: Client.* Unknown method.*someUnknownMethod/ + ]); + }); + + it("should support app-level events correctly", async function() { + comm!.broadcastMessage('fooType' as any, 'hello'); + comm!.broadcastMessage('barType' as any, 'world'); + const messages = await getMessages(ws, 2); + assert.equal(messages[0].type, 'fooType'); + assert.equal(messages[0].data, 'hello'); + assert.equal(messages[1].type, 'barType'); + assert.equal(messages[1].data, 'world'); + }); + + it("should support doc-level events", async function() { + ws.send(JSON.stringify({reqId: 60, method: "methodSend", args: [13]})); + const messages = await getMessages(ws, 3); + assert.equal(messages[0].type, 'fooType'); + assert.equal(messages[0].data, 'foo'); + assert.equal(messages[0].docFD, 13); + assert.equal(messages[1].type, 'barType'); + assert.equal(messages[1].data, 'bar'); + assert.equal(messages[1].docFD, 13); + assert.equal(messages[2].reqId, 60); + assert.equal(messages[2].data, undefined); + assert.equal(messages[2].error, undefined); + }); + }); + + describe("reconnects", function() { + const docId = "docId_abc"; + this.timeout(10000); + + // Helper to set up a Comm server, a Comm client, and a forwarder between them that allows + // simulating disconnects. + async function startManagedConnection(methods: {[name: string]: ClientMethod}) { + // Start the server Comm, providing a few methods. + await startComm(methods); + cleanup.push(() => stopComm()); + + // Create a forwarder, which we use to test disconnects. + const serverPort = (server.address() as AddressInfo).port; + const forwarder = new TcpForwarder(serverPort); + const forwarderPort = await forwarder.pickForwarderPort(); + await forwarder.connect(); + cleanup.push(() => forwarder.disconnect()); + + // To create a client-side Comm object, we need to trick GristWSConnection's check for + // whether there is a worker to connect to. + (global as any).window = undefined; + sandbox.stub(global as any, 'window').value({gristConfig: {getWorker: 'STUB', assignmentId: docId}}); + + // We also need to get GristWSConnection to use a custom GristWSSettings object, and to + // connect to the forwarder's port. + const docWorkerUrl = `http://localhost:${forwarderPort}`; + const settings = getWSSettings(docWorkerUrl); + const stubGristWsCreate = sandbox.stub(GristWSConnection, 'create').callsFake(function(this: any, owner) { + return (stubGristWsCreate as any).wrappedMethod.call(this, owner, settings); + }); + + // Cast with BackboneEvents to allow using cliComm.on(). + const cliComm = ClientComm.create() as ClientComm & BackboneEvents; + cliComm.useDocConnection(docId); + cleanup.push(() => cliComm.dispose()); // Dispose after this test ends. + + return {cliComm, forwarder}; + } + + it('should forward calls on a normal connection', async function() { + const {cliComm} = await startManagedConnection(assortedMethods); + + // A couple of regular requests. + const resp1 = await cliComm._makeRequest(null, null, "methodSync", "foo", 1); + assert.deepEqual(resp1, {name: 'methodSync', x: "foo", y: 1}); + const resp2 = await cliComm._makeRequest(null, null, "methodAsync", "foo", 2); + assert.deepEqual(resp2, {name: 'methodAsync', x: "foo", y: 2}); + + // Try calls that return out of order. + const [resp3, resp4] = await Promise.all([ + cliComm._makeRequest(null, null, "methodAsync", "foo", 3), + cliComm._makeRequest(null, null, "methodSync", "foo", 4), + ]); + assert.deepEqual(resp3, {name: 'methodAsync', x: "foo", y: 3}); + assert.deepEqual(resp4, {name: 'methodSync', x: "foo", y: 4}); + }); + + it('should forward missed responses when a server send fails', async function() { + await testMissedResponses(true); + }); + it('should forward missed responses when a server send is queued', async function() { + await testMissedResponses(false); + }); + + async function testMissedResponses(sendShouldFail: boolean) { + const logMessages = await testUtils.captureLog('debug', async () => { + const {cliComm, forwarder} = await startManagedConnection({...assortedMethods, + // An extra method that simulates a lost connection on server side prior to response. + testDisconnect: async function(client, x, y) { + await delay(0); // disconnect on the same tick. + await forwarder.disconnectServerSide(); + if (!sendShouldFail) { + // Add a delay to let the 'close' event get noticed first. + await delay(20); + } + return {x: x, y: y, name: "testDisconnect"}; + }, + }); + + const resp1 = await cliComm._makeRequest(null, null, "methodSync", "foo", 1); + assert.deepEqual(resp1, {name: 'methodSync', x: "foo", y: 1}); + + // Make more calls, with a disconnect before they return. The server should queue up responses. + const resp2Promise = cliComm._makeRequest(null, null, "testDisconnect", "foo", 2); + const resp3Promise = cliComm._makeRequest(null, null, "methodAsync", "foo", 3); + assert.equal(await isLongerThan(resp2Promise, 250), true); + + // Once we reconnect, the response should arrive. + await forwarder.connect(); + assert.deepEqual(await resp2Promise, {name: 'testDisconnect', x: "foo", y: 2}); + assert.deepEqual(await resp3Promise, {name: 'methodAsync', x: "foo", y: 3}); + }); + + // Check that we saw the situations we were hoping to test. + assert.equal(logMessages.some(m => /^warn: .*send error.*WebSocket is not open/.test(m)), sendShouldFail, + `Expected to see a failed send:\n${logMessages.join('\n')}`); + } + + it("should receive all server messages in order when send doesn't fail", async function() { + await testSendOrdering({noFailedSend: true}); + }); + + it("should order server messages correctly with failedSend before close", async function() { + await testSendOrdering({closeHappensFirst: false}); + }); + + it("should order server messages correctly with close before failedSend", async function() { + await testSendOrdering({closeHappensFirst: true}); + }); + + async function testSendOrdering(options: {noFailedSend?: boolean, closeHappensFirst?: boolean}) { + const eventsSeen: Array<'failedSend'|'close'> = []; + + // Server-side Client object. + let ssClient!: Client; + + const {cliComm, forwarder} = await startManagedConnection({ + ...assortedMethods, + // A method just to help us get a handle on the server-side Client object. + setClient: async (client) => { ssClient = client; }, + }); + + // Intercept the call to _onClose to know when it occurs, since we are trying to hit a + // situation where 'close' and 'failedSend' events happen in either order. + const stubOnClose = sandbox.stub(Client.prototype as any, '_onClose') + .callsFake(function(this: Client) { + eventsSeen.push('close'); + return (stubOnClose as any).wrappedMethod.apply(this, arguments); + }); + + // Intercept calls to client.sendMessage(), to know when it fails, and possibly to delay the + // failures to hit a particular order in which 'close' and 'failedSend' events are seen by + // Client.ts. This is the only reliable way I found to reproduce this order of events. + const stubSendToWebsocket = sandbox.stub(Client.prototype as any, '_sendToWebsocket') + .callsFake(async function(this: Client) { + try { + return await (stubSendToWebsocket as any).wrappedMethod.apply(this, arguments); + } catch (err) { + if (options.closeHappensFirst) { await delay(100); } + eventsSeen.push('failedSend'); + throw err; + } + }); + + // Watch the events received all the way on the client side. + const eventSpy = sinon.spy(); + const clientConnectSpy = sinon.spy(); + cliComm.on('docUserAction', eventSpy); + cliComm.on('clientConnect', clientConnectSpy); + + // We need to simulate an important property of the browser client: when needReload is set + // in the clientConnect message, we are expected to reload the app. In the test, we replace + // the GristWSConnection. + cliComm.on('clientConnect', async (msg: CommClientConnect) => { + if (msg.needReload) { + await delay(0); + cliComm.releaseDocConnection(docId); + cliComm.useDocConnection(docId); + } + }); + + // Make the first event that gives us access to the Client object (ssClient). + await cliComm._makeRequest(null, null, "setClient", "foo", 1); + + // Send large buffers, to fill up the socket's buffers to get it to block. + const data = "x".repeat(1.0e7); + const makeMessage = (n: number) => ({type: 'docUserAction', n, data}); + + let n = 0; + const sendPromises: Array> = []; + const sendNextMessage= () => sendPromises.push(ssClient.sendMessage(makeMessage(n++) as any)); + + await testUtils.captureLog('warn', async () => { + // Make a few sends. These are big enough not to return immediately. Keep the first two + // successful (by awaiting them). And keep a few more that will fail. This is to test the + // ordering of successful and failed messages that may be missed. + sendNextMessage(); + sendNextMessage(); + sendNextMessage(); + await sendPromises[0]; + await sendPromises[1]; + + sendNextMessage(); + sendNextMessage(); + + // Forcibly close the forwarder, so that the server sees a 'close' event. But first let + // some messages get to the client. In case we want all sends to succeed, let them all get + // forwarded before disconnect; otherwise, disconnect after 2 are fowarded. + const countToWaitFor = options.noFailedSend ? 5 : 2; + await waitForCondition(() => eventSpy.callCount >= countToWaitFor); + + void(forwarder.disconnectServerSide()); + + // Wait less than the delay that we add for delayFailedSend, and send another message. There + // used to be a bug that such a message would get recorded into missedMessages out of order. + await delay(50); + sendNextMessage(); + + // Now reconnect, and collect the messages that the client sees. + clientConnectSpy.resetHistory(); + await forwarder.connect(); + + // Wait until we get a clientConnect message that does not require a reload. (Except with + // noFailedSend, the first one would have needReload set; and after the reconnect, we should + // get one without.) + await waitForCondition(() => + (clientConnectSpy.callCount > 0 && clientConnectSpy.lastCall.args[0].needReload === false), + 3000); + }); + + // This test helper is used for 3 different situations. Check that we observed that + // situations we were trying to hit. + if (options.noFailedSend) { + assert.deepEqual(eventsSeen, ['close']); + } else if (options.closeHappensFirst) { + assert.equal(eventsSeen[0], 'close'); + assert.include(eventsSeen, 'failedSend'); + } else { + assert.equal(eventsSeen[0], 'failedSend'); + assert.include(eventsSeen, 'close'); + } + + // After a successful reconnect, subsequent calls should work normally. + assert.deepEqual(await cliComm._makeRequest(null, null, "methodSync", 1, 2), + {name: 'methodSync', x: 1, y: 2}); + + // Check that all the received messages are in order. + const messageNums = eventSpy.getCalls().map(call => call.args[0].n); + assert.isAtLeast(messageNums.length, 2); + assert.deepEqual(messageNums, nrange(0, messageNums.length), + `Unexpected message sequence ${JSON.stringify(messageNums)}`); + + // Subsequent messages should work normally too. + eventSpy.resetHistory(); + sendNextMessage(); + await waitForCondition(() => eventSpy.callCount > 0); + assert.deepEqual(eventSpy.getCalls().map(call => call.args[0].n), [n - 1]); + } + }); +}); + +// Waits for condFunc() to return true, for up to timeoutMs milliseconds, sleeping for stepMs +// between checks. Returns true if succeeded, false if failed. +async function waitForCondition(condFunc: () => boolean, timeoutMs = 1000, stepMs = 10): Promise { + const end = Date.now() + timeoutMs; + while (Date.now() < end) { + if (condFunc()) { return true; } + await delay(stepMs); + } + return false; +} + +// Returns a range of count consecutive numbers starting with start. +function nrange(start: number, count: number): number[] { + return Array.from(Array(count), (_, i) => start + i); +} + +// Returns a GristWSSettings object, for use with GristWSConnection. +function getWSSettings(docWorkerUrl: string): GristWSSettings { + let clientId: string = 'clientid-abc'; + let counter: number = 0; + return { + makeWebSocket(url: string): any { return new WebSocket(url, undefined, {}); }, + async getTimezone() { return 'UTC'; }, + getPageUrl() { return "http://localhost"; }, + async getDocWorkerUrl() { return docWorkerUrl; }, + getClientId(did: any) { return clientId; }, + getUserSelector() { return ''; }, + updateClientId(did: string, cid: string) { clientId = cid; }, + advanceCounter(): string { return String(counter++); }, + log() { (log as any).debug(...arguments); }, + warn() { (log as any).warn(...arguments); }, + }; +} + +// We'll test reconnects by making a connection through this TcpForwarder, which we'll use to +// simulate disconnects. +export class TcpForwarder { + public port: number|null = null; + private _connections = new Map(); + private _server: Server|null = null; + + constructor(private _serverPort: number) {} + + public async pickForwarderPort(): Promise { + this.port = await getAvailablePort(5834); + return this.port; + } + public async connect() { + await this.disconnect(); + this._server = new Server((sock) => this._onConnect(sock)); + await new Promise((resolve, reject) => + this._server!.on('error', reject).listen(this.port, resolve)); + } + public async disconnectClientSide() { + await Promise.all(Array.from(this._connections.keys(), destroySock)); + if (this._server) { + await new Promise((resolve) => this._server!.close(resolve)); + this._server = null; + } + this.cleanup(); + } + public async disconnectServerSide() { + await Promise.all(Array.from(this._connections.values(), destroySock)); + this.cleanup(); + } + public async disconnect() { + await this.disconnectClientSide(); + await this.disconnectServerSide(); + } + public cleanup() { + const pairs = Array.from(this._connections.entries()); + for (const [clientSock, serverSock] of pairs) { + if (clientSock.destroyed && serverSock.destroyed) { + this._connections.delete(clientSock); + } + } + } + private async _onConnect(clientSock: Socket) { + const serverSock = new Socket(); + await new Promise((resolve, reject) => + serverSock.on('error', reject).connect(this._serverPort, resolve)); + clientSock.pipe(serverSock); + serverSock.pipe(clientSock); + clientSock.on('error', (err) => serverSock.destroy(err)); + serverSock.on('error', (err) => clientSock.destroy(err)); + this._connections.set(clientSock, serverSock); + } +} + +async function destroySock(sock: Socket): Promise { + if (!sock.destroyed) { + await new Promise((resolve, reject) => + sock.on('close', resolve).destroy()); + } +} diff --git a/test/server/customUtil.ts b/test/server/customUtil.ts index 91a4175a..0d3f93ca 100644 --- a/test/server/customUtil.ts +++ b/test/server/customUtil.ts @@ -1,5 +1,5 @@ import {getAppRoot} from 'app/server/lib/places'; -import {fromCallback} from 'bluebird'; +import {fromCallback} from 'app/server/lib/serverUtils'; import * as express from 'express'; import * as http from 'http'; import {AddressInfo, Socket} from 'net'; @@ -8,7 +8,7 @@ import {fixturesRoot} from 'test/server/testUtils'; export interface Serving { url: string; - shutdown: () => void; + shutdown: () => Promise; } @@ -45,7 +45,7 @@ export function serveCustomViews(): Promise { export async function serveSomething(setup: (app: express.Express) => void, port= 0): Promise { const app = express(); const server = http.createServer(app); - await fromCallback((cb: any) => server.listen(port, cb)); + await fromCallback(cb => server.listen(port, cb)); const connections = new Set(); server.on('connection', (conn) => { @@ -53,8 +53,8 @@ export async function serveSomething(setup: (app: express.Express) => void, port conn.on('close', () => connections.delete(conn)); }); - function shutdown() { - server.close(); + async function shutdown() { + await fromCallback(cb => server.close(cb)); for (const conn of connections) { conn.destroy(); } } diff --git a/test/server/lib/DocApi.ts b/test/server/lib/DocApi.ts index a508a44d..3823e375 100644 --- a/test/server/lib/DocApi.ts +++ b/test/server/lib/DocApi.ts @@ -2596,7 +2596,7 @@ function testDocApi() { after(async function() { if (!process.env.TEST_REDIS_URL) { this.skip(); } - serving.shutdown(); + await serving.shutdown(); await redisMonitor.quitAsync(); }); diff --git a/test/server/testUtils.ts b/test/server/testUtils.ts index 2e6a17a0..b1553150 100644 --- a/test/server/testUtils.ts +++ b/test/server/testUtils.ts @@ -137,7 +137,9 @@ export async function captureLog(minLevel: string, callback: () => void|Promise< } } - log.transports.file.level = -1 as any; // Suppress all log output. + if (!process.env.VERBOSE) { + log.transports.file.level = -1 as any; // Suppress all log output. + } log.add(CaptureTransport as any, { captureFunc: capture, name }); // types are off. try { await callback();