diff --git a/app/common/Interval.ts b/app/common/Interval.ts new file mode 100644 index 00000000..7e49702c --- /dev/null +++ b/app/common/Interval.ts @@ -0,0 +1,105 @@ +export interface IntervalOptions { + /** + * Handler for errors that are thrown from the callback. + */ + onError: (e: unknown) => void; +} + +export interface IntervalDelay { + // The base delay in milliseconds. + delayMs: number; + // If set, randomizes the base delay (per interval) by this amount of milliseconds. + varianceMs?: number; +} + +/** + * Interval takes a function to execute, and calls it on an interval based on + * the provided delay. + * + * Supports both fixed and randomized delays between intervals. + */ +export class Interval { + private _timeout?: NodeJS.Timeout | null; + private _lastPendingCall?: Promise | unknown; + private _timeoutDelay?: number; + private _stopped: boolean = true; + + constructor( + private _callback: () => Promise | unknown, + private _delay: IntervalDelay, + private _options: IntervalOptions + ) {} + + /** + * Sets the timeout and schedules the callback to be called on interval. + */ + public enable(): void { + this._stopped = false; + this._setTimeout(); + } + + /** + * Clears the timeout and prevents the next call from being scheduled. + * + * This method does not currently cancel any pending calls. See `disableAndFinish` + * for an async version of this method that supports waiting for the last pending + * call to finish. + */ + public disable(): void { + this._stopped = true; + this._clearTimeout(); + } + + /** + * Like `disable`, but also waits for the last pending call to finish. + */ + public async disableAndFinish(): Promise { + this.disable(); + await this._lastPendingCall; + } + + /** + * Gets the delay in milliseconds of the next scheduled call. + * + * Primarily useful for tests. + */ + public getDelayMs(): number | undefined { + return this._timeoutDelay; + } + + private _clearTimeout() { + if (!this._timeout) { return; } + + clearTimeout(this._timeout); + this._timeout = null; + } + + private _setTimeout() { + this._clearTimeout(); + this._timeoutDelay = this._computeDelayMs(); + this._timeout = setTimeout(() => this._onTimeoutTriggered(), this._timeoutDelay); + } + + private _computeDelayMs() { + const {delayMs, varianceMs} = this._delay; + if (varianceMs !== undefined) { + // Randomize the delay by the specified amount of variance. + const [min, max] = [delayMs - varianceMs, delayMs + varianceMs]; + return Math.floor(Math.random() * (max - min + 1)) + min; + } else { + return delayMs; + } + } + + private async _onTimeoutTriggered() { + this._clearTimeout(); + try { + await (this._lastPendingCall = this._callback()); + } catch (e: unknown) { + this._options.onError(e); + } + if (!this._stopped) { + this._setTimeout(); + } + } +} diff --git a/app/common/RandomizedTimer.ts b/app/common/RandomizedTimer.ts deleted file mode 100644 index 17ef25da..00000000 --- a/app/common/RandomizedTimer.ts +++ /dev/null @@ -1,48 +0,0 @@ -/** - * RandomizedTimer takes a function to execute, and calls it on a randomized interval - * between the minimum and maximum delay. The interval delay is randomized between - * each scheduled call. - */ -export class RandomizedTimer { - private _timeout?: NodeJS.Timeout | null; - - constructor( - private _callback: () => void, - private _minDelayMs: number, - private _maxDelayMs: number, - ) {} - - /** - * Sets the timeout and schedules the callback to be called. - */ - public enable(): void { - this._setTimeout(); - } - - /** - * Clears the timeout and prevents the callback from being called. - */ - public disable(): void { - this._clearTimeout(); - } - - private _clearTimeout() { - if (!this._timeout) { return; } - - clearTimeout(this._timeout); - this._timeout = null; - } - - private _setTimeout() { - this._clearTimeout(); - const [min, max] = [this._minDelayMs, this._maxDelayMs]; - const delay = Math.floor(Math.random() * (max - min + 1)) + min; - this._timeout = setTimeout(() => this._onTimeoutTriggered(), delay); - } - - private _onTimeoutTriggered() { - this._clearTimeout(); - this._callback(); - this._setTimeout(); - } -} diff --git a/app/server/lib/ActiveDoc.ts b/app/server/lib/ActiveDoc.ts index 78e6764d..2b8d968a 100644 --- a/app/server/lib/ActiveDoc.ts +++ b/app/server/lib/ActiveDoc.ts @@ -60,7 +60,7 @@ import {FormulaProperties, getFormulaProperties} from 'app/common/GranularAccess import {parseUrlId} from 'app/common/gristUrls'; import {byteString, countIf, retryOnce, safeJsonParse} from 'app/common/gutil'; import {InactivityTimer} from 'app/common/InactivityTimer'; -import {RandomizedTimer} from 'app/common/RandomizedTimer'; +import {Interval} from 'app/common/Interval'; import * as roles from 'app/common/roles'; import {schema, SCHEMA_VERSION} from 'app/common/schema'; import {MetaRowRecord, SingleCell} from 'app/common/TableData'; @@ -140,13 +140,13 @@ const ACTIVEDOC_TIMEOUT = (process.env.NODE_ENV === 'production') ? 30 : 5; const MEMORY_MEASUREMENT_INTERVAL_MS = 60 * 1000; // Cleanup expired attachments every hour (also happens when shutting down) -const REMOVE_UNUSED_ATTACHMENTS_INTERVAL_MS = 60 * 60 * 1000; +const REMOVE_UNUSED_ATTACHMENTS_DELAY = {delayMs: 60 * 60 * 1000, varianceMs: 30 * 1000}; // Apply the UpdateCurrentTime user action every hour -const UPDATE_CURRENT_TIME_INTERVAL_MS = 60 * 60 * 1000; +const UPDATE_CURRENT_TIME_DELAY = {delayMs: 60 * 60 * 1000, varianceMs: 30 * 1000}; // Measure and broadcast data size every 5 minutes -const UPDATE_DATA_SIZE_INTERVAL_MS = 5 * 60 * 1000; +const UPDATE_DATA_SIZE_DELAY = {delayMs: 5 * 60 * 1000, varianceMs: 30 * 1000}; // A hook for dependency injection. export const Deps = {ACTIVEDOC_TIMEOUT}; @@ -189,7 +189,7 @@ export class ActiveDoc extends EventEmitter { // result). protected _modificationLock: Mutex = new Mutex(); - private _log = new LogMethods('ActiveDoc ', (s: OptDocSession) => this.getLogMeta(s)); + private _log = new LogMethods('ActiveDoc ', (s: OptDocSession | null) => this.getLogMeta(s)); private _triggers: DocTriggers; private _requests: DocRequests; private _dataEngine: Promise|undefined; @@ -220,8 +220,34 @@ export class ActiveDoc extends EventEmitter { private _recoveryMode: boolean = false; private _shuttingDown: boolean = false; - // Randomized timers to clear on shutdown. - private _randomizedTimers: RandomizedTimer[] = []; + /** + * In cases where large numbers of documents are restarted simultaneously + * (like during deployments), there's a tendency for scheduled intervals to + * execute at roughly the same moment in time, which causes spikes in load. + * + * To mitigate this, we use randomized intervals that re-compute their delay + * in-between calls, with a variance of 30 seconds. + */ + private _intervals = [ + // Cleanup expired attachments every hour (also happens when shutting down). + new Interval( + () => this.removeUnusedAttachments(true), + REMOVE_UNUSED_ATTACHMENTS_DELAY, + {onError: (e) => this._log.error(null, 'failed to remove expired attachments', e)}, + ), + // Update the time in formulas every hour. + new Interval( + () => this._applyUserActions(makeExceptionalDocSession('system'), [['UpdateCurrentTime']]), + UPDATE_CURRENT_TIME_DELAY, + {onError: (e) => this._log.error(null, 'failed to update current time', e)}, + ), + // Measure and broadcast data size every 5 minutes. + new Interval( + () => this._checkDataSizeLimitRatio(makeExceptionalDocSession('system')), + UPDATE_DATA_SIZE_DELAY, + {onError: (e) => this._log.error(null, 'failed to update data size', e)}, + ), + ]; constructor(docManager: DocManager, docName: string, private _options?: ICreateActiveDocOptions) { super(); @@ -296,6 +322,10 @@ export class ActiveDoc extends EventEmitter { // unscheduled. If not (e.g. abandoned import, network problems after creating a doc), then // the ActiveDoc will get cleaned up. this._inactivityTimer.enable(); + + for (const interval of this._intervals) { + interval.enable(); + } } public get docName(): string { return this._docName; } @@ -472,8 +502,8 @@ export class ActiveDoc extends EventEmitter { // Clear the MapWithTTL to remove all timers from the event loop. this._fetchCache.clear(); - for (const timer of this._randomizedTimers) { - timer.disable(); + for (const interval of this._intervals) { + await interval.disableAndFinish(); } // We'll defer syncing usage until everything is calculated. const usageOptions = {syncUsageToDatabase: false, broadcastUsageToClients: false}; @@ -2061,7 +2091,6 @@ export class ActiveDoc extends EventEmitter { this._inactivityTimer.setDelay(closeTimeout); this._log.debug(docSession, `loaded in ${loadMs} ms, InactivityTimer set to ${closeTimeout} ms`); void this._initializeDocUsage(docSession); - void this._scheduleBackgroundJobs(); } catch (err) { this._fullyLoaded = true; if (!this._shuttingDown) { @@ -2123,40 +2152,6 @@ export class ActiveDoc extends EventEmitter { } } - private _scheduleBackgroundJobs() { - /* In cases where large numbers of documents are restarted simultaneously - * (like during deployments), there's a tendency for scheduled intervals to - * execute at roughly the same moment in time, which causes spikes in load. - * - * To mitigate this, we use randomized timers that re-compute their delay - * in-between intervals, with a maximum variance of 30 seconds. */ - const VARIANCE_MS = 30000; - this._randomizedTimers = [ - // Cleanup expired attachments every hour (also happens when shutting down). - new RandomizedTimer( - () => this.removeUnusedAttachments(true), - REMOVE_UNUSED_ATTACHMENTS_INTERVAL_MS - VARIANCE_MS, - REMOVE_UNUSED_ATTACHMENTS_INTERVAL_MS + VARIANCE_MS, - ), - // Update the time in formulas every hour. - new RandomizedTimer( - () => this._applyUserActions(makeExceptionalDocSession('system'), [['UpdateCurrentTime']]), - UPDATE_CURRENT_TIME_INTERVAL_MS - VARIANCE_MS, - UPDATE_CURRENT_TIME_INTERVAL_MS + VARIANCE_MS, - ), - // Measure and broadcast data size every 5 minutes. - new RandomizedTimer( - () => this._checkDataSizeLimitRatio(makeExceptionalDocSession('system')), - UPDATE_DATA_SIZE_INTERVAL_MS - VARIANCE_MS, - UPDATE_DATA_SIZE_INTERVAL_MS + VARIANCE_MS, - ), - ]; - - for (const timer of this._randomizedTimers) { - timer.enable(); - } - } - /** * Called before a migration. Makes sure a back-up is made. */ diff --git a/app/server/lib/Throttle.ts b/app/server/lib/Throttle.ts index 3499747a..bad6ee9c 100644 --- a/app/server/lib/Throttle.ts +++ b/app/server/lib/Throttle.ts @@ -19,6 +19,7 @@ */ import pidusage from '@gristlabs/pidusage'; +import {Interval} from 'app/common/Interval'; import log from 'app/server/lib/log'; /** @@ -71,8 +72,8 @@ interface MeterSample { * process from consuming too much cpu until stop() is called. */ export class Throttle { - private _timing: ThrottleTiming; // overall timing parameters - private _meteringInterval: NodeJS.Timeout | undefined; // timer for cpu measurements + private _timing: ThrottleTiming = + this._options.timing || defaultThrottleTiming; // overall timing parameters private _dutyCycleTimeout: NodeJS.Timeout | undefined; // driver for throttle duty cycle private _traceNudgeTimeout: NodeJS.Timeout | undefined; // schedule a nudge to a traced process private _throttleFactor: number = 0; // relative length of paused phase @@ -84,6 +85,13 @@ export class Throttle { private _stopped: boolean = false; // set when stop has been called private _active: boolean = true; // set when we are not trying to pause process + // Interval for CPU measurements. + private _meteringInterval: Interval = new Interval( + () => this._update(), + {delayMs: this._timing.samplePeriodMs}, + {onError: (e) => this._log(`Throttle error: ${e}`, this._options.logMeta)}, + ); + /** * Start monitoring the given process and throttle as needed. * If readPid is set, CPU usage will be read for that process. @@ -127,8 +135,7 @@ export class Throttle { logMeta: log.ILogMeta, timing?: ThrottleTiming }) { - this._timing = this._options.timing || defaultThrottleTiming; - this._meteringInterval = setInterval(() => this._update(), this._timing.samplePeriodMs); + this._meteringInterval.enable(); } /** @@ -285,10 +292,7 @@ export class Throttle { * Make sure measurement of cpu is stopped. */ private _stopMetering() { - if (this._meteringInterval) { - clearInterval(this._meteringInterval); - this._meteringInterval = undefined; - } + this._meteringInterval.disable(); } private _stopTraceNudge() { diff --git a/test/common/Interval.ts b/test/common/Interval.ts new file mode 100644 index 00000000..05eee7ec --- /dev/null +++ b/test/common/Interval.ts @@ -0,0 +1,141 @@ +import {Interval} from 'app/common/Interval'; +import {delay} from 'bluebird'; +import {assert} from 'chai'; +import * as sinon from 'sinon'; + +describe('Interval', function() { + const delayMs = 100; + const varianceMs = 50; + const promiseDelayMs = 200; + const delayBufferMs = 20; + + let interval: Interval; + let spy: sinon.SinonSpy; + + beforeEach(() => { + spy = sinon.spy(); + }); + + afterEach(async () => { + if (interval) { + await interval.disableAndFinish(); + } + }); + + it('is not enabled by default', async function() { + interval = new Interval(spy, {delayMs}, {onError: () => { /* do nothing */ }}); + assert.equal(spy.callCount, 0); + await delay(delayMs + delayBufferMs); + assert.equal(spy.callCount, 0); + }); + + it('can be disabled', async function() { + interval = new Interval(spy, {delayMs}, {onError: () => { /* do nothing */ }}); + interval.enable(); + await delay(delayMs + delayBufferMs); + assert.equal(spy.callCount, 1); + + // Disable the interval, and check that the calls stop. + interval.disable(); + await delay(delayMs + delayBufferMs); + assert.equal(spy.callCount, 1); + + // Enable the interval again, and check that the calls resume. + interval.enable(); + await delay(delayMs + delayBufferMs); + assert.equal(spy.callCount, 2); + spy.resetHistory(); + }); + + it('calls onError if callback throws an error', async function() { + const callback = () => { throw new Error('Something bad happened.'); }; + const onErrorSpy = sinon.spy(); + interval = new Interval(callback, {delayMs}, {onError: onErrorSpy}); + interval.enable(); + + // Check that onError is called when the callback throws. + assert.equal(onErrorSpy.callCount, 0); + await delay(delayMs + delayBufferMs); + assert.equal(onErrorSpy.callCount, 1); + + // Check that the interval didn't stop (since the onError spy silenced the error). + await delay(delayMs + delayBufferMs); + assert.equal(onErrorSpy.callCount, 2); + }); + + describe('with a fixed delay', function() { + beforeEach(() => { + interval = new Interval(spy, {delayMs}, {onError: () => { /* do nothing */ }}); + interval.enable(); + }); + + it('calls the callback on a fixed interval', async function() { + await delay(delayMs + delayBufferMs); + assert.equal(spy.callCount, 1); + await delay(delayMs + delayBufferMs); + assert.equal(spy.callCount, 2); + }); + }); + + describe('with a randomized delay', function() { + beforeEach(() => { + interval = new Interval(spy, {delayMs, varianceMs}, { + onError: () => { /* do nothing */ } + }); + interval.enable(); + }); + + it('calls the callback on a randomized interval', async function() { + const delays: number[] = []; + for (let i = 1; i <= 10; i++) { + // Get the current delay and check that it's within the expected range. + const currentDelayMs = interval.getDelayMs(); + delays.push(currentDelayMs!); + assert.isDefined(currentDelayMs); + assert.isAtMost(currentDelayMs!, delayMs + varianceMs); + assert.isAtLeast(currentDelayMs!, delayMs - varianceMs); + + // Wait for the delay, and check that the spy was called. + await delay(currentDelayMs!); + assert.equal(spy.callCount, i); + } + + // Check that we didn't use the same delay all 10 times. + assert.notEqual([...new Set(delays)].length, 1); + }); + }); + + describe('with a promise-based callback', function() { + let promiseSpy: sinon.SinonSpy; + + beforeEach(() => { + const promise = () => delay(promiseDelayMs); + promiseSpy = sinon.spy(promise); + interval = new Interval(promiseSpy, {delayMs}, {onError: () => { /* do nothing */ }}); + interval.enable(); + }); + + it('waits for promises to settle before scheduling the next call', async function() { + assert.equal(promiseSpy.callCount, 0); + await delay(delayMs + delayBufferMs); + assert.equal(promiseSpy.callCount, 1); + await delay(delayMs + delayBufferMs); + assert.equal(promiseSpy.callCount, 1); // Still 1, because the first promise hasn't settled yet. + await delay(delayMs + delayBufferMs); + assert.equal(promiseSpy.callCount, 1); // Promise now settled, but there's still a 100ms delay. + await delay(delayMs + delayBufferMs); + assert.equal(promiseSpy.callCount, 2); // Now we finally call the callback again. + }); + + it('can wait for last promise to settle when disabling', async function() { + assert.equal(promiseSpy.callCount, 0); + await delay(delayMs + delayBufferMs); + assert.equal(promiseSpy.callCount, 1); + await interval.disableAndFinish(); + + // Check that once disabled, no more calls are scheduled. + await delay(promiseDelayMs + delayMs + delayBufferMs); + assert.equal(promiseSpy.callCount, 1); + }); + }); +});