gristlabs_grist-core/test/server/lib/ManyFetches.ts
Dmitry S 526a5df157 (core) Manage memory used for websocket responses to reduce the risk of server crashes.
Summary:
- Implements MemoryPool for waiting on memory reservations.
- Uses MemoryPool to control memory used for stringifying JSON responses in Client.ts
- Limits total size of _missedMessages that may be queued for a particular client.
- Upgrades ws library, which may reduce memory usage, and allows pausing the websocket for testing.
  - The upgrade changed subtle behavior corners, requiring various fixes to code and tests.

- dos.ts:
  - Includes Paul's fixes and updates to the dos.ts script for manual stress-testing.
  - Logging tweaks, to avoid excessive dumps on uncaughtError, and include timestamps.

Test Plan:
- Includes a test that measures heap size, and fails without memory management.
- Includes a unit test for MemoryPool.
- Some cleanup and additions to TestServer helper; in particular adds makeUserApi() helper used in multiple tests.
- Some fixes related to ws upgrade.

Reviewers: paulfitz

Reviewed By: paulfitz

Differential Revision: https://phab.getgrist.com/D3974
2023-08-07 11:28:31 -04:00

242 lines
10 KiB
TypeScript

import {GristWSConnection} from 'app/client/components/GristWSConnection';
import {TableFetchResult} from 'app/common/ActiveDocAPI';
import {UserAPIImpl} from 'app/common/UserAPI';
import {delay} from 'app/common/delay';
import * as log from 'app/server/lib/log';
import {getGristConfig} from 'test/gen-server/testUtils';
import {prepareDatabase} from 'test/server/lib/helpers/PrepareDatabase';
import {TestServer} from 'test/server/lib/helpers/TestServer';
import {createTestDir, EnvironmentSnapshot, setTmpLogLevel} from 'test/server/testUtils';
import {assert} from 'chai';
import * as cookie from 'cookie';
import fetch from 'node-fetch';
import WebSocket from 'ws';
describe('ManyFetches', function() {
// Mocha timeout (in ms) for this suite.
this.timeout(30000);
setTmpLogLevel('warn'); // Set to 'info' to see what heap size actually is.
// Environment snapshot, taken in beforeEach and restored in afterEach.
let oldEnv: EnvironmentSnapshot;
// Identity of the test user, and the org used for the user API and for the login session.
const userName = 'chimpy';
const email = 'chimpy@getgrist.com';
const org = 'docs';
// Servers started afresh in beforeEach and stopped in afterEach. The 'docs' server is started
// with the home server's URL.
let home: TestServer;
let docs: TestServer;
// API client obtained from the home server for `org` and `userName`.
let userApi: UserAPIImpl;
beforeEach(async function() {
  // prepareDatabase changes process.env, so capture the environment first; afterEach restores it.
  oldEnv = new EnvironmentSnapshot();
  log.info("Starting servers");
  const workDir = await createTestDir("ManyFetches");
  await prepareDatabase(workDir);

  // Environment overrides for the doc-worker process.
  // The test verifies memory usage by checking heap sizes. The heap limit below makes the
  // doc-worker crash when memory management is wrong. With fetch sizes in this test,
  // doc-worker's heap size goes from ~90M to ~330M without memory management; this limit is
  // in the middle as another way to verify that memory management helps.
  // Without this limit, there is no pressure on node to garbage-collect, so it may use more
  // memory than we expect, making the test less reliable.
  const docWorkerEnv = {NODE_OPTIONS: '--max-old-space-size=210'};

  home = await TestServer.startServer('home', workDir, "home");
  docs = await TestServer.startServer('docs', workDir, "docs", docWorkerEnv, home.serverUrl);
  userApi = home.makeUserApi(org, userName);
});
afterEach(async function() {
  // Stop all servers, then restore the environment. Each step runs even if an earlier one
  // fails, so that a failed shutdown of one server cannot leave the other one running or
  // leave process.env modified for later tests.
  // `home` / `docs` may still be unset if beforeEach failed before starting them, hence `?.`.
  try {
    await home?.stop();
  } finally {
    try {
      await docs?.stop();
    } finally {
      oldEnv.restore();
    }
  }
});
// Logs a heap reading next to its limit, then asserts that the reading is below the limit.
// The log line is only visible when setTmpLogLevel is 'info'; handy while tuning the test.
function assertIsBelow(value: number, expected: number) {
  const limitNote = `(expected < ${expected})`;
  log.info("HeapMB", value, limitNote);
  assert.isBelow(value, expected);
}
it('should limit the memory used to respond to many simultaneuous fetches', async function() {
  // Here we create a large document, and fetch it in parallel 2 * N (i.e. 200) times, without
  // reading responses. This test relies on the fact that the server caches the fetched data,
  // so only the serialized responses to clients are responsible for large memory use. This is
  // the memory use limited in Client.ts by jsonMemoryPool.

  // Number of requests in each of the two batches. For N=100, doc-worker should need ~200M of
  // additional memory per batch without memory management.
  const N = 100;

  // Number of rows in the test document. 40k rows should produce ~2MB fetch response.
  const numRows = 40_000;

  // Reduce the limit controlling memory for JSON responses from the default of 500MB to 50MB.
  await docs.testingHooks.commSetClientJsonMemoryLimit(50 * 1024 * 1024);

  // Create a large document where fetches would have a noticeable memory footprint.
  const {docId} = await createLargeDoc({rows: numRows});

  // When we get results, here's a checker that it looks reasonable: one result per request
  // in a batch, each containing every row of both columns.
  function checkResults(results: TableFetchResult[]) {
    assert.lengthOf(results, N);
    for (const res of results) {
      assert.lengthOf(res.tableData[2], numRows);
      assert.lengthOf(res.tableData[3].Num, numRows);
      assert.lengthOf(res.tableData[3].Text, numRows);
    }
  }

  // Helper to get doc-worker's heap size.
  // If the server dies, testingHooks calls may hang. Racing against the server's exit
  // prevents that, turning a crash into an immediate test failure.
  const serverErrorPromise = docs.getExitPromise().then(() => { throw new Error("server exited"); });
  const getMemoryUsage = () => Promise.race([docs.testingHooks.getMemoryUsage(), serverErrorPromise]);
  const getHeapMB = async () => {
    const usage = await getMemoryUsage() as NodeJS.MemoryUsage;
    return Math.round(usage.heapUsed / 1024 / 1024);
  };

  assertIsBelow(await getHeapMB(), 120);

  // Every connection created is tracked here, so that the `finally` below can dispose all of
  // them, including when connecting or opening the doc fails partway through a batch.
  const createConnectionFunc = await prepareGristWSConnection(docId);
  const connections: GristWSConnection[] = [];
  const connectBatch = async () => {
    const batch = Array.from(Array(N), createConnectionFunc);
    connections.push(...batch);
    return Promise.all(batch.map(c => connect(c, docId)));
  };

  try {
    // Create all the connections, but don't make the fetches just yet.
    const fetchersA = await connectBatch();
    const fetchersB = await connectBatch();

    assertIsBelow(await getHeapMB(), 120);

    // Start fetches without reading responses. This is a step that should push memory limits.
    fetchersA.forEach(f => f.startPausedFetch());

    // Give it a few seconds, enough for server to use what memory it can.
    await delay(2000);
    assertIsBelow(await getHeapMB(), 200);

    // Make N more requests. See that memory hasn't spiked.
    fetchersB.forEach(f => f.startPausedFetch());
    await delay(2000);
    assertIsBelow(await getHeapMB(), 200);

    // Complete the first batch of requests. This allows for the fetches to complete, and for
    // memory to get released. Also check that results look reasonable.
    checkResults(await Promise.all(fetchersA.map(f => f.completeFetch())));
    assertIsBelow(await getHeapMB(), 200);

    // Complete the outstanding requests. Memory shouldn't spike.
    checkResults(await Promise.all(fetchersB.map(f => f.completeFetch())));
    assertIsBelow(await getHeapMB(), 200);
  } finally {
    // Same cleanup as each fetcher's end(), but it also covers connections whose connect()
    // never completed.
    connections.forEach(c => c.dispose());
  }
});
// Creates a new document named 'testdoc' in the first workspace of the current org, adds a
// table 'TestTable' with a Numeric and a Text column, and fills it with the requested number
// of rows (about 50 bytes per row). Rows are added in batches of up to 10,000.
// Returns the id of the new document.
async function createLargeDoc({rows}: {rows: number}): Promise<{docId: string}> {
  log.info("Preparing a doc of %s rows", rows);
  const workspaces = await userApi.getOrgWorkspaces('current');
  const workspaceId = workspaces[0].id;
  const docId = await userApi.newDoc({name: 'testdoc'}, workspaceId);

  const columns = [
    {id: 'Num', type: 'Numeric'},
    {id: 'Text', type: 'Text'}
  ];
  await userApi.applyUserActions(docId, [['AddTable', 'TestTable', columns]]);

  const batchSize = 10_000;
  for (let start = 0; start < rows; start += batchSize) {
    const count = Math.min(batchSize, rows - start);
    // Zero-based positions (across the whole table) of the rows in this batch.
    const positions = Array.from({length: count}, (_, offset) => start + offset);
    await userApi.getDocAPI(docId).addRows('TestTable', {
      // Roughly 8 bytes per row
      Num: positions.map(pos => pos * 100),
      // Roughly 40 bytes per row
      Text: positions.map(pos => `Hello, world, again for the ${pos}th time.`),
    });
  }
  return {docId};
}
// Gets all the info needed to create a GristWSConnection for the given doc, and returns a
// connection-creating function. Each call of the returned function creates a new, not yet
// initialized, GristWSConnection that shares the same login session cookie.
// Throws if no session cookie is available, or if the doc page's embedded config lacks
// the home URL, the assignment id, or the doc-worker URL.
async function prepareGristWSConnection(docId: string): Promise<() => GristWSConnection> {
  // Use cookies for access to stay as close as possible to regular operation.
  const resp = await fetch(`${home.serverUrl}/test/session`);
  // The header is null when the server sets no cookie; cookie.parse() would throw an opaque
  // TypeError on it, so treat that case the same as a missing session id.
  const setCookie = resp.headers.get('set-cookie');
  const sid = setCookie ? cookie.parse(setCookie).grist_sid : undefined;
  if (!sid) { throw new Error('no session available'); }
  await home.testingHooks.setLoginSessionProfile(sid, {name: userName, email}, org);

  // Load the document html.
  const pageUrl = `${home.serverUrl}/o/docs/doc/${docId}`;
  const headers = {Cookie: `grist_sid=${sid}`};
  const doc = await fetch(pageUrl, {headers});
  const pageBody = await doc.text();

  // Pull out the configuration object embedded in the html.
  const gristConfig = getGristConfig(pageBody);
  const {assignmentId, getWorker, homeUrl} = gristConfig;
  if (!homeUrl) { throw new Error('no homeUrl'); }
  if (!assignmentId) { throw new Error('no assignmentId'); }
  const docWorkerUrl = getWorker && getWorker[assignmentId];
  if (!docWorkerUrl) { throw new Error('no docWorkerUrl'); }

  // Place the config object in window.gristConfig as if we were a
  // real browser client. GristWSConnection expects to find it there.
  globalThis.window = globalThis.window || {};
  (globalThis.window as any).gristConfig = gristConfig;

  // We return a function that constructs a GristWSConnection.
  return function createConnectionFunc() {
    // Each connection keeps its own client id: it starts as '0' and is replaced by whatever
    // the connection passes to updateClientId().
    let clientId: string = '0';
    return GristWSConnection.create(null, {
      // Real WebSocket from the 'ws' package, carrying the session cookie.
      makeWebSocket(url: string): any { return new WebSocket(url, undefined, { headers }); },
      getTimezone() { return Promise.resolve('UTC'); },
      getPageUrl() { return pageUrl; },
      getDocWorkerUrl() { return Promise.resolve(docWorkerUrl); },
      getClientId(did) { return clientId; },
      getUserSelector() { return ''; },
      updateClientId(did: string, cid: string) { clientId = cid; },
      advanceCounter(): string { return '0'; },
      // Discard the connection's own logging.
      log(msg, ...args) {},
      warn(msg, ...args) {},
    });
  };
}
// Actually connects the GristWSConnection and opens the doc. Resolves once the server has
// responded to the openDoc request, with a few methods for next steps:
//  - startPausedFetch(): pauses the websocket, then sends a fetchTable request for
//    'TestTable', so that the response is not read until completeFetch().
//  - completeFetch(): resumes the websocket, and resolves with the `data` of the fetch
//    response. Throws if startPausedFetch() has not been called first.
//  - end(): disposes the connection.
async function connect(connection: GristWSConnection, docId: string) {
  // Shape of the fetch response as used here: matched by reqId, with the result in `data`.
  interface FetchResponse {
    reqId?: number;
    data: TableFetchResult;
  }

  // Resolves with the first `eventType` event of the connection whose payload passes
  // `filter`, and stops listening after that. Never rejects: if no such event arrives, the
  // returned promise stays pending.
  async function getMessage<T>(eventType: string, filter: (msg: T) => boolean): Promise<T> {
    return new Promise<T>(resolve => {
      function callback(msg: T) {
        if (filter(msg)) { connection.off(eventType, callback); resolve(msg); }
      }
      connection.on(eventType, callback);
    });
  }

  // Launch the websocket. Listeners are registered before triggering each action, so that
  // the resulting event cannot be missed.
  const connectionPromise = getMessage('connectState', (isConnected: boolean) => isConnected);
  connection.initialize(null);
  await connectionPromise;    // Wait for connection to succeed.

  const openPromise = getMessage('serverMessage', ({reqId}: {reqId?: number}) => (reqId === 0));
  connection.send(JSON.stringify({reqId: 0, method: 'openDoc', args: [docId]}));
  await openPromise;

  // Pending response to the fetch; set by startPausedFetch().
  let fetchPromise: Promise<FetchResponse>|undefined;
  return {
    startPausedFetch: () => {
      fetchPromise = getMessage<FetchResponse>('serverMessage', ({reqId}) => (reqId === 1));
      // `_ws` is a private member of GristWSConnection, hence the cast; presumably it is the
      // socket created by makeWebSocket, whose pause() stops incoming data from being read.
      (connection as any)._ws.pause();
      connection.send(JSON.stringify({reqId: 1, method: 'fetchTable', args: [0, 'TestTable']}));
    },

    completeFetch: async (): Promise<TableFetchResult> => {
      if (!fetchPromise) { throw new Error('completeFetch called before startPausedFetch'); }
      (connection as any)._ws.resume();
      return (await fetchPromise).data;
    },

    end: () => {
      connection.dispose();
    },
  };
}
});