diff --git a/app/client/components/GristWSConnection.ts b/app/client/components/GristWSConnection.ts
index 1538ea59..c8eb021a 100644
--- a/app/client/components/GristWSConnection.ts
+++ b/app/client/components/GristWSConnection.ts
@@ -110,6 +110,7 @@ export class GristWSSettingsBrowser implements GristWSSettings {
 export class GristWSConnection extends Disposable {
   public useCount: number = 0;
   public on: BackboneEvents['on'];              // set by Backbone
+  public off: BackboneEvents['off'];            // set by Backbone
   protected trigger: BackboneEvents['trigger']; // set by Backbone
 
diff --git a/app/server/lib/Client.ts b/app/server/lib/Client.ts
index fa7e14bc..4603cd8b 100644
--- a/app/server/lib/Client.ts
+++ b/app/server/lib/Client.ts
@@ -15,6 +15,7 @@ import type {Comm} from 'app/server/lib/Comm';
 import {DocSession} from 'app/server/lib/DocSession';
 import log from 'app/server/lib/log';
 import {LogMethods} from "app/server/lib/LogMethods";
+import {MemoryPool} from 'app/server/lib/MemoryPool';
 import {shortDesc} from 'app/server/lib/shortDesc';
 import {fromCallback} from 'app/server/lib/serverUtils';
 import {i18n} from 'i18next';
@@ -22,8 +23,11 @@ import * as crypto from 'crypto';
 import moment from 'moment';
 import * as WebSocket from 'ws';
 
-/// How many messages to accumulate for a disconnected client before booting it.
+// How many messages and bytes to accumulate for a disconnected client before booting it.
+// The benefit is that a client who temporarily disconnects and reconnects without missing much
+// does not need to reload the document.
 const clientMaxMissedMessages = 100;
+const clientMaxMissedBytes = 1_000_000;
 
 export type ClientMethod = (client: Client, ...args: any[]) => Promise<unknown>;
 
@@ -33,6 +37,14 @@ const clientRemovalTimeoutMs = 300 * 1000;  // 300s = 5 minutes.
 // A hook for dependency injection.
 export const Deps = {clientRemovalTimeoutMs};
 
+// How much memory to allow using for large JSON responses before waiting for some to clear.
+// Max total across all clients and all JSON responses.
+const jsonResponseTotalReservation = 500 * 1024 * 1024;
+// Estimate of a single JSON response, used before we know how large it is. Together with the
+// above, it works to limit parallelism (to 25 responses that can be started in parallel).
+const jsonResponseReservation = 20 * 1024 * 1024;
+export const jsonMemoryPool = new MemoryPool(jsonResponseTotalReservation);
+
 /**
  * Generates and returns a random string to use as a clientId. This is better
  * than numbering clients with consecutive integers; otherwise a reconnecting
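A note on the two reservation constants above: the total divided by the per-response estimate is what bounds parallelism. The sketch below is not part of the change; it substitutes small numbers for the real 500MB/20MB values, but uses the MemoryPool class this diff introduces, so a 100-byte pool with 20-byte initial reservations admits at most 5 concurrent tasks.

import {MemoryPool} from 'app/server/lib/MemoryPool';

// Illustration only: 5 tasks (100/20) may hold reservations at once; further
// tasks wait until one finishes or revises its estimate down.
const pool = new MemoryPool(100);

async function task(id: number): Promise<void> {
  await pool.withReserved(20, async (updateReservation) => {
    const actualSize = 10;          // e.g. measured with Buffer.byteLength(payload)
    updateReservation(actualSize);  // shrinking the estimate may unblock waiters
    console.log(`task ${id} running; reserved=${pool.getReservedSize()}`);
  });
}

Promise.all(Array.from(Array(8), (_, i) => task(i))).catch(console.error);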
@@ -81,9 +93,11 @@ export class Client {
   private _docFDs: Array<DocSession|null> = [];
 
   private _missedMessages = new Map<number, string>();
+  private _missedMessagesTotalLength: number = 0;
   private _destroyTimer: NodeJS.Timer|null = null;
   private _destroyed: boolean = false;
   private _websocket: WebSocket|null;
+  private _websocketEventHandlers: Array<{event: string, handler: (...args: any[]) => void}> = [];
   private _org: string|null = null;
   private _profile: UserProfile|null = null;
   private _userId: number|null = null;
@@ -124,9 +138,13 @@ export class Client {
     this._counter = counter;
     this.browserSettings = browserSettings;
 
-    websocket.on('error', (err) => this._onError(err));
-    websocket.on('close', () => this._onClose());
-    websocket.on('message', (msg: string) => this._onMessage(msg));
+    const addHandler = (event: string, handler: (...args: any[]) => void) => {
+      websocket.on(event, handler);
+      this._websocketEventHandlers.push({event, handler});
+    };
+    addHandler('error', (err: unknown) => this._onError(err));
+    addHandler('close', () => this._onClose());
+    addHandler('message', (msg: string) => this._onMessage(msg));
   }
 
   /**
@@ -173,7 +191,7 @@
   public interruptConnection() {
     if (this._websocket) {
-      this._websocket.removeAllListeners();
+      this._removeWebsocketListeners();
       this._websocket.terminate();  // close() is inadequate when ws routed via loadbalancer
       this._websocket = null;
     }
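The addHandler() helper introduced in the constructor above exists so that cleanup can later remove exactly the handlers this class registered (see _removeWebsocketListeners further down). The same pattern in isolation, as a sketch with illustrative names (not from this change):

import {EventEmitter} from 'events';

// Track (event, handler) pairs as they are registered, so that cleanup can
// call emitter.off() for each one, leaving other parties' listeners alone.
class TrackedListeners {
  private _handlers: Array<{event: string, handler: (...args: any[]) => void}> = [];

  constructor(private _emitter: EventEmitter) {}

  public on(event: string, handler: (...args: any[]) => void): void {
    this._emitter.on(event, handler);
    this._handlers.push({event, handler});
  }

  public removeAll(): void {
    for (const {event, handler} of this._handlers) {
      this._emitter.off(event, handler);
    }
    this._handlers = [];
  }
}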
@@ -187,36 +205,69 @@
       return;
     }
 
-    const seqId = this._nextSeqId++;
-    const message: string = JSON.stringify({...messageObj, seqId});
-
-    // Log something useful about the message being sent.
-    if ('error' in messageObj && messageObj.error) {
-      this._log.warn(null, "responding to #%d ERROR %s", messageObj.reqId, messageObj.error);
-    }
+    // Large responses require memory; with many connected clients this can crash the server. We
+    // manage it using a MemoryPool, waiting for free space to appear. This only controls the
+    // memory used to hold the JSON.stringify result. Once sent, the reservation is released.
+    //
+    // Actual process memory will go up also as the outgoing data is sitting in socket buffers,
+    // but this isn't part of Node's heap. If an outgoing buffer is full, websocket.send may
+    // block, and MemoryPool will delay other responses. There is a risk here of unresponsive
+    // clients exhausting the MemoryPool, perhaps intentionally. To mitigate, we could destroy
+    // clients that are too slow in reading. This isn't currently done.
+    //
+    // Also, we do not manage memory of responses moved to a client's _missedMessages queue. But
+    // we do limit those in size.
+    //
+    // Overall, a better solution would be to stream large responses, or to have the client
+    // request data piecemeal (as we'd have to for handling large data).
+
+    await jsonMemoryPool.withReserved(jsonResponseReservation, async (updateReservation) => {
+      if (this._destroyed) {
+        // If this Client got destroyed while waiting, stop here and release the reservation.
+        return;
+      }
+      const seqId = this._nextSeqId++;
+      const message: string = JSON.stringify({...messageObj, seqId});
+      const size = Buffer.byteLength(message, 'utf8');
+      updateReservation(size);
+
+      // Log something useful about the message being sent.
+      if ('error' in messageObj && messageObj.error) {
+        this._log.warn(null, "responding to #%d ERROR %s", messageObj.reqId, messageObj.error);
+      }
 
-    if (this._websocket) {
-      // If we have a websocket, send the message.
-      try {
-        await this._sendToWebsocket(message);
-      } catch (err) {
-        // Sending failed. Add the message to missedMessages.
-        this._log.warn(null, "sendMessage: queuing after send error:", err.toString());
+      if (this._websocket) {
+        // If we have a websocket, send the message.
+        try {
+          await this._sendToWebsocket(message);
+          // NOTE: A successful send does NOT mean the message was received. For a better system, see
+          // https://docs.microsoft.com/en-us/azure/azure-web-pubsub/howto-develop-reliable-clients
+          // (keeping a copy of messages until acked). With our system, we are more likely to be
+          // lacking the needed messages on reconnect, and having to reset the client.
+          return;
+        } catch (err) {
+          // Sending failed. Add the message to missedMessages.
+          this._log.warn(null, "sendMessage: queuing after send error:", err.toString());
+        }
+      }
+      if (this._missedMessages.size < clientMaxMissedMessages &&
+          this._missedMessagesTotalLength + message.length <= clientMaxMissedBytes) {
+        // Queue up the message.
+        // TODO: this keeps the memory but releases jsonMemoryPool reservation, which is wrong --
+        // it may allow too much memory to be used. This situation is rare, however, so maybe OK
+        // as is. Ideally, the queued messages could reserve memory in a "nice to have" mode, and
+        // if memory is needed for something more important, the queue would get dropped.
+        // (Holding on to the memory reservation here would create a risk of freezing future
+        // responses, which seems *more* dangerous than a crash because a crash would at least
+        // lead to an eventual recovery.)
         this._missedMessages.set(seqId, message);
-
-        // NOTE: A successful send does NOT mean the message was received. For a better system, see
-        // https://docs.microsoft.com/en-us/azure/azure-web-pubsub/howto-develop-reliable-clients
-        // (keeping a copy of messages until acked). With our system, we are more likely to be
-        // lacking the needed messages on reconnect, and having to reset the client.
+        this._missedMessagesTotalLength += message.length;
+      } else {
+        // Too many messages queued. Boot the client now, to make it reset when/if it reconnects.
+        this._log.warn(null, "sendMessage: too many messages queued; booting client");
+        this.destroy();
       }
-    } else if (this._missedMessages.size < clientMaxMissedMessages) {
-      // Queue up the message.
-      this._missedMessages.set(seqId, message);
-    } else {
-      // Too many messages queued. Boot the client now, to make it reset when/if it reconnects.
-      this._log.warn(null, "sendMessage: too many messages queued; booting client");
-      this.destroy();
-    }
+    });
   }
 
   /**
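A condensed sketch of the reservation flow the new sendMessage() body implements: reserve an estimate, replace it with the measured size, send, and let withReserved() release the reservation on completion. The send() function here is a hypothetical stand-in for _sendToWebsocket():

import {MemoryPool} from 'app/server/lib/MemoryPool';

const pool = new MemoryPool(500 * 1024 * 1024);
const initialEstimate = 20 * 1024 * 1024;

// Hypothetical stand-in for the actual websocket send in Client.ts.
declare function send(message: string): Promise<void>;

async function sendJson(obj: unknown): Promise<void> {
  await pool.withReserved(initialEstimate, async (updateReservation) => {
    const message = JSON.stringify(obj);
    // Replace the 20MB guess with the measured size; if smaller, this may
    // immediately unblock other waiting responses.
    updateReservation(Buffer.byteLength(message, 'utf8'));
    await send(message);
    // withReserved() releases the reservation when this callback returns.
  });
}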
@@ -256,6 +307,7 @@
     // We collected any missed messages we need; clear the stored map of them.
     this._missedMessages.clear();
+    this._missedMessagesTotalLength = 0;
 
     let docsClosed: number|null = null;
     if (!seamlessReconnect) {
@@ -344,6 +396,7 @@
       this._destroyTimer = null;
     }
     this._missedMessages.clear();
+    this._missedMessagesTotalLength = 0;
     this._comm.removeClient(this);
     this._destroyed = true;
   }
@@ -557,18 +610,31 @@
    * Processes the closing of a websocket.
    */
   private _onClose() {
-    this._websocket?.removeAllListeners();
+    this._removeWebsocketListeners();
 
     // Remove all references to the websocket.
     this._websocket = null;
 
-    // Schedule the client to be destroyed after a timeout. The timer gets cleared if the same
-    // client reconnects in the interim.
-    if (this._destroyTimer) {
-      this._log.warn(null, "clearing previously scheduled destruction");
-      clearTimeout(this._destroyTimer);
+    if (!this._destroyed) {
+      // Schedule the client to be destroyed after a timeout. The timer gets cleared if the same
+      // client reconnects in the interim.
+      if (this._destroyTimer) {
+        this._log.warn(null, "clearing previously scheduled destruction");
+        clearTimeout(this._destroyTimer);
+      }
+      this._log.info(null, "websocket closed; will discard client in %s sec", Deps.clientRemovalTimeoutMs / 1000);
+      this._destroyTimer = setTimeout(() => this.destroy(), Deps.clientRemovalTimeoutMs);
+    }
+  }
+
+  private _removeWebsocketListeners() {
+    if (this._websocket) {
+      // Avoiding websocket.removeAllListeners() because WebSocket.Server registers listeners
+      // internally for websockets it keeps track of, and we should not accidentally remove those.
+      for (const {event, handler} of this._websocketEventHandlers) {
+        this._websocket.off(event, handler);
+      }
+      this._websocketEventHandlers = [];
     }
-    this._log.info(null, "websocket closed; will discard client in %s sec", Deps.clientRemovalTimeoutMs / 1000);
-    this._destroyTimer = setTimeout(() => this.destroy(), Deps.clientRemovalTimeoutMs);
   }
 }
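The guarded destroy-timer above gives a disconnected client a grace period of Deps.clientRemovalTimeoutMs before it is discarded, and cancels the pending destruction when the same client reconnects. Reduced to its essentials (illustrative class name, not from this change):

// Keep a disconnected session alive for a grace period; cancel the pending
// destruction if the same session reconnects in time.
class SessionReaper {
  private _timer: NodeJS.Timeout|null = null;

  constructor(private _graceMs: number, private _destroy: () => void) {}

  public onDisconnect(): void {
    if (this._timer) { clearTimeout(this._timer); }   // reschedule, don't double-book
    this._timer = setTimeout(this._destroy, this._graceMs);
  }

  public onReconnect(): void {
    if (this._timer) {
      clearTimeout(this._timer);
      this._timer = null;
    }
  }
}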
diff --git a/app/server/lib/Comm.ts b/app/server/lib/Comm.ts
index 185e15b8..4f8331c5 100644
--- a/app/server/lib/Comm.ts
+++ b/app/server/lib/Comm.ts
@@ -148,6 +148,11 @@ export class Comm extends EventEmitter {
   public async testServerShutdown() {
     if (this._wss) {
       for (const wssi of this._wss) {
+        // Terminate all clients. WebSocket.Server used to do it automatically in close() but no
+        // longer does (see https://github.com/websockets/ws/pull/1904#discussion_r668844565).
+        for (const ws of wssi.clients) {
+          ws.terminate();
+        }
         await fromCallback((cb) => wssi.close(cb));
       }
       this._wss = null;
@@ -259,7 +264,6 @@ export class Comm extends EventEmitter {
         await this._onWebSocketConnection(websocket, req);
       } catch (e) {
         log.error("Comm connection for %s threw exception: %s", req.url, e.message);
-        websocket.removeAllListeners();
         websocket.terminate();  // close() is inadequate when ws routed via loadbalancer
       }
     });
diff --git a/app/server/lib/ITestingHooks-ti.ts b/app/server/lib/ITestingHooks-ti.ts
index 891a5942..d582d76f 100644
--- a/app/server/lib/ITestingHooks-ti.ts
+++ b/app/server/lib/ITestingHooks-ti.ts
@@ -13,6 +13,7 @@ export const ITestingHooks = t.iface([], {
   "commShutdown": t.func("void"),
   "commRestart": t.func("void"),
   "commSetClientPersistence": t.func("number", t.param("ttlMs", "number")),
+  "commSetClientJsonMemoryLimit": t.func("number", t.param("newTotalSize", "number")),
   "closeDocs": t.func("void"),
   "setDocWorkerActivation": t.func("void", t.param("workerId", "string"), t.param("active", t.union(t.lit('active'), t.lit('inactive'), t.lit('crash')))),
   "flushAuthorizerCache": t.func("void"),
@@ -21,6 +22,7 @@ export const ITestingHooks = t.iface([], {
   "setActiveDocTimeout": t.func("number", t.param("seconds", "number")),
   "setDiscourseConnectVar": t.func(t.union("string", "null"), t.param("varName", "string"), t.param("value", t.union("string", "null"))),
   "setWidgetRepositoryUrl": t.func("void", t.param("url", "string")),
+  "getMemoryUsage": t.func("object"),
 });
 
 const exportedTypeSuite: t.ITypeSuite = {
diff --git a/app/server/lib/ITestingHooks.ts b/app/server/lib/ITestingHooks.ts
index f4d75073..330b372b 100644
--- a/app/server/lib/ITestingHooks.ts
+++ b/app/server/lib/ITestingHooks.ts
@@ -9,6 +9,7 @@ export interface ITestingHooks {
   commShutdown(): Promise<void>;
   commRestart(): Promise<void>;
   commSetClientPersistence(ttlMs: number): Promise<number>;
+  commSetClientJsonMemoryLimit(newTotalSize: number): Promise<number>;
   closeDocs(): Promise<void>;
   setDocWorkerActivation(workerId: string, active: 'active'|'inactive'|'crash'): Promise<void>;
   flushAuthorizerCache(): Promise<void>;
@@ -17,4 +18,5 @@ export interface ITestingHooks {
   setActiveDocTimeout(seconds: number): Promise<number>;
   setDiscourseConnectVar(varName: string, value: string|null): Promise<string|null>;
   setWidgetRepositoryUrl(url: string): Promise<void>;
+  getMemoryUsage(): Promise<object>;  // actually NodeJS.MemoryUsage
 }
diff --git a/app/server/lib/MemoryPool.ts b/app/server/lib/MemoryPool.ts
new file mode 100644
index 00000000..43c2ccb3
--- /dev/null
+++ b/app/server/lib/MemoryPool.ts
@@ -0,0 +1,114 @@
+import Deque from 'double-ended-queue';
+
+/**
+ * Usage:
+ *
+ * OPTION 1, using a callback, which may be async (but doesn't have to be).
+ *
+ *    await mpool.withReserved(initialSize, async (updateReservation) => {
+ *      ...
+ *      updateReservation(newSize);    // if needed
+ *      ...
+ *    });
+ *
+ * OPTION 2, lower-level.
+ *
+ * Note: dispose() MUST be called (e.g. using try/finally). If not called, other work will
+ * eventually deadlock waiting for it.
+ *
+ *    const memoryReservation = await mpool.waitAndReserve(initialSize);
+ *    try {
+ *      ...
+ *      memoryReservation.updateReservation(newSize1);   // if needed
+ *      memoryReservation.updateReservation(newSize2);   // if needed
+ *      ...
+ *    } finally {
+ *      memoryReservation.dispose();
+ *    }
+ *
+ * With both options, it's common for the initialSize to be a rough estimate. You may call
+ * updateReservation() to update it. If it lowers the estimate, other work may unblock. If it
+ * raises it, it may delay future work, but will have no impact on work that's already unblocked.
+ * So it's always safer for initialSize to be an overestimate.
+ *
+ * When it's hard to estimate initialSize in bytes, you may specify it as e.g.
+ * memPool.getTotalSize() / 20. This way at most 20 such parallel tasks may be unblocked at a
+ * time, and further ones will wait until some release their memory or revise down their estimate.
+ */
+export class MemoryPool {
+  private _reservedSize: number = 0;
+  private _queue = new Deque<MemoryAwaiter>();
+
+  constructor(private _totalSize: number) {}
+
+  public getTotalSize(): number { return this._totalSize; }
+  public getReservedSize(): number { return this._reservedSize; }
+  public getAvailableSize(): number { return this._totalSize - this._reservedSize; }
+  public isEmpty(): boolean { return this._reservedSize === 0; }
+  public hasSpace(size: number): boolean { return this._reservedSize + size <= this._totalSize; }
+
+  // To avoid failures, allow reserving more than totalSize when memory pool is empty.
+  public hasSpaceOrIsEmpty(size: number): boolean { return this.hasSpace(size) || this.isEmpty(); }
+
+  public numWaiting(): number { return this._queue.length; }
+
+  public async waitAndReserve(size: number): Promise<MemoryReservation> {
+    if (this.hasSpaceOrIsEmpty(size)) {
+      this._updateReserved(size);
+    } else {
+      await new Promise<void>(resolve => this._queue.push({size, resolve}));
+    }
+    return new MemoryReservation(size, this._updateReserved.bind(this));
+  }
+
+  public async withReserved(size: number, callback: (updateRes: UpdateReservation) => void|Promise<void>) {
+    const memRes = await this.waitAndReserve(size);
+    try {
+      return await callback(memRes.updateReservation.bind(memRes));
+    } finally {
+      memRes.dispose();
+    }
+  }
+
+  // Update the total size. Returns the old size. This is intended for testing.
+  public setTotalSize(newTotalSize: number): number {
+    const oldTotalSize = this._totalSize;
+    this._totalSize = newTotalSize;
+    this._checkWaiting();
+    return oldTotalSize;
+  }
+
+  private _checkWaiting() {
+    while (!this._queue.isEmpty() && this.hasSpaceOrIsEmpty(this._queue.peekFront()!.size)) {
+      const item = this._queue.shift()!;
+      this._updateReserved(item.size);
+      item.resolve();
+    }
+  }
+
+  private _updateReserved(sizeDelta: number): void {
+    this._reservedSize += sizeDelta;
+    this._checkWaiting();
+  }
+}
+
+type UpdateReservation = (sizeDelta: number) => void;
+
+export class MemoryReservation {
+  constructor(private _size: number, private _updateReserved: UpdateReservation) {}
+
+  public updateReservation(newSize: number) {
+    this._updateReserved(newSize - this._size);
+    this._size = newSize;
+  }
+
+  public dispose() {
+    this.updateReservation(0);
+    this._updateReserved = undefined as any;    // Make sure we don't keep using it after dispose
+  }
+}
+
+interface MemoryAwaiter {
+  size: number;
+  resolve: () => void;
+}
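One subtlety in the class above: hasSpaceOrIsEmpty() admits a single oversized reservation when the pool is idle, so a request larger than the whole pool degrades to serialized execution instead of deadlocking. A small sketch of the observable behavior, using only methods defined above:

import {MemoryPool} from 'app/server/lib/MemoryPool';

async function demo() {
  const pool = new MemoryPool(100);

  // An oversized request succeeds while the pool is empty...
  const big = await pool.waitAndReserve(150);
  console.log(pool.getReservedSize());   // 150
  console.log(pool.numWaiting());        // 0

  // ...but everything else now queues until it is disposed.
  const smallPromise = pool.waitAndReserve(10);
  console.log(pool.numWaiting());        // 1

  big.dispose();
  const small = await smallPromise;      // now unblocked
  console.log(pool.getReservedSize());   // 10
  small.dispose();
}

demo().catch(console.error);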
diff --git a/app/server/lib/TestingHooks.ts b/app/server/lib/TestingHooks.ts
index de502457..4d8fd60d 100644
--- a/app/server/lib/TestingHooks.ts
+++ b/app/server/lib/TestingHooks.ts
@@ -4,6 +4,7 @@ import {UserProfile} from 'app/common/LoginSessionAPI';
 import {Deps as ActiveDocDeps} from 'app/server/lib/ActiveDoc';
 import {Deps as DiscourseConnectDeps} from 'app/server/lib/DiscourseConnect';
 import {Deps as CommClientDeps} from 'app/server/lib/Client';
+import * as Client from 'app/server/lib/Client';
 import {Comm} from 'app/server/lib/Comm';
 import log from 'app/server/lib/log';
 import {IMessage, Rpc} from 'grain-rpc';
@@ -39,6 +40,8 @@ export function startTestingHooks(socketPath: string, port: number,
 
 function connectToSocket(rpc: Rpc, socket: net.Socket): Rpc {
   socket.setEncoding('utf8');
+  // Poor-man's JSON processing, only OK because this is for testing only. If multiple messages
+  // are received quickly, they may arrive in the same buf, and JSON.parse will fail.
   socket.on('data', (buf: string) => rpc.receiveMessage(JSON.parse(buf)));
   rpc.setSendMessage((m: IMessage) => fromCallback(cb => socket.write(JSON.stringify(m), 'utf8', cb)));
   return rpc;
@@ -118,12 +121,19 @@ export class TestingHooks implements ITestingHooks {
   // Set how long new clients will persist after disconnection.
   // Returns the previous value.
   public async commSetClientPersistence(ttlMs: number): Promise<number> {
-    log.info("TestingHooks.setClientPersistence called with", ttlMs);
+    log.info("TestingHooks.commSetClientPersistence called with", ttlMs);
     const prev = CommClientDeps.clientRemovalTimeoutMs;
     CommClientDeps.clientRemovalTimeoutMs = ttlMs;
     return prev;
   }
 
+  // Set the amount of memory Client.ts can use for JSON responses, in bytes.
+  // Returns the old limit.
+  public async commSetClientJsonMemoryLimit(newTotalSize: number): Promise<number> {
+    log.info("TestingHooks.commSetClientJsonMemoryLimit called with", newTotalSize);
+    return Client.jsonMemoryPool.setTotalSize(newTotalSize);
+  }
+
   public async closeDocs(): Promise<void> {
     log.info("TestingHooks.closeDocs called");
     if (this._server) {
@@ -215,4 +225,8 @@ export class TestingHooks implements ITestingHooks {
     }
     repo.testOverrideUrl(url);
   }
+
+  public async getMemoryUsage(): Promise<object> {
+    return process.memoryUsage();
+  }
 }
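The connectToSocket() comment above flags a real framing hazard: calling JSON.parse on raw 'data' chunks fails when two messages arrive in one buffer. A common remedy, shown here only as a hypothetical alternative (not part of this change), is newline-delimited JSON with a reassembly buffer:

import * as net from 'net';

// Frame each message as one line of JSON ("NDJSON"); accumulate partial
// chunks until a newline arrives, then parse complete lines only.
function onJsonLines(socket: net.Socket, onMessage: (msg: unknown) => void): void {
  let pending = '';
  socket.setEncoding('utf8');
  socket.on('data', (chunk: string) => {
    pending += chunk;
    let index: number;
    while ((index = pending.indexOf('\n')) >= 0) {
      const line = pending.slice(0, index);
      pending = pending.slice(index + 1);
      if (line.trim()) { onMessage(JSON.parse(line)); }
    }
  });
}

// The sender must then append '\n' to each JSON.stringify() result.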
diff --git a/test/server/Comm.ts b/test/server/Comm.ts
index 1891c0e1..8beac886 100644
--- a/test/server/Comm.ts
+++ b/test/server/Comm.ts
@@ -310,7 +310,11 @@ describe('Comm', function() {
       `Expected to see a failed send:\n${logMessages.join('\n')}`);
   }
 
-  it("should receive all server messages in order when send doesn't fail", async function() {
+  it("should receive all server messages (small) in order when send doesn't fail", async function() {
+    await testSendOrdering({noFailedSend: true, useSmallMsgs: true});
+  });
+
+  it("should receive all server messages (large) in order when send doesn't fail", async function() {
     await testSendOrdering({noFailedSend: true});
   });
 
@@ -322,17 +326,15 @@ describe('Comm', function() {
     await testSendOrdering({closeHappensFirst: true});
   });
 
-  async function testSendOrdering(options: {noFailedSend?: boolean, closeHappensFirst?: boolean}) {
+  async function testSendOrdering(
+    options: {noFailedSend?: boolean, closeHappensFirst?: boolean, useSmallMsgs?: boolean}
+  ) {
     const eventsSeen: Array<'failedSend'|'close'> = [];
 
     // Server-side Client object.
     let ssClient!: Client;
 
-    const {cliComm, forwarder} = await startManagedConnection({
-      ...assortedMethods,
-      // A method just to help us get a handle on the server-side Client object.
-      setClient: async (client) => { ssClient = client; },
-    });
+    const {cliComm, forwarder} = await startManagedConnection(assortedMethods);
 
     // Intercept the call to _onClose to know when it occurs, since we are trying to hit a
     // situation where 'close' and 'failedSend' events happen in either order.
@@ -366,6 +368,7 @@ describe('Comm', function() {
     // in the clientConnect message, we are expected to reload the app. In the test, we replace
     // the GristWSConnection.
     cliComm.on('clientConnect', async (msg: CommClientConnect) => {
+      ssClient = comm!.getClient(msg.clientId);
      if (msg.needReload) {
         await delay(0);
         cliComm.releaseDocConnection(docId);
@@ -373,11 +376,11 @@ describe('Comm', function() {
       }
     });
 
-    // Make the first event that gives us access to the Client object (ssClient).
-    await cliComm._makeRequest(null, null, "setClient", "foo", 1);
+    // Wait for a connect call, which we rely on to get access to the Client object (ssClient).
+    await waitForCondition(() => (clientConnectSpy.callCount > 0), 1000);
 
     // Send large buffers, to fill up the socket's buffers to get it to block.
-    const data = "x".repeat(1.0e7);
+    const data = "x".repeat(options.useSmallMsgs ? 100_000 : 10_000_000);
     const makeMessage = (n: number) => ({type: 'docUserAction', n, data});
 
     let n = 0;
@@ -425,7 +428,12 @@ describe('Comm', function() {
     // This test helper is used for 3 different situations. Check that we observed the
     // situations we were trying to hit.
     if (options.noFailedSend) {
-      assert.deepEqual(eventsSeen, ['close']);
+      if (options.useSmallMsgs) {
+        assert.deepEqual(eventsSeen, ['close']);
+      } else {
+        // Large messages now cause a send to fail after filling up the buffer, and close the socket.
+        assert.deepEqual(eventsSeen, ['close', 'close']);
+      }
     } else if (options.closeHappensFirst) {
       assert.equal(eventsSeen[0], 'close');
       assert.include(eventsSeen, 'failedSend');
diff --git a/test/server/lib/DocApi.ts b/test/server/lib/DocApi.ts
index 0f4aa16e..768113f2 100644
--- a/test/server/lib/DocApi.ts
+++ b/test/server/lib/DocApi.ts
@@ -218,7 +218,7 @@ function testDocApi() {
     const ws1 = (await userApi.getOrgWorkspaces('current'))[0].id;
     // Make sure kiwi isn't allowed here.
     await userApi.updateOrgPermissions(ORG_NAME, {users: {[kiwiEmail]: null}});
-    const kiwiApi = makeUserApi(ORG_NAME, 'kiwi');
+    const kiwiApi = home.makeUserApi(ORG_NAME, 'kiwi');
     await assert.isRejected(kiwiApi.getWorkspaceAccess(ws1), /Forbidden/);
     // Add kiwi as an editor for the org.
     await assert.isRejected(kiwiApi.getOrgAccess(ORG_NAME), /Forbidden/);
@@ -238,7 +238,7 @@ function testDocApi() {
     const ws1 = (await userApi.getOrgWorkspaces('current'))[0].id;
     await userApi.updateOrgPermissions(ORG_NAME, {users: {[kiwiEmail]: null}});
     // Make sure kiwi isn't allowed here.
-    const kiwiApi = makeUserApi(ORG_NAME, 'kiwi');
+    const kiwiApi = home.makeUserApi(ORG_NAME, 'kiwi');
     await assert.isRejected(kiwiApi.getWorkspaceAccess(ws1), /Forbidden/);
     // Add kiwi as an editor of this workspace.
     await userApi.updateWorkspacePermissions(ws1, {users: {[kiwiEmail]: 'editors'}});
@@ -257,7 +257,7 @@ function testDocApi() {
   it("should allow only owners to remove a document", async () => {
     const ws1 = (await userApi.getOrgWorkspaces('current'))[0].id;
     const doc1 = await userApi.newDoc({name: 'testdeleteme1'}, ws1);
-    const kiwiApi = makeUserApi(ORG_NAME, 'kiwi');
+    const kiwiApi = home.makeUserApi(ORG_NAME, 'kiwi');
 
     // Kiwi is editor of the document, so he can't delete it.
     await userApi.updateDocPermissions(doc1, {users: {'kiwi@getgrist.com': 'editors'}});
@@ -273,7 +273,7 @@ function testDocApi() {
   it("should allow only owners to rename a document", async () => {
     const ws1 = (await userApi.getOrgWorkspaces('current'))[0].id;
     const doc1 = await userApi.newDoc({name: 'testrenameme1'}, ws1);
-    const kiwiApi = makeUserApi(ORG_NAME, 'kiwi');
+    const kiwiApi = home.makeUserApi(ORG_NAME, 'kiwi');
 
     // Kiwi is editor of the document, so he can't rename it.
     await userApi.updateDocPermissions(doc1, {users: {'kiwi@getgrist.com': 'editors'}});
@@ -3039,7 +3039,7 @@ function testDocApi() {
 
   it("limits daily API usage", async function () {
     // Make a new document in a test product with a low daily limit
-    const api = makeUserApi('testdailyapilimit');
+    const api = home.makeUserApi('testdailyapilimit');
     const workspaceId = await getWorkspaceId(api, 'TestDailyApiLimitWs');
     const docId = await api.newDoc({name: 'TestDoc1'}, workspaceId);
     const max = testDailyApiLimitFeatures.baseMaxApiUnitsPerDocumentPerDay;
@@ -3067,7 +3067,7 @@ function testDocApi() {
   it("limits daily API usage and sets the correct keys in redis", async function () {
     this.retries(3);
     // Make a new document in a free team site, currently the only real product which limits daily API usage.
-    const freeTeamApi = makeUserApi('freeteam');
+    const freeTeamApi = home.makeUserApi('freeteam');
     const workspaceId = await getWorkspaceId(freeTeamApi, 'FreeTeamWs');
     const docId = await freeTeamApi.newDoc({name: 'TestDoc2'}, workspaceId);
     // Rather than making 5000 requests, set high counts directly for the current and next daily and hourly keys
@@ -4329,7 +4329,7 @@ function setup(name: string, cb: () => Promise<void>) {
     await cb();
 
     // create TestDoc as an empty doc into Private workspace
-    userApi = api = makeUserApi(ORG_NAME);
+    userApi = api = home.makeUserApi(ORG_NAME);
     const wid = await getWorkspaceId(api, 'Private');
     docIds.TestDoc = await api.newDoc({name: 'TestDoc'}, wid);
   });
@@ -4345,15 +4345,6 @@ function setup(name: string, cb: () => Promise<void>) {
   });
 }
 
-function makeUserApi(org: string, user?: string) {
-  return new UserAPIImpl(`${home.serverUrl}/o/${org}`, {
-    headers: {Authorization: `Bearer api_key_for_${user || 'chimpy'}`},
-    fetch: fetch as any,
-    newFormData: () => new FormData() as any,
-    logger: log
-  });
-}
-
 async function getWorkspaceId(api: UserAPIImpl, name: string) {
   const workspaces = await api.getOrgWorkspaces('current');
   return workspaces.find((w) => w.name === name)!.id;
diff --git a/test/server/lib/ManyFetches.ts b/test/server/lib/ManyFetches.ts
new file mode 100644
index 00000000..af0f3cc0
--- /dev/null
+++ b/test/server/lib/ManyFetches.ts
@@ -0,0 +1,241 @@
+import {GristWSConnection} from 'app/client/components/GristWSConnection';
+import {TableFetchResult} from 'app/common/ActiveDocAPI';
+import {UserAPIImpl} from 'app/common/UserAPI';
+import {delay} from 'app/common/delay';
+import * as log from 'app/server/lib/log';
+import {getGristConfig} from 'test/gen-server/testUtils';
+import {prepareDatabase} from 'test/server/lib/helpers/PrepareDatabase';
+import {TestServer} from 'test/server/lib/helpers/TestServer';
+import {createTestDir, EnvironmentSnapshot, setTmpLogLevel} from 'test/server/testUtils';
+import {assert} from 'chai';
+import * as cookie from 'cookie';
+import fetch from 'node-fetch';
+import WebSocket from 'ws';
+
+describe('ManyFetches', function() {
+  this.timeout(30000);
+
+  setTmpLogLevel('warn');   // Set to 'info' to see what heap size actually is.
+  let oldEnv: EnvironmentSnapshot;
+
+  const userName = 'chimpy';
+  const email = 'chimpy@getgrist.com';
+  const org = 'docs';
+
+  let home: TestServer;
+  let docs: TestServer;
+  let userApi: UserAPIImpl;
+
+  beforeEach(async function() {
+    oldEnv = new EnvironmentSnapshot();   // Needed for prepareDatabase, which changes process.env
+    log.info("Starting servers");
+    const testDir = await createTestDir("ManyFetches");
+    await prepareDatabase(testDir);
+    home = await TestServer.startServer('home', testDir, "home");
+    docs = await TestServer.startServer('docs', testDir, "docs", {
+      // The test verifies memory usage by checking heap sizes. The line below limits the
+      // doc-worker process so that it crashes when memory management is wrong. With the fetch
+      // sizes in this test, doc-worker's heap size goes from ~90M to ~330M without memory
+      // management; this limit is in the middle, as another way to verify that memory management
+      // helps. Without this limit, there is no pressure on node to garbage-collect, so it may
+      // use more memory than we expect, making the test less reliable.
+      NODE_OPTIONS: '--max-old-space-size=210',
+    }, home.serverUrl);
+    userApi = home.makeUserApi(org, userName);
+  });
+
+  afterEach(async function() {
+    // stop all servers
+    await home.stop();
+    await docs.stop();
+    oldEnv.restore();
+  });
+
+  // Assert and log; helpful for working on the test (when setTmpLogLevel is 'info').
+  function assertIsBelow(value: number, expected: number) {
+    log.info("HeapMB", value, `(expected < ${expected})`);
+    assert.isBelow(value, expected);
+  }
+
+  it('should limit the memory used to respond to many simultaneous fetches', async function() {
+    // Here we create a large document, and fetch it in parallel 200 times, without reading
+    // responses. This test relies on the fact that the server caches the fetched data, so only
+    // the serialized responses to clients are responsible for large memory use. This is the
+    // memory use limited in Client.ts by jsonMemoryPool.
+
+    // Reduce the limit controlling memory for JSON responses from the default of 500MB to 50MB.
+    await docs.testingHooks.commSetClientJsonMemoryLimit(50 * 1024 * 1024);
+
+    // Create a large document where fetches would have a noticeable memory footprint.
+    // 40k rows should produce ~2MB fetch response.
+    const {docId} = await createLargeDoc({rows: 40_000});
+
+    // When we get results, here's a checker that it looks reasonable.
+    function checkResults(results: TableFetchResult[]) {
+      assert.lengthOf(results, 100);
+      for (const res of results) {
+        assert.lengthOf(res.tableData[2], 40_000);
+        assert.lengthOf(res.tableData[3].Num, 40_000);
+        assert.lengthOf(res.tableData[3].Text, 40_000);
+      }
+    }
+
+    // Prepare to make N requests. For N=100, doc-worker should need ~200M of additional memory
+    // without memory management.
+    const N = 100;
+
+    // Helper to get doc-worker's heap size.
+    // If the server dies, testingHooks calls may hang. This wrapper prevents that.
+    const serverErrorPromise = docs.getExitPromise().then(() => { throw new Error("server exited"); });
+    const getMemoryUsage = () => Promise.race([docs.testingHooks.getMemoryUsage(), serverErrorPromise]);
+    const getHeapMB = async () => Math.round((await getMemoryUsage() as NodeJS.MemoryUsage).heapUsed / 1024 / 1024);
+
+    assertIsBelow(await getHeapMB(), 120);
+
+    // Create all the connections, but don't make the fetches just yet.
+    const createConnectionFunc = await prepareGristWSConnection(docId);
+    const connectionsA = Array.from(Array(N), createConnectionFunc);
+    const fetchersA = await Promise.all(connectionsA.map(c => connect(c, docId)));
+
+    const connectionsB = Array.from(Array(N), createConnectionFunc);
+    const fetchersB = await Promise.all(connectionsB.map(c => connect(c, docId)));
+
+    try {
+      assertIsBelow(await getHeapMB(), 120);
+
+      // Start fetches without reading responses. This is a step that should push memory limits.
+      fetchersA.map(f => f.startPausedFetch());
+
+      // Give it a few seconds, enough for server to use what memory it can.
+      await delay(2000);
+      assertIsBelow(await getHeapMB(), 200);
+
+      // Make N more requests. See that memory hasn't spiked.
+      fetchersB.map(f => f.startPausedFetch());
+      await delay(2000);
+      assertIsBelow(await getHeapMB(), 200);
+
+      // Complete the first batch of requests. This allows the fetches to complete, and the
+      // memory to get released. Also check that results look reasonable.
+      checkResults(await Promise.all(fetchersA.map(f => f.completeFetch())));
+
+      assertIsBelow(await getHeapMB(), 200);
+
+      // Complete the outstanding requests. Memory shouldn't spike.
+      checkResults(await Promise.all(fetchersB.map(f => f.completeFetch())));
+
+      assertIsBelow(await getHeapMB(), 200);
+
+    } finally {
+      fetchersA.map(f => f.end());
+      fetchersB.map(f => f.end());
+    }
+  });
+
+  // Creates a document with the given number of rows, and about 50 bytes per row.
+  async function createLargeDoc({rows}: {rows: number}): Promise<{docId: string}> {
+    log.info("Preparing a doc of %s rows", rows);
+    const ws = (await userApi.getOrgWorkspaces('current'))[0].id;
+    const docId = await userApi.newDoc({name: 'testdoc'}, ws);
+    await userApi.applyUserActions(docId, [['AddTable', 'TestTable', [
+      {id: 'Num', type: 'Numeric'},
+      {id: 'Text', type: 'Text'}
+    ]]]);
+    const chunk = 10_000;
+    for (let i = 0; i < rows; i += chunk) {
+      const currentNumRows = Math.min(chunk, rows - i);
+      await userApi.getDocAPI(docId).addRows('TestTable', {
+        // Roughly 8 bytes per row
+        Num: Array.from(Array(currentNumRows), (_, n) => (i + n) * 100),
+        // Roughly 40 bytes per row
+        Text: Array.from(Array(currentNumRows), (_, n) => `Hello, world, again for the ${i + n}th time.`),
+      });
+    }
+    return {docId};
+  }
+
+  // Gets all the info needed to create a GristWSConnection, and returns a connection-creating
+  // function.
+  async function prepareGristWSConnection(docId: string): Promise<() => GristWSConnection> {
+    // Use cookies for access to stay as close as possible to regular operation.
+    const resp = await fetch(`${home.serverUrl}/test/session`);
+    const sid = cookie.parse(resp.headers.get('set-cookie')).grist_sid;
+    if (!sid) { throw new Error('no session available'); }
+    await home.testingHooks.setLoginSessionProfile(sid, {name: userName, email}, org);
+
+    // Load the document html.
+    const pageUrl = `${home.serverUrl}/o/docs/doc/${docId}`;
+    const headers = {Cookie: `grist_sid=${sid}`};
+    const doc = await fetch(pageUrl, {headers});
+    const pageBody = await doc.text();
+
+    // Pull out the configuration object embedded in the html.
+    const gristConfig = getGristConfig(pageBody);
+    const {assignmentId, getWorker, homeUrl} = gristConfig;
+    if (!homeUrl) { throw new Error('no homeUrl'); }
+    if (!assignmentId) { throw new Error('no assignmentId'); }
+    const docWorkerUrl = getWorker && getWorker[assignmentId];
+    if (!docWorkerUrl) { throw new Error('no docWorkerUrl'); }
+
+    // Place the config object in window.gristConfig as if we were a
+    // real browser client. GristWSConnection expects to find it there.
+    globalThis.window = globalThis.window || {};
+    (globalThis.window as any).gristConfig = gristConfig;
+
+    // We return a function that constructs a GristWSConnection.
+    return function createConnectionFunc() {
+      let clientId: string = '0';
+      return GristWSConnection.create(null, {
+        makeWebSocket(url: string): any { return new WebSocket(url, undefined, { headers }); },
+        getTimezone() { return Promise.resolve('UTC'); },
+        getPageUrl() { return pageUrl; },
+        getDocWorkerUrl() { return Promise.resolve(docWorkerUrl); },
+        getClientId(did) { return clientId; },
+        getUserSelector() { return ''; },
+        updateClientId(did: string, cid: string) { clientId = cid; },
+        advanceCounter(): string { return '0'; },
+        log(msg, ...args) {},
+        warn(msg, ...args) {},
+      });
+    };
+  }
+
+  // Actually connect GristWSConnection, open the doc, and return a few methods for next steps.
+  async function connect(connection: GristWSConnection, docId: string) {
+    async function getMessage<T>(eventType: string, filter: (msg: T) => boolean): Promise<T> {
+      return new Promise<T>(resolve => {
+        function callback(msg: T) {
+          if (filter(msg)) { connection.off(eventType, callback); resolve(msg); }
+        }
+        connection.on(eventType, callback);
+      });
+    }
+
+    // Launch the websocket
+    const connectionPromise = getMessage('connectState', (isConnected: boolean) => isConnected);
+    connection.initialize(null);
+    await connectionPromise;    // Wait for connection to succeed.
+
+    const openPromise = getMessage('serverMessage', ({reqId}: {reqId?: number}) => (reqId === 0));
+    connection.send(JSON.stringify({reqId: 0, method: 'openDoc', args: [docId]}));
+    await openPromise;
+
+    let fetchPromise: Promise<unknown>;
+    return {
+      startPausedFetch: () => {
+        fetchPromise = getMessage('serverMessage', ({reqId}: {reqId?: number}) => (reqId === 1));
+        (connection as any)._ws.pause();
+        connection.send(JSON.stringify({reqId: 1, method: 'fetchTable', args: [0, 'TestTable']}));
+      },
+
+      completeFetch: async (): Promise<TableFetchResult> => {
+        (connection as any)._ws.resume();
+        return (await fetchPromise as any).data;
+      },
+
+      end: () => {
+        connection.dispose();
+      },
+    };
+  }
+});
diff --git a/test/server/lib/MemoryPool.ts b/test/server/lib/MemoryPool.ts
new file mode 100644
index 00000000..e60450c3
--- /dev/null
+++ b/test/server/lib/MemoryPool.ts
@@ -0,0 +1,115 @@
+import {MemoryPool} from 'app/server/lib/MemoryPool';
+import {delay} from 'app/common/delay';
+import {isLongerThan} from 'app/common/gutil';
+import {assert} from 'chai';
+import * as sinon from 'sinon';
+
+async function isResolved(promise: Promise<unknown>): Promise<boolean> {
+  return !await isLongerThan(promise, 0);
+}
+
+async function areResolved(...promises: Promise<unknown>[]): Promise<boolean[]> {
+  return Promise.all(promises.map(p => isResolved(p)));
+}
+
+function poolInfo(mpool: MemoryPool): {total: number, reserved: number, available: number, awaiters: number} {
+  return {
+    total: mpool.getTotalSize(),
+    reserved: mpool.getReservedSize(),
+    available: mpool.getAvailableSize(),
+    awaiters: mpool.numWaiting(),
+  };
+}
+
+describe("MemoryPool", function() {
+
+  afterEach(() => {
+    sinon.restore();
+  });
+
+  it("should wait for enough space", async function() {
+    const mpool = new MemoryPool(1000);
+    const spy = sinon.spy();
+    let r1: () => void;
+    let r2: () => void;
+    let r3: () => void;
+    let r4: () => void;
+    const w1 = new Promise<void>(r => { r1 = r; });
+    const w2 = new Promise<void>(r => { r2 = r; });
+    const w3 = new Promise<void>(r => { r3 = r; });
+    const w4 = new Promise<void>(r => { r4 = r; });
+    const p1 = mpool.withReserved(400, () => { spy(1); return w1; });
+    const p2 = mpool.withReserved(400, () => { spy(2); return w2; });
+    const p3 = mpool.withReserved(400, () => { spy(3); return w3; });
+    const p4 = mpool.withReserved(400, () => { spy(4); return w4; });
+
+    // Only two callbacks run initially.
+    await delay(10);
+    assert.deepEqual(spy.args, [[1], [2]]);
+
+    // Others are waiting for something to finish.
+    await delay(50);
+    assert.deepEqual(spy.args, [[1], [2]]);
+
+    // Once the 2nd task finishes, the next one should run.
+    r2!();
+    await delay(10);
+    assert.deepEqual(spy.args, [[1], [2], [3]]);
+    await delay(50);
+    assert.deepEqual(spy.args, [[1], [2], [3]]);
+
+    // Once another task finishes, the last one should run.
+    r3!();
+    await delay(10);
+    assert.deepEqual(spy.args, [[1], [2], [3], [4]]);
+
+    // Let all tasks finish.
+    r1!();
+    r4!();
+    await delay(10);
+    assert.deepEqual(spy.args, [[1], [2], [3], [4]]);
+    await Promise.all([p1, p2, p3, p4]);
+  });
+
+  it("should allow adjusting reservation", async function() {
+    const mpool = new MemoryPool(1000);
+    const res1p = mpool.waitAndReserve(600);
+    const res2p = mpool.waitAndReserve(600);
+
+    // Initially only the first reservation can happen.
+    assert.deepEqual(poolInfo(mpool), {total: 1000, reserved: 600, available: 400, awaiters: 1});
+    assert.deepEqual(await areResolved(res1p, res2p), [true, false]);
+
+    // Once the first reservation is adjusted, the next one should go.
+    const res1 = await res1p;
+    res1.updateReservation(400);
+    assert.deepEqual(poolInfo(mpool), {total: 1000, reserved: 1000, available: 0, awaiters: 0});
+    assert.deepEqual(await areResolved(res1p, res2p), [true, true]);
+
+    const res2 = await res2p;
+
+    // Try some more complex combinations.
+    const res3p = mpool.waitAndReserve(200);
+    const res4p = mpool.waitAndReserve(200);
+    const res5p = mpool.waitAndReserve(200);
+    assert.deepEqual(poolInfo(mpool), {total: 1000, reserved: 1000, available: 0, awaiters: 3});
+    assert.deepEqual(await areResolved(res3p, res4p, res5p), [false, false, false]);
+
+    res1.updateReservation(100);    // 300 units freed.
+    assert.deepEqual(poolInfo(mpool), {total: 1000, reserved: 900, available: 100, awaiters: 2});
+    assert.deepEqual(await areResolved(res3p, res4p, res5p), [true, false, false]);
+
+    res1.dispose();     // Another 100 freed.
+    assert.deepEqual(poolInfo(mpool), {total: 1000, reserved: 1000, available: 0, awaiters: 1});
+    assert.deepEqual(await areResolved(res3p, res4p, res5p), [true, true, false]);
+
+    res2.dispose();     // Lots freed.
+    assert.deepEqual(poolInfo(mpool), {total: 1000, reserved: 600, available: 400, awaiters: 0});
+    assert.deepEqual(await areResolved(res3p, res4p, res5p), [true, true, true]);
+
+    (await res5p).dispose();
+    (await res4p).dispose();
+    (await res3p).dispose();
+    assert.deepEqual(poolInfo(mpool), {total: 1000, reserved: 0, available: 1000, awaiters: 0});
+  });
+});
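The areResolved() helper above relies on isLongerThan() from app/common/gutil to probe whether a promise has settled without awaiting it indefinitely. If that utility were unavailable, an equivalent probe could look like this (a sketch, not part of the change):

// Check whether a promise settles before a short timeout elapses, without
// leaving the test hanging if it never settles.
async function settlesWithin(promise: Promise<unknown>, ms: number): Promise<boolean> {
  const timeout = Symbol('timeout');
  const winner = await Promise.race([
    promise.then(() => true, () => true),   // settled (fulfilled or rejected)
    new Promise<typeof timeout>(resolve => setTimeout(() => resolve(timeout), ms)),
  ]);
  return winner !== timeout;
}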
diff --git a/test/server/lib/Webhooks-Proxy.ts b/test/server/lib/Webhooks-Proxy.ts
index c59fde5b..7ad895b1 100644
--- a/test/server/lib/Webhooks-Proxy.ts
+++ b/test/server/lib/Webhooks-Proxy.ts
@@ -1,11 +1,8 @@
 import {UserAPIImpl} from 'app/common/UserAPI';
 import {WebhookSubscription} from 'app/server/lib/DocApi';
-import log from 'app/server/lib/log';
 import axios from 'axios';
 import * as bodyParser from 'body-parser';
 import {assert} from 'chai';
-import FormData from 'form-data';
-import fetch from 'node-fetch';
 import {tmpdir} from 'os';
 import * as path from 'path';
 import {createClient} from 'redis';
@@ -75,7 +72,7 @@ describe('Webhooks-Proxy', function () {
       await cb();
 
       // create TestDoc as an empty doc into Private workspace
-      userApi = api = makeUserApi(ORG_NAME, home.serverUrl);
+      userApi = api = home.makeUserApi(ORG_NAME);
       const wid = await getWorkspaceId(api, 'Private');
       docIds.TestDoc = await api.newDoc({name: 'TestDoc'}, wid);
     });
@@ -102,7 +99,7 @@ describe('Webhooks-Proxy', function () {
   });
 
 
-  function runServerConfigurations(additionaEnvConfiguration: object, subTestCall: Function) {
+  function runServerConfigurations(additionaEnvConfiguration: NodeJS.ProcessEnv, subTestCall: Function) {
     additionaEnvConfiguration = {
       ALLOWED_WEBHOOK_DOMAINS: `example.com,localhost:${webhooksTestPort}`,
       GRIST_DATA_DIR: dataDir,
@@ -322,16 +319,6 @@ describe('Webhooks-Proxy', function () {
 
 const ORG_NAME = 'docs-1';
 
-
-function makeUserApi(org: string, homeServerUrl: string, user?: string) {
-  return new UserAPIImpl(`${homeServerUrl}/o/${org}`, {
-    headers: {Authorization: `Bearer api_key_for_${user || 'chimpy'}`},
-    fetch: fetch as any,
-    newFormData: () => new FormData() as any,
-    logger: log
-  });
-}
-
 async function getWorkspaceId(api: UserAPIImpl, name: string) {
   const workspaces = await api.getOrgWorkspaces('current');
   return workspaces.find((w) => w.name === name)!.id;
diff --git a/test/server/lib/helpers/TestServer.ts b/test/server/lib/helpers/TestServer.ts
index 6103a1df..304854be 100644
--- a/test/server/lib/helpers/TestServer.ts
+++ b/test/server/lib/helpers/TestServer.ts
@@ -1,24 +1,28 @@
 import {connectTestingHooks, TestingHooksClient} from "app/server/lib/TestingHooks";
 import {ChildProcess, execFileSync, spawn} from "child_process";
+import FormData from 'form-data';
 import path from "path";
 import * as fse from "fs-extra";
 import * as testUtils from "test/server/testUtils";
+import {UserAPIImpl} from "app/common/UserAPI";
 import {exitPromise} from "app/server/lib/serverUtils";
 import log from "app/server/lib/log";
 import {delay} from "bluebird";
 import fetch from "node-fetch";
 
+/**
+ * This starts a server in a separate process.
+ */
 export class TestServer {
-  public static async startServer
-  (serverTypes: string,
-   tempDirectory: string,
-   suitename: string,
-   additionalConfig?: Object,
-   _homeUrl?: string): Promise<TestServer> {
+  public static async startServer(
+    serverTypes: string,
+    tempDirectory: string,
+    suitename: string,
+    customEnv?: NodeJS.ProcessEnv,
+    _homeUrl?: string,
+  ): Promise<TestServer> {
+
     const server = new TestServer(serverTypes, tempDirectory, suitename);
-    // Override some env variables in server configuration to serve our test purpose:
-    const customEnv = {
-      ...additionalConfig};
     await server.start(_homeUrl, customEnv);
     return server;
   }
@@ -33,9 +37,10 @@ export class TestServer {
 
   private readonly _defaultEnv;
 
-  constructor(private _serverTypes: string, private _tmpDir: string, private _suiteName: string) {
+  constructor(private _serverTypes: string, public readonly rootDir: string, private _suiteName: string) {
     this._defaultEnv = {
-      GRIST_INST_DIR: this._tmpDir,
+      GRIST_INST_DIR: this.rootDir,
+      GRIST_DATA_DIR: path.join(this.rootDir, "data"),
       GRIST_SERVERS: this._serverTypes,
       // with port '0' no need to hard code a port number (we can use testing hooks to find out what
       // port server is listening on).
@@ -50,14 +55,14 @@ export class TestServer {
       ...process.env
     };
   }
 
-  public async start(_homeUrl?: string, customEnv?: object) {
+  public async start(_homeUrl?: string, customEnv?: NodeJS.ProcessEnv) {
     // put node logs into files with meaningful name that relate to the suite name and server type
     const fixedName = this._serverTypes.replace(/,/, '_');
-    const nodeLogPath = path.join(this._tmpDir, `${this._suiteName}-${fixedName}-node.log`);
+    const nodeLogPath = path.join(this.rootDir, `${this._suiteName}-${fixedName}-node.log`);
    const nodeLogFd = await fse.open(nodeLogPath, 'a');
     const serverLog = process.env.VERBOSE ? 'inherit' : nodeLogFd;
     // use a path for socket that relates to suite name and server types
-    this.testingSocket = path.join(this._tmpDir, `${this._suiteName}-${fixedName}.socket`);
+    this.testingSocket = path.join(this.rootDir, `${this._suiteName}-${fixedName}.socket`);
     const env = {
       APP_HOME_URL: _homeUrl,
       GRIST_TESTING_SOCKET: this.testingSocket,
@@ -116,10 +121,25 @@ export class TestServer {
       // wait for check
       return (await fetch(`${this.serverUrl}/status/hooks`, {timeout: 1000})).ok;
     } catch (err) {
+      log.warn("Failed to initialize server", err);
       return false;
     }
   }
 
+  // Get access to the ChildProcess object for this server, e.g. to get its PID.
+  public getChildProcess(): ChildProcess { return this._server; }
+
+  // Returns the promise for the ChildProcess's signal or exit code.
+  public getExitPromise(): Promise<number|string> { return this._exitPromise; }
+
+  public makeUserApi(org: string, user: string = 'chimpy'): UserAPIImpl {
+    return new UserAPIImpl(`${this.serverUrl}/o/${org}`, {
+      headers: {Authorization: `Bearer api_key_for_${user}`},
+      fetch: fetch as unknown as typeof globalThis.fetch,
+      newFormData: () => new FormData() as any,
+      logger: log
+    });
+  }
+
   private async _waitServerReady() {
     // It's important to clear the timeout, because it can prevent node from exiting otherwise,
diff --git a/test/server/testUtils.ts b/test/server/testUtils.ts
index fa73770b..31642fcd 100644
--- a/test/server/testUtils.ts
+++ b/test/server/testUtils.ts
@@ -12,6 +12,7 @@
 import * as _ from 'underscore';
 import { assert } from 'chai';
+import {tmpdir} from 'os';
 import * as path from 'path';
 import * as fse from 'fs-extra';
 import clone = require('lodash/clone');
@@ -309,6 +310,18 @@ export class EnvironmentSnapshot {
   }
 }
 
+export async function createTestDir(suiteName: string): Promise<string> {
+  const tmpRootDir = process.env.TESTDIR || tmpdir();
+  const workerIdText = process.env.MOCHA_WORKER_ID || '0';
+  const username = process.env.USER || "nobody";
+  const testDir = path.join(tmpRootDir, `grist_test_${username}_${suiteName}_${workerIdText}`);
+  // Remove any previous tmp dir, and create the new one.
+  await fse.remove(testDir);
+  await fse.mkdirs(testDir);
+  log.warn(`Test logs and data are at: ${testDir}/`);
+  return testDir;
+}
+
 export async function getBuildFile(relativePath: string): Promise<string> {
   if (await fse.pathExists(path.join('_build', relativePath))) {
     return path.join('_build', relativePath);
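Taken together, createTestDir() and the reworked TestServer.startServer()/makeUserApi() give test suites one uniform setup path. A minimal sketch of how a new suite might wire them up, modeled on the ManyFetches beforeEach above (the suite name is illustrative):

import {prepareDatabase} from 'test/server/lib/helpers/PrepareDatabase';
import {TestServer} from 'test/server/lib/helpers/TestServer';
import {createTestDir, EnvironmentSnapshot} from 'test/server/testUtils';

describe('MySuite', function() {
  let oldEnv: EnvironmentSnapshot;
  let home: TestServer;

  beforeEach(async function() {
    oldEnv = new EnvironmentSnapshot();             // prepareDatabase mutates process.env
    const testDir = await createTestDir("MySuite"); // fresh per-suite directory
    await prepareDatabase(testDir);
    home = await TestServer.startServer('home', testDir, "home");
  });

  afterEach(async function() {
    await home.stop();
    oldEnv.restore();
  });
});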