diff --git a/app/server/lib/Triggers.ts b/app/server/lib/Triggers.ts index 2a4bf3ff..a7fd05ed 100644 --- a/app/server/lib/Triggers.ts +++ b/app/server/lib/Triggers.ts @@ -61,9 +61,8 @@ export interface WebHookSecret { // Work to do after fetching values from the document interface Task { - tableId: string; tableDelta: TableDelta; - triggers: any; + triggers: Trigger[]; tableDataAction: Promise; recordDeltas: RecordDeltas; } @@ -178,7 +177,7 @@ export class DocTriggers { // Fetch the modified records in full so they can be sent in webhooks // They will also be used to check if the record is ready const tableDataAction = this._activeDoc.fetchQuery(docSession, {tableId, filters}); - tasks.push({tableId, tableDelta, triggers, tableDataAction, recordDeltas}); + tasks.push({tableDelta, triggers, tableDataAction, recordDeltas}); } } @@ -192,6 +191,7 @@ export class DocTriggers { if (!events.length) { return summary; } + this._log("Total number of webhook events generated by bundle", {numEvents: events.length}); // Only add events to the queue after we finish fetching the backup from redis // to ensure that events are delivered in the order they were generated. @@ -225,6 +225,18 @@ export class DocTriggers { return this._webHookEventQueue.length >= MAX_QUEUE_SIZE; } + private _log(msg: string, {level = 'info', ...meta}: any = {}) { + log.origLog(level, 'DocTriggers: ' + msg, { + ...meta, + docId: this._docId, + queueLength: this._webHookEventQueue.length, + drainingQueue: this._drainingQueue, + shuttingDown: this._shuttingDown, + sending: this._sending, + redisClient: Boolean(this._redisClientField), + }); + } + private async _pushToRedisQueue(events: WebHookEvent[]) { const strings = events.map(e => JSON.stringify(e)); await this._redisClient!.rpushAsync(this._redisQueueKey, ...strings); @@ -233,6 +245,7 @@ export class DocTriggers { private async _getRedisQueue(redisClient: RedisClient) { const strings = await redisClient.lrangeAsync(this._redisQueueKey, 0, -1); if (strings.length) { + this._log("Webhook events found on redis queue", {numEvents: strings.length}); const events = strings.map(s => JSON.parse(s)); this._webHookEventQueue.unshift(...events); this._startSendLoop(); @@ -257,12 +270,13 @@ export class DocTriggers { } private _handleTask( - {tableDelta, tableId, triggers, recordDeltas}: Task, + {tableDelta, triggers, recordDeltas}: Task, tableDataAction: TableDataAction, ) { const bulkColValues = fromTableDataAction(tableDataAction); - log.info(`Processing ${triggers.length} triggers for ${bulkColValues.id.length} records of ${tableId}`); + const meta = {numTriggers: triggers.length, numRecords: bulkColValues.id.length}; + this._log(`Processing triggers`, meta); const makePayload = _.memoize((rowIndex: number) => _.mapValues(bulkColValues, col => col[rowIndex]) as RowRecord @@ -291,6 +305,9 @@ export class DocTriggers { } } } + + this._log("Generated events from triggers", {numEvents: result.length, ...meta}); + return result; } @@ -334,7 +351,7 @@ export class DocTriggers { } else if (deltaBefore === "?") { // The ActionSummary shouldn't contain this kind of delta at all // since it comes from a single action bundle, not a combination of summaries. - log.warn('Unexpected deltaBefore === "?"', {trigger, isReadyColId, rowId, docId: this._activeDoc.docName}); + this._log('Unexpected deltaBefore === "?"', {level: 'warn', trigger}); readyBefore = true; } else { // Only remaining case is that deltaBefore is a single-element array containing the previous value. @@ -365,8 +382,9 @@ export class DocTriggers { private async _getWebHookUrl(id: string): Promise { let webhook = this._webhookCache.get(id); if (!webhook) { - const secret = await this._activeDoc.getHomeDbManager()?.getSecret(id, this._activeDoc.docName); + const secret = await this._activeDoc.getHomeDbManager()?.getSecret(id, this._docId); if (!secret) { + this._log(`No webhook secret found`, {level: 'warn', id}); return; } webhook = JSON.parse(secret); @@ -374,7 +392,7 @@ export class DocTriggers { } const url = webhook!.url; if (!isUrlAllowed(url)) { - log.warn(`Webhook not sent to forbidden URL: ${url}`); + this._log(`Webhook not sent to forbidden URL`, {level: 'warn', url}); return; } return url; @@ -384,7 +402,7 @@ export class DocTriggers { if (!this._sending) { // only run one loop at a time this._sending = true; this._sendLoop().catch((e) => { // run _sendLoop asynchronously (in the background) - log.error(`_sendLoop failed: ${e}`); + this._log(`_sendLoop failed: ${e}`, {level: 'error'}); this._sending = false; // otherwise the following line will complete instantly this._startSendLoop(); // restart the loop on failure }); @@ -396,7 +414,7 @@ export class DocTriggers { // Managed by _startSendLoop. Runs in the background. Only one loop should run at a time. // Runs until shutdown. private async _sendLoop() { - log.info("Starting _sendLoop"); + this._log("Starting _sendLoop"); // TODO delay/prevent shutting down while queue isn't empty? while (!this._shuttingDown) { @@ -408,10 +426,14 @@ export class DocTriggers { const batch = _.takeWhile(this._webHookEventQueue.slice(0, 100), {id}); const body = JSON.stringify(batch.map(e => e.payload)); const url = await this._getWebHookUrl(id); + let meta: Record|undefined; + let success: boolean; if (!url) { success = true; } else { + meta = {numEvents: batch.length, webhookId: id, host: new URL(url).host}; + this._log("Sending batch of webhook events", meta); success = await this._sendWebhookWithRetries(url, body); } @@ -423,24 +445,29 @@ export class DocTriggers { multi.ltrim(this._redisQueueKey, batch.length, -1); } - if (!success && !this._drainingQueue) { - // Put the failed events at the end of the queue to try again later - // while giving other URLs a chance to receive events. - this._webHookEventQueue.push(...batch); - if (multi) { - const strings = batch.map(e => JSON.stringify(e)); - multi.rpush(this._redisQueueKey, ...strings); + if (!success) { + this._log("Failed to send batch of webhook events", {...meta, level: 'warn'}); + if (!this._drainingQueue) { + // Put the failed events at the end of the queue to try again later + // while giving other URLs a chance to receive events. + this._webHookEventQueue.push(...batch); + if (multi) { + const strings = batch.map(e => JSON.stringify(e)); + multi.rpush(this._redisQueueKey, ...strings); + } } + } else if (meta) { + this._log("Successfully sent batch of webhook events", meta); } await multi?.execAsync(); } - log.info("Ended _sendLoop"); + this._log("Ended _sendLoop"); this._redisClient?.quitAsync().catch(e => // Catch error to prevent sendLoop being restarted - log.warn("Error quitting redis: " + e) + this._log("Error quitting redis: " + e, {level: 'warn'}) ); } @@ -450,6 +477,7 @@ export class DocTriggers { } const redisUrl = process.env.REDIS_URL; if (redisUrl) { + this._log("Creating redis client"); this._redisClientField = createClient(redisUrl); } return this._redisClientField; @@ -480,9 +508,9 @@ export class DocTriggers { if (response.status === 200) { return true; } - log.warn(`Webhook responded with status ${response.status}`); + this._log(`Webhook responded with non-200 status`, {level: 'warn', status: response.status, attempt}); } catch (e) { - log.warn(`Webhook error: ${e}`); + this._log(`Webhook sending error: ${e}`, {level: 'warn', attempt}); } // Don't wait any more if this is the last attempt.