(core) Add timeouts to prevent ActiveDoc bad state during shutdown.

Summary:
Add two shutdown-related timeouts.

1. One is to limit the duration of any work that happens once shutdown
   begins. In particular, waiting for an update to current time could block
   indefinitely if the data engine is unresponsive. Such awaits are now
   limited to 5 seconds.

2. The other is to allow documents to get shutdown for inactivity even when
   some work takes forever. Certain work (e.g. applying user actions)
   generally prevents a document from shutting down while it's pending. This
   prevention is now limited to 5 minutes.

   Shutting down a doc while something is pending may break some
   assumptions, and lead to errors. The timeout is long to let us assume
   that the work is stuck, and that errors are better than waiting forever.

Other changes:
- Periodic ActiveDoc work (intervals) is now started when a doc finishes
  loading rather than in the constructor. The difference only showed up in
  tests which makes the intervals much shorter.

- Move timeoutReached() utility function to gutil, and use it for
  isLongerThan(), since they are basically identical. Also makes sure that the
  timer in these is cleared in all cases.

- Remove duplicate waitForIt implementation (previously had a copy in both
  test/server and core/test/server).

- Change testUtil.captureLog to pass messages to its callback, to allow asserts
  on messages within the callback.

Test Plan:
Added new unittests for the new shutdowns, including a replication
of a bad state that was possible during shutdown.

Reviewers: paulfitz

Reviewed By: paulfitz

Subscribers: paulfitz

Differential Revision: https://phab.getgrist.com/D4040
pull/686/head
Dmitry S 8 months ago
parent b2ac603ae7
commit 2705d41c34

