(core) Adding /webhooks endpoint

Summary:
- New /webhooks event that lists all webhooks in a document (available for owners),
- Monitoring webhooks usage and saving it in memory or Redis,
- Loosening _usubscribe API endpoint, so that the information returned from the /webhook endpoint is enough to unsubscribe,
- Owners can remove webhook without the unsubscribe key.

The endpoint lists all webhooks that are registered in a document, not just webhooks from a single table.
There are two status fields. First for the webhook, second for the last request attempt.
Webhook can have 5 statuses: 'idle', 'sending', 'retrying', 'postponed', 'error', which roughly describes what the
sendLoop is currently doing. The 'error' status describes a situation when all request attempts failed and the queue needs
to be drained, so some requests were dropped.

The last request status can only be: 'success', 'failure' or 'rejected'. Rejected means that the last batch was dropped because the
queue was too long.

Test Plan: New and updated tests

Reviewers: paulfitz

Reviewed By: paulfitz

Differential Revision: https://phab.getgrist.com/D3727
This commit is contained in:
Jarosław Sadziński
2022-12-13 12:47:50 +01:00
parent e146f95c1c
commit 629fcccd5a
6 changed files with 932 additions and 85 deletions

View File

@@ -1727,6 +1727,13 @@ export class ActiveDoc extends EventEmitter {
await this._triggers.clearWebhookQueue();
}
/**
* Returns the list of outgoing webhook for a table in this document.
*/
public async webhooksSummary() {
return this._triggers.summary();
}
/**
* Loads an open document from DocStorage. Returns a list of the tables it contains.
*/

View File

@@ -589,31 +589,32 @@ export class DocWorkerApi {
withDoc(async (activeDoc, req, res) => {
const metaTables = await getMetaTables(activeDoc, req);
const tableRef = tableIdToRef(metaTables, req.params.tableId);
const {triggerId, unsubscribeKey, webhookId} = req.body;
const {unsubscribeKey, webhookId} = req.body as WebhookSubscription;
// Validate combination of triggerId, webhookId, and tableRef.
// This is overly strict, webhookId should be enough,
// but it should be easy to relax that later if we want.
const [, , triggerRowIds, triggerColData] = metaTables._grist_Triggers;
const triggerRowIndex = triggerRowIds.indexOf(triggerId);
const triggerRowIndex = triggerColData.actions.findIndex(a => {
const actions: any[] = JSON.parse((a || '[]') as string);
return actions.some(action => action.id === webhookId && action?.type === "webhook");
});
if (triggerRowIndex === -1) {
throw new ApiError(`Trigger not found "${triggerId}"`, 404);
throw new ApiError(`Webhook not found "${webhookId || ''}"`, 404);
}
if (triggerColData.tableRef[triggerRowIndex] !== tableRef) {
throw new ApiError(`Wrong table`, 400);
}
const actions = JSON.parse(triggerColData.actions[triggerRowIndex] as string);
if (!_.find(actions, {type: "webhook", id: webhookId})) {
throw new ApiError(`Webhook not found "${webhookId}"`, 404);
}
const triggerRowId = triggerRowIds[triggerRowIndex];
const checkKey = !(await this._isOwner(req));
// Validate unsubscribeKey before deleting trigger from document
await this._dbManager.removeWebhook(webhookId, activeDoc.docName, unsubscribeKey);
await this._dbManager.removeWebhook(webhookId, activeDoc.docName, unsubscribeKey, checkKey);
// TODO handle trigger containing other actions when that becomes possible
await handleSandboxError("_grist_Triggers", [], activeDoc.applyUserActions(
docSessionFromRequest(req),
[['RemoveRecord', "_grist_Triggers", triggerId]]));
[['RemoveRecord', "_grist_Triggers", triggerRowId]]));
res.json({success: true});
})
@@ -627,6 +628,13 @@ export class DocWorkerApi {
})
);
// Lists all webhooks and their current status in the document.
this._app.get('/api/docs/:docId/webhooks', isOwner,
withDoc(async (activeDoc, req, res) => {
res.json(await activeDoc.webhooksSummary());
})
);
// Reload a document forcibly (in fact this closes the doc, it will be automatically
// reopened on use).
this._app.post('/api/docs/:docId/force-reload', canEdit, throttled(async (req, res) => {
@@ -1429,3 +1437,8 @@ export function getDocApiUsageKeysToIncr(
}
// Usage exceeded all the time buckets, so return undefined to reject the request.
}
export interface WebhookSubscription {
unsubscribeKey: string;
webhookId: string;
}

View File

@@ -5,6 +5,7 @@ import {fromTableDataAction, RowRecord, TableColValues, TableDataAction} from 'a
import {StringUnion} from 'app/common/StringUnion';
import {MetaRowRecord} from 'app/common/TableData';
import {CellDelta} from 'app/common/TabularDiff';
import {decodeObject} from 'app/plugin/objtypes';
import {summarizeAction} from 'app/server/lib/ActionSummary';
import {ActiveDoc} from 'app/server/lib/ActiveDoc';
import {makeExceptionalDocSession} from 'app/server/lib/DocSession';
@@ -34,6 +35,44 @@ type RecordDeltas = Map<number, RecordDelta>;
// Union discriminated by type
type TriggerAction = WebhookAction | PythonAction;
type WebhookBatchStatus = 'success'|'failure'|'rejected';
type WebhookStatus = 'idle'|'sending'|'retrying'|'postponed'|'error';
export interface WebhookSummary {
id: string;
fields: {
url: string;
unsubscribeKey: string;
eventTypes: string[];
isReadyColumn?: string|null;
tableId: string;
enabled: boolean;
},
usage: WebhookUsage|null,
}
interface WebhookUsage {
// As minimum we need number of waiting events and status (by default pending).
numWaiting: number,
status: WebhookStatus;
updatedTime?: number|null;
lastSuccessTime?: number|null;
lastFailureTime?: number|null;
lastErrorMessage?: string|null;
lastHttpStatus?: number|null;
lastEventBatch?: null | {
size: number;
errorMessage: string|null;
httpStatus: number|null;
status: WebhookBatchStatus;
attempts: number;
},
numSuccess?: {
pastHour: number;
past24Hours: number;
},
}
export interface WebhookAction {
type: "webhook";
id: string;
@@ -69,14 +108,20 @@ interface Task {
recordDeltas: RecordDeltas;
}
const MAX_QUEUE_SIZE = 1000;
const MAX_QUEUE_SIZE =
process.env.GRIST_MAX_QUEUE_SIZE ? parseInt(process.env.GRIST_MAX_QUEUE_SIZE, 10) : 1000;
const WEBHOOK_CACHE_TTL = 10000;
const WEBHOOK_CACHE_TTL = 10_000;
const WEBHOOK_STATS_CACHE_TTL = 1000 /*s*/ * 60 /*m*/ * 24/*h*/;
// A time to wait for between retries of a webhook. Exposed for tests.
const TRIGGER_WAIT_DELAY =
process.env.GRIST_TRIGGER_WAIT_DELAY ? parseInt(process.env.GRIST_TRIGGER_WAIT_DELAY, 10) : 1000;
const TRIGGER_MAX_ATTEMPTS =
process.env.GRIST_TRIGGER_MAX_ATTEMPTS ? parseInt(process.env.GRIST_TRIGGER_MAX_ATTEMPTS, 10) : 20;
// Processes triggers for records changed as described in action bundles.
// initiating webhooks and automations.
// The interesting stuff starts in the handle() method.
@@ -115,6 +160,8 @@ export class DocTriggers {
// Abort controller for the loop that sends webhooks.
private _loopAbort: AbortController|undefined;
private _stats: WebhookStatistics;
constructor(private _activeDoc: ActiveDoc) {
const redisUrl = process.env.REDIS_URL;
if (redisUrl) {
@@ -122,6 +169,7 @@ export class DocTriggers {
// to quit it afterwards and avoid keeping a client open for documents without triggers.
this._getRedisQueuePromise = this._getRedisQueue(createClient(redisUrl));
}
this._stats = new WebhookStatistics(this._docId, () => this._redisClient ?? null);
}
public shutdown() {
@@ -225,6 +273,63 @@ export class DocTriggers {
return summary;
}
/**
* Creates summary for all webhooks in the document.
*/
public async summary(): Promise<WebhookSummary[]> {
// Prepare some data we will use.
const docData = this._activeDoc.docData!;
const triggersTable = docData.getMetaTable("_grist_Triggers");
const getTableId = docData.getMetaTable("_grist_Tables").getRowPropFunc("tableId");
const getColId = docData.getMetaTable("_grist_Tables_column").getRowPropFunc("colId");
const getUrl = async (id: string) => (await this._getWebHook(id))?.url ?? '';
const getUnsubscribeKey = async (id: string) => (await this._getWebHook(id))?.unsubscribeKey ?? '';
const result: WebhookSummary[] = [];
// Go through all triggers int the document that we have.
for (const t of triggersTable.getRecords()) {
// Each trigger has associated table and a bunch of trigger actions (currently only 1 that is webhook).
const actions = JSON.parse(t.actions) as TriggerAction[];
// Get only webhooks for this trigger.
const webhookActions = actions.filter(act => act.type === "webhook") as WebhookAction[];
for (const act of webhookActions) {
// Url, probably should be hidden for non-owners (but currently this API is owners only).
const url = await getUrl(act.id);
// Same story, should be hidden.
const unsubscribeKey = await getUnsubscribeKey(act.id);
if (!url || !unsubscribeKey) {
// Webhook might have been deleted in the mean time.
continue;
}
// Report some basic info and usage stats.
const entry: WebhookSummary = {
// Id of the webhook
id: act.id,
fields: {
// Url, probably should be hidden for non-owners (but currently this API is owners only).
url,
unsubscribeKey,
// Other fields used to register this webhook.
eventTypes: decodeObject(t.eventTypes) as string[],
isReadyColumn: getColId(t.isReadyColRef) ?? null,
tableId: getTableId(t.tableRef) ?? null,
// For future use - for now every webhook is enabled.
enabled: true,
},
// Create some statics and status info.
usage: await this._stats.getUsage(act.id, this._webHookEventQueue),
};
result.push(entry);
}
}
return result;
}
public webhookDeleted(id: string) {
// We can't do much about that as the loop might be in progress and it is not safe to modify the queue.
// But we can clear the webHook cache, so that the next time we check the webhook url it will be gone.
this._webhookCache.delete(id);
}
public async clearWebhookQueue() {
// Make sure we are after start and in sync with redis.
@@ -239,8 +344,9 @@ export class DocTriggers {
// NOTE: this is subject to a race condition, currently it is not possible, but any future modification probably
// will require some kind of locking over the queue (or a rewrite)
if (removed && this._redisClient) {
await this._redisClient.multi().ltrim(this._redisQueueKey, 0, -1).execAsync();
await this._redisClient.multi().del(this._redisQueueKey).execAsync();
}
await this._stats.clear();
}
private get _docId() {
@@ -409,7 +515,7 @@ export class DocTriggers {
return trigger.eventTypes!.includes(eventType);
}
private async _getWebHookUrl(id: string): Promise<string | undefined> {
private async _getWebHook(id: string): Promise<WebHookSecret | undefined> {
let webhook = this._webhookCache.get(id);
if (!webhook) {
const secret = await this._activeDoc.getHomeDbManager()?.getSecret(id, this._docId);
@@ -420,8 +526,13 @@ export class DocTriggers {
webhook = JSON.parse(secret);
this._webhookCache.set(id, webhook!);
}
const url = webhook!.url;
return webhook!;
}
private async _getWebHookUrl(id: string): Promise<string | undefined> {
const url = (await this._getWebHook(id))?.url ?? '';
if (!isUrlAllowed(url)) {
// TODO: this is not a good place for a validation.
this._log(`Webhook not sent to forbidden URL`, {level: 'warn', url});
return;
}
@@ -466,9 +577,13 @@ export class DocTriggers {
if (!url) {
success = true;
} else {
await this._stats.logStatus(id, 'sending');
meta = {numEvents: batch.length, webhookId: id, host: new URL(url).host};
this._log("Sending batch of webhook events", meta);
success = await this._sendWebhookWithRetries(url, body, this._loopAbort.signal);
success = await this._sendWebhookWithRetries(id, url, body, batch.length, this._loopAbort.signal);
if (this._loopAbort.signal.aborted) {
continue;
}
}
if (this._loopAbort.signal.aborted) {
@@ -493,9 +608,18 @@ export class DocTriggers {
const strings = batch.map(e => JSON.stringify(e));
multi.rpush(this._redisQueueKey, ...strings);
}
// We are postponed, so mark that.
await this._stats.logStatus(id, 'postponed');
} else {
// We are draining the queue and we skipped some events, so mark that.
await this._stats.logStatus(id, 'error');
await this._stats.logBatch(id, 'rejected');
}
} else {
await this._stats.logStatus(id, 'idle');
if (meta) {
this._log("Successfully sent batch of webhook events", meta);
}
} else if (meta) {
this._log("Successfully sent batch of webhook events", meta);
}
await multi?.execAsync();
@@ -525,17 +649,21 @@ export class DocTriggers {
if (this._shuttingDown) {
return 0;
}
return this._drainingQueue ? 5 : 20;
return this._drainingQueue ? Math.min(5, TRIGGER_MAX_ATTEMPTS) : TRIGGER_MAX_ATTEMPTS;
}
private async _sendWebhookWithRetries(url: string, body: string, signal: AbortSignal) {
private async _sendWebhookWithRetries(id: string, url: string, body: string, size: number, signal: AbortSignal) {
const maxWait = 64;
let wait = 1;
let now = Date.now();
for (let attempt = 0; attempt < this._maxWebhookAttempts; attempt++) {
if (this._shuttingDown) {
return false;
}
try {
if (attempt > 0) {
await this._stats.logStatus(id, 'retrying');
}
const response = await fetch(url, {
method: 'POST',
body,
@@ -544,11 +672,25 @@ export class DocTriggers {
},
signal,
});
now = Date.now();
if (response.status === 200) {
await this._stats.logBatch(id, 'success', now, { size, httpStatus: 200, error: null, attempts: attempt + 1 });
return true;
}
await this._stats.logBatch(id, 'failure', now, {
httpStatus: response.status,
error: await response.text(),
attempts: attempt + 1,
size,
});
this._log(`Webhook responded with non-200 status`, {level: 'warn', status: response.status, attempt});
} catch (e) {
await this._stats.logBatch(id, 'failure', now, {
httpStatus: null,
error: (e.message || 'Unrecognized error during fetch'),
attempts: attempt + 1,
size,
});
this._log(`Webhook sending error: ${e}`, {level: 'warn', attempt});
}
@@ -569,6 +711,7 @@ export class DocTriggers {
try {
await delayAbort(TRIGGER_WAIT_DELAY, signal);
} catch (e) {
// If signal was aborted, don't log anything as we probably was cleared.
return false;
}
}
@@ -598,3 +741,192 @@ export function isUrlAllowed(urlString: string) {
domain && matchesBaseDomain(url.host, domain)
);
}
/**
* Implementation detail, helper to provide a persisted storage to a derived class.
*/
class PersistedStore<Keys> {
/** In memory fallback if redis is not available */
private _statsCache = new MapWithTTL<string, string>(WEBHOOK_STATS_CACHE_TTL);
private _redisKey: string;
constructor(
docId: string,
private _redisClientDep: () => RedisClient | null
) {
this._redisKey = `webhooks:${docId}:statistics`;
}
public async clear() {
this._statsCache.clear();
if (this._redisClient) {
await this._redisClient.delAsync(this._redisKey).catch(() => {});
}
}
protected async set(id: string, keyValues: [Keys, string][]) {
if (this._redisClient) {
const multi = this._redisClient.multi();
for (const [key, value] of keyValues) {
multi.hset(this._redisKey, `${id}:${key}`, value);
multi.expire(this._redisKey, WEBHOOK_STATS_CACHE_TTL);
}
await multi.execAsync();
} else {
for (const [key, value] of keyValues) {
this._statsCache.set(`${id}:${key}`, value);
}
}
}
protected async get(id: string, keys: Keys[]): Promise<[Keys, string][]> {
if (this._redisClient) {
const values = (await this._redisClient.hgetallAsync(this._redisKey)) || {};
return keys.map(key => [key, values[`${id}:${key}`] || '']);
} else {
return keys.map(key => [key, this._statsCache.get(`${id}:${key}`) || '']);
}
}
private get _redisClient() {
return this._redisClientDep();
}
}
/**
* Helper class that monitors and saves (either in memory or in Redis) usage statics and current
* status of webhooks.
*/
class WebhookStatistics extends PersistedStore<StatsKey> {
/**
* Retrieves and calculates all the statistics for a given webhook.
* @param id Webhook ID
* @param queue Current webhook task queue
*/
public async getUsage(id: string, queue: WebHookEvent[]): Promise<WebhookUsage|null> {
// Get all the keys from the store for this webhook, and create a dictionary.
const values: Record<StatsKey, string> = _.fromPairs(await this.get(id, [
`batchStatus`,
`httpStatus`,
`errorMessage`,
`size`,
`status`,
`updatedTime`,
`lastFailureTime`,
`lastSuccessTime`,
`lastErrorMessage`,
`lastHttpStatus`,
`attempts`,
])) as Record<StatsKey, string>;
// If everything is empty, we don't have any stats yet.
if (Array.from(Object.values(values)).every(v => !v)) {
return {
status: 'idle',
numWaiting: queue.filter(e => e.id === id).length,
lastEventBatch: null,
};
}
const usage: WebhookUsage = {
// Overall status of the webhook.
status: values.status as WebhookStatus || 'idle',
numWaiting: queue.filter(x => x.id === id).length,
updatedTime: parseInt(values.updatedTime || "0", 10),
// Last values from batches.
lastEventBatch: null,
lastSuccessTime: parseInt(values.lastSuccessTime, 10),
lastFailureTime: parseInt(values.lastFailureTime, 10),
lastErrorMessage: values.lastErrorMessage || null,
lastHttpStatus: values.lastHttpStatus ? parseInt(values.lastHttpStatus, 10) : null,
};
// If we have a batchStatus (so we actually run it at least once - or it wasn't cleared).
if (values.batchStatus) {
usage.lastEventBatch = {
status: values.batchStatus as WebhookBatchStatus,
httpStatus: values.httpStatus ? parseInt(values.httpStatus || "0", 10) : null,
errorMessage: values.errorMessage || null,
size: parseInt(values.size || "0", 10),
attempts: parseInt(values.attempts|| "0", 10),
};
}
return usage;
}
/**
* Logs a status of a webhook. Now is passed as a parameter so that updates that happen in almost the same
* millisecond were seen as the same update.
*/
public async logStatus(id: string, status: WebhookStatus, now?: number|null) {
await this.set(id, [
['status', status],
['updatedTime', (now ?? Date.now()).toString()],
]);
}
/**
* Logs a status of the active batch.
*/
public async logBatch(
id: string,
status: WebhookBatchStatus,
now?: number|null,
stats?: {
httpStatus?: number|null,
error?: string|null,
size?: number|null,
attempts?: number|null,
}
) {
now ??= Date.now();
// Update batchStats.
const batchStats: [StatsKey, string][] = [
[`batchStatus`, status],
[`updatedTime`, now.toString()],
];
if (stats?.httpStatus !== undefined) {
batchStats.push([`httpStatus`, (stats.httpStatus || '').toString()]);
}
if (stats?.attempts !== undefined) {
batchStats.push([`attempts`, (stats.attempts || '0').toString()]);
}
if (stats?.error !== undefined) {
batchStats.push([`errorMessage`, stats?.error || '']);
}
if (stats?.size !== undefined) {
batchStats.push([`size`, (stats.size || '').toString()]);
}
const batchSummary: [StatsKey, string][] = [];
// Update webhook stats.
if (status === 'success') {
batchSummary.push([`lastSuccessTime`, now.toString()]);
} else if (status === 'failure') {
batchSummary.push([`lastFailureTime`, now.toString()]);
}
if (stats?.error) {
batchSummary.push([`lastErrorMessage`, stats.error]);
}
if (stats?.httpStatus) {
batchSummary.push([`lastHttpStatus`, (stats.httpStatus || '').toString()]);
}
await this.set(id, batchStats.concat(batchSummary));
}
}
type StatsKey =
'batchStatus' |
'httpStatus' |
'errorMessage' |
'attempts' |
'size'|
'updatedTime' |
'lastFailureTime' |
'lastSuccessTime' |
'lastErrorMessage' |
'lastHttpStatus' |
'status';