gristlabs_grist-core/app/server/lib/Triggers.ts
Alex Hall 8c1f8bc9a6 (core) Test webhooks
Summary:
Tests DocApi endpoints _subscribe and _unsubscribe, including various bad inputs.

Tests that webhooks are sent to a test express server, with retrying on failure, filtered by event type, and waiting for isReadyColumn.

Test Plan: this

Reviewers: paulfitz

Reviewed By: paulfitz

Differential Revision: https://phab.getgrist.com/D3042
2021-09-28 01:13:54 +02:00

283 lines
9.5 KiB
TypeScript

import {ActionSummary, TableDelta} from "app/common/ActionSummary";
import {delay} from "app/common/delay";
import {fromTableDataAction, TableColValues} from "app/common/DocActions";
import {StringUnion} from "app/common/StringUnion";
import {MetaRowRecord} from "app/common/TableData";
import {CellValue} from "app/plugin/GristData";
import {ActiveDoc} from "app/server/lib/ActiveDoc";
import {makeExceptionalDocSession} from "app/server/lib/DocSession";
import * as log from "app/server/lib/log";
import * as _ from "lodash";
import * as LRUCache from "lru-cache";
import fetch from "node-fetch";
// TODO replace with redis
// Keeps track of whether records existed before changes to them started
// to determine the correct event type when the record is ready
const existedBeforeMemory: { [key: string]: boolean } = {};
// Only owners can manage triggers, but any user's activity can trigger them
// and the corresponding actions get the full values
const docSession = makeExceptionalDocSession('system');
// DB cache for webhook secrets
const webhookCache = new LRUCache<{ id: string, docId: string }, WebHookSecret>({max: 10 * 1000});
// Describes the change in existence to a record, which determines the event type
interface RecordDelta {
existedBefore: boolean;
existedAfter: boolean;
}
// Union discriminated by type
type TriggerAction = WebhookAction | PythonAction;
export interface WebhookAction {
type: "webhook";
id: string;
}
// Just hypothetical
interface PythonAction {
type: "python";
code: string;
}
// Payload sent to webhook
// Simply the values in a record
interface Event {
[colId: string]: CellValue;
}
export const allowedEventTypes = StringUnion("add", "update");
type EventType = typeof allowedEventTypes.type;
type Trigger = MetaRowRecord<"_grist_Triggers">;
export interface WebHookSecret {
url: string;
unsubscribeKey: string;
}
// Processes triggers for records changed as described in an ActionSummary,
// initiating webhooks and automations.
// An instance of this class should have .handle() called on it exactly once.
export class TriggersHandler {
// Converts a column ref to colId by looking it up in _grist_Tables_column
private _getColId: (rowId: number) => string|undefined;
constructor(private _activeDoc: ActiveDoc) {
}
public handle(summary: ActionSummary) {
const docData = this._activeDoc.docData;
if (!docData) {
return;
} // Happens on doc creation while processing InitNewDoc action.
const triggersTable = docData.getMetaTable("_grist_Triggers");
const getTableId = docData.getMetaTable("_grist_Tables").getMetaRowPropFunc("tableId");
this._getColId = docData.getMetaTable("_grist_Tables_column").getMetaRowPropFunc("colId");
const triggersByTableRef = _.groupBy(triggersTable.getRecords(), "tableRef");
for (const [tableRef, triggers] of _.toPairs(triggersByTableRef)) {
const tableId = getTableId(Number(tableRef))!; // groupBy makes tableRef a string
const tableDelta = summary.tableDeltas[tableId];
if (!tableDelta) {
continue; // this table was not modified by these actions
}
// Handle tables in parallel (fetching table values from document DB)
this._handleTableTriggers(
tableId, tableDelta, triggers
).catch(() => log.error("Error handling triggers"));
}
}
private async _handleTableTriggers(
tableId: string, delta: TableDelta, triggers: Trigger[],
) {
const recordDeltas = new Map<number, RecordDelta>();
delta.updateRows.forEach(id =>
recordDeltas.set(id, {existedBefore: true, existedAfter: true}));
// A row ID can appear in both updateRows and addRows, although it probably shouldn't
// Added row IDs override updated rows because they didn't exist before
delta.addRows.forEach(id =>
recordDeltas.set(id, {existedBefore: false, existedAfter: true}));
// If we allow subscribing to deletion in the future
// delta.removeRows.forEach(id =>
// recordDeltas.set(id, {existedBefore: true, existedAfter: false}));
// Fetch the modified records in full so they can be sent in webhooks
// They will also be used to check if the record is ready
const filters = {id: [...recordDeltas.keys()]};
const bulkColValues = fromTableDataAction(await this._activeDoc.fetchQuery(docSession, {tableId, filters}));
log.info(`Processing ${triggers.length} triggers for ${bulkColValues.id.length} records of ${tableId}`);
for (const trigger of triggers) {
const actions = JSON.parse(trigger.actions) as TriggerAction[];
for (let rowIndex = 0; rowIndex < bulkColValues.id.length; rowIndex++) {
const rowId = bulkColValues.id[rowIndex];
// Handle triggers serially to make order predictable
await this._handleTrigger(
trigger, actions, bulkColValues, rowIndex, rowId, recordDeltas.get(rowId)!
);
}
}
}
// Handles a single trigger for a single record, initiating all the corresponding actions
private async _handleTrigger(
trigger: Trigger, actions: TriggerAction[],
bulkColValues: TableColValues, rowIndex: number, rowId: number, recordDelta: RecordDelta,
) {
let isReady: boolean;
if (!trigger.isReadyColRef) {
// User hasn't configured a column, so all records are considered ready immediately
isReady = true;
} else {
const colId = this._getColId(trigger.isReadyColRef)!;
const isReadyCellValue = bulkColValues[colId]?.[rowIndex];
if (typeof isReadyCellValue !== "boolean") {
// Likely causes: column not found or error in formula
isReady = false;
} else {
isReady = isReadyCellValue;
}
}
// Globally unique identifier of this record and trigger combination
// trigger.tableRef is probably redundant given trigger.id, just being cautious
const existedBeforeKey = `${this._activeDoc.docName}:${trigger.id}:${trigger.tableRef}:${rowId}`;
// Only store existedBefore if it isn't stored already
const existedBefore = existedBeforeKey in existedBeforeMemory ?
existedBeforeMemory[existedBeforeKey] : recordDelta.existedBefore;
if (!isReady) {
existedBeforeMemory[existedBeforeKey] = existedBefore;
return;
}
// Now that the record is ready, clear the stored existedBefore value
// so that future events for this record are accurate
delete existedBeforeMemory[existedBeforeKey];
let eventType: EventType;
if (existedBefore) {
eventType = "update";
// If we allow subscribing to deletion in the future
// if (recordDelta.existedAfter) {
// eventType = "update";
// } else {
// eventType = "remove";
// }
} else {
eventType = "add";
}
if (!trigger.eventTypes!.includes(eventType)) {
// The user hasn't subscribed to the type of change that happened
return;
}
// All the values in this record
const event = _.mapValues(bulkColValues, col => col[rowIndex]);
// Handle actions serially to make order predictable
for (const action of actions) {
await this._handleTriggerAction(action, event);
}
}
private async _handleTriggerAction(action: TriggerAction, event: Event) {
// TODO use event queue for reliability
if (action.type === "webhook") {
const key = {id: action.id, docId: this._activeDoc.docName};
let webhook = webhookCache.get(key);
if (!webhook) {
const secret = await this._activeDoc.getHomeDbManager()?.getSecret(key.id, key.docId);
if (!secret) {
return;
}
webhook = JSON.parse(secret);
webhookCache.set(key, webhook!);
}
const url = webhook!.url;
if (!isUrlAllowed(url)) {
log.warn(`Webhook not sent to forbidden URL: ${url}`);
return;
}
pendingEvents.push({url: webhook!.url, event});
if (!startedSending) {
startedSending = true;
setInterval(sendPendingEvents, 2000);
}
} else {
throw new Error("Unknown action type " + action.type);
}
}
}
let pendingEvents: Array<{ url: string, event: Event }> = [];
let startedSending = false;
function sendPendingEvents() {
const pending = pendingEvents;
pendingEvents = [];
for (const [url, group] of _.toPairs(_.groupBy(pending, "url"))) {
const body = JSON.stringify(_.map(group, "event"));
sendWebhookWithRetries(url, body).catch(() => log.error("Webhook failed!"));
}
}
async function sendWebhookWithRetries(url: string, body: string) {
const maxAttempts = 20;
const maxWait = 64;
let wait = 1;
for (let i = 0; i < maxAttempts; i++) {
try {
const response = await fetch(url, {
method: 'POST',
body,
headers: {
'Content-Type': 'application/json',
},
});
if (response.status === 200) {
return;
}
log.warn(`Webhook responded with status ${response.status}`);
} catch (e) {
log.warn(`Webhook error: ${e}`);
}
await delay((wait + Math.random()) * 1000);
if (wait < maxWait) {
wait *= 2;
}
}
throw new Error("Webhook failed!");
}
export function isUrlAllowed(urlString: string) {
let url: URL;
try {
url = new URL(urlString);
} catch (e) {
return false;
}
// http (no s) is only allowed for localhost for testing.
// localhost still needs to be explicitly permitted, and it shouldn't be outside dev
if (url.protocol !== "https:" && url.hostname !== "localhost") {
return false;
}
return (process.env.ALLOWED_WEBHOOK_DOMAINS || "").split(",").some(domain =>
domain && url.host.endsWith(domain)
);
}