(core) When a webhook is disabled, clear its queue

Summary: Also fixes a few bugs found along the way, particularly that webhook payloads could contain stale data.

Test Plan: Added an nbrowser test, made existing test a bit more detailed.

Reviewers: paulfitz

Reviewed By: paulfitz

Subscribers: paulfitz

Differential Revision: https://phab.getgrist.com/D4102
This commit is contained in:
Alex Hall 2023-10-31 15:07:02 +02:00
parent 95cbbb8910
commit b7e9d2705e
3 changed files with 59 additions and 20 deletions

View File

@ -844,6 +844,10 @@ export class DocWorkerApi {
const webhookId = req.params.webhookId; const webhookId = req.params.webhookId;
const {fields, trigger, url} = await getWebhookSettings(activeDoc, req, webhookId, req.body); const {fields, trigger, url} = await getWebhookSettings(activeDoc, req, webhookId, req.body);
if (fields.enabled === false) {
await activeDoc.triggers.clearSingleWebhookQueue(webhookId);
}
const triggerRowId = activeDoc.triggers.getWebhookTriggerRecord(webhookId).id; const triggerRowId = activeDoc.triggers.getWebhookTriggerRecord(webhookId).id;
await this._dbManager.connection.transaction(async manager => { await this._dbManager.connection.transaction(async manager => {

View File

@ -209,7 +209,7 @@ export class DocTriggers {
// Fetch the modified records in full so they can be sent in webhooks // Fetch the modified records in full so they can be sent in webhooks
// They will also be used to check if the record is ready // They will also be used to check if the record is ready
const tableDataAction = this._activeDoc.fetchQuery(docSession, {tableId, filters}) const tableDataAction = this._activeDoc.fetchQuery(docSession, {tableId, filters}, true)
.then(tableFetchResult => tableFetchResult.tableData); .then(tableFetchResult => tableFetchResult.tableData);
tasks.push({tableDelta, triggers, tableDataAction, recordDeltas}); tasks.push({tableDelta, triggers, tableDataAction, recordDeltas});
} }
@ -242,7 +242,7 @@ export class DocTriggers {
// Prevent further document activity while the queue is too full. // Prevent further document activity while the queue is too full.
while (this._drainingQueue && !this._shuttingDown) { while (this._drainingQueue && !this._shuttingDown) {
const sendNotificationPromise = this._activeDoc.sendWebhookNotification(WebhookMessageType.Overflow); const sendNotificationPromise = this._activeDoc.sendWebhookNotification(WebhookMessageType.Overflow);
const delayPromise = delayAbort(5000, this._loopAbort?.signal); const delayPromise = delayAbort(5000, this._loopAbort?.signal).catch(() => {});
await Promise.all([sendNotificationPromise, delayPromise]); await Promise.all([sendNotificationPromise, delayPromise]);
} }
@ -327,6 +327,7 @@ export class DocTriggers {
} }
public async clearWebhookQueue() { public async clearWebhookQueue() {
this._log("Webhook being queue cleared");
// Make sure we are after start and in sync with redis. // Make sure we are after start and in sync with redis.
if (this._getRedisQueuePromise) { if (this._getRedisQueuePromise) {
await this._getRedisQueuePromise; await this._getRedisQueuePromise;
@ -342,21 +343,20 @@ export class DocTriggers {
await this._redisClient.multi().del(this._redisQueueKey).execAsync(); await this._redisClient.multi().del(this._redisQueueKey).execAsync();
} }
await this._stats.clear(); await this._stats.clear();
this._log("Webhook queue cleared", {numRemoved: removed});
} }
public async clearSingleWebhookQueue(webhookId: string) { public async clearSingleWebhookQueue(webhookId: string) {
this._log("Single webhook queue being cleared", {webhookId});
// Make sure we are after start and in sync with redis. // Make sure we are after start and in sync with redis.
if (this._getRedisQueuePromise) { if (this._getRedisQueuePromise) {
await this._getRedisQueuePromise; await this._getRedisQueuePromise;
} }
// Clear in-memory queue for given webhook key. // Clear in-memory queue for given webhook key.
let removed = 0; const lengthBefore = this._webHookEventQueue.length;
for(let i=0; i< this._webHookEventQueue.length; i++){ this._webHookEventQueue = this._webHookEventQueue.filter(e => e.id !== webhookId);
if(this._webHookEventQueue[i].id == webhookId){ const removed = lengthBefore - this._webHookEventQueue.length;
this._webHookEventQueue.splice(i, 1);
removed++;
}
}
// Notify the loop that it should restart. // Notify the loop that it should restart.
this._loopAbort?.abort(); this._loopAbort?.abort();
// If we have backup in redis, clear it also. // If we have backup in redis, clear it also.
@ -367,11 +367,14 @@ export class DocTriggers {
multi.del(this._redisQueueKey); multi.del(this._redisQueueKey);
// Re-add all the remaining events to the queue. // Re-add all the remaining events to the queue.
if (this._webHookEventQueue.length) {
const strings = this._webHookEventQueue.map(e => JSON.stringify(e)); const strings = this._webHookEventQueue.map(e => JSON.stringify(e));
multi.rpush(this._redisQueueKey, ...strings); multi.rpush(this._redisQueueKey, ...strings);
}
await multi.execAsync(); await multi.execAsync();
} }
await this._stats.clear(); await this._stats.clear();
this._log("Single webhook queue cleared", {numRemoved: removed, webhookId});
} }
// Converts a table to tableId by looking it up in _grist_Tables. // Converts a table to tableId by looking it up in _grist_Tables.

View File

@ -13,11 +13,12 @@ describe('WebhookOverflow', function () {
let oldEnv: EnvironmentSnapshot; let oldEnv: EnvironmentSnapshot;
let doc: DocCreationInfo; let doc: DocCreationInfo;
let docApi: DocAPI; let docApi: DocAPI;
gu.bigScreen();
before(async function () { before(async function () {
oldEnv = new EnvironmentSnapshot(); oldEnv = new EnvironmentSnapshot();
process.env.ALLOWED_WEBHOOK_DOMAINS = '*'; process.env.ALLOWED_WEBHOOK_DOMAINS = '*';
process.env.GRIST_MAX_QUEUE_SIZE = '2'; process.env.GRIST_MAX_QUEUE_SIZE = '4';
await server.restart(); await server.restart();
session = await gu.session().teamSite.login(); session = await gu.session().teamSite.login();
const api = session.createHomeApi(); const api = session.createHomeApi();
@ -25,15 +26,17 @@ describe('WebhookOverflow', function () {
docApi = api.getDocAPI(doc.id); docApi = api.getDocAPI(doc.id);
await api.applyUserActions(doc.id, [ await api.applyUserActions(doc.id, [
['AddTable', 'Table2', [{id: 'A'}, {id: 'B'}, {id: 'C'}, {id: 'D'}, {id: 'E'}]], ['AddTable', 'Table2', [{id: 'A'}, {id: 'B'}, {id: 'C'}, {id: 'D'}, {id: 'E'}]],
['AddRecord', 'Table2', null, {}],
]); ]);
const webhookDetails: WebhookFields = { const webhookDetails: WebhookFields = {
url: 'https://localhost/WrongWebhook', url: 'https://localhost/WrongWebhook',
eventTypes: ["add", "update"], eventTypes: ["update"],
enabled: true, enabled: true,
name: 'test webhook', name: 'test webhook',
tableId: 'Table2', tableId: 'Table2',
}; };
await docApi.addWebhook(webhookDetails); await docApi.addWebhook(webhookDetails);
await docApi.addWebhook(webhookDetails);
}); });
after(async function () { after(async function () {
@ -49,28 +52,57 @@ describe('WebhookOverflow', function () {
await driver.sendKeys(...keys); await driver.sendKeys(...keys);
} }
it('should show a message when overflowed', async function () { async function getNumWaiting() {
const cells = await gu.getVisibleDetailCells({col: 'Status', rowNums: [1, 2]});
return cells.map((cell) => {
const status = JSON.parse(cell.replace(/\n/g, ''));
return status.numWaiting;
});
}
async function overflowWebhook() {
await gu.openPage('Table2'); await gu.openPage('Table2');
await gu.getCell('A', 1).click(); await gu.getCell('A', 1).click();
await gu.enterCell('123'); await gu.enterCell(new Date().toString());
await gu.getCell('B', 1).click(); await gu.getCell('B', 1).click();
await enterCellWithoutWaitingOnServer('124'); await enterCellWithoutWaitingOnServer(new Date().toString());
await gu.waitToPass(async () => { await gu.waitToPass(async () => {
const toast = await gu.getToasts(); const toast = await gu.getToasts();
assert.include(toast, 'New changes are temporarily suspended. Webhooks queue overflowed.' + assert.include(toast, 'New changes are temporarily suspended. Webhooks queue overflowed.' +
' Please check webhooks settings, remove invalid webhooks, and clean the queue.\ngo to webhook settings'); ' Please check webhooks settings, remove invalid webhooks, and clean the queue.\ngo to webhook settings');
}, 4000); }, 4000);
}); }
it('message should disappear after clearing queue', async function () { async function overflowResolved() {
await openWebhookPageWithoutWaitForServer();
await driver.findContent('button', /Clear Queue/).click();
await gu.waitForServer(); await gu.waitForServer();
await gu.waitToPass(async () => { await gu.waitToPass(async () => {
const toast = await gu.getToasts(); const toast = await gu.getToasts();
assert.notInclude(toast, 'New changes are temporarily suspended. Webhooks queue overflowed.' + assert.notInclude(toast, 'New changes are temporarily suspended. Webhooks queue overflowed.' +
' Please check webhooks settings, remove invalid webhooks, and clean the queue.\ngo to webhook settings'); ' Please check webhooks settings, remove invalid webhooks, and clean the queue.\ngo to webhook settings');
}, 12500); }, 12500);
}
it('should show a message when overflowed', async function () {
await overflowWebhook();
});
it('message should disappear after clearing queue', async function () {
await openWebhookPageWithoutWaitForServer();
assert.deepEqual(await getNumWaiting(), [2, 2]);
await driver.findContent('button', /Clear Queue/).click();
await overflowResolved();
assert.deepEqual(await getNumWaiting(), [0, 0]);
});
it('should clear a single webhook queue when that webhook is disabled', async function () {
await overflowWebhook();
await openWebhookPageWithoutWaitForServer();
await gu.waitToPass(async () => {
assert.deepEqual(await getNumWaiting(), [2, 2]);
}, 4000);
await gu.getDetailCell({col: 'Enabled', rowNum: 1}).click();
await overflowResolved();
assert.deepEqual(await getNumWaiting(), [0, 2]);
}); });
}); });