(core) fix sync to s3 when doc is marked as dirty but proves to be clean

Summary:
This fixes a two problems:
 * A mistake in `KeyedMutex.runExclusive`.
 * Logic about saving a document to s3 when the document is found to match what is already there.

`HostedStorageManager.flushDoc` could get caught in a loop if a document was uploaded to s3 and then, without any change to it, marked as dirty.  Low level code would detect there was no change and skip the upload; but then the snapshotId could be unknown, causing an error and retries. This diff fixes that problem by discovering the snapshotId on downloads and tracking it. It also corrects a mutex problem that may have been creating the scenario. A small delay is added to `flushDoc` to mitigate the effect of similar problems in future. Exponential backoff would be good, but `flushDoc` is called in some situations where long delays would negatively impact worker shutdown or user work.

Test Plan: added tests

Reviewers: dsagal

Reviewed By: dsagal

Differential Revision: https://phab.getgrist.com/D2654
pull/3/head
Paul Fitzpatrick 4 years ago
parent 6d95418cc1
commit e30d0fd5d0

@ -12,7 +12,7 @@ export class KeyedMutex {
if (!this._mutexes.has(key)) { if (!this._mutexes.has(key)) {
this._mutexes.set(key, new Mutex()); this._mutexes.set(key, new Mutex());
} }
const mutex = this._mutexes.get(key)! const mutex = this._mutexes.get(key)!;
const unlock = await mutex.acquire(); const unlock = await mutex.acquire();
return () => { return () => {
unlock(); unlock();
@ -27,7 +27,7 @@ export class KeyedMutex {
public async runExclusive<T>(key: string, callback: MutexInterface.Worker<T>): Promise<T> { public async runExclusive<T>(key: string, callback: MutexInterface.Worker<T>): Promise<T> {
const unlock = await this.acquire(key); const unlock = await this.acquire(key);
try { try {
return callback(); return await callback();
} finally { } finally {
unlock(); unlock();
} }

@ -196,9 +196,9 @@ export class DocSnapshotInventory implements IInventory {
log.error(`Surprise in getSnapshots, expected ${expectSnapshotId} for ${key} ` + log.error(`Surprise in getSnapshots, expected ${expectSnapshotId} for ${key} ` +
`but got ${data[0]?.snapshotId}`); `but got ${data[0]?.snapshotId}`);
} }
// Reconstructed data is precious. Save it to S3 and local cache. // Reconstructed data is precious. Make sure it gets saved.
await this._saveToFile(fname, data); await this._saveToFile(fname, data);
await this._meta.upload(key, fname); this._needFlush.add(key);
} }
} }
return data; return data;

@ -30,7 +30,8 @@ export interface ExternalStorage {
// Download content from key to given file. Can download a specific version of the key // Download content from key to given file. Can download a specific version of the key
// if store supports that (should throw a fatal exception if not). // if store supports that (should throw a fatal exception if not).
download(key: string, fname: string, snapshotId?: string): Promise<void>; // Returns snapshotId of version downloaded.
download(key: string, fname: string, snapshotId?: string): Promise<string>;
// Remove content for this key from the store, if it exists. Can delete specific versions // Remove content for this key from the store, if it exists. Can delete specific versions
// if specified. If no version specified, all versions are removed. If versions specified, // if specified. If no version specified, all versions are removed. If versions specified,
@ -132,7 +133,7 @@ export class KeyMappedExternalStorage implements ExternalStorage {
export class ChecksummedExternalStorage implements ExternalStorage { export class ChecksummedExternalStorage implements ExternalStorage {
private _closed: boolean = false; private _closed: boolean = false;
constructor(private _ext: ExternalStorage, private _options: { constructor(readonly label: string, private _ext: ExternalStorage, private _options: {
maxRetries: number, // how many time to retry inconsistent downloads maxRetries: number, // how many time to retry inconsistent downloads
initialDelayMs: number, // how long to wait before retrying initialDelayMs: number, // how long to wait before retrying
localHash: PropStorage, // key/value store for hashes of downloaded content localHash: PropStorage, // key/value store for hashes of downloaded content
@ -158,17 +159,19 @@ export class ChecksummedExternalStorage implements ExternalStorage {
const prevChecksum = await this._options.localHash.load(key); const prevChecksum = await this._options.localHash.load(key);
if (prevChecksum && prevChecksum === checksum && !metadata?.label) { if (prevChecksum && prevChecksum === checksum && !metadata?.label) {
// nothing to do, checksums match // nothing to do, checksums match
log.info("ext upload: %s unchanged, not sending", key); const snapshotId = await this._options.latestVersion.load(key);
return this._options.latestVersion.load(key); log.info("ext %s upload: %s unchanged, not sending (checksum %s, version %s)", this.label, key,
checksum, snapshotId);
return snapshotId;
} }
const snapshotId = await this._ext.upload(key, fname, metadata); const snapshotId = await this._ext.upload(key, fname, metadata);
log.info("ext upload: %s checksum %s", this._ext.url(key), checksum); log.info("ext %s upload: %s checksum %s version %s", this.label, this._ext.url(key), checksum, snapshotId);
if (snapshotId) { await this._options.latestVersion.save(key, snapshotId); } if (snapshotId) { await this._options.latestVersion.save(key, snapshotId); }
await this._options.localHash.save(key, checksum); await this._options.localHash.save(key, checksum);
await this._options.sharedHash.save(key, checksum); await this._options.sharedHash.save(key, checksum);
return snapshotId; return snapshotId;
} catch (err) { } catch (err) {
log.error("ext upload: %s failure to send, error %s", key, err.message); log.error("ext %s upload: %s failure to send, error %s", this.label, key, err.message);
throw err; throw err;
} }
} }
@ -181,7 +184,7 @@ export class ChecksummedExternalStorage implements ExternalStorage {
throw new Error('cannot remove most recent version of a document by id'); throw new Error('cannot remove most recent version of a document by id');
} }
await this._ext.remove(key, snapshotIds); await this._ext.remove(key, snapshotIds);
log.info("ext remove: %s version %s", this._ext.url(key), snapshotIds || 'ALL'); log.info("ext %s remove: %s version %s", this.label, this._ext.url(key), snapshotIds || 'ALL');
if (!snapshotIds) { if (!snapshotIds) {
await this._options.latestVersion.save(key, DELETED_TOKEN); await this._options.latestVersion.save(key, DELETED_TOKEN);
await this._options.sharedHash.save(key, DELETED_TOKEN); await this._options.sharedHash.save(key, DELETED_TOKEN);
@ -192,7 +195,7 @@ export class ChecksummedExternalStorage implements ExternalStorage {
await this._options.sharedHash.save(this._keyWithSnapshot(key, snapshotId), DELETED_TOKEN); await this._options.sharedHash.save(this._keyWithSnapshot(key, snapshotId), DELETED_TOKEN);
} }
} catch (err) { } catch (err) {
log.error("ext delete: %s failure to remove, error %s", key, err.message); log.error("ext %s delete: %s failure to remove, error %s", this.label, key, err.message);
throw err; throw err;
} }
} }
@ -208,11 +211,11 @@ export class ChecksummedExternalStorage implements ExternalStorage {
* doing that. So we add a downloadTo variant that takes before and after keys. * doing that. So we add a downloadTo variant that takes before and after keys.
*/ */
public async downloadTo(fromKey: string, toKey: string, fname: string, snapshotId?: string) { public async downloadTo(fromKey: string, toKey: string, fname: string, snapshotId?: string) {
await this._retry('download', async () => { return this._retry('download', async () => {
const {tmpDir, cleanupCallback} = await createTmpDir({}); const {tmpDir, cleanupCallback} = await createTmpDir({});
const tmpPath = path.join(tmpDir, `${toKey}-tmp`); // NOTE: assumes key is file-system safe. const tmpPath = path.join(tmpDir, `${toKey}-tmp`); // NOTE: assumes key is file-system safe.
try { try {
await this._ext.download(fromKey, tmpPath, snapshotId); const downloadedSnapshotId = await this._ext.download(fromKey, tmpPath, snapshotId);
const checksum = await this._options.computeFileHash(tmpPath); const checksum = await this._options.computeFileHash(tmpPath);
@ -225,9 +228,8 @@ export class ChecksummedExternalStorage implements ExternalStorage {
// you may get an old version for some time. // you may get an old version for some time.
// If a snapshotId was specified, we can skip this check. // If a snapshotId was specified, we can skip this check.
if (expectedChecksum && expectedChecksum !== checksum) { if (expectedChecksum && expectedChecksum !== checksum) {
log.error("ext download: data for %s has wrong checksum: %s (expected %s)", fromKey, log.error("ext %s download: data for %s has wrong checksum: %s (expected %s)",
checksum, this.label, fromKey, checksum, expectedChecksum);
expectedChecksum);
return undefined; return undefined;
} }
} }
@ -235,16 +237,19 @@ export class ChecksummedExternalStorage implements ExternalStorage {
// If successful, rename the temporary file to its proper name. The destination should NOT // If successful, rename the temporary file to its proper name. The destination should NOT
// exist in this case, and this should fail if it does. // exist in this case, and this should fail if it does.
await fse.move(tmpPath, fname, {overwrite: false}); await fse.move(tmpPath, fname, {overwrite: false});
if (fromKey === toKey) {
await this._options.latestVersion.save(toKey, downloadedSnapshotId);
}
await this._options.localHash.save(toKey, checksum); await this._options.localHash.save(toKey, checksum);
log.info("ext download: %s%s%s with checksum %s", fromKey, log.info("ext %s download: %s%s%s with checksum %s and version %s", this.label, fromKey,
snapshotId ? ` [VersionId ${snapshotId}]` : '', snapshotId ? ` [VersionId ${snapshotId}]` : '',
fromKey !== toKey ? ` as ${toKey}` : '', fromKey !== toKey ? ` as ${toKey}` : '',
checksum); checksum, downloadedSnapshotId);
return true; return downloadedSnapshotId;
} catch (err) { } catch (err) {
log.error("ext download: failed to fetch data (%s): %s", fromKey, err.message); log.error("ext %s download: failed to fetch data (%s): %s", this.label, fromKey, err.message);
throw err; throw err;
} finally { } finally {
await cleanupCallback(); await cleanupCallback();

@ -112,7 +112,7 @@ export class HostedStorageManager implements IDocStorageManager {
private _baseStore: ExternalStorage; // External store for documents, without checksumming. private _baseStore: ExternalStorage; // External store for documents, without checksumming.
// Latest checksums of documents. // Latest version ids of documents.
private _latestVersions = new Map<string, string>(); private _latestVersions = new Map<string, string>();
/** /**
@ -418,6 +418,10 @@ export class HostedStorageManager implements IDocStorageManager {
while (!this.isSaved(docName)) { while (!this.isSaved(docName)) {
log.info('HostedStorageManager: waiting for document to finish: %s', docName); log.info('HostedStorageManager: waiting for document to finish: %s', docName);
await this._uploads.expediteOperationAndWait(docName); await this._uploads.expediteOperationAndWait(docName);
if (!this.isSaved(docName)) {
// Throttle slightly in case this operation ends up looping excessively.
await delay(1000);
}
} }
} }
@ -688,7 +692,7 @@ export class HostedStorageManager implements IDocStorageManager {
private _getChecksummedExternalStorage(family: string, core: ExternalStorage, private _getChecksummedExternalStorage(family: string, core: ExternalStorage,
versions: Map<string, string>, versions: Map<string, string>,
options: HostedStorageOptions) { options: HostedStorageOptions) {
return new ChecksummedExternalStorage(core, { return new ChecksummedExternalStorage(family, core, {
maxRetries: 4, maxRetries: 4,
initialDelayMs: options.secondsBeforeFirstRetry * 1000, initialDelayMs: options.secondsBeforeFirstRetry * 1000,
computeFileHash: this._getHash.bind(this), computeFileHash: this._getHash.bind(this),

Loading…
Cancel
Save