mirror of
https://github.com/gristlabs/grist-core.git
synced 2024-10-27 20:44:07 +00:00
(core) Fix issue with 'UNEXPECTED ORDER OF CALLBACKS' in Client.ts.
Summary: - Substantial refactoring of the logic when the server fails to send some messages to a client. - Add seqId numbers to server messages to ensure reliable order. - Add a needReload flag in clientConnect for a clear indication whent the browser client needs to reload the app. - Reproduce some potential failure scenarios in a test case (some of which previously could have led to incorrectly ordered messages). - Convert other Comm tests to typescript. - Tweak logging of Comm and Client to be slightly more concise (in particular, avoid logging sessionId) Note that despite the big refactoring, this only addresses a fairly rare situation, with websocket failures while server is trying to send to the client. It includes no improvements for failures while the client is sending to the server. (I looked for an existing library that would take care of these issues. A relevant article I found is https://docs.microsoft.com/en-us/azure/azure-web-pubsub/howto-develop-reliable-clients, but it doesn't include a library for both ends, and is still in review. Other libraries with similar purposes did not inspire enough confidence.) Test Plan: New test cases, which reproduce some previously problematic scenarios. Reviewers: paulfitz Reviewed By: paulfitz Differential Revision: https://phab.getgrist.com/D3470
This commit is contained in:
parent
9b08666f96
commit
a91d493ffc
@ -24,7 +24,8 @@
|
||||
|
||||
import {GristWSConnection} from 'app/client/components/GristWSConnection';
|
||||
import * as dispose from 'app/client/lib/dispose';
|
||||
import {CommMessage, CommRequest, CommResponse, CommResponseError, ValidEvent} from 'app/common/CommTypes';
|
||||
import * as log from 'app/client/lib/log';
|
||||
import {CommRequest, CommResponse, CommResponseBase, CommResponseError, ValidEvent} from 'app/common/CommTypes';
|
||||
import {UserAction} from 'app/common/DocActions';
|
||||
import {DocListAPI, OpenLocalDocResult} from 'app/common/DocListAPI';
|
||||
import {GristServerAPI} from 'app/common/GristServerAPI';
|
||||
@ -161,7 +162,7 @@ export class Comm extends dispose.Disposable implements GristServerAPI, DocListA
|
||||
public useDocConnection(docId: string): GristWSConnection {
|
||||
const connection = this._connection(docId);
|
||||
connection.useCount += 1;
|
||||
console.log(`Comm.useDocConnection(${docId}): useCount now ${connection.useCount}`);
|
||||
log.debug(`Comm.useDocConnection(${docId}): useCount now ${connection.useCount}`);
|
||||
return connection;
|
||||
}
|
||||
|
||||
@ -175,7 +176,7 @@ export class Comm extends dispose.Disposable implements GristServerAPI, DocListA
|
||||
const connection = this._connections.get(docId);
|
||||
if (connection) {
|
||||
connection.useCount -= 1;
|
||||
console.log(`Comm.releaseDocConnection(${docId}): useCount now ${connection.useCount}`);
|
||||
log.debug(`Comm.releaseDocConnection(${docId}): useCount now ${connection.useCount}`);
|
||||
// Dispose the connection if it is no longer in use (except in "classic grist").
|
||||
if (!this._singleWorkerMode && connection.useCount <= 0) {
|
||||
this.stopListening(connection);
|
||||
@ -249,7 +250,7 @@ export class Comm extends dispose.Disposable implements GristServerAPI, DocListA
|
||||
methodName: string, ...args: any[]): Promise<any> {
|
||||
const connection = this._connection(docId);
|
||||
if (clientId !== null && clientId !== connection.clientId) {
|
||||
console.log("Comm: Rejecting " + methodName + " for outdated clientId %s (current %s)",
|
||||
log.warn("Comm: Rejecting " + methodName + " for outdated clientId %s (current %s)",
|
||||
clientId, connection.clientId);
|
||||
return Promise.reject(new Error('Comm: outdated session'));
|
||||
}
|
||||
@ -258,7 +259,7 @@ export class Comm extends dispose.Disposable implements GristServerAPI, DocListA
|
||||
method: methodName,
|
||||
args
|
||||
};
|
||||
console.log("Comm request #" + request.reqId + " " + methodName, request.args);
|
||||
log.debug("Comm request #" + request.reqId + " " + methodName, request.args);
|
||||
return new Promise((resolve, reject) => {
|
||||
const requestMsg = JSON.stringify(request);
|
||||
const sent = connection.send(requestMsg);
|
||||
@ -304,7 +305,7 @@ export class Comm extends dispose.Disposable implements GristServerAPI, DocListA
|
||||
const error = "GristWSConnection disposed";
|
||||
for (const [reqId, req] of this.pendingRequests) {
|
||||
if (reqMatchesConnection(req.docId, docId)) {
|
||||
console.log(`Comm: Rejecting req #${reqId} ${req.methodName}: ${error}`);
|
||||
log.warn(`Comm: Rejecting req #${reqId} ${req.methodName}: ${error}`);
|
||||
this.pendingRequests.delete(reqId);
|
||||
req.reject(new Error('Comm: ' + error));
|
||||
}
|
||||
@ -319,8 +320,7 @@ export class Comm extends dispose.Disposable implements GristServerAPI, DocListA
|
||||
* We should watch timeouts, and log something when there is no response for a while.
|
||||
* There is probably no need for callers to deal with timeouts.
|
||||
*/
|
||||
private _onServerMessage(docId: string|null,
|
||||
message: CommResponse | CommResponseError | CommMessage) {
|
||||
private _onServerMessage(docId: string|null, message: CommResponseBase) {
|
||||
if ('reqId' in message) {
|
||||
const reqId = message.reqId;
|
||||
const r = this.pendingRequests.get(reqId);
|
||||
@ -331,7 +331,7 @@ export class Comm extends dispose.Disposable implements GristServerAPI, DocListA
|
||||
// We should not let the user see the document any more. Let's reload the
|
||||
// page, reducing this to the problem of arriving at a document the user
|
||||
// doesn't have access to, which is already handled.
|
||||
console.log(`Comm response #${reqId} ${r.methodName} issued AUTH_NO_VIEW - closing`);
|
||||
log.warn(`Comm response #${reqId} ${r.methodName} issued AUTH_NO_VIEW - closing`);
|
||||
window.location.reload();
|
||||
}
|
||||
if (isCommResponseError(message)) {
|
||||
@ -345,19 +345,19 @@ export class Comm extends dispose.Disposable implements GristServerAPI, DocListA
|
||||
err.details = message.details;
|
||||
}
|
||||
err.shouldFork = message.shouldFork;
|
||||
console.log(`Comm response #${reqId} ${r.methodName} ERROR:${code} ${message.error}`
|
||||
log.warn(`Comm response #${reqId} ${r.methodName} ERROR:${code} ${message.error}`
|
||||
+ (message.shouldFork ? ` (should fork)` : ''));
|
||||
this._reportError?.(err);
|
||||
r.reject(err);
|
||||
} else {
|
||||
console.log(`Comm response #${reqId} ${r.methodName} OK`);
|
||||
log.debug(`Comm response #${reqId} ${r.methodName} OK`);
|
||||
r.resolve(message.data);
|
||||
}
|
||||
} finally {
|
||||
this.pendingRequests.delete(reqId);
|
||||
}
|
||||
} else {
|
||||
console.log("Comm: Response to unknown reqId " + reqId);
|
||||
log.warn("Comm: Response to unknown reqId " + reqId);
|
||||
}
|
||||
} else {
|
||||
if (message.type === 'clientConnect') {
|
||||
@ -372,10 +372,10 @@ export class Comm extends dispose.Disposable implements GristServerAPI, DocListA
|
||||
|
||||
// Another asynchronous message that's not a response. Broadcast it as an event.
|
||||
if (ValidEvent.guard(message.type)) {
|
||||
console.log("Comm: Triggering event " + message.type);
|
||||
log.debug("Comm: Triggering event " + message.type);
|
||||
this.trigger(message.type, message);
|
||||
} else {
|
||||
console.log("Comm: Server message of unknown type " + message.type);
|
||||
log.warn("Comm: Server message of unknown type " + message.type);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -395,7 +395,7 @@ export class Comm extends dispose.Disposable implements GristServerAPI, DocListA
|
||||
r.sent = connection.send(r.requestMsg);
|
||||
}
|
||||
if (error) {
|
||||
console.log("Comm: Rejecting req #" + reqId + " " + r.methodName + ": " + error);
|
||||
log.warn("Comm: Rejecting req #" + reqId + " " + r.methodName + ": " + error);
|
||||
r.reject(new Error('Comm: ' + error));
|
||||
this.pendingRequests.delete(reqId);
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
import {get as getBrowserGlobals} from 'app/client/lib/browserGlobals';
|
||||
import {guessTimezone} from 'app/client/lib/guessTimezone';
|
||||
import {getWorker} from 'app/client/models/gristConfigCache';
|
||||
import {CommResponseBase} from 'app/common/CommTypes';
|
||||
import * as gutil from 'app/common/gutil';
|
||||
import {addOrgToPath, docUrl, getGristConfig} from 'app/common/urlUtils';
|
||||
import {UserAPI, UserAPIImpl} from 'app/common/UserAPI';
|
||||
@ -121,6 +122,10 @@ export class GristWSConnection extends Disposable {
|
||||
private _wantReconnect: boolean = true;
|
||||
private _ws: WebSocket|null = null;
|
||||
|
||||
// The server sends incremental seqId numbers with each message on the connection, starting with
|
||||
// 0. We keep track of them to allow for seamless reconnects.
|
||||
private _lastReceivedSeqId: number|null = null;
|
||||
|
||||
constructor(private _settings: GristWSSettings = new GristWSSettingsBrowser()) {
|
||||
super();
|
||||
this._clientCounter = _settings.advanceCounter();
|
||||
@ -204,45 +209,14 @@ export class GristWSConnection extends Disposable {
|
||||
* @event serverMessage Triggered when a message arrives from the server. Callbacks receive
|
||||
* the raw message data as an additional argument.
|
||||
*/
|
||||
public onmessage(ev: any) {
|
||||
this._log('GristWSConnection: onmessage (%d bytes)', ev.data.length);
|
||||
public onmessage(ev: MessageEvent) {
|
||||
if (!this._ws) {
|
||||
// It's possible to receive a message after we disconnect, at least in tests (where
|
||||
// WebSocket is a node library). Ignoring is easier than unsubscribing properly.
|
||||
return;
|
||||
}
|
||||
this._scheduleHeartbeat();
|
||||
const message = JSON.parse(ev.data);
|
||||
|
||||
// clientConnect is the first message from the server that sets the clientId. We only consider
|
||||
// the connection established once we receive it.
|
||||
if (message.type === 'clientConnect') {
|
||||
if (this._established) {
|
||||
this._log("GristWSConnection skipping duplicate 'clientConnect' message");
|
||||
return;
|
||||
}
|
||||
this._established = true;
|
||||
// Add a flag to the message to indicate if the active session changed, and warrants a reload.
|
||||
message.resetClientId = (message.clientId !== this._clientId && !this._firstConnect);
|
||||
this._log(`GristWSConnection established: clientId ${message.clientId} counter ${this._clientCounter}` +
|
||||
` resetClientId ${message.resetClientId}`);
|
||||
if (message.dup) {
|
||||
this._warn("GristWSConnection missed initial 'clientConnect', processing its duplicate");
|
||||
}
|
||||
if (message.clientId !== this._clientId) {
|
||||
this._clientId = message.clientId;
|
||||
if (this._settings) {
|
||||
this._settings.updateClientId(this._assignmentId, message.clientId);
|
||||
}
|
||||
}
|
||||
this._firstConnect = false;
|
||||
this.trigger('connectState', true);
|
||||
|
||||
// Process any missed messages. (Should only have any if resetClientId is false.)
|
||||
for (const msg of message.missedMessages) {
|
||||
this.trigger('serverMessage', JSON.parse(msg));
|
||||
}
|
||||
}
|
||||
if (!this._established) {
|
||||
this._log("GristWSConnection not yet established; ignoring message", message);
|
||||
return;
|
||||
}
|
||||
this.trigger('serverMessage', message);
|
||||
this._processReceivedMessage(ev.data, true);
|
||||
}
|
||||
|
||||
public send(message: any) {
|
||||
@ -255,6 +229,70 @@ export class GristWSConnection extends Disposable {
|
||||
return true;
|
||||
}
|
||||
|
||||
private _processReceivedMessage(msgData: string, processClientConnect: boolean) {
|
||||
this._log('GristWSConnection: onmessage (%d bytes)', msgData.length);
|
||||
const message: CommResponseBase & {seqId: number} = JSON.parse(msgData);
|
||||
|
||||
if (typeof message.seqId === 'number') {
|
||||
// For sequenced messages (all except clientConnect), check that seqId is as expected, and
|
||||
// update this._lastReceivedSeqId.
|
||||
if (this._lastReceivedSeqId !== null && message.seqId !== this._lastReceivedSeqId + 1) {
|
||||
this._log('GristWSConnection: unexpected seqId after %s: %s', this._lastReceivedSeqId, message.seqId);
|
||||
this.disconnect();
|
||||
return;
|
||||
}
|
||||
this._lastReceivedSeqId = message.seqId;
|
||||
}
|
||||
|
||||
// clientConnect is the first message from the server that sets the clientId. We only consider
|
||||
// the connection established once we receive it.
|
||||
let needReload = false;
|
||||
if ('type' in message && message.type === 'clientConnect' && processClientConnect) {
|
||||
if (this._established) {
|
||||
this._log("GristWSConnection skipping duplicate 'clientConnect' message");
|
||||
return;
|
||||
}
|
||||
this._established = true;
|
||||
|
||||
// Update flag to indicate if the active session changed, and warrants a reload. (The server
|
||||
// should be setting needReload too, so this shouldn't be strictly needed.)
|
||||
if (message.clientId !== this._clientId && !this._firstConnect) {
|
||||
message.needReload = true;
|
||||
}
|
||||
needReload = Boolean(message.needReload);
|
||||
this._log(`GristWSConnection established: clientId ${message.clientId} counter ${this._clientCounter}` +
|
||||
` needReload ${needReload}`);
|
||||
if (message.dup) {
|
||||
this._warn("GristWSConnection missed initial 'clientConnect', processing its duplicate");
|
||||
}
|
||||
if (message.clientId !== this._clientId) {
|
||||
this._clientId = message.clientId;
|
||||
this._settings.updateClientId(this._assignmentId, message.clientId);
|
||||
}
|
||||
this._firstConnect = false;
|
||||
this.trigger('connectState', true);
|
||||
|
||||
// Process any missed messages. (Should only have any if needReload is false.)
|
||||
if (!needReload && message.missedMessages) {
|
||||
for (const msg of message.missedMessages) {
|
||||
this._processReceivedMessage(msg, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!this._established) {
|
||||
this._log("GristWSConnection not yet established; ignoring message", message);
|
||||
return;
|
||||
}
|
||||
|
||||
if (needReload) {
|
||||
// If we are unable to resume this connection, disconnect to avoid accept more messages on
|
||||
// this connection, they are likely to only cause errors. Elsewhere, the app will reload.
|
||||
this._log('GristWSConnection: needReload');
|
||||
this.disconnect();
|
||||
}
|
||||
this.trigger('serverMessage', message);
|
||||
}
|
||||
|
||||
// unschedule any pending heartbeat message
|
||||
private _clearHeartbeat() {
|
||||
if (this._heartbeatTimeout) {
|
||||
@ -309,18 +347,16 @@ export class GristWSConnection extends Disposable {
|
||||
|
||||
this._ws.onmessage = this.onmessage.bind(this);
|
||||
|
||||
this._ws.onerror = (ev: Event) => {
|
||||
this._log('GristWSConnection: onerror', ev);
|
||||
this._ws.onerror = (ev: Event|ErrorEvent) => {
|
||||
this._log('GristWSConnection: onerror', 'error' in ev ? String(ev.error) : ev);
|
||||
};
|
||||
|
||||
this._ws.onclose = () => {
|
||||
if (this._settings) {
|
||||
this._log('GristWSConnection: onclose');
|
||||
}
|
||||
if (this.isDisposed()) {
|
||||
return;
|
||||
}
|
||||
|
||||
this._log('GristWSConnection: onclose');
|
||||
this._established = false;
|
||||
this._ws = null;
|
||||
this.trigger('connectState', false);
|
||||
@ -344,6 +380,9 @@ export class GristWSConnection extends Disposable {
|
||||
url.searchParams.append('clientId', this._clientId || '0');
|
||||
url.searchParams.append('counter', this._clientCounter);
|
||||
url.searchParams.append('newClient', String(isReconnecting ? 0 : 1));
|
||||
if (isReconnecting && this._lastReceivedSeqId !== null) {
|
||||
url.searchParams.append('lastSeqId', String(this._lastReceivedSeqId));
|
||||
}
|
||||
url.searchParams.append('browserSettings', JSON.stringify({timezone}));
|
||||
url.searchParams.append('user', this._settings.getUserSelector());
|
||||
return url.href;
|
||||
@ -362,29 +401,8 @@ export class GristWSConnection extends Disposable {
|
||||
}
|
||||
}
|
||||
|
||||
// Log a message using the configured logger, or send it to console if no
|
||||
// logger available.
|
||||
private _log(...args: any[]): void {
|
||||
if (!this._settings) {
|
||||
// tslint:disable-next-line:no-console
|
||||
console.warn('log called without settings in GristWSConnection');
|
||||
console.log(...args); // tslint:disable-line:no-console
|
||||
} else {
|
||||
this._settings.log(...args);
|
||||
}
|
||||
}
|
||||
|
||||
// Log a warning using the configured logger, or send it to console if no
|
||||
// logger available.
|
||||
private _warn(...args: any[]): void {
|
||||
if (!this._settings) {
|
||||
// tslint:disable-next-line:no-console
|
||||
console.warn('warn called without settings in GristWSConnection');
|
||||
console.warn(...args); // tslint:disable-line:no-console
|
||||
} else {
|
||||
this._settings.warn(...args);
|
||||
}
|
||||
}
|
||||
private _log = (...args: any[]) => this._settings.log(...args);
|
||||
private _warn = (...args: any[]) => this._settings.warn(...args);
|
||||
}
|
||||
|
||||
Object.assign(GristWSConnection.prototype, BackboneEvents);
|
||||
|
@ -134,7 +134,7 @@ export class App extends DisposableWithEvents {
|
||||
}, this, isHelpPaneVisible));
|
||||
|
||||
this.listenTo(this.comm, 'clientConnect', (message) => {
|
||||
console.log(`App clientConnect event: resetClientId ${message.resetClientId} version ${message.serverVersion}`);
|
||||
console.log(`App clientConnect event: needReload ${message.needReload} version ${message.serverVersion}`);
|
||||
this._settings(message.settings);
|
||||
if (message.serverVersion === 'dead' || (this._serverVersion && this._serverVersion !== message.serverVersion)) {
|
||||
console.log("Upgrading...");
|
||||
@ -142,9 +142,9 @@ export class App extends DisposableWithEvents {
|
||||
return this.reload();
|
||||
}
|
||||
this._serverVersion = message.serverVersion;
|
||||
// If the clientId changed, then we need to reload any open documents. We'll simply reload the
|
||||
// active component of the App regardless of what it is.
|
||||
if (message.resetClientId) {
|
||||
// Reload any open documents if needed (if clientId changed, or client can't get all missed
|
||||
// messages). We'll simply reload the active component of the App regardless of what it is.
|
||||
if (message.needReload) {
|
||||
this.reloadPane();
|
||||
}
|
||||
});
|
||||
|
@ -53,6 +53,8 @@ export interface CommMessageBase {
|
||||
export type CommDocMessage = CommDocUserAction | CommDocUsage | CommDocShutdown | CommDocError;
|
||||
export type CommMessage = CommDocMessage | CommDocListAction | CommClientConnect;
|
||||
|
||||
export type CommResponseBase = CommResponse | CommResponseError | CommMessage;
|
||||
|
||||
export type CommDocEventType = CommDocMessage['type']
|
||||
|
||||
/**
|
||||
@ -130,8 +132,11 @@ export interface CommClientConnect extends CommMessageBase {
|
||||
// the server.
|
||||
clientId: string;
|
||||
|
||||
// If set, the reconnecting client cannot be sent all missed messages, and needs to reload.
|
||||
needReload?: boolean;
|
||||
|
||||
// Array of serialized messages missed from the server while disconnected.
|
||||
missedMessages: string[];
|
||||
missedMessages?: string[];
|
||||
|
||||
// Which version the server reports for itself.
|
||||
serverVersion?: string;
|
||||
|
@ -73,12 +73,12 @@ export class Client {
|
||||
|
||||
private _session: ScopedSession|null = null;
|
||||
|
||||
private _log = new LogMethods('Client ', (s: null) => this.getLogMeta());
|
||||
private _log = new LogMethods('Client ', (extra?: object|null) => this.getLogMeta(extra || {}));
|
||||
|
||||
// Maps docFDs to DocSession objects.
|
||||
private _docFDs: Array<DocSession|null> = [];
|
||||
|
||||
private _missedMessages: string[] = [];
|
||||
private _missedMessages = new Map<number, string>();
|
||||
private _destroyTimer: NodeJS.Timer|null = null;
|
||||
private _destroyed: boolean = false;
|
||||
private _websocket: WebSocket|null;
|
||||
@ -88,13 +88,15 @@ export class Client {
|
||||
private _userName: string|null = null;
|
||||
private _firstLoginAt: Date|null = null;
|
||||
private _isAnonymous: boolean = false;
|
||||
private _nextSeqId: number = 0; // Next sequence-ID for messages sent to the client
|
||||
|
||||
// Identifier for the current GristWSConnection object connected to this client.
|
||||
private _counter: string|null = null;
|
||||
|
||||
constructor(
|
||||
private _comm: Comm,
|
||||
private _methods: Map<string, ClientMethod>,
|
||||
private _locale: string,
|
||||
// Identifier for the current GristWSConnection object connected to this client.
|
||||
private _counter: string|null,
|
||||
) {
|
||||
this.clientId = generateClientId();
|
||||
}
|
||||
@ -105,13 +107,14 @@ export class Client {
|
||||
return this._locale;
|
||||
}
|
||||
|
||||
public setConnection(websocket: WebSocket, browserSettings: BrowserSettings) {
|
||||
public setConnection(websocket: WebSocket, counter: string|null, browserSettings: BrowserSettings) {
|
||||
this._websocket = websocket;
|
||||
this._counter = counter;
|
||||
this.browserSettings = browserSettings;
|
||||
|
||||
websocket.on('error', this._onError.bind(this));
|
||||
websocket.on('close', this._onClose.bind(this));
|
||||
websocket.on('message', this._onMessage.bind(this));
|
||||
websocket.on('error', (err) => this._onError(err));
|
||||
websocket.on('close', () => this._onClose());
|
||||
websocket.on('message', (msg: string) => this._onMessage(msg));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -138,25 +141,10 @@ export class Client {
|
||||
this._docFDs[fd] = null;
|
||||
}
|
||||
|
||||
// Check that client still has access to all documents. Used to determine whether
|
||||
// a Comm client can be safely reused after a reconnect. Without this check, the client
|
||||
// would be reused even if access to a document has been lost (although an error would be
|
||||
// issued later, on first use of the document).
|
||||
public async isAuthorized(): Promise<boolean> {
|
||||
for (const docFD of this._docFDs) {
|
||||
try {
|
||||
if (docFD !== null) { await docFD.authorizer.assertAccess('viewers'); }
|
||||
} catch (e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes all docs.
|
||||
* Closes all docs. Returns the number of documents closed.
|
||||
*/
|
||||
public closeAllDocs() {
|
||||
public closeAllDocs(): number {
|
||||
let count = 0;
|
||||
for (let fd = 0; fd < this._docFDs.length; fd++) {
|
||||
const docSession = this._docFDs[fd];
|
||||
@ -168,7 +156,7 @@ export class Client {
|
||||
}
|
||||
this._docFDs[fd] = null;
|
||||
}
|
||||
this._log.debug(null, "closeAllDocs() closed %d doc(s)", count);
|
||||
return count;
|
||||
}
|
||||
|
||||
public interruptConnection() {
|
||||
@ -187,7 +175,8 @@ export class Client {
|
||||
return;
|
||||
}
|
||||
|
||||
const message: string = JSON.stringify(messageObj);
|
||||
const seqId = this._nextSeqId++;
|
||||
const message: string = JSON.stringify({...messageObj, seqId});
|
||||
|
||||
// Log something useful about the message being sent.
|
||||
if ('error' in messageObj && messageObj.error) {
|
||||
@ -199,89 +188,123 @@ export class Client {
|
||||
try {
|
||||
await this._sendToWebsocket(message);
|
||||
} catch (err) {
|
||||
// Sending failed. Presumably we should be getting onClose around now too.
|
||||
// NOTE: if this handler is run after onClose, we could have messages end up out of order.
|
||||
// Let's check to make sure. If this can happen, we need to refactor for correct ordering.
|
||||
if (!this._websocket) {
|
||||
this._log.error(null, "sendMessage: UNEXPECTED ORDER OF CALLBACKS");
|
||||
// Sending failed. Add the message to missedMessages.
|
||||
this._log.warn(null, "sendMessage: queuing after send error:", err.toString());
|
||||
this._missedMessages.set(seqId, message);
|
||||
|
||||
// NOTE: A successful send does NOT mean the message was received. For a better system, see
|
||||
// https://docs.microsoft.com/en-us/azure/azure-web-pubsub/howto-develop-reliable-clients
|
||||
// (keeping a copy of messages until acked). With our system, we are more likely to be
|
||||
// lacking the needed messages on reconnect, and having to reset the client.
|
||||
}
|
||||
this._log.warn(null, "sendMessage: queuing after send error: %s", err.toString());
|
||||
this._missedMessages.push(message);
|
||||
}
|
||||
} else if (this._missedMessages.length < clientMaxMissedMessages) {
|
||||
} else if (this._missedMessages.size < clientMaxMissedMessages) {
|
||||
// Queue up the message.
|
||||
this._missedMessages.push(message);
|
||||
this._missedMessages.set(seqId, message);
|
||||
} else {
|
||||
// Too many messages queued. Boot the client now, to make it reset when/if it reconnects.
|
||||
this._log.error(null, "sendMessage: too many messages queued; booting client");
|
||||
if (this._destroyTimer) {
|
||||
clearTimeout(this._destroyTimer);
|
||||
this._destroyTimer = null;
|
||||
}
|
||||
this._log.warn(null, "sendMessage: too many messages queued; booting client");
|
||||
this.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called from Comm.ts to prepare this Client object to accept a new connection that requests
|
||||
* the same clientId. Returns whether this Client is available and ready for this connection.
|
||||
* Called from Comm.ts to decide whether this Client is available to accept a new connection
|
||||
* that requests the same clientId.
|
||||
*/
|
||||
public async reconnect(counter: string|null, newClient: boolean): Promise<boolean> {
|
||||
// Refuse reconnect if another websocket is currently active. It may be a new browser tab,
|
||||
// and will need its own Client object.
|
||||
if (this._websocket) { return false; }
|
||||
|
||||
// Don't reuse this Client object if it's no longer authorized to access the open documents.
|
||||
if (!await this.isAuthorized()) { return false; }
|
||||
|
||||
this._counter = counter;
|
||||
|
||||
this._log.info(null, "existing client reconnected (%d missed messages)", this._missedMessages.length);
|
||||
if (this._destroyTimer) {
|
||||
this._log.warn(null, "clearing scheduled destruction");
|
||||
clearTimeout(this._destroyTimer);
|
||||
this._destroyTimer = null;
|
||||
}
|
||||
if (newClient) {
|
||||
// If newClient is set, then we assume that the browser client lost its state (e.g.
|
||||
// reloaded the page), so we treat it as a disconnect followed by a new connection to the
|
||||
// same state. At the moment, this only means that we close all docs.
|
||||
if (this._missedMessages.length) {
|
||||
this._log.warn(null, "clearing missed messages for new client");
|
||||
}
|
||||
this._missedMessages.length = 0;
|
||||
this.closeAllDocs();
|
||||
}
|
||||
return true;
|
||||
public canAcceptConnection(): boolean {
|
||||
// Refuse reconnect if another websocket is currently active. It may be a new browser tab
|
||||
// (which may reuse clientId from a copy of sessinStorage). It will need its own Client object.
|
||||
return !this._websocket;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send the initial 'clientConnect' message after receiving a connection.
|
||||
* Complete initialization of a new connection, and send the initial 'clientConnect' message.
|
||||
* See comments at the top of app/server/lib/Comm.ts for some relevant notes.
|
||||
*/
|
||||
public async sendConnectMessage(parts: Partial<CommClientConnect>): Promise<void> {
|
||||
this._log.debug(null, `sending clientConnect with ${this._missedMessages.length} missed messages`);
|
||||
public async sendConnectMessage(
|
||||
newClient: boolean, reuseClient: boolean, lastSeqId: number|null, parts: Partial<CommClientConnect>
|
||||
): Promise<void> {
|
||||
if (this._destroyTimer) {
|
||||
clearTimeout(this._destroyTimer);
|
||||
this._destroyTimer = null;
|
||||
}
|
||||
|
||||
let missedMessages: string[]|undefined = undefined;
|
||||
let seamlessReconnect = false;
|
||||
if (!newClient && reuseClient && await this._isAuthorized()) {
|
||||
// Websocket-level reconnect: existing browser tab reconnected to an existing Client object.
|
||||
// We also check that the Client is still authorized to access all open docs. If not, we'll
|
||||
// close the docs and tell the Client to reload the app.
|
||||
missedMessages = this.getMissedMessages(lastSeqId);
|
||||
if (missedMessages) {
|
||||
// We have all the needed messages (possibly an empty array); can do a seamless reconnect.
|
||||
seamlessReconnect = true;
|
||||
}
|
||||
}
|
||||
|
||||
// We collected any missed messages we need; clear the stored map of them.
|
||||
this._missedMessages.clear();
|
||||
|
||||
let docsClosed: number|null = null;
|
||||
if (!seamlessReconnect) {
|
||||
// The browser client can't recover from missed messages and will need to reopen docs. Close
|
||||
// all docs we kept open. If it's a new Client object, this is a no-op.
|
||||
docsClosed = this.closeAllDocs();
|
||||
}
|
||||
|
||||
// An existing browser client that can't recover, or that connected to a new Client object,
|
||||
// will need to reopen docs. Tell it to reload.
|
||||
const needReload = !newClient && !seamlessReconnect;
|
||||
|
||||
this._log.debug({newClient, needReload, docsClosed, missedMessages: missedMessages?.length},
|
||||
'sending clientConnect');
|
||||
|
||||
// Don't use sendMessage here, since we don't want to queue up this message on failure.
|
||||
const clientConnectMsg: CommClientConnect = {
|
||||
...parts,
|
||||
type: 'clientConnect',
|
||||
clientId: this.clientId,
|
||||
missedMessages: this._missedMessages.slice(0),
|
||||
profile: this._profile,
|
||||
missedMessages,
|
||||
needReload,
|
||||
};
|
||||
// If reconnecting a client with missed messages, clear them now.
|
||||
this._missedMessages.length = 0;
|
||||
|
||||
try {
|
||||
await this._sendToWebsocket(JSON.stringify(clientConnectMsg));
|
||||
|
||||
if (needReload) {
|
||||
// If the client should reload, close the socket without waiting. This connection should
|
||||
// not be used anyway, and we want it released by the time the new connection comes in.
|
||||
this._websocket?.close();
|
||||
return;
|
||||
}
|
||||
|
||||
// A heavy-handed fix to T396, since 'clientConnect' is sometimes not seen in the browser,
|
||||
// (seemingly when the 'message' event is triggered before 'open' on the native WebSocket.)
|
||||
// See also my report at https://stackoverflow.com/a/48411315/328565
|
||||
await delay(250);
|
||||
|
||||
if (this._destroyed || this._websocket?.readyState !== WebSocket.OPEN) {
|
||||
this._log.debug(null, `websocket closed right after clientConnect`);
|
||||
} else {
|
||||
if (!this._destroyed && this._websocket?.readyState === WebSocket.OPEN) {
|
||||
await this._sendToWebsocket(JSON.stringify({...clientConnectMsg, dup: true}));
|
||||
}
|
||||
} catch (err) {
|
||||
// It's possible that the connection was closed while we were preparing this response.
|
||||
// We just warn, and let _onClose() take care of cleanup.
|
||||
this._log.warn(null, "failed to prepare or send clientConnect:", err.toString());
|
||||
}
|
||||
}
|
||||
|
||||
// Get messages in order of their key in the _missedMessages map.
|
||||
public getMissedMessages(lastSeqId: number|null): string[]|undefined {
|
||||
const result: string[] = [];
|
||||
if (lastSeqId !== null) {
|
||||
for (let i = lastSeqId + 1; i < this._nextSeqId; i++) {
|
||||
const m = this._missedMessages.get(i);
|
||||
if (m === undefined) { return; }
|
||||
result.push(m);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
// Assigns the given ScopedSession to the client.
|
||||
@ -302,11 +325,13 @@ export class Client {
|
||||
* object and clientId.
|
||||
*/
|
||||
public destroy() {
|
||||
this._log.info(null, "client gone");
|
||||
this.closeAllDocs();
|
||||
const docsClosed = this.closeAllDocs();
|
||||
this._log.info({docsClosed}, "client gone");
|
||||
if (this._destroyTimer) {
|
||||
clearTimeout(this._destroyTimer);
|
||||
this._destroyTimer = null;
|
||||
}
|
||||
this._missedMessages.clear();
|
||||
this._comm.removeClient(this);
|
||||
this._destroyed = true;
|
||||
}
|
||||
@ -379,8 +404,7 @@ export class Client {
|
||||
throw new ApiError(this._profile ? `user not known: ${this._profile.email}` : 'user not set', 403);
|
||||
}
|
||||
|
||||
public getLogMeta() {
|
||||
const meta: {[key: string]: any} = {};
|
||||
public getLogMeta(meta: {[key: string]: any} = {}) {
|
||||
if (this._profile) { meta.email = this._profile.email; }
|
||||
// We assume the _userId has already been cached, which will be true always (for all practical
|
||||
// purposes) because it's set when the Authorizer checks this client.
|
||||
@ -455,6 +479,21 @@ export class Client {
|
||||
undefined;
|
||||
}
|
||||
|
||||
// Check that client still has access to all documents. Used to determine whether
|
||||
// a Comm client can be safely reused after a reconnect. Without this check, the client
|
||||
// would be reused even if access to a document has been lost (although an error would be
|
||||
// issued later, on first use of the document).
|
||||
private async _isAuthorized(): Promise<boolean> {
|
||||
for (const docFD of this._docFDs) {
|
||||
try {
|
||||
if (docFD !== null) { await docFD.authorizer.assertAccess('viewers'); }
|
||||
} catch (e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// Returns the next unused docFD number.
|
||||
private _getNextDocFD(): number {
|
||||
let fd = 0;
|
||||
@ -478,7 +517,6 @@ export class Client {
|
||||
* Processes the closing of a websocket.
|
||||
*/
|
||||
private _onClose() {
|
||||
this._log.info(null, "onClose");
|
||||
this._websocket?.removeAllListeners();
|
||||
|
||||
// Remove all references to the websocket.
|
||||
@ -490,7 +528,7 @@ export class Client {
|
||||
this._log.warn(null, "clearing previously scheduled destruction");
|
||||
clearTimeout(this._destroyTimer);
|
||||
}
|
||||
this._log.warn(null, "will discard client in %s sec", Deps.clientRemovalTimeoutMs / 1000);
|
||||
this._log.info(null, "websocket closed; will discard client in %s sec", Deps.clientRemovalTimeoutMs / 1000);
|
||||
this._destroyTimer = setTimeout(() => this.destroy(), Deps.clientRemovalTimeoutMs);
|
||||
}
|
||||
}
|
||||
|
@ -14,6 +14,22 @@
|
||||
* browser window, and should persist across brief disconnects. A Client has a 'clientId'
|
||||
* property, which uniquely identifies a client within the currently running server. Method
|
||||
* registered with Comm always receive a Client object as the first argument.
|
||||
*
|
||||
* NOTES:
|
||||
*
|
||||
* The communication setup involves primarily the modules app/server/lib/{Comm,Client}.ts, and
|
||||
* app/client/components/{Comm,GristWSConnection}.ts. In particular, these implement reconnect
|
||||
* logic, which is particularly confusing as done here because it combines two layers:
|
||||
*
|
||||
* - Websocket-level reconnects, where an existing browser tab may reconnect and attempt to
|
||||
* restore state seamlessly by recovering any missed messages.
|
||||
*
|
||||
* - Application-level reconnects, where even in case of a failed websocket-level reconnect (e.g.
|
||||
* a reloaded browser tab, or existing tab that can't recover missed messages), the tab may
|
||||
* connect to existing state. This matters for undo/redo history (to allow a user to undo after
|
||||
* reloading a browser tab), but the only thing this relies on is preserving the clientId.
|
||||
*
|
||||
* In other words, there is an opportunity for untangling and simplifying.
|
||||
*/
|
||||
|
||||
import {EventEmitter} from 'events';
|
||||
@ -185,7 +201,6 @@ export class Comm extends EventEmitter {
|
||||
* Processes a new websocket connection, and associates the websocket and a Client object.
|
||||
*/
|
||||
private async _onWebSocketConnection(websocket: WebSocket, req: http.IncomingMessage) {
|
||||
log.info("Comm: Got WebSocket connection: %s", req.url);
|
||||
if (this._options.hosts) {
|
||||
// DocWorker ID (/dw/) and version tag (/v/) may be present in this request but are not
|
||||
// needed. addOrgInfo assumes req.url starts with /o/ if present.
|
||||
@ -197,37 +212,37 @@ export class Comm extends EventEmitter {
|
||||
// Parse the cookie in the request to get the sessionId.
|
||||
const sessionId = this.sessions.getSessionIdFromRequest(req);
|
||||
|
||||
const params = new URL(req.url!, `http://${req.headers.host}`).searchParams;
|
||||
const params = new URL(req.url!, `ws://${req.headers.host}`).searchParams;
|
||||
const existingClientId = params.get('clientId');
|
||||
const browserSettings = safeJsonParse(params.get('browserSettings') || '', {});
|
||||
const newClient = (params.get('newClient') === '1');
|
||||
const newClient = (params.get('newClient') !== '0'); // Treat omitted as new, for the sake of tests.
|
||||
const lastSeqIdStr = params.get('lastSeqId');
|
||||
const lastSeqId = lastSeqIdStr ? parseInt(lastSeqIdStr) : null;
|
||||
const counter = params.get('counter');
|
||||
const userSelector = params.get('user') || '';
|
||||
|
||||
const scopedSession = this.getOrCreateSession(sessionId!, req as RequestWithOrg, userSelector);
|
||||
const profile = await this._getSessionProfile(scopedSession, req);
|
||||
|
||||
// Associate an ID with each websocket, reusing the supplied one if it's valid.
|
||||
let client: Client|undefined = this._clients.get(existingClientId!);
|
||||
if (!client || !await client.reconnect(counter, newClient)) {
|
||||
client = new Client(this, this._methods, localeFromRequest(req), counter);
|
||||
let reuseClient = true;
|
||||
if (!client?.canAcceptConnection()) {
|
||||
reuseClient = false;
|
||||
client = new Client(this, this._methods, localeFromRequest(req));
|
||||
this._clients.set(client.clientId, client);
|
||||
}
|
||||
// Add a Session object to the client.
|
||||
log.info(`Comm ${client}: using session ${sessionId}`);
|
||||
const scopedSession = this.getOrCreateSession(sessionId!, req as RequestWithOrg, userSelector);
|
||||
client.setSession(scopedSession);
|
||||
|
||||
// Associate the client with this websocket.
|
||||
client.setConnection(websocket, browserSettings);
|
||||
log.rawInfo('Comm: Got Websocket connection', {...client.getLogMeta(), urlPath: req.url, reuseClient});
|
||||
|
||||
const profile = await this._getSessionProfile(scopedSession, req);
|
||||
client.setSession(scopedSession); // Add a Session object to the client.
|
||||
client.setOrg((req as RequestWithOrg).org || "");
|
||||
client.setProfile(profile);
|
||||
client.setConnection(websocket, counter, browserSettings);
|
||||
|
||||
client.sendConnectMessage({
|
||||
await client.sendConnectMessage(newClient, reuseClient, lastSeqId, {
|
||||
serverVersion: this._serverVersion || version.gitcommit,
|
||||
settings: this._options.settings,
|
||||
})
|
||||
.catch(err => {
|
||||
log.error(`Comm ${client}: failed to prepare or send clientConnect:`, err);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -31,7 +31,7 @@ export function fromCallback<T>(nodeFunc: NodeCallbackFunc<T>): Promise<T> {
|
||||
* @param {Number} optCount: Number of ports to check, defaults to 200.
|
||||
* @returns Promise<Number>: Promise for an available port.
|
||||
*/
|
||||
export function getAvailablePort(firstPort: number = 8000, optCount: number = 200) {
|
||||
export function getAvailablePort(firstPort: number = 8000, optCount: number = 200): Promise<number> {
|
||||
const lastPort = firstPort + optCount - 1;
|
||||
function checkNext(port: number): Promise<number> {
|
||||
if (port > lastPort) {
|
||||
|
547
test/server/Comm.ts
Normal file
547
test/server/Comm.ts
Normal file
@ -0,0 +1,547 @@
|
||||
import {Events as BackboneEvents} from 'backbone';
|
||||
import {promisifyAll} from 'bluebird';
|
||||
import {assert} from 'chai';
|
||||
import * as http from 'http';
|
||||
import {AddressInfo, Server, Socket} from 'net';
|
||||
import * as sinon from 'sinon';
|
||||
import * as WebSocket from 'ws';
|
||||
import * as path from 'path';
|
||||
import * as tmp from 'tmp';
|
||||
|
||||
import {GristWSConnection, GristWSSettings} from 'app/client/components/GristWSConnection';
|
||||
import {Comm as ClientComm} from 'app/client/components/Comm';
|
||||
import * as log from 'app/client/lib/log';
|
||||
import {Comm, sendDocMessage} from 'app/server/lib/Comm';
|
||||
import {Client, ClientMethod} from 'app/server/lib/Client';
|
||||
import {CommClientConnect} from 'app/common/CommTypes';
|
||||
import {delay} from 'app/common/delay';
|
||||
import {isLongerThan} from 'app/common/gutil';
|
||||
import {fromCallback, getAvailablePort} from 'app/server/lib/serverUtils';
|
||||
import {Sessions} from 'app/server/lib/Sessions';
|
||||
import * as testUtils from 'test/server/testUtils';
|
||||
import * as session from '@gristlabs/express-session';
|
||||
|
||||
const SQLiteStore = require('@gristlabs/connect-sqlite3')(session);
|
||||
promisifyAll(SQLiteStore.prototype);
|
||||
|
||||
describe('Comm', function() {
|
||||
|
||||
testUtils.setTmpLogLevel(process.env.VERBOSE ? 'debug' : 'warn');
|
||||
|
||||
// Allow test cases to register afterEach callbacks here for easier cleanup.
|
||||
const cleanup: Array<() => void> = [];
|
||||
|
||||
let server: http.Server;
|
||||
let sessions: Sessions;
|
||||
let comm: Comm|null = null;
|
||||
const sandbox = sinon.createSandbox();
|
||||
|
||||
before(async function() {
|
||||
const sessionDB = tmp.fileSync();
|
||||
const sessionStore = new SQLiteStore({
|
||||
dir: path.dirname(sessionDB.name),
|
||||
db: path.basename(sessionDB.name),
|
||||
table: 'sessions'
|
||||
});
|
||||
// Random string to use for the test session secret.
|
||||
const sessionSecret = 'xkwriagasaqystubgkkbwhqtyyncwqjemyncnmetjpkiwtfzvllejpfneldmoyri';
|
||||
sessions = new Sessions(sessionSecret, sessionStore);
|
||||
});
|
||||
|
||||
function startComm(methods: {[name: string]: ClientMethod}) {
|
||||
server = http.createServer();
|
||||
comm = new Comm(server, {sessions});
|
||||
comm.registerMethods(methods);
|
||||
return fromCallback(cb => server.listen(0, 'localhost', cb));
|
||||
}
|
||||
|
||||
async function stopComm() {
|
||||
comm?.destroyAllClients();
|
||||
return fromCallback(cb => server.close(cb));
|
||||
}
|
||||
|
||||
const assortedMethods: {[name: string]: ClientMethod} = {
|
||||
methodSync: async function(client, x, y) {
|
||||
return {x: x, y: y, name: "methodSync"};
|
||||
},
|
||||
methodError: async function(client, x, y) {
|
||||
throw new Error("fake error");
|
||||
},
|
||||
methodAsync: async function(client, x, y) {
|
||||
await delay(20);
|
||||
return {x: x, y: y, name: "methodAsync"};
|
||||
},
|
||||
methodSend: async function(client, docFD) {
|
||||
sendDocMessage(client, docFD, "fooType" as any, "foo");
|
||||
sendDocMessage(client, docFD, "barType" as any, "bar");
|
||||
}
|
||||
};
|
||||
|
||||
beforeEach(function() {
|
||||
// Silence console messages from client-side Comm.ts.
|
||||
if (!process.env.VERBOSE) {
|
||||
sandbox.stub(log, 'debug');
|
||||
}
|
||||
});
|
||||
|
||||
afterEach(async function() {
|
||||
// Run the cleanup callbacks registered in cleanup().
|
||||
await Promise.all(cleanup.splice(0).map(callback => callback()));
|
||||
|
||||
sandbox.restore();
|
||||
});
|
||||
|
||||
function getMessages(ws: WebSocket, count: number): Promise<any[]> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const messages: object[] = [];
|
||||
ws.on('error', reject);
|
||||
ws.on('message', (msg: string) => {
|
||||
messages.push(JSON.parse(msg));
|
||||
if (messages.length >= count) {
|
||||
resolve(messages);
|
||||
ws.removeListener('error', reject);
|
||||
ws.removeAllListeners('message');
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a promise for the connected websocket.
|
||||
*/
|
||||
function connect() {
|
||||
const ws = new WebSocket('ws://localhost:' + (server.address() as AddressInfo).port);
|
||||
return new Promise<WebSocket>((resolve, reject) => {
|
||||
ws.on('open', () => resolve(ws));
|
||||
ws.on('error', reject);
|
||||
});
|
||||
}
|
||||
|
||||
describe("server methods", function() {
|
||||
let ws: WebSocket;
|
||||
beforeEach(async function() {
|
||||
await startComm(assortedMethods);
|
||||
ws = await connect();
|
||||
await getMessages(ws, 1); // consume a clientConnect message
|
||||
});
|
||||
|
||||
afterEach(async function() {
|
||||
await stopComm();
|
||||
});
|
||||
|
||||
it("should return data for valid calls", async function() {
|
||||
ws.send(JSON.stringify({reqId: 10, method: "methodSync", args: ["hello", "world"]}));
|
||||
const messages = await getMessages(ws, 1);
|
||||
const resp = messages[0];
|
||||
assert.equal(resp.reqId, 10, `Messages received instead: ${JSON.stringify(messages)}`);
|
||||
assert.deepEqual(resp.data, {x: "hello", y: "world", name: "methodSync"});
|
||||
});
|
||||
|
||||
it("should work for async calls", async function() {
|
||||
ws.send(JSON.stringify({reqId: 20, method: "methodAsync", args: ["hello", "world"]}));
|
||||
const messages = await getMessages(ws, 1);
|
||||
const resp = messages[0];
|
||||
assert.equal(resp.reqId, 20);
|
||||
assert.deepEqual(resp.data, {x: "hello", y: "world", name: "methodAsync"});
|
||||
});
|
||||
|
||||
it("should work for out-of-order calls", async function() {
|
||||
ws.send(JSON.stringify({reqId: 30, method: "methodAsync", args: [1, 2]}));
|
||||
ws.send(JSON.stringify({reqId: 31, method: "methodSync", args: [3, 4]}));
|
||||
const messages = await getMessages(ws, 2);
|
||||
assert.equal(messages[0].reqId, 31);
|
||||
assert.deepEqual(messages[0].data, {x: 3, y: 4, name: "methodSync"});
|
||||
assert.equal(messages[1].reqId, 30);
|
||||
assert.deepEqual(messages[1].data, {x: 1, y: 2, name: "methodAsync"});
|
||||
});
|
||||
|
||||
it("should return error when a call fails", async function() {
|
||||
const logMessages = await testUtils.captureLog('warn', async () => {
|
||||
ws.send(JSON.stringify({reqId: 40, method: "methodError", args: ["hello"]}));
|
||||
const messages = await getMessages(ws, 1);
|
||||
const resp = messages[0];
|
||||
assert.equal(resp.reqId, 40);
|
||||
assert.equal(resp.data, undefined);
|
||||
assert(resp.error.indexOf('fake error') >= 0);
|
||||
});
|
||||
testUtils.assertMatchArray(logMessages, [
|
||||
/^warn: Client.* Error: fake error[^]+at methodError/,
|
||||
/^warn: Client.* responding to .* ERROR fake error/,
|
||||
]);
|
||||
});
|
||||
|
||||
it("should return error for unknown methods", async function() {
|
||||
const logMessages = await testUtils.captureLog('warn', async () => {
|
||||
ws.send(JSON.stringify({reqId: 50, method: "someUnknownMethod", args: []}));
|
||||
const messages = await getMessages(ws, 1);
|
||||
const resp = messages[0];
|
||||
assert.equal(resp.reqId, 50);
|
||||
assert.equal(resp.data, undefined);
|
||||
assert(resp.error.indexOf('Unknown method') >= 0);
|
||||
});
|
||||
testUtils.assertMatchArray(logMessages, [
|
||||
/^warn: Client.* Unknown method.*someUnknownMethod/
|
||||
]);
|
||||
});
|
||||
|
||||
it("should support app-level events correctly", async function() {
|
||||
comm!.broadcastMessage('fooType' as any, 'hello');
|
||||
comm!.broadcastMessage('barType' as any, 'world');
|
||||
const messages = await getMessages(ws, 2);
|
||||
assert.equal(messages[0].type, 'fooType');
|
||||
assert.equal(messages[0].data, 'hello');
|
||||
assert.equal(messages[1].type, 'barType');
|
||||
assert.equal(messages[1].data, 'world');
|
||||
});
|
||||
|
||||
it("should support doc-level events", async function() {
|
||||
ws.send(JSON.stringify({reqId: 60, method: "methodSend", args: [13]}));
|
||||
const messages = await getMessages(ws, 3);
|
||||
assert.equal(messages[0].type, 'fooType');
|
||||
assert.equal(messages[0].data, 'foo');
|
||||
assert.equal(messages[0].docFD, 13);
|
||||
assert.equal(messages[1].type, 'barType');
|
||||
assert.equal(messages[1].data, 'bar');
|
||||
assert.equal(messages[1].docFD, 13);
|
||||
assert.equal(messages[2].reqId, 60);
|
||||
assert.equal(messages[2].data, undefined);
|
||||
assert.equal(messages[2].error, undefined);
|
||||
});
|
||||
});
|
||||
|
||||
describe("reconnects", function() {
|
||||
const docId = "docId_abc";
|
||||
this.timeout(10000);
|
||||
|
||||
// Helper to set up a Comm server, a Comm client, and a forwarder between them that allows
|
||||
// simulating disconnects.
|
||||
async function startManagedConnection(methods: {[name: string]: ClientMethod}) {
|
||||
// Start the server Comm, providing a few methods.
|
||||
await startComm(methods);
|
||||
cleanup.push(() => stopComm());
|
||||
|
||||
// Create a forwarder, which we use to test disconnects.
|
||||
const serverPort = (server.address() as AddressInfo).port;
|
||||
const forwarder = new TcpForwarder(serverPort);
|
||||
const forwarderPort = await forwarder.pickForwarderPort();
|
||||
await forwarder.connect();
|
||||
cleanup.push(() => forwarder.disconnect());
|
||||
|
||||
// To create a client-side Comm object, we need to trick GristWSConnection's check for
|
||||
// whether there is a worker to connect to.
|
||||
(global as any).window = undefined;
|
||||
sandbox.stub(global as any, 'window').value({gristConfig: {getWorker: 'STUB', assignmentId: docId}});
|
||||
|
||||
// We also need to get GristWSConnection to use a custom GristWSSettings object, and to
|
||||
// connect to the forwarder's port.
|
||||
const docWorkerUrl = `http://localhost:${forwarderPort}`;
|
||||
const settings = getWSSettings(docWorkerUrl);
|
||||
const stubGristWsCreate = sandbox.stub(GristWSConnection, 'create').callsFake(function(this: any, owner) {
|
||||
return (stubGristWsCreate as any).wrappedMethod.call(this, owner, settings);
|
||||
});
|
||||
|
||||
// Cast with BackboneEvents to allow using cliComm.on().
|
||||
const cliComm = ClientComm.create() as ClientComm & BackboneEvents;
|
||||
cliComm.useDocConnection(docId);
|
||||
cleanup.push(() => cliComm.dispose()); // Dispose after this test ends.
|
||||
|
||||
return {cliComm, forwarder};
|
||||
}
|
||||
|
||||
it('should forward calls on a normal connection', async function() {
|
||||
const {cliComm} = await startManagedConnection(assortedMethods);
|
||||
|
||||
// A couple of regular requests.
|
||||
const resp1 = await cliComm._makeRequest(null, null, "methodSync", "foo", 1);
|
||||
assert.deepEqual(resp1, {name: 'methodSync', x: "foo", y: 1});
|
||||
const resp2 = await cliComm._makeRequest(null, null, "methodAsync", "foo", 2);
|
||||
assert.deepEqual(resp2, {name: 'methodAsync', x: "foo", y: 2});
|
||||
|
||||
// Try calls that return out of order.
|
||||
const [resp3, resp4] = await Promise.all([
|
||||
cliComm._makeRequest(null, null, "methodAsync", "foo", 3),
|
||||
cliComm._makeRequest(null, null, "methodSync", "foo", 4),
|
||||
]);
|
||||
assert.deepEqual(resp3, {name: 'methodAsync', x: "foo", y: 3});
|
||||
assert.deepEqual(resp4, {name: 'methodSync', x: "foo", y: 4});
|
||||
});
|
||||
|
||||
it('should forward missed responses when a server send fails', async function() {
|
||||
await testMissedResponses(true);
|
||||
});
|
||||
it('should forward missed responses when a server send is queued', async function() {
|
||||
await testMissedResponses(false);
|
||||
});
|
||||
|
||||
async function testMissedResponses(sendShouldFail: boolean) {
|
||||
const logMessages = await testUtils.captureLog('debug', async () => {
|
||||
const {cliComm, forwarder} = await startManagedConnection({...assortedMethods,
|
||||
// An extra method that simulates a lost connection on server side prior to response.
|
||||
testDisconnect: async function(client, x, y) {
|
||||
await delay(0); // disconnect on the same tick.
|
||||
await forwarder.disconnectServerSide();
|
||||
if (!sendShouldFail) {
|
||||
// Add a delay to let the 'close' event get noticed first.
|
||||
await delay(20);
|
||||
}
|
||||
return {x: x, y: y, name: "testDisconnect"};
|
||||
},
|
||||
});
|
||||
|
||||
const resp1 = await cliComm._makeRequest(null, null, "methodSync", "foo", 1);
|
||||
assert.deepEqual(resp1, {name: 'methodSync', x: "foo", y: 1});
|
||||
|
||||
// Make more calls, with a disconnect before they return. The server should queue up responses.
|
||||
const resp2Promise = cliComm._makeRequest(null, null, "testDisconnect", "foo", 2);
|
||||
const resp3Promise = cliComm._makeRequest(null, null, "methodAsync", "foo", 3);
|
||||
assert.equal(await isLongerThan(resp2Promise, 250), true);
|
||||
|
||||
// Once we reconnect, the response should arrive.
|
||||
await forwarder.connect();
|
||||
assert.deepEqual(await resp2Promise, {name: 'testDisconnect', x: "foo", y: 2});
|
||||
assert.deepEqual(await resp3Promise, {name: 'methodAsync', x: "foo", y: 3});
|
||||
});
|
||||
|
||||
// Check that we saw the situations we were hoping to test.
|
||||
assert.equal(logMessages.some(m => /^warn: .*send error.*WebSocket is not open/.test(m)), sendShouldFail,
|
||||
`Expected to see a failed send:\n${logMessages.join('\n')}`);
|
||||
}
|
||||
|
||||
it("should receive all server messages in order when send doesn't fail", async function() {
|
||||
await testSendOrdering({noFailedSend: true});
|
||||
});
|
||||
|
||||
it("should order server messages correctly with failedSend before close", async function() {
|
||||
await testSendOrdering({closeHappensFirst: false});
|
||||
});
|
||||
|
||||
it("should order server messages correctly with close before failedSend", async function() {
|
||||
await testSendOrdering({closeHappensFirst: true});
|
||||
});
|
||||
|
||||
async function testSendOrdering(options: {noFailedSend?: boolean, closeHappensFirst?: boolean}) {
|
||||
const eventsSeen: Array<'failedSend'|'close'> = [];
|
||||
|
||||
// Server-side Client object.
|
||||
let ssClient!: Client;
|
||||
|
||||
const {cliComm, forwarder} = await startManagedConnection({
|
||||
...assortedMethods,
|
||||
// A method just to help us get a handle on the server-side Client object.
|
||||
setClient: async (client) => { ssClient = client; },
|
||||
});
|
||||
|
||||
// Intercept the call to _onClose to know when it occurs, since we are trying to hit a
|
||||
// situation where 'close' and 'failedSend' events happen in either order.
|
||||
const stubOnClose = sandbox.stub(Client.prototype as any, '_onClose')
|
||||
.callsFake(function(this: Client) {
|
||||
eventsSeen.push('close');
|
||||
return (stubOnClose as any).wrappedMethod.apply(this, arguments);
|
||||
});
|
||||
|
||||
// Intercept calls to client.sendMessage(), to know when it fails, and possibly to delay the
|
||||
// failures to hit a particular order in which 'close' and 'failedSend' events are seen by
|
||||
// Client.ts. This is the only reliable way I found to reproduce this order of events.
|
||||
const stubSendToWebsocket = sandbox.stub(Client.prototype as any, '_sendToWebsocket')
|
||||
.callsFake(async function(this: Client) {
|
||||
try {
|
||||
return await (stubSendToWebsocket as any).wrappedMethod.apply(this, arguments);
|
||||
} catch (err) {
|
||||
if (options.closeHappensFirst) { await delay(100); }
|
||||
eventsSeen.push('failedSend');
|
||||
throw err;
|
||||
}
|
||||
});
|
||||
|
||||
// Watch the events received all the way on the client side.
|
||||
const eventSpy = sinon.spy();
|
||||
const clientConnectSpy = sinon.spy();
|
||||
cliComm.on('docUserAction', eventSpy);
|
||||
cliComm.on('clientConnect', clientConnectSpy);
|
||||
|
||||
// We need to simulate an important property of the browser client: when needReload is set
|
||||
// in the clientConnect message, we are expected to reload the app. In the test, we replace
|
||||
// the GristWSConnection.
|
||||
cliComm.on('clientConnect', async (msg: CommClientConnect) => {
|
||||
if (msg.needReload) {
|
||||
await delay(0);
|
||||
cliComm.releaseDocConnection(docId);
|
||||
cliComm.useDocConnection(docId);
|
||||
}
|
||||
});
|
||||
|
||||
// Make the first event that gives us access to the Client object (ssClient).
|
||||
await cliComm._makeRequest(null, null, "setClient", "foo", 1);
|
||||
|
||||
// Send large buffers, to fill up the socket's buffers to get it to block.
|
||||
const data = "x".repeat(1.0e7);
|
||||
const makeMessage = (n: number) => ({type: 'docUserAction', n, data});
|
||||
|
||||
let n = 0;
|
||||
const sendPromises: Array<Promise<void>> = [];
|
||||
const sendNextMessage= () => sendPromises.push(ssClient.sendMessage(makeMessage(n++) as any));
|
||||
|
||||
await testUtils.captureLog('warn', async () => {
|
||||
// Make a few sends. These are big enough not to return immediately. Keep the first two
|
||||
// successful (by awaiting them). And keep a few more that will fail. This is to test the
|
||||
// ordering of successful and failed messages that may be missed.
|
||||
sendNextMessage();
|
||||
sendNextMessage();
|
||||
sendNextMessage();
|
||||
await sendPromises[0];
|
||||
await sendPromises[1];
|
||||
|
||||
sendNextMessage();
|
||||
sendNextMessage();
|
||||
|
||||
// Forcibly close the forwarder, so that the server sees a 'close' event. But first let
|
||||
// some messages get to the client. In case we want all sends to succeed, let them all get
|
||||
// forwarded before disconnect; otherwise, disconnect after 2 are fowarded.
|
||||
const countToWaitFor = options.noFailedSend ? 5 : 2;
|
||||
await waitForCondition(() => eventSpy.callCount >= countToWaitFor);
|
||||
|
||||
void(forwarder.disconnectServerSide());
|
||||
|
||||
// Wait less than the delay that we add for delayFailedSend, and send another message. There
|
||||
// used to be a bug that such a message would get recorded into missedMessages out of order.
|
||||
await delay(50);
|
||||
sendNextMessage();
|
||||
|
||||
// Now reconnect, and collect the messages that the client sees.
|
||||
clientConnectSpy.resetHistory();
|
||||
await forwarder.connect();
|
||||
|
||||
// Wait until we get a clientConnect message that does not require a reload. (Except with
|
||||
// noFailedSend, the first one would have needReload set; and after the reconnect, we should
|
||||
// get one without.)
|
||||
await waitForCondition(() =>
|
||||
(clientConnectSpy.callCount > 0 && clientConnectSpy.lastCall.args[0].needReload === false),
|
||||
3000);
|
||||
});
|
||||
|
||||
// This test helper is used for 3 different situations. Check that we observed that
|
||||
// situations we were trying to hit.
|
||||
if (options.noFailedSend) {
|
||||
assert.deepEqual(eventsSeen, ['close']);
|
||||
} else if (options.closeHappensFirst) {
|
||||
assert.equal(eventsSeen[0], 'close');
|
||||
assert.include(eventsSeen, 'failedSend');
|
||||
} else {
|
||||
assert.equal(eventsSeen[0], 'failedSend');
|
||||
assert.include(eventsSeen, 'close');
|
||||
}
|
||||
|
||||
// After a successful reconnect, subsequent calls should work normally.
|
||||
assert.deepEqual(await cliComm._makeRequest(null, null, "methodSync", 1, 2),
|
||||
{name: 'methodSync', x: 1, y: 2});
|
||||
|
||||
// Check that all the received messages are in order.
|
||||
const messageNums = eventSpy.getCalls().map(call => call.args[0].n);
|
||||
assert.isAtLeast(messageNums.length, 2);
|
||||
assert.deepEqual(messageNums, nrange(0, messageNums.length),
|
||||
`Unexpected message sequence ${JSON.stringify(messageNums)}`);
|
||||
|
||||
// Subsequent messages should work normally too.
|
||||
eventSpy.resetHistory();
|
||||
sendNextMessage();
|
||||
await waitForCondition(() => eventSpy.callCount > 0);
|
||||
assert.deepEqual(eventSpy.getCalls().map(call => call.args[0].n), [n - 1]);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// Waits for condFunc() to return true, for up to timeoutMs milliseconds, sleeping for stepMs
|
||||
// between checks. Returns true if succeeded, false if failed.
|
||||
async function waitForCondition(condFunc: () => boolean, timeoutMs = 1000, stepMs = 10): Promise<boolean> {
|
||||
const end = Date.now() + timeoutMs;
|
||||
while (Date.now() < end) {
|
||||
if (condFunc()) { return true; }
|
||||
await delay(stepMs);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// Returns a range of count consecutive numbers starting with start.
|
||||
function nrange(start: number, count: number): number[] {
|
||||
return Array.from(Array(count), (_, i) => start + i);
|
||||
}
|
||||
|
||||
// Returns a GristWSSettings object, for use with GristWSConnection.
|
||||
function getWSSettings(docWorkerUrl: string): GristWSSettings {
|
||||
let clientId: string = 'clientid-abc';
|
||||
let counter: number = 0;
|
||||
return {
|
||||
makeWebSocket(url: string): any { return new WebSocket(url, undefined, {}); },
|
||||
async getTimezone() { return 'UTC'; },
|
||||
getPageUrl() { return "http://localhost"; },
|
||||
async getDocWorkerUrl() { return docWorkerUrl; },
|
||||
getClientId(did: any) { return clientId; },
|
||||
getUserSelector() { return ''; },
|
||||
updateClientId(did: string, cid: string) { clientId = cid; },
|
||||
advanceCounter(): string { return String(counter++); },
|
||||
log() { (log as any).debug(...arguments); },
|
||||
warn() { (log as any).warn(...arguments); },
|
||||
};
|
||||
}
|
||||
|
||||
// We'll test reconnects by making a connection through this TcpForwarder, which we'll use to
|
||||
// simulate disconnects.
|
||||
export class TcpForwarder {
|
||||
public port: number|null = null;
|
||||
private _connections = new Map<Socket, Socket>();
|
||||
private _server: Server|null = null;
|
||||
|
||||
constructor(private _serverPort: number) {}
|
||||
|
||||
public async pickForwarderPort(): Promise<number> {
|
||||
this.port = await getAvailablePort(5834);
|
||||
return this.port;
|
||||
}
|
||||
public async connect() {
|
||||
await this.disconnect();
|
||||
this._server = new Server((sock) => this._onConnect(sock));
|
||||
await new Promise((resolve, reject) =>
|
||||
this._server!.on('error', reject).listen(this.port, resolve));
|
||||
}
|
||||
public async disconnectClientSide() {
|
||||
await Promise.all(Array.from(this._connections.keys(), destroySock));
|
||||
if (this._server) {
|
||||
await new Promise((resolve) => this._server!.close(resolve));
|
||||
this._server = null;
|
||||
}
|
||||
this.cleanup();
|
||||
}
|
||||
public async disconnectServerSide() {
|
||||
await Promise.all(Array.from(this._connections.values(), destroySock));
|
||||
this.cleanup();
|
||||
}
|
||||
public async disconnect() {
|
||||
await this.disconnectClientSide();
|
||||
await this.disconnectServerSide();
|
||||
}
|
||||
public cleanup() {
|
||||
const pairs = Array.from(this._connections.entries());
|
||||
for (const [clientSock, serverSock] of pairs) {
|
||||
if (clientSock.destroyed && serverSock.destroyed) {
|
||||
this._connections.delete(clientSock);
|
||||
}
|
||||
}
|
||||
}
|
||||
private async _onConnect(clientSock: Socket) {
|
||||
const serverSock = new Socket();
|
||||
await new Promise((resolve, reject) =>
|
||||
serverSock.on('error', reject).connect(this._serverPort, resolve));
|
||||
clientSock.pipe(serverSock);
|
||||
serverSock.pipe(clientSock);
|
||||
clientSock.on('error', (err) => serverSock.destroy(err));
|
||||
serverSock.on('error', (err) => clientSock.destroy(err));
|
||||
this._connections.set(clientSock, serverSock);
|
||||
}
|
||||
}
|
||||
|
||||
async function destroySock(sock: Socket): Promise<void> {
|
||||
if (!sock.destroyed) {
|
||||
await new Promise((resolve, reject) =>
|
||||
sock.on('close', resolve).destroy());
|
||||
}
|
||||
}
|
@ -1,5 +1,5 @@
|
||||
import {getAppRoot} from 'app/server/lib/places';
|
||||
import {fromCallback} from 'bluebird';
|
||||
import {fromCallback} from 'app/server/lib/serverUtils';
|
||||
import * as express from 'express';
|
||||
import * as http from 'http';
|
||||
import {AddressInfo, Socket} from 'net';
|
||||
@ -8,7 +8,7 @@ import {fixturesRoot} from 'test/server/testUtils';
|
||||
|
||||
export interface Serving {
|
||||
url: string;
|
||||
shutdown: () => void;
|
||||
shutdown: () => Promise<void>;
|
||||
}
|
||||
|
||||
|
||||
@ -45,7 +45,7 @@ export function serveCustomViews(): Promise<Serving> {
|
||||
export async function serveSomething(setup: (app: express.Express) => void, port= 0): Promise<Serving> {
|
||||
const app = express();
|
||||
const server = http.createServer(app);
|
||||
await fromCallback((cb: any) => server.listen(port, cb));
|
||||
await fromCallback(cb => server.listen(port, cb));
|
||||
|
||||
const connections = new Set<Socket>();
|
||||
server.on('connection', (conn) => {
|
||||
@ -53,8 +53,8 @@ export async function serveSomething(setup: (app: express.Express) => void, port
|
||||
conn.on('close', () => connections.delete(conn));
|
||||
});
|
||||
|
||||
function shutdown() {
|
||||
server.close();
|
||||
async function shutdown() {
|
||||
await fromCallback(cb => server.close(cb));
|
||||
for (const conn of connections) { conn.destroy(); }
|
||||
}
|
||||
|
||||
|
@ -2596,7 +2596,7 @@ function testDocApi() {
|
||||
|
||||
after(async function() {
|
||||
if (!process.env.TEST_REDIS_URL) { this.skip(); }
|
||||
serving.shutdown();
|
||||
await serving.shutdown();
|
||||
await redisMonitor.quitAsync();
|
||||
});
|
||||
|
||||
|
@ -137,7 +137,9 @@ export async function captureLog(minLevel: string, callback: () => void|Promise<
|
||||
}
|
||||
}
|
||||
|
||||
if (!process.env.VERBOSE) {
|
||||
log.transports.file.level = -1 as any; // Suppress all log output.
|
||||
}
|
||||
log.add(CaptureTransport as any, { captureFunc: capture, name }); // types are off.
|
||||
try {
|
||||
await callback();
|
||||
|
Loading…
Reference in New Issue
Block a user