gristlabs_grist-core/app/common/KeyedOps.ts
Paul Fitzpatrick 05d1cdf140 (core) limit retries of uploads to external store in tests
Summary:
If an external store fails completely, Grist will continue to
retry uploading to it. This diff updated the HostedStorageManager
test to limit the extent of these retries to the test itself -
otherwise they continue for all other tests in the same process,
potentially disrupting those that read logs. There are other tests
that use s3, but they aren't run in the same process with delicate
log-reading tests, and it isn't quite as clear what improvement
to make there.

Test Plan:
artificially made external store fail, and checked that
test contamination seen previously no longer occurs.

Reviewers: georgegevoian

Reviewed By: georgegevoian

Differential Revision: https://phab.getgrist.com/D3469
2022-06-06 16:19:41 -04:00

238 lines
7.9 KiB
TypeScript

/**
 * A class for scheduling a particular operation on resources
 * identified by a key. For operations which should be applied
 * some time after an event.
 *
 * A key is "changed" from the moment addOperation() is called until its
 * operation starts, and "operating" while the operation runs. A key can be
 * both at once if addOperation() is called while its operation is in progress;
 * the operation is then scheduled again once the current run completes.
 */
export class KeyedOps {
  private _operations = new Map<string, OperationStatus>();  // status of operations
  private _history = new Map<string, OperationHistory>();    // history of operations
                                                             // (will accumulate without limit)
  private _changed = new Set<string>();     // set when key needs an operation
  private _operating = new Set<string>();   // set when operation is in progress for key
  private _stopped: boolean = false;        // set to prohibit all new operations or retries

  /**
   * Provide a function to apply operation, and some optional
   * parameters.
   *
   *   - delayBeforeOperationMs: if set, a call to addOperation(key) will have
   *     a delayed effect.  It will schedule (or reschedule) the operation to occur
   *     after this interval.  If the operation is currently in progress, it will
   *     get rerun after it completes.
   *
   *   - minDelayBetweenOperationsMs: if set, scheduling for operations will have
   *     additional delays inserted as necessary to keep this minimal delay between
   *     the start of successive operations.
   *
   *   - retry: if `retry` is set, the operation will be retried
   *     indefinitely with a rather primitive retry mechanism -
   *     otherwise no attempt is made to retry failures.
   *
   *   - logError: called when errors occur, with a count of number of failures so
   *     far.  Exceptions thrown by logError itself are ignored.
   *
   * The options object is copied; later changes made to it by the caller are
   * not seen, and the caller's object is never modified.
   */
  constructor(private _op: (key: string) => Promise<void>, private _options: {
    delayBeforeOperationMs?: number,
    minDelayBetweenOperationsMs?: number,
    retry?: boolean,
    logError?: (key: string, failureCount: number, err: Error) => void
  }) {
    // expediteOperations() overwrites the delay settings. Work on a private copy so
    // that an options object shared with other code (or other KeyedOps instances)
    // is not silently altered.
    this._options = {..._options};
  }

  /**
   * Request an operation be done (eventually) on the specified resource.
   */
  public addOperation(key: string): void {
    this._changed.add(key);
    this._schedule(key);
  }

  /**
   * Check whether any work is scheduled or in progress.
   */
  public hasPendingOperations(): boolean {
    return this._changed.size > 0 || this._operating.size > 0;
  }

  /**
   * Check whether any work is scheduled or in progress for a specific resource.
   */
  public hasPendingOperation(key: string): boolean {
    return this._changed.has(key) || this._operating.has(key);
  }

  /**
   * Take all scheduled operations and re-schedule them for right now.  Useful
   * when shutting down.  Affects retries (all configured delays are set to zero).
   * Cannot be undone.  Returns immediately.
   */
  public expediteOperations(): void {
    this._options.delayBeforeOperationMs = 0;
    this._options.minDelayBetweenOperationsMs = 0;
    for (const op of this._operations.values()) {
      // Only operations that are waiting on a timer need to be moved forward.
      if (op.timeout) {
        this._schedule(op.key, true);
      }
    }
  }

  /**
   * Don't allow any more operations, or retries of existing operations.
   * Anything still scheduled is expedited, and will fail with an
   * 'operations forcibly stopped' error rather than being run.
   */
  public stopOperations(): void {
    this._stopped = true;
    this.expediteOperations();
  }

  /**
   * Wait for all operations to complete.  This makes most sense to use during
   * shutdown - otherwise it might be a very long wait to reach a moment where
   * there are no operations.
   *
   * @param logRepeat - if given, called with the number of passes completed so far
   *   before each pass after the first.
   */
  public async wait(logRepeat?: (count: number) => void): Promise<void> {
    let repeats: number = 0;
    while (this.hasPendingOperations()) {
      if (repeats && logRepeat) { logRepeat(repeats); }
      // A key may be both operating and changed; handle each key once per pass.
      const keys = new Set<string>([...this._operating, ...this._changed]);
      await Promise.all([...keys].map(key => this.expediteOperationAndWait(key)));
      repeats++;
    }
  }

