mirror of
				https://github.com/gristlabs/grist-core.git
				synced 2025-06-13 20:53:59 +00:00 
			
		
		
		
	(core) Starting to make webhooks more robust
Summary: - Puts events on a queue in memory and ensures they are sent in the order they were generated. - Makes the caller (Sharing.ts) wait until changed records have been fetched from the DB, but allows it to continue after while remaining work happens asynchronously. - Gathers all new webhook events into an array so they can be backed up to the queue on redis in a single command (in a future diff). - Uses changes in isReady to determine event type, no more 'existed before' The structure of the code has changed a lot, so I think the scope of the diff needs to stop here. Lots of work is still deferred in TODOs. Test Plan: Updated existing test. Actually dropped testing of retry on failures and slowness because it no longer made sense to keep that as part of the current test, so a new test for that will be added in a future diff. Reviewers: paulfitz Reviewed By: paulfitz Differential Revision: https://phab.getgrist.com/D3074
This commit is contained in:
		
							parent
							
								
									e3801a5eb9
								
							
						
					
					
						commit
						276adc5f51
					
				@ -4,6 +4,8 @@
 | 
			
		||||
 * change events.
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
import { ActionSummary } from "app/common/ActionSummary";
 | 
			
		||||
import { DocTriggers } from "app/server/lib/Triggers";
 | 
			
		||||
import * as assert from 'assert';
 | 
			
		||||
import {Mutex} from 'async-mutex';
 | 
			
		||||
import * as bluebird from 'bluebird';
 | 
			
		||||
