diff --git a/app/gen-server/lib/DocApiForwarder.ts b/app/gen-server/lib/DocApiForwarder.ts index 6ffc4c1e..8770c5a2 100644 --- a/app/gen-server/lib/DocApiForwarder.ts +++ b/app/gen-server/lib/DocApiForwarder.ts @@ -56,6 +56,7 @@ export class DocApiForwarder { app.use('/api/docs/:docId/compare', withDoc); app.use('/api/docs/:docId/assign', withDocWithoutAuth); app.use('/api/docs/:docId/webhooks/queue', withDoc); + app.use('/api/docs/:docId/webhooks', withDoc); app.use('^/api/docs$', withoutDoc); } diff --git a/app/gen-server/lib/HomeDBManager.ts b/app/gen-server/lib/HomeDBManager.ts index f22e4cac..7355e7f2 100644 --- a/app/gen-server/lib/HomeDBManager.ts +++ b/app/gen-server/lib/HomeDBManager.ts @@ -1807,18 +1807,23 @@ export class HomeDBManager extends EventEmitter { return secret?.value; } - public async removeWebhook(id: string, docId: string, unsubscribeKey: string): Promise { - if (!(id && unsubscribeKey)) { - throw new ApiError('Bad request: id and unsubscribeKey both required', 400); + public async removeWebhook(id: string, docId: string, unsubscribeKey: string, checkKey: boolean): Promise { + if (!id) { + throw new ApiError('Bad request: id required', 400); + } + if (!unsubscribeKey && checkKey) { + throw new ApiError('Bad request: unsubscribeKey required', 400); } return await this._connection.transaction(async manager => { - const secret = await this.getSecret(id, docId, manager); - if (!secret) { - throw new ApiError('Webhook with given id not found', 404); - } - const webhook = JSON.parse(secret) as WebHookSecret; - if (webhook.unsubscribeKey !== unsubscribeKey) { - throw new ApiError('Wrong unsubscribeKey', 401); + if (checkKey) { + const secret = await this.getSecret(id, docId, manager); + if (!secret) { + throw new ApiError('Webhook with given id not found', 404); + } + const webhook = JSON.parse(secret) as WebHookSecret; + if (webhook.unsubscribeKey !== unsubscribeKey) { + throw new ApiError('Wrong unsubscribeKey', 401); + } } await manager.createQueryBuilder() .delete() diff --git a/app/server/lib/ActiveDoc.ts b/app/server/lib/ActiveDoc.ts index 53299193..5c43f00e 100644 --- a/app/server/lib/ActiveDoc.ts +++ b/app/server/lib/ActiveDoc.ts @@ -1727,6 +1727,13 @@ export class ActiveDoc extends EventEmitter { await this._triggers.clearWebhookQueue(); } + /** + * Returns the list of outgoing webhook for a table in this document. + */ + public async webhooksSummary() { + return this._triggers.summary(); + } + /** * Loads an open document from DocStorage. Returns a list of the tables it contains. */ diff --git a/app/server/lib/DocApi.ts b/app/server/lib/DocApi.ts index 1fea4a71..9dca4b37 100644 --- a/app/server/lib/DocApi.ts +++ b/app/server/lib/DocApi.ts @@ -589,31 +589,32 @@ export class DocWorkerApi { withDoc(async (activeDoc, req, res) => { const metaTables = await getMetaTables(activeDoc, req); const tableRef = tableIdToRef(metaTables, req.params.tableId); - const {triggerId, unsubscribeKey, webhookId} = req.body; + const {unsubscribeKey, webhookId} = req.body as WebhookSubscription; // Validate combination of triggerId, webhookId, and tableRef. // This is overly strict, webhookId should be enough, // but it should be easy to relax that later if we want. const [, , triggerRowIds, triggerColData] = metaTables._grist_Triggers; - const triggerRowIndex = triggerRowIds.indexOf(triggerId); + const triggerRowIndex = triggerColData.actions.findIndex(a => { + const actions: any[] = JSON.parse((a || '[]') as string); + return actions.some(action => action.id === webhookId && action?.type === "webhook"); + }); if (triggerRowIndex === -1) { - throw new ApiError(`Trigger not found "${triggerId}"`, 404); + throw new ApiError(`Webhook not found "${webhookId || ''}"`, 404); } if (triggerColData.tableRef[triggerRowIndex] !== tableRef) { throw new ApiError(`Wrong table`, 400); } - const actions = JSON.parse(triggerColData.actions[triggerRowIndex] as string); - if (!_.find(actions, {type: "webhook", id: webhookId})) { - throw new ApiError(`Webhook not found "${webhookId}"`, 404); - } + const triggerRowId = triggerRowIds[triggerRowIndex]; + const checkKey = !(await this._isOwner(req)); // Validate unsubscribeKey before deleting trigger from document - await this._dbManager.removeWebhook(webhookId, activeDoc.docName, unsubscribeKey); + await this._dbManager.removeWebhook(webhookId, activeDoc.docName, unsubscribeKey, checkKey); // TODO handle trigger containing other actions when that becomes possible await handleSandboxError("_grist_Triggers", [], activeDoc.applyUserActions( docSessionFromRequest(req), - [['RemoveRecord', "_grist_Triggers", triggerId]])); + [['RemoveRecord', "_grist_Triggers", triggerRowId]])); res.json({success: true}); }) @@ -627,6 +628,13 @@ export class DocWorkerApi { }) ); + // Lists all webhooks and their current status in the document. + this._app.get('/api/docs/:docId/webhooks', isOwner, + withDoc(async (activeDoc, req, res) => { + res.json(await activeDoc.webhooksSummary()); + }) + ); + // Reload a document forcibly (in fact this closes the doc, it will be automatically // reopened on use). this._app.post('/api/docs/:docId/force-reload', canEdit, throttled(async (req, res) => { @@ -1429,3 +1437,8 @@ export function getDocApiUsageKeysToIncr( } // Usage exceeded all the time buckets, so return undefined to reject the request. } + +export interface WebhookSubscription { + unsubscribeKey: string; + webhookId: string; +} diff --git a/app/server/lib/Triggers.ts b/app/server/lib/Triggers.ts index 02c7f2cc..e20114ab 100644 --- a/app/server/lib/Triggers.ts +++ b/app/server/lib/Triggers.ts @@ -5,6 +5,7 @@ import {fromTableDataAction, RowRecord, TableColValues, TableDataAction} from 'a import {StringUnion} from 'app/common/StringUnion'; import {MetaRowRecord} from 'app/common/TableData'; import {CellDelta} from 'app/common/TabularDiff'; +import {decodeObject} from 'app/plugin/objtypes'; import {summarizeAction} from 'app/server/lib/ActionSummary'; import {ActiveDoc} from 'app/server/lib/ActiveDoc'; import {makeExceptionalDocSession} from 'app/server/lib/DocSession'; @@ -34,6 +35,44 @@ type RecordDeltas = Map; // Union discriminated by type type TriggerAction = WebhookAction | PythonAction; +type WebhookBatchStatus = 'success'|'failure'|'rejected'; +type WebhookStatus = 'idle'|'sending'|'retrying'|'postponed'|'error'; + +export interface WebhookSummary { + id: string; + fields: { + url: string; + unsubscribeKey: string; + eventTypes: string[]; + isReadyColumn?: string|null; + tableId: string; + enabled: boolean; + }, + usage: WebhookUsage|null, +} + +interface WebhookUsage { + // As minimum we need number of waiting events and status (by default pending). + numWaiting: number, + status: WebhookStatus; + updatedTime?: number|null; + lastSuccessTime?: number|null; + lastFailureTime?: number|null; + lastErrorMessage?: string|null; + lastHttpStatus?: number|null; + lastEventBatch?: null | { + size: number; + errorMessage: string|null; + httpStatus: number|null; + status: WebhookBatchStatus; + attempts: number; + }, + numSuccess?: { + pastHour: number; + past24Hours: number; + }, +} + export interface WebhookAction { type: "webhook"; id: string; @@ -69,14 +108,20 @@ interface Task { recordDeltas: RecordDeltas; } -const MAX_QUEUE_SIZE = 1000; +const MAX_QUEUE_SIZE = + process.env.GRIST_MAX_QUEUE_SIZE ? parseInt(process.env.GRIST_MAX_QUEUE_SIZE, 10) : 1000; -const WEBHOOK_CACHE_TTL = 10000; +const WEBHOOK_CACHE_TTL = 10_000; + +const WEBHOOK_STATS_CACHE_TTL = 1000 /*s*/ * 60 /*m*/ * 24/*h*/; // A time to wait for between retries of a webhook. Exposed for tests. const TRIGGER_WAIT_DELAY = process.env.GRIST_TRIGGER_WAIT_DELAY ? parseInt(process.env.GRIST_TRIGGER_WAIT_DELAY, 10) : 1000; +const TRIGGER_MAX_ATTEMPTS = + process.env.GRIST_TRIGGER_MAX_ATTEMPTS ? parseInt(process.env.GRIST_TRIGGER_MAX_ATTEMPTS, 10) : 20; + // Processes triggers for records changed as described in action bundles. // initiating webhooks and automations. // The interesting stuff starts in the handle() method. @@ -115,6 +160,8 @@ export class DocTriggers { // Abort controller for the loop that sends webhooks. private _loopAbort: AbortController|undefined; + private _stats: WebhookStatistics; + constructor(private _activeDoc: ActiveDoc) { const redisUrl = process.env.REDIS_URL; if (redisUrl) { @@ -122,6 +169,7 @@ export class DocTriggers { // to quit it afterwards and avoid keeping a client open for documents without triggers. this._getRedisQueuePromise = this._getRedisQueue(createClient(redisUrl)); } + this._stats = new WebhookStatistics(this._docId, () => this._redisClient ?? null); } public shutdown() { @@ -225,6 +273,63 @@ export class DocTriggers { return summary; } + /** + * Creates summary for all webhooks in the document. + */ + public async summary(): Promise { + // Prepare some data we will use. + const docData = this._activeDoc.docData!; + const triggersTable = docData.getMetaTable("_grist_Triggers"); + const getTableId = docData.getMetaTable("_grist_Tables").getRowPropFunc("tableId"); + const getColId = docData.getMetaTable("_grist_Tables_column").getRowPropFunc("colId"); + const getUrl = async (id: string) => (await this._getWebHook(id))?.url ?? ''; + const getUnsubscribeKey = async (id: string) => (await this._getWebHook(id))?.unsubscribeKey ?? ''; + const result: WebhookSummary[] = []; + + // Go through all triggers int the document that we have. + for (const t of triggersTable.getRecords()) { + // Each trigger has associated table and a bunch of trigger actions (currently only 1 that is webhook). + const actions = JSON.parse(t.actions) as TriggerAction[]; + // Get only webhooks for this trigger. + const webhookActions = actions.filter(act => act.type === "webhook") as WebhookAction[]; + for (const act of webhookActions) { + // Url, probably should be hidden for non-owners (but currently this API is owners only). + const url = await getUrl(act.id); + // Same story, should be hidden. + const unsubscribeKey = await getUnsubscribeKey(act.id); + if (!url || !unsubscribeKey) { + // Webhook might have been deleted in the mean time. + continue; + } + // Report some basic info and usage stats. + const entry: WebhookSummary = { + // Id of the webhook + id: act.id, + fields: { + // Url, probably should be hidden for non-owners (but currently this API is owners only). + url, + unsubscribeKey, + // Other fields used to register this webhook. + eventTypes: decodeObject(t.eventTypes) as string[], + isReadyColumn: getColId(t.isReadyColRef) ?? null, + tableId: getTableId(t.tableRef) ?? null, + // For future use - for now every webhook is enabled. + enabled: true, + }, + // Create some statics and status info. + usage: await this._stats.getUsage(act.id, this._webHookEventQueue), + }; + result.push(entry); + } + } + return result; + } + + public webhookDeleted(id: string) { + // We can't do much about that as the loop might be in progress and it is not safe to modify the queue. + // But we can clear the webHook cache, so that the next time we check the webhook url it will be gone. + this._webhookCache.delete(id); + } public async clearWebhookQueue() { // Make sure we are after start and in sync with redis. @@ -239,8 +344,9 @@ export class DocTriggers { // NOTE: this is subject to a race condition, currently it is not possible, but any future modification probably // will require some kind of locking over the queue (or a rewrite) if (removed && this._redisClient) { - await this._redisClient.multi().ltrim(this._redisQueueKey, 0, -1).execAsync(); + await this._redisClient.multi().del(this._redisQueueKey).execAsync(); } + await this._stats.clear(); } private get _docId() { @@ -409,7 +515,7 @@ export class DocTriggers { return trigger.eventTypes!.includes(eventType); } - private async _getWebHookUrl(id: string): Promise { + private async _getWebHook(id: string): Promise { let webhook = this._webhookCache.get(id); if (!webhook) { const secret = await this._activeDoc.getHomeDbManager()?.getSecret(id, this._docId); @@ -420,8 +526,13 @@ export class DocTriggers { webhook = JSON.parse(secret); this._webhookCache.set(id, webhook!); } - const url = webhook!.url; + return webhook!; + } + + private async _getWebHookUrl(id: string): Promise { + const url = (await this._getWebHook(id))?.url ?? ''; if (!isUrlAllowed(url)) { + // TODO: this is not a good place for a validation. this._log(`Webhook not sent to forbidden URL`, {level: 'warn', url}); return; } @@ -466,9 +577,13 @@ export class DocTriggers { if (!url) { success = true; } else { + await this._stats.logStatus(id, 'sending'); meta = {numEvents: batch.length, webhookId: id, host: new URL(url).host}; this._log("Sending batch of webhook events", meta); - success = await this._sendWebhookWithRetries(url, body, this._loopAbort.signal); + success = await this._sendWebhookWithRetries(id, url, body, batch.length, this._loopAbort.signal); + if (this._loopAbort.signal.aborted) { + continue; + } } if (this._loopAbort.signal.aborted) { @@ -493,9 +608,18 @@ export class DocTriggers { const strings = batch.map(e => JSON.stringify(e)); multi.rpush(this._redisQueueKey, ...strings); } + // We are postponed, so mark that. + await this._stats.logStatus(id, 'postponed'); + } else { + // We are draining the queue and we skipped some events, so mark that. + await this._stats.logStatus(id, 'error'); + await this._stats.logBatch(id, 'rejected'); + } + } else { + await this._stats.logStatus(id, 'idle'); + if (meta) { + this._log("Successfully sent batch of webhook events", meta); } - } else if (meta) { - this._log("Successfully sent batch of webhook events", meta); } await multi?.execAsync(); @@ -525,17 +649,21 @@ export class DocTriggers { if (this._shuttingDown) { return 0; } - return this._drainingQueue ? 5 : 20; + return this._drainingQueue ? Math.min(5, TRIGGER_MAX_ATTEMPTS) : TRIGGER_MAX_ATTEMPTS; } - private async _sendWebhookWithRetries(url: string, body: string, signal: AbortSignal) { + private async _sendWebhookWithRetries(id: string, url: string, body: string, size: number, signal: AbortSignal) { const maxWait = 64; let wait = 1; + let now = Date.now(); for (let attempt = 0; attempt < this._maxWebhookAttempts; attempt++) { if (this._shuttingDown) { return false; } try { + if (attempt > 0) { + await this._stats.logStatus(id, 'retrying'); + } const response = await fetch(url, { method: 'POST', body, @@ -544,11 +672,25 @@ export class DocTriggers { }, signal, }); + now = Date.now(); if (response.status === 200) { + await this._stats.logBatch(id, 'success', now, { size, httpStatus: 200, error: null, attempts: attempt + 1 }); return true; } + await this._stats.logBatch(id, 'failure', now, { + httpStatus: response.status, + error: await response.text(), + attempts: attempt + 1, + size, + }); this._log(`Webhook responded with non-200 status`, {level: 'warn', status: response.status, attempt}); } catch (e) { + await this._stats.logBatch(id, 'failure', now, { + httpStatus: null, + error: (e.message || 'Unrecognized error during fetch'), + attempts: attempt + 1, + size, + }); this._log(`Webhook sending error: ${e}`, {level: 'warn', attempt}); } @@ -569,6 +711,7 @@ export class DocTriggers { try { await delayAbort(TRIGGER_WAIT_DELAY, signal); } catch (e) { + // If signal was aborted, don't log anything as we probably was cleared. return false; } } @@ -598,3 +741,192 @@ export function isUrlAllowed(urlString: string) { domain && matchesBaseDomain(url.host, domain) ); } + + +/** + * Implementation detail, helper to provide a persisted storage to a derived class. + */ +class PersistedStore { + /** In memory fallback if redis is not available */ + private _statsCache = new MapWithTTL(WEBHOOK_STATS_CACHE_TTL); + private _redisKey: string; + + constructor( + docId: string, + private _redisClientDep: () => RedisClient | null + ) { + this._redisKey = `webhooks:${docId}:statistics`; + } + + public async clear() { + this._statsCache.clear(); + if (this._redisClient) { + await this._redisClient.delAsync(this._redisKey).catch(() => {}); + } + } + + protected async set(id: string, keyValues: [Keys, string][]) { + if (this._redisClient) { + const multi = this._redisClient.multi(); + for (const [key, value] of keyValues) { + multi.hset(this._redisKey, `${id}:${key}`, value); + multi.expire(this._redisKey, WEBHOOK_STATS_CACHE_TTL); + } + await multi.execAsync(); + } else { + for (const [key, value] of keyValues) { + this._statsCache.set(`${id}:${key}`, value); + } + } + } + + protected async get(id: string, keys: Keys[]): Promise<[Keys, string][]> { + if (this._redisClient) { + const values = (await this._redisClient.hgetallAsync(this._redisKey)) || {}; + return keys.map(key => [key, values[`${id}:${key}`] || '']); + } else { + return keys.map(key => [key, this._statsCache.get(`${id}:${key}`) || '']); + } + } + + private get _redisClient() { + return this._redisClientDep(); + } +} + +/** + * Helper class that monitors and saves (either in memory or in Redis) usage statics and current + * status of webhooks. + */ +class WebhookStatistics extends PersistedStore { + /** + * Retrieves and calculates all the statistics for a given webhook. + * @param id Webhook ID + * @param queue Current webhook task queue + */ + public async getUsage(id: string, queue: WebHookEvent[]): Promise { + // Get all the keys from the store for this webhook, and create a dictionary. + const values: Record = _.fromPairs(await this.get(id, [ + `batchStatus`, + `httpStatus`, + `errorMessage`, + `size`, + `status`, + `updatedTime`, + `lastFailureTime`, + `lastSuccessTime`, + `lastErrorMessage`, + `lastHttpStatus`, + `attempts`, + ])) as Record; + + // If everything is empty, we don't have any stats yet. + if (Array.from(Object.values(values)).every(v => !v)) { + return { + status: 'idle', + numWaiting: queue.filter(e => e.id === id).length, + lastEventBatch: null, + }; + } + + const usage: WebhookUsage = { + // Overall status of the webhook. + status: values.status as WebhookStatus || 'idle', + numWaiting: queue.filter(x => x.id === id).length, + updatedTime: parseInt(values.updatedTime || "0", 10), + // Last values from batches. + lastEventBatch: null, + lastSuccessTime: parseInt(values.lastSuccessTime, 10), + lastFailureTime: parseInt(values.lastFailureTime, 10), + lastErrorMessage: values.lastErrorMessage || null, + lastHttpStatus: values.lastHttpStatus ? parseInt(values.lastHttpStatus, 10) : null, + }; + + // If we have a batchStatus (so we actually run it at least once - or it wasn't cleared). + if (values.batchStatus) { + usage.lastEventBatch = { + status: values.batchStatus as WebhookBatchStatus, + httpStatus: values.httpStatus ? parseInt(values.httpStatus || "0", 10) : null, + errorMessage: values.errorMessage || null, + size: parseInt(values.size || "0", 10), + attempts: parseInt(values.attempts|| "0", 10), + }; + } + + return usage; + } + + /** + * Logs a status of a webhook. Now is passed as a parameter so that updates that happen in almost the same + * millisecond were seen as the same update. + */ + public async logStatus(id: string, status: WebhookStatus, now?: number|null) { + await this.set(id, [ + ['status', status], + ['updatedTime', (now ?? Date.now()).toString()], + ]); + } + + /** + * Logs a status of the active batch. + */ + public async logBatch( + id: string, + status: WebhookBatchStatus, + now?: number|null, + stats?: { + httpStatus?: number|null, + error?: string|null, + size?: number|null, + attempts?: number|null, + } + ) { + now ??= Date.now(); + + // Update batchStats. + const batchStats: [StatsKey, string][] = [ + [`batchStatus`, status], + [`updatedTime`, now.toString()], + ]; + if (stats?.httpStatus !== undefined) { + batchStats.push([`httpStatus`, (stats.httpStatus || '').toString()]); + } + if (stats?.attempts !== undefined) { + batchStats.push([`attempts`, (stats.attempts || '0').toString()]); + } + if (stats?.error !== undefined) { + batchStats.push([`errorMessage`, stats?.error || '']); + } + if (stats?.size !== undefined) { + batchStats.push([`size`, (stats.size || '').toString()]); + } + + const batchSummary: [StatsKey, string][] = []; + // Update webhook stats. + if (status === 'success') { + batchSummary.push([`lastSuccessTime`, now.toString()]); + } else if (status === 'failure') { + batchSummary.push([`lastFailureTime`, now.toString()]); + } + if (stats?.error) { + batchSummary.push([`lastErrorMessage`, stats.error]); + } + if (stats?.httpStatus) { + batchSummary.push([`lastHttpStatus`, (stats.httpStatus || '').toString()]); + } + await this.set(id, batchStats.concat(batchSummary)); + } +} + +type StatsKey = + 'batchStatus' | + 'httpStatus' | + 'errorMessage' | + 'attempts' | + 'size'| + 'updatedTime' | + 'lastFailureTime' | + 'lastSuccessTime' | + 'lastErrorMessage' | + 'lastHttpStatus' | + 'status'; diff --git a/test/server/lib/DocApi.ts b/test/server/lib/DocApi.ts index 4e0006fb..e14e8e9e 100644 --- a/test/server/lib/DocApi.ts +++ b/test/server/lib/DocApi.ts @@ -1,16 +1,18 @@ import {ActionSummary} from 'app/common/ActionSummary'; import {BulkColValues, UserAction} from 'app/common/DocActions'; import {arrayRepeat} from 'app/common/gutil'; -import {DocState, UserAPIImpl} from 'app/common/UserAPI'; +import {DocAPI, DocState, UserAPIImpl} from 'app/common/UserAPI'; import {testDailyApiLimitFeatures} from 'app/gen-server/entity/Product'; import {AddOrUpdateRecord, Record as ApiRecord} from 'app/plugin/DocApiTypes'; import {CellValue, GristObjCode} from 'app/plugin/GristData'; import {applyQueryParameters, docApiUsagePeriods, docPeriodicApiUsageKey, - getDocApiUsageKeysToIncr} from 'app/server/lib/DocApi'; + getDocApiUsageKeysToIncr, WebhookSubscription} from 'app/server/lib/DocApi'; import log from 'app/server/lib/log'; +import {WebhookSummary} from 'app/server/lib/Triggers'; +import {waitForIt} from 'test/server/wait'; import {delayAbort, exitPromise} from 'app/server/lib/serverUtils'; import {connectTestingHooks, TestingHooksClient} from 'app/server/lib/TestingHooks'; -import axios, {AxiosResponse} from 'axios'; +import axios, {AxiosRequestConfig, AxiosResponse} from 'axios'; import {delay} from 'bluebird'; import * as bodyParser from 'body-parser'; import {assert} from 'chai'; @@ -59,7 +61,7 @@ let docs: TestServer; let userApi: UserAPIImpl; describe('DocApi', function() { - this.timeout(20000); + this.timeout(30000); testUtils.setTmpLogLevel('error'); const oldEnv = clone(process.env); @@ -2627,36 +2629,70 @@ function testDocApi() { await check({eventTypes: ["add"], url: "https://example.com", isReadyColumn: "bar"}, 404, `Column not found "bar"`); }); - it("POST /docs/{did}/tables/{tid}/_unsubscribe validates inputs", async function() { + async function userCheck(user: AxiosRequestConfig, requestBody: any, status: number, responseBody: any) { + const resp = await axios.post( + `${serverUrl}/api/docs/${docIds.Timesheets}/tables/Table1/_unsubscribe`, + requestBody, user + ); + assert.equal(resp.status, status); + if (status !== 200) { + responseBody = {error: responseBody}; + } + assert.deepEqual(resp.data, responseBody); + } + + it("POST /docs/{did}/tables/{tid}/_unsubscribe validates inputs for owners", async function() { const subscribeResponse = await axios.post( `${serverUrl}/api/docs/${docIds.Timesheets}/tables/Table1/_subscribe`, {eventTypes: ["add"], url: "https://example.com"}, chimpy ); assert.equal(subscribeResponse.status, 200); - const {triggerId, unsubscribeKey, webhookId} = subscribeResponse.data; + // Owner doesn't need unsubscribeKey. + const {webhookId} = subscribeResponse.data; - async function check(requestBody: any, status: number, responseBody: any) { - const resp = await axios.post( - `${serverUrl}/api/docs/${docIds.Timesheets}/tables/Table1/_unsubscribe`, - requestBody, chimpy - ); - assert.equal(resp.status, status); - if (status !== 200) { - responseBody = {error: responseBody}; - } - assert.deepEqual(resp.data, responseBody); - } + const check = userCheck.bind(null, chimpy); + + await check({webhookId: "foo"}, 404, `Webhook not found "foo"`); + await check({}, 404, `Webhook not found ""`); + + // Actually unsubscribe + await check({webhookId}, 200, {success: true}); + + // Trigger is now deleted! + await check({webhookId}, 404, `Webhook not found "${webhookId}"`); + }); + + it("POST /docs/{did}/tables/{tid}/_unsubscribe validates inputs for editors", async function() { + const subscribeResponse = await axios.post( + `${serverUrl}/api/docs/${docIds.Timesheets}/tables/Table1/_subscribe`, + {eventTypes: ["add"], url: "https://example.com"}, chimpy + ); + assert.equal(subscribeResponse.status, 200); + // Editor needs unsubscribeKey. + const {unsubscribeKey, webhookId} = subscribeResponse.data; + + const delta = { + users: {"kiwi@getgrist.com": 'editors' as string|null} + }; + let accessResp = await axios.patch(`${homeUrl}/api/docs/${docIds.Timesheets}/access`, {delta}, chimpy); + assert.equal(accessResp.status, 200); + + const check = userCheck.bind(null, kiwi); - await check({triggerId: 999}, 404, `Trigger not found "999"`); - await check({triggerId, webhookId: "foo"}, 404, `Webhook not found "foo"`); - await check({triggerId, webhookId}, 400, 'Bad request: id and unsubscribeKey both required'); - await check({triggerId, webhookId, unsubscribeKey: "foo"}, 401, 'Wrong unsubscribeKey'); + await check({webhookId: "foo"}, 404, `Webhook not found "foo"`); + await check({webhookId}, 400, 'Bad request: unsubscribeKey required'); + await check({webhookId, unsubscribeKey: "foo"}, 401, 'Wrong unsubscribeKey'); // Actually unsubscribe - await check({triggerId, webhookId, unsubscribeKey}, 200, {success: true}); + await check({webhookId, unsubscribeKey}, 200, {success: true}); // Trigger is now deleted! - await check({triggerId, webhookId, unsubscribeKey}, 404, `Trigger not found "${triggerId}"`); + await check({webhookId, unsubscribeKey}, 404, `Webhook not found "${webhookId}"`); + + // Remove editor access + delta.users['kiwi@getgrist.com'] = null; + accessResp = await axios.patch(`${homeUrl}/api/docs/${docIds.Timesheets}/access`, {delta}, chimpy); + assert.equal(accessResp.status, 200); }); describe("Daily API Limit", () => { @@ -2893,13 +2929,67 @@ function testDocApi() { const notFoundCalled = signal(); const longStarted = signal(); const longFinished = signal(); + // /probe endpoint will return this status when aborted. + let probeStatus = 200; + let probeMessage: string|null = "OK"; // Create an abort controller for the latest request. We will // use it to abort the delay on the longEndpoint. let controller = new AbortController(); + async function autoSubscribe( + endpoint: string, docId: string, options?: { + tableId?: string, + isReadyColumn?: string|null, + eventTypes?: string[] + }) { + // Subscribe helper that returns a method to unsubscribe. + const data = await subscribe(endpoint, docId, options); + return () => unsubscribe(docId, data, options?.tableId ?? 'Table1'); + } + + function unsubscribe(docId: string, data: any, tableId = 'Table1') { + return axios.post( + `${serverUrl}/api/docs/${docId}/tables/${tableId}/_unsubscribe`, + data, chimpy + ); + } + + async function subscribe(endpoint: string, docId: string, options?: { + tableId?: string, + isReadyColumn?: string|null, + eventTypes?: string[] + }) { + // Subscribe helper that returns a method to unsubscribe. + const {data, status} = await axios.post( + `${serverUrl}/api/docs/${docId}/tables/${options?.tableId ?? 'Table1'}/_subscribe`, + { + eventTypes: options?.eventTypes ?? ['add', 'update'], + url: `${serving.url}/${endpoint}`, + isReadyColumn: options?.isReadyColumn === undefined ? 'B' : options?.isReadyColumn + }, chimpy + ); + assert.equal(status, 200); + return data as WebhookSubscription; + } + + async function clearQueue(docId: string) { + const deleteResult = await axios.delete( + `${serverUrl}/api/docs/${docId}/webhooks/queue`, chimpy + ); + assert.equal(deleteResult.status, 200); + } + + async function readStats(docId: string): Promise { + const result = await axios.get( + `${serverUrl}/api/docs/${docId}/webhooks`, chimpy + ); + assert.equal(result.status, 200); + return result.data; + } + before(async function() { - this.timeout(20000); + this.timeout(30000); if (!process.env.TEST_REDIS_URL) { this.skip(); } requests = { "add,update": [], @@ -2925,6 +3015,22 @@ function testDocApi() { res.sendStatus(404); // Webhooks treats it as an error and will retry. Probably it shouldn't work this way. res.end(); }); + app.post('/probe', async ({body}, res) => { + longStarted.emit(body.map((r: any) => r.A)); + // We are scoping the controller to this call, so any subsequent + // call will have a new controller. Caller can save this value to abort the previous calls. + const scoped = new AbortController(); + controller = scoped; + try { + await delayAbort(20000, scoped.signal); // We don't expect to wait for this, we should be aborted + assert.fail('Should have been aborted'); + } catch(exc) { + res.status(probeStatus); + res.send(probeMessage); + res.end(); + longFinished.emit(body.map((r: any) => r.A)); + } + }); app.post('/long', async ({body}, res) => { longStarted.emit(body[0].A); // We are scoping the controller to this call, so any subsequent @@ -2932,7 +3038,7 @@ function testDocApi() { const scoped = new AbortController(); controller = scoped; try { - await delayAbort(2000, scoped.signal); // We don't expect to wait for this. + await delayAbort(20000, scoped.signal); // We don't expect to wait for this. res.sendStatus(200); res.end(); longFinished.emit(body[0].A); @@ -3107,31 +3213,16 @@ function testDocApi() { await axios.post(`${serverUrl}/api/docs/${docId}/apply`, [ ['ModifyColumn', 'Table1', 'B', {type: 'Bool'}], ], chimpy); - // Subscribe helper that returns a method to unsubscribe. - const subscribe = async (endpoint: string) => { - const {data, status} = await axios.post( - `${serverUrl}/api/docs/${docId}/tables/Table1/_subscribe`, - {eventTypes: ['add', 'update'], url: `${serving.url}/${endpoint}`, isReadyColumn: "B"}, chimpy - ); - assert.equal(status, 200); - return () => axios.post( - `${serverUrl}/api/docs/${docId}/tables/Table1/_unsubscribe`, - data, chimpy - ); - }; // Try to clear the queue, even if it is empty. - const deleteResult = await axios.delete( - `${serverUrl}/api/docs/${docId}/webhooks/queue`, chimpy - ); - assert.equal(deleteResult.status, 200); + await clearQueue(docId); const cleanup: (() => Promise)[] = []; // Subscribe a valid webhook endpoint. - cleanup.push(await subscribe('200')); + cleanup.push(await autoSubscribe('200', docId)); // Subscribe an invalid webhook endpoint. - cleanup.push(await subscribe('404')); + cleanup.push(await autoSubscribe('404', docId)); // Prepare signals, we will be waiting for those two to be called. successCalled.reset(); @@ -3166,9 +3257,8 @@ function testDocApi() { assert.isFalse(successCalled.called()); // Now reset the queue. - await axios.delete( - `${serverUrl}/api/docs/${docId}/webhooks/queue`, chimpy - ); + await clearQueue(docId); + assert.isFalse(successCalled.called()); assert.isFalse(notFoundCalled.called()); @@ -3192,13 +3282,11 @@ function testDocApi() { // Cleanup everything, we will now test request timeouts. await Promise.all(cleanup.map(fn => fn())).finally(() => cleanup.length = 0); - assert.isTrue((await axios.delete( - `${serverUrl}/api/docs/${docId}/webhooks/queue`, chimpy - )).status === 200); + await clearQueue(docId); // Create 2 webhooks, one that is very long. - cleanup.push(await subscribe('200')); - cleanup.push(await subscribe('long')); + cleanup.push(await autoSubscribe('200', docId)); + cleanup.push(await autoSubscribe('long', docId)); successCalled.reset(); longFinished.reset(); longStarted.reset(); @@ -3268,9 +3356,406 @@ function testDocApi() { // Cleanup all await Promise.all(cleanup.map(fn => fn())).finally(() => cleanup.length = 0); - assert.isTrue((await axios.delete( - `${serverUrl}/api/docs/${docId}/webhooks/queue`, chimpy - )).status === 200); + await clearQueue(docId); + }); + + it("should not call to a deleted webhook", async() => { + // Create a test document. + const ws1 = (await userApi.getOrgWorkspaces('current'))[0].id; + const docId = await userApi.newDoc({name: 'testdoc4'}, ws1); + const doc = userApi.getDocAPI(docId); + await axios.post(`${serverUrl}/api/docs/${docId}/apply`, [ + ['ModifyColumn', 'Table1', 'B', {type: 'Bool'}], + ], chimpy); + + // Subscribe to 2 webhooks, we will remove the second one. + const webhook1 = await autoSubscribe('probe', docId); + const webhook2 = await autoSubscribe('200', docId); + + probeStatus = 200; + successCalled.reset(); + longFinished.reset(); + // Trigger them. + await doc.addRows("Table1", { + A: [1], + B: [true], + }); + + // Wait for the first one to be called. + await longStarted.waitAndReset(); + // Now why we are on the call remove the second one. + // Check that it is queued. + const stats = await readStats(docId); + assert.equal(2, _.sum(stats.map(x => x.usage?.numWaiting ?? 0))); + await webhook2(); + // Let the first one finish. + controller.abort(); + await longFinished.waitAndReset(); + // The second one is not called. + assert.isFalse(successCalled.called()); + // Triggering next event, we will get only calls to the probe (first webhook). + await doc.addRows("Table1", { + A: [2], + B: [true], + }); + await longStarted.waitAndReset(); + controller.abort(); + await longFinished.waitAndReset(); + + // Unsubscribe. + await webhook1(); + }); + + describe("/webhooks endpoint", function() { + let docId: string; + let doc: DocAPI; + let stats: WebhookSummary[]; + before(async() => { + // Create a test document. + const ws1 = (await userApi.getOrgWorkspaces('current'))[0].id; + docId = await userApi.newDoc({name: 'testdoc2'}, ws1); + doc = userApi.getDocAPI(docId); + await axios.post(`${serverUrl}/api/docs/${docId}/apply`, [ + ['ModifyColumn', 'Table1', 'B', {type: 'Bool'}], + ], chimpy); + }); + + const waitForQueue = async (length: number) => { + await waitForIt(async () => { + stats = await readStats(docId); + assert.equal(length, _.sum(stats.map(x => x.usage?.numWaiting ?? 0))); + }, 1000, 200); + }; + + it("should return statistics", async() => { + await clearQueue(docId); + // Read stats, it should be empty. + assert.deepEqual(await readStats(docId), []); + // Now subscribe couple of webhooks. + const first = await subscribe('200', docId); + const second = await subscribe('404', docId); + // And compare stats. + assert.deepEqual(await readStats(docId), [ + { + id: first.webhookId, + fields : { + url: `${serving.url}/200`, + unsubscribeKey: first.unsubscribeKey, + eventTypes: ['add', 'update'], + enabled: true, + isReadyColumn: 'B', + tableId: 'Table1' + }, usage : { + status: 'idle', + numWaiting: 0, + lastEventBatch: null + } + }, + { + id: second.webhookId, + fields : { + url: `${serving.url}/404`, + unsubscribeKey: second.unsubscribeKey, + eventTypes: ['add', 'update'], + enabled: true, + isReadyColumn: 'B', + tableId: 'Table1' + }, usage : { + status: 'idle', + numWaiting: 0, + lastEventBatch: null + } + }, + ]); + + // We should be able to unsubscribe using info that we got. + await unsubscribe(docId, first); + await unsubscribe(docId, second); + assert.deepEqual(await readStats(docId), []); + + // Test that stats work when there is no ready column. + let unsubscribe1 = await autoSubscribe('200', docId, { isReadyColumn: null }); + assert.isNull((await readStats(docId))[0].fields.isReadyColumn); + await unsubscribe1(); + + // Now test that we receive some useful information and the state transition works. + unsubscribe1 = await autoSubscribe('probe', docId); + // Test also dates update. + let now = Date.now(); + // Webhook starts as idle (tested already). Now we will trigger it. + longStarted.reset(); + longFinished.reset(); + await doc.addRows("Table1", { + A: [1], + B: [true], + }); + // It will call our probe endpoint, so we will be able to see changes as they happen. + await longStarted.waitAndReset(); + stats = await readStats(docId); + assert.isNotNull(stats[0].usage); + assert.equal(stats[0].usage?.numWaiting, 1); + assert.equal(stats[0].usage?.status, 'sending'); + assert.isNotNull(stats[0].usage?.updatedTime); + assert.isAbove(stats[0].usage?.updatedTime ?? 0, now); + assert.isNull(stats[0].usage?.lastErrorMessage); + assert.isNull(stats[0].usage?.lastSuccessTime); + assert.isNull(stats[0].usage?.lastFailureTime); + assert.isNull(stats[0].usage?.lastHttpStatus); + assert.isNull(stats[0].usage?.lastEventBatch); + // Ok, we can return success now. + probeStatus = 200; + controller.abort(); + now = Date.now(); + await longFinished.waitAndReset(); + // After releasing the hook, we are not 100% sure stats are updated, so we will wait a bit. + // If we are checking stats while we are holding the hook (in the probe endpoint) it is safe + // to assume that stats are up to date. + await waitForIt(async () => { + stats = await readStats(docId); + assert.equal(stats[0].usage?.numWaiting, 0); + }, 1000, 200); + assert.equal(stats[0].usage?.numWaiting, 0); + assert.equal(stats[0].usage?.status, 'idle'); + assert.isAbove(stats[0].usage?.updatedTime ?? 0, now); + assert.isNull(stats[0].usage?.lastErrorMessage); + assert.isNull(stats[0].usage?.lastFailureTime); + assert.isAbove(stats[0].usage?.lastSuccessTime ?? 0, now); + assert.equal(stats[0].usage?.lastHttpStatus, 200); + assert.deepEqual(stats[0].usage?.lastEventBatch, { + status: 'success', + attempts: 1, + size: 1, + errorMessage: null, + httpStatus: 200, + }); + + // Now trigger the endpoint once again. + now = Date.now(); + await doc.addRows("Table1", { + A: [2], + B: [true], + }); + await longStarted.waitAndReset(); + // This time, return an error, so we will have another attempt. + probeStatus = 404; + probeMessage = null; + controller.abort(); + await longFinished.waitAndReset(); + // Wait for the second attempt. + await longStarted.waitAndReset(); + stats = await readStats(docId); + assert.equal(stats[0].usage?.numWaiting, 1); + assert.equal(stats[0].usage?.status, 'retrying'); + assert.isAbove(stats[0].usage?.updatedTime ?? 0, now); + // There was no body in the response yet. + assert.isNull(stats[0].usage?.lastErrorMessage); + // Now we have a failure, and the success was before. + assert.isAbove(stats[0].usage?.lastFailureTime ?? 0, now); + assert.isBelow(stats[0].usage?.lastSuccessTime ?? 0, now); + assert.equal(stats[0].usage?.lastHttpStatus, 404); + // Batch contains info about last attempt. + assert.deepEqual(stats[0].usage?.lastEventBatch, { + status: 'failure', + attempts: 1, + size: 1, + errorMessage: null, + httpStatus: 404, + }); + // Now make an error with some message. + probeStatus = 500; + probeMessage = 'Some error'; + controller.abort(); + await longFinished.waitAndReset(); + await longStarted.waitAndReset(); + // We have 3rd attempt, with an error message. + stats = await readStats(docId); + assert.equal(stats[0].usage?.numWaiting, 1); + assert.equal(stats[0].usage?.status, 'retrying'); + assert.equal(stats[0].usage?.lastHttpStatus, 500); + assert.equal(stats[0].usage?.lastErrorMessage, probeMessage); + assert.deepEqual(stats[0].usage?.lastEventBatch, { + status: 'failure', + attempts: 2, + size: 1, + errorMessage: probeMessage, + httpStatus: 500, + }); + // Now we will succeed. + probeStatus = 200; + controller.abort(); + await longFinished.waitAndReset(); + // Give it some time to update stats. + await waitForIt(async () => { + stats = await readStats(docId); + assert.equal(stats[0].usage?.numWaiting, 0); + }, 1000, 200); + stats = await readStats(docId); + assert.equal(stats[0].usage?.numWaiting, 0); + assert.equal(stats[0].usage?.status, 'idle'); + assert.equal(stats[0].usage?.lastHttpStatus, 200); + assert.equal(stats[0].usage?.lastErrorMessage, probeMessage); + assert.isAbove(stats[0].usage?.lastFailureTime ?? 0, now); + assert.isAbove(stats[0].usage?.lastSuccessTime ?? 0, now); + assert.deepEqual(stats[0].usage?.lastEventBatch, { + status: 'success', + attempts: 3, + size: 1, + // Errors are cleared. + errorMessage: null, + httpStatus: 200, + }); + // Clear everything. + await clearQueue(docId); + stats = await readStats(docId); + assert.isNotNull(stats[0].usage); + assert.equal(stats[0].usage?.numWaiting, 0); + assert.equal(stats[0].usage?.status, 'idle'); + // Now pile some events with two webhooks to the probe. + const unsubscribe2 = await autoSubscribe('probe', docId); + await doc.addRows("Table1", { + A: [3], + B: [true], + }); + await doc.addRows("Table1", { + A: [4], + B: [true], + }); + await doc.addRows("Table1", { + A: [5], + B: [true], + }); + assert.deepEqual(await longStarted.waitAndReset(), [3]); + stats = await readStats(docId); + assert.lengthOf(stats, 2); + // First one is pending and second one didn't have a chance to be executed yet. + assert.equal(stats[0].usage?.status, 'sending'); + assert.equal(stats[1].usage?.status, 'idle'); + assert.isNull(stats[0].usage?.lastEventBatch); + assert.isNull(stats[1].usage?.lastEventBatch); + assert.equal(6, _.sum(stats.map(x => x.usage?.numWaiting ?? 0))); + // Now let them finish in deterministic order. + controller.abort(); + assert.deepEqual(await longFinished.waitAndReset(), [3]); + // We had 6 events to go, we've just finished the first one. + const nextPass = async (length: number, A: number) => { + assert.deepEqual(await longStarted.waitAndReset(), [A]); + stats = await readStats(docId); + assert.equal(length, _.sum(stats.map(x => x.usage?.numWaiting ?? 0))); + controller.abort(); + assert.deepEqual(await longFinished.waitAndReset(), [A]); + }; + // Now we have 5 events to go. + await nextPass(5, 3); + await nextPass(4, 4); + await nextPass(3, 4); + await nextPass(2, 5); + await nextPass(1, 5); + + await waitForQueue(0); + await unsubscribe2(); + await unsubscribe1(); + }); + + it("should monitor failures", async() => { + const webhook3 = await subscribe('probe', docId); + const webhook4 = await subscribe('probe', docId); + // Now we have two webhooks, both will fail, but the first one will + // be put in the idle state and server will start to send the second one. + probeStatus = 509; + probeMessage = "fail"; + await doc.addRows("Table1", { + A: [5], + B: [true], + }); + + const pass = async () => { + await longStarted.waitAndReset(); + controller.abort(); + await longFinished.waitAndReset(); + }; + // Server will retry this 4 times (GRIST_TRIGGER_MAX_ATTEMPTS = 4) + await pass(); + await pass(); + await pass(); + await pass(); + // And will fail, next it will call the second webhook. + await longStarted.waitAndReset(); + // Hold it a bit (by not aborting). + + // Read stats, first one is idle and has an error message, second one is active. + // (We don't need to wait - stats are up to date since triggers are waiting for us). + stats = await readStats(docId); + assert.equal(stats.length, 2); + assert.equal(stats[0].id, webhook3.webhookId); + assert.equal(stats[1].id, webhook4.webhookId); + assert.equal(stats[0].usage?.status, 'postponed'); + assert.equal(stats[1].usage?.status, 'sending'); + assert.equal(stats[0].usage?.numWaiting, 1); + assert.equal(stats[1].usage?.numWaiting, 1); + assert.equal(stats[0].usage?.lastErrorMessage, probeMessage); + assert.equal(stats[0].usage?.lastHttpStatus, 509); + assert.equal(stats[0].usage?.lastEventBatch?.status, "failure"); + assert.isNull(stats[1].usage?.lastErrorMessage); + + // We will now drain the queue, using the second webhook. + // First webhook is postponed, and the second is waiting for us. We have 2 events in total. + // To drain the queue, and cause this webhook to fail we will need to generate 10 more events, + // max queue since is 10, but if we generate exactly 10 events it will not work + // as the queue size will be 9 when the triggers decide to reject them. + await waitForQueue(2); + const addRowProm = doc.addRows("Table1", { + A: arrayRepeat(5, 100), // there are 2 webhooks, so 5 events per webhook. + B: arrayRepeat(5, true) + }).catch(() => {}); + // WARNING: we can't wait for it, as the Webhooks will literally stop the document, and wait + // for the queue to drain. So we will carefully go further, and wait for the queue to drain. + + // It will try 4 times before giving up (the first call is in progress) + probeStatus = 429; + // First. + controller.abort(); + await longFinished.waitAndReset(); + // Second and third. + await pass(); + await pass(); + + // Before the last one, we will wait for the add rows operation but in a different way. + // We will count how many webhook events were added so far, we should have 10 in total. + await waitForQueue(12); + // We are good to go, after trying for the 4th time it will gave up and remove this + // event from the queue. + await pass(); + + // Wait for the first webhook to start. + await longStarted.waitAndReset(); + + // And make sure we have info about rejected batch. + stats = await readStats(docId); + assert.equal(stats.length, 2); + assert.equal(stats[0].id, webhook3.webhookId); + assert.equal(stats[0].usage?.status, 'sending'); + assert.equal(stats[0].usage?.numWaiting, 6); + assert.equal(stats[0].usage?.lastErrorMessage, probeMessage); + assert.equal(stats[0].usage?.lastHttpStatus, 509); + + assert.equal(stats[1].id, webhook4.webhookId); + assert.equal(stats[1].usage?.status, 'error'); // webhook is in error state, some events were lost. + assert.equal(stats[1].usage?.lastEventBatch?.status, "rejected"); + assert.equal(stats[1].usage?.numWaiting, 5); // We skipped one event. + + // Now unfreeze document by handling all events (they are aligned so will be handled in just 2 batches, first + // one is already waiting in our /probe endpoint). + probeStatus = 200; + controller.abort(); + await longFinished.waitAndReset(); + await pass(); + await waitForQueue(0); + + // Now can wait for the rows to process. + await addRowProm; + await unsubscribe(docId, webhook3); + await unsubscribe(docId, webhook4); + }); }); }); @@ -3463,7 +3948,10 @@ class TestServer { APP_HOME_URL: _homeUrl, ALLOWED_WEBHOOK_DOMAINS: `example.com,localhost:${webhooksTestPort}`, GRIST_ALLOWED_HOSTS: `example.com,localhost`, - GRIST_TRIGGER_WAIT_DELAY: '200', + GRIST_TRIGGER_WAIT_DELAY: '100', + // this is calculated value, some tests expect 4 attempts and some will try 3 times + GRIST_TRIGGER_MAX_ATTEMPTS: '4', + GRIST_MAX_QUEUE_SIZE: '10', ...process.env }; @@ -3571,7 +4059,8 @@ function signal() { if (!promise) { throw new Error("signal.wait() called before signal.reset()"); } - return await promise; + const proms = Promise.race([promise, delay(2000).then(() => { throw new Error("signal.wait() timed out"); })]); + return await proms; }, async waitAndReset() { try {