diff --git a/app/gen-server/lib/DocWorkerMap.ts b/app/gen-server/lib/DocWorkerMap.ts index 93105e0c..b39d943a 100644 --- a/app/gen-server/lib/DocWorkerMap.ts +++ b/app/gen-server/lib/DocWorkerMap.ts @@ -308,26 +308,8 @@ export class DocWorkerMap implements IDocWorkerMap { * refused and need to retry. */ public async getDocWorker(docId: string): Promise { - // Fetch the various elements that go into making a DocStatus - const props = await this._client.multi() - .hgetall(`doc-${docId}`) - .get(`doc-${docId}-checksum`) - .execAsync() as [{[key: string]: any}|null, string|null]|null; - if (!props) { return null; } - - // If there is no worker, return null. An alternative would be to modify - // DocStatus so that it is possible for it to not have a worker assignment. - if (!props[0]) { return null; } - - // Fields are JSON encoded since redis cannot store them directly. - const doc = mapValues(props[0], (val) => JSON.parse(val)); - - // Redis cannot store a null value, so we encode it as 'null', which does - // not match any possible MD5. - doc.docMD5 = props[1] === 'null' ? null : props[1]; - - // Ok, we have a valid DocStatus at this point. - return doc as DocStatus; + const {doc} = await this._getDocAndChecksum(docId); + return doc; } /** @@ -377,7 +359,8 @@ export class DocWorkerMap implements IDocWorkerMap { try { // Now that we've locked, recheck that the worker hasn't been reassigned // in the meantime. Return immediately if it has. - docStatus = await this.getDocWorker(docId); + const docAndChecksum = await this._getDocAndChecksum(docId); + docStatus = docAndChecksum.doc; if (docStatus) { return docStatus; } if (!workerId) { @@ -408,8 +391,9 @@ export class DocWorkerMap implements IDocWorkerMap { const docWorker = await this._client.hgetallAsync(`worker-${workerId}`) as DocWorkerInfo|null; if (!docWorker) { throw new Error('no doc worker contact info available'); } - // We can now construct a DocStatus. - const newDocStatus = {docMD5: null, docWorker, isActive: true}; + // We can now construct a DocStatus, preserving any existing checksum. + const checksum = docAndChecksum.checksum; + const newDocStatus = {docMD5: checksum, docWorker, isActive: true}; // We add the assignment to worker-{workerId}-docs and save doc-{docId}. const result = await this._client.multi() @@ -418,7 +402,7 @@ export class DocWorkerMap implements IDocWorkerMap { docWorker: JSON.stringify(docWorker), // redis can't store nested objects, strings only isActive: JSON.stringify(true) // redis can't store booleans, strings only }) - .setex(`doc-${docId}-checksum`, CHECKSUM_TTL_MSEC / 1000.0, 'null') + .setex(`doc-${docId}-checksum`, CHECKSUM_TTL_MSEC / 1000.0, checksum || 'null') .execAsync(); if (!result) { throw new Error('failed to store new assignment'); } return newDocStatus; @@ -522,6 +506,28 @@ export class DocWorkerMap implements IDocWorkerMap { public async getDocGroup(docId: string): Promise { return this._client.getAsync(`doc-${docId}-group`); } + + /** + * Fetch the doc- hash and doc--checksum key from redis. + * Return as a decoded DocStatus and a checksum. + */ + private async _getDocAndChecksum(docId: string): Promise<{ + doc: DocStatus|null, + checksum: string|null, + }> { + // Fetch the various elements that go into making a DocStatus + const props = await this._client.multi() + .hgetall(`doc-${docId}`) + .get(`doc-${docId}-checksum`) + .execAsync() as [{[key: string]: any}|null, string|null]|null; + // Fields are JSON encoded since redis cannot store them directly. + const doc = props?.[0] ? mapValues(props[0], (val) => JSON.parse(val)) as DocStatus : null; + // Redis cannot store a null value, so we encode it as 'null', which does + // not match any possible MD5. + const checksum = (props?.[1] === 'null' ? null : props?.[1]) || null; + if (doc) { doc.docMD5 = checksum; } // the checksum goes in the DocStatus too. + return {doc, checksum}; + } } // If we don't have redis available and use a DummyDocWorker, it should be a singleton. diff --git a/app/server/lib/DocManager.ts b/app/server/lib/DocManager.ts index e4efd3b4..a11846d8 100644 --- a/app/server/lib/DocManager.ts +++ b/app/server/lib/DocManager.ts @@ -530,7 +530,13 @@ export class DocManager extends EventEmitter { const srcDocPath = uploadInfo.files[0].absPath; await checkAllegedGristDoc(docSession, srcDocPath); await docUtils.copyFile(srcDocPath, docPath); - await this.storageManager.addToStorage(docName); + // Go ahead and claim this document. If we wanted to serve it + // from a potentially different worker, we'd call addToStorage(docName) + // instead (we used to do this). The upload should already be happening + // on a randomly assigned worker due to the special treatment of the + // 'import' assignmentId. + await this.storageManager.prepareLocalDoc(docName); + this.storageManager.markAsChanged(docName, 'edit'); return {title: basename, id: docName}; } else { const doc = await this.createNewEmptyDoc(docSession, id); diff --git a/app/server/lib/DocSnapshots.ts b/app/server/lib/DocSnapshots.ts index ec0344af..5aeacf83 100644 --- a/app/server/lib/DocSnapshots.ts +++ b/app/server/lib/DocSnapshots.ts @@ -163,6 +163,18 @@ export class DocSnapshotInventory implements IInventory { }); } + /** + * Wipe local cached state of the inventory. + */ + public async clear(key: string) { + await this._mutex.runExclusive(key, async() => { + await this._flush(key); + const fname = await this._getFilename(key); + // NOTE: fse.remove succeeds also when the file does not exist. + await fse.remove(fname); + }); + } + /** * Remove a set of snapshots from the inventory, and then flush to S3. */ diff --git a/app/server/lib/FlexServer.ts b/app/server/lib/FlexServer.ts index 5913a1d2..fc82e852 100644 --- a/app/server/lib/FlexServer.ts +++ b/app/server/lib/FlexServer.ts @@ -158,9 +158,14 @@ export class FlexServer implements GristServer { log.info(`== Grist version is ${version.version} (commit ${version.gitcommit})`); this.info.push(['appRoot', this.appRoot]); // This directory hold Grist documents. - const docsRoot = path.resolve((this.options && this.options.dataDir) || + let docsRoot = path.resolve((this.options && this.options.dataDir) || process.env.GRIST_DATA_DIR || getAppPathTo(this.appRoot, 'samples')); + // In testing, it can be useful to separate out document roots used + // by distinct FlexServers. + if (process.env.GRIST_TEST_ADD_PORT_TO_DOCS_ROOT === 'true') { + docsRoot = path.resolve(docsRoot, String(port)); + } // Create directory if it doesn't exist. // TODO: track down all dependencies on 'samples' existing in tests and // in dev environment, and remove them. Then it would probably be best @@ -1308,6 +1313,16 @@ export class FlexServer implements GristServer { return this.tag; } + /** + * Make sure external storage of all docs is up to date. + */ + public async testFlushDocs() { + const assignments = await this._docWorkerMap.getAssignments(this.worker.id); + for (const assignment of assignments) { + await this._storageManager.flushDoc(assignment); + } + } + // Adds endpoints that support imports and exports. private _addSupportPaths(docAccessMiddleware: express.RequestHandler[]) { if (!this._docWorker) { throw new Error("need DocWorker"); } diff --git a/app/server/lib/HostedStorageManager.ts b/app/server/lib/HostedStorageManager.ts index 1aa24313..2f3e3f88 100644 --- a/app/server/lib/HostedStorageManager.ts +++ b/app/server/lib/HostedStorageManager.ts @@ -112,6 +112,7 @@ export class HostedStorageManager implements IDocStorageManager { // Latest version ids of documents. private _latestVersions = new Map(); + private _latestMetaVersions = new Map(); private _log = new LogMethods('HostedStorageManager ', (docId: string|null) => ({docId})); @@ -158,7 +159,7 @@ export class HostedStorageManager implements IDocStorageManager { throw new Error('bug: external storage should be created for "meta" if it is created for "doc"'); } this._extMeta = this._getChecksummedExternalStorage('meta', baseStoreMeta, - new Map(), + this._latestMetaVersions, options); this._inventory = new DocSnapshotInventory(this._ext, this._extMeta, @@ -570,9 +571,18 @@ export class HostedStorageManager implements IDocStorageManager { const existsLocally = await fse.pathExists(this.getPath(docName)); if (existsLocally) { if (!docStatus.docMD5 || docStatus.docMD5 === DELETED_TOKEN) { - // New doc appears to already exist, but not in S3 (according to redis). - // Go ahead and use local version. - return true; + // New doc appears to already exist, but may not exist in S3. + // Let's check. + const head = await this._ext.head(docName); + const lastLocalVersionSeen = this._latestVersions.get(docName); + if (head && lastLocalVersionSeen !== head.snapshotId) { + // Exists in S3, with a version not known to be latest seen + // by this worker - so wipe local version and defer to S3. + await this._wipeCache(docName); + } else { + // Go ahead and use local version. + return true; + } } else { // Doc exists locally and in S3 (according to redis). // Make sure the checksum matches. @@ -586,8 +596,7 @@ export class HostedStorageManager implements IDocStorageManager { // On the assumption that the local file is outdated, delete it. // TODO: may want to be more careful in case the local file has modifications that // simply never made it to S3 due to some kind of breakage. - // NOTE: fse.remove succeeds also when the file does not exist. - await fse.remove(this.getPath(docName)); + await this._wipeCache(docName); } } } @@ -598,6 +607,19 @@ export class HostedStorageManager implements IDocStorageManager { }); } + /** + * Remove local version of a document, and state related to it. + */ + private async _wipeCache(docName: string) { + // NOTE: fse.remove succeeds also when the file does not exist. + await fse.remove(this.getPath(docName)); + await fse.remove(this._getHashFile(this.getPath(docName), 'doc')); + await fse.remove(this._getHashFile(this.getPath(docName), 'meta')); + await this._inventory.clear(docName); + this._latestVersions.delete(docName); + this._latestMetaVersions.delete(docName); + } + /** * Fetch a document from s3 and save it locally as destId.grist * diff --git a/app/server/lib/ITestingHooks-ti.ts b/app/server/lib/ITestingHooks-ti.ts index 3e1ec0d4..951c6cb9 100644 --- a/app/server/lib/ITestingHooks-ti.ts +++ b/app/server/lib/ITestingHooks-ti.ts @@ -16,6 +16,7 @@ export const ITestingHooks = t.iface([], { "closeDocs": t.func("void"), "setDocWorkerActivation": t.func("void", t.param("workerId", "string"), t.param("active", t.union(t.lit('active'), t.lit('inactive'), t.lit('crash')))), "flushAuthorizerCache": t.func("void"), + "flushDocs": t.func("void"), "getDocClientCounts": t.func(t.array(t.tuple("string", "number"))), "setActiveDocTimeout": t.func("number", t.param("seconds", "number")), "setDiscourseConnectVar": t.func(t.union("string", "null"), t.param("varName", "string"), t.param("value", t.union("string", "null"))), diff --git a/app/server/lib/ITestingHooks.ts b/app/server/lib/ITestingHooks.ts index 05ca8679..ad93a3b4 100644 --- a/app/server/lib/ITestingHooks.ts +++ b/app/server/lib/ITestingHooks.ts @@ -12,6 +12,7 @@ export interface ITestingHooks { closeDocs(): Promise; setDocWorkerActivation(workerId: string, active: 'active'|'inactive'|'crash'): Promise; flushAuthorizerCache(): Promise; + flushDocs(): Promise; getDocClientCounts(): Promise>; setActiveDocTimeout(seconds: number): Promise; setDiscourseConnectVar(varName: string, value: string|null): Promise; diff --git a/app/server/lib/TestingHooks.ts b/app/server/lib/TestingHooks.ts index 473a2d1d..5fc066c4 100644 --- a/app/server/lib/TestingHooks.ts +++ b/app/server/lib/TestingHooks.ts @@ -137,23 +137,27 @@ export class TestingHooks implements ITestingHooks { public async setDocWorkerActivation(workerId: string, active: 'active'|'inactive'|'crash'): Promise { log.info("TestingHooks.setDocWorkerActivation called with", workerId, active); - for (const server of this._workerServers) { - if (server.worker.id === workerId || server.worker.publicUrl === workerId) { - switch (active) { - case 'active': - await server.restartListening(); - break; - case 'inactive': - await server.stopListening(); - break; - case 'crash': - await server.stopListening('crash'); - break; - } - return; - } + const matches = this._workerServers.filter( + server => server.worker.id === workerId || + server.worker.publicUrl === workerId || + (server.worker.publicUrl.startsWith('http://localhost:') && + workerId.startsWith('http://localhost:') && + new URL(server.worker.publicUrl).host === new URL(workerId).host)); + if (matches.length !== 1) { + throw new Error(`could not find worker: ${workerId}`); + } + const server = matches[0]; + switch (active) { + case 'active': + await server.restartListening(); + break; + case 'inactive': + await server.stopListening(); + break; + case 'crash': + await server.stopListening('crash'); + break; } - throw new Error(`could not find worker: ${workerId}`); } public async flushAuthorizerCache(): Promise { @@ -164,6 +168,13 @@ export class TestingHooks implements ITestingHooks { } } + public async flushDocs(): Promise { + log.info("TestingHooks.flushDocs called"); + for (const server of this._workerServers) { + await server.testFlushDocs(); + } + } + // Returns a Map from docId to number of connected clients for all open docs across servers, // but represented as an array of pairs, to be serializable. public async getDocClientCounts(): Promise> { diff --git a/test/nbrowser/gristUtils.ts b/test/nbrowser/gristUtils.ts index 66318083..402ed231 100644 --- a/test/nbrowser/gristUtils.ts +++ b/test/nbrowser/gristUtils.ts @@ -151,6 +151,21 @@ export async function testCurrentUrl(pattern: RegExp|string) { return (typeof pattern === 'string') ? url.includes(pattern) : pattern.test(url); } +export async function getDocWorkerUrls(): Promise { + const result = await driver.wait(() => driver.executeScript(` + return Array.from(window.gristApp.comm.listConnections().values()); + `), 1000) as string[]; + return result; +} + +export async function getDocWorkerUrl(): Promise { + const urls = await getDocWorkerUrls(); + if (urls.length > 1) { + throw new Error(`Expected a single docWorker URL, received ${urls}`); + } + return urls[0] || ''; +} + export async function waitForUrl(pattern: RegExp|string, waitMs: number = 2000) { await driver.wait(() => testCurrentUrl(pattern), waitMs); }