gristlabs_grist-core/app/common/KeyedOps.ts

238 lines
7.9 KiB
TypeScript
Raw Permalink Normal View History

/**
 * A class for scheduling a particular operation on resources
 * identified by a key. For operations which should be applied
 * some time after an event, optionally rate-limited and retried.
 */
export class KeyedOps {
  private _operations = new Map<string, OperationStatus>();  // status of operations
  private _history = new Map<string, OperationHistory>();    // history of operations
                                                             // (will accumulate without limit)
  private _changed = new Set<string>();     // set when key needs an operation
  private _operating = new Set<string>();   // set when operation is in progress for key
  private _stopped = false;                 // set to prohibit all new operations or retries

  /**
   * Provide a function to apply operation, and some optional
   * parameters.
   *
   *   - delayBeforeOperationMs: if set, a call to addOperation(key) will have
   *     a delayed effect.  It will schedule (or reschedule) the operation to occur
   *     after this interval.  If the operation is currently in progress, it will
   *     get rerun after it completes.
   *
   *   - minDelayBetweenOperationsMs: if set, scheduling for operations will have
   *     additional delays inserted as necessary to keep this minimal delay between
   *     the start of successive operations.
   *
   *   - retry: if `retry` is set, the operation will be retried
   *     indefinitely with a rather primitive retry mechanism -
   *     otherwise no attempt is made to retry failures.
   *
   *   - logError: called when errors occur, with a count of number of failures so
   *     far.  It must not throw: it runs inside the operation's failure handler, and
   *     an exception escaping from it would skip the clean-up step, leaving the key
   *     marked as in progress.
   *
   * The options object is copied; later changes made by this class (see
   * expediteOperations) are not visible through the object passed in.
   */
  constructor(private _op: (key: string) => Promise<void>, private _options: {
    delayBeforeOperationMs?: number,
    minDelayBetweenOperationsMs?: number,
    retry?: boolean,
    logError?: (key: string, failureCount: number, err: Error) => void
  }) {
    // Keep a private shallow copy: expediteOperations() zeroes the delays, and that
    // must not leak into an options object the caller may be reusing elsewhere.
    this._options = {..._options};
  }

  /**
   * Request an operation be done (eventually) on the specified resource.
   */
  public addOperation(key: string): void {
    this._changed.add(key);
    this._schedule(key);
  }

  /**
   * Check whether any work is scheduled or in progress.
   */
  public hasPendingOperations(): boolean {
    return this._changed.size > 0 || this._operating.size > 0;
  }

  /**
   * Check whether any work is scheduled or in progress for a specific resource.
   */
  public hasPendingOperation(key: string): boolean {
    return this._changed.has(key) || this._operating.has(key);
  }

  /**
   * Take all scheduled operations and re-schedule them for right now.  Useful
   * when shutting down.  Affects retries.  Cannot be undone.  Returns immediately.
   */
  public expediteOperations(): void {
    // Zero the delays on our private copy so that follow-ups and retries
    // scheduled from now on are not delayed either.
    this._options.delayBeforeOperationMs = 0;
    this._options.minDelayBetweenOperationsMs = 0;
    for (const op of this._operations.values()) {
      // Only entries with a pending timer are waiting to start.
      if (op.timeout) {
        this._schedule(op.key, true);
      }
    }
  }

  /**
   * Don't allow any more operations, or retries of existing operations.
   * Anything still scheduled is expedited, and will fail with an
   * 'operations forcibly stopped' error instead of running.
   */
  public stopOperations(): void {
    this._stopped = true;
    this.expediteOperations();
  }

  /**
   * Wait for all operations to complete.  This makes most sense to use during
   * shutdown - otherwise it might be a very long wait to reach a moment where
   * there are no operations.
   *
   * @param logRepeat - optional callback invoked before every pass after the
   *   first, with the number of passes completed so far.
   */
  public async wait(logRepeat?: (count: number) => void): Promise<void> {
    let repeats = 0;
    while (this.hasPendingOperations()) {
      if (repeats && logRepeat) { logRepeat(repeats); }
      const keys = [...this._operating.keys(), ...this._changed.keys()];
      await Promise.all(keys.map(key => this.expediteOperationAndWait(key)));
      repeats++;
    }
  }

  /**
   * Re-schedules any pending operation on a resource for right now.  Returns
   * when operations on the resource are complete.  Does not affect retries.
   * Never rejects because the operation failed: failures are reported via logError.
   */
  public async expediteOperationAndWait(key: string): Promise<void> {
    // Look up without creating: asking about a key that has nothing pending
    // must not leave a status entry behind.
    const existing = this._operations.get(key);
    if (existing && existing.promise) {
      // An operation is under way; its promise settles once it has finished.
      await existing.promise;
      return;
    }
    if (!this._changed.has(key)) { return; }
    const status = this._getOperationStatus(key);
    // Register for notification before scheduling; the callback fires on
    // success and on failure alike.
    const done = new Promise<void>((resolve) => {
      status.callbacks.push(() => resolve());
    });
    this._schedule(key, true);
    await done;
  }