@ -1,4 +1,3 @@
import {delay} from 'app/common/delay';
import {BindableValue, DomElementMethod, IKnockoutReadObservable, ISubscribable, Listener, Observable,
subscribeElem, UseCB, UseCBOwner} from 'grainjs';
import {Observable as KoObservable} from 'knockout';
@ -883,19 +882,31 @@ export function isValidHex(val: string): boolean {
return /^#([0-9A-F]{6})$/i.test(val);
}
/**
* Resolves to true if promise is still pending after msec milliseconds have passed. Otherwise
* returns false, including when promise is rejected.
*/
export async function timeoutReached(msec: number, promise: Promise<unknown>): Promise<boolean> {
const timedOut = {};
// Be careful to clean up the timer after ourselves, so it doesn't remain in the event loop.
let timer: NodeJS.Timer;
const delayPromise = new Promise<any>((resolve) => { timer = setTimeout(() => resolve(timedOut), msec); });
try {
const res = await Promise.race([promise, delayPromise]);
return res == timedOut;
} catch (err) {
return false;
} finally {
clearTimeout(timer!);
}
}
/**
* Returns a promise that resolves to true if promise takes longer than timeoutMsec to resolve. If not
* or if promise throws returns false.
*/
export async function isLongerThan(promise: Promise<any>, timeoutMsec: number): Promise<boolean> {
let isPending = true;
const done = () => { isPending = false; };
await Promise.race([
promise.then(done, done),
delay(timeoutMsec)
]);
return isPending;
* or if promise throws returns false. Same as timeoutReached(), with reversed order of arguments.
*/
export async function isLongerThan(promise: Promise<unknown>, timeoutMsec: number): Promise<boolean> {
return timeoutReached(timeoutMsec, promise);
}
/**

@ -67,7 +67,7 @@ import {Product} from 'app/common/Features';
import {FormulaProperties, getFormulaProperties} from 'app/common/GranularAccessClause';
import {isHiddenCol} from 'app/common/gristTypes';
import {commonUrls, parseUrlId} from 'app/common/gristUrls';
import {byteString, countIf, retryOnce, safeJsonParse} from 'app/common/gutil';
import {byteString, countIf, retryOnce, safeJsonParse, timeoutReached} from 'app/common/gutil';
import {InactivityTimer} from 'app/common/InactivityTimer';
import {Interval} from 'app/common/Interval';
import * as roles from 'app/common/roles';
@ -131,7 +131,7 @@ import {createAttachmentsIndex, DocStorage, REMOVE_UNUSED_ATTACHMENTS_DELAY} fro
import {expandQuery} from './ExpandedQuery';
import {GranularAccess, GranularAccessForBundle} from './GranularAccess';
import {OnDemandActions} from './OnDemandActions';
import {getLogMetaFromDocSession, getPubSubPrefix, getTelemetryMetaFromDocSession, timeoutReached} from './serverUtils';
import {getLogMetaFromDocSession, getPubSubPrefix, getTelemetryMetaFromDocSession} from './serverUtils';
import {findOrAddAllEnvelope, Sharing} from './Sharing';
import cloneDeep = require('lodash/cloneDeep');
import flatten = require('lodash/flatten');
@ -166,10 +166,23 @@ const UPDATE_DATA_SIZE_DELAY = {delayMs: 5 * 60 * 1000, varianceMs: 30 * 1000};
// Log document metrics every hour
const LOG_DOCUMENT_METRICS_DELAY = {delayMs: 60 * 60 * 1000, varianceMs: 30 * 1000};
// For items of work that need to happen at shutdown, timeout before aborting the wait for them.
const SHUTDOWN_ITEM_TIMEOUT_MS = 5000;
// We keep a doc open while a user action is pending, but not longer than this. If it's pending
// this long, the ACTIVEDOC_TIMEOUT will still kick in afterwards, and in the absence of other
// activity, the doc would still get shut down, with the action's effect lost. This is to prevent
// indefinitely running processes in case of an infinite loop in a formula.
const KEEP_DOC_OPEN_TIMEOUT_MS = 5 * 60 * 1000;
// A hook for dependency injection.
export const Deps = {
ACTIVEDOC_TIMEOUT,
ACTIVEDOC_TIMEOUT_ACTION: 'shutdown' as 'shutdown'|'ignore',
UPDATE_CURRENT_TIME_DELAY,
SHUTDOWN_ITEM_TIMEOUT_MS,
KEEP_DOC_OPEN_TIMEOUT_MS,
};
interface UpdateUsageOptions {
@ -187,12 +200,15 @@ interface UpdateUsageOptions {
export class ActiveDoc extends EventEmitter {
/**
* Decorator for ActiveDoc methods that prevents shutdown while the method is running, i.e.
* until the returned promise is resolved.
* until the returned promise is resolved, or KEEP_DOC_OPEN_TIMEOUT_MS passes.
*/
public static keepDocOpen(target: ActiveDoc, propertyKey: string, descriptor: PropertyDescriptor) {
const origFunc = descriptor.value;
descriptor.value = function(this: ActiveDoc) {
return this._inactivityTimer.disableUntilFinish(origFunc.apply(this, arguments));
const result = origFunc.apply(this, arguments);
this._inactivityTimer.disableUntilFinish(timeoutReached(Deps.KEEP_DOC_OPEN_TIMEOUT_MS, result))
.catch(() => {});
return result;
};
}
@ -275,7 +291,7 @@ export class ActiveDoc extends EventEmitter {
// Update the time in formulas every hour.
new Interval(
() => this._applyUserActions(makeExceptionalDocSession('system'), [['UpdateCurrentTime']]),
UPDATE_CURRENT_TIME_DELAY,
Deps.UPDATE_CURRENT_TIME_DELAY,
{onError: (e) => this._log.error(null, 'failed to update current time', e)},
),
// Measure and broadcast data size every 5 minutes.
@ -364,10 +380,6 @@ export class ActiveDoc extends EventEmitter {
// unscheduled. If not (e.g. abandoned import, network problems after creating a doc), then
// the ActiveDoc will get cleaned up.
this._inactivityTimer.enable();
for (const interval of this._intervals) {
interval.enable();
}
}
public get docName(): string { return this._docName; }
@ -1940,6 +1952,17 @@ export class ActiveDoc extends EventEmitter {
private async _doShutdownImpl(): Promise<void> {
const docSession = makeExceptionalDocSession('system');
this._log.debug(docSession, "shutdown starting");
const safeCallAndWait = async (funcDesc: string, func: () => Promise<unknown>) => {
try {
if (await timeoutReached(Deps.SHUTDOWN_ITEM_TIMEOUT_MS, func())) {
this._log.error(docSession, `${funcDesc} timed out`);
}
} catch (err) {
this._log.error(docSession, `${funcDesc} failed`, err);
}
};
try {
this.setMuted();
this._inactivityTimer.disable();
@ -1958,9 +1981,9 @@ export class ActiveDoc extends EventEmitter {
// Clear the MapWithTTL to remove all timers from the event loop.
this._fetchCache.clear();
for (const interval of this._intervals) {
await interval.disableAndFinish();
}
await Promise.all(this._intervals.map(interval =>
safeCallAndWait("interval.disableAndFinish", () => interval.disableAndFinish())));
// We'll defer syncing usage until everything is calculated.
const usageOptions = {syncUsageToDatabase: false, broadcastUsageToClients: false};
@ -1970,32 +1993,17 @@ export class ActiveDoc extends EventEmitter {
// Remove expired attachments, i.e. attachments that were soft deleted a while ago. This
// needs to happen periodically, and doing it here means we can guarantee that it happens
// even if the doc is only ever opened briefly, without having to slow down startup.
const removeAttachmentsPromise = this.removeUnusedAttachments(true, usageOptions);
await safeCallAndWait("removeUnusedAttachments", () => this.removeUnusedAttachments(true, usageOptions));
// Update data size; we'll be syncing both it and attachments size to the database soon.
const updateDataSizePromise = this._updateDataSize(usageOptions);
try {
await removeAttachmentsPromise;
} catch (e) {
this._log.error(docSession, "Failed to remove expired attachments", e);
}
try {
await updateDataSizePromise;
} catch (e) {
this._log.error(docSession, "Failed to update data size", e);
}
await safeCallAndWait("_updateDataSize", () => this._updateDataSize(usageOptions));
}
this._syncDocUsageToDatabase(true);
this._logDocMetrics(docSession, 'docClose');
try {
await this._docManager.storageManager.closeDocument(this.docName);
} catch (err) {
log.error('Problem shutting down document: %s %s', this.docName, err.message);
}
await safeCallAndWait("storageManager.closeDocument",
() => this._docManager.storageManager.closeDocument(this.docName));
try {
const dataEngine = this._dataEngine ? await this._getEngine() : null;
@ -2014,24 +2022,16 @@ export class ActiveDoc extends EventEmitter {
]);
// The this.waitForInitialization promise may not yet have resolved, but
// should do so quickly now we've killed everything it depends on.
try {
await this.waitForInitialization();
} catch (err) {
// Initialization errors do not matter at this point.
}
await safeCallAndWait("waitForInitialization", () => this.waitForInitialization());
} catch (err) {
this._log.error(docSession, "failed to shutdown some resources", err);
}
// No timeout on this callback: if it hangs, it will make the document unusable.
await this._afterShutdownCallback?.();
} finally {
this._docManager.removeActiveDoc(this);
}
try {
await this._granularAccess.close();
} catch (err) {
// This should not happen.
this._log.error(docSession, "failed to shutdown granular access", err);
}
await safeCallAndWait("_granularAccess.close", () => this._granularAccess.close());
this._log.debug(docSession, "shutdown complete");
}
@ -2347,6 +2347,13 @@ export class ActiveDoc extends EventEmitter {
this._inactivityTimer.setDelay(closeTimeout);
this._log.debug(docSession, `loaded in ${loadMs} ms, InactivityTimer set to ${closeTimeout} ms`);
void this._initializeDocUsage(docSession);
// Start the periodic work, unless this doc has already started shutting down.
if (!this.muted) {
for (const interval of this._intervals) {
interval.enable();
}
}
} catch (err) {
this._fullyLoaded = true;
if (!this._shuttingDown) {

@ -12,6 +12,9 @@ import log from 'app/server/lib/log';
import {OpenMode, SQLiteDB} from 'app/server/lib/SQLiteDB';
import {getDocSessionAccessOrNull, getDocSessionUser, OptDocSession} from './DocSession';
// This method previously lived in this file. Re-export to avoid changing imports all over.
export {timeoutReached} from 'app/common/gutil';
/**
* Promisify a node-style callback function. E.g.
* fromCallback(cb => someAsyncFunc(someArgs, cb));
@ -99,20 +102,6 @@ export function exitPromise(child: ChildProcess): Promise<number|string> {
});
}
/**
* Resolves to true if promise is still pending after msec milliseconds have passed. Otherwise
* returns false, including when promise is rejected.
*/
export function timeoutReached<T>(msec: number, promise: Promise<T>): Promise<boolean> {
const timedOut = {};
// Be careful to clean up the timer after ourselves, so it doesn't remain in the event loop.
let timer: NodeJS.Timer;
const delayPromise = new Promise<any>((resolve) => (timer = setTimeout(() => resolve(timedOut), msec)));
return Promise.race([promise, delayPromise])
.then((res) => { clearTimeout(timer); return res === timedOut; })
.catch(() => false);
}
/**
* Get database url in DATABASE_URL format popularized by heroku, suitable for
* use by psql, sqlalchemy, etc.

@ -124,6 +124,23 @@ describe('gutil2', function() {
});
});
describe("timeoutReached", function() {
const DELAY_1 = 20;
const DELAY_2 = 2 * DELAY_1;
it("should return true for timed out promise", async function() {
assert.isTrue(await gutil.timeoutReached(DELAY_1, delay(DELAY_2)));
assert.isTrue(await gutil.timeoutReached(DELAY_1, delay(DELAY_2).then(() => { throw new Error("test error"); })));
});
it("should return false for promise that completes before timeout", async function() {
assert.isFalse(await gutil.timeoutReached(DELAY_2, delay(DELAY_1)));
assert.isFalse(await gutil.timeoutReached(DELAY_2, delay(DELAY_1)
.then(() => { throw new Error("test error"); })));
assert.isFalse(await gutil.timeoutReached(DELAY_2, Promise.resolve('foo')));
assert.isFalse(await gutil.timeoutReached(DELAY_2, Promise.reject('bar')));
});
});
describe("isValidHex", function() {
it('should work correctly', async function() {
assert.equal(gutil.isValidHex('#FF00FF'), true);

@ -124,7 +124,9 @@ export function setTmpLogLevel(level: string, optCaptureFunc?: (level: string, m
* captures those at minLevel and higher. Returns a promise for the array of "level: message"
* strings. These may be tested using testUtils.assertMatchArray(). Callback may return a promise.
*/
export async function captureLog(minLevel: string, callback: () => void|Promise<void>): Promise<string[]> {
export async function captureLog(
minLevel: string, callback: (messages: string[]) => void|Promise<void>
): Promise<string[]> {
const messages: string[] = [];
const prevLogLevel = log.transports.file.level;
const name = _.uniqueId('CaptureLog');
@ -140,7 +142,7 @@ export async function captureLog(minLevel: string, callback: () => void|Promise<
}
log.add(CaptureTransport as any, { captureFunc: capture, name }); // types are off.
try {
await callback();
await callback(messages);
} finally {
log.remove(name);
log.transports.file.level = prevLogLevel;

@ -3,7 +3,7 @@ import * as bluebird from 'bluebird';
/**
* Wait some time for a check to pass. Allow a pause between checks.
*/
export async function waitForIt(check: () => Promise<void>, maxWaitMs: number,
export async function waitForIt(check: () => Promise<void>|void, maxWaitMs: number,
stepWaitMs: number = 1000) {
const start = Date.now();
for (;;) {

Loading…
Cancel
Save