diff --git a/app/common/KeyedMutex.ts b/app/common/KeyedMutex.ts index 136b3fd1..c0d66acc 100644 --- a/app/common/KeyedMutex.ts +++ b/app/common/KeyedMutex.ts @@ -12,7 +12,7 @@ export class KeyedMutex { if (!this._mutexes.has(key)) { this._mutexes.set(key, new Mutex()); } - const mutex = this._mutexes.get(key)! + const mutex = this._mutexes.get(key)!; const unlock = await mutex.acquire(); return () => { unlock(); @@ -27,7 +27,7 @@ export class KeyedMutex { public async runExclusive(key: string, callback: MutexInterface.Worker): Promise { const unlock = await this.acquire(key); try { - return callback(); + return await callback(); } finally { unlock(); } diff --git a/app/server/lib/DocSnapshots.ts b/app/server/lib/DocSnapshots.ts index ca909ec0..6523b6fd 100644 --- a/app/server/lib/DocSnapshots.ts +++ b/app/server/lib/DocSnapshots.ts @@ -196,9 +196,9 @@ export class DocSnapshotInventory implements IInventory { log.error(`Surprise in getSnapshots, expected ${expectSnapshotId} for ${key} ` + `but got ${data[0]?.snapshotId}`); } - // Reconstructed data is precious. Save it to S3 and local cache. + // Reconstructed data is precious. Make sure it gets saved. await this._saveToFile(fname, data); - await this._meta.upload(key, fname); + this._needFlush.add(key); } } return data; diff --git a/app/server/lib/ExternalStorage.ts b/app/server/lib/ExternalStorage.ts index 21b51472..515e89c3 100644 --- a/app/server/lib/ExternalStorage.ts +++ b/app/server/lib/ExternalStorage.ts @@ -30,7 +30,8 @@ export interface ExternalStorage { // Download content from key to given file. Can download a specific version of the key // if store supports that (should throw a fatal exception if not). - download(key: string, fname: string, snapshotId?: string): Promise; + // Returns snapshotId of version downloaded. + download(key: string, fname: string, snapshotId?: string): Promise; // Remove content for this key from the store, if it exists. Can delete specific versions // if specified. If no version specified, all versions are removed. If versions specified, @@ -132,7 +133,7 @@ export class KeyMappedExternalStorage implements ExternalStorage { export class ChecksummedExternalStorage implements ExternalStorage { private _closed: boolean = false; - constructor(private _ext: ExternalStorage, private _options: { + constructor(readonly label: string, private _ext: ExternalStorage, private _options: { maxRetries: number, // how many time to retry inconsistent downloads initialDelayMs: number, // how long to wait before retrying localHash: PropStorage, // key/value store for hashes of downloaded content @@ -158,17 +159,19 @@ export class ChecksummedExternalStorage implements ExternalStorage { const prevChecksum = await this._options.localHash.load(key); if (prevChecksum && prevChecksum === checksum && !metadata?.label) { // nothing to do, checksums match - log.info("ext upload: %s unchanged, not sending", key); - return this._options.latestVersion.load(key); + const snapshotId = await this._options.latestVersion.load(key); + log.info("ext %s upload: %s unchanged, not sending (checksum %s, version %s)", this.label, key, + checksum, snapshotId); + return snapshotId; } const snapshotId = await this._ext.upload(key, fname, metadata); - log.info("ext upload: %s checksum %s", this._ext.url(key), checksum); + log.info("ext %s upload: %s checksum %s version %s", this.label, this._ext.url(key), checksum, snapshotId); if (snapshotId) { await this._options.latestVersion.save(key, snapshotId); } await this._options.localHash.save(key, checksum); await this._options.sharedHash.save(key, checksum); return snapshotId; } catch (err) { - log.error("ext upload: %s failure to send, error %s", key, err.message); + log.error("ext %s upload: %s failure to send, error %s", this.label, key, err.message); throw err; } } @@ -181,7 +184,7 @@ export class ChecksummedExternalStorage implements ExternalStorage { throw new Error('cannot remove most recent version of a document by id'); } await this._ext.remove(key, snapshotIds); - log.info("ext remove: %s version %s", this._ext.url(key), snapshotIds || 'ALL'); + log.info("ext %s remove: %s version %s", this.label, this._ext.url(key), snapshotIds || 'ALL'); if (!snapshotIds) { await this._options.latestVersion.save(key, DELETED_TOKEN); await this._options.sharedHash.save(key, DELETED_TOKEN); @@ -192,7 +195,7 @@ export class ChecksummedExternalStorage implements ExternalStorage { await this._options.sharedHash.save(this._keyWithSnapshot(key, snapshotId), DELETED_TOKEN); } } catch (err) { - log.error("ext delete: %s failure to remove, error %s", key, err.message); + log.error("ext %s delete: %s failure to remove, error %s", this.label, key, err.message); throw err; } } @@ -208,11 +211,11 @@ export class ChecksummedExternalStorage implements ExternalStorage { * doing that. So we add a downloadTo variant that takes before and after keys. */ public async downloadTo(fromKey: string, toKey: string, fname: string, snapshotId?: string) { - await this._retry('download', async () => { + return this._retry('download', async () => { const {tmpDir, cleanupCallback} = await createTmpDir({}); const tmpPath = path.join(tmpDir, `${toKey}-tmp`); // NOTE: assumes key is file-system safe. try { - await this._ext.download(fromKey, tmpPath, snapshotId); + const downloadedSnapshotId = await this._ext.download(fromKey, tmpPath, snapshotId); const checksum = await this._options.computeFileHash(tmpPath); @@ -225,9 +228,8 @@ export class ChecksummedExternalStorage implements ExternalStorage { // you may get an old version for some time. // If a snapshotId was specified, we can skip this check. if (expectedChecksum && expectedChecksum !== checksum) { - log.error("ext download: data for %s has wrong checksum: %s (expected %s)", fromKey, - checksum, - expectedChecksum); + log.error("ext %s download: data for %s has wrong checksum: %s (expected %s)", + this.label, fromKey, checksum, expectedChecksum); return undefined; } } @@ -235,16 +237,19 @@ export class ChecksummedExternalStorage implements ExternalStorage { // If successful, rename the temporary file to its proper name. The destination should NOT // exist in this case, and this should fail if it does. await fse.move(tmpPath, fname, {overwrite: false}); + if (fromKey === toKey) { + await this._options.latestVersion.save(toKey, downloadedSnapshotId); + } await this._options.localHash.save(toKey, checksum); - log.info("ext download: %s%s%s with checksum %s", fromKey, + log.info("ext %s download: %s%s%s with checksum %s and version %s", this.label, fromKey, snapshotId ? ` [VersionId ${snapshotId}]` : '', fromKey !== toKey ? ` as ${toKey}` : '', - checksum); + checksum, downloadedSnapshotId); - return true; + return downloadedSnapshotId; } catch (err) { - log.error("ext download: failed to fetch data (%s): %s", fromKey, err.message); + log.error("ext %s download: failed to fetch data (%s): %s", this.label, fromKey, err.message); throw err; } finally { await cleanupCallback(); diff --git a/app/server/lib/HostedStorageManager.ts b/app/server/lib/HostedStorageManager.ts index e111e8df..c2873b22 100644 --- a/app/server/lib/HostedStorageManager.ts +++ b/app/server/lib/HostedStorageManager.ts @@ -112,7 +112,7 @@ export class HostedStorageManager implements IDocStorageManager { private _baseStore: ExternalStorage; // External store for documents, without checksumming. - // Latest checksums of documents. + // Latest version ids of documents. private _latestVersions = new Map(); /** @@ -418,6 +418,10 @@ export class HostedStorageManager implements IDocStorageManager { while (!this.isSaved(docName)) { log.info('HostedStorageManager: waiting for document to finish: %s', docName); await this._uploads.expediteOperationAndWait(docName); + if (!this.isSaved(docName)) { + // Throttle slightly in case this operation ends up looping excessively. + await delay(1000); + } } } @@ -688,7 +692,7 @@ export class HostedStorageManager implements IDocStorageManager { private _getChecksummedExternalStorage(family: string, core: ExternalStorage, versions: Map, options: HostedStorageOptions) { - return new ChecksummedExternalStorage(core, { + return new ChecksummedExternalStorage(family, core, { maxRetries: 4, initialDelayMs: options.secondsBeforeFirstRetry * 1000, computeFileHash: this._getHash.bind(this),