From 276adc5f519c12bb8bc1a18bec32c65e7542ff23 Mon Sep 17 00:00:00 2001 From: Alex Hall Date: Fri, 15 Oct 2021 15:12:13 +0200 Subject: [PATCH] (core) Starting to make webhooks more robust Summary: - Puts events on a queue in memory and ensures they are sent in the order they were generated. - Makes the caller (Sharing.ts) wait until changed records have been fetched from the DB, but allows it to continue after while remaining work happens asynchronously. - Gathers all new webhook events into an array so they can be backed up to the queue on redis in a single command (in a future diff). - Uses changes in isReady to determine event type, no more 'existed before' The structure of the code has changed a lot, so I think the scope of the diff needs to stop here. Lots of work is still deferred in TODOs. Test Plan: Updated existing test. Actually dropped testing of retry on failures and slowness because it no longer made sense to keep that as part of the current test, so a new test for that will be added in a future diff. Reviewers: paulfitz Reviewed By: paulfitz Differential Revision: https://phab.getgrist.com/D3074 --- app/server/lib/ActiveDoc.ts | 10 + app/server/lib/Sharing.ts | 3 +- app/server/lib/Triggers.ts | 413 +++++++++++++++++++++++------------- 3 files changed, 272 insertions(+), 154 deletions(-) diff --git a/app/server/lib/ActiveDoc.ts b/app/server/lib/ActiveDoc.ts index 9476625a..386dac2e 100644 --- a/app/server/lib/ActiveDoc.ts +++ b/app/server/lib/ActiveDoc.ts @@ -4,6 +4,8 @@ * change events. */ +import { ActionSummary } from "app/common/ActionSummary"; +import { DocTriggers } from "app/server/lib/Triggers"; import * as assert from 'assert'; import {Mutex} from 'async-mutex'; import * as bluebird from 'bluebird'; @@ -138,6 +140,7 @@ export class ActiveDoc extends EventEmitter { // result). protected _modificationLock: Mutex = new Mutex(); + private _triggers: DocTriggers; private _dataEngine: Promise|undefined; private _activeDocImport: ActiveDocImport; private _onDemandActions: OnDemandActions; @@ -168,6 +171,7 @@ export class ActiveDoc extends EventEmitter { this._docName = docName; this.docStorage = new DocStorage(docManager.storageManager, docName); this.docClients = new DocClients(this); + this._triggers = new DocTriggers(this); this._actionHistory = new ActionHistoryImpl(this.docStorage); this.docPluginManager = new DocPluginManager(docManager.pluginManager.getPlugins(), docManager.pluginManager.appRoot!, this, this._docManager.gristServer); @@ -288,6 +292,10 @@ export class ActiveDoc extends EventEmitter { return this._actionHistory; } + public handleTriggers(summary: ActionSummary): Promise { + return this._triggers.handle(summary); + } + /** * Adds a client of this doc to the list of connected clients. * @param client: The client object maintaining the websocket connection. @@ -319,6 +327,8 @@ export class ActiveDoc extends EventEmitter { this.docClients.removeAllClients(); } + this._triggers.shutdown(); + // Clear the MapWithTTL to remove all timers from the event loop. this._fetchCache.clear(); diff --git a/app/server/lib/Sharing.ts b/app/server/lib/Sharing.ts index f9274654..511f9bca 100644 --- a/app/server/lib/Sharing.ts +++ b/app/server/lib/Sharing.ts @@ -18,7 +18,6 @@ import { ActionHistory, asActionGroup, getActionUndoInfo} from './ActionHistory' import {summarizeAction} from "./ActionSummary"; import {ActiveDoc} from './ActiveDoc'; import {makeExceptionalDocSession, OptDocSession} from './DocSession'; -import {TriggersHandler} from "./Triggers"; import {WorkCoordinator} from './WorkCoordinator'; // Describes the request to apply a UserActionBundle. It includes a Client (so that broadcast @@ -284,7 +283,7 @@ export class Sharing { await this._activeDoc.processActionBundle(ownActionBundle); const actionSummary = summarizeAction(localActionBundle); - new TriggersHandler(this._activeDoc).handle(actionSummary); + await this._activeDoc.handleTriggers(actionSummary); // Broadcast the action to connected browsers. const actionGroup = asActionGroup(this._actionHistory, localActionBundle, { diff --git a/app/server/lib/Triggers.ts b/app/server/lib/Triggers.ts index 9e0ad1d6..13b7d87f 100644 --- a/app/server/lib/Triggers.ts +++ b/app/server/lib/Triggers.ts @@ -1,34 +1,32 @@ -import {ActionSummary, TableDelta} from "app/common/ActionSummary"; -import {delay} from "app/common/delay"; -import {fromTableDataAction, TableColValues} from "app/common/DocActions"; -import {StringUnion} from "app/common/StringUnion"; -import {MetaRowRecord} from "app/common/TableData"; -import {CellValue} from "app/plugin/GristData"; -import {ActiveDoc} from "app/server/lib/ActiveDoc"; -import {makeExceptionalDocSession} from "app/server/lib/DocSession"; -import * as log from "app/server/lib/log"; -import * as _ from "lodash"; -import * as LRUCache from "lru-cache"; -import fetch from "node-fetch"; +import { ActionSummary, TableDelta } from 'app/common/ActionSummary'; +import { delay } from 'app/common/delay'; +import { fromTableDataAction, RowRecord, TableColValues, TableDataAction } from 'app/common/DocActions'; +import { StringUnion } from 'app/common/StringUnion'; +import { MetaRowRecord } from 'app/common/TableData'; +import { CellDelta } from 'app/common/TabularDiff'; +import { ActiveDoc } from 'app/server/lib/ActiveDoc'; +import { makeExceptionalDocSession } from 'app/server/lib/DocSession'; +import * as log from 'app/server/lib/log'; +import { promisifyAll } from 'bluebird'; +import * as _ from 'lodash'; +import * as LRUCache from 'lru-cache'; +import fetch from 'node-fetch'; +import { createClient, RedisClient } from 'redis'; -// TODO replace with redis -// Keeps track of whether records existed before changes to them started -// to determine the correct event type when the record is ready -const existedBeforeMemory: { [key: string]: boolean } = {}; +promisifyAll(RedisClient.prototype); // Only owners can manage triggers, but any user's activity can trigger them // and the corresponding actions get the full values const docSession = makeExceptionalDocSession('system'); -// DB cache for webhook secrets -const webhookCache = new LRUCache<{ id: string, docId: string }, WebHookSecret>({max: 10 * 1000}); - // Describes the change in existence to a record, which determines the event type interface RecordDelta { existedBefore: boolean; existedAfter: boolean; } +type RecordDeltas = Map; + // Union discriminated by type type TriggerAction = WebhookAction | PythonAction; @@ -43,10 +41,9 @@ interface PythonAction { code: string; } -// Payload sent to webhook -// Simply the values in a record -interface Event { - [colId: string]: CellValue; +interface WebHookEvent { + payload: RowRecord; + id: string; } export const allowedEventTypes = StringUnion("add", "update"); @@ -60,17 +57,49 @@ export interface WebHookSecret { unsubscribeKey: string; } -// Processes triggers for records changed as described in an ActionSummary, +// Work to do after fetching values from the document +interface Task { + tableId: string; + tableDelta: TableDelta; + triggers: any; + tableDataAction: Promise; + recordDeltas: RecordDeltas; +} + +// Processes triggers for records changed as described in ActionSummary objects, // initiating webhooks and automations. -// An instance of this class should have .handle() called on it exactly once. -export class TriggersHandler { +export class DocTriggers { // Converts a column ref to colId by looking it up in _grist_Tables_column private _getColId: (rowId: number) => string|undefined; + // Events that need to be sent to webhooks in FIFO order. + private _webHookEventQueue: WebHookEvent[] = []; + + // DB cache for webhook secrets + private _webhookCache = new LRUCache({max: 1000}); + + private _shuttingDown: boolean = false; + + private _sending: boolean = false; + + private _redisClient: RedisClient | undefined; + constructor(private _activeDoc: ActiveDoc) { + const redisUrl = process.env.REDIS_URL; + if (redisUrl) { + this._redisClient = createClient(redisUrl); + // TODO check for existing events on redis queue + } } - public handle(summary: ActionSummary) { + public shutdown() { + this._shuttingDown = true; + if (!this._sending) { + this._redisClient?.quitAsync(); + } + } + + public async handle(summary: ActionSummary) { const docData = this._activeDoc.docData; if (!docData) { return; @@ -81,93 +110,156 @@ export class TriggersHandler { this._getColId = docData.getMetaTable("_grist_Tables_column").getMetaRowPropFunc("colId"); const triggersByTableRef = _.groupBy(triggersTable.getRecords(), "tableRef"); - for (const [tableRef, triggers] of _.toPairs(triggersByTableRef)) { + + const tasks: Task[] = []; + for (const tableRef of Object.keys(triggersByTableRef).sort()) { + const triggers = triggersByTableRef[tableRef]; const tableId = getTableId(Number(tableRef))!; // groupBy makes tableRef a string const tableDelta = summary.tableDeltas[tableId]; - if (!tableDelta) { - continue; // this table was not modified by these actions + if (tableDelta) { + const recordDeltas = this._getRecordDeltas(tableDelta); + const filters = {id: [...recordDeltas.keys()]}; + + // Fetch the modified records in full so they can be sent in webhooks + // They will also be used to check if the record is ready + const tableDataAction = this._activeDoc.fetchQuery(docSession, {tableId, filters}); + tasks.push({tableId, tableDelta, triggers, tableDataAction, recordDeltas}); } - // Handle tables in parallel (fetching table values from document DB) - this._handleTableTriggers( - tableId, tableDelta, triggers - ).catch(() => log.error("Error handling triggers")); } + + // Fetch values from document DB in parallel + await Promise.all(tasks.map(t => t.tableDataAction)); + + const events: WebHookEvent[] = []; + for (const task of tasks) { + events.push(...this._handleTask(task, await task.tableDataAction)); + } + this._webHookEventQueue.push(...events); + + if (!this._sending && events.length) { + this._sending = true; + + const startSendLoop = () => { + this._sendLoop().catch((e) => { + log.error(`_sendLoop failed: ${e}`); + startSendLoop(); + }); + }; + startSendLoop(); + } + + // TODO also push to redis queue } - private async _handleTableTriggers( - tableId: string, delta: TableDelta, triggers: Trigger[], - ) { + private _getRecordDeltas(tableDelta: TableDelta): RecordDeltas { const recordDeltas = new Map(); - delta.updateRows.forEach(id => + tableDelta.updateRows.forEach(id => recordDeltas.set(id, {existedBefore: true, existedAfter: true})); // A row ID can appear in both updateRows and addRows, although it probably shouldn't // Added row IDs override updated rows because they didn't exist before - delta.addRows.forEach(id => + tableDelta.addRows.forEach(id => recordDeltas.set(id, {existedBefore: false, existedAfter: true})); // If we allow subscribing to deletion in the future // delta.removeRows.forEach(id => // recordDeltas.set(id, {existedBefore: true, existedAfter: false})); - // Fetch the modified records in full so they can be sent in webhooks - // They will also be used to check if the record is ready - const filters = {id: [...recordDeltas.keys()]}; - const bulkColValues = fromTableDataAction(await this._activeDoc.fetchQuery(docSession, {tableId, filters})); + return recordDeltas; + } + + private _handleTask( + {tableDelta, tableId, triggers, recordDeltas}: Task, + tableDataAction: TableDataAction, + ) { + const bulkColValues = fromTableDataAction(tableDataAction); log.info(`Processing ${triggers.length} triggers for ${bulkColValues.id.length} records of ${tableId}`); + const makePayload = _.memoize((rowIndex: number) => + _.mapValues(bulkColValues, col => col[rowIndex]) as RowRecord + ); + + const result: WebHookEvent[] = []; for (const trigger of triggers) { const actions = JSON.parse(trigger.actions) as TriggerAction[]; + const webhookActions = actions.filter(act => act.type === "webhook") as WebhookAction[]; + if (!webhookActions.length) { + continue; + } - for (let rowIndex = 0; rowIndex < bulkColValues.id.length; rowIndex++) { - const rowId = bulkColValues.id[rowIndex]; - // Handle triggers serially to make order predictable - await this._handleTrigger( - trigger, actions, bulkColValues, rowIndex, rowId, recordDeltas.get(rowId)! - ); + const rowIndexesToSend: number[] = _.range(bulkColValues.id.length).filter(rowIndex => { + const rowId = bulkColValues.id[rowIndex]; + return this._shouldTriggerActions( + trigger, bulkColValues, rowIndex, rowId, recordDeltas.get(rowId)!, tableDelta, + ); + } + ); + + for (const action of webhookActions) { + for (const rowIndex of rowIndexesToSend) { + const event = {id: action.id, payload: makePayload(rowIndex)}; + result.push(event); + } } } + return result; } - // Handles a single trigger for a single record, initiating all the corresponding actions - private async _handleTrigger( - trigger: Trigger, actions: TriggerAction[], - bulkColValues: TableColValues, rowIndex: number, rowId: number, recordDelta: RecordDelta, - ) { - let isReady: boolean; + /** + * Determines if actions should be triggered for a single record and trigger. + */ + private _shouldTriggerActions( + trigger: Trigger, + bulkColValues: TableColValues, + rowIndex: number, + rowId: number, + recordDelta: RecordDelta, + tableDelta: TableDelta, + ): boolean { + let readyBefore: boolean; if (!trigger.isReadyColRef) { // User hasn't configured a column, so all records are considered ready immediately - isReady = true; + readyBefore = recordDelta.existedBefore; } else { - const colId = this._getColId(trigger.isReadyColRef)!; - const isReadyCellValue = bulkColValues[colId]?.[rowIndex]; - if (typeof isReadyCellValue !== "boolean") { - // Likely causes: column not found or error in formula - isReady = false; + const isReadyColId = this._getColId(trigger.isReadyColRef)!; + + // Must be the actual boolean `true`, not just anything truthy + const isReady = bulkColValues[isReadyColId][rowIndex] === true; + if (!isReady) { + return false; + } + + const cellDelta: CellDelta | undefined = tableDelta.columnDeltas[isReadyColId]?.[rowId]; + if (!recordDelta.existedBefore) { + readyBefore = false; + } else if (!cellDelta ) { + // Cell wasn't changed, and the record is ready now, so it was ready before. + // This assumes that the ActionSummary contains all changes to the isReady column. + // TODO ensure ActionSummary actually contains all changes, right now bulk changes are truncated. + readyBefore = true; } else { - isReady = isReadyCellValue; + const deltaBefore = cellDelta[0]; + if (deltaBefore === null) { + // The record didn't exist before, so it definitely wasn't ready + // (although we probably shouldn't reach this since we already checked recordDelta.existedBefore) + readyBefore = false; + } else if (deltaBefore === "?") { + // The ActionSummary shouldn't contain this kind of delta at all + // since it comes from a single action bundle, not a combination of summaries. + log.warn('Unexpected deltaBefore === "?"', {trigger, isReadyColId, rowId, docId: this._activeDoc.docName}); + readyBefore = true; + } else { + // Only remaining case is that deltaBefore is a single-element array containing the previous value. + const [valueBefore] = deltaBefore; + + // Must be the actual boolean `true`, not just anything truthy + readyBefore = valueBefore === true; + } } } - // Globally unique identifier of this record and trigger combination - // trigger.tableRef is probably redundant given trigger.id, just being cautious - const existedBeforeKey = `${this._activeDoc.docName}:${trigger.id}:${trigger.tableRef}:${rowId}`; - - // Only store existedBefore if it isn't stored already - const existedBefore = existedBeforeKey in existedBeforeMemory ? - existedBeforeMemory[existedBeforeKey] : recordDelta.existedBefore; - - if (!isReady) { - existedBeforeMemory[existedBeforeKey] = existedBefore; - return; - } - - // Now that the record is ready, clear the stored existedBefore value - // so that future events for this record are accurate - delete existedBeforeMemory[existedBeforeKey]; - let eventType: EventType; - if (existedBefore) { + if (readyBefore) { eventType = "update"; // If we allow subscribing to deletion in the future // if (recordDelta.existedAfter) { @@ -178,90 +270,107 @@ export class TriggersHandler { } else { eventType = "add"; } - if (!trigger.eventTypes!.includes(eventType)) { - // The user hasn't subscribed to the type of change that happened + + return trigger.eventTypes!.includes(eventType); + } + + private async _getWebHookUrl(id: string): Promise { + let webhook = this._webhookCache.get(id); + if (!webhook) { + const secret = await this._activeDoc.getHomeDbManager()?.getSecret(id, this._activeDoc.docName); + if (!secret) { + return; + } + webhook = JSON.parse(secret); + this._webhookCache.set(id, webhook!); + } + const url = webhook!.url; + if (!isUrlAllowed(url)) { + log.warn(`Webhook not sent to forbidden URL: ${url}`); return; } - - // All the values in this record - const event = _.mapValues(bulkColValues, col => col[rowIndex]); - - // Handle actions serially to make order predictable - for (const action of actions) { - await this._handleTriggerAction(action, event); - } + return url; } - private async _handleTriggerAction(action: TriggerAction, event: Event) { - // TODO use event queue for reliability - if (action.type === "webhook") { - const key = {id: action.id, docId: this._activeDoc.docName}; - let webhook = webhookCache.get(key); - if (!webhook) { - const secret = await this._activeDoc.getHomeDbManager()?.getSecret(key.id, key.docId); - if (!secret) { - return; + private async _sendLoop() { + log.info("Starting _sendLoop"); + + // TODO delay/prevent shutting down while queue isn't empty? + while (!this._shuttingDown) { + if (!this._webHookEventQueue.length) { + await delay(1000); + continue; + } + const id = this._webHookEventQueue[0].id; + const batch = _.takeWhile(this._webHookEventQueue.slice(0, 100), {id}); + const body = JSON.stringify(batch.map(e => e.payload)); + const url = await this._getWebHookUrl(id); + let success: boolean; + if (!url) { + success = true; + } else { + success = await this._sendWebhookWithRetries(url, body); + } + if (success) { + this._webHookEventQueue.splice(0, batch.length); + // TODO also remove on redis + } else if (!this._shuttingDown) { + // TODO reorder queue on failure + } + } + + log.info("Ended _sendLoop"); + + this._redisClient?.quitAsync().catch(e => + // Catch error to prevent sendLoop being restarted + log.warn("Error quitting redis: " + e) + ); + } + + private async _sendWebhookWithRetries(url: string, body: string) { + const maxAttempts = 20; + const maxWait = 64; + let wait = 1; + for (let attempt = 0; attempt < maxAttempts; attempt++) { + if (this._shuttingDown) { + return false; + } + try { + const response = await fetch(url, { + method: 'POST', + body, + headers: { + 'Content-Type': 'application/json', + }, + }); + if (response.status === 200) { + return true; } - webhook = JSON.parse(secret); - webhookCache.set(key, webhook!); + log.warn(`Webhook responded with status ${response.status}`); + } catch (e) { + log.warn(`Webhook error: ${e}`); } - const url = webhook!.url; - if (!isUrlAllowed(url)) { - log.warn(`Webhook not sent to forbidden URL: ${url}`); - return; + + // Don't wait any more if this is the last attempt. + if (attempt >= maxAttempts - 1) { + return false; } - pendingEvents.push({url: webhook!.url, event}); - if (!startedSending) { - startedSending = true; - setInterval(sendPendingEvents, 2000); + + // Wait `wait` seconds, checking this._shuttingDown every second. + for (let waitIndex = 0; waitIndex < wait; waitIndex++) { + if (this._shuttingDown) { + return false; + } + await delay(1000); + } + if (wait < maxWait) { + wait *= 2; } - } else { - throw new Error("Unknown action type " + action.type); } + return false; } } -let pendingEvents: Array<{ url: string, event: Event }> = []; -let startedSending = false; - -function sendPendingEvents() { - const pending = pendingEvents; - pendingEvents = []; - for (const [url, group] of _.toPairs(_.groupBy(pending, "url"))) { - const body = JSON.stringify(_.map(group, "event")); - sendWebhookWithRetries(url, body).catch(() => log.error("Webhook failed!")); - } -} - -async function sendWebhookWithRetries(url: string, body: string) { - const maxAttempts = 20; - const maxWait = 64; - let wait = 1; - for (let i = 0; i < maxAttempts; i++) { - try { - const response = await fetch(url, { - method: 'POST', - body, - headers: { - 'Content-Type': 'application/json', - }, - }); - if (response.status === 200) { - return; - } - log.warn(`Webhook responded with status ${response.status}`); - } catch (e) { - log.warn(`Webhook error: ${e}`); - } - await delay((wait + Math.random()) * 1000); - if (wait < maxWait) { - wait *= 2; - } - } - throw new Error("Webhook failed!"); -} - - export function isUrlAllowed(urlString: string) { let url: URL; try {