gristlabs_grist-core/app/common/KeyedMutex.ts
Paul Fitzpatrick e30d0fd5d0 (core) fix sync to s3 when doc is marked as dirty but proves to be clean
Summary:
This fixes a two problems:
 * A mistake in `KeyedMutex.runExclusive`.
 * Logic about saving a document to s3 when the document is found to match what is already there.

`HostedStorageManager.flushDoc` could get caught in a loop if a document was uploaded to s3 and then, without any change to it, marked as dirty.  Low level code would detect there was no change and skip the upload; but then the snapshotId could be unknown, causing an error and retries. This diff fixes that problem by discovering the snapshotId on downloads and tracking it. It also corrects a mutex problem that may have been creating the scenario. A small delay is added to `flushDoc` to mitigate the effect of similar problems in future. Exponential backoff would be good, but `flushDoc` is called in some situations where long delays would negatively impact worker shutdown or user work.

Test Plan: added tests

Reviewers: dsagal

Reviewed By: dsagal

Differential Revision: https://phab.getgrist.com/D2654
2020-11-10 08:12:31 -05:00

47 lines
1.3 KiB
TypeScript

import { Mutex, MutexInterface } from 'async-mutex';
/**
* A per-key mutex. It has the same interface as Mutex, but with an extra key supplied.
* Maintains an independent mutex for each key on need.
*/
export class KeyedMutex {
private _mutexes = new Map<string, Mutex>();
public async acquire(key: string): Promise<MutexInterface.Releaser> {
// Create a new mutex if we need one.
if (!this._mutexes.has(key)) {
this._mutexes.set(key, new Mutex());
}
const mutex = this._mutexes.get(key)!;
const unlock = await mutex.acquire();
return () => {
unlock();
// After unlocking, clean-up the mutex if it is no longer needed.
// unlock() leaves the mutex locked if anyone has been waiting for it.
if (!mutex.isLocked()) {
this._mutexes.delete(key);
}
};
}
public async runExclusive<T>(key: string, callback: MutexInterface.Worker<T>): Promise<T> {
const unlock = await this.acquire(key);
try {
return await callback();
} finally {
unlock();
}
}
public isLocked(key: string): boolean {
const mutex = this._mutexes.get(key);
if (!mutex) { return false; }
return mutex.isLocked();
}
// Check how many mutexes are in use.
public get size(): number {
return this._mutexes.size;
}
}