(core) Adding DELETE /api/docs/webhooks/queue endpoint to clear the queue

Summary:
Creating an API endpoint to cancel any queued webhook messages from
a document.

Test Plan: Updated

Reviewers: paulfitz, georgegevoian

Reviewed By: paulfitz, georgegevoian

Differential Revision: https://phab.getgrist.com/D3713
This commit is contained in:
Jarosław Sadziński
2022-11-30 13:04:27 +01:00
parent 29a7eadb85
commit 92d4fca855
10 changed files with 380 additions and 24 deletions

View File

@@ -1,7 +1,6 @@
import {LocalActionBundle} from 'app/common/ActionBundle';
import {ActionSummary, TableDelta} from 'app/common/ActionSummary';
import {MapWithTTL} from 'app/common/AsyncCreate';
import {delay} from 'app/common/delay';
import {fromTableDataAction, RowRecord, TableColValues, TableDataAction} from 'app/common/DocActions';
import {StringUnion} from 'app/common/StringUnion';
import {MetaRowRecord} from 'app/common/TableData';
@@ -10,11 +9,13 @@ import {summarizeAction} from 'app/server/lib/ActionSummary';
import {ActiveDoc} from 'app/server/lib/ActiveDoc';
import {makeExceptionalDocSession} from 'app/server/lib/DocSession';
import log from 'app/server/lib/log';
import {matchesBaseDomain} from 'app/server/lib/requestUtils';
import {delayAbort} from 'app/server/lib/serverUtils';
import {promisifyAll} from 'bluebird';
import * as _ from 'lodash';
import {AbortController, AbortSignal} from 'node-abort-controller';
import fetch from 'node-fetch';
import {createClient, Multi, RedisClient} from 'redis';
import {matchesBaseDomain} from 'app/server/lib/requestUtils';
promisifyAll(RedisClient.prototype);
@@ -72,6 +73,10 @@ const MAX_QUEUE_SIZE = 1000;
const WEBHOOK_CACHE_TTL = 10000;
// A time to wait for between retries of a webhook. Exposed for tests.
const TRIGGER_WAIT_DELAY =
process.env.GRIST_TRIGGER_WAIT_DELAY ? parseInt(process.env.GRIST_TRIGGER_WAIT_DELAY, 10) : 1000;
// Processes triggers for records changed as described in action bundles.
// initiating webhooks and automations.
// The interesting stuff starts in the handle() method.
@@ -107,6 +112,9 @@ export class DocTriggers {
// Promise which resolves after we finish fetching the backup queue from redis on startup.
private _getRedisQueuePromise: Promise<void> | undefined;
// Abort controller for the loop that sends webhooks.
private _loopAbort: AbortController|undefined;
constructor(private _activeDoc: ActiveDoc) {
const redisUrl = process.env.REDIS_URL;
if (redisUrl) {
@@ -118,6 +126,7 @@ export class DocTriggers {
public shutdown() {
this._shuttingDown = true;
this._loopAbort?.abort();
if (!this._sending) {
void(this._redisClientField?.quitAsync());
}
@@ -210,12 +219,30 @@ export class DocTriggers {
// Prevent further document activity while the queue is too full.
while (this._drainingQueue && !this._shuttingDown) {
await delay(1000);
await delayAbort(1000, this._loopAbort?.signal);
}
return summary;
}
public async clearWebhookQueue() {
// Make sure we are after start and in sync with redis.
if (this._getRedisQueuePromise) {
await this._getRedisQueuePromise;
}
// Clear in-memory queue.
const removed = this._webHookEventQueue.splice(0, this._webHookEventQueue.length).length;
// Notify the loop that it should restart.
this._loopAbort?.abort();
// If we have backup in redis, clear it also.
// NOTE: this is subject to a race condition, currently it is not possible, but any future modification probably
// will require some kind of locking over the queue (or a rewrite)
if (removed && this._redisClient) {
await this._redisClient.multi().ltrim(this._redisQueueKey, 0, -1).execAsync();
}
}
private get _docId() {
return this._activeDoc.docName;
}
@@ -421,14 +448,18 @@ export class DocTriggers {
// TODO delay/prevent shutting down while queue isn't empty?
while (!this._shuttingDown) {
this._loopAbort = new AbortController();
if (!this._webHookEventQueue.length) {
await delay(1000);
await delayAbort(TRIGGER_WAIT_DELAY, this._loopAbort.signal).catch(() => {});
continue;
}
const id = this._webHookEventQueue[0].id;
const batch = _.takeWhile(this._webHookEventQueue.slice(0, 100), {id});
const body = JSON.stringify(batch.map(e => e.payload));
const url = await this._getWebHookUrl(id);
if (this._loopAbort.signal.aborted) {
continue;
}
let meta: Record<string, any>|undefined;
let success: boolean;
@@ -437,7 +468,11 @@ export class DocTriggers {
} else {
meta = {numEvents: batch.length, webhookId: id, host: new URL(url).host};
this._log("Sending batch of webhook events", meta);
success = await this._sendWebhookWithRetries(url, body);
success = await this._sendWebhookWithRetries(url, body, this._loopAbort.signal);
}
if (this._loopAbort.signal.aborted) {
continue;
}
this._webHookEventQueue.splice(0, batch.length);
@@ -493,7 +528,7 @@ export class DocTriggers {
return this._drainingQueue ? 5 : 20;
}
private async _sendWebhookWithRetries(url: string, body: string) {
private async _sendWebhookWithRetries(url: string, body: string, signal: AbortSignal) {
const maxWait = 64;
let wait = 1;
for (let attempt = 0; attempt < this._maxWebhookAttempts; attempt++) {
@@ -507,6 +542,7 @@ export class DocTriggers {
headers: {
'Content-Type': 'application/json',
},
signal,
});
if (response.status === 200) {
return true;
@@ -516,6 +552,10 @@ export class DocTriggers {
this._log(`Webhook sending error: ${e}`, {level: 'warn', attempt});
}
if (signal.aborted) {
return false;
}
// Don't wait any more if this is the last attempt.
if (attempt >= this._maxWebhookAttempts - 1) {
return false;
@@ -526,7 +566,11 @@ export class DocTriggers {
if (this._shuttingDown) {
return false;
}
await delay(1000);
try {
await delayAbort(TRIGGER_WAIT_DELAY, signal);
} catch (e) {
return false;
}
}
if (wait < maxWait) {
wait *= 2;