  /**
   * Re-schedules any pending operation on a resource for right now.  Returns
   * when the operation in progress (if there is one) or else the pending
   * operation (if there is one) has finished, whether it succeeded or failed.
   * Returns without waiting if nothing is in progress or pending for the resource.
   * Does not affect retries.
   */
  public async expediteOperationAndWait(key: string): Promise<void> {
    // Look up without creating: a status entry made here for an idle key would
    // never be cleaned up.
    const existing = this._operations.get(key);
    if (existing && existing.promise) {
      await existing.promise;
      return;
    }
    if (!this._changed.has(key)) { return; }
    const status = this._getOperationStatus(key);
    const done = new Promise<void>((resolve) => {
      // Callbacks are invoked on both success and failure; either way we just resume.
      status.callbacks.push(() => resolve());
    });
    this._schedule(key, true);
    await done;
  }

  /**
   * Schedule an operation for a resource.
   * If the operation is already in progress, we do nothing.
   * If the operation has not yet happened, it is rescheduled.
   * If `immediate` is set, the operation is scheduled with no delay.
   */
  private _schedule(key: string, immediate: boolean = false): void {
    const status = this._getOperationStatus(key);
    if (status.promise) { return; }
    if (status.timeout) {
      clearTimeout(status.timeout);
      delete status.timeout;
    }
    let ticks = this._options.delayBeforeOperationMs || 0;
    const {lastStart} = this._getOperationHistory(key);
    // Compare against undefined rather than testing truthiness, so that a start
    // time of 0 (possible with a mocked clock) still counts as a start time.
    if (lastStart !== undefined && this._options.minDelayBetweenOperationsMs && !immediate) {
      ticks = Math.max(ticks, lastStart + this._options.minDelayBetweenOperationsMs - Date.now());
    }
    // Primitive slow-down on retries.
    // Will do nothing if neither delayBeforeOperationMs nor minDelayBetweenOperationsMs
    // are set.
    ticks *= 1 + Math.min(5, status.failures);
    status.timeout = setTimeout(() => this._update(key), immediate ? 0 : ticks);
  }

  /**
   * Get the status entry for a key, creating a fresh one if there is none.
   */
  private _getOperationStatus(key: string): OperationStatus {
    let status = this._operations.get(key);
    if (!status) {
      status = {
        key,
        failures: 0,
        callbacks: []
      };
      this._operations.set(key, status);
    }
    return status;
  }

  /**
   * Get the history entry for a key, creating an empty one if there is none.
   */
  private _getOperationHistory(key: string): OperationHistory {
    let hist = this._history.get(key);
    if (!hist) {
      hist = {};
      this._history.set(key, hist);
    }
    return hist;
  }

  /**
   * Run the operation for a key, unless operations have been stopped, in which
   * case the returned promise is rejected.
   */
  private async _doOp(key: string): Promise<void> {
    if (this._stopped) { throw new Error('operations forcibly stopped'); }
    return this._op(key);
  }

  /**
   * Call logError, if configured.  Logging is best-effort: an exception from the
   * logger is deliberately dropped, since letting it escape would skip waiter
   * notification and cleanup, leaving the key marked as operating forever.
   */
  private _reportError(key: string, failureCount: number, err: Error): void {
    if (!this._options.logError) { return; }
    try {
      this._options.logError(key, failureCount, err);
    } catch (e) {
      // Nothing useful can be done with a failure of the error logger itself.
    }
  }

  /**
   * Invoke and clear all callbacks waiting on the current operation for a key.
   */
  private _notifyCallbacks(status: OperationStatus, err?: Error): void {
    const callbacks = status.callbacks;
    status.callbacks = [];
    callbacks.forEach(callback => callback(err));
  }

  // Implement the next scheduled operation for a resource.
  private _update(key: string): void {
    const status = this._getOperationStatus(key);
    delete status.timeout;

    // We don't have to do anything if there have been no changes.
    if (!this._changed.has(key)) { return; }
    // We don't have to do anything (yet) if an operation is already in progress.
    if (status.promise) { return; }

    // Switch status from changed to operating.
    this._changed.delete(key);
    this._operating.add(key);
    this._getOperationHistory(key).lastStart = Date.now();

    // Store a promise for the operation.
    status.promise = this._doOp(key).then(() => {
      // Successful push!  Reset failure count, notify callbacks.
      status.failures = 0;
      this._notifyCallbacks(status);
    }).catch(err => {
      // Operation failed.  Increment failure count, notify callbacks.
      status.failures++;
      if (this._options.retry && !this._stopped) {
        // Marking the key as changed makes the clean-up step below schedule a retry.
        this._changed.add(key);
      }
      this._reportError(key, status.failures, err);
      this._notifyCallbacks(status, err);
    }).then(() => {
      // Clean up and schedule follow-up if necessary.
      this._operating.delete(key);
      delete status.promise;
      if (this._changed.has(key)) {
        this._schedule(key);
      } else if (status.failures === 0 && !status.timeout) {
        // No event information left to track, we can delete our OperationStatus entry.
        // (An entry with failures is kept so the failure count survives to the next attempt.)
        this._operations.delete(key);
      }
    });
  }
}
/**
 * Bookkeeping kept per key while an operation is scheduled, running, or has
 * outstanding failures.
 */
interface OperationStatus {
  /** The operation key. */
  key: string;

  /** Consecutive number of times the operation has failed. */
  failures: number;

  /** Callbacks for notifications when op is done/fails (an error is passed on failure). */
  callbacks: ((err?: Error) => void)[];

  /** A timeout for a scheduled future operation. */
  timeout?: NodeJS.Timeout;

  /** A promise for an operation that is under way. */
  promise?: Promise<void>;
}
/**
 * What is remembered about past runs of the operation for a key.
 */
interface OperationHistory {
  /** Last time operation was started, in ms since epoch. */
  lastStart?: number;
}