From 3c4d71aeca4391c14ecdc988ec164d93f4e9b936 Mon Sep 17 00:00:00 2001 From: Alex Hall Date: Thu, 23 Sep 2021 01:06:23 +0200 Subject: [PATCH] (core) Initial webhooks implementation Summary: See https://grist.quip.com/VKd3ASF99ezD/Outgoing-Webhooks - 2 new DocApi endpoints: _subscribe and _unsubscribe, not meant to be user friendly or publicly documented. _unsubscribe should be given the response from _subscribe in the body, e.g: ``` $ curl -X POST -H "Authorization: Bearer 8fd4dc59ecb05ab29ae5a183c03101319b8e6ca9" "http://localhost:8080/api/docs/6WYa23FqWxGNe3AR6DLjCJ/tables/Table2/_subscribe" -H "Content-type: application/json" -d '{"url": "https://webhook.site/a916b526-8afc-46e6-aa8f-a625d0d83ec3", "eventTypes": ["add"], "isReadyColumn": "C"}' {"unsubscribeKey":"3246f158-55b5-4fc7-baa5-093b75ffa86c","triggerId":2,"webhookId":"853b4bfa-9d39-4639-aa33-7d45354903c0"} $ curl -X POST -H "Authorization: Bearer 8fd4dc59ecb05ab29ae5a183c03101319b8e6ca9" "http://localhost:8080/api/docs/6WYa23FqWxGNe3AR6DLjCJ/tables/Table2/_unsubscribe" -H "Content-type: application/json" -d '{"unsubscribeKey":"3246f158-55b5-4fc7-baa5-093b75ffa86c","triggerId":2,"webhookId":"853b4bfa-9d39-4639-aa33-7d45354903c0"}' {"success":true} ``` - New DB entity Secret to hold the webhook URL and unsubscribe key - New document metatable _grist_Triggers subscribes to table changes and points to a secret to use for a webhook - New file Triggers.ts processes action summaries and uses the two new tables to send webhooks. - Also went on a bit of a diversion and made a typesafe subclass of TableData for metatables. I think this is essentially good enough for a first diff, to keep the diffs manageable and to talk about the overall structure. Future diffs can add tests and more robustness using redis etc. After this diff I can also start building the Zapier integration privately. Test Plan: Tested manually: see curl commands in summary for an example. Payloads can be seen in https://webhook.site/#!/a916b526-8afc-46e6-aa8f-a625d0d83ec3/0b9fe335-33f7-49fe-b90b-2db5ba53382d/1 . Great site for testing webhooks btw. Reviewers: dsagal, paulfitz Reviewed By: paulfitz Differential Revision: https://phab.getgrist.com/D3019 --- app/common/DocData.ts | 13 +- app/common/TableData.ts | 29 ++ app/common/schema.ts | 14 + app/gen-server/entity/Document.ts | 4 + app/gen-server/entity/Secret.ts | 16 + app/gen-server/lib/HomeDBManager.ts | 44 +++ .../migration/1631286208009-Secret.ts | 38 +++ app/server/lib/ActiveDoc.ts | 6 +- app/server/lib/DocApi.ts | 141 +++++++-- app/server/lib/Sharing.ts | 7 +- app/server/lib/Triggers.ts | 276 ++++++++++++++++++ sandbox/gen_js_schema.py | 1 + sandbox/grist/migrations.py | 12 + sandbox/grist/schema.py | 10 +- 14 files changed, 584 insertions(+), 27 deletions(-) create mode 100644 app/gen-server/entity/Secret.ts create mode 100644 app/gen-server/migration/1631286208009-Secret.ts create mode 100644 app/server/lib/Triggers.ts diff --git a/app/common/DocData.ts b/app/common/DocData.ts index 9b7da29e..96687650 100644 --- a/app/common/DocData.ts +++ b/app/common/DocData.ts @@ -3,13 +3,13 @@ * subscribes to actions which change it, and forwards those actions to individual tables. * It also provides the interface to apply actions to data. */ -import {schema} from 'app/common/schema'; +import {schema, SchemaTypes} from 'app/common/schema'; import fromPairs = require('lodash/fromPairs'); import groupBy = require('lodash/groupBy'); import {ActionDispatcher} from './ActionDispatcher'; import {BulkColValues, ColInfo, ColInfoWithId, ColValues, DocAction, RowRecord, TableDataAction} from './DocActions'; -import {ColTypeMap, TableData} from './TableData'; +import {ColTypeMap, MetaTableData, TableData} from './TableData'; type FetchTableFunc = (tableId: string) => Promise; @@ -46,7 +46,7 @@ export class DocData extends ActionDispatcher { * Creates a new TableData object. A derived class may override to return an object derived from TableData. */ public createTableData(tableId: string, tableData: TableDataAction|null, colTypes: ColTypeMap): TableData { - return new TableData(tableId, tableData, colTypes); + return new (tableId in schema ? MetaTableData : TableData)(tableId, tableData, colTypes); } /** @@ -56,6 +56,13 @@ export class DocData extends ActionDispatcher { return this._tables.get(tableId); } + /** + * Like getTable, but the result knows about the types of its records + */ + public getMetaTable(tableId: TableId): MetaTableData { + return this.getTable(tableId) as any; + } + /** * Returns an unsorted list of all tableIds in this doc, including both metadata and user tables. */ diff --git a/app/common/TableData.ts b/app/common/TableData.ts index ce9e0a7d..fcc34be5 100644 --- a/app/common/TableData.ts +++ b/app/common/TableData.ts @@ -7,6 +7,7 @@ import {ActionDispatcher} from './ActionDispatcher'; import {BulkColValues, CellValue, ColInfo, ColInfoWithId, ColValues, DocAction, isSchemaAction, ReplaceTableData, RowRecord, TableDataAction} from './DocActions'; import {arrayRemove, arraySplice} from './gutil'; +import {SchemaTypes} from "./schema"; export interface ColTypeMap { [colId: string]: string; } @@ -470,6 +471,34 @@ export class TableData extends ActionDispatcher implements SkippableRows { } } +export type MetaRowRecord = SchemaTypes[TableId] & RowRecord; + +/** + * Behaves the same as TableData, but uses SchemaTypes for type safety of its columns. + */ +export class MetaTableData extends TableData { + constructor(tableId: TableId, tableData: TableDataAction | null, colTypes: ColTypeMap) { + super(tableId, tableData, colTypes); + } + + public getRecords(): Array> { + return super.getRecords() as any; + } + + public getRecord(rowId: number): MetaRowRecord | undefined { + return super.getRecord(rowId) as any; + } + + /** + * Same as getRowPropFunc, but I couldn't get a direct override to compile. + */ + public getMetaRowPropFunc( + colId: ColId + ): ((rowId: number | "new") => SchemaTypes[TableId][ColId]) { + return super.getRowPropFunc(colId as any) as any; + } +} + function reassignArray(targetArray: T[], sourceArray: T[]): void { targetArray.length = 0; arraySplice(targetArray, 0, sourceArray); diff --git a/app/common/schema.ts b/app/common/schema.ts index aff7a8d9..dd0059c5 100644 --- a/app/common/schema.ts +++ b/app/common/schema.ts @@ -143,6 +143,13 @@ export const schema = { timeUploaded : "DateTime", }, + "_grist_Triggers": { + tableRef : "Ref:_grist_Tables", + eventTypes : "ChoiceList", + isReadyColRef : "Ref:_grist_Tables_column", + actions : "Text", + }, + "_grist_ACLRules": { resource : "Ref:_grist_ACLResources", permissions : "Int", @@ -317,6 +324,13 @@ export interface SchemaTypes { timeUploaded: number; }; + "_grist_Triggers": { + tableRef: number; + eventTypes: ['L', ...string[]]|null; + isReadyColRef: number; + actions: string; + }; + "_grist_ACLRules": { resource: number; permissions: number; diff --git a/app/gen-server/entity/Document.ts b/app/gen-server/entity/Document.ts index 661aa337..c0a1e0f3 100644 --- a/app/gen-server/entity/Document.ts +++ b/app/gen-server/entity/Document.ts @@ -6,6 +6,7 @@ import {Column, Entity, JoinColumn, ManyToOne, OneToMany, PrimaryColumn} from "t import {AclRuleDoc} from "./AclRule"; import {Alias} from "./Alias"; import {Resource} from "./Resource"; +import {Secret} from "./Secret"; import {Workspace} from "./Workspace"; // Acceptable ids for use in document urls. @@ -58,6 +59,9 @@ export class Document extends Resource { @Column({name: 'options', type: nativeValues.jsonEntityType, nullable: true}) public options: DocumentOptions | null; + @OneToMany(_type => Secret, secret => secret.doc) + public secrets: Secret[]; + public checkProperties(props: any): props is Partial { return super.checkProperties(props, documentPropertyKeys); } diff --git a/app/gen-server/entity/Secret.ts b/app/gen-server/entity/Secret.ts new file mode 100644 index 00000000..d38d7466 --- /dev/null +++ b/app/gen-server/entity/Secret.ts @@ -0,0 +1,16 @@ +import {BaseEntity, Column, Entity, JoinColumn, ManyToOne, PrimaryColumn} from "typeorm"; +import {Document} from "./Document"; + +@Entity({name: 'secrets'}) +export class Secret extends BaseEntity { + @PrimaryColumn() + public id: string; // generally a UUID + + @Column({name: 'value'}) + public value: string; + + @ManyToOne(_type => Document, { onDelete: 'CASCADE' }) + @JoinColumn({name: 'doc_id'}) + public doc: Document; + +} diff --git a/app/gen-server/lib/HomeDBManager.ts b/app/gen-server/lib/HomeDBManager.ts index 63eca33a..776ccebd 100644 --- a/app/gen-server/lib/HomeDBManager.ts +++ b/app/gen-server/lib/HomeDBManager.ts @@ -22,6 +22,7 @@ import {Login} from "app/gen-server/entity/Login"; import {AccessOption, AccessOptionWithRole, Organization} from "app/gen-server/entity/Organization"; import {Pref} from "app/gen-server/entity/Pref"; import {getDefaultProductNames, Product, starterFeatures} from "app/gen-server/entity/Product"; +import {Secret} from "app/gen-server/entity/Secret"; import {User} from "app/gen-server/entity/User"; import {Workspace} from "app/gen-server/entity/Workspace"; import {Permissions} from 'app/gen-server/lib/Permissions'; @@ -31,11 +32,13 @@ import {bitOr, getRawAndEntities, now, readJson} from 'app/gen-server/sqlUtils'; import {makeId} from 'app/server/lib/idUtils'; import * as log from 'app/server/lib/log'; import {Permit} from 'app/server/lib/Permit'; +import {WebHookSecret} from "app/server/lib/Triggers"; import {EventEmitter} from 'events'; import flatten = require('lodash/flatten'); import pick = require('lodash/pick'); import {Brackets, Connection, createConnection, DatabaseType, EntityManager, getConnection, SelectQueryBuilder, WhereExpression} from "typeorm"; +import * as uuidv4 from "uuid/v4"; // Support transactions in Sqlite in async code. This is a monkey patch, affecting // the prototypes of various TypeORM classes. @@ -1548,6 +1551,47 @@ export class HomeDBManager extends EventEmitter { }); } + public addSecret(value: string, docId: string): Promise { + return this._connection.transaction(async manager => { + const secret = new Secret(); + secret.id = uuidv4(); + secret.value = value; + secret.doc = {id: docId} as any; + await manager.save([secret]); + return secret; + }); + } + + public async getSecret(id: string, docId: string, manager?: EntityManager): Promise { + const secret = await (manager || this._connection).createQueryBuilder() + .select('secrets') + .from(Secret, 'secrets') + .where('id = :id AND doc_id = :docId', {id, docId}) + .getOne(); + return secret?.value; + } + + public async removeWebhook(id: string, docId: string, unsubscribeKey: string): Promise { + if (!(id && unsubscribeKey)) { + throw new ApiError('Bad request: id and unsubscribeKey both required', 400); + } + return await this._connection.transaction(async manager => { + const secret = await this.getSecret(id, docId, manager); + if (!secret) { + throw new ApiError('Webhook with given id not found', 404); + } + const webhook = JSON.parse(secret) as WebHookSecret; + if (webhook.unsubscribeKey !== unsubscribeKey) { + throw new ApiError('Wrong unsubscribeKey', 401); + } + await manager.createQueryBuilder() + .delete() + .from(Secret) + .where('id = :id', {id}) + .execute(); + }); + } + // Checks that the user has UPDATE permissions to the given doc. If not, throws an // error. Otherwise updates the given doc with the given name. Returns an empty // query result with status 200 on success. diff --git a/app/gen-server/migration/1631286208009-Secret.ts b/app/gen-server/migration/1631286208009-Secret.ts new file mode 100644 index 00000000..7dba92d6 --- /dev/null +++ b/app/gen-server/migration/1631286208009-Secret.ts @@ -0,0 +1,38 @@ +import {MigrationInterface, QueryRunner, Table} from "typeorm"; + +export class Secret1631286208009 implements MigrationInterface { + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.createTable(new Table({ + name: "secrets", + columns: [ + { + name: "id", + type: "varchar", + isPrimary: true + }, + { + name: "value", + type: "varchar", + }, + { + name: "doc_id", + type: "varchar", + } + ], + foreignKeys: [ + { + columnNames: ["doc_id"], + referencedColumnNames: ["id"], + referencedTableName: "docs", + onDelete: 'CASCADE' // delete secret if linked to doc that is deleted + } + ] + })); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.dropTable('secrets'); + } + +} diff --git a/app/server/lib/ActiveDoc.ts b/app/server/lib/ActiveDoc.ts index 27b753f5..5c41622e 100644 --- a/app/server/lib/ActiveDoc.ts +++ b/app/server/lib/ActiveDoc.ts @@ -438,11 +438,15 @@ export class ActiveDoc extends EventEmitter { await this._actionHistory.initialize(); this._granularAccess = new GranularAccess(this.docData, this.docClients, (query) => { return this._fetchQueryFromDB(query, false); - }, this.recoveryMode, this._docManager.getHomeDbManager(), this.docName); + }, this.recoveryMode, this.getHomeDbManager(), this.docName); await this._granularAccess.update(); this._sharing = new Sharing(this, this._actionHistory, this._modificationLock); } + public getHomeDbManager() { + return this._docManager.getHomeDbManager(); + } + /** * Adds a small table to start off a newly-created blank document. */ diff --git a/app/server/lib/DocApi.ts b/app/server/lib/DocApi.ts index 6efe42cb..8b23f8b5 100644 --- a/app/server/lib/DocApi.ts +++ b/app/server/lib/DocApi.ts @@ -1,7 +1,10 @@ import { createEmptyActionSummary } from "app/common/ActionSummary"; import { ApiError } from 'app/common/ApiError'; import { BrowserSettings } from "app/common/BrowserSettings"; -import {CellValue, fromTableDataAction, TableColValues, TableRecordValue} from 'app/common/DocActions'; +import { + CellValue, fromTableDataAction, TableColValues, TableDataAction, TableRecordValue, +} from 'app/common/DocActions'; +import {isRaisedException} from "app/common/gristTypes"; import { arrayRepeat, isAffirmative } from "app/common/gutil"; import { SortFunc } from 'app/common/SortFunc'; import { DocReplacementOptions, DocState, DocStateComparison, DocStates, NEW_DOCUMENT_CODE} from 'app/common/UserAPI'; @@ -14,8 +17,13 @@ import { DocManager } from "app/server/lib/DocManager"; import { docSessionFromRequest, makeExceptionalDocSession, OptDocSession } from "app/server/lib/DocSession"; import { DocWorker } from "app/server/lib/DocWorker"; import { IDocWorkerMap } from "app/server/lib/DocWorkerMap"; +import { parseExportParameters } from "app/server/lib/Export"; +import { downloadCSV, DownloadCSVOptions } from "app/server/lib/ExportCSV"; +import { downloadXLSX, DownloadXLSXOptions } from "app/server/lib/ExportXLSX"; import { expressWrap } from 'app/server/lib/expressWrap'; import { filterDocumentInPlace } from "app/server/lib/filterUtils"; +import { googleAuthTokenMiddleware } from "app/server/lib/GoogleAuth"; +import { exportToDrive } from "app/server/lib/GoogleExport"; import { GristServer } from 'app/server/lib/GristServer'; import { HashUtil } from 'app/server/lib/HashUtil'; import { makeForkIds } from "app/server/lib/idUtils"; @@ -23,19 +31,15 @@ import { getDocId, getDocScope, integerParam, isParameterOn, optStringParam, sendOkReply, sendReply, stringParam } from 'app/server/lib/requestUtils'; import { SandboxError } from "app/server/lib/sandboxUtil"; +import {localeFromRequest} from "app/server/lib/ServerLocale"; +import {allowedEventTypes, isUrlAllowed, WebhookAction, WebHookSecret} from "app/server/lib/Triggers"; import { handleOptionalUpload, handleUpload } from "app/server/lib/uploads"; import * as contentDisposition from 'content-disposition'; import { Application, NextFunction, Request, RequestHandler, Response } from "express"; +import * as _ from "lodash"; import fetch from 'node-fetch'; import * as path from 'path'; -import { exportToDrive } from "app/server/lib/GoogleExport"; -import { googleAuthTokenMiddleware } from "app/server/lib/GoogleAuth"; -import * as _ from "lodash"; -import {isRaisedException} from "app/common/gristTypes"; -import {localeFromRequest} from "app/server/lib/ServerLocale"; -import { downloadCSV, DownloadCSVOptions } from "app/server/lib/ExportCSV"; -import { downloadXLSX, DownloadXLSXOptions } from "app/server/lib/ExportXLSX"; -import { parseExportParameters } from "app/server/lib/Export"; +import * as uuidv4 from "uuid/v4"; // Cap on the number of requests that can be outstanding on a single document via the // rest doc api. When this limit is exceeded, incoming requests receive an immediate @@ -159,21 +163,26 @@ export class DocWorkerApi { }) ); + async function getMetaTables(activeDoc: ActiveDoc, req: RequestWithLogin) { + return await handleSandboxError("", [], + activeDoc.fetchMetaTables(docSessionFromRequest(req))); + } + + function tableIdToRef(metaTables: { [p: string]: TableDataAction }, tableId: any) { + const [, , tableRefs, tableData] = metaTables._grist_Tables; + const tableRowIndex = tableData.tableId.indexOf(tableId); + if (tableRowIndex === -1) { + throw new ApiError(`Table not found "${tableId}"`, 404); + } + return tableRefs[tableRowIndex]; + } + // Get the columns of the specified table in recordish format this._app.get('/api/docs/:docId/tables/:tableId/columns', canView, withDoc(async (activeDoc, req, res) => { - const metaTables = await handleSandboxError("", [], - activeDoc.fetchMetaTables(docSessionFromRequest(req))); - - const [, , tableRefs, tableData] = metaTables["_grist_Tables"]; - const [, , colRefs, columnData] = metaTables["_grist_Tables_column"]; - - const tableId = req.params.tableId; - const tableRowIndex = tableData.tableId.indexOf(tableId); - if (tableRowIndex === -1) { - throw new ApiError(`Table not found "${tableId}"`, 404); - } - const tableRef = tableRefs[tableRowIndex]; + const metaTables = await getMetaTables(activeDoc, req); + const tableRef = tableIdToRef(metaTables, req.params.tableId); + const [, , colRefs, columnData] = metaTables._grist_Tables_column; // colId is pulled out of fields and used as the root id const fieldNames = _.without(Object.keys(columnData), "colId"); @@ -364,6 +373,96 @@ export class DocWorkerApi { }) ); + // Add a new webhook and trigger + this._app.post('/api/docs/:docId/tables/:tableId/_subscribe', isOwner, + withDoc(async (activeDoc, req, res) => { + const {isReadyColumn, eventTypes, url} = req.body; + + if (!(Array.isArray(eventTypes) && eventTypes.length)) { + throw new ApiError(`eventTypes must be a non-empty array`, 400); + } + if (!eventTypes.every(allowedEventTypes.guard)) { + throw new ApiError(`Allowed values in eventTypes are: ${allowedEventTypes.values}`, 400); + } + if (!url) { + throw new ApiError('Bad request: url required', 400); + } + if (!isUrlAllowed(url)) { + throw new ApiError('Provided url is forbidden', 403); + } + + const unsubscribeKey = uuidv4(); + const webhook: WebHookSecret = {unsubscribeKey, url}; + const secretValue = JSON.stringify(webhook); + const webhookId = (await this._dbManager.addSecret(secretValue, activeDoc.docName)).id; + + const metaTables = await getMetaTables(activeDoc, req); + const tableRef = tableIdToRef(metaTables, req.params.tableId); + + let isReadyColRef = 0; + if (isReadyColumn) { + const [, , colRefs, columnData] = metaTables._grist_Tables_column; + const colRowIndex = columnData.colId.indexOf(isReadyColumn); + if (colRowIndex === -1) { + throw new ApiError(`Column not found "${isReadyColumn}"`, 404); + } + isReadyColRef = colRefs[colRowIndex]; + } + + const webhookAction: WebhookAction = {type: "webhook", id: webhookId}; + + const sandboxRes = await handleSandboxError("_grist_Triggers", [], activeDoc.applyUserActions( + docSessionFromRequest(req), + [['AddRecord', "_grist_Triggers", null, { + tableRef, + isReadyColRef, + eventTypes: ["L", ...eventTypes], + actions: JSON.stringify([webhookAction]) + }]])); + + res.json({ + unsubscribeKey, + triggerId: sandboxRes.retValues[0], + webhookId, + }); + }) + ); + + // Remove webhook and trigger created above + this._app.post('/api/docs/:docId/tables/:tableId/_unsubscribe', canEdit, + withDoc(async (activeDoc, req, res) => { + const metaTables = await getMetaTables(activeDoc, req); + const tableRef = tableIdToRef(metaTables, req.params.tableId); + const {triggerId, unsubscribeKey, webhookId} = req.body; + + // Validate combination of triggerId, webhookId, and tableRef. + // This is overly strict, webhookId should be enough, + // but it should be easy to relax that later if we want. + const [, , triggerRowIds, triggerColData] = metaTables._grist_Triggers; + const triggerRowIndex = triggerRowIds.indexOf(triggerId); + if (triggerRowIndex === -1) { + throw new ApiError(`Trigger not found "${triggerId}"`, 404); + } + if (triggerColData.tableRef[triggerRowIndex] !== tableRef) { + throw new ApiError(`Wrong table`, 400); + } + const actions = JSON.parse(triggerColData.actions[triggerRowIndex] as string); + if (!_.find(actions, {type: "webhook", id: webhookId})) { + throw new ApiError(`Webhook not found "${webhookId}"`, 404); + } + + // Validate unsubscribeKey before deleting trigger from document + await this._dbManager.removeWebhook(webhookId, activeDoc.docName, unsubscribeKey); + + // TODO handle trigger containing other actions when that becomes possible + await handleSandboxError("_grist_Triggers", [], activeDoc.applyUserActions( + docSessionFromRequest(req), + [['RemoveRecord', "_grist_Triggers", triggerId]])); + + res.json({success: true}); + }) + ); + // Reload a document forcibly (in fact this closes the doc, it will be automatically // reopened on use). this._app.post('/api/docs/:docId/force-reload', canEdit, throttled(async (req, res) => { diff --git a/app/server/lib/Sharing.ts b/app/server/lib/Sharing.ts index d92b3f7b..c716adea 100644 --- a/app/server/lib/Sharing.ts +++ b/app/server/lib/Sharing.ts @@ -15,8 +15,10 @@ import * as assert from 'assert'; import {Mutex} from 'async-mutex'; import * as Deque from 'double-ended-queue'; import {ActionHistory, asActionGroup} from './ActionHistory'; +import {summarizeAction} from "./ActionSummary"; import {ActiveDoc} from './ActiveDoc'; import {makeExceptionalDocSession, OptDocSession} from './DocSession'; +import {TriggersHandler} from "./Triggers"; import {WorkCoordinator} from './WorkCoordinator'; // Describes the request to apply a UserActionBundle. It includes a Client (so that broadcast @@ -279,15 +281,18 @@ export class Sharing { } await this._activeDoc.processActionBundle(ownActionBundle); + const actionSummary = summarizeAction(localActionBundle); + new TriggersHandler(this._activeDoc).handle(actionSummary); + // Broadcast the action to connected browsers. const actionGroup = asActionGroup(this._actionHistory, localActionBundle, { client, retValues: sandboxActionBundle.retValues, - summarize: true, // Mark the on-open Calculate action as internal. In future, synchronizing fields to today's // date and other changes from external values may count as internal. internal: isCalculate, }); + actionGroup.actionSummary = actionSummary; await accessControl.appliedBundle(); await accessControl.sendDocUpdateForBundle(actionGroup); if (docSession) { diff --git a/app/server/lib/Triggers.ts b/app/server/lib/Triggers.ts new file mode 100644 index 00000000..87609c51 --- /dev/null +++ b/app/server/lib/Triggers.ts @@ -0,0 +1,276 @@ +import {ActionSummary, TableDelta} from "app/common/ActionSummary"; +import {delay} from "app/common/delay"; +import {fromTableDataAction, TableColValues} from "app/common/DocActions"; +import {StringUnion} from "app/common/StringUnion"; +import {MetaRowRecord} from "app/common/TableData"; +import {CellValue} from "app/plugin/GristData"; +import {ActiveDoc} from "app/server/lib/ActiveDoc"; +import {makeExceptionalDocSession} from "app/server/lib/DocSession"; +import * as log from "app/server/lib/log"; +import * as _ from "lodash"; +import * as LRUCache from "lru-cache"; +import fetch from "node-fetch"; + +// TODO replace with redis +// Keeps track of whether records existed before changes to them started +// to determine the correct event type when the record is ready +const existedBeforeMemory: { [key: string]: boolean } = {}; + +// Only owners can manage triggers, but any user's activity can trigger them +// and the corresponding actions get the full values +const docSession = makeExceptionalDocSession('system'); + +// DB cache for webhook secrets +const webhookCache = new LRUCache<{ id: string, docId: string }, WebHookSecret>({max: 10 * 1000}); + +// Describes the change in existence to a record, which determines the event type +interface RecordDelta { + existedBefore: boolean; + existedAfter: boolean; +} + +// Union discriminated by type +type TriggerAction = WebhookAction | PythonAction; + +export interface WebhookAction { + type: "webhook"; + id: string; +} + +// Just hypothetical +interface PythonAction { + type: "python"; + code: string; +} + +// Payload sent to webhook +// Simply the values in a record +interface Event { + [colId: string]: CellValue; +} + +export const allowedEventTypes = StringUnion("add", "update"); + +type EventType = typeof allowedEventTypes.type; + +type Trigger = MetaRowRecord<"_grist_Triggers">; + +export interface WebHookSecret { + url: string; + unsubscribeKey: string; +} + +// Processes triggers for records changed as described in an ActionSummary, +// initiating webhooks and automations. +// An instance of this class should have .handle() called on it exactly once. +export class TriggersHandler { + // Converts a column ref to colId by looking it up in _grist_Tables_column + private _getColId: (rowId: (number | "new")) => string; + + constructor(private _activeDoc: ActiveDoc) { + } + + public handle(summary: ActionSummary) { + const docData = this._activeDoc.docData; + if (!docData) { + return; + } // Happens on doc creation while processing InitNewDoc action. + + const triggersTable = docData.getMetaTable("_grist_Triggers"); + const getTableId = docData.getMetaTable("_grist_Tables").getMetaRowPropFunc("tableId"); + this._getColId = docData.getMetaTable("_grist_Tables_column").getMetaRowPropFunc("colId"); + + const triggersByTableRef = _.groupBy(triggersTable.getRecords(), "tableRef"); + for (const [tableRef, triggers] of _.toPairs(triggersByTableRef)) { + const tableId = getTableId(Number(tableRef)); // groupBy makes tableRef a string + const tableDelta = summary.tableDeltas[tableId]; + if (!tableDelta) { + continue; // this table was not modified by these actions + } + // Handle tables in parallel (fetching table values from document DB) + this._handleTableTriggers( + tableId, tableDelta, triggers + ).catch(() => log.error("Error handling triggers")); + } + } + + private async _handleTableTriggers( + tableId: string, delta: TableDelta, triggers: Trigger[], + ) { + const recordDeltas = new Map(); + delta.updateRows.forEach(id => + recordDeltas.set(id, {existedBefore: true, existedAfter: true})); + // A row ID can appear in both updateRows and addRows, although it probably shouldn't + // Added row IDs override updated rows because they didn't exist before + delta.addRows.forEach(id => + recordDeltas.set(id, {existedBefore: false, existedAfter: true})); + + // If we allow subscribing to deletion in the future + // delta.removeRows.forEach(id => + // recordDeltas.set(id, {existedBefore: true, existedAfter: false})); + + // Fetch the modified records in full so they can be sent in webhooks + // They will also be used to check if the record is ready + const filters = {id: [...recordDeltas.keys()]}; + const bulkColValues = fromTableDataAction(await this._activeDoc.fetchQuery(docSession, {tableId, filters})); + + triggers.forEach(trigger => { + const actions = JSON.parse(trigger.actions) as TriggerAction[]; + bulkColValues.id.forEach((rowId, rowIndex) => { + // Handle triggers in parallel (talking to redis) + this._handleTrigger( + trigger, actions, bulkColValues, rowIndex, rowId, recordDeltas.get(rowId)! + ).catch(() => log.error("Error handling trigger action")); + }); + }); + } + + // Handles a single trigger for a single record, initiating all the corresponding actions + private async _handleTrigger( + trigger: Trigger, actions: TriggerAction[], + bulkColValues: TableColValues, rowIndex: number, rowId: number, recordDelta: RecordDelta, + ) { + let isReady: boolean; + if (!trigger.isReadyColRef) { + // User hasn't configured a column, so all records are considered ready immediately + isReady = true; + } else { + const colId = this._getColId(trigger.isReadyColRef)!; + const isReadyCellValue = bulkColValues[colId]?.[rowIndex]; + if (typeof isReadyCellValue !== "boolean") { + // Likely causes: column not found or error in formula + isReady = false; + } else { + isReady = isReadyCellValue; + } + } + + // Globally unique identifier of this record and trigger combination + // trigger.tableRef is probably redundant given trigger.id, just being cautious + const existedBeforeKey = `${this._activeDoc.docName}:${trigger.id}:${trigger.tableRef}:${rowId}`; + + // Only store existedBefore if it isn't stored already + const existedBefore = existedBeforeKey in existedBeforeMemory ? + existedBeforeMemory[existedBeforeKey] : recordDelta.existedBefore; + + if (!isReady) { + existedBeforeMemory[existedBeforeKey] = existedBefore; + return; + } + + // Now that the record is ready, clear the stored existedBefore value + // so that future events for this record are accurate + delete existedBeforeMemory[existedBeforeKey]; + + let eventType: EventType; + if (existedBefore) { + eventType = "update"; + // If we allow subscribing to deletion in the future + // if (recordDelta.existedAfter) { + // eventType = "update"; + // } else { + // eventType = "remove"; + // } + } else { + eventType = "add"; + } + if (!trigger.eventTypes!.includes(eventType)) { + // The user hasn't subscribed to the type of change that happened + return; + } + + // All the values in this record + const event = _.mapValues(bulkColValues, col => col[rowIndex]); + + actions.forEach(action => { + // Handle actions in parallel + this._handleTriggerAction( + action, event + ).catch(() => log.error("Error handling trigger action")); + }); + } + + private async _handleTriggerAction(action: TriggerAction, event: Event) { + // TODO use event queue for reliability + if (action.type === "webhook") { + const key = {id: action.id, docId: this._activeDoc.docName}; + let webhook = webhookCache.get(key); + if (!webhook) { + const secret = await this._activeDoc.getHomeDbManager()?.getSecret(key.id, key.docId); + if (!secret) { + return; + } + webhook = JSON.parse(secret); + webhookCache.set(key, webhook!); + } + const url = webhook!.url; + if (!isUrlAllowed(url)) { + log.warn(`Webhook not sent to forbidden URL: ${url}`); + return; + } + pendingEvents.push({url: webhook!.url, event}); + if (!startedSending) { + startedSending = true; + setInterval(sendPendingEvents, 2000); + } + } else { + throw new Error("Unknown action type " + action.type); + } + } +} + +let pendingEvents: Array<{ url: string, event: Event }> = []; +let startedSending = false; + +function sendPendingEvents() { + const pending = pendingEvents; + pendingEvents = []; + for (const [url, group] of _.toPairs(_.groupBy(pending, "url"))) { + const body = JSON.stringify(_.map(group, "event").reverse()); + sendWebhookWithRetries(url, body).catch(() => log.error("Webhook failed!")); + } +} + +async function sendWebhookWithRetries(url: string, body: string) { + const maxAttempts = 20; + const maxWait = 64; + let wait = 1; + for (let i = 0; i < maxAttempts; i++) { + const response = await fetch(url, { + method: 'POST', + body, + headers: { + 'Content-Type': 'application/json', + }, + }); + if (response.status === 200) { + return; + } else { + await delay((wait + Math.random()) * 1000); + if (wait < maxWait) { + wait *= 2; + } + } + } + throw new Error("Webhook failed!"); +} + + +export function isUrlAllowed(urlString: string) { + let url: URL; + try { + url = new URL(urlString); + } catch (e) { + return false; + } + + // http (no s) is only allowed for localhost for testing. + // localhost still needs to be explicitly permitted, and it shouldn't be outside dev + if (url.protocol !== "https:" && url.hostname !== "localhost") { + return false; + } + + return (process.env.ALLOWED_WEBHOOK_DOMAINS || "").split(",").some(domain => + domain && url.host.endsWith(domain) + ); +} diff --git a/sandbox/gen_js_schema.py b/sandbox/gen_js_schema.py index 8c765c60..7a6094b8 100644 --- a/sandbox/gen_js_schema.py +++ b/sandbox/gen_js_schema.py @@ -13,6 +13,7 @@ _ts_types = { "PositionNumber": "number", "Ref": "number", "RefList": "['L', ...number[]]|null", # Non-primitive values are encoded + "ChoiceList": "['L', ...string[]]|null", "Text": "string", } diff --git a/sandbox/grist/migrations.py b/sandbox/grist/migrations.py index 3b54ec0f..fbbef779 100644 --- a/sandbox/grist/migrations.py +++ b/sandbox/grist/migrations.py @@ -795,3 +795,15 @@ def migration23(tdset): add_column('_grist_DocInfo', 'documentSettings', 'Text'), actions.UpdateRecord('_grist_DocInfo', 1, {'documentSettings': '{"locale":"en-US"}'}) ]) + + +@migration(schema_version=24) +def migration24(tdset): + return tdset.apply_doc_actions([ + actions.AddTable('_grist_Triggers', [ + schema.make_column("tableRef", "Ref:_grist_Tables"), + schema.make_column("eventTypes", "ChoiceList"), + schema.make_column("isReadyColRef", "Ref:_grist_Tables_column"), + schema.make_column("actions", "Text"), # JSON + ]), + ]) diff --git a/sandbox/grist/schema.py b/sandbox/grist/schema.py index 9e19974c..e904490c 100644 --- a/sandbox/grist/schema.py +++ b/sandbox/grist/schema.py @@ -15,7 +15,7 @@ import six import actions -SCHEMA_VERSION = 23 +SCHEMA_VERSION = 24 def make_column(col_id, col_type, formula='', isFormula=False): return { @@ -236,6 +236,14 @@ def schema_create_actions(): ]), + # Triggers subscribing to changes in tables + actions.AddTable("_grist_Triggers", [ + make_column("tableRef", "Ref:_grist_Tables"), + make_column("eventTypes", "ChoiceList"), + make_column("isReadyColRef", "Ref:_grist_Tables_column"), + make_column("actions", "Text"), # JSON + ]), + # All of the ACL rules. actions.AddTable('_grist_ACLRules', [ make_column('resource', 'Ref:_grist_ACLResources'),