(core) Manage memory used for websocket responses to reduce the risk of server crashes.

Summary:
- Implements MemoryPool for waiting on memory reservations.
- Uses MemoryPool to control memory used for stringifying JSON responses in Client.ts
- Limits total size of _missedMessages that may be queued for a particular client.
- Upgrades ws library, which may reduce memory usage, and allows pausing the websocket for testing.
  - The upgrade changed subtle behavior corners, requiring various fixes to code and tests.

- dos.ts:
  - Includes Paul's fixes and updates to the dos.ts script for manual stress-testing.
  - Logging tweaks, to avoid excessive dumps on uncaughtError, and include timestamps.

Test Plan:
- Includes a test that measures heap size, and fails without memory management.
- Includes a unittest for MemoryPool
- Some cleanup and additions to TestServer helper; in particular adds makeUserApi() helper used in multiple tests.
- Some fixes related to ws upgrade.

Reviewers: paulfitz

Reviewed By: paulfitz

Differential Revision: https://phab.getgrist.com/D3974
This commit is contained in:
Dmitry S
2023-08-07 10:28:17 -04:00
parent 7c114bf600
commit 526a5df157
14 changed files with 677 additions and 99 deletions

View File

@@ -110,6 +110,7 @@ export class GristWSSettingsBrowser implements GristWSSettings {
export class GristWSConnection extends Disposable {
public useCount: number = 0;
public on: BackboneEvents['on']; // set by Backbone
public off: BackboneEvents['off']; // set by Backbone
protected trigger: BackboneEvents['trigger']; // set by Backbone

View File

@@ -15,6 +15,7 @@ import type {Comm} from 'app/server/lib/Comm';
import {DocSession} from 'app/server/lib/DocSession';
import log from 'app/server/lib/log';
import {LogMethods} from "app/server/lib/LogMethods";
import {MemoryPool} from 'app/server/lib/MemoryPool';
import {shortDesc} from 'app/server/lib/shortDesc';
import {fromCallback} from 'app/server/lib/serverUtils';
import {i18n} from 'i18next';
@@ -22,8 +23,11 @@ import * as crypto from 'crypto';
import moment from 'moment';
import * as WebSocket from 'ws';
/// How many messages to accumulate for a disconnected client before booting it.
// How many messages and bytes to accumulate for a disconnected client before booting it.
// The benefit is that a client who temporarily disconnects and reconnects without missing much,
// would not need to reload the document.
const clientMaxMissedMessages = 100;
const clientMaxMissedBytes = 1_000_000;
export type ClientMethod = (client: Client, ...args: any[]) => Promise<unknown>;
@@ -33,6 +37,14 @@ const clientRemovalTimeoutMs = 300 * 1000; // 300s = 5 minutes.
// A hook for dependency injection.
export const Deps = {clientRemovalTimeoutMs};
// How much memory to allow using for large JSON responses before waiting for some to clear.
// Max total across all clients and all JSON responses.
const jsonResponseTotalReservation = 500 * 1024 * 1024;
// Estimate of a single JSON response, used before we know how large it is. Together with the
// above, it works to limit parallelism (to 25 responses that can be started in parallel).
const jsonResponseReservation = 20 * 1024 * 1024;
export const jsonMemoryPool = new MemoryPool(jsonResponseTotalReservation);
/**
* Generates and returns a random string to use as a clientId. This is better
* than numbering clients with consecutive integers; otherwise a reconnecting
@@ -81,9 +93,11 @@ export class Client {
private _docFDs: Array<DocSession|null> = [];
private _missedMessages = new Map<number, string>();
private _missedMessagesTotalLength: number = 0;
private _destroyTimer: NodeJS.Timer|null = null;
private _destroyed: boolean = false;
private _websocket: WebSocket|null;
private _websocketEventHandlers: Array<{event: string, handler: (...args: any[]) => void}> = [];
private _org: string|null = null;
private _profile: UserProfile|null = null;
private _userId: number|null = null;
@@ -124,9 +138,13 @@ export class Client {
this._counter = counter;
this.browserSettings = browserSettings;
websocket.on('error', (err) => this._onError(err));
websocket.on('close', () => this._onClose());
websocket.on('message', (msg: string) => this._onMessage(msg));
const addHandler = (event: string, handler: (...args: any[]) => void) => {
websocket.on(event, handler);
this._websocketEventHandlers.push({event, handler});
};
addHandler('error', (err: unknown) => this._onError(err));
addHandler('close', () => this._onClose());
addHandler('message', (msg: string) => this._onMessage(msg));
}
/**
@@ -173,7 +191,7 @@ export class Client {
public interruptConnection() {
if (this._websocket) {
this._websocket.removeAllListeners();
this._removeWebsocketListeners();
this._websocket.terminate(); // close() is inadequate when ws routed via loadbalancer
this._websocket = null;
}
@@ -187,36 +205,69 @@ export class Client {
return;
}
const seqId = this._nextSeqId++;
const message: string = JSON.stringify({...messageObj, seqId});
// Large responses require memory; with many connected clients this can crash the server. We
// manage it using a MemoryPool, waiting for free space to appear. This only controls the
// memory used to hold the JSON.stringify result. Once sent, the reservation is released.
//
// Actual process memory will go up also as the outgoing data is sitting in socket buffers,
// but this isn't part of Node's heap. If an outgoing buffer is full, websocket.send may
// block, and MemoryPool will delay other responses. There is a risk here of unresponsive
// clients exhausing the MemoryPool, perhaps intentionally. To mitigate, we could destroy
// clients that are too slow in reading. This isn't currently done.
//
// Also, we do not manage memory of responses moved to a client's _missedMessages queue. But
// we do limit those in size.
//
// Overall, a better solution would be to stream large responses, or to have the client
// request data piecemeal (as we'd have to for handling large data).
// Log something useful about the message being sent.
if ('error' in messageObj && messageObj.error) {
this._log.warn(null, "responding to #%d ERROR %s", messageObj.reqId, messageObj.error);
}
if (this._websocket) {
// If we have a websocket, send the message.
try {
await this._sendToWebsocket(message);
} catch (err) {
// Sending failed. Add the message to missedMessages.
this._log.warn(null, "sendMessage: queuing after send error:", err.toString());
this._missedMessages.set(seqId, message);
// NOTE: A successful send does NOT mean the message was received. For a better system, see
// https://docs.microsoft.com/en-us/azure/azure-web-pubsub/howto-develop-reliable-clients
// (keeping a copy of messages until acked). With our system, we are more likely to be
// lacking the needed messages on reconnect, and having to reset the client.
await jsonMemoryPool.withReserved(jsonResponseReservation, async (updateReservation) => {
if (this._destroyed) {
// If this Client got destroyed while waiting, stop here and release the reservation.
return;
}
} else if (this._missedMessages.size < clientMaxMissedMessages) {
// Queue up the message.
this._missedMessages.set(seqId, message);
} else {
// Too many messages queued. Boot the client now, to make it reset when/if it reconnects.
this._log.warn(null, "sendMessage: too many messages queued; booting client");
this.destroy();
}
const seqId = this._nextSeqId++;
const message: string = JSON.stringify({...messageObj, seqId});
const size = Buffer.byteLength(message, 'utf8');
updateReservation(size);
// Log something useful about the message being sent.
if ('error' in messageObj && messageObj.error) {
this._log.warn(null, "responding to #%d ERROR %s", messageObj.reqId, messageObj.error);
}
if (this._websocket) {
// If we have a websocket, send the message.
try {
await this._sendToWebsocket(message);
// NOTE: A successful send does NOT mean the message was received. For a better system, see
// https://docs.microsoft.com/en-us/azure/azure-web-pubsub/howto-develop-reliable-clients
// (keeping a copy of messages until acked). With our system, we are more likely to be
// lacking the needed messages on reconnect, and having to reset the client.
return;
} catch (err) {
// Sending failed. Add the message to missedMessages.
this._log.warn(null, "sendMessage: queuing after send error:", err.toString());
}
}
if (this._missedMessages.size < clientMaxMissedMessages &&
this._missedMessagesTotalLength + message.length <= clientMaxMissedBytes) {
// Queue up the message.
// TODO: this keeps the memory but releases jsonMemoryPool reservation, which is wrong --
// it may allow too much memory to be used. This situation is rare, however, so maybe OK
// as is. Ideally, the queued messages could reserve memory in a "nice to have" mode, and
// if memory is needed for something more important, the queue would get dropped.
// (Holding on to the memory reservation here would creates a risk of freezing future
// responses, which seems *more* dangerous than a crash because a crash would at least
// lead to an eventual recovery.)
this._missedMessages.set(seqId, message);
this._missedMessagesTotalLength += message.length;
} else {
// Too many messages queued. Boot the client now, to make it reset when/if it reconnects.
this._log.warn(null, "sendMessage: too many messages queued; booting client");
this.destroy();
}
});
}
/**
@@ -256,6 +307,7 @@ export class Client {
// We collected any missed messages we need; clear the stored map of them.
this._missedMessages.clear();
this._missedMessagesTotalLength = 0;
let docsClosed: number|null = null;
if (!seamlessReconnect) {
@@ -344,6 +396,7 @@ export class Client {
this._destroyTimer = null;
}
this._missedMessages.clear();
this._missedMessagesTotalLength = 0;
this._comm.removeClient(this);
this._destroyed = true;
}
@@ -557,18 +610,31 @@ export class Client {
* Processes the closing of a websocket.
*/
private _onClose() {
this._websocket?.removeAllListeners();
this._removeWebsocketListeners();
// Remove all references to the websocket.
this._websocket = null;
// Schedule the client to be destroyed after a timeout. The timer gets cleared if the same
// client reconnects in the interim.
if (this._destroyTimer) {
this._log.warn(null, "clearing previously scheduled destruction");
clearTimeout(this._destroyTimer);
if (!this._destroyed) {
// Schedule the client to be destroyed after a timeout. The timer gets cleared if the same
// client reconnects in the interim.
if (this._destroyTimer) {
this._log.warn(null, "clearing previously scheduled destruction");
clearTimeout(this._destroyTimer);
}
this._log.info(null, "websocket closed; will discard client in %s sec", Deps.clientRemovalTimeoutMs / 1000);
this._destroyTimer = setTimeout(() => this.destroy(), Deps.clientRemovalTimeoutMs);
}
}
private _removeWebsocketListeners() {
if (this._websocket) {
// Avoiding websocket.removeAllListeners() because WebSocket.Server registers listeners
// internally for websockets it keeps track of, and we should not accidentally remove those.
for (const {event, handler} of this._websocketEventHandlers) {
this._websocket.off(event, handler);
}
this._websocketEventHandlers = [];
}
this._log.info(null, "websocket closed; will discard client in %s sec", Deps.clientRemovalTimeoutMs / 1000);
this._destroyTimer = setTimeout(() => this.destroy(), Deps.clientRemovalTimeoutMs);
}
}

View File

@@ -148,6 +148,11 @@ export class Comm extends EventEmitter {
public async testServerShutdown() {
if (this._wss) {
for (const wssi of this._wss) {
// Terminate all clients. WebSocket.Server used to do it automatically in close() but no
// longer does (see https://github.com/websockets/ws/pull/1904#discussion_r668844565).
for (const ws of wssi.clients) {
ws.terminate();
}
await fromCallback((cb) => wssi.close(cb));
}
this._wss = null;
@@ -259,7 +264,6 @@ export class Comm extends EventEmitter {
await this._onWebSocketConnection(websocket, req);
} catch (e) {
log.error("Comm connection for %s threw exception: %s", req.url, e.message);
websocket.removeAllListeners();
websocket.terminate(); // close() is inadequate when ws routed via loadbalancer
}
});

View File

@@ -13,6 +13,7 @@ export const ITestingHooks = t.iface([], {
"commShutdown": t.func("void"),
"commRestart": t.func("void"),
"commSetClientPersistence": t.func("number", t.param("ttlMs", "number")),
"commSetClientJsonMemoryLimit": t.func("number", t.param("newTotalSize", "number")),
"closeDocs": t.func("void"),
"setDocWorkerActivation": t.func("void", t.param("workerId", "string"), t.param("active", t.union(t.lit('active'), t.lit('inactive'), t.lit('crash')))),
"flushAuthorizerCache": t.func("void"),
@@ -21,6 +22,7 @@ export const ITestingHooks = t.iface([], {
"setActiveDocTimeout": t.func("number", t.param("seconds", "number")),
"setDiscourseConnectVar": t.func(t.union("string", "null"), t.param("varName", "string"), t.param("value", t.union("string", "null"))),
"setWidgetRepositoryUrl": t.func("void", t.param("url", "string")),
"getMemoryUsage": t.func("object"),
});
const exportedTypeSuite: t.ITypeSuite = {

View File

@@ -9,6 +9,7 @@ export interface ITestingHooks {
commShutdown(): Promise<void>;
commRestart(): Promise<void>;
commSetClientPersistence(ttlMs: number): Promise<number>;
commSetClientJsonMemoryLimit(newTotalSize: number): Promise<number>;
closeDocs(): Promise<void>;
setDocWorkerActivation(workerId: string, active: 'active'|'inactive'|'crash'): Promise<void>;
flushAuthorizerCache(): Promise<void>;
@@ -17,4 +18,5 @@ export interface ITestingHooks {
setActiveDocTimeout(seconds: number): Promise<number>;
setDiscourseConnectVar(varName: string, value: string|null): Promise<string|null>;
setWidgetRepositoryUrl(url: string): Promise<void>;
getMemoryUsage(): Promise<object>; // actually NodeJS.MemoryUsage
}

View File

@@ -0,0 +1,114 @@
import Deque from 'double-ended-queue';
/**
* Usage:
*
* OPTION 1, using a callback, which may be async (but doesn't have to be).
*
* await mpool.withReserved(initialSize, async (updateReservation) => {
* ...
* updateReservation(newSize); // if needed
* ...
* });
*
* OPTION 2, lower-level.
*
* Note: dispose() MUST be called (e.g. using try/finally). If not called, other work will
* eventually deadlock waiting for it.
*
* const memoryReservation = await mpool.waitAndReserve(initialSize);
* try {
* ...
* memoryReservation.updateReservation(newSize1); // if needed
* memoryReservation.updateReservation(newSize2); // if needed
* ...
* } finally {
* memoryReservation.dispose();
* }
*
* With both options, it's common for the initialSize to be a pool estimate. You may call
* updateReservation() to update it. If it lowers the estimate, other work may unblock. If it
* raises it, it may delay future work, but will have no impact on work that's already unblocked.
* So it's always safer for initialSize to be an overestimate.
*
* When it's hard to estimate initialSize in bytes, you may specify it as e.g.
* memPool.getTotalSize() / 20. This way at most 20 such parallel tasks may be unblocked at a
* time, and further ones will wait until some release their memory or revise down their estimate.
*/
export class MemoryPool {
private _reservedSize: number = 0;
private _queue = new Deque<MemoryAwaiter>();
constructor(private _totalSize: number) {}
public getTotalSize(): number { return this._totalSize; }
public getReservedSize(): number { return this._reservedSize; }
public getAvailableSize(): number { return this._totalSize - this._reservedSize; }
public isEmpty(): boolean { return this._reservedSize === 0; }
public hasSpace(size: number): boolean { return this._reservedSize + size <= this._totalSize; }
// To avoid failures, allow reserving more than totalSize when memory pool is empty.
public hasSpaceOrIsEmpty(size: number): boolean { return this.hasSpace(size) || this.isEmpty(); }
public numWaiting(): number { return this._queue.length; }
public async waitAndReserve(size: number): Promise<MemoryReservation> {
if (this.hasSpaceOrIsEmpty(size)) {
this._updateReserved(size);
} else {
await new Promise<void>(resolve => this._queue.push({size, resolve}));
}
return new MemoryReservation(size, this._updateReserved.bind(this));
}
public async withReserved(size: number, callback: (updateRes: UpdateReservation) => void|Promise<void>) {
const memRes = await this.waitAndReserve(size);
try {
return await callback(memRes.updateReservation.bind(memRes));
} finally {
memRes.dispose();
}
}
// Update the total size. Returns the old size. This is intended for testing.
public setTotalSize(newTotalSize: number): number {
const oldTotalSize = this._totalSize;
this._totalSize = newTotalSize;
this._checkWaiting();
return oldTotalSize;
}
private _checkWaiting() {
while (!this._queue.isEmpty() && this.hasSpaceOrIsEmpty(this._queue.peekFront()!.size)) {
const item = this._queue.shift()!;
this._updateReserved(item.size);
item.resolve();
}
}
private _updateReserved(sizeDelta: number): void {
this._reservedSize += sizeDelta;
this._checkWaiting();
}
}
type UpdateReservation = (sizeDelta: number) => void;
export class MemoryReservation {
constructor(private _size: number, private _updateReserved: UpdateReservation) {}
public updateReservation(newSize: number) {
this._updateReserved(newSize - this._size);
this._size = newSize;
}
public dispose() {
this.updateReservation(0);
this._updateReserved = undefined as any; // Make sure we don't keep using it after dispose
}
}
interface MemoryAwaiter {
size: number;
resolve: () => void;
}

View File

@@ -4,6 +4,7 @@ import {UserProfile} from 'app/common/LoginSessionAPI';
import {Deps as ActiveDocDeps} from 'app/server/lib/ActiveDoc';
import {Deps as DiscourseConnectDeps} from 'app/server/lib/DiscourseConnect';
import {Deps as CommClientDeps} from 'app/server/lib/Client';
import * as Client from 'app/server/lib/Client';
import {Comm} from 'app/server/lib/Comm';
import log from 'app/server/lib/log';
import {IMessage, Rpc} from 'grain-rpc';
@@ -39,6 +40,8 @@ export function startTestingHooks(socketPath: string, port: number,
function connectToSocket(rpc: Rpc, socket: net.Socket): Rpc {
socket.setEncoding('utf8');
// Poor-man's JSON processing, only OK because this is for testing only. If multiple messages
// are received quickly, they may arrive in the same buf, and JSON.parse will fail.
socket.on('data', (buf: string) => rpc.receiveMessage(JSON.parse(buf)));
rpc.setSendMessage((m: IMessage) => fromCallback(cb => socket.write(JSON.stringify(m), 'utf8', cb)));
return rpc;
@@ -118,12 +121,19 @@ export class TestingHooks implements ITestingHooks {
// Set how long new clients will persist after disconnection.
// Returns the previous value.
public async commSetClientPersistence(ttlMs: number): Promise<number> {
log.info("TestingHooks.setClientPersistence called with", ttlMs);
log.info("TestingHooks.commSetClientPersistence called with", ttlMs);
const prev = CommClientDeps.clientRemovalTimeoutMs;
CommClientDeps.clientRemovalTimeoutMs = ttlMs;
return prev;
}
// Set the amount of memory Client.ts can use for JSON responses, in bytes.
// Returns the old limit.
public async commSetClientJsonMemoryLimit(newTotalSize: number): Promise<number> {
log.info("TestingHooks.commSetClientJsonMemoryLimit called with", newTotalSize);
return Client.jsonMemoryPool.setTotalSize(newTotalSize);
}
public async closeDocs(): Promise<void> {
log.info("TestingHooks.closeDocs called");
if (this._server) {
@@ -215,4 +225,8 @@ export class TestingHooks implements ITestingHooks {
}
repo.testOverrideUrl(url);
}
public async getMemoryUsage(): Promise<NodeJS.MemoryUsage> {
return process.memoryUsage();
}
}