(core) be careful when reassigning a doc to a worker it was on before

Summary:
Importing a .grist document is implemented in a somewhat clunky way, in a multi-worker setup.

 * First a random worker receives the upload, and updates Grist's various stores appropriately (database, redis, s3).
 * Then a random worker is assigned to serve the document.

If the worker serving the document fails, there is a chance the it will end up assigned to the worker that handled its upload. Currently the worker will misbehave in this case. This diff:

 * Ports a multi-worker test from test/home to run in test/s3, and adds a test simulating a bad scenario seen in the wild.
 * Fixes persistence of any existing document checksum in redis when a worker is assigned.
 * Adds a check when assigned a document to serve, and finding that document already cached locally. It isn't safe to rely only on the document checksum in redis, since that may have expired.
 * Explicitly claims the document on the uploading worker, so this situation becomes even less likely to arise.

Test Plan: added test

Reviewers: dsagal

Reviewed By: dsagal

Subscribers: dsagal

Differential Revision: https://phab.getgrist.com/D3305
This commit is contained in:
Paul Fitzpatrick 2022-03-07 09:27:43 -05:00
parent 321019217d
commit c4d3d7d3bb
9 changed files with 137 additions and 48 deletions

View File

@ -308,26 +308,8 @@ export class DocWorkerMap implements IDocWorkerMap {
* refused and need to retry. * refused and need to retry.
*/ */
public async getDocWorker(docId: string): Promise<DocStatus|null> { public async getDocWorker(docId: string): Promise<DocStatus|null> {
// Fetch the various elements that go into making a DocStatus const {doc} = await this._getDocAndChecksum(docId);
const props = await this._client.multi() return doc;
.hgetall(`doc-${docId}`)
.get(`doc-${docId}-checksum`)
.execAsync() as [{[key: string]: any}|null, string|null]|null;
if (!props) { return null; }
// If there is no worker, return null. An alternative would be to modify
// DocStatus so that it is possible for it to not have a worker assignment.
if (!props[0]) { return null; }
// Fields are JSON encoded since redis cannot store them directly.
const doc = mapValues(props[0], (val) => JSON.parse(val));
// Redis cannot store a null value, so we encode it as 'null', which does
// not match any possible MD5.
doc.docMD5 = props[1] === 'null' ? null : props[1];
// Ok, we have a valid DocStatus at this point.
return doc as DocStatus;
} }
/** /**
@ -377,7 +359,8 @@ export class DocWorkerMap implements IDocWorkerMap {
try { try {
// Now that we've locked, recheck that the worker hasn't been reassigned // Now that we've locked, recheck that the worker hasn't been reassigned
// in the meantime. Return immediately if it has. // in the meantime. Return immediately if it has.
docStatus = await this.getDocWorker(docId); const docAndChecksum = await this._getDocAndChecksum(docId);
docStatus = docAndChecksum.doc;
if (docStatus) { return docStatus; } if (docStatus) { return docStatus; }
if (!workerId) { if (!workerId) {
@ -408,8 +391,9 @@ export class DocWorkerMap implements IDocWorkerMap {
const docWorker = await this._client.hgetallAsync(`worker-${workerId}`) as DocWorkerInfo|null; const docWorker = await this._client.hgetallAsync(`worker-${workerId}`) as DocWorkerInfo|null;
if (!docWorker) { throw new Error('no doc worker contact info available'); } if (!docWorker) { throw new Error('no doc worker contact info available'); }
// We can now construct a DocStatus. // We can now construct a DocStatus, preserving any existing checksum.
const newDocStatus = {docMD5: null, docWorker, isActive: true}; const checksum = docAndChecksum.checksum;
const newDocStatus = {docMD5: checksum, docWorker, isActive: true};
// We add the assignment to worker-{workerId}-docs and save doc-{docId}. // We add the assignment to worker-{workerId}-docs and save doc-{docId}.
const result = await this._client.multi() const result = await this._client.multi()
@ -418,7 +402,7 @@ export class DocWorkerMap implements IDocWorkerMap {
docWorker: JSON.stringify(docWorker), // redis can't store nested objects, strings only docWorker: JSON.stringify(docWorker), // redis can't store nested objects, strings only
isActive: JSON.stringify(true) // redis can't store booleans, strings only isActive: JSON.stringify(true) // redis can't store booleans, strings only
}) })
.setex(`doc-${docId}-checksum`, CHECKSUM_TTL_MSEC / 1000.0, 'null') .setex(`doc-${docId}-checksum`, CHECKSUM_TTL_MSEC / 1000.0, checksum || 'null')
.execAsync(); .execAsync();
if (!result) { throw new Error('failed to store new assignment'); } if (!result) { throw new Error('failed to store new assignment'); }
return newDocStatus; return newDocStatus;
@ -522,6 +506,28 @@ export class DocWorkerMap implements IDocWorkerMap {
public async getDocGroup(docId: string): Promise<string|null> { public async getDocGroup(docId: string): Promise<string|null> {
return this._client.getAsync(`doc-${docId}-group`); return this._client.getAsync(`doc-${docId}-group`);
} }
/**
* Fetch the doc-<docId> hash and doc-<docId>-checksum key from redis.
* Return as a decoded DocStatus and a checksum.
*/
private async _getDocAndChecksum(docId: string): Promise<{
doc: DocStatus|null,
checksum: string|null,
}> {
// Fetch the various elements that go into making a DocStatus
const props = await this._client.multi()
.hgetall(`doc-${docId}`)
.get(`doc-${docId}-checksum`)
.execAsync() as [{[key: string]: any}|null, string|null]|null;
// Fields are JSON encoded since redis cannot store them directly.
const doc = props?.[0] ? mapValues(props[0], (val) => JSON.parse(val)) as DocStatus : null;
// Redis cannot store a null value, so we encode it as 'null', which does
// not match any possible MD5.
const checksum = (props?.[1] === 'null' ? null : props?.[1]) || null;
if (doc) { doc.docMD5 = checksum; } // the checksum goes in the DocStatus too.
return {doc, checksum};
}
} }
// If we don't have redis available and use a DummyDocWorker, it should be a singleton. // If we don't have redis available and use a DummyDocWorker, it should be a singleton.

