From e09e9190167e05312d131d1cc2db103f38afbef2 Mon Sep 17 00:00:00 2001 From: Alex Hall Date: Wed, 10 Nov 2021 21:14:23 +0200 Subject: [PATCH] (core) Ensure that large changes are processed in full by triggers (for webhooks) Summary: Uses the new alwaysPreserveColIds option for action summaries in Triggers.ts. Triggers.ts is now responsible for generating the summary to make it easy to pass this option. The value of the option is just all colIds mentioned in triggers configured in this document. Test Plan: Tested adding 200 rows to a subscribed table to ensure the events are not truncated. Also tests batching nicely. Reviewers: paulfitz Reviewed By: paulfitz Differential Revision: https://phab.getgrist.com/D3135 --- app/server/lib/ActiveDoc.ts | 4 +-- app/server/lib/Sharing.ts | 6 ++-- app/server/lib/Triggers.ts | 66 ++++++++++++++++++++++++++----------- 3 files changed, 50 insertions(+), 26 deletions(-) diff --git a/app/server/lib/ActiveDoc.ts b/app/server/lib/ActiveDoc.ts index 89f4470c..24c38771 100644 --- a/app/server/lib/ActiveDoc.ts +++ b/app/server/lib/ActiveDoc.ts @@ -285,8 +285,8 @@ export class ActiveDoc extends EventEmitter { return this._actionHistory; } - public handleTriggers(summary: ActionSummary): Promise { - return this._triggers.handle(summary); + public handleTriggers(localActionBundle: LocalActionBundle): Promise { + return this._triggers.handle(localActionBundle); } /** diff --git a/app/server/lib/Sharing.ts b/app/server/lib/Sharing.ts index 4d083681..6350e28a 100644 --- a/app/server/lib/Sharing.ts +++ b/app/server/lib/Sharing.ts @@ -14,8 +14,7 @@ import {shortDesc} from 'app/server/lib/shortDesc'; import * as assert from 'assert'; import {Mutex} from 'async-mutex'; import * as Deque from 'double-ended-queue'; -import { ActionHistory, asActionGroup, getActionUndoInfo} from './ActionHistory'; -import {summarizeAction} from "./ActionSummary"; +import {ActionHistory, asActionGroup, getActionUndoInfo} from './ActionHistory'; import {ActiveDoc} from './ActiveDoc'; import {makeExceptionalDocSession, OptDocSession} from './DocSession'; import {WorkCoordinator} from './WorkCoordinator'; @@ -299,8 +298,7 @@ export class Sharing { } await this._activeDoc.processActionBundle(ownActionBundle); - const actionSummary = summarizeAction(localActionBundle); - await this._activeDoc.handleTriggers(actionSummary); + const actionSummary = await this._activeDoc.handleTriggers(localActionBundle); // Broadcast the action to connected browsers. const actionGroup = asActionGroup(this._actionHistory, localActionBundle, { diff --git a/app/server/lib/Triggers.ts b/app/server/lib/Triggers.ts index c8ac18f6..2a4bf3ff 100644 --- a/app/server/lib/Triggers.ts +++ b/app/server/lib/Triggers.ts @@ -1,17 +1,19 @@ -import { ActionSummary, TableDelta } from 'app/common/ActionSummary'; -import { delay } from 'app/common/delay'; -import { fromTableDataAction, RowRecord, TableColValues, TableDataAction } from 'app/common/DocActions'; -import { StringUnion } from 'app/common/StringUnion'; -import { MetaRowRecord } from 'app/common/TableData'; -import { CellDelta } from 'app/common/TabularDiff'; -import { ActiveDoc } from 'app/server/lib/ActiveDoc'; -import { makeExceptionalDocSession } from 'app/server/lib/DocSession'; +import {LocalActionBundle} from 'app/common/ActionBundle'; +import {ActionSummary, TableDelta} from 'app/common/ActionSummary'; +import {delay} from 'app/common/delay'; +import {fromTableDataAction, RowRecord, TableColValues, TableDataAction} from 'app/common/DocActions'; +import {StringUnion} from 'app/common/StringUnion'; +import {MetaRowRecord} from 'app/common/TableData'; +import {CellDelta} from 'app/common/TabularDiff'; +import {summarizeAction} from 'app/server/lib/ActionSummary'; +import {ActiveDoc} from 'app/server/lib/ActiveDoc'; +import {makeExceptionalDocSession} from 'app/server/lib/DocSession'; import * as log from 'app/server/lib/log'; -import { promisifyAll } from 'bluebird'; +import {promisifyAll} from 'bluebird'; import * as _ from 'lodash'; import * as LRUCache from 'lru-cache'; import fetch from 'node-fetch'; -import { createClient, Multi, RedisClient } from 'redis'; +import {createClient, Multi, RedisClient} from 'redis'; promisifyAll(RedisClient.prototype); @@ -68,7 +70,7 @@ interface Task { const MAX_QUEUE_SIZE = 1000; -// Processes triggers for records changed as described in ActionSummary objects, +// Processes triggers for records changed as described in action bundles. // initiating webhooks and automations. // The interesting stuff starts in the handle() method. // Webhooks are placed on an event queue in memory which is replicated on redis as backup. @@ -122,12 +124,18 @@ export class DocTriggers { // Called after applying actions to a document and updating its data. // Checks for triggers configured in a meta table, // and whether any of those triggers monitor tables which were modified by the actions - // described in the given summary. + // in the given bundle. // If so, generates events which are pushed to the local and redis queues. - public async handle(summary: ActionSummary) { + // + // Returns an ActionSummary generated from the given LocalActionBundle. + // + // Generating the summary here makes it easy to specify which columns need to + // have all their changes included in the summary without truncation + // so that we can accurately identify which records are ready for sending. + public async handle(localActionBundle: LocalActionBundle): Promise { const docData = this._activeDoc.docData; if (!docData) { - return; + return summarizeAction(localActionBundle); } // Happens on doc creation while processing InitNewDoc action. const triggersTable = docData.getMetaTable("_grist_Triggers"); @@ -135,14 +143,31 @@ export class DocTriggers { this._getColId = docData.getMetaTable("_grist_Tables_column").getMetaRowPropFunc("colId"); const triggersByTableRef = _.groupBy(triggersTable.getRecords(), "tableRef"); + const triggersByTableId: Array<[string, Trigger[]]> = []; + + // First we need a list of columns which must be included in full in the action summary + const isReadyColIds: string[] = []; + for (const tableRef of Object.keys(triggersByTableRef).sort()) { + const triggers = triggersByTableRef[tableRef]; + const tableId = getTableId(Number(tableRef))!; // groupBy makes tableRef a string + triggersByTableId.push([tableId, triggers]); + for (const trigger of triggers) { + if (trigger.isReadyColRef) { + const colId = this._getColId(trigger.isReadyColRef); + if (colId) { + isReadyColIds.push(colId); + } + } + } + } + + const summary = summarizeAction(localActionBundle, {alwaysPreserveColIds: isReadyColIds}); // Work to do after fetching values from the document const tasks: Task[] = []; // For each table in the document which is monitored by one or more triggers... - for (const tableRef of Object.keys(triggersByTableRef).sort()) { - const triggers = triggersByTableRef[tableRef]; - const tableId = getTableId(Number(tableRef))!; // groupBy makes tableRef a string + for (const [tableId, triggers] of triggersByTableId) { const tableDelta = summary.tableDeltas[tableId]; // ...if the monitored table was modified by the summarized actions, // fetch the modified/created records and note the work that needs to be done. @@ -165,7 +190,7 @@ export class DocTriggers { events.push(...this._handleTask(task, await task.tableDataAction)); } if (!events.length) { - return; + return summary; } // Only add events to the queue after we finish fetching the backup from redis @@ -184,6 +209,8 @@ export class DocTriggers { while (this._drainingQueue && !this._shuttingDown) { await delay(1000); } + + return summary; } private get _docId() { @@ -296,8 +323,7 @@ export class DocTriggers { readyBefore = false; } else if (!cellDelta ) { // Cell wasn't changed, and the record is ready now, so it was ready before. - // This assumes that the ActionSummary contains all changes to the isReady column. - // TODO ensure ActionSummary actually contains all changes, right now bulk changes are truncated. + // This requires that the ActionSummary contains all changes to the isReady column. readyBefore = true; } else { const deltaBefore = cellDelta[0];