  /**
   * Schedule an operation for a resource.
   * If the operation is already in progress, we do nothing.
   * If the operation has not yet happened, it is rescheduled.
   * If `immediate` is set, the operation is scheduled with no delay.
   */
  private _schedule(key: string, immediate: boolean = false): void {
    const status = this._getOperationStatus(key);
    if (status.promise) { return; }
    if (status.timeout) {
      clearTimeout(status.timeout);
      delete status.timeout;
    }
    const delayMs = immediate ? 0 : this._getDelayMs(key, status.failures);
    status.timeout = setTimeout(() => this._update(key), delayMs);
  }

  /**
   * Compute how long to wait before starting the next operation for a key,
   * given the configured delays and the number of consecutive failures.
   */
  private _getDelayMs(key: string, failures: number): number {
    let ticks = this._options.delayBeforeOperationMs || 0;
    const {lastStart} = this._getOperationHistory(key);
    if (lastStart && this._options.minDelayBetweenOperationsMs) {
      ticks = Math.max(ticks, lastStart + this._options.minDelayBetweenOperationsMs - Date.now());
    }
    // Primitive slow-down on retries.
    // Will do nothing if neither delayBeforeOperationMs nor minDelayBetweenOperationsMs
    // are set.
    return ticks * (1 + Math.min(5, failures));
  }

  /**
   * Get the status entry for a key, creating an empty one if there is none yet.
   */
  private _getOperationStatus(key: string): OperationStatus {
    let status = this._operations.get(key);
    if (!status) {
      status = {
        key,
        failures: 0,
        callbacks: []
      };
      this._operations.set(key, status);
    }
    return status;
  }

  /**
   * Get the history entry for a key, creating an empty one if there is none yet.
   */
  private _getOperationHistory(key: string): OperationHistory {
    let hist = this._history.get(key);
    if (!hist) {
      hist = {};
      this._history.set(key, hist);
    }
    return hist;
  }

  /**
   * Run the user-supplied operation, unless operations have been stopped, in
   * which case the returned promise rejects.
   */
  private async _doOp(key: string): Promise<void> {
    if (this._stopped) { throw new Error('operations forcibly stopped'); }
    return this._op(key);
  }

  // Implement the next scheduled operation for a resource.
  private _update(key: string): void {
    const status = this._getOperationStatus(key);
    delete status.timeout;

    // We don't have to do anything if there have been no changes.
    if (!this._changed.has(key)) { return; }
    // We don't have to do anything (yet) if an operation is already in progress.
    if (status.promise) { return; }

    // Switch status from changed to operating.
    this._changed.delete(key);
    this._operating.add(key);
    const history = this._getOperationHistory(key);
    history.lastStart = Date.now();

    // Store a promise for the operation.
    status.promise = this._doOp(key).then(() => {
      // Successful push!  Reset failure count, notify callbacks.
      status.failures = 0;
      status.callbacks.forEach(callback => callback());
      status.callbacks = [];
    }).catch(err => {
      // Operation failed.  Increment failure count, notify callbacks.
      status.failures++;
      if (this._options.retry && !this._stopped) {
        // Mark the key as changed again so the clean-up step reschedules it.
        this._changed.add(key);
      }
      if (this._options.logError) {
        this._options.logError(key, status.failures, err);
      }
      status.callbacks.forEach(callback => callback(err));
      status.callbacks = [];
    }).then(() => {
      // Clean up and schedule follow-up if necessary.
      this._operating.delete(key);
      delete status.promise;
      if (this._changed.has(key)) {
        this._schedule(key);
      } else {
        // No event information left to track, we can delete our OperationStatus entry.
        // (An entry with failures is kept, so its failure count is remembered.)
        if (status.failures === 0 && !status.timeout) {
          this._operations.delete(key);
        }
      }
    });
  }
}
/**
 * Bookkeeping kept for one key while it has an operation scheduled,
 * running, or a failure count worth remembering.
 */
interface OperationStatus {
  /** The operation key. */
  key: string;
  /** Consecutive number of times the operation has failed. */
  failures: number;
  /** Callbacks for notifications when the operation is done or fails. */
  callbacks: ((err?: Error) => void)[];
  /** A timeout for a scheduled future operation. */
  timeout?: NodeJS.Timeout;
  /** A promise for an operation that is under way. */
  promise?: Promise<void>;
}
/**
 * What is remembered about past operations on one key.
 */
interface OperationHistory {
  /** Last time the operation was started, in ms since epoch; unset if it never ran. */
  lastStart?: number;
}