(core) deleting queue from single webhook

Summary: Using standard tost notification, message about webhook queue being overflown was added. message is permanent as long as queue is full. Message contains linkt to the webhook setings

Test Plan: two nbrowser test was added - one to check if message is show when queue is full, and second to check if message is dismiss when queue was cleaned.

Reviewers: georgegevoian

Reviewed By: georgegevoian

Subscribers: jarek

Differential Revision: https://phab.getgrist.com/D3929
This commit is contained in:
Jakub Serafin
2023-07-18 09:24:10 +02:00
parent 450472f74c
commit d894b60fd4
14 changed files with 475 additions and 151 deletions

View File

@@ -35,6 +35,7 @@ import {
import {ApiError} from 'app/common/ApiError';
import {mapGetOrSet, MapWithTTL} from 'app/common/AsyncCreate';
import {AttachmentColumns, gatherAttachmentIds, getAttachmentColumns} from 'app/common/AttachmentColumns';
import {WebhookMessageType} from 'app/common/CommTypes';
import {
BulkAddRecord,
BulkRemoveRecord,
@@ -129,8 +130,7 @@ import {createAttachmentsIndex, DocStorage, REMOVE_UNUSED_ATTACHMENTS_DELAY} fro
import {expandQuery} from './ExpandedQuery';
import {GranularAccess, GranularAccessForBundle} from './GranularAccess';
import {OnDemandActions} from './OnDemandActions';
import {getLogMetaFromDocSession, getPubSubPrefix, getTelemetryMetaFromDocSession,
timeoutReached} from './serverUtils';
import {getLogMetaFromDocSession, getPubSubPrefix, getTelemetryMetaFromDocSession, timeoutReached} from './serverUtils';
import {findOrAddAllEnvelope, Sharing} from './Sharing';
import cloneDeep = require('lodash/cloneDeep');
import flatten = require('lodash/flatten');
@@ -1769,6 +1769,10 @@ export class ActiveDoc extends EventEmitter {
await this._triggers.clearWebhookQueue();
}
public async clearSingleWebhookQueue(webhookId: string) {
await this._triggers.clearSingleWebhookQueue(webhookId);
}
/**
* Returns the list of outgoing webhook for a table in this document.
*/
@@ -1778,13 +1782,13 @@ export class ActiveDoc extends EventEmitter {
/**
* Send a message to clients connected to the document that something
* webhook-related has happened (a change in configuration, or a
* delivery, or an error). There is room to give details in future,
* if that proves useful, but for now no details are needed.
* webhook-related has happened (a change in configuration, a delivery,
* or an error). It passes information about the type of event (currently data being updated in some way
* or an OverflowError, i.e., too many events waiting to be sent). More data may be added when necessary.
*/
public async sendWebhookNotification() {
public async sendWebhookNotification(type: WebhookMessageType = WebhookMessageType.Update) {
await this.docClients.broadcastDocMessage(null, 'docChatter', {
webhooks: {},
webhooks: {type},
});
}

View File

@@ -763,6 +763,16 @@ export class DocWorkerApi {
// Clears a single webhook in the queue for this document.
this._app.delete('/api/docs/:docId/webhooks/queue/:webhookId', isOwner,
withDoc(async (activeDoc, req, res) => {
const webhookId = req.params.webhookId;
await activeDoc.clearSingleWebhookQueue(webhookId);
await activeDoc.sendWebhookNotification();
res.json({success: true});
})
);
// Lists all webhooks and their current status in the document.
this._app.get('/api/docs/:docId/webhooks', isOwner,
withDoc(async (activeDoc, req, res) => {

View File

@@ -3,6 +3,7 @@ import {summarizeAction} from 'app/common/ActionSummarizer';
import {ActionSummary, TableDelta} from 'app/common/ActionSummary';
import {ApiError} from 'app/common/ApiError';
import {MapWithTTL} from 'app/common/AsyncCreate';
import {WebhookMessageType} from "app/common/CommTypes";
import {fromTableDataAction, RowRecord, TableColValues, TableDataAction} from 'app/common/DocActions';
import {StringUnion} from 'app/common/StringUnion';
import {MetaRowRecord} from 'app/common/TableData';
@@ -234,7 +235,9 @@ export class DocTriggers {
// Prevent further document activity while the queue is too full.
while (this._drainingQueue && !this._shuttingDown) {
await delayAbort(1000, this._loopAbort?.signal);
const sendNotificationPromise = this._activeDoc.sendWebhookNotification(WebhookMessageType.Overflow);
const delayPromise = delayAbort(5000, this._loopAbort?.signal);
await Promise.all([sendNotificationPromise, delayPromise]);
}
return summary;
@@ -335,6 +338,36 @@ export class DocTriggers {
await this._stats.clear();
}
public async clearSingleWebhookQueue(webhookId: string) {
// Make sure we are after start and in sync with redis.
if (this._getRedisQueuePromise) {
await this._getRedisQueuePromise;
}
// Clear in-memory queue for given webhook key.
let removed = 0;
for(let i=0; i< this._webHookEventQueue.length; i++){
if(this._webHookEventQueue[i].id == webhookId){
this._webHookEventQueue.splice(i, 1);
removed++;
}
}
// Notify the loop that it should restart.
this._loopAbort?.abort();
// If we have backup in redis, clear it also.
// NOTE: this is subject to a race condition, currently it is not possible, but any future modification probably
// will require some kind of locking over the queue (or a rewrite)
if (removed && this._redisClient) {
const multi = this._redisClient.multi();
multi.del(this._redisQueueKey);
// Re-add all the remaining events to the queue.
const strings = this._webHookEventQueue.map(e => JSON.stringify(e));
multi.rpush(this._redisQueueKey, ...strings);
await multi.execAsync();
}
await this._stats.clear();
}
// Converts a table to tableId by looking it up in _grist_Tables.
private _getTableId(rowId: number) {
const docData = this._activeDoc.docData;