@ -11,7 +11,7 @@ import { promisifyAll } from 'bluebird';
import * as _ from 'lodash' ;
import * as LRUCache from 'lru-cache' ;
import fetch from 'node-fetch' ;
import { createClient , RedisClient } from 'redis' ;
import { createClient , Multi, RedisClient } from 'redis' ;
promisifyAll ( RedisClient . prototype ) ;
@ -66,29 +66,49 @@ interface Task {
recordDeltas : RecordDeltas ;
}
const MAX_QUEUE_SIZE = 1000 ;
// Processes triggers for records changed as described in ActionSummary objects,
// initiating webhooks and automations.
// The interesting stuff starts in the handle() method.
// Webhooks are placed on an event queue in memory which is replicated on redis as backup.
// The same class instance consumes the queue and sends webhook requests in the background - see _sendLoop().
// Triggers are configured in the document, while details of webhooks (URLs) are kept
// in the Secrets table of the Home DB.
export class DocTriggers {
// Converts a column ref to colId by looking it up in _grist_Tables_column
private _getColId : ( rowId : number ) = > string | undefined ;
// Events that need to be sent to webhooks in FIFO order.
// This is the primary place where events are stored and consumed,
// while a copy of this queue is kept on redis as a backup.
// Modifications to this queue should be replicated on the redis queue.
private _webHookEventQueue : WebHookEvent [ ] = [ ] ;
// DB cache for webhook secrets
private _webhookCache = new LRUCache < string , WebHookSecret > ( { max : 1000 } ) ;
// Set to true by shutdown().
// Indicates that loops (especially for sending requests) should stop.
private _shuttingDown : boolean = false ;
// true if there is a webhook request sending loop running in the background
// to ensure only one loop is running at a time.
private _sending : boolean = false ;
private _redisClient : RedisClient | undefined ;
// Client lazily initiated by _redisClient getter, since most documents don't have triggers
// and therefore don't need a redis connection.
private _redisClientField : RedisClient | undefined ;
// Promise which resolves after we finish fetching the backup queue from redis on startup.
private _getRedisQueuePromise : Promise < void > | undefined ;
constructor ( private _activeDoc : ActiveDoc ) {
const redisUrl = process . env . REDIS_URL ;
if ( redisUrl ) {
this . _redisClient = createClient ( redisUrl ) ;
// TODO check for existing events on redis queue
// We create a transient client just for this purpose because it makes it easy
// to quit it afterwards and avoid keeping a client open for documents without triggers.
this . _getRedisQueuePromise = this . _getRedisQueue ( createClient ( redisUrl ) ) ;
}
}
@ -99,6 +119,11 @@ export class DocTriggers {
}
}
// Called after applying actions to a document and updating its data.
// Checks for triggers configured in a meta table,
// and whether any of those triggers monitor tables which were modified by the actions
// described in the given summary.
// If so, generates events which are pushed to the local and redis queues.
public async handle ( summary : ActionSummary ) {
const docData = this . _activeDoc . docData ;
if ( ! docData ) {
@ -111,11 +136,16 @@ export class DocTriggers {
const triggersByTableRef = _ . groupBy ( triggersTable . getRecords ( ) , "tableRef" ) ;
// Work to do after fetching values from the document
const tasks : Task [ ] = [ ] ;
// For each table in the document which is monitored by one or more triggers...
for ( const tableRef of Object . keys ( triggersByTableRef ) . sort ( ) ) {
const triggers = triggersByTableRef [ tableRef ] ;
const tableId = getTableId ( Number ( tableRef ) ) ! ; // groupBy makes tableRef a string
const tableDelta = summary . tableDeltas [ tableId ] ;
// ...if the monitored table was modified by the summarized actions,
// fetch the modified/created records and note the work that needs to be done.
if ( tableDelta ) {
const recordDeltas = this . _getRecordDeltas ( tableDelta ) ;
const filters = { id : [ . . . recordDeltas . keys ( ) ] } ;
@ -134,21 +164,53 @@ export class DocTriggers {
for ( const task of tasks ) {
events . push ( . . . this . _handleTask ( task , await task . tableDataAction ) ) ;
}
if ( ! events . length ) {
return ;
}
// Only add events to the queue after we finish fetching the backup from redis
// to ensure that events are delivered in the order they were generated.
await this . _getRedisQueuePromise ;
if ( this . _redisClient ) {
await this . _pushToRedisQueue ( events ) ;
}
this . _webHookEventQueue . push ( . . . events ) ;
if ( ! this . _sending && events . length ) {
this . _sending = true ;
this . _startSendLoop ( ) ;
const startSendLoop = ( ) = > {
this . _sendLoop ( ) . catch ( ( e ) = > {
log . error ( ` _sendLoop failed: ${ e } ` ) ;
startSendLoop ( ) ;
} ) ;
} ;
startSendLoop ( ) ;
// Prevent further document activity while the queue is too full.
while ( this . _drainingQueue && ! this . _shuttingDown ) {
await delay ( 1000 ) ;
}
}
private get _docId() {
return this . _activeDoc . docName ;
}
private get _redisQueueKey() {
return ` webhook-queue- ${ this . _docId } ` ;
}
private get _drainingQueue() {
return this . _webHookEventQueue . length >= MAX_QUEUE_SIZE ;
}
private async _pushToRedisQueue ( events : WebHookEvent [ ] ) {
const strings = events . map ( e = > JSON . stringify ( e ) ) ;
await this . _redisClient ! . rpushAsync ( this . _redisQueueKey , . . . strings ) ;
}
// TODO also push to redis queue
private async _getRedisQueue ( redisClient : RedisClient ) {
const strings = await redisClient . lrangeAsync ( this . _redisQueueKey , 0 , - 1 ) ;
if ( strings . length ) {
const events = strings . map ( s = > JSON . parse ( s ) ) ;
this . _webHookEventQueue . unshift ( . . . events ) ;
this . _startSendLoop ( ) ;
}
await redisClient . quitAsync ( ) ;
}
private _getRecordDeltas ( tableDelta : TableDelta ) : RecordDeltas {
@ -292,6 +354,21 @@ export class DocTriggers {
return url ;
}
private _startSendLoop() {
if ( ! this . _sending ) { // only run one loop at a time
this . _sending = true ;
this . _sendLoop ( ) . catch ( ( e ) = > { // run _sendLoop asynchronously (in the background)
log . error ( ` _sendLoop failed: ${ e } ` ) ;
this . _sending = false ; // otherwise the following line will complete instantly
this . _startSendLoop ( ) ; // restart the loop on failure
} ) ;
}
}
// Consumes the webhook event queue and sends HTTP requests.
// Should only be called if there are events to send.
// Managed by _startSendLoop. Runs in the background. Only one loop should run at a time.
// Runs until shutdown.
private async _sendLoop() {
log . info ( "Starting _sendLoop" ) ;
@ -311,12 +388,26 @@ export class DocTriggers {
} else {
success = await this . _sendWebhookWithRetries ( url , body ) ;
}
if ( success ) {
this . _webHookEventQueue . splice ( 0 , batch . length ) ;
// TODO also remove on redis
} else if ( ! this . _shuttingDown ) {
// TODO reorder queue on failure
this . _webHookEventQueue . splice ( 0 , batch . length ) ;
let multi : Multi | null = null ;
if ( this . _redisClient ) {
multi = this . _redisClient . multi ( ) ;
multi . ltrim ( this . _redisQueueKey , batch . length , - 1 ) ;
}
if ( ! success && ! this . _drainingQueue ) {
// Put the failed events at the end of the queue to try again later
// while giving other URLs a chance to receive events.
this . _webHookEventQueue . push ( . . . batch ) ;
if ( multi ) {
const strings = batch . map ( e = > JSON . stringify ( e ) ) ;
multi . rpush ( this . _redisQueueKey , . . . strings ) ;
}
}
await multi ? . execAsync ( ) ;
}
log . info ( "Ended _sendLoop" ) ;
@ -327,11 +418,28 @@ export class DocTriggers {
) ;
}
private get _redisClient() {
if ( this . _redisClientField ) {
return this . _redisClientField ;
}
const redisUrl = process . env . REDIS_URL ;
if ( redisUrl ) {
this . _redisClientField = createClient ( redisUrl ) ;
}
return this . _redisClientField ;
}
private get _maxWebhookAttempts() {
if ( this . _shuttingDown ) {
return 0 ;
}
return this . _drainingQueue ? 5 : 20 ;
}
private async _sendWebhookWithRetries ( url : string , body : string ) {
const maxAttempts = 20 ;
const maxWait = 64 ;
let wait = 1 ;
for ( let attempt = 0 ; attempt < maxAttempts ; attempt ++ ) {
for ( let attempt = 0 ; attempt < this . _ maxWebhook Attempts; attempt ++ ) {
if ( this . _shuttingDown ) {
return false ;
}
@ -352,7 +460,7 @@ export class DocTriggers {
}
// Don't wait any more if this is the last attempt.
if ( attempt >= maxAttempts - 1 ) {
if ( attempt >= this . _ maxWebhook Attempts - 1 ) {
return false ;
}