diff --git a/app/client/components/Comm.ts b/app/client/components/Comm.ts index 67e727a7..30bcfa74 100644 --- a/app/client/components/Comm.ts +++ b/app/client/components/Comm.ts @@ -16,118 +16,26 @@ * * Implementation * -------------- - * Messages are serialized as follows. Note that this is a matter between the client's and the - * server's communication libraries, and code outside of them should not rely on these details. - * Requests: { - * reqId: Number, - * method: String, - * args: Array - * } - * Responses: { - * reqId: Number, // distinguishes responses from async messages - * error: String // if the request failed - * data: Object // if the request succeeded, may be undefined if nothing to return - * } - * Async messages from server: { - * type: String, // 'docListAction' or 'docUserAction' or 'clientConnect' - * docFD: Number, // For 'docUserAction', the file descriptor of the open document. - * data: Object // The message data. - * // other keys may exist depending on message type. - * } + * Messages are serialized as JSON using types CommRequest, CommResponse, CommResponseError for + * method calls, and CommMessage for async messages from the server. These are all defined in + * app/common/CommTypes. Note that this is a matter between the client's and the server's + * communication libraries, and code outside of them should not rely on these details. */ import {GristWSConnection} from 'app/client/components/GristWSConnection'; import * as dispose from 'app/client/lib/dispose'; +import {CommMessage, CommRequest, CommResponse, CommResponseError, ValidEvent} from 'app/common/CommTypes'; import {UserAction} from 'app/common/DocActions'; import {DocListAPI, OpenLocalDocResult} from 'app/common/DocListAPI'; import {GristServerAPI} from 'app/common/GristServerAPI'; -import {StringUnion} from 'app/common/StringUnion'; import {getInitialDocAssignment} from 'app/common/urlUtils'; import {Events as BackboneEvents} from 'backbone'; -// tslint:disable:no-console - -/** - * Event for a change to the document list. - * These are sent to all connected clients, regardless of which documents they have open. - * TODO: implement and document. - * @event docListAction - */ - -/** - * Event for a user action on a document, or part of one. Sent to all clients that have this - * document open. - * @event docUserAction - * @property {Number} docFD - The file descriptor of the open document, specific to each client. - * @property {Array} data.actionGroup - ActionGroup object containing user action, and doc actions. - * @property {Boolean} fromSelf - Flag to indicate whether the action originated from this client. - */ - -/** - * Event for a change to document usage. Sent to all clients that have this document open. - * @event docUsage - * @property {Number} docFD - The file descriptor of the open document, specific to each client. - * @property {FilteredDocUsageSummary} data.docUsage - Document usage summary. - * @property {Product} data.product - Product that was used to compute `data.docUsage` - */ - -/** - * Event for when a document is forcibly shutdown, and requires the client to re-open it. - * @event docShutdown - * @property {Number} docFD - The file descriptor of the open document, specific to each client. - */ - -/** - * Event sent by server received when a client first connects. - * @event clientConnect - * @property {Number} clientId - The ID for the client, which may be reused if a client reconnects - * to reattach to its state on the server. - * @property {Number} missedMessages - Array of messages missed from the server. - * @property {Object} settings - Object containing server settings and features which - * should be used to initialize the client. - * @property {Object} profile - Object containing session profile information if the user - * is signed in, or null otherwise. See "clientLogin" message below for fields. - */ - -/** - * Event sent by server to all clients in the session when the updated profile is retrieved. - * Does not necessarily contain all properties, may only include updated properties. - * Gets sent on login with all properties. - * @event profileFetch - * @property {String} email User email. - * @property {String} name User name, - * @property {String} imageUrl The url of the user's profile image. - */ - -/** - * Event sent by server to all clients in the session when the user settings are updated. - * @event userSettings - * @property {Object} features - Object containing feature flags such as login, indicating - * which features are activated. - */ - -/** - * Event sent by server to all clients in the session when a client logs out. - * @event clientLogout - */ - -/** - * Event sent by server to all clients when an invite is received or for all invites received - * while away when a user logs in. - * @event receiveInvites - * @property {Number} data - An array of unread invites (see app/common/sharing). - */ - -const ValidEvent = StringUnion('docListAction', 'docUserAction', 'docShutdown', 'docError', - 'docUsage', 'clientConnect', 'clientLogout', - 'profileFetch', 'userSettings', 'receiveInvites'); -type ValidEvent = typeof ValidEvent.type; - /** * A request that is currently being processed. */ export interface CommRequestInFlight { - resolve: (result: any) => void; + resolve: (result: unknown) => void; reject: (err: Error) => void; // clientId is non-null for those requests which should not be re-sent on reconnect if // the clientId has changed; it is null when it's safe to re-send. @@ -138,50 +46,10 @@ export interface CommRequestInFlight { sent: boolean; } -/** - * A request in the appropriate form for sending to the server. - */ -export interface CommRequest { - reqId: number; - method: string; - args: any[]; -} - -/** - * A regular, successful response from the server. - */ -export interface CommResponse { - reqId: number; - data: any; - error?: null; // TODO: keep until sure server never sets this on regular responses. -} - -/** - * An exceptional response from the server when there is an error. - */ -export interface CommResponseError { - reqId: number; - error: string; - errorCode: string; - shouldFork?: boolean; // if set, the server suggests forking the document. - details?: any; // if set, error has extra details available. TODO - the treatment of - // details could do with some harmonisation between rest API and ws API, - // and between front-end and back-end types. -} - function isCommResponseError(msg: CommResponse | CommResponseError): msg is CommResponseError { return Boolean(msg.error); } -/** - * A message pushed from the server, not in response to a request. - */ -export interface CommMessage { - type: ValidEvent; - docFD: number; - data: any; -} - /** * Comm object provides the interfaces to communicate with the server. * Each method that calls to the server returns a promise for the response. diff --git a/app/client/components/DocComm.ts b/app/client/components/DocComm.ts index b03f043c..fbacbd91 100644 --- a/app/client/components/DocComm.ts +++ b/app/client/components/DocComm.ts @@ -1,35 +1,14 @@ -import {Comm, CommMessage} from 'app/client/components/Comm'; +import {Comm} from 'app/client/components/Comm'; import {reportError, reportMessage} from 'app/client/models/errors'; import {Notifier} from 'app/client/models/NotifyModel'; -import {ActionGroup} from 'app/common/ActionGroup'; import {ActiveDocAPI, ApplyUAOptions, ApplyUAResult} from 'app/common/ActiveDocAPI'; -import {DocAction, UserAction} from 'app/common/DocActions'; +import {CommMessage} from 'app/common/CommTypes'; +import {UserAction} from 'app/common/DocActions'; import {OpenLocalDocResult} from 'app/common/DocListAPI'; -import {FilteredDocUsageSummary} from 'app/common/DocUsage'; -import {Product} from 'app/common/Features'; import {docUrl} from 'app/common/urlUtils'; import {Events as BackboneEvents} from 'backbone'; import {Disposable, Emitter} from 'grainjs'; -// tslint:disable:no-console - -export interface DocUserAction extends CommMessage { - fromSelf?: boolean; - data: { - docActions: DocAction[]; - actionGroup: ActionGroup; - docUsage: FilteredDocUsageSummary; - error?: string; - }; -} - -export interface DocUsageMessage extends CommMessage { - data: { - docUsage: FilteredDocUsageSummary; - product?: Product; - }; -} - const SLOW_NOTIFICATION_TIMEOUT_MS = 1000; // applies to user actions only /** diff --git a/app/client/components/GristDoc.ts b/app/client/components/GristDoc.ts index ae3a9ae5..4fdf80da 100644 --- a/app/client/components/GristDoc.ts +++ b/app/client/components/GristDoc.ts @@ -11,7 +11,7 @@ import {CodeEditorPanel} from 'app/client/components/CodeEditorPanel'; import * as commands from 'app/client/components/commands'; import {CursorPos} from 'app/client/components/Cursor'; import {CursorMonitor, ViewCursorPos} from "app/client/components/CursorMonitor"; -import {DocComm, DocUsageMessage, DocUserAction} from 'app/client/components/DocComm'; +import {DocComm} from 'app/client/components/DocComm'; import * as DocConfigTab from 'app/client/components/DocConfigTab'; import {Drafts} from "app/client/components/Drafts"; import {EditorMonitor} from "app/client/components/EditorMonitor"; @@ -51,6 +51,7 @@ import {FieldEditor} from "app/client/widgets/FieldEditor"; import {MinimalActionGroup} from 'app/common/ActionGroup'; import {ClientQuery} from "app/common/ActiveDocAPI"; import {delay} from 'app/common/delay'; +import {CommDocUsage, CommDocUserAction} from 'app/common/CommTypes'; import {DisposableWithEvents} from 'app/common/DisposableWithEvents'; import {isSchemaAction, UserAction} from 'app/common/DocActions'; import {OpenLocalDocResult} from 'app/common/DocListAPI'; @@ -441,7 +442,7 @@ export class GristDoc extends DisposableWithEvents { * Process actions received from the server by forwarding them to `docData.receiveAction()` and * pushing them to actionLog. */ - public onDocUserAction(message: DocUserAction) { + public onDocUserAction(message: CommDocUserAction) { console.log("GristDoc.onDocUserAction", message); let schemaUpdated = false; /** @@ -489,7 +490,7 @@ export class GristDoc extends DisposableWithEvents { * Process usage and product received from the server by updating their respective * observables. */ - public onDocUsageMessage(message: DocUsageMessage) { + public onDocUsageMessage(message: CommDocUsage) { if (!this.docComm.isActionFromThisDoc(message)) { return; } bundleChanges(() => { diff --git a/app/client/ui/App.ts b/app/client/ui/App.ts index 0a5306be..b7238ab9 100644 --- a/app/client/ui/App.ts +++ b/app/client/ui/App.ts @@ -15,6 +15,7 @@ import {createAppUI} from 'app/client/ui/AppUI'; import {addViewportTag} from 'app/client/ui/viewport'; import {attachCssRootVars} from 'app/client/ui2018/cssVars'; import {BaseAPI} from 'app/common/BaseAPI'; +import {CommDocError} from 'app/common/CommTypes'; import {DisposableWithEvents} from 'app/common/DisposableWithEvents'; import {fetchFromHome} from 'app/common/urlUtils'; import {ISupportedFeatures} from 'app/common/UserConfig'; @@ -158,7 +159,7 @@ export class App extends DisposableWithEvents { setTimeout(() => this.reloadPane(), 0); }); - this.listenTo(this.comm, 'docError', (msg) => { + this.listenTo(this.comm, 'docError', (msg: CommDocError) => { this._checkError(new Error(msg.data.message)); }); diff --git a/app/common/CommTypes.ts b/app/common/CommTypes.ts new file mode 100644 index 00000000..fc925669 --- /dev/null +++ b/app/common/CommTypes.ts @@ -0,0 +1,146 @@ +import {ActionGroup} from 'app/common/ActionGroup'; +import {DocAction} from 'app/common/DocActions'; +import {FilteredDocUsageSummary} from 'app/common/DocUsage'; +import {Product} from 'app/common/Features'; +import {StringUnion} from 'app/common/StringUnion'; +import {UserProfile} from 'app/common/LoginSessionAPI'; + +export const ValidEvent = StringUnion('docListAction', 'docUserAction', 'docShutdown', 'docError', + 'docUsage', 'clientConnect'); +export type ValidEvent = typeof ValidEvent.type; + + +/** + * A request in the appropriate form for sending to the server. + */ +export interface CommRequest { + reqId: number; + method: string; + args: any[]; +} + +/** + * A regular, successful response from the server. + */ +export interface CommResponse { + reqId: number; + data: any; + error?: null; // TODO: keep until sure server never sets this on regular responses. +} + +/** + * An exceptional response from the server when there is an error. + */ +export interface CommResponseError { + reqId: number; + error: string; + errorCode?: string; + shouldFork?: boolean; // if set, the server suggests forking the document. + details?: any; // if set, error has extra details available. TODO - the treatment of + // details could do with some harmonisation between rest API and ws API, + // and between front-end and back-end types. +} + +/** + * A message pushed from the server, not in response to a request. + */ +export interface CommMessageBase { + type: ValidEvent; + docFD?: number; + data?: unknown; +} + +export type CommDocMessage = CommDocUserAction | CommDocUsage | CommDocShutdown | CommDocError; +export type CommMessage = CommDocMessage | CommDocListAction | CommClientConnect; + +export type CommDocEventType = CommDocMessage['type'] + +/** + * Event for a change to the document list. + * These are sent to all connected clients, regardless of which documents they have open. + * TODO: This is entirely unused at the moment. + */ +export interface CommDocListAction extends CommMessageBase { + type: 'docListAction'; + addDocs?: string[]; // names of documents to add to the docList. + removeDocs?: string[]; // names of documents that got removed. + renameDocs?: string[]; // [oldName, newName] pairs for renamed docs. + addInvites?: string[]; // document invite names to add. + removeInvites?: string[]; // documents invite names to remove. +} + +/** + * Event for a user action on a document, or part of one. Sent to all clients that have this + * document open. + */ +export interface CommDocUserAction extends CommMessageBase { + type: 'docUserAction'; + docFD: number; // The file descriptor of the open document, specific to each client. + fromSelf?: boolean; // Flag to indicate whether the action originated from this client. + + // ActionGroup object containing user action, and doc actions. + data: { + docActions: DocAction[]; + actionGroup: ActionGroup; + docUsage: FilteredDocUsageSummary; + error?: string; + }; +} + +/** + * Event for a change to document usage. Sent to all clients that have this document open. + */ +export interface CommDocUsage extends CommMessageBase { + type: 'docUsage'; + docFD: number; // The file descriptor of the open document, specific to each client. + data: { + docUsage: FilteredDocUsageSummary; // Document usage summary. + product?: Product; //Product that was used to compute `data.docUsage` + }; +} + +/** + * Event for when a document is forcibly shutdown, and requires the client to re-open it. + */ +export interface CommDocShutdown extends CommMessageBase { + type: 'docShutdown'; + docFD: number; + data: null; +} + +/** + * Event that signals an error while opening a doc. + */ +export interface CommDocError extends CommMessageBase { + type: 'docError'; + docFD: number; + data: { + when: string; + message: string; + } +} + +/** + * Event sent by server received when a client first connects. + */ +export interface CommClientConnect extends CommMessageBase { + type: 'clientConnect'; + + // ID for the client, which may be reused if a client reconnects to reattach to its state on + // the server. + clientId: string; + + // Array of serialized messages missed from the server while disconnected. + missedMessages: string[]; + + // Which version the server reports for itself. + serverVersion?: string; + + // Object containing server settings and features which should be used to initialize the client. + settings?: {[key: string]: unknown}; + + // Object containing session profile information if the user is signed in, or null otherwise. + profile: UserProfile|null; + + dup?: boolean; // Flag that's set to true when it's a duplicate clientConnect message. +} diff --git a/app/server/declarations.d.ts b/app/server/declarations.d.ts index 327b87fd..51b20a94 100644 --- a/app/server/declarations.d.ts +++ b/app/server/declarations.d.ts @@ -2,31 +2,6 @@ declare module "app/server/lib/ActionLog"; declare module "app/server/lib/sandboxUtil"; declare module "app/server/lib/User"; -declare module "app/server/lib/Comm" { - import {Client, ClientMethod} from "app/server/lib/Client"; - import {ScopedSession} from "app/server/lib/BrowserSession"; - import * as http from "http"; - - class Comm { - constructor(server: http.Server, options: any); - public broadcastMessage(type: string, messageData: any): void; - public destroyAllClients(): void; - public setServerVersion(serverVersion: string|null): void; - public setServerActivation(active: boolean): void; - public getSessionIdFromCookie(gristSidCookie: string): string; - public getOrCreateSession(sessionId: string, req: any): ScopedSession; - public registerMethods(methods: {[name: string]: ClientMethod}): void; - public getClient(clientId: string): Client; - public testServerShutdown(): Promise; - public testServerRestart(): Promise; - public testSetClientPersistence(ttlMs: number): void; - } - namespace Comm { - function sendDocMessage(client: Client, docFD: number, type: string, mesageData: any, fromSelf: boolean): void; - } - export = Comm; -} - declare module "app/server/lib/shutdown" { export function addCleanupHandler(context: T, method: (this: T) => void, timeout?: number, name?: string): void; export function removeCleanupHandlers(context: T): void; @@ -91,9 +66,6 @@ declare module "app-module-path" { export function addPath(path: string): void; } -// Used in tests -declare module "ws"; - // version of pidusage that has correct ctime on linux declare module '@gristlabs/pidusage' { import * as pidusage from 'pidusage'; diff --git a/app/server/lib/Client.ts b/app/server/lib/Client.ts index 35fe359d..2ded558c 100644 --- a/app/server/lib/Client.ts +++ b/app/server/lib/Client.ts @@ -1,5 +1,7 @@ import {ApiError} from 'app/common/ApiError'; import {BrowserSettings} from 'app/common/BrowserSettings'; +import {delay} from 'app/common/delay'; +import {CommClientConnect, CommMessage, CommResponse, CommResponseError} from 'app/common/CommTypes'; import {ErrorWithCode} from 'app/common/ErrorWithCode'; import {UserProfile} from 'app/common/LoginSessionAPI'; import {ANONYMOUS_USER_EMAIL} from 'app/common/UserAPI'; @@ -8,17 +10,26 @@ import {HomeDBManager} from 'app/gen-server/lib/HomeDBManager'; import {ActiveDoc} from 'app/server/lib/ActiveDoc'; import {Authorizer} from 'app/server/lib/Authorizer'; import {ScopedSession} from 'app/server/lib/BrowserSession'; +import type {Comm} from 'app/server/lib/Comm'; import {DocSession} from 'app/server/lib/DocSession'; import * as log from 'app/server/lib/log'; import {LogMethods} from "app/server/lib/LogMethods"; import {shortDesc} from 'app/server/lib/shortDesc'; +import {fromCallback} from 'app/server/lib/serverUtils'; import * as crypto from 'crypto'; import * as moment from 'moment'; +import * as WebSocket from 'ws'; /// How many messages to accumulate for a disconnected client before booting it. const clientMaxMissedMessages = 100; -export type ClientMethod = (client: Client, ...args: any[]) => Promise; +export type ClientMethod = (client: Client, ...args: any[]) => Promise; + +// How long the client state persists after a disconnect. +const clientRemovalTimeoutMs = 300 * 1000; // 300s = 5 minutes. + +// A hook for dependency injection. +export const Deps = {clientRemovalTimeoutMs}; /** * Generates and returns a random string to use as a clientId. This is better @@ -41,9 +52,6 @@ function generateClientId(): string { */ const MESSAGE_TYPES_NO_AUTH = new Set([ 'clientConnect', - 'profileFetch', - 'userSettings', - 'clientLogout', ]); // tslint:disable-next-line:no-unused-expression Silence "unused variable" warning. @@ -55,7 +63,7 @@ void(MESSAGE_TYPES_NO_AUTH); * TODO: this could provide a cleaner interface. * * @param comm: parent Comm object - * @param websocket: websocket connection, promisified to have a sendAsync method + * @param websocket: websocket connection * @param methods: a mapping from method names to server methods (must return promises) */ export class Client { @@ -70,45 +78,40 @@ export class Client { // Maps docFDs to DocSession objects. private _docFDs: Array = []; - private _missedMessages: any = []; + private _missedMessages: string[] = []; private _destroyTimer: NodeJS.Timer|null = null; private _destroyed: boolean = false; - private _websocket: any; + private _websocket: WebSocket|null; private _org: string|null = null; private _profile: UserProfile|null = null; private _userId: number|null = null; private _userName: string|null = null; private _firstLoginAt: Date|null = null; private _isAnonymous: boolean = false; - // Identifier for the current GristWSConnection object connected to this client. - private _counter: string|null = null; constructor( - private _comm: any, - private _methods: any, - private _host: string, - private _locale?: string, + private _comm: Comm, + private _methods: Map, + private _locale: string, + // Identifier for the current GristWSConnection object connected to this client. + private _counter: string|null, ) { this.clientId = generateClientId(); } public toString() { return `Client ${this.clientId} #${this._counter}`; } - public setCounter(counter: string) { - this._counter = counter; - } - - public get host(): string { - return this._host; - } - public get locale(): string|undefined { return this._locale; } - public setConnection(websocket: any, reqHost: string, browserSettings: BrowserSettings) { + public setConnection(websocket: WebSocket, browserSettings: BrowserSettings) { this._websocket = websocket; this.browserSettings = browserSettings; + + websocket.on('error', this._onError.bind(this)); + websocket.on('close', this._onClose.bind(this)); + websocket.on('message', this._onMessage.bind(this)); } /** @@ -179,7 +182,7 @@ export class Client { /** * Sends a message to the client, queuing it up on failure or if the client is disconnected. */ - public async sendMessage(messageObj: any): Promise { + public async sendMessage(messageObj: CommMessage|CommResponse|CommResponseError): Promise { if (this._destroyed) { return; } @@ -187,14 +190,14 @@ export class Client { const message: string = JSON.stringify(messageObj); // Log something useful about the message being sent. - if (messageObj.error) { + if ('error' in messageObj && messageObj.error) { this._log.warn(null, "responding to #%d ERROR %s", messageObj.reqId, messageObj.error); } if (this._websocket) { // If we have a websocket, send the message. try { - await this._websocket.sendAsync(message); + await this._sendToWebsocket(message); } catch (err) { // Sending failed. Presumably we should be getting onClose around now too. // NOTE: if this handler is run after onClose, we could have messages end up out of order. @@ -215,7 +218,69 @@ export class Client { clearTimeout(this._destroyTimer); this._destroyTimer = null; } - this._comm._destroyClient(this); + this.destroy(); + } + } + + /** + * Called from Comm.ts to prepare this Client object to accept a new connection that requests + * the same clientId. Returns whether this Client is available and ready for this connection. + */ + public async reconnect(counter: string|null, newClient: boolean): Promise { + // Refuse reconnect if another websocket is currently active. It may be a new browser tab, + // and will need its own Client object. + if (this._websocket) { return false; } + + // Don't reuse this Client object if it's no longer authorized to access the open documents. + if (!await this.isAuthorized()) { return false; } + + this._counter = counter; + + this._log.info(null, "existing client reconnected (%d missed messages)", this._missedMessages.length); + if (this._destroyTimer) { + this._log.warn(null, "clearing scheduled destruction"); + clearTimeout(this._destroyTimer); + this._destroyTimer = null; + } + if (newClient) { + // If newClient is set, then we assume that the browser client lost its state (e.g. + // reloaded the page), so we treat it as a disconnect followed by a new connection to the + // same state. At the moment, this only means that we close all docs. + if (this._missedMessages.length) { + this._log.warn(null, "clearing missed messages for new client"); + } + this._missedMessages.length = 0; + this.closeAllDocs(); + } + return true; + } + + /** + * Send the initial 'clientConnect' message after receiving a connection. + */ + public async sendConnectMessage(parts: Partial): Promise { + this._log.debug(null, `sending clientConnect with ${this._missedMessages.length} missed messages`); + // Don't use sendMessage here, since we don't want to queue up this message on failure. + const clientConnectMsg: CommClientConnect = { + ...parts, + type: 'clientConnect', + clientId: this.clientId, + missedMessages: this._missedMessages.slice(0), + profile: this._profile, + }; + // If reconnecting a client with missed messages, clear them now. + this._missedMessages.length = 0; + + await this._sendToWebsocket(JSON.stringify(clientConnectMsg)); + // A heavy-handed fix to T396, since 'clientConnect' is sometimes not seen in the browser, + // (seemingly when the 'message' event is triggered before 'open' on the native WebSocket.) + // See also my report at https://stackoverflow.com/a/48411315/328565 + await delay(250); + + if (this._destroyed || this._websocket?.readyState !== WebSocket.OPEN) { + this._log.debug(null, `websocket closed right after clientConnect`); + } else { + await this._sendToWebsocket(JSON.stringify({...clientConnectMsg, dup: true})); } } @@ -232,59 +297,18 @@ export class Client { return this._session?.getAltSessionId(); } - public destroy() { - this._destroyed = true; - } - /** - * Processes a request from a client. All requests from a client get a response, at least to - * indicate success or failure. + * Destroys a client. If the same browser window reconnects later, it will get a new Client + * object and clientId. */ - public async onMessage(message: string): Promise { - const request = JSON.parse(message); - if (request.beat) { - // this is a heart beat, to keep the websocket alive. No need to reply. - log.rawInfo('heartbeat', { - ...this.getLogMeta(), - url: request.url, - docId: request.docId, // caution: trusting client for docId for this purpose. - }); - return; - } else { - this._log.info(null, "onMessage", shortDesc(message)); - } - const response: any = {reqId: request.reqId}; - const method = this._methods[request.method]; - if (!method) { - response.error = `Unknown method ${request.method}`; - } else { - try { - response.data = await method(this, ...request.args); - } catch (error) { - const err: ErrorWithCode = error; - // Print the error stack, except for SandboxErrors, for which the JS stack isn't that useful. - // Also not helpful is the stack of AUTH_NO_VIEW|EDIT errors produced by the Authorizer. - const code: unknown = err.code; - const skipStack = ( - !err.stack || - err.stack.match(/^SandboxError:/) || - (typeof code === 'string' && code.startsWith('AUTH_NO')) - ); - - this._log.warn(null, "Error %s %s", skipStack ? err : err.stack, code || ''); - response.error = err.message; - if (err.code) { - response.errorCode = err.code; - } - if (err.details) { - response.details = err.details; - } - if (typeof code === 'string' && code === 'AUTH_NO_EDIT' && err.accessMode === 'fork') { - response.shouldFork = true; - } - } + public destroy() { + this._log.info(null, "client gone"); + this.closeAllDocs(); + if (this._destroyTimer) { + clearTimeout(this._destroyTimer); } - await this.sendMessage(response); + this._comm.removeClient(this); + this._destroyed = true; } public setOrg(org: string): void { @@ -325,22 +349,10 @@ export class Client { } : null; } - public async getSessionProfile(): Promise { - return this._session?.getSessionProfile(); - } - - public async getSessionEmail(): Promise { - return (await this.getSessionProfile())?.email || null; - } - public getCachedUserId(): number|null { return this._userId; } - public isAnonymous(): boolean { - return this._isAnonymous; - } - // Returns the userId for profile.email, or null when profile is not set; with caching. public async getUserId(dbManager: HomeDBManager): Promise { if (!this._userId) { @@ -385,6 +397,57 @@ export class Client { return meta; } + /** + * Processes a request from a client. All requests from a client get a response, at least to + * indicate success or failure. + */ + private async _onMessage(message: string): Promise { + const request = JSON.parse(message); + if (request.beat) { + // this is a heart beat, to keep the websocket alive. No need to reply. + log.rawInfo('heartbeat', { + ...this.getLogMeta(), + url: request.url, + docId: request.docId, // caution: trusting client for docId for this purpose. + }); + return; + } else { + this._log.info(null, "onMessage", shortDesc(message)); + } + let response: CommResponse|CommResponseError; + const method = this._methods.get(request.method); + if (!method) { + response = {reqId: request.reqId, error: `Unknown method ${request.method}`}; + } else { + try { + response = {reqId: request.reqId, data: await method(this, ...request.args)}; + } catch (error) { + const err: ErrorWithCode = error; + // Print the error stack, except for SandboxErrors, for which the JS stack isn't that useful. + // Also not helpful is the stack of AUTH_NO_VIEW|EDIT errors produced by the Authorizer. + const code: unknown = err.code; + const skipStack = ( + !err.stack || + err.stack.match(/^SandboxError:/) || + (typeof code === 'string' && code.startsWith('AUTH_NO')) + ); + + this._log.warn(null, "Error %s %s", skipStack ? err : err.stack, code || ''); + response = {reqId: request.reqId, error: err.message}; + if (err.code) { + response.errorCode = err.code; + } + if (err.details) { + response.details = err.details; + } + if (typeof code === 'string' && code === 'AUTH_NO_EDIT' && err.accessMode === 'fork') { + response.shouldFork = true; + } + } + } + await this.sendMessage(response); + } + // Fetch the user database record from profile.email, or null when profile is not set. private async _fetchUser(dbManager: HomeDBManager): Promise { return this._profile && this._profile.email ? @@ -398,4 +461,36 @@ export class Client { while (this._docFDs[fd]) { fd++; } return fd; } + + private _sendToWebsocket(message: string): Promise { + return fromCallback(cb => this._websocket!.send(message, cb)); + } + + /** + * Processes an error on the websocket. + */ + private _onError(err: unknown) { + this._log.warn(null, "onError", err); + // TODO Make sure that this is followed by onClose when the connection is lost. + } + + /** + * Processes the closing of a websocket. + */ + private _onClose() { + this._log.info(null, "onClose"); + this._websocket?.removeAllListeners(); + + // Remove all references to the websocket. + this._websocket = null; + + // Schedule the client to be destroyed after a timeout. The timer gets cleared if the same + // client reconnects in the interim. + if (this._destroyTimer) { + this._log.warn(null, "clearing previously scheduled destruction"); + clearTimeout(this._destroyTimer); + } + this._log.warn(null, "will discard client in %s sec", Deps.clientRemovalTimeoutMs / 1000); + this._destroyTimer = setTimeout(() => this.destroy(), Deps.clientRemovalTimeoutMs); + } } diff --git a/app/server/lib/Comm.js b/app/server/lib/Comm.js deleted file mode 100644 index 6e7eaca4..00000000 --- a/app/server/lib/Comm.js +++ /dev/null @@ -1,411 +0,0 @@ -/** - * The server's Comm object implements communication with the client. - * - * The server receives requests, to which it sends a response (or an error). The server can - * also send asynchronous messages to the client. Available methods should be provided via - * comm.registerMethods(). - * - * To send async messages, you may call broadcastMessage() or sendDocMessage(). - * - * In practice, requests which modify the document are done via UserActions.js, and result in an - * asynchronous message updating the document (which is sent to all clients who have the document - * open), and the response could return some useful value, but does not have to. - * - * See app/client/components/Comm.js for other details of the communication protocol. - * - * - * Currently, this module also implements the concept of a "Client". A Client corresponds to a - * browser window, and should persist across brief disconnects. A Client has a 'clientId' - * property, which uniquely identifies a client within the currently running server. Method - * registered with Comm always receive a Client object as the first argument. - * - * In the future, we may want to have a separate Client.js file with documentation of the various - * properties that may be associated with a client. - * - * Note that users of this module should never use the websocket of a Client, since that's an - * implementation detail of Comm.js. - */ - -/** - * Event for DocList changes. - * @event docListAction Emitted when the document list changes in any way. - * @property {Array[String]} [addDocs] Array of names of documents to add to the docList. - * @property {Array[String]} [removeDocs] Array of names of documents that got removed. - * @property {Array[String]} [renameDocs] Array of [oldName, newName] pairs for renamed docs. - * @property {Array[String]} [addInvites] Array of document invite names to add. - * @property {Array[String]} [removeInvites] Array of documents invite names to remove. - */ - - - -var events = require('events'); -var url = require('url'); -var util = require('util'); -var ws = require('ws'); -var Promise = require('bluebird'); - -var log = require('./log'); -var gutil = require('app/common/gutil'); -const {parseFirstUrlPart} = require('app/common/gristUrls'); -const version = require('app/common/version'); -const {Client} = require('./Client'); -const {localeFromRequest} = require('app/server/lib/ServerLocale'); -const {getRequestProfile} = require('app/server/lib/Authorizer'); - -// Bluebird promisification, to be able to use e.g. websocket.sendAsync method. -Promise.promisifyAll(ws.prototype); - -/// How long the client state persists after a disconnect. -var clientRemovalTimeoutMsDefault = 300 * 1000; // 300s = 5 minutes. -var clientRemovalTimeoutMs = clientRemovalTimeoutMsDefault; - -/** - * Constructs a Comm object. - * @param {Object} server - The HTTP server. - * @param {Object} options.sessions - A collection of sessions - * @param {Object} options.settings - The config object containing instance settings - * including features. - * @param {Object} options.instanceManager - Instance manager, giving access to InstanceStore - * and per-instance objects. If null, HubUserClient will not be created. - * @param {Object} options.hosts - Hosts object from extractOrg.ts. if set, we use - * hosts.getOrgInfo(req) to extract an organization from a (possibly versioned) url. - */ -function Comm(server, options) { - events.EventEmitter.call(this); - this._server = server; - this._httpsServer = options.httpsServer; - this.wss = this._startServer(); - - // Maps client IDs to websocket objects. - this._clients = {}; // Maps clientIds to Client objects. - this.clientList = []; // List of all active Clients, ordered by clientId. - - // Maps sessionIds to ScopedSession objects. - this.sessions = options.sessions; - - this._settings = options.settings; - this._hosts = options.hosts; - - // This maps method names to their implementation. - this.methods = {}; - - // For testing, we need a way to override the server version reported. - // For upgrading, we use this to set the server version for a defunct server - // to "dead" so that a client will know that it needs to periodically recheck - // for a valid server. - this._serverVersion = null; -} -util.inherits(Comm, events.EventEmitter); - - -/** - * Registers server methods. - * @param {Object[String:Function]} Mapping of method name to their implementations. All methods - * receive the client as the first argument, and the arguments from the request. - */ -Comm.prototype.registerMethods = function(serverMethods) { - // Wrap methods to translate return values and exceptions to promises. - for (var methodName in serverMethods) { - this.methods[methodName] = Promise.method(serverMethods[methodName]); - } -}; - -/** - * Returns the Client object associated with the given clientId, or throws an Error if not found. - */ -Comm.prototype.getClient = function(clientId) { - const client = this._clients[clientId]; - if (!client) { throw new Error('Unrecognized clientId'); } - return client; -}; - -/** - * Returns a ScopedSession object with the given session id from the list of sessions, - * or adds a new one and returns that. - */ -Comm.prototype.getOrCreateSession = function(sid, req, userSelector) { - // ScopedSessions are specific to a session id / org combination. - const org = req.org || ""; - return this.sessions.getOrCreateSession(sid, org, userSelector); -}; - - -/** - * Returns the sessionId from the signed grist cookie. - */ -Comm.prototype.getSessionIdFromCookie = function(gristCookie) { - return this.sessions.getSessionIdFromCookie(gristCookie); -}; - - -/** - * Broadcasts an app-level message to all clients. - * @param {String} type - Type of message, e.g. 'docListAction'. - * @param {Object} messageData - The data for this type of message. - */ -Comm.prototype.broadcastMessage = function(type, messageData) { - return this._broadcastMessage(type, messageData, this.clientList); -}; - -Comm.prototype._broadcastMessage = function(type, data, clients) { - clients.forEach(client => client.sendMessage({type, data})); -}; - - -/** - * Returns a profile based on the request or session. - */ -Comm.prototype._getSessionProfile = function(scopedSession, req) { - const profile = getRequestProfile(req); - if (profile) { - return Promise.resolve(profile); - } else { - return scopedSession.getSessionProfile(); - } -}; - - -/** - * Sends a per-doc message to the given client. - * @param {Object} client - The client object, as passed to all per-doc methods. - * @param {Number} docFD - The document's file descriptor in the given client. - * @param {String} type - The type of the message, e.g. 'docUserAction'. - * @param {Object} messageData - The data for this type of message. - * @param {Boolean} fromSelf - Whether `client` is the originator of this message. - */ -Comm.sendDocMessage = function(client, docFD, type, data, fromSelf = undefined) { - client.sendMessage({type, docFD, data, fromSelf}); -}; - -/** - * Processes a new websocket connection. - * TODO: Currently it always creates a new client, but in the future the creation of a client - * should possibly be delayed until some hello message, so that a previous client may reconnect - * without losing state. - */ -Comm.prototype._onWebSocketConnection = async function(websocket, req) { - log.info("Comm: Got WebSocket connection: %s", req.url); - if (this._hosts) { - // DocWorker ID (/dw/) and version tag (/v/) may be present in this request but are not - // needed. addOrgInfo assumes req.url starts with /o/ if present. - req.url = parseFirstUrlPart('dw', req.url).path; - req.url = parseFirstUrlPart('v', req.url).path; - await this._hosts.addOrgInfo(req); - } - - websocket.on('error', this.onError.bind(this, websocket)); - websocket.on('close', this.onClose.bind(this, websocket)); - // message handler is added later, after we create a Client but before any async operations - - // Parse the cookie in the request to get the sessionId. - var sessionId = this.sessions.getSessionIdFromRequest(req); - var urlObj = url.parse(req.url, true); - var existingClientId = urlObj.query.clientId; - var browserSettings = urlObj.query.browserSettings ? JSON.parse(urlObj.query.browserSettings) : {}; - var newClient = (parseInt(urlObj.query.newClient, 10) === 1); - const counter = urlObj.query.counter; - const userSelector = urlObj.query.user || ''; - - // Associate an ID with each websocket, reusing the supplied one if it's valid. - var client; - if (existingClientId && this._clients.hasOwnProperty(existingClientId) && - !this._clients[existingClientId]._websocket && - await this._clients[existingClientId].isAuthorized()) { - client = this._clients[existingClientId]; - client.setCounter(counter); - log.info("Comm %s: existing client reconnected (%d missed messages)", client, - client._missedMessages.length); - if (client._destroyTimer) { - log.warn("Comm %s: clearing scheduled destruction", client); - clearTimeout(client._destroyTimer); - client._destroyTimer = null; - } - if (newClient) { - // If this isn't a reconnect, then we assume that the browser client lost its state (e.g. - // reloaded the page), so we treat it as a disconnect followed by a new connection to the - // same state. At the moment, this only means that we close all docs. - if (client._missedMessages.length) { - log.warn("Comm %s: clearing missed messages for new client", client); - } - client._missedMessages.length = 0; - client.closeAllDocs(); - } - client.setConnection(websocket, req.headers.host, browserSettings); - } else { - client = new Client(this, this.methods, req.headers.host, localeFromRequest(req)); - client.setCounter(counter); - client.setConnection(websocket, req.headers.host, browserSettings); - this._clients[client.clientId] = client; - this.clientList.push(client); - log.info("Comm %s: new client", client); - } - - websocket._commClient = client; - websocket.clientId = client.clientId; - - // Add a Session object to the client. - log.info(`Comm ${client}: using session ${sessionId}`); - const scopedSession = this.getOrCreateSession(sessionId, req, userSelector); - client.setSession(scopedSession); - - // Delegate message handling to the client - websocket.on('message', client.onMessage.bind(client)); - - this._getSessionProfile(scopedSession, req) - .then((profile) => { - log.debug(`Comm ${client}: sending clientConnect with ` + - `${client._missedMessages.length} missed messages`); - // Don't use sendMessage here, since we don't want to queue up this message on failure. - client.setOrg(req.org || ""); - client.setProfile(profile); - const clientConnectMsg = { - type: 'clientConnect', - clientId: client.clientId, - serverVersion: this._serverVersion || version.gitcommit, - missedMessages: client._missedMessages.slice(0), - settings: this._settings, - profile, - }; - // If reconnecting a client with missed messages, clear them now. - client._missedMessages.length = 0; - return websocket.sendAsync(JSON.stringify(clientConnectMsg)) - // A heavy-handed fix to T396, since 'clientConnect' is sometimes not seen in the browser, - // (seemingly when the 'message' event is triggered before 'open' on the native WebSocket.) - // See also my report at https://stackoverflow.com/a/48411315/328565 - .delay(250).then(() => { - if (client._destroyed) { return; } // object is already closed - don't show messages - if (websocket.readyState === websocket.OPEN) { - return websocket.sendAsync(JSON.stringify(Object.assign(clientConnectMsg, {dup: true}))); - } else { - log.debug(`Comm ${client}: websocket closed right after clientConnect`); - } - }); - }) - .then(() => { - if (!client._destroyed) { log.debug(`Comm ${client}: clientConnect sent successfully`); } - }) - .catch(err => { - log.error(`Comm ${client}: failed to prepare or send clientConnect:`, err); - }); -}; - -/** - * Processes an error on the websocket. - */ -Comm.prototype.onError = function(websocket, err) { - log.warn("Comm cid %s: onError", websocket.clientId, err); - // TODO Make sure that this is followed by onClose when the connection is lost. -}; - -/** - * Processes the closing of a websocket. - */ -Comm.prototype.onClose = function(websocket) { - log.info("Comm cid %s: onClose", websocket.clientId); - websocket.removeAllListeners(); - - var client = websocket._commClient; - if (client) { - // Remove all references to the websocket. - client._websocket = null; - - // Schedule the client to be destroyed after a timeout. The timer gets cleared if the same - // client reconnects in the interim. - if (client._destroyTimer) { - log.warn("Comm cid %s: clearing previously scheduled destruction", websocket.clientId); - clearTimeout(client._destroyTimer); - } - log.warn("Comm cid %s: will discard client in %s sec", - websocket.clientId, clientRemovalTimeoutMs / 1000); - client._destroyTimer = setTimeout(this._destroyClient.bind(this, client), - clientRemovalTimeoutMs); - } -}; - -Comm.prototype._startServer = function() { - const servers = [this._server]; - if (this._httpsServer) { servers.push(this._httpsServer); } - const wss = []; - for (const server of servers) { - const wssi = new ws.Server({server}); - wssi.on('connection', async (websocket, req) => { - try { - await this._onWebSocketConnection(websocket, req); - } catch (e) { - log.error("Comm connection for %s threw exception: %s", req.url, e.message); - websocket.removeAllListeners(); - websocket.terminate(); // close() is inadequate when ws routed via loadbalancer - } - }); - wss.push(wssi); - } - return wss; -}; - -Comm.prototype.testServerShutdown = async function() { - if (this.wss) { - for (const wssi of this.wss) { - await Promise.fromCallback((cb) => wssi.close(cb)); - } - this.wss = null; - } -}; - -Comm.prototype.testServerRestart = async function() { - await this.testServerShutdown(); - this.wss = this._startServer(); -}; - -/** - * Destroy all clients, forcing reconnections. - */ -Comm.prototype.destroyAllClients = function() { - // Iterate over all clients. Take a copy of the list of clients since it will be changing - // during the loop as we remove them one by one. - for (const client of this.clientList.slice()) { - client.interruptConnection(); - this._destroyClient(client); - } -}; - -/** - * Destroys a client. If the same browser window reconnects later, it will get a new Client - * object and clientId. - */ -Comm.prototype._destroyClient = function(client) { - log.info("Comm %s: client gone", client); - client.closeAllDocs(); - if (client._destroyTimer) { - clearTimeout(client._destroyTimer); - } - delete this._clients[client.clientId]; - gutil.arrayRemove(this.clientList, client); - client.destroy(); -}; - -/** - * Override the version string Comm will report to clients. - * Call with null to reset the override. - * - */ -Comm.prototype.setServerVersion = function (serverVersion) { - this._serverVersion = serverVersion; -}; - -/** - * Mark the server as active or inactive. If inactive, any client that manages to - * connect to it will read a server version of "dead". - */ -Comm.prototype.setServerActivation = function (active) { - this._serverVersion = active ? null : 'dead'; -}; - -/** - * Set how long clients persist on the server after disconnection. Call with - * 0 to return to the default. - */ -Comm.prototype.testSetClientPersistence = function (ttlMs) { - clientRemovalTimeoutMs = ttlMs || clientRemovalTimeoutMsDefault; -} - -module.exports = Comm; diff --git a/app/server/lib/Comm.ts b/app/server/lib/Comm.ts new file mode 100644 index 00000000..8dbd16bb --- /dev/null +++ b/app/server/lib/Comm.ts @@ -0,0 +1,277 @@ +/** + * The server's Comm object implements communication with the client. + * + * The server receives requests, to which it sends a response (or an error). The server can + * also send asynchronous messages to the client. Available methods should be provided via + * comm.registerMethods(). + * + * To send async messages, you may call broadcastMessage() or sendDocMessage(). + * + * See app/client/components/Comm.ts for other details of the communication protocol. + * + * + * This module relies on the concept of a "Client" (see Client.ts). A Client corresponds to a + * browser window, and should persist across brief disconnects. A Client has a 'clientId' + * property, which uniquely identifies a client within the currently running server. Method + * registered with Comm always receive a Client object as the first argument. + */ + +import {EventEmitter} from 'events'; +import * as http from 'http'; +import * as https from 'https'; +import * as WebSocket from 'ws'; + +import {CommDocEventType, CommMessage} from 'app/common/CommTypes'; +import {parseFirstUrlPart} from 'app/common/gristUrls'; +import {safeJsonParse} from 'app/common/gutil'; +import {UserProfile} from 'app/common/LoginSessionAPI'; +import * as version from 'app/common/version'; +import {getRequestProfile} from 'app/server/lib/Authorizer'; +import {ScopedSession} from "app/server/lib/BrowserSession"; +import {Client, ClientMethod} from "app/server/lib/Client"; +import {Hosts, RequestWithOrg} from 'app/server/lib/extractOrg'; +import * as log from 'app/server/lib/log'; +import {localeFromRequest} from 'app/server/lib/ServerLocale'; +import {fromCallback} from 'app/server/lib/serverUtils'; +import {Sessions} from 'app/server/lib/Sessions'; + +export interface CommOptions { + sessions: Sessions; // A collection of all sessions for this instance of Grist + settings?: {[key: string]: unknown}; // The config object containing instance settings including features. + hosts?: Hosts; // If set, we use hosts.getOrgInfo(req) to extract an organization from a (possibly versioned) url. + httpsServer?: https.Server; // An optional HTTPS server to listen on too. +} + +/** + * Constructs a Comm object. + * @param {Object} server - The HTTP server. + * @param {Object} options.sessions - A collection of sessions + * @param {Object} options.settings - The config object containing instance settings + * including features. + * @param {Object} options.instanceManager - Instance manager, giving access to InstanceStore + * and per-instance objects. If null, HubUserClient will not be created. + * @param {Object} options.hosts - Hosts object from extractOrg.ts. if set, we use + * hosts.getOrgInfo(req) to extract an organization from a (possibly versioned) url. + */ +export class Comm extends EventEmitter { + // Collection of all sessions; maps sessionIds to ScopedSession objects. + public readonly sessions: Sessions; + private _wss: WebSocket.Server[]|null = null; + + // The config object containing instance settings including features. + private _settings?: {[key: string]: unknown}; + + // If set, we use hosts.getOrgInfo(req) to extract an organization from a (possibly versioned) url. + private _hosts?: Hosts; + + // An optional HTTPS server to listen on too. + private _httpsServer?: https.Server; + + private _clients = new Map(); // Maps clientIds to Client objects. + + private _methods = new Map(); // Maps method names to their implementation. + + // For testing, we need a way to override the server version reported. + // For upgrading, we use this to set the server version for a defunct server + // to "dead" so that a client will know that it needs to periodically recheck + // for a valid server. + private _serverVersion: string|null = null; + + constructor(private _server: http.Server, options: CommOptions) { + super(); + this._httpsServer = options.httpsServer; + this._wss = this._startServer(); + + this.sessions = options.sessions; + this._settings = options.settings; + this._hosts = options.hosts; + } + + /** + * Registers server methods. + * @param {Object[String:Function]} Mapping of method name to their implementations. All methods + * receive the client as the first argument, and the arguments from the request. + */ + public registerMethods(serverMethods: {[name: string]: ClientMethod}): void { + // Wrap methods to translate return values and exceptions to promises. + for (const methodName in serverMethods) { + this._methods.set(methodName, serverMethods[methodName]); + } + } + + /** + * Returns the Client object associated with the given clientId, or throws an Error if not found. + */ + public getClient(clientId: string): Client { + const client = this._clients.get(clientId); + if (!client) { throw new Error('Unrecognized clientId'); } + return client; + } + + /** + * Returns a ScopedSession object with the given session id from the list of sessions, + * or adds a new one and returns that. + */ + public getOrCreateSession(sessionId: string, req: {org?: string}, userSelector: string = ''): ScopedSession { + // ScopedSessions are specific to a session id / org combination. + const org = req.org || ""; + return this.sessions.getOrCreateSession(sessionId, org, userSelector); + } + + + /** + * Returns the sessionId from the signed grist cookie. + */ + public getSessionIdFromCookie(gristCookie: string): string|null { + return this.sessions.getSessionIdFromCookie(gristCookie) || null; + } + + /** + * Broadcasts an app-level message to all clients. Only suitable for non-doc-specific messages. + */ + public broadcastMessage(type: 'docListAction', data: unknown) { + for (const client of this._clients.values()) { + client.sendMessage({type, data}).catch(() => {}); + } + } + + public removeClient(client: Client) { + this._clients.delete(client.clientId); + } + + public async testServerShutdown() { + if (this._wss) { + for (const wssi of this._wss) { + await fromCallback((cb) => wssi.close(cb)); + } + this._wss = null; + } + } + + public async testServerRestart() { + await this.testServerShutdown(); + this._wss = this._startServer(); + } + + /** + * Destroy all clients, forcing reconnections. + */ + public destroyAllClients() { + // Iterate over all clients. Take a copy of the list of clients since it will be changing + // during the loop as we remove them one by one. + for (const client of Array.from(this._clients.values())) { + client.interruptConnection(); + client.destroy(); + } + } + + /** + * Override the version string Comm will report to clients. + * Call with null to reset the override. + * + */ + public setServerVersion(serverVersion: string|null) { + this._serverVersion = serverVersion; + } + + /** + * Mark the server as active or inactive. If inactive, any client that manages to + * connect to it will read a server version of "dead". + */ + public setServerActivation(active: boolean) { + this._serverVersion = active ? null : 'dead'; + } + + /** + * Returns a profile based on the request or session. + */ + private async _getSessionProfile(scopedSession: ScopedSession, req: http.IncomingMessage): Promise { + return getRequestProfile(req) || scopedSession.getSessionProfile(); + } + + /** + * Processes a new websocket connection, and associates the websocket and a Client object. + */ + private async _onWebSocketConnection(websocket: WebSocket, req: http.IncomingMessage) { + log.info("Comm: Got WebSocket connection: %s", req.url); + if (this._hosts) { + // DocWorker ID (/dw/) and version tag (/v/) may be present in this request but are not + // needed. addOrgInfo assumes req.url starts with /o/ if present. + req.url = parseFirstUrlPart('dw', req.url!).path; + req.url = parseFirstUrlPart('v', req.url).path; + await this._hosts.addOrgInfo(req); + } + + // Parse the cookie in the request to get the sessionId. + const sessionId = this.sessions.getSessionIdFromRequest(req); + + const params = new URL(req.url!, `http://${req.headers.host}`).searchParams; + const existingClientId = params.get('clientId'); + const browserSettings = safeJsonParse(params.get('browserSettings') || '', {}); + const newClient = (params.get('newClient') === '1'); + const counter = params.get('counter'); + const userSelector = params.get('user') || ''; + + // Associate an ID with each websocket, reusing the supplied one if it's valid. + let client: Client|undefined = this._clients.get(existingClientId!); + if (!client || !await client.reconnect(counter, newClient)) { + client = new Client(this, this._methods, localeFromRequest(req), counter); + this._clients.set(client.clientId, client); + } + // Add a Session object to the client. + log.info(`Comm ${client}: using session ${sessionId}`); + const scopedSession = this.getOrCreateSession(sessionId!, req as RequestWithOrg, userSelector); + client.setSession(scopedSession); + + // Associate the client with this websocket. + client.setConnection(websocket, browserSettings); + + const profile = await this._getSessionProfile(scopedSession, req); + client.setOrg((req as RequestWithOrg).org || ""); + client.setProfile(profile); + + client.sendConnectMessage({ + serverVersion: this._serverVersion || version.gitcommit, + settings: this._settings, + }) + .catch(err => { + log.error(`Comm ${client}: failed to prepare or send clientConnect:`, err); + }); + } + + private _startServer() { + const servers = [this._server]; + if (this._httpsServer) { servers.push(this._httpsServer); } + const wss = []; + for (const server of servers) { + const wssi = new WebSocket.Server({server}); + wssi.on('connection', async (websocket: WebSocket, req) => { + try { + await this._onWebSocketConnection(websocket, req); + } catch (e) { + log.error("Comm connection for %s threw exception: %s", req.url, e.message); + websocket.removeAllListeners(); + websocket.terminate(); // close() is inadequate when ws routed via loadbalancer + } + }); + wss.push(wssi); + } + return wss; + } +} + +/** + * Sends a per-doc message to the given client. + * @param {Object} client - The client object, as passed to all per-doc methods. + * @param {Number} docFD - The document's file descriptor in the given client. + * @param {String} type - The type of the message, e.g. 'docUserAction'. + * @param {Object} messageData - The data for this type of message. + * @param {Boolean} fromSelf - Whether `client` is the originator of this message. + */ +export function sendDocMessage( + client: Client, docFD: number, type: CommDocEventType, data: unknown, fromSelf?: boolean +) { + // TODO Warning disabled to preserve past behavior, but perhaps better to return the Promise? + // eslint-disable-next-line @typescript-eslint/no-floating-promises + client.sendMessage({type, docFD, data, fromSelf} as CommMessage); +} diff --git a/app/server/lib/DocClients.ts b/app/server/lib/DocClients.ts index 44929aea..b66d5d5a 100644 --- a/app/server/lib/DocClients.ts +++ b/app/server/lib/DocClients.ts @@ -3,6 +3,7 @@ * open, and what FD they are using. */ +import {CommDocEventType} from 'app/common/CommTypes'; import {arrayRemove} from 'app/common/gutil'; import {ActiveDoc} from 'app/server/lib/ActiveDoc'; import {Authorizer} from 'app/server/lib/Authorizer'; @@ -81,7 +82,7 @@ export class DocClients { * @param {Object} messageData: The data for this type of message. * @param {Object} filterMessage: Optional callback to filter message per client. */ - public async broadcastDocMessage(client: Client|null, type: string, messageData: any, + public async broadcastDocMessage(client: Client|null, type: CommDocEventType, messageData: any, filterMessage?: (docSession: OptDocSession, messageData: any) => Promise): Promise { const send = (curr: DocSession) => this._send(curr, client, type, messageData, filterMessage); @@ -102,7 +103,7 @@ export class DocClients { /** * Send a message to a single client. See broadcastDocMessage for parameters. */ - private async _send(target: DocSession, client: Client|null, type: string, messageData: any, + private async _send(target: DocSession, client: Client|null, type: CommDocEventType, messageData: any, filterMessage?: (docSession: OptDocSession, messageData: any) => Promise): Promise { const fromSelf = (target.client === client); diff --git a/app/server/lib/DocStorageManager.ts b/app/server/lib/DocStorageManager.ts index 52475905..47dfce75 100644 --- a/app/server/lib/DocStorageManager.ts +++ b/app/server/lib/DocStorageManager.ts @@ -8,7 +8,7 @@ import {DocEntry, DocEntryTag} from 'app/common/DocListAPI'; import {DocSnapshots} from 'app/common/DocSnapshot'; import {DocumentUsage} from 'app/common/DocUsage'; import * as gutil from 'app/common/gutil'; -import * as Comm from 'app/server/lib/Comm'; +import {Comm} from 'app/server/lib/Comm'; import * as docUtils from 'app/server/lib/docUtils'; import {GristServer} from 'app/server/lib/GristServer'; import {IDocStorageManager} from 'app/server/lib/IDocStorageManager'; diff --git a/app/server/lib/DocWorker.ts b/app/server/lib/DocWorker.ts index 24cdf7c4..f05fa6c4 100644 --- a/app/server/lib/DocWorker.ts +++ b/app/server/lib/DocWorker.ts @@ -6,7 +6,7 @@ import {HomeDBManager} from 'app/gen-server/lib/HomeDBManager'; import {ActionHistoryImpl} from 'app/server/lib/ActionHistoryImpl'; import {assertAccess, getOrSetDocAuth, RequestWithLogin} from 'app/server/lib/Authorizer'; import {Client} from 'app/server/lib/Client'; -import * as Comm from 'app/server/lib/Comm'; +import {Comm} from 'app/server/lib/Comm'; import {DocSession, docSessionFromRequest} from 'app/server/lib/DocSession'; import {filterDocumentInPlace} from 'app/server/lib/filterUtils'; import {IDocStorageManager} from 'app/server/lib/IDocStorageManager'; diff --git a/app/server/lib/FlexServer.ts b/app/server/lib/FlexServer.ts index 63a823ed..42e7d52a 100644 --- a/app/server/lib/FlexServer.ts +++ b/app/server/lib/FlexServer.ts @@ -23,7 +23,7 @@ import {addRequestUser, getUser, getUserId, isSingleUserMode, redirectToLoginUnconditionally} from 'app/server/lib/Authorizer'; import {redirectToLogin, RequestWithLogin, signInStatusMiddleware} from 'app/server/lib/Authorizer'; import {forceSessionChange} from 'app/server/lib/BrowserSession'; -import * as Comm from 'app/server/lib/Comm'; +import {Comm} from 'app/server/lib/Comm'; import {create} from 'app/server/lib/create'; import {addDiscourseConnectEndpoints} from 'app/server/lib/DiscourseConnect'; import {addDocApiRoutes} from 'app/server/lib/DocApi'; diff --git a/app/server/lib/GristServer.ts b/app/server/lib/GristServer.ts index 04e0e113..5a6d408f 100644 --- a/app/server/lib/GristServer.ts +++ b/app/server/lib/GristServer.ts @@ -5,7 +5,7 @@ import { Organization } from 'app/gen-server/entity/Organization'; import { Workspace } from 'app/gen-server/entity/Workspace'; import { HomeDBManager } from 'app/gen-server/lib/HomeDBManager'; import { RequestWithLogin } from 'app/server/lib/Authorizer'; -import * as Comm from 'app/server/lib/Comm'; +import { Comm } from 'app/server/lib/Comm'; import { Hosts } from 'app/server/lib/extractOrg'; import { ICreate } from 'app/server/lib/ICreate'; import { IDocStorageManager } from 'app/server/lib/IDocStorageManager'; diff --git a/app/server/lib/ITestingHooks-ti.ts b/app/server/lib/ITestingHooks-ti.ts index 951c6cb9..891a5942 100644 --- a/app/server/lib/ITestingHooks-ti.ts +++ b/app/server/lib/ITestingHooks-ti.ts @@ -12,7 +12,7 @@ export const ITestingHooks = t.iface([], { "disconnectClients": t.func("void"), "commShutdown": t.func("void"), "commRestart": t.func("void"), - "commSetClientPersistence": t.func("void", t.param("ttlMs", "number")), + "commSetClientPersistence": t.func("number", t.param("ttlMs", "number")), "closeDocs": t.func("void"), "setDocWorkerActivation": t.func("void", t.param("workerId", "string"), t.param("active", t.union(t.lit('active'), t.lit('inactive'), t.lit('crash')))), "flushAuthorizerCache": t.func("void"), diff --git a/app/server/lib/ITestingHooks.ts b/app/server/lib/ITestingHooks.ts index ad93a3b4..f4d75073 100644 --- a/app/server/lib/ITestingHooks.ts +++ b/app/server/lib/ITestingHooks.ts @@ -8,7 +8,7 @@ export interface ITestingHooks { disconnectClients(): Promise; commShutdown(): Promise; commRestart(): Promise; - commSetClientPersistence(ttlMs: number): Promise; + commSetClientPersistence(ttlMs: number): Promise; closeDocs(): Promise; setDocWorkerActivation(workerId: string, active: 'active'|'inactive'|'crash'): Promise; flushAuthorizerCache(): Promise; diff --git a/app/server/lib/ServerLocale.ts b/app/server/lib/ServerLocale.ts index 833ba94e..8aebfcdb 100644 --- a/app/server/lib/ServerLocale.ts +++ b/app/server/lib/ServerLocale.ts @@ -1,12 +1,12 @@ import {parse as languageParser} from "accept-language-parser"; -import {Request} from 'express'; +import {IncomingMessage} from 'http'; import {locales} from 'app/common/Locales'; /** * Returns the locale from a request, falling back to `defaultLocale` * if unable to determine the locale. */ -export function localeFromRequest(req: Request, defaultLocale: string = 'en-US') { +export function localeFromRequest(req: IncomingMessage, defaultLocale: string = 'en-US') { const language = languageParser(req.headers["accept-language"] as string)[0]; if (!language) { return defaultLocale; } diff --git a/app/server/lib/Sessions.ts b/app/server/lib/Sessions.ts index 93f8b7fd..e73b2c94 100644 --- a/app/server/lib/Sessions.ts +++ b/app/server/lib/Sessions.ts @@ -3,6 +3,7 @@ import {cookieName, SessionStore} from 'app/server/lib/gristSessions'; import * as cookie from 'cookie'; import * as cookieParser from 'cookie-parser'; import {Request} from 'express'; +import {IncomingMessage} from 'http'; /** * @@ -74,14 +75,14 @@ export class Sessions { /** * Returns the sessionId from the signed grist cookie. */ - public getSessionIdFromCookie(gristCookie: string) { + public getSessionIdFromCookie(gristCookie: string): string|false { return cookieParser.signedCookie(gristCookie, this._sessionSecret); } /** * Get the session id from the grist cookie. Returns null if no cookie found. */ - public getSessionIdFromRequest(req: Request): string|null { + public getSessionIdFromRequest(req: Request|IncomingMessage): string|null { if (req.headers.cookie) { const cookies = cookie.parse(req.headers.cookie); const sessionId = this.getSessionIdFromCookie(cookies[cookieName]); diff --git a/app/server/lib/TestingHooks.ts b/app/server/lib/TestingHooks.ts index 5fc066c4..d406de32 100644 --- a/app/server/lib/TestingHooks.ts +++ b/app/server/lib/TestingHooks.ts @@ -3,7 +3,8 @@ import * as net from 'net'; import {UserProfile} from 'app/common/LoginSessionAPI'; import {Deps as ActiveDocDeps} from 'app/server/lib/ActiveDoc'; import {Deps as DiscourseConnectDeps} from 'app/server/lib/DiscourseConnect'; -import * as Comm from 'app/server/lib/Comm'; +import {Deps as CommClientDeps} from 'app/server/lib/Client'; +import {Comm} from 'app/server/lib/Comm'; import * as log from 'app/server/lib/log'; import {IMessage, Rpc} from 'grain-rpc'; import {Request} from 'express'; @@ -76,7 +77,7 @@ export class TestingHooks implements ITestingHooks { public async setLoginSessionProfile(gristSidCookie: string, profile: UserProfile|null, org?: string): Promise { log.info("TestingHooks.setLoginSessionProfile called with", gristSidCookie, profile, org); const sessionId = this._comm.getSessionIdFromCookie(gristSidCookie); - const scopedSession = this._comm.getOrCreateSession(sessionId, {org}); + const scopedSession = this._comm.getOrCreateSession(sessionId as string, {org}); const req = {} as Request; await scopedSession.updateUserProfile(req, profile); this._server.getSessions().clearCacheIfNeeded({email: profile?.email, org}); @@ -115,13 +116,12 @@ export class TestingHooks implements ITestingHooks { } // Set how long new clients will persist after disconnection. - // Call with 0 to return to default duration. - public async commSetClientPersistence(ttlMs: number) { + // Returns the previous value. + public async commSetClientPersistence(ttlMs: number): Promise { log.info("TestingHooks.setClientPersistence called with", ttlMs); - this._comm.testSetClientPersistence(ttlMs); - for (const server of this._workerServers) { - server.getComm().testSetClientPersistence(ttlMs); - } + const prev = CommClientDeps.clientRemovalTimeoutMs; + CommClientDeps.clientRemovalTimeoutMs = ttlMs; + return prev; } public async closeDocs(): Promise { diff --git a/app/server/lib/extractOrg.ts b/app/server/lib/extractOrg.ts index 06ed7f57..59d7e0e9 100644 --- a/app/server/lib/extractOrg.ts +++ b/app/server/lib/extractOrg.ts @@ -119,7 +119,7 @@ export class Hosts { } } - public async addOrgInfo(req: Request): Promise { + public async addOrgInfo(req: T): Promise { return Object.assign(req, await this.getOrgInfo(req)); } diff --git a/package.json b/package.json index 6f9ddcac..78f2473a 100644 --- a/package.json +++ b/package.json @@ -59,6 +59,7 @@ "@types/tmp": "0.0.33", "@types/uuid": "3.4.4", "@types/which": "2.0.1", + "@types/ws": "^6", "app-module-path": "2.2.0", "catw": "1.0.1", "chai": "4.2.0", diff --git a/test/server/gristClient.ts b/test/server/gristClient.ts index f84fce87..78512f42 100644 --- a/test/server/gristClient.ts +++ b/test/server/gristClient.ts @@ -143,7 +143,7 @@ export async function openClient(server: FlexServer, email: string, org: string, if (email !== 'anon@getgrist.com') { const cid = decodeURIComponent(cookie.split('=')[1].split(';')[0]); const comm = server.getComm(); - const sessionId = comm.getSessionIdFromCookie(cid); + const sessionId = comm.getSessionIdFromCookie(cid)!; const scopedSession = comm.getOrCreateSession(sessionId, {org}); const profile = { email, email_verified: true, name: "Someone" }; await scopedSession.updateUserProfile({} as any, profile); @@ -155,6 +155,7 @@ export async function openClient(server: FlexServer, email: string, org: string, const ws = new WebSocket('ws://localhost:' + server.getOwnPort() + `/o/${org}`, { headers }); + const client = new GristClient(ws); await new Promise(function(resolve, reject) { ws.on('open', function() { resolve(ws); @@ -163,5 +164,5 @@ export async function openClient(server: FlexServer, email: string, org: string, reject(err); }); }); - return new GristClient(ws); + return client; } diff --git a/yarn.lock b/yarn.lock index 8771d771..65d91a70 100644 --- a/yarn.lock +++ b/yarn.lock @@ -473,6 +473,13 @@ resolved "https://registry.yarnpkg.com/@types/which/-/which-2.0.1.tgz#27ecd67f915b7c3d6ba552135bb1eecd66e63501" integrity sha512-Jjakcv8Roqtio6w1gr0D7y6twbhx6gGgFGF5BLwajPpnOIOxFkakFhCq+LmyyeAz7BX6ULrjBOxdKaCDy+4+dQ== +"@types/ws@^6": + version "6.0.4" + resolved "https://registry.yarnpkg.com/@types/ws/-/ws-6.0.4.tgz#7797707c8acce8f76d8c34b370d4645b70421ff1" + integrity sha512-PpPrX7SZW9re6+Ha8ojZG4Se8AZXgf0GK6zmfqEuCsY49LFDNXO3SByp44X3dFEqtB73lkCDAdUazhAjVPiNwg== + dependencies: + "@types/node" "*" + "@webassemblyjs/ast@1.8.5": version "1.8.5" resolved "https://registry.yarnpkg.com/@webassemblyjs/ast/-/ast-1.8.5.tgz#51b1c5fe6576a34953bf4b253df9f0d490d9e359"