@ -114,15 +114,19 @@ export class TriggersHandler {
const filters = { id : [ . . . recordDeltas . keys ( ) ] } ;
const bulkColValues = fromTableDataAction ( await this . _activeDoc . fetchQuery ( docSession , { tableId , filters } ) ) ;
triggers . forEach ( trigger = > {
log . info ( ` Processing ${ triggers . length } triggers for ${ bulkColValues . id . length } records of ${ tableId } ` ) ;
for ( const trigger of triggers ) {
const actions = JSON . parse ( trigger . actions ) as TriggerAction [ ] ;
bulkColValues . id . forEach ( ( rowId , rowIndex ) = > {
// Handle triggers in parallel (talking to redis)
this . _handleTrigger (
for ( let rowIndex = 0 ; rowIndex < bulkColValues . id . length ; rowIndex ++ ) {
const rowId = bulkColValues . id [ rowIndex ] ;
// Handle triggers serially to make order predictable
await this . _handleTrigger (
trigger , actions , bulkColValues , rowIndex , rowId , recordDeltas . get ( rowId ) !
) . catch ( ( ) = > log . error ( "Error handling trigger action" ) ) ;
} ) ;
} ) ;
) ;
}
}
}
// Handles a single trigger for a single record, initiating all the corresponding actions
@ -182,12 +186,10 @@ export class TriggersHandler {
// All the values in this record
const event = _ . mapValues ( bulkColValues , col = > col [ rowIndex ] ) ;
actions . forEach ( action = > {
// Handle actions in parallel
this . _handleTriggerAction (
action , event
) . catch ( ( ) = > log . error ( "Error handling trigger action" ) ) ;
} ) ;
// Handle actions serially to make order predictable
for ( const action of actions ) {
await this . _handleTriggerAction ( action , event ) ;
}
}
private async _handleTriggerAction ( action : TriggerAction , event : Event ) {
@ -226,7 +228,7 @@ function sendPendingEvents() {
const pending = pendingEvents ;
pendingEvents = [ ] ;
for ( const [ url , group ] of _ . toPairs ( _ . groupBy ( pending , "url" ) ) ) {
const body = JSON . stringify ( _ . map ( group , "event" ) .reverse ( ) );
const body = JSON . stringify ( _ . map ( group , "event" ) );
sendWebhookWithRetries ( url , body ) . catch ( ( ) = > log . error ( "Webhook failed!" ) ) ;
}
}
@ -236,20 +238,24 @@ async function sendWebhookWithRetries(url: string, body: string) {
const maxWait = 64 ;
let wait = 1 ;
for ( let i = 0 ; i < maxAttempts ; i ++ ) {
const response = await fetch ( url , {
method : 'POST' ,
body ,
headers : {
'Content-Type' : 'application/json' ,
} ,
} ) ;
if ( response . status === 200 ) {
return ;
} else {
await delay ( ( wait + Math . random ( ) ) * 1000 ) ;
if ( wait < maxWait ) {
wait *= 2 ;
try {
const response = await fetch ( url , {
method : 'POST' ,
body ,
headers : {
'Content-Type' : 'application/json' ,
} ,
} ) ;
if ( response . status === 200 ) {
return ;
}
log . warn ( ` Webhook responded with status ${ response . status } ` ) ;
} catch ( e ) {
log . warn ( ` Webhook error: ${ e } ` ) ;
}
await delay ( ( wait + Math . random ( ) ) * 1000 ) ;
if ( wait < maxWait ) {
wait *= 2 ;
}
}
throw new Error ( "Webhook failed!" ) ;