diff --git a/app/server/lib/DocApi.ts b/app/server/lib/DocApi.ts
index 8b23f8b5..b83608d1 100644
--- a/app/server/lib/DocApi.ts
+++ b/app/server/lib/DocApi.ts
@@ -391,11 +391,6 @@ export class DocWorkerApi {
         throw new ApiError('Provided url is forbidden', 403);
       }
 
-      const unsubscribeKey = uuidv4();
-      const webhook: WebHookSecret = {unsubscribeKey, url};
-      const secretValue = JSON.stringify(webhook);
-      const webhookId = (await this._dbManager.addSecret(secretValue, activeDoc.docName)).id;
-
       const metaTables = await getMetaTables(activeDoc, req);
       const tableRef = tableIdToRef(metaTables, req.params.tableId);
 
@@ -409,6 +404,11 @@ export class DocWorkerApi {
         isReadyColRef = colRefs[colRowIndex];
       }
 
+      const unsubscribeKey = uuidv4();
+      const webhook: WebHookSecret = {unsubscribeKey, url};
+      const secretValue = JSON.stringify(webhook);
+      const webhookId = (await this._dbManager.addSecret(secretValue, activeDoc.docName)).id;
+
       const webhookAction: WebhookAction = {type: "webhook", id: webhookId};
       const sandboxRes = await handleSandboxError("_grist_Triggers", [], activeDoc.applyUserActions(
diff --git a/app/server/lib/Triggers.ts b/app/server/lib/Triggers.ts
index ff3bbb56..9e0ad1d6 100644
--- a/app/server/lib/Triggers.ts
+++ b/app/server/lib/Triggers.ts
@@ -114,15 +114,19 @@ export class TriggersHandler {
 
     const filters = {id: [...recordDeltas.keys()]};
     const bulkColValues = fromTableDataAction(await this._activeDoc.fetchQuery(docSession, {tableId, filters}));
-    triggers.forEach(trigger => {
+    log.info(`Processing ${triggers.length} triggers for ${bulkColValues.id.length} records of ${tableId}`);
+
+    for (const trigger of triggers) {
       const actions = JSON.parse(trigger.actions) as TriggerAction[];
-      bulkColValues.id.forEach((rowId, rowIndex) => {
-        // Handle triggers in parallel (talking to redis)
-        this._handleTrigger(
+
+      for (let rowIndex = 0; rowIndex < bulkColValues.id.length; rowIndex++) {
+        const rowId = bulkColValues.id[rowIndex];
+        // Handle triggers serially to make order predictable
+        await this._handleTrigger(
           trigger, actions, bulkColValues, rowIndex, rowId, recordDeltas.get(rowId)!
-        ).catch(() => log.error("Error handling trigger action"));
-      });
-    });
+        );
+      }
+    }
   }
 
   // Handles a single trigger for a single record, initiating all the corresponding actions
@@ -182,12 +186,10 @@ export class TriggersHandler {
 
     // All the values in this record
     const event = _.mapValues(bulkColValues, col => col[rowIndex]);
-    actions.forEach(action => {
-      // Handle actions in parallel
-      this._handleTriggerAction(
-        action, event
-      ).catch(() => log.error("Error handling trigger action"));
-    });
+    // Handle actions serially to make order predictable
+    for (const action of actions) {
+      await this._handleTriggerAction(action, event);
+    }
   }
 
   private async _handleTriggerAction(action: TriggerAction, event: Event) {
@@ -226,7 +228,7 @@ function sendPendingEvents() {
   const pending = pendingEvents;
   pendingEvents = [];
   for (const [url, group] of _.toPairs(_.groupBy(pending, "url"))) {
-    const body = JSON.stringify(_.map(group, "event").reverse());
+    const body = JSON.stringify(_.map(group, "event"));
     sendWebhookWithRetries(url, body).catch(() => log.error("Webhook failed!"));
   }
 }
@@ -236,20 +238,24 @@ async function sendWebhookWithRetries(url: string, body: string) {
   const maxWait = 64;
   let wait = 1;
   for (let i = 0; i < maxAttempts; i++) {
-    const response = await fetch(url, {
-      method: 'POST',
-      body,
-      headers: {
-        'Content-Type': 'application/json',
-      },
-    });
-    if (response.status === 200) {
-      return;
-    } else {
-      await delay((wait + Math.random()) * 1000);
-      if (wait < maxWait) {
-        wait *= 2;
+    try {
+      const response = await fetch(url, {
+        method: 'POST',
+        body,
+        headers: {
+          'Content-Type': 'application/json',
+        },
+      });
+      if (response.status === 200) {
+        return;
+      }
+      log.warn(`Webhook responded with status ${response.status}`);
+    } catch (e) {
+      log.warn(`Webhook error: ${e}`);
+    }
+    await delay((wait + Math.random()) * 1000);
+    if (wait < maxWait) {
+      wait *= 2;
     }
   }
   throw new Error("Webhook failed!");
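A note on the main behavioural change, for reviewers unfamiliar with the floating-promise pitfall this patch removes: Array.prototype.forEach discards the promise returned by an async callback, so the old code started every _handleTrigger and _handleTriggerAction call at once and the resulting webhook payloads could complete in any order; the same ordering concern presumably motivated the .reverse() dropped from sendPendingEvents. The sketch below is illustrative only, assuming hypothetical names (handleSerially, demo) that are not part of the patch.

// Minimal sketch (not part of the patch) of why awaited for-loops replace forEach:
// forEach ignores the promises its async callback returns, so the work overlaps and
// finishes in arbitrary order; awaiting inside a plain loop preserves array order.
async function handleSerially<T>(items: T[], handler: (item: T) => Promise<void>): Promise<void> {
  for (const item of items) {
    await handler(item);  // the next item starts only after this one settles
  }
}

// Hypothetical usage standing in for the trigger/action handlers in Triggers.ts.
async function demo(): Promise<void> {
  const events = ['add', 'update', 'remove'];
  await handleSerially(events, async (name) => {
    await new Promise<void>(resolve => setTimeout(resolve, 10));  // simulated async work
    console.log(`handled ${name}`);
  });
  // Prints "handled add", "handled update", "handled remove", in that order.
}

demo().catch(err => console.error(err));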