View File

@ -530,7 +530,13 @@ export class DocManager extends EventEmitter {
const srcDocPath = uploadInfo.files[0].absPath; const srcDocPath = uploadInfo.files[0].absPath;
await checkAllegedGristDoc(docSession, srcDocPath); await checkAllegedGristDoc(docSession, srcDocPath);
await docUtils.copyFile(srcDocPath, docPath); await docUtils.copyFile(srcDocPath, docPath);
await this.storageManager.addToStorage(docName); // Go ahead and claim this document. If we wanted to serve it
// from a potentially different worker, we'd call addToStorage(docName)
// instead (we used to do this). The upload should already be happening
// on a randomly assigned worker due to the special treatment of the
// 'import' assignmentId.
await this.storageManager.prepareLocalDoc(docName);
this.storageManager.markAsChanged(docName, 'edit');
return {title: basename, id: docName}; return {title: basename, id: docName};
} else { } else {
const doc = await this.createNewEmptyDoc(docSession, id); const doc = await this.createNewEmptyDoc(docSession, id);

View File

@ -163,6 +163,18 @@ export class DocSnapshotInventory implements IInventory {
}); });
} }
/**
* Wipe local cached state of the inventory.
*/
public async clear(key: string) {
await this._mutex.runExclusive(key, async() => {
await this._flush(key);
const fname = await this._getFilename(key);
// NOTE: fse.remove succeeds also when the file does not exist.
await fse.remove(fname);
});
}
/** /**
* Remove a set of snapshots from the inventory, and then flush to S3. * Remove a set of snapshots from the inventory, and then flush to S3.
*/ */

View File

