(core) Remove DB transaction from webhook update, add mutex to all webhook endpoints

Summary:
This removes problematic code that held a HomeDB transaction open while applying user actions. Applying user actions can hang indefinitely, especially if the webhook queue is full, as in https://grist.slack.com/archives/C05DBJ6LA1F/p1698159750945949.

The discussion about adding this code is here: https://phab.getgrist.com/D3821#inline-45054

The initial motivation was to roll back HomeDB changes if something went wrong while applying user actions, to avoid saving only part of the changes the user requested. I think it's actually fine to just allow such a partial save to happen - I don't see anything particularly undesirable about keeping an update to the webhook URL if other updates requested by the user didn't also get applied, as the fields don't affect each other.

The comment approving the transaction approach said "so we shouldn't end up leave the transaction hanging around too long", an assumption which has since been falsified.

It looks like there was also some desire to prevent a mess caused by multiple simultaneous calls to this endpoint, which the transaction may have helped with a little, but didn't really seem like a solution. Comments in `Triggers.ts` also mention fears of race conditions when clearing (some of) the queue and the need for some locking. So I wrapped all webhook-related endpoints in a simple `Mutex` held by the `ActiveDoc` to prevent simultaneous changes. I *think* this is a good thing. These endpoints shouldn't be called frequently enough to create a performance issue, and this shouldn't affect actually sending webhook events when records are added/updated. And it does seem like interleaving calls to these endpoints could cause very weird problems.

Test Plan: Nothing yet; I'd like to hear whether others think this is sensible.

Reviewers: paulfitz

Reviewed By: paulfitz

Differential Revision: https://phab.getgrist.com/D4111
This commit is contained in:
Alex Hall 2023-11-14 15:46:33 +02:00
parent dda1b5cf1b
commit 3dfe4be5f3
3 changed files with 51 additions and 31 deletions

View File

