@ -61,9 +61,8 @@ export interface WebHookSecret {
// Work to do after fetching values from the document
interface Task {
tableId : string ;
tableDelta : TableDelta ;
triggers : any ;
triggers : Trigger[ ] ;
tableDataAction : Promise < TableDataAction > ;
recordDeltas : RecordDeltas ;
}
@ -178,7 +177,7 @@ export class DocTriggers {
// Fetch the modified records in full so they can be sent in webhooks
// They will also be used to check if the record is ready
const tableDataAction = this . _activeDoc . fetchQuery ( docSession , { tableId , filters } ) ;
tasks . push ( { table Id, table Delta, triggers , tableDataAction , recordDeltas } ) ;
tasks . push ( { table Delta, triggers , tableDataAction , recordDeltas } ) ;
}
}
@ -192,6 +191,7 @@ export class DocTriggers {
if ( ! events . length ) {
return summary ;
}
this . _log ( "Total number of webhook events generated by bundle" , { numEvents : events.length } ) ;
// Only add events to the queue after we finish fetching the backup from redis
// to ensure that events are delivered in the order they were generated.
@ -225,6 +225,18 @@ export class DocTriggers {
return this . _webHookEventQueue . length >= MAX_QUEUE_SIZE ;
}
private _log ( msg : string , { level = 'info' , . . . meta } : any = { } ) {
log . origLog ( level , 'DocTriggers: ' + msg , {
. . . meta ,
docId : this._docId ,
queueLength : this._webHookEventQueue.length ,
drainingQueue : this._drainingQueue ,
shuttingDown : this._shuttingDown ,
sending : this._sending ,
redisClient : Boolean ( this . _redisClientField ) ,
} ) ;
}
private async _pushToRedisQueue ( events : WebHookEvent [ ] ) {
const strings = events . map ( e = > JSON . stringify ( e ) ) ;
await this . _redisClient ! . rpushAsync ( this . _redisQueueKey , . . . strings ) ;
@ -233,6 +245,7 @@ export class DocTriggers {
private async _getRedisQueue ( redisClient : RedisClient ) {
const strings = await redisClient . lrangeAsync ( this . _redisQueueKey , 0 , - 1 ) ;
if ( strings . length ) {
this . _log ( "Webhook events found on redis queue" , { numEvents : strings.length } ) ;
const events = strings . map ( s = > JSON . parse ( s ) ) ;
this . _webHookEventQueue . unshift ( . . . events ) ;
this . _startSendLoop ( ) ;
@ -257,12 +270,13 @@ export class DocTriggers {
}
private _handleTask (
{ tableDelta , t ableId, t riggers, recordDeltas } : Task ,
{ tableDelta , t riggers, recordDeltas } : Task ,
tableDataAction : TableDataAction ,
) {
const bulkColValues = fromTableDataAction ( tableDataAction ) ;
log . info ( ` Processing ${ triggers . length } triggers for ${ bulkColValues . id . length } records of ${ tableId } ` ) ;
const meta = { numTriggers : triggers.length , numRecords : bulkColValues.id.length } ;
this . _log ( ` Processing triggers ` , meta ) ;
const makePayload = _ . memoize ( ( rowIndex : number ) = >
_ . mapValues ( bulkColValues , col = > col [ rowIndex ] ) as RowRecord
@ -291,6 +305,9 @@ export class DocTriggers {
}
}
}
this . _log ( "Generated events from triggers" , { numEvents : result.length , . . . meta } ) ;
return result ;
}
@ -334,7 +351,7 @@ export class DocTriggers {
} else if ( deltaBefore === "?" ) {
// The ActionSummary shouldn't contain this kind of delta at all
// since it comes from a single action bundle, not a combination of summaries.
log . warn ( 'Unexpected deltaBefore === "?"' , { trigger , isReadyColId , rowId , docId : this._activeDoc.docName } ) ;
this . _log ( 'Unexpected deltaBefore === "?"' , { level : 'warn' , trigger } ) ;
readyBefore = true ;
} else {
// Only remaining case is that deltaBefore is a single-element array containing the previous value.
@ -365,8 +382,9 @@ export class DocTriggers {
private async _getWebHookUrl ( id : string ) : Promise < string | undefined > {
let webhook = this . _webhookCache . get ( id ) ;
if ( ! webhook ) {
const secret = await this . _activeDoc . getHomeDbManager ( ) ? . getSecret ( id , this . _ activeDoc. docName ) ;
const secret = await this . _activeDoc . getHomeDbManager ( ) ? . getSecret ( id , this . _ docId ) ;
if ( ! secret ) {
this . _log ( ` No webhook secret found ` , { level : 'warn' , id } ) ;
return ;
}
webhook = JSON . parse ( secret ) ;
@ -374,7 +392,7 @@ export class DocTriggers {
}
const url = webhook ! . url ;
if ( ! isUrlAllowed ( url ) ) {
log . warn ( ` Webhook not sent to forbidden URL: ${ url } ` ) ;
this . _log ( ` Webhook not sent to forbidden URL ` , { level : 'warn' , url } ) ;
return ;
}
return url ;
@ -384,7 +402,7 @@ export class DocTriggers {
if ( ! this . _sending ) { // only run one loop at a time
this . _sending = true ;
this . _sendLoop ( ) . catch ( ( e ) = > { // run _sendLoop asynchronously (in the background)
log . error ( ` _sendLoop failed: ${ e } ` ) ;
this . _log ( ` _sendLoop failed: ${ e } ` , { level : 'error' } ) ;
this . _sending = false ; // otherwise the following line will complete instantly
this . _startSendLoop ( ) ; // restart the loop on failure
} ) ;
@ -396,7 +414,7 @@ export class DocTriggers {
// Managed by _startSendLoop. Runs in the background. Only one loop should run at a time.
// Runs until shutdown.
private async _sendLoop() {
log . info ( "Starting _sendLoop" ) ;
this . _log ( "Starting _sendLoop" ) ;
// TODO delay/prevent shutting down while queue isn't empty?
while ( ! this . _shuttingDown ) {
@ -408,10 +426,14 @@ export class DocTriggers {
const batch = _ . takeWhile ( this . _webHookEventQueue . slice ( 0 , 100 ) , { id } ) ;
const body = JSON . stringify ( batch . map ( e = > e . payload ) ) ;
const url = await this . _getWebHookUrl ( id ) ;
let meta : Record < string , any > | undefined ;
let success : boolean ;
if ( ! url ) {
success = true ;
} else {
meta = { numEvents : batch.length , webhookId : id , host : new URL ( url ) . host } ;
this . _log ( "Sending batch of webhook events" , meta ) ;
success = await this . _sendWebhookWithRetries ( url , body ) ;
}
@ -423,24 +445,29 @@ export class DocTriggers {
multi . ltrim ( this . _redisQueueKey , batch . length , - 1 ) ;
}
if ( ! success && ! this . _drainingQueue ) {
// Put the failed events at the end of the queue to try again later
// while giving other URLs a chance to receive events.
this . _webHookEventQueue . push ( . . . batch ) ;
if ( multi ) {
const strings = batch . map ( e = > JSON . stringify ( e ) ) ;
multi . rpush ( this . _redisQueueKey , . . . strings ) ;
if ( ! success ) {
this . _log ( "Failed to send batch of webhook events" , { . . . meta , level : 'warn' } ) ;
if ( ! this . _drainingQueue ) {
// Put the failed events at the end of the queue to try again later
// while giving other URLs a chance to receive events.
this . _webHookEventQueue . push ( . . . batch ) ;
if ( multi ) {
const strings = batch . map ( e = > JSON . stringify ( e ) ) ;
multi . rpush ( this . _redisQueueKey , . . . strings ) ;
}
}
} else if ( meta ) {
this . _log ( "Successfully sent batch of webhook events" , meta ) ;
}
await multi ? . execAsync ( ) ;
}
log . info ( "Ended _sendLoop" ) ;
this . _log ( "Ended _sendLoop" ) ;
this . _redisClient ? . quitAsync ( ) . catch ( e = >
// Catch error to prevent sendLoop being restarted
log . warn ( "Error quitting redis: " + e )
this . _log ( "Error quitting redis: " + e , { level : 'warn' } )
) ;
}
@ -450,6 +477,7 @@ export class DocTriggers {
}
const redisUrl = process . env . REDIS_URL ;
if ( redisUrl ) {
this . _log ( "Creating redis client" ) ;
this . _redisClientField = createClient ( redisUrl ) ;
}
return this . _redisClientField ;
@ -480,9 +508,9 @@ export class DocTriggers {
if ( response . status === 200 ) {
return true ;
}
log . warn ( ` Webhook responded with status ${ response . status } ` ) ;
this . _log ( ` Webhook responded with non-200 status ` , { level : 'warn' , status : response.status , attempt } ) ;
} catch ( e ) {
log . warn ( ` Webhook error: ${ e } ` ) ;
this . _log ( ` Webhook sending error: ${ e } ` , { level : 'warn' , attempt } ) ;
}
// Don't wait any more if this is the last attempt.