@ -138,6 +140,7 @@ export class ActiveDoc extends EventEmitter {
 | 
			
		||||
  // result).
 | 
			
		||||
  protected _modificationLock: Mutex = new Mutex();
 | 
			
		||||
 | 
			
		||||
  private _triggers: DocTriggers;
 | 
			
		||||
  private _dataEngine: Promise<ISandbox>|undefined;
 | 
			
		||||
  private _activeDocImport: ActiveDocImport;
 | 
			
		||||
  private _onDemandActions: OnDemandActions;
 | 
			
		||||
@ -168,6 +171,7 @@ export class ActiveDoc extends EventEmitter {
 | 
			
		||||
    this._docName = docName;
 | 
			
		||||
    this.docStorage = new DocStorage(docManager.storageManager, docName);
 | 
			
		||||
    this.docClients = new DocClients(this);
 | 
			
		||||
    this._triggers = new DocTriggers(this);
 | 
			
		||||
    this._actionHistory = new ActionHistoryImpl(this.docStorage);
 | 
			
		||||
    this.docPluginManager = new DocPluginManager(docManager.pluginManager.getPlugins(),
 | 
			
		||||
      docManager.pluginManager.appRoot!, this, this._docManager.gristServer);
 | 
			
		||||
@ -288,6 +292,10 @@ export class ActiveDoc extends EventEmitter {
 | 
			
		||||
    return this._actionHistory;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public handleTriggers(summary: ActionSummary): Promise<void> {
 | 
			
		||||
    return this._triggers.handle(summary);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Adds a client of this doc to the list of connected clients.
 | 
			
		||||
   * @param client: The client object maintaining the websocket connection.
 | 
			
		||||
@ -319,6 +327,8 @@ export class ActiveDoc extends EventEmitter {
 | 
			
		||||
      this.docClients.removeAllClients();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    this._triggers.shutdown();
 | 
			
		||||
 | 
			
		||||
    // Clear the MapWithTTL to remove all timers from the event loop.
 | 
			
		||||
    this._fetchCache.clear();
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -18,7 +18,6 @@ import { ActionHistory, asActionGroup, getActionUndoInfo} from './ActionHistory'
 | 
			
		||||
import {summarizeAction} from "./ActionSummary";
 | 
			
		||||
import {ActiveDoc} from './ActiveDoc';
 | 
			
		||||
import {makeExceptionalDocSession, OptDocSession} from './DocSession';
 | 
			
		||||
import {TriggersHandler} from "./Triggers";
 | 
			
		||||
import {WorkCoordinator} from './WorkCoordinator';
 | 
			
		||||
 | 
			
		||||
// Describes the request to apply a UserActionBundle. It includes a Client (so that broadcast
 | 
			
		||||
@ -284,7 +283,7 @@ export class Sharing {
 | 
			
		||||
      await this._activeDoc.processActionBundle(ownActionBundle);
 | 
			
		||||
 | 
			
		||||
      const actionSummary = summarizeAction(localActionBundle);
 | 
			
		||||
      new TriggersHandler(this._activeDoc).handle(actionSummary);
 | 
			
		||||
      await this._activeDoc.handleTriggers(actionSummary);
 | 
			
		||||
 | 
			
		||||
      // Broadcast the action to connected browsers.
 | 
			
		||||
      const actionGroup = asActionGroup(this._actionHistory, localActionBundle, {
 | 
			
		||||
 | 
			
		||||
@ -1,34 +1,32 @@
 | 
			
		||||
import {ActionSummary, TableDelta} from "app/common/ActionSummary";
 | 
			
		||||
import {delay} from "app/common/delay";
 | 
			
		||||
import {fromTableDataAction, TableColValues} from "app/common/DocActions";
 | 
			
		||||
import {StringUnion} from "app/common/StringUnion";
 | 
			
		||||
import {MetaRowRecord} from "app/common/TableData";
 | 
			
		||||
import {CellValue} from "app/plugin/GristData";
 | 
			
		||||
import {ActiveDoc} from "app/server/lib/ActiveDoc";
 | 
			
		||||
import {makeExceptionalDocSession} from "app/server/lib/DocSession";
 | 
			
		||||
import * as log from "app/server/lib/log";
 | 
			
		||||
import * as _ from "lodash";
 | 
			
		||||
import * as LRUCache from "lru-cache";
 | 
			
		||||
import fetch from "node-fetch";
 | 
			
		||||
import { ActionSummary, TableDelta } from 'app/common/ActionSummary';
 | 
			
		||||
import { delay } from 'app/common/delay';
 | 
			
		||||
import { fromTableDataAction, RowRecord, TableColValues, TableDataAction } from 'app/common/DocActions';
 | 
			
		||||
import { StringUnion } from 'app/common/StringUnion';
 | 
			
		||||
import { MetaRowRecord } from 'app/common/TableData';
 | 
			
		||||
import { CellDelta } from 'app/common/TabularDiff';
 | 
			
		||||
import { ActiveDoc } from 'app/server/lib/ActiveDoc';
 | 
			
		||||
import { makeExceptionalDocSession } from 'app/server/lib/DocSession';
 | 
			
		||||
import * as log from 'app/server/lib/log';
 | 
			
		||||
import { promisifyAll } from 'bluebird';
 | 
			
		||||
import * as _ from 'lodash';
 | 
			
		||||
import * as LRUCache from 'lru-cache';
 | 
			
		||||
import fetch from 'node-fetch';
 | 
			
		||||
import { createClient, RedisClient } from 'redis';
 | 
			
		||||
 | 
			
		||||
// TODO replace with redis
 | 
			
		||||
// Keeps track of whether records existed before changes to them started
 | 
			
		||||
// to determine the correct event type when the record is ready
 | 
			
		||||
const existedBeforeMemory: { [key: string]: boolean } = {};
 | 
			
		||||
promisifyAll(RedisClient.prototype);
 | 
			
		||||
 | 
			
		||||
// Only owners can manage triggers, but any user's activity can trigger them
 | 
			
		||||
// and the corresponding actions get the full values
 | 
			
		||||
const docSession = makeExceptionalDocSession('system');
 | 
			
		||||
 | 
			
		||||
// DB cache for webhook secrets
 | 
			
		||||
const webhookCache = new LRUCache<{ id: string, docId: string }, WebHookSecret>({max: 10 * 1000});
 | 
			
		||||
 | 
			
		||||
// Describes the change in existence to a record, which determines the event type
 | 
			
		||||
interface RecordDelta {
 | 
			
		||||
  existedBefore: boolean;
 | 
			
		||||
  existedAfter: boolean;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type RecordDeltas = Map<number, RecordDelta>;
 | 
			
		||||
 | 
			
		||||
// Union discriminated by type
 | 
			
		||||
type TriggerAction = WebhookAction | PythonAction;
 | 
			
		||||
 | 
			
		||||
@ -43,10 +41,9 @@ interface PythonAction {
 | 
			
		||||
  code: string;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Payload sent to webhook
 | 
			
		||||
// Simply the values in a record
 | 
			
		||||
interface Event {
 | 
			
		||||
  [colId: string]: CellValue;
 | 
			
		||||
interface WebHookEvent {
 | 
			
		||||
  payload: RowRecord;
 | 
			
		||||
  id: string;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export const allowedEventTypes = StringUnion("add", "update");
 | 
			
		||||
@ -60,17 +57,49 @@ export interface WebHookSecret {
 | 
			
		||||
  unsubscribeKey: string;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Processes triggers for records changed as described in an ActionSummary,
 | 
			
		||||
// Work to do after fetching values from the document
 | 
			
		||||
interface Task {
 | 
			
		||||
  tableId: string;
 | 
			
		||||
  tableDelta: TableDelta;
 | 
			
		||||
  triggers: any;
 | 
			
		||||
  tableDataAction: Promise<TableDataAction>;
 | 
			
		||||
  recordDeltas: RecordDeltas;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Processes triggers for records changed as described in ActionSummary objects,
 | 
			
		||||
// initiating webhooks and automations.
 | 
			
		||||
// An instance of this class should have .handle() called on it exactly once.
 | 
			
		||||
export class TriggersHandler {
 | 
			
		||||
export class DocTriggers {
 | 
			
		||||
  // Converts a column ref to colId by looking it up in _grist_Tables_column
 | 
			
		||||
  private _getColId: (rowId: number) => string|undefined;
 | 
			
		||||
 | 
			
		||||
  // Events that need to be sent to webhooks in FIFO order.
 | 
			
		||||
  private _webHookEventQueue: WebHookEvent[] = [];
 | 
			
		||||
 | 
			
		||||
  // DB cache for webhook secrets
 | 
			
		||||
  private _webhookCache = new LRUCache<string, WebHookSecret>({max: 1000});
 | 
			
		||||
 | 
			
		||||
  private _shuttingDown: boolean = false;
 | 
			
		||||
 | 
			
		||||
  private _sending: boolean = false;
 | 
			
		||||
 | 
			
		||||
  private _redisClient: RedisClient | undefined;
 | 
			
		||||
 | 
			
		||||
  constructor(private _activeDoc: ActiveDoc) {
 | 
			
		||||
    const redisUrl = process.env.REDIS_URL;
 | 
			
		||||
    if (redisUrl) {
 | 
			
		||||
      this._redisClient = createClient(redisUrl);
 | 
			
		||||
      // TODO check for existing events on redis queue
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public handle(summary: ActionSummary) {
 | 
			
		||||
  public shutdown() {
 | 
			
		||||
    this._shuttingDown = true;
 | 
			
		||||
    if (!this._sending) {
 | 
			
		||||
      this._redisClient?.quitAsync();
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public async handle(summary: ActionSummary) {
 | 
			
		||||
    const docData = this._activeDoc.docData;
 | 
			
		||||
    if (!docData) {
 | 
			
		||||
      return;
 | 
			
		||||
@ -81,93 +110,156 @@ export class TriggersHandler {
 | 
			
		||||
    this._getColId = docData.getMetaTable("_grist_Tables_column").getMetaRowPropFunc("colId");
 | 
			
		||||
 | 
			
		||||
    const triggersByTableRef = _.groupBy(triggersTable.getRecords(), "tableRef");
 | 
			
		||||
    for (const [tableRef, triggers] of _.toPairs(triggersByTableRef)) {
 | 
			
		||||
 | 
			
		||||
    const tasks: Task[] = [];
 | 
			
		||||
    for (const tableRef of Object.keys(triggersByTableRef).sort()) {
 | 
			
		||||
      const triggers = triggersByTableRef[tableRef];
 | 
			
		||||
      const tableId = getTableId(Number(tableRef))!;  // groupBy makes tableRef a string
 | 
			
		||||
      const tableDelta = summary.tableDeltas[tableId];
 | 
			
		||||
      if (!tableDelta) {
 | 
			
		||||
        continue;  // this table was not modified by these actions
 | 
			
		||||
      if (tableDelta) {
 | 
			
		||||
        const recordDeltas = this._getRecordDeltas(tableDelta);
 | 
			
		||||
        const filters = {id: [...recordDeltas.keys()]};
 | 
			
		||||
 | 
			
		||||
        // Fetch the modified records in full so they can be sent in webhooks
 | 
			
		||||
        // They will also be used to check if the record is ready
 | 
			
		||||
        const tableDataAction = this._activeDoc.fetchQuery(docSession, {tableId, filters});
 | 
			
		||||
        tasks.push({tableId, tableDelta, triggers, tableDataAction, recordDeltas});
 | 
			
		||||
      }
 | 
			
		||||
      // Handle tables in parallel (fetching table values from document DB)
 | 
			
		||||
      this._handleTableTriggers(
 | 
			
		||||
        tableId, tableDelta, triggers
 | 
			
		||||
      ).catch(() => log.error("Error handling triggers"));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // Fetch values from document DB in parallel
 | 
			
		||||
    await Promise.all(tasks.map(t => t.tableDataAction));
 | 
			
		||||
 | 
			
		||||
    const events: WebHookEvent[] = [];
 | 
			
		||||
    for (const task of tasks) {
 | 
			
		||||
      events.push(...this._handleTask(task, await task.tableDataAction));
 | 
			
		||||
    }
 | 
			
		||||
    this._webHookEventQueue.push(...events);
 | 
			
		||||
 | 
			
		||||
    if (!this._sending && events.length) {
 | 
			
		||||
      this._sending = true;
 | 
			
		||||
 | 
			
		||||
      const startSendLoop = () => {
 | 
			
		||||
        this._sendLoop().catch((e) => {
 | 
			
		||||
          log.error(`_sendLoop failed: ${e}`);
 | 
			
		||||
          startSendLoop();
 | 
			
		||||
        });
 | 
			
		||||
      };
 | 
			
		||||
      startSendLoop();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // TODO also push to redis queue
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private async _handleTableTriggers(
 | 
			
		||||
    tableId: string, delta: TableDelta, triggers: Trigger[],
 | 
			
		||||
  ) {
 | 
			
		||||
  private _getRecordDeltas(tableDelta: TableDelta): RecordDeltas {
 | 
			
		||||
    const recordDeltas = new Map<number, RecordDelta>();
 | 
			
		||||
    delta.updateRows.forEach(id =>
 | 
			
		||||
    tableDelta.updateRows.forEach(id =>
 | 
			
		||||
      recordDeltas.set(id, {existedBefore: true, existedAfter: true}));
 | 
			
		||||
    // A row ID can appear in both updateRows and addRows, although it probably shouldn't
 | 
			
		||||
    // Added row IDs override updated rows because they didn't exist before
 | 
			
		||||
    delta.addRows.forEach(id =>
 | 
			
		||||
    tableDelta.addRows.forEach(id =>
 | 
			
		||||
      recordDeltas.set(id, {existedBefore: false, existedAfter: true}));
 | 
			
		||||
 | 
			
		||||
    // If we allow subscribing to deletion in the future
 | 
			
		||||
    // delta.removeRows.forEach(id =>
 | 
			
		||||
    //   recordDeltas.set(id, {existedBefore: true, existedAfter: false}));
 | 
			
		||||
 | 
			
		||||
    // Fetch the modified records in full so they can be sent in webhooks
 | 
			
		||||
    // They will also be used to check if the record is ready
 | 
			
		||||
    const filters = {id: [...recordDeltas.keys()]};
 | 
			
		||||
    const bulkColValues = fromTableDataAction(await this._activeDoc.fetchQuery(docSession, {tableId, filters}));
 | 
			
		||||
    return recordDeltas;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private _handleTask(
 | 
			
		||||
    {tableDelta, tableId, triggers, recordDeltas}: Task,
 | 
			
		||||
    tableDataAction: TableDataAction,
 | 
			
		||||
  ) {
 | 
			
		||||
    const bulkColValues = fromTableDataAction(tableDataAction);
 | 
			
		||||
 | 
			
		||||
    log.info(`Processing ${triggers.length} triggers for ${bulkColValues.id.length} records of ${tableId}`);
 | 
			
		||||
 | 
			
		||||
    const makePayload = _.memoize((rowIndex: number) =>
 | 
			
		||||
      _.mapValues(bulkColValues, col => col[rowIndex]) as RowRecord
 | 
			
		||||
    );
 | 
			
		||||
 | 
			
		||||
    const result: WebHookEvent[] = [];
 | 
			
		||||
    for (const trigger of triggers) {
 | 
			
		||||
      const actions = JSON.parse(trigger.actions) as TriggerAction[];
 | 
			
		||||
      const webhookActions = actions.filter(act => act.type === "webhook") as WebhookAction[];
 | 
			
		||||
      if (!webhookActions.length) {
 | 
			
		||||
        continue;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      for (let rowIndex = 0; rowIndex < bulkColValues.id.length; rowIndex++) {
 | 
			
		||||
        const rowId = bulkColValues.id[rowIndex];
 | 
			
		||||
        // Handle triggers serially to make order predictable
 | 
			
		||||
        await this._handleTrigger(
 | 
			
		||||
          trigger, actions, bulkColValues, rowIndex, rowId, recordDeltas.get(rowId)!
 | 
			
		||||
        );
 | 
			
		||||
      const rowIndexesToSend: number[] = _.range(bulkColValues.id.length).filter(rowIndex => {
 | 
			
		||||
          const rowId = bulkColValues.id[rowIndex];
 | 
			
		||||
          return this._shouldTriggerActions(
 | 
			
		||||
            trigger, bulkColValues, rowIndex, rowId, recordDeltas.get(rowId)!, tableDelta,
 | 
			
		||||
          );
 | 
			
		||||
        }
 | 
			
		||||
      );
 | 
			
		||||
 | 
			
		||||
      for (const action of webhookActions) {
 | 
			
		||||
        for (const rowIndex of rowIndexesToSend) {
 | 
			
		||||
          const event = {id: action.id, payload: makePayload(rowIndex)};
 | 
			
		||||
          result.push(event);
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
    return result;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  // Handles a single trigger for a single record, initiating all the corresponding actions
 | 
			
		||||
  private async _handleTrigger(
 | 
			
		||||
    trigger: Trigger, actions: TriggerAction[],
 | 
			
		||||
    bulkColValues: TableColValues, rowIndex: number, rowId: number, recordDelta: RecordDelta,
 | 
			
		||||
  ) {
 | 
			
		||||
    let isReady: boolean;
 | 
			
		||||
  /**
 | 
			
		||||
   * Determines if actions should be triggered for a single record and trigger.
 | 
			
		||||
   */
 | 
			
		||||
  private _shouldTriggerActions(
 | 
			
		||||
    trigger: Trigger,
 | 
			
		||||
    bulkColValues: TableColValues,
 | 
			
		||||
    rowIndex: number,
 | 
			
		||||
    rowId: number,
 | 
			
		||||
    recordDelta: RecordDelta,
 | 
			
		||||
    tableDelta: TableDelta,
 | 
			
		||||
  ): boolean {
 | 
			
		||||
    let readyBefore: boolean;
 | 
			
		||||
    if (!trigger.isReadyColRef) {
 | 
			
		||||
      // User hasn't configured a column, so all records are considered ready immediately
 | 
			
		||||
      isReady = true;
 | 
			
		||||
      readyBefore = recordDelta.existedBefore;
 | 
			
		||||
    } else {
 | 
			
		||||
      const colId = this._getColId(trigger.isReadyColRef)!;
 | 
			
		||||
      const isReadyCellValue = bulkColValues[colId]?.[rowIndex];
 | 
			
		||||
      if (typeof isReadyCellValue !== "boolean") {
 | 
			
		||||
        // Likely causes: column not found or error in formula
 | 
			
		||||
        isReady = false;
 | 
			
		||||
      const isReadyColId = this._getColId(trigger.isReadyColRef)!;
 | 
			
		||||
 | 
			
		||||
      // Must be the actual boolean `true`, not just anything truthy
 | 
			
		||||
      const isReady = bulkColValues[isReadyColId][rowIndex] === true;
 | 
			
		||||
      if (!isReady) {
 | 
			
		||||
         return false;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      const cellDelta: CellDelta | undefined = tableDelta.columnDeltas[isReadyColId]?.[rowId];
 | 
			
		||||
      if (!recordDelta.existedBefore) {
 | 
			
		||||
        readyBefore = false;
 | 
			
		||||
      } else if (!cellDelta ) {
 | 
			
		||||
        // Cell wasn't changed, and the record is ready now, so it was ready before.
 | 
			
		||||
        // This assumes that the ActionSummary contains all changes to the isReady column.
 | 
			
		||||
        // TODO ensure ActionSummary actually contains all changes, right now bulk changes are truncated.
 | 
			
		||||
        readyBefore = true;
 | 
			
		||||
      } else {
 | 
			
		||||
        isReady = isReadyCellValue;
 | 
			
		||||
        const deltaBefore = cellDelta[0];
 | 
			
		||||
        if (deltaBefore === null) {
 | 
			
		||||
          // The record didn't exist before, so it definitely wasn't ready
 | 
			
		||||
          // (although we probably shouldn't reach this since we already checked recordDelta.existedBefore)
 | 
			
		||||
          readyBefore = false;
 | 
			
		||||
        } else if (deltaBefore === "?") {
 | 
			
		||||
          // The ActionSummary shouldn't contain this kind of delta at all
 | 
			
		||||
          // since it comes from a single action bundle, not a combination of summaries.
 | 
			
		||||
          log.warn('Unexpected deltaBefore === "?"', {trigger, isReadyColId, rowId, docId: this._activeDoc.docName});
 | 
			
		||||
          readyBefore = true;
 | 
			
		||||
        } else {
 | 
			
		||||
          // Only remaining case is that deltaBefore is a single-element array containing the previous value.
 | 
			
		||||
          const [valueBefore] = deltaBefore;
 | 
			
		||||
 | 
			
		||||
          // Must be the actual boolean `true`, not just anything truthy
 | 
			
		||||
          readyBefore = valueBefore === true;
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // Globally unique identifier of this record and trigger combination
 | 
			
		||||
    // trigger.tableRef is probably redundant given trigger.id, just being cautious
 | 
			
		||||
    const existedBeforeKey = `${this._activeDoc.docName}:${trigger.id}:${trigger.tableRef}:${rowId}`;
 | 
			
		||||
 | 
			
		||||
    // Only store existedBefore if it isn't stored already
 | 
			
		||||
    const existedBefore = existedBeforeKey in existedBeforeMemory ?
 | 
			
		||||
      existedBeforeMemory[existedBeforeKey] : recordDelta.existedBefore;
 | 
			
		||||
 | 
			
		||||
    if (!isReady) {
 | 
			
		||||
      existedBeforeMemory[existedBeforeKey] = existedBefore;
 | 
			
		||||
      return;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // Now that the record is ready, clear the stored existedBefore value
 | 
			
		||||
    // so that future events for this record are accurate
 | 
			
		||||
    delete existedBeforeMemory[existedBeforeKey];
 | 
			
		||||
 | 
			
		||||
    let eventType: EventType;
 | 
			
		||||
    if (existedBefore) {
 | 
			
		||||
    if (readyBefore) {
 | 
			
		||||
      eventType = "update";
 | 
			
		||||
      // If we allow subscribing to deletion in the future
 | 
			
		||||
      // if (recordDelta.existedAfter) {
 | 
			
		||||
@ -178,90 +270,107 @@ export class TriggersHandler {
 | 
			
		||||
    } else {
 | 
			
		||||
      eventType = "add";
 | 
			
		||||
    }
 | 
			
		||||
    if (!trigger.eventTypes!.includes(eventType)) {
 | 
			
		||||
      // The user hasn't subscribed to the type of change that happened
 | 
			
		||||
 | 
			
		||||
    return trigger.eventTypes!.includes(eventType);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private async _getWebHookUrl(id: string): Promise<string | undefined> {
 | 
			
		||||
    let webhook = this._webhookCache.get(id);
 | 
			
		||||
    if (!webhook) {
 | 
			
		||||
      const secret = await this._activeDoc.getHomeDbManager()?.getSecret(id, this._activeDoc.docName);
 | 
			
		||||
      if (!secret) {
 | 
			
		||||
        return;
 | 
			
		||||
      }
 | 
			
		||||
      webhook = JSON.parse(secret);
 | 
			
		||||
      this._webhookCache.set(id, webhook!);
 | 
			
		||||
    }
 | 
			
		||||
    const url = webhook!.url;
 | 
			
		||||
    if (!isUrlAllowed(url)) {
 | 
			
		||||
      log.warn(`Webhook not sent to forbidden URL: ${url}`);
 | 
			
		||||
      return;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // All the values in this record
 | 
			
		||||
    const event = _.mapValues(bulkColValues, col => col[rowIndex]);
 | 
			
		||||
 | 
			
		||||
    // Handle actions serially to make order predictable
 | 
			
		||||
    for (const action of actions) {
 | 
			
		||||
      await this._handleTriggerAction(action, event);
 | 
			
		||||
    }
 | 
			
		||||
    return url;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private async _handleTriggerAction(action: TriggerAction, event: Event) {
 | 
			
		||||
    // TODO use event queue for reliability
 | 
			
		||||
    if (action.type === "webhook") {
 | 
			
		||||
      const key = {id: action.id, docId: this._activeDoc.docName};
 | 
			
		||||
      let webhook = webhookCache.get(key);
 | 
			
		||||
      if (!webhook) {
 | 
			
		||||
        const secret = await this._activeDoc.getHomeDbManager()?.getSecret(key.id, key.docId);
 | 
			
		||||
        if (!secret) {
 | 
			
		||||
          return;
 | 
			
		||||
  private async _sendLoop() {
 | 
			
		||||
    log.info("Starting _sendLoop");
 | 
			
		||||
 | 
			
		||||
    // TODO delay/prevent shutting down while queue isn't empty?
 | 
			
		||||
    while (!this._shuttingDown) {
 | 
			
		||||
      if (!this._webHookEventQueue.length) {
 | 
			
		||||
        await delay(1000);
 | 
			
		||||
        continue;
 | 
			
		||||
      }
 | 
			
		||||
      const id = this._webHookEventQueue[0].id;
 | 
			
		||||
      const batch = _.takeWhile(this._webHookEventQueue.slice(0, 100), {id});
 | 
			
		||||
      const body = JSON.stringify(batch.map(e => e.payload));
 | 
			
		||||
      const url = await this._getWebHookUrl(id);
 | 
			
		||||
      let success: boolean;
 | 
			
		||||
      if (!url) {
 | 
			
		||||
        success = true;
 | 
			
		||||
      } else {
 | 
			
		||||
        success = await this._sendWebhookWithRetries(url, body);
 | 
			
		||||
      }
 | 
			
		||||
      if (success) {
 | 
			
		||||
        this._webHookEventQueue.splice(0, batch.length);
 | 
			
		||||
        // TODO also remove on redis
 | 
			
		||||
      } else if (!this._shuttingDown) {
 | 
			
		||||
        // TODO reorder queue on failure
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    log.info("Ended _sendLoop");
 | 
			
		||||
 | 
			
		||||
    this._redisClient?.quitAsync().catch(e =>
 | 
			
		||||
      // Catch error to prevent sendLoop being restarted
 | 
			
		||||
      log.warn("Error quitting redis: " + e)
 | 
			
		||||
    );
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private async _sendWebhookWithRetries(url: string, body: string) {
 | 
			
		||||
    const maxAttempts = 20;
 | 
			
		||||
    const maxWait = 64;
 | 
			
		||||
    let wait = 1;
 | 
			
		||||
    for (let attempt = 0; attempt < maxAttempts; attempt++) {
 | 
			
		||||
      if (this._shuttingDown) {
 | 
			
		||||
        return false;
 | 
			
		||||
      }
 | 
			
		||||
      try {
 | 
			
		||||
        const response = await fetch(url, {
 | 
			
		||||
          method: 'POST',
 | 
			
		||||
          body,
 | 
			
		||||
          headers: {
 | 
			
		||||
            'Content-Type': 'application/json',
 | 
			
		||||
          },
 | 
			
		||||
        });
 | 
			
		||||
        if (response.status === 200) {
 | 
			
		||||
          return true;
 | 
			
		||||
        }
 | 
			
		||||
        webhook = JSON.parse(secret);
 | 
			
		||||
        webhookCache.set(key, webhook!);
 | 
			
		||||
        log.warn(`Webhook responded with status ${response.status}`);
 | 
			
		||||
      } catch (e) {
 | 
			
		||||
        log.warn(`Webhook error: ${e}`);
 | 
			
		||||
      }
 | 
			
		||||
      const url = webhook!.url;
 | 
			
		||||
      if (!isUrlAllowed(url)) {
 | 
			
		||||
        log.warn(`Webhook not sent to forbidden URL: ${url}`);
 | 
			
		||||
        return;
 | 
			
		||||
 | 
			
		||||
      // Don't wait any more if this is the last attempt.
 | 
			
		||||
      if (attempt >= maxAttempts - 1) {
 | 
			
		||||
        return false;
 | 
			
		||||
      }
 | 
			
		||||
      pendingEvents.push({url: webhook!.url, event});
 | 
			
		||||
      if (!startedSending) {
 | 
			
		||||
        startedSending = true;
 | 
			
		||||
        setInterval(sendPendingEvents, 2000);
 | 
			
		||||
 | 
			
		||||
      // Wait `wait` seconds, checking this._shuttingDown every second.
 | 
			
		||||
      for (let waitIndex = 0; waitIndex < wait; waitIndex++) {
 | 
			
		||||
        if (this._shuttingDown) {
 | 
			
		||||
          return false;
 | 
			
		||||
        }
 | 
			
		||||
        await delay(1000);
 | 
			
		||||
      }
 | 
			
		||||
      if (wait < maxWait) {
 | 
			
		||||
        wait *= 2;
 | 
			
		||||
      }
 | 
			
		||||
    } else {
 | 
			
		||||
      throw new Error("Unknown action type " + action.type);
 | 
			
		||||
    }
 | 
			
		||||
    return false;
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
let pendingEvents: Array<{ url: string, event: Event }> = [];
 | 
			
		||||
let startedSending = false;
 | 
			
		||||
 | 
			
		||||
function sendPendingEvents() {
 | 
			
		||||
  const pending = pendingEvents;
 | 
			
		||||
  pendingEvents = [];
 | 
			
		||||
  for (const [url, group] of _.toPairs(_.groupBy(pending, "url"))) {
 | 
			
		||||
    const body = JSON.stringify(_.map(group, "event"));
 | 
			
		||||
    sendWebhookWithRetries(url, body).catch(() => log.error("Webhook failed!"));
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async function sendWebhookWithRetries(url: string, body: string) {
 | 
			
		||||
  const maxAttempts = 20;
 | 
			
		||||
  const maxWait = 64;
 | 
			
		||||
  let wait = 1;
 | 
			
		||||
  for (let i = 0; i < maxAttempts; i++) {
 | 
			
		||||
    try {
 | 
			
		||||
      const response = await fetch(url, {
 | 
			
		||||
        method: 'POST',
 | 
			
		||||
        body,
 | 
			
		||||
        headers: {
 | 
			
		||||
          'Content-Type': 'application/json',
 | 
			
		||||
        },
 | 
			
		||||
      });
 | 
			
		||||
      if (response.status === 200) {
 | 
			
		||||
        return;
 | 
			
		||||
      }
 | 
			
		||||
      log.warn(`Webhook responded with status ${response.status}`);
 | 
			
		||||
    } catch (e) {
 | 
			
		||||
      log.warn(`Webhook error: ${e}`);
 | 
			
		||||
    }
 | 
			
		||||
    await delay((wait + Math.random()) * 1000);
 | 
			
		||||
    if (wait < maxWait) {
 | 
			
		||||
      wait *= 2;
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  throw new Error("Webhook failed!");
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
export function isUrlAllowed(urlString: string) {
 | 
			
		||||
  let url: URL;
 | 
			
		||||
  try {
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user