@ -886,7 +886,9 @@ export function isValidHex(val: string): boolean {
* Resolves to true if promise is still pending after msec milliseconds have passed. Otherwise * Resolves to true if promise is still pending after msec milliseconds have passed. Otherwise
* returns false, including when promise is rejected. * returns false, including when promise is rejected.
*/ */
export async function timeoutReached(msec: number, promise: Promise<unknown>): Promise<boolean> { export async function timeoutReached(
msec: number, promise: Promise<unknown>, options: {rethrow: boolean} = {rethrow: false}
): Promise<boolean> {
const timedOut = {}; const timedOut = {};
// Be careful to clean up the timer after ourselves, so it doesn't remain in the event loop. // Be careful to clean up the timer after ourselves, so it doesn't remain in the event loop.
let timer: NodeJS.Timer; let timer: NodeJS.Timer;
@ -895,6 +897,9 @@ export async function timeoutReached(msec: number, promise: Promise<unknown>): P
const res = await Promise.race([promise, delayPromise]); const res = await Promise.race([promise, delayPromise]);
return res == timedOut; return res == timedOut;
} catch (err) { } catch (err) {
if (options.rethrow) {
throw err;
}
return false; return false;
} finally { } finally {
clearTimeout(timer!); clearTimeout(timer!);

View File

@ -216,6 +216,8 @@ export class ActiveDoc extends EventEmitter {
public readonly docPluginManager: DocPluginManager|null; public readonly docPluginManager: DocPluginManager|null;
public readonly docClients: DocClients; // Only exposed for Sharing.ts public readonly docClients: DocClients; // Only exposed for Sharing.ts
public docData: DocData|null = null; public docData: DocData|null = null;
// Used by DocApi to only allow one webhook-related endpoint to run at a time.
public readonly triggersLock: Mutex = new Mutex();
protected _actionHistory: ActionHistory; protected _actionHistory: ActionHistory;
protected _docManager: DocManager; protected _docManager: DocManager;

View File

@ -12,7 +12,7 @@ import {
} from 'app/common/DocActions'; } from 'app/common/DocActions';
import {isRaisedException} from "app/common/gristTypes"; import {isRaisedException} from "app/common/gristTypes";
import {buildUrlId, parseUrlId} from "app/common/gristUrls"; import {buildUrlId, parseUrlId} from "app/common/gristUrls";
import {isAffirmative} from "app/common/gutil"; import {isAffirmative, timeoutReached} from "app/common/gutil";
import {SchemaTypes} from "app/common/schema"; import {SchemaTypes} from "app/common/schema";
import {SortFunc} from 'app/common/SortFunc'; import {SortFunc} from 'app/common/SortFunc';
import {Sort} from 'app/common/SortSpec'; import {Sort} from 'app/common/SortSpec';
@ -101,6 +101,9 @@ const MAX_PARALLEL_REQUESTS_PER_DOC = 10;
// then the _dailyUsage cache may become unreliable and users may be able to exceed their allocated requests. // then the _dailyUsage cache may become unreliable and users may be able to exceed their allocated requests.
const MAX_ACTIVE_DOCS_USAGE_CACHE = 1000; const MAX_ACTIVE_DOCS_USAGE_CACHE = 1000;
// Maximum amount of time that a webhook endpoint can hold the mutex for in withDocTriggersLock.
const MAX_DOC_TRIGGERS_LOCK_MS = 15_000;
// Maximum duration of a call to /sql. Does not apply to internal calls to SQLite. // Maximum duration of a call to /sql. Does not apply to internal calls to SQLite.
const MAX_CUSTOM_SQL_MSEC = appSettings.section('integrations') const MAX_CUSTOM_SQL_MSEC = appSettings.section('integrations')
.section('sql').flag('timeout').requireInt({ .section('sql').flag('timeout').requireInt({
@ -186,8 +189,26 @@ export class DocWorkerApi {
// Middleware to limit number of outstanding requests per document. Will also // Middleware to limit number of outstanding requests per document. Will also
// handle errors like expressWrap would. // handle errors like expressWrap would.
const throttled = this._apiThrottle.bind(this); const throttled = this._apiThrottle.bind(this);
const withDoc = (callback: WithDocHandler) => throttled(this._requireActiveDoc(callback)); const withDoc = (callback: WithDocHandler) => throttled(this._requireActiveDoc(callback));
// Like withDoc, but only one such callback can run at a time per active doc.
// This is used for webhook endpoints to prevent simultaneous changes to configuration
// or clearing queues which could lead to weird problems.
// The callback runs inside activeDoc.triggersLock.runExclusive, and the wait for it is
// capped at MAX_DOC_TRIGGERS_LOCK_MS so one stuck call cannot block the others forever.
const withDocTriggersLock = (callback: WithDocHandler) => withDoc(
async (activeDoc: ActiveDoc, req: RequestWithLogin, resp: Response) =>
await activeDoc.triggersLock.runExclusive(async () => {
// We don't want to hold the mutex indefinitely so that if one call gets stuck
// (especially while trying to apply user actions which are stalled by a full queue)
// another call which would clear a queue, disable a webhook, or fix something related
// can eventually succeed.
// `rethrow: true` makes a rejection from the callback propagate out of this handler
// (and so to withDoc's error handling) instead of being reported as "not timed out".
// NOTE(review): nothing here cancels the callback on timeout, so it may still be running
// (and may still write to `resp`) after the mutex is released - confirm this is acceptable.
if (await timeoutReached(MAX_DOC_TRIGGERS_LOCK_MS, callback(activeDoc, req, resp), {rethrow: true})) {
log.rawError(`Webhook endpoint timed out, releasing mutex`,
{method: req.method, path: req.path, docId: activeDoc.docName});
}
})
);
// Apply user actions to a document. // Apply user actions to a document.
this._app.post('/api/docs/:docId/apply', canEdit, withDoc(async (activeDoc, req, res) => { this._app.post('/api/docs/:docId/apply', canEdit, withDoc(async (activeDoc, req, res) => {
const parseStrings = !isAffirmative(req.query.noparse); const parseStrings = !isAffirmative(req.query.noparse);
@ -376,7 +397,6 @@ export class DocWorkerApi {
return { return {
fields, fields,
url, url,
trigger,
}; };
} }
@ -793,7 +813,7 @@ export class DocWorkerApi {
// Add a new webhook and trigger // Add a new webhook and trigger
this._app.post('/api/docs/:docId/webhooks', isOwner, validate(WebhookSubscribeCollection), this._app.post('/api/docs/:docId/webhooks', isOwner, validate(WebhookSubscribeCollection),
withDoc(async (activeDoc, req, res) => { withDocTriggersLock(async (activeDoc, req, res) => {
const registeredWebhooks: Array<WebhookSubscription> = []; const registeredWebhooks: Array<WebhookSubscription> = [];
for(const webhook of req.body.webhooks) { for(const webhook of req.body.webhooks) {
const registeredWebhook = await registerWebhook(activeDoc, req, webhook.fields); const registeredWebhook = await registerWebhook(activeDoc, req, webhook.fields);
@ -809,7 +829,7 @@ export class DocWorkerApi {
@deprecated please call to POST /webhooks instead, this endpoint is only for sake of backward compatibility @deprecated please call to POST /webhooks instead, this endpoint is only for sake of backward compatibility
*/ */
this._app.post('/api/docs/:docId/tables/:tableId/_subscribe', isOwner, validate(WebhookSubscribe), this._app.post('/api/docs/:docId/tables/:tableId/_subscribe', isOwner, validate(WebhookSubscribe),
withDoc(async (activeDoc, req, res) => { withDocTriggersLock(async (activeDoc, req, res) => {
const registeredWebhook = await registerWebhook(activeDoc, req, req.body); const registeredWebhook = await registerWebhook(activeDoc, req, req.body);
res.json(registeredWebhook); res.json(registeredWebhook);
}) })
@ -817,7 +837,7 @@ export class DocWorkerApi {
// Clears all outgoing webhooks in the queue for this document. // Clears all outgoing webhooks in the queue for this document.
this._app.delete('/api/docs/:docId/webhooks/queue', isOwner, this._app.delete('/api/docs/:docId/webhooks/queue', isOwner,
withDoc(async (activeDoc, req, res) => { withDocTriggersLock(async (activeDoc, req, res) => {
await activeDoc.clearWebhookQueue(); await activeDoc.clearWebhookQueue();
await activeDoc.sendWebhookNotification(); await activeDoc.sendWebhookNotification();
res.json({success: true}); res.json({success: true});
@ -826,23 +846,24 @@ export class DocWorkerApi {
// Remove webhook and trigger created above // Remove webhook and trigger created above
this._app.delete('/api/docs/:docId/webhooks/:webhookId', isOwner, this._app.delete('/api/docs/:docId/webhooks/:webhookId', isOwner,
withDoc(removeWebhook) withDocTriggersLock(removeWebhook)
); );
/** /**
@deprecated please call to DEL /webhooks instead, this endpoint is only for sake of backward compatibility @deprecated please call to DEL /webhooks instead, this endpoint is only for sake of backward compatibility
*/ */
this._app.post('/api/docs/:docId/tables/:tableId/_unsubscribe', canEdit, this._app.post('/api/docs/:docId/tables/:tableId/_unsubscribe', canEdit,
withDoc(removeWebhook) withDocTriggersLock(removeWebhook)
); );
// Update a webhook // Update a webhook
this._app.patch( this._app.patch(
'/api/docs/:docId/webhooks/:webhookId', isOwner, validate(WebhookPatch), withDoc(async (activeDoc, req, res) => { '/api/docs/:docId/webhooks/:webhookId', isOwner, validate(WebhookPatch),
withDocTriggersLock(async (activeDoc, req, res) => {
const docId = activeDoc.docName; const docId = activeDoc.docName;
const webhookId = req.params.webhookId; const webhookId = req.params.webhookId;
const {fields, trigger, url} = await getWebhookSettings(activeDoc, req, webhookId, req.body); const {fields, url} = await getWebhookSettings(activeDoc, req, webhookId, req.body);
if (fields.enabled === false) { if (fields.enabled === false) {
await activeDoc.triggers.clearSingleWebhookQueue(webhookId); await activeDoc.triggers.clearSingleWebhookQueue(webhookId);
@ -850,24 +871,18 @@ export class DocWorkerApi {
const triggerRowId = activeDoc.triggers.getWebhookTriggerRecord(webhookId).id; const triggerRowId = activeDoc.triggers.getWebhookTriggerRecord(webhookId).id;
await this._dbManager.connection.transaction(async manager => { // update url in homedb
if (url) {
await this._dbManager.updateWebhookUrl(webhookId, docId, url);
activeDoc.triggers.webhookDeleted(webhookId); // clear cache
}
// update url // then update document
if (url) { if (Object.keys(fields).length) {
await this._dbManager.updateWebhookUrl(webhookId, docId, url, manager); await handleSandboxError("_grist_Triggers", [], activeDoc.applyUserActions(
activeDoc.triggers.webhookDeleted(webhookId); // clear cache docSessionFromRequest(req),
} [['UpdateRecord', "_grist_Triggers", triggerRowId, fields]]));
}
// then update sqlite.
if (Object.keys(fields).length) {
// In order to make sure to push a valid modification, let's update all fields since
// some may have changed since lookup.
_.defaults(fields, _.omit(trigger, 'id'));
await handleSandboxError("_grist_Triggers", [], activeDoc.applyUserActions(
docSessionFromRequest(req),
[['UpdateRecord', "_grist_Triggers", triggerRowId, fields]]));
}
});
await activeDoc.sendWebhookNotification(); await activeDoc.sendWebhookNotification();
@ -875,11 +890,9 @@ export class DocWorkerApi {
}) })
); );
// Clears a single webhook in the queue for this document. // Clears a single webhook in the queue for this document.
this._app.delete('/api/docs/:docId/webhooks/queue/:webhookId', isOwner, this._app.delete('/api/docs/:docId/webhooks/queue/:webhookId', isOwner,
withDoc(async (activeDoc, req, res) => { withDocTriggersLock(async (activeDoc, req, res) => {
const webhookId = req.params.webhookId; const webhookId = req.params.webhookId;
await activeDoc.clearSingleWebhookQueue(webhookId); await activeDoc.clearSingleWebhookQueue(webhookId);
await activeDoc.sendWebhookNotification(); await activeDoc.sendWebhookNotification();
@ -889,7 +902,7 @@ export class DocWorkerApi {
// Lists all webhooks and their current status in the document. // Lists all webhooks and their current status in the document.
this._app.get('/api/docs/:docId/webhooks', isOwner, this._app.get('/api/docs/:docId/webhooks', isOwner,
withDoc(async (activeDoc, req, res) => { withDocTriggersLock(async (activeDoc, req, res) => {
res.json(await activeDoc.webhooksSummary()); res.json(await activeDoc.webhooksSummary());
}) })
); );