@ -158,9 +158,14 @@ export class FlexServer implements GristServer {
log.info(`== Grist version is ${version.version} (commit ${version.gitcommit})`); log.info(`== Grist version is ${version.version} (commit ${version.gitcommit})`);
this.info.push(['appRoot', this.appRoot]); this.info.push(['appRoot', this.appRoot]);
// This directory hold Grist documents. // This directory hold Grist documents.
const docsRoot = path.resolve((this.options && this.options.dataDir) || let docsRoot = path.resolve((this.options && this.options.dataDir) ||
process.env.GRIST_DATA_DIR || process.env.GRIST_DATA_DIR ||
getAppPathTo(this.appRoot, 'samples')); getAppPathTo(this.appRoot, 'samples'));
// In testing, it can be useful to separate out document roots used
// by distinct FlexServers.
if (process.env.GRIST_TEST_ADD_PORT_TO_DOCS_ROOT === 'true') {
docsRoot = path.resolve(docsRoot, String(port));
}
// Create directory if it doesn't exist. // Create directory if it doesn't exist.
// TODO: track down all dependencies on 'samples' existing in tests and // TODO: track down all dependencies on 'samples' existing in tests and
// in dev environment, and remove them. Then it would probably be best // in dev environment, and remove them. Then it would probably be best
@ -1308,6 +1313,16 @@ export class FlexServer implements GristServer {
return this.tag; return this.tag;
} }
/**
* Make sure external storage of all docs is up to date.
*/
public async testFlushDocs() {
const assignments = await this._docWorkerMap.getAssignments(this.worker.id);
for (const assignment of assignments) {
await this._storageManager.flushDoc(assignment);
}
}
// Adds endpoints that support imports and exports. // Adds endpoints that support imports and exports.
private _addSupportPaths(docAccessMiddleware: express.RequestHandler[]) { private _addSupportPaths(docAccessMiddleware: express.RequestHandler[]) {
if (!this._docWorker) { throw new Error("need DocWorker"); } if (!this._docWorker) { throw new Error("need DocWorker"); }

View File

@ -112,6 +112,7 @@ export class HostedStorageManager implements IDocStorageManager {
// Latest version ids of documents. // Latest version ids of documents.
private _latestVersions = new Map<string, string>(); private _latestVersions = new Map<string, string>();
private _latestMetaVersions = new Map<string, string>();
private _log = new LogMethods('HostedStorageManager ', (docId: string|null) => ({docId})); private _log = new LogMethods('HostedStorageManager ', (docId: string|null) => ({docId}));
@ -158,7 +159,7 @@ export class HostedStorageManager implements IDocStorageManager {
throw new Error('bug: external storage should be created for "meta" if it is created for "doc"'); throw new Error('bug: external storage should be created for "meta" if it is created for "doc"');
} }
this._extMeta = this._getChecksummedExternalStorage('meta', baseStoreMeta, this._extMeta = this._getChecksummedExternalStorage('meta', baseStoreMeta,
new Map(), this._latestMetaVersions,
options); options);
this._inventory = new DocSnapshotInventory(this._ext, this._extMeta, this._inventory = new DocSnapshotInventory(this._ext, this._extMeta,
@ -570,9 +571,18 @@ export class HostedStorageManager implements IDocStorageManager {
const existsLocally = await fse.pathExists(this.getPath(docName)); const existsLocally = await fse.pathExists(this.getPath(docName));
if (existsLocally) { if (existsLocally) {
if (!docStatus.docMD5 || docStatus.docMD5 === DELETED_TOKEN) { if (!docStatus.docMD5 || docStatus.docMD5 === DELETED_TOKEN) {
// New doc appears to already exist, but not in S3 (according to redis). // New doc appears to already exist, but may not exist in S3.
// Go ahead and use local version. // Let's check.
return true; const head = await this._ext.head(docName);
const lastLocalVersionSeen = this._latestVersions.get(docName);
if (head && lastLocalVersionSeen !== head.snapshotId) {
// Exists in S3, with a version not known to be latest seen
// by this worker - so wipe local version and defer to S3.
await this._wipeCache(docName);
} else {
// Go ahead and use local version.
return true;
}
} else { } else {
// Doc exists locally and in S3 (according to redis). // Doc exists locally and in S3 (according to redis).
// Make sure the checksum matches. // Make sure the checksum matches.
@ -586,8 +596,7 @@ export class HostedStorageManager implements IDocStorageManager {
// On the assumption that the local file is outdated, delete it. // On the assumption that the local file is outdated, delete it.
// TODO: may want to be more careful in case the local file has modifications that // TODO: may want to be more careful in case the local file has modifications that
// simply never made it to S3 due to some kind of breakage. // simply never made it to S3 due to some kind of breakage.
// NOTE: fse.remove succeeds also when the file does not exist. await this._wipeCache(docName);
await fse.remove(this.getPath(docName));
} }
} }
} }
@ -598,6 +607,19 @@ export class HostedStorageManager implements IDocStorageManager {
}); });
} }
/**
* Remove local version of a document, and state related to it.
*/
private async _wipeCache(docName: string) {
// NOTE: fse.remove succeeds also when the file does not exist.
await fse.remove(this.getPath(docName));
await fse.remove(this._getHashFile(this.getPath(docName), 'doc'));
await fse.remove(this._getHashFile(this.getPath(docName), 'meta'));
await this._inventory.clear(docName);
this._latestVersions.delete(docName);
this._latestMetaVersions.delete(docName);
}
/** /**
* Fetch a document from s3 and save it locally as destId.grist * Fetch a document from s3 and save it locally as destId.grist
* *

View File

@ -16,6 +16,7 @@ export const ITestingHooks = t.iface([], {
"closeDocs": t.func("void"), "closeDocs": t.func("void"),
"setDocWorkerActivation": t.func("void", t.param("workerId", "string"), t.param("active", t.union(t.lit('active'), t.lit('inactive'), t.lit('crash')))), "setDocWorkerActivation": t.func("void", t.param("workerId", "string"), t.param("active", t.union(t.lit('active'), t.lit('inactive'), t.lit('crash')))),
"flushAuthorizerCache": t.func("void"), "flushAuthorizerCache": t.func("void"),
"flushDocs": t.func("void"),
"getDocClientCounts": t.func(t.array(t.tuple("string", "number"))), "getDocClientCounts": t.func(t.array(t.tuple("string", "number"))),
"setActiveDocTimeout": t.func("number", t.param("seconds", "number")), "setActiveDocTimeout": t.func("number", t.param("seconds", "number")),
"setDiscourseConnectVar": t.func(t.union("string", "null"), t.param("varName", "string"), t.param("value", t.union("string", "null"))), "setDiscourseConnectVar": t.func(t.union("string", "null"), t.param("varName", "string"), t.param("value", t.union("string", "null"))),

View File

@ -12,6 +12,7 @@ export interface ITestingHooks {
closeDocs(): Promise<void>; closeDocs(): Promise<void>;
setDocWorkerActivation(workerId: string, active: 'active'|'inactive'|'crash'): Promise<void>; setDocWorkerActivation(workerId: string, active: 'active'|'inactive'|'crash'): Promise<void>;
flushAuthorizerCache(): Promise<void>; flushAuthorizerCache(): Promise<void>;
flushDocs(): Promise<void>;
getDocClientCounts(): Promise<Array<[string, number]>>; getDocClientCounts(): Promise<Array<[string, number]>>;
setActiveDocTimeout(seconds: number): Promise<number>; setActiveDocTimeout(seconds: number): Promise<number>;
setDiscourseConnectVar(varName: string, value: string|null): Promise<string|null>; setDiscourseConnectVar(varName: string, value: string|null): Promise<string|null>;

View File

@ -137,23 +137,27 @@ export class TestingHooks implements ITestingHooks {
public async setDocWorkerActivation(workerId: string, active: 'active'|'inactive'|'crash'): public async setDocWorkerActivation(workerId: string, active: 'active'|'inactive'|'crash'):
Promise<void> { Promise<void> {
log.info("TestingHooks.setDocWorkerActivation called with", workerId, active); log.info("TestingHooks.setDocWorkerActivation called with", workerId, active);
for (const server of this._workerServers) { const matches = this._workerServers.filter(
if (server.worker.id === workerId || server.worker.publicUrl === workerId) { server => server.worker.id === workerId ||
switch (active) { server.worker.publicUrl === workerId ||
case 'active': (server.worker.publicUrl.startsWith('http://localhost:') &&
await server.restartListening(); workerId.startsWith('http://localhost:') &&
break; new URL(server.worker.publicUrl).host === new URL(workerId).host));
case 'inactive': if (matches.length !== 1) {
await server.stopListening(); throw new Error(`could not find worker: ${workerId}`);
break; }
case 'crash': const server = matches[0];
await server.stopListening('crash'); switch (active) {
break; case 'active':
} await server.restartListening();
return; break;
} case 'inactive':
await server.stopListening();
break;
case 'crash':
await server.stopListening('crash');
break;
} }
throw new Error(`could not find worker: ${workerId}`);
} }
public async flushAuthorizerCache(): Promise<void> { public async flushAuthorizerCache(): Promise<void> {
@ -164,6 +168,13 @@ export class TestingHooks implements ITestingHooks {
} }
} }
public async flushDocs(): Promise<void> {
log.info("TestingHooks.flushDocs called");
for (const server of this._workerServers) {
await server.testFlushDocs();
}
}
// Returns a Map from docId to number of connected clients for all open docs across servers, // Returns a Map from docId to number of connected clients for all open docs across servers,
// but represented as an array of pairs, to be serializable. // but represented as an array of pairs, to be serializable.
public async getDocClientCounts(): Promise<Array<[string, number]>> { public async getDocClientCounts(): Promise<Array<[string, number]>> {

View File

@ -151,6 +151,21 @@ export async function testCurrentUrl(pattern: RegExp|string) {
return (typeof pattern === 'string') ? url.includes(pattern) : pattern.test(url); return (typeof pattern === 'string') ? url.includes(pattern) : pattern.test(url);
} }
export async function getDocWorkerUrls(): Promise<string[]> {
const result = await driver.wait(() => driver.executeScript(`
return Array.from(window.gristApp.comm.listConnections().values());
`), 1000) as string[];
return result;
}
export async function getDocWorkerUrl(): Promise<string> {
const urls = await getDocWorkerUrls();
if (urls.length > 1) {
throw new Error(`Expected a single docWorker URL, received ${urls}`);
}
return urls[0] || '';
}
export async function waitForUrl(pattern: RegExp|string, waitMs: number = 2000) { export async function waitForUrl(pattern: RegExp|string, waitMs: number = 2000) {
await driver.wait(() => testCurrentUrl(pattern), waitMs); await driver.wait(() => testCurrentUrl(pattern), waitMs);
} }