(core) Ensure that large changes are processed in full by triggers (for webhooks)

Summary:
Uses the new alwaysPreserveColIds option for action summaries in Triggers.ts.

Triggers.ts is now responsible for generating the summary to make it easy to pass this option. The value of the option is just all colIds mentioned in triggers configured in this document.

Test Plan: Tested adding 200 rows to a subscribed table to ensure the events are not truncated. Also tests batching nicely.

Reviewers: paulfitz

Reviewed By: paulfitz

Differential Revision: https://phab.getgrist.com/D3135
pull/115/head
Alex Hall 3 years ago
parent 302202b4fb
commit e09e919016

@ -285,8 +285,8 @@ export class ActiveDoc extends EventEmitter {
return this._actionHistory;
}
public handleTriggers(summary: ActionSummary): Promise<void> {
return this._triggers.handle(summary);
public handleTriggers(localActionBundle: LocalActionBundle): Promise<ActionSummary> {
return this._triggers.handle(localActionBundle);
}
/**

@ -14,8 +14,7 @@ import {shortDesc} from 'app/server/lib/shortDesc';
import * as assert from 'assert';
import {Mutex} from 'async-mutex';
import * as Deque from 'double-ended-queue';
import { ActionHistory, asActionGroup, getActionUndoInfo} from './ActionHistory';
import {summarizeAction} from "./ActionSummary";
import {ActionHistory, asActionGroup, getActionUndoInfo} from './ActionHistory';
import {ActiveDoc} from './ActiveDoc';
import {makeExceptionalDocSession, OptDocSession} from './DocSession';
import {WorkCoordinator} from './WorkCoordinator';
@ -299,8 +298,7 @@ export class Sharing {
}
await this._activeDoc.processActionBundle(ownActionBundle);
const actionSummary = summarizeAction(localActionBundle);
await this._activeDoc.handleTriggers(actionSummary);
const actionSummary = await this._activeDoc.handleTriggers(localActionBundle);
// Broadcast the action to connected browsers.
const actionGroup = asActionGroup(this._actionHistory, localActionBundle, {

@ -1,17 +1,19 @@
import { ActionSummary, TableDelta } from 'app/common/ActionSummary';
import { delay } from 'app/common/delay';
import { fromTableDataAction, RowRecord, TableColValues, TableDataAction } from 'app/common/DocActions';
import { StringUnion } from 'app/common/StringUnion';
import { MetaRowRecord } from 'app/common/TableData';
import { CellDelta } from 'app/common/TabularDiff';
import { ActiveDoc } from 'app/server/lib/ActiveDoc';
import { makeExceptionalDocSession } from 'app/server/lib/DocSession';
import {LocalActionBundle} from 'app/common/ActionBundle';
import {ActionSummary, TableDelta} from 'app/common/ActionSummary';
import {delay} from 'app/common/delay';
import {fromTableDataAction, RowRecord, TableColValues, TableDataAction} from 'app/common/DocActions';
import {StringUnion} from 'app/common/StringUnion';
import {MetaRowRecord} from 'app/common/TableData';
import {CellDelta} from 'app/common/TabularDiff';
import {summarizeAction} from 'app/server/lib/ActionSummary';
import {ActiveDoc} from 'app/server/lib/ActiveDoc';
import {makeExceptionalDocSession} from 'app/server/lib/DocSession';
import * as log from 'app/server/lib/log';
import { promisifyAll } from 'bluebird';
import {promisifyAll} from 'bluebird';
import * as _ from 'lodash';
import * as LRUCache from 'lru-cache';
import fetch from 'node-fetch';
import { createClient, Multi, RedisClient } from 'redis';
import {createClient, Multi, RedisClient} from 'redis';
promisifyAll(RedisClient.prototype);
@ -68,7 +70,7 @@ interface Task {
const MAX_QUEUE_SIZE = 1000;
// Processes triggers for records changed as described in ActionSummary objects,
// Processes triggers for records changed as described in action bundles.
// initiating webhooks and automations.
// The interesting stuff starts in the handle() method.
// Webhooks are placed on an event queue in memory which is replicated on redis as backup.
@ -122,12 +124,18 @@ export class DocTriggers {
// Called after applying actions to a document and updating its data.
// Checks for triggers configured in a meta table,
// and whether any of those triggers monitor tables which were modified by the actions
// described in the given summary.
// in the given bundle.
// If so, generates events which are pushed to the local and redis queues.
public async handle(summary: ActionSummary) {
//
// Returns an ActionSummary generated from the given LocalActionBundle.
//
// Generating the summary here makes it easy to specify which columns need to
// have all their changes included in the summary without truncation
// so that we can accurately identify which records are ready for sending.
public async handle(localActionBundle: LocalActionBundle): Promise<ActionSummary> {
const docData = this._activeDoc.docData;
if (!docData) {
return;
return summarizeAction(localActionBundle);
} // Happens on doc creation while processing InitNewDoc action.
const triggersTable = docData.getMetaTable("_grist_Triggers");
@ -135,14 +143,31 @@ export class DocTriggers {
this._getColId = docData.getMetaTable("_grist_Tables_column").getMetaRowPropFunc("colId");
const triggersByTableRef = _.groupBy(triggersTable.getRecords(), "tableRef");
const triggersByTableId: Array<[string, Trigger[]]> = [];
// First we need a list of columns which must be included in full in the action summary
const isReadyColIds: string[] = [];
for (const tableRef of Object.keys(triggersByTableRef).sort()) {
const triggers = triggersByTableRef[tableRef];
const tableId = getTableId(Number(tableRef))!; // groupBy makes tableRef a string
triggersByTableId.push([tableId, triggers]);
for (const trigger of triggers) {
if (trigger.isReadyColRef) {
const colId = this._getColId(trigger.isReadyColRef);
if (colId) {
isReadyColIds.push(colId);
}
}
}
}
const summary = summarizeAction(localActionBundle, {alwaysPreserveColIds: isReadyColIds});
// Work to do after fetching values from the document
const tasks: Task[] = [];
// For each table in the document which is monitored by one or more triggers...
for (const tableRef of Object.keys(triggersByTableRef).sort()) {
const triggers = triggersByTableRef[tableRef];
const tableId = getTableId(Number(tableRef))!; // groupBy makes tableRef a string
for (const [tableId, triggers] of triggersByTableId) {
const tableDelta = summary.tableDeltas[tableId];
// ...if the monitored table was modified by the summarized actions,
// fetch the modified/created records and note the work that needs to be done.
@ -165,7 +190,7 @@ export class DocTriggers {
events.push(...this._handleTask(task, await task.tableDataAction));
}
if (!events.length) {
return;
return summary;
}
// Only add events to the queue after we finish fetching the backup from redis
@ -184,6 +209,8 @@ export class DocTriggers {
while (this._drainingQueue && !this._shuttingDown) {
await delay(1000);
}
return summary;
}
private get _docId() {
@ -296,8 +323,7 @@ export class DocTriggers {
readyBefore = false;
} else if (!cellDelta ) {
// Cell wasn't changed, and the record is ready now, so it was ready before.
// This assumes that the ActionSummary contains all changes to the isReady column.
// TODO ensure ActionSummary actually contains all changes, right now bulk changes are truncated.
// This requires that the ActionSummary contains all changes to the isReady column.
readyBefore = true;
} else {
const deltaBefore = cellDelta[0];

Loading…
Cancel
Save