mirror of
https://github.com/gristlabs/grist-core.git
synced 2024-10-27 20:44:07 +00:00
e30d0fd5d0
Summary: This fixes a two problems: * A mistake in `KeyedMutex.runExclusive`. * Logic about saving a document to s3 when the document is found to match what is already there. `HostedStorageManager.flushDoc` could get caught in a loop if a document was uploaded to s3 and then, without any change to it, marked as dirty. Low level code would detect there was no change and skip the upload; but then the snapshotId could be unknown, causing an error and retries. This diff fixes that problem by discovering the snapshotId on downloads and tracking it. It also corrects a mutex problem that may have been creating the scenario. A small delay is added to `flushDoc` to mitigate the effect of similar problems in future. Exponential backoff would be good, but `flushDoc` is called in some situations where long delays would negatively impact worker shutdown or user work. Test Plan: added tests Reviewers: dsagal Reviewed By: dsagal Differential Revision: https://phab.getgrist.com/D2654
47 lines
1.3 KiB
TypeScript
47 lines
1.3 KiB
TypeScript
import { Mutex, MutexInterface } from 'async-mutex';
|
|
|
|
/**
|
|
* A per-key mutex. It has the same interface as Mutex, but with an extra key supplied.
|
|
* Maintains an independent mutex for each key on need.
|
|
*/
|
|
export class KeyedMutex {
|
|
private _mutexes = new Map<string, Mutex>();
|
|
|
|
public async acquire(key: string): Promise<MutexInterface.Releaser> {
|
|
// Create a new mutex if we need one.
|
|
if (!this._mutexes.has(key)) {
|
|
this._mutexes.set(key, new Mutex());
|
|
}
|
|
const mutex = this._mutexes.get(key)!;
|
|
const unlock = await mutex.acquire();
|
|
return () => {
|
|
unlock();
|
|
// After unlocking, clean-up the mutex if it is no longer needed.
|
|
// unlock() leaves the mutex locked if anyone has been waiting for it.
|
|
if (!mutex.isLocked()) {
|
|
this._mutexes.delete(key);
|
|
}
|
|
};
|
|
}
|
|
|
|
public async runExclusive<T>(key: string, callback: MutexInterface.Worker<T>): Promise<T> {
|
|
const unlock = await this.acquire(key);
|
|
try {
|
|
return await callback();
|
|
} finally {
|
|
unlock();
|
|
}
|
|
}
|
|
|
|
public isLocked(key: string): boolean {
|
|
const mutex = this._mutexes.get(key);
|
|
if (!mutex) { return false; }
|
|
return mutex.isLocked();
|
|
}
|
|
|
|
// Check how many mutexes are in use.
|
|
public get size(): number {
|
|
return this._mutexes.size;
|
|
}
|
|
}
|