mirror of
https://github.com/gristlabs/grist-core.git
synced 2026-03-02 04:09:24 +00:00
(core) Initial webhooks implementation
Summary: See https://grist.quip.com/VKd3ASF99ezD/Outgoing-Webhooks - 2 new DocApi endpoints: _subscribe and _unsubscribe, not meant to be user friendly or publicly documented. _unsubscribe should be given the response from _subscribe in the body, e.g: ``` $ curl -X POST -H "Authorization: Bearer 8fd4dc59ecb05ab29ae5a183c03101319b8e6ca9" "http://localhost:8080/api/docs/6WYa23FqWxGNe3AR6DLjCJ/tables/Table2/_subscribe" -H "Content-type: application/json" -d '{"url": "https://webhook.site/a916b526-8afc-46e6-aa8f-a625d0d83ec3", "eventTypes": ["add"], "isReadyColumn": "C"}' {"unsubscribeKey":"3246f158-55b5-4fc7-baa5-093b75ffa86c","triggerId":2,"webhookId":"853b4bfa-9d39-4639-aa33-7d45354903c0"} $ curl -X POST -H "Authorization: Bearer 8fd4dc59ecb05ab29ae5a183c03101319b8e6ca9" "http://localhost:8080/api/docs/6WYa23FqWxGNe3AR6DLjCJ/tables/Table2/_unsubscribe" -H "Content-type: application/json" -d '{"unsubscribeKey":"3246f158-55b5-4fc7-baa5-093b75ffa86c","triggerId":2,"webhookId":"853b4bfa-9d39-4639-aa33-7d45354903c0"}' {"success":true} ``` - New DB entity Secret to hold the webhook URL and unsubscribe key - New document metatable _grist_Triggers subscribes to table changes and points to a secret to use for a webhook - New file Triggers.ts processes action summaries and uses the two new tables to send webhooks. - Also went on a bit of a diversion and made a typesafe subclass of TableData for metatables. I think this is essentially good enough for a first diff, to keep the diffs manageable and to talk about the overall structure. Future diffs can add tests and more robustness using redis etc. After this diff I can also start building the Zapier integration privately. Test Plan: Tested manually: see curl commands in summary for an example. Payloads can be seen in https://webhook.site/#!/a916b526-8afc-46e6-aa8f-a625d0d83ec3/0b9fe335-33f7-49fe-b90b-2db5ba53382d/1 . Great site for testing webhooks btw. Reviewers: dsagal, paulfitz Reviewed By: paulfitz Differential Revision: https://phab.getgrist.com/D3019
This commit is contained in:
276
app/server/lib/Triggers.ts
Normal file
276
app/server/lib/Triggers.ts
Normal file
@@ -0,0 +1,276 @@
|
||||
import {ActionSummary, TableDelta} from "app/common/ActionSummary";
|
||||
import {delay} from "app/common/delay";
|
||||
import {fromTableDataAction, TableColValues} from "app/common/DocActions";
|
||||
import {StringUnion} from "app/common/StringUnion";
|
||||
import {MetaRowRecord} from "app/common/TableData";
|
||||
import {CellValue} from "app/plugin/GristData";
|
||||
import {ActiveDoc} from "app/server/lib/ActiveDoc";
|
||||
import {makeExceptionalDocSession} from "app/server/lib/DocSession";
|
||||
import * as log from "app/server/lib/log";
|
||||
import * as _ from "lodash";
|
||||
import * as LRUCache from "lru-cache";
|
||||
import fetch from "node-fetch";
|
||||
|
||||
// TODO replace with redis
|
||||
// Keeps track of whether records existed before changes to them started
|
||||
// to determine the correct event type when the record is ready
|
||||
const existedBeforeMemory: { [key: string]: boolean } = {};
|
||||
|
||||
// Only owners can manage triggers, but any user's activity can trigger them
|
||||
// and the corresponding actions get the full values
|
||||
const docSession = makeExceptionalDocSession('system');
|
||||
|
||||
// DB cache for webhook secrets
|
||||
const webhookCache = new LRUCache<{ id: string, docId: string }, WebHookSecret>({max: 10 * 1000});
|
||||
|
||||
// Describes the change in existence to a record, which determines the event type
|
||||
interface RecordDelta {
|
||||
existedBefore: boolean;
|
||||
existedAfter: boolean;
|
||||
}
|
||||
|
||||
// Union discriminated by type
|
||||
type TriggerAction = WebhookAction | PythonAction;
|
||||
|
||||
export interface WebhookAction {
|
||||
type: "webhook";
|
||||
id: string;
|
||||
}
|
||||
|
||||
// Just hypothetical
|
||||
interface PythonAction {
|
||||
type: "python";
|
||||
code: string;
|
||||
}
|
||||
|
||||
// Payload sent to webhook
|
||||
// Simply the values in a record
|
||||
interface Event {
|
||||
[colId: string]: CellValue;
|
||||
}
|
||||
|
||||
export const allowedEventTypes = StringUnion("add", "update");
|
||||
|
||||
type EventType = typeof allowedEventTypes.type;
|
||||
|
||||
type Trigger = MetaRowRecord<"_grist_Triggers">;
|
||||
|
||||
export interface WebHookSecret {
|
||||
url: string;
|
||||
unsubscribeKey: string;
|
||||
}
|
||||
|
||||
// Processes triggers for records changed as described in an ActionSummary,
|
||||
// initiating webhooks and automations.
|
||||
// An instance of this class should have .handle() called on it exactly once.
|
||||
export class TriggersHandler {
|
||||
// Converts a column ref to colId by looking it up in _grist_Tables_column
|
||||
private _getColId: (rowId: (number | "new")) => string;
|
||||
|
||||
constructor(private _activeDoc: ActiveDoc) {
|
||||
}
|
||||
|
||||
public handle(summary: ActionSummary) {
|
||||
const docData = this._activeDoc.docData;
|
||||
if (!docData) {
|
||||
return;
|
||||
} // Happens on doc creation while processing InitNewDoc action.
|
||||
|
||||
const triggersTable = docData.getMetaTable("_grist_Triggers");
|
||||
const getTableId = docData.getMetaTable("_grist_Tables").getMetaRowPropFunc("tableId");
|
||||
this._getColId = docData.getMetaTable("_grist_Tables_column").getMetaRowPropFunc("colId");
|
||||
|
||||
const triggersByTableRef = _.groupBy(triggersTable.getRecords(), "tableRef");
|
||||
for (const [tableRef, triggers] of _.toPairs(triggersByTableRef)) {
|
||||
const tableId = getTableId(Number(tableRef)); // groupBy makes tableRef a string
|
||||
const tableDelta = summary.tableDeltas[tableId];
|
||||
if (!tableDelta) {
|
||||
continue; // this table was not modified by these actions
|
||||
}
|
||||
// Handle tables in parallel (fetching table values from document DB)
|
||||
this._handleTableTriggers(
|
||||
tableId, tableDelta, triggers
|
||||
).catch(() => log.error("Error handling triggers"));
|
||||
}
|
||||
}
|
||||
|
||||
private async _handleTableTriggers(
|
||||
tableId: string, delta: TableDelta, triggers: Trigger[],
|
||||
) {
|
||||
const recordDeltas = new Map<number, RecordDelta>();
|
||||
delta.updateRows.forEach(id =>
|
||||
recordDeltas.set(id, {existedBefore: true, existedAfter: true}));
|
||||
// A row ID can appear in both updateRows and addRows, although it probably shouldn't
|
||||
// Added row IDs override updated rows because they didn't exist before
|
||||
delta.addRows.forEach(id =>
|
||||
recordDeltas.set(id, {existedBefore: false, existedAfter: true}));
|
||||
|
||||
// If we allow subscribing to deletion in the future
|
||||
// delta.removeRows.forEach(id =>
|
||||
// recordDeltas.set(id, {existedBefore: true, existedAfter: false}));
|
||||
|
||||
// Fetch the modified records in full so they can be sent in webhooks
|
||||
// They will also be used to check if the record is ready
|
||||
const filters = {id: [...recordDeltas.keys()]};
|
||||
const bulkColValues = fromTableDataAction(await this._activeDoc.fetchQuery(docSession, {tableId, filters}));
|
||||
|
||||
triggers.forEach(trigger => {
|
||||
const actions = JSON.parse(trigger.actions) as TriggerAction[];
|
||||
bulkColValues.id.forEach((rowId, rowIndex) => {
|
||||
// Handle triggers in parallel (talking to redis)
|
||||
this._handleTrigger(
|
||||
trigger, actions, bulkColValues, rowIndex, rowId, recordDeltas.get(rowId)!
|
||||
).catch(() => log.error("Error handling trigger action"));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// Handles a single trigger for a single record, initiating all the corresponding actions
|
||||
private async _handleTrigger(
|
||||
trigger: Trigger, actions: TriggerAction[],
|
||||
bulkColValues: TableColValues, rowIndex: number, rowId: number, recordDelta: RecordDelta,
|
||||
) {
|
||||
let isReady: boolean;
|
||||
if (!trigger.isReadyColRef) {
|
||||
// User hasn't configured a column, so all records are considered ready immediately
|
||||
isReady = true;
|
||||
} else {
|
||||
const colId = this._getColId(trigger.isReadyColRef)!;
|
||||
const isReadyCellValue = bulkColValues[colId]?.[rowIndex];
|
||||
if (typeof isReadyCellValue !== "boolean") {
|
||||
// Likely causes: column not found or error in formula
|
||||
isReady = false;
|
||||
} else {
|
||||
isReady = isReadyCellValue;
|
||||
}
|
||||
}
|
||||
|
||||
// Globally unique identifier of this record and trigger combination
|
||||
// trigger.tableRef is probably redundant given trigger.id, just being cautious
|
||||
const existedBeforeKey = `${this._activeDoc.docName}:${trigger.id}:${trigger.tableRef}:${rowId}`;
|
||||
|
||||
// Only store existedBefore if it isn't stored already
|
||||
const existedBefore = existedBeforeKey in existedBeforeMemory ?
|
||||
existedBeforeMemory[existedBeforeKey] : recordDelta.existedBefore;
|
||||
|
||||
if (!isReady) {
|
||||
existedBeforeMemory[existedBeforeKey] = existedBefore;
|
||||
return;
|
||||
}
|
||||
|
||||
// Now that the record is ready, clear the stored existedBefore value
|
||||
// so that future events for this record are accurate
|
||||
delete existedBeforeMemory[existedBeforeKey];
|
||||
|
||||
let eventType: EventType;
|
||||
if (existedBefore) {
|
||||
eventType = "update";
|
||||
// If we allow subscribing to deletion in the future
|
||||
// if (recordDelta.existedAfter) {
|
||||
// eventType = "update";
|
||||
// } else {
|
||||
// eventType = "remove";
|
||||
// }
|
||||
} else {
|
||||
eventType = "add";
|
||||
}
|
||||
if (!trigger.eventTypes!.includes(eventType)) {
|
||||
// The user hasn't subscribed to the type of change that happened
|
||||
return;
|
||||
}
|
||||
|
||||
// All the values in this record
|
||||
const event = _.mapValues(bulkColValues, col => col[rowIndex]);
|
||||
|
||||
actions.forEach(action => {
|
||||
// Handle actions in parallel
|
||||
this._handleTriggerAction(
|
||||
action, event
|
||||
).catch(() => log.error("Error handling trigger action"));
|
||||
});
|
||||
}
|
||||
|
||||
private async _handleTriggerAction(action: TriggerAction, event: Event) {
|
||||
// TODO use event queue for reliability
|
||||
if (action.type === "webhook") {
|
||||
const key = {id: action.id, docId: this._activeDoc.docName};
|
||||
let webhook = webhookCache.get(key);
|
||||
if (!webhook) {
|
||||
const secret = await this._activeDoc.getHomeDbManager()?.getSecret(key.id, key.docId);
|
||||
if (!secret) {
|
||||
return;
|
||||
}
|
||||
webhook = JSON.parse(secret);
|
||||
webhookCache.set(key, webhook!);
|
||||
}
|
||||
const url = webhook!.url;
|
||||
if (!isUrlAllowed(url)) {
|
||||
log.warn(`Webhook not sent to forbidden URL: ${url}`);
|
||||
return;
|
||||
}
|
||||
pendingEvents.push({url: webhook!.url, event});
|
||||
if (!startedSending) {
|
||||
startedSending = true;
|
||||
setInterval(sendPendingEvents, 2000);
|
||||
}
|
||||
} else {
|
||||
throw new Error("Unknown action type " + action.type);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let pendingEvents: Array<{ url: string, event: Event }> = [];
|
||||
let startedSending = false;
|
||||
|
||||
function sendPendingEvents() {
|
||||
const pending = pendingEvents;
|
||||
pendingEvents = [];
|
||||
for (const [url, group] of _.toPairs(_.groupBy(pending, "url"))) {
|
||||
const body = JSON.stringify(_.map(group, "event").reverse());
|
||||
sendWebhookWithRetries(url, body).catch(() => log.error("Webhook failed!"));
|
||||
}
|
||||
}
|
||||
|
||||
async function sendWebhookWithRetries(url: string, body: string) {
|
||||
const maxAttempts = 20;
|
||||
const maxWait = 64;
|
||||
let wait = 1;
|
||||
for (let i = 0; i < maxAttempts; i++) {
|
||||
const response = await fetch(url, {
|
||||
method: 'POST',
|
||||
body,
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
});
|
||||
if (response.status === 200) {
|
||||
return;
|
||||
} else {
|
||||
await delay((wait + Math.random()) * 1000);
|
||||
if (wait < maxWait) {
|
||||
wait *= 2;
|
||||
}
|
||||
}
|
||||
}
|
||||
throw new Error("Webhook failed!");
|
||||
}
|
||||
|
||||
|
||||
export function isUrlAllowed(urlString: string) {
|
||||
let url: URL;
|
||||
try {
|
||||
url = new URL(urlString);
|
||||
} catch (e) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// http (no s) is only allowed for localhost for testing.
|
||||
// localhost still needs to be explicitly permitted, and it shouldn't be outside dev
|
||||
if (url.protocol !== "https:" && url.hostname !== "localhost") {
|
||||
return false;
|
||||
}
|
||||
|
||||
return (process.env.ALLOWED_WEBHOOK_DOMAINS || "").split(",").some(domain =>
|
||||
domain && url.host.endsWith(domain)
|
||||
);
|
||||
}
|
||||
Reference in New Issue
Block a user