diff --git a/app/server/lib/DocApi.ts b/app/server/lib/DocApi.ts index a0624204..464ae0c2 100644 --- a/app/server/lib/DocApi.ts +++ b/app/server/lib/DocApi.ts @@ -844,6 +844,10 @@ export class DocWorkerApi { const webhookId = req.params.webhookId; const {fields, trigger, url} = await getWebhookSettings(activeDoc, req, webhookId, req.body); + if (fields.enabled === false) { + await activeDoc.triggers.clearSingleWebhookQueue(webhookId); + } + const triggerRowId = activeDoc.triggers.getWebhookTriggerRecord(webhookId).id; await this._dbManager.connection.transaction(async manager => { diff --git a/app/server/lib/Triggers.ts b/app/server/lib/Triggers.ts index e9e947f6..313cedf4 100644 --- a/app/server/lib/Triggers.ts +++ b/app/server/lib/Triggers.ts @@ -209,7 +209,7 @@ export class DocTriggers { // Fetch the modified records in full so they can be sent in webhooks // They will also be used to check if the record is ready - const tableDataAction = this._activeDoc.fetchQuery(docSession, {tableId, filters}) + const tableDataAction = this._activeDoc.fetchQuery(docSession, {tableId, filters}, true) .then(tableFetchResult => tableFetchResult.tableData); tasks.push({tableDelta, triggers, tableDataAction, recordDeltas}); } @@ -242,7 +242,7 @@ export class DocTriggers { // Prevent further document activity while the queue is too full. while (this._drainingQueue && !this._shuttingDown) { const sendNotificationPromise = this._activeDoc.sendWebhookNotification(WebhookMessageType.Overflow); - const delayPromise = delayAbort(5000, this._loopAbort?.signal); + const delayPromise = delayAbort(5000, this._loopAbort?.signal).catch(() => {}); await Promise.all([sendNotificationPromise, delayPromise]); } @@ -327,6 +327,7 @@ export class DocTriggers { } public async clearWebhookQueue() { + this._log("Webhook being queue cleared"); // Make sure we are after start and in sync with redis. if (this._getRedisQueuePromise) { await this._getRedisQueuePromise; @@ -342,21 +343,20 @@ export class DocTriggers { await this._redisClient.multi().del(this._redisQueueKey).execAsync(); } await this._stats.clear(); + this._log("Webhook queue cleared", {numRemoved: removed}); } public async clearSingleWebhookQueue(webhookId: string) { + this._log("Single webhook queue being cleared", {webhookId}); // Make sure we are after start and in sync with redis. if (this._getRedisQueuePromise) { await this._getRedisQueuePromise; } // Clear in-memory queue for given webhook key. - let removed = 0; - for(let i=0; i< this._webHookEventQueue.length; i++){ - if(this._webHookEventQueue[i].id == webhookId){ - this._webHookEventQueue.splice(i, 1); - removed++; - } - } + const lengthBefore = this._webHookEventQueue.length; + this._webHookEventQueue = this._webHookEventQueue.filter(e => e.id !== webhookId); + const removed = lengthBefore - this._webHookEventQueue.length; + // Notify the loop that it should restart. this._loopAbort?.abort(); // If we have backup in redis, clear it also. @@ -367,11 +367,14 @@ export class DocTriggers { multi.del(this._redisQueueKey); // Re-add all the remaining events to the queue. - const strings = this._webHookEventQueue.map(e => JSON.stringify(e)); - multi.rpush(this._redisQueueKey, ...strings); + if (this._webHookEventQueue.length) { + const strings = this._webHookEventQueue.map(e => JSON.stringify(e)); + multi.rpush(this._redisQueueKey, ...strings); + } await multi.execAsync(); } await this._stats.clear(); + this._log("Single webhook queue cleared", {numRemoved: removed, webhookId}); } // Converts a table to tableId by looking it up in _grist_Tables. diff --git a/test/nbrowser/WebhookOverflow.ts b/test/nbrowser/WebhookOverflow.ts index 1550476b..df52ca15 100644 --- a/test/nbrowser/WebhookOverflow.ts +++ b/test/nbrowser/WebhookOverflow.ts @@ -13,11 +13,12 @@ describe('WebhookOverflow', function () { let oldEnv: EnvironmentSnapshot; let doc: DocCreationInfo; let docApi: DocAPI; + gu.bigScreen(); before(async function () { oldEnv = new EnvironmentSnapshot(); process.env.ALLOWED_WEBHOOK_DOMAINS = '*'; - process.env.GRIST_MAX_QUEUE_SIZE = '2'; + process.env.GRIST_MAX_QUEUE_SIZE = '4'; await server.restart(); session = await gu.session().teamSite.login(); const api = session.createHomeApi(); @@ -25,15 +26,17 @@ describe('WebhookOverflow', function () { docApi = api.getDocAPI(doc.id); await api.applyUserActions(doc.id, [ ['AddTable', 'Table2', [{id: 'A'}, {id: 'B'}, {id: 'C'}, {id: 'D'}, {id: 'E'}]], + ['AddRecord', 'Table2', null, {}], ]); const webhookDetails: WebhookFields = { url: 'https://localhost/WrongWebhook', - eventTypes: ["add", "update"], + eventTypes: ["update"], enabled: true, name: 'test webhook', tableId: 'Table2', }; await docApi.addWebhook(webhookDetails); + await docApi.addWebhook(webhookDetails); }); after(async function () { @@ -49,28 +52,57 @@ describe('WebhookOverflow', function () { await driver.sendKeys(...keys); } - it('should show a message when overflowed', async function () { + async function getNumWaiting() { + const cells = await gu.getVisibleDetailCells({col: 'Status', rowNums: [1, 2]}); + return cells.map((cell) => { + const status = JSON.parse(cell.replace(/\n/g, '')); + return status.numWaiting; + }); + } + + async function overflowWebhook() { await gu.openPage('Table2'); await gu.getCell('A', 1).click(); - await gu.enterCell('123'); + await gu.enterCell(new Date().toString()); await gu.getCell('B', 1).click(); - await enterCellWithoutWaitingOnServer('124'); + await enterCellWithoutWaitingOnServer(new Date().toString()); await gu.waitToPass(async () => { const toast = await gu.getToasts(); assert.include(toast, 'New changes are temporarily suspended. Webhooks queue overflowed.' + ' Please check webhooks settings, remove invalid webhooks, and clean the queue.\ngo to webhook settings'); }, 4000); - }); + } - it('message should disappear after clearing queue', async function () { - await openWebhookPageWithoutWaitForServer(); - await driver.findContent('button', /Clear Queue/).click(); + async function overflowResolved() { await gu.waitForServer(); await gu.waitToPass(async () => { const toast = await gu.getToasts(); assert.notInclude(toast, 'New changes are temporarily suspended. Webhooks queue overflowed.' + ' Please check webhooks settings, remove invalid webhooks, and clean the queue.\ngo to webhook settings'); }, 12500); + } + + it('should show a message when overflowed', async function () { + await overflowWebhook(); + }); + + it('message should disappear after clearing queue', async function () { + await openWebhookPageWithoutWaitForServer(); + assert.deepEqual(await getNumWaiting(), [2, 2]); + await driver.findContent('button', /Clear Queue/).click(); + await overflowResolved(); + assert.deepEqual(await getNumWaiting(), [0, 0]); + }); + + it('should clear a single webhook queue when that webhook is disabled', async function () { + await overflowWebhook(); + await openWebhookPageWithoutWaitForServer(); + await gu.waitToPass(async () => { + assert.deepEqual(await getNumWaiting(), [2, 2]); + }, 4000); + await gu.getDetailCell({col: 'Enabled', rowNum: 1}).click(); + await overflowResolved(); + assert.deepEqual(await getNumWaiting(), [0, 2]); }); });