import {Directive, OptionDefinition} from '../../Directive'
import {Inject, Injectable} from '../../../di'
import {Bus, PushedToQueue, Queue} from '../../../support/bus'
import {Queueables} from '../../../service/Queueables'

/**
 * CLI directive that listens for jobs pushed to the queue and attempts to
 * execute them.
 *
 * Jobs are picked up in two ways:
 *  - reactively, whenever a `PushedToQueue` event is received on the bus;
 *  - periodically, via a fixed-interval poll of the queue.
 */
@Injectable()
export class ListenDirective extends Directive {
    /** Delay, in milliseconds, between periodic attempts to pop a job. */
    private static readonly POLL_INTERVAL_MS = 5000

    @Inject()
    protected readonly queue!: Queue

    @Inject()
    protected readonly queueables!: Queueables

    @Inject()
    protected readonly bus!: Bus

    /** Human-readable summary of what this directive does. */
    getDescription(): string {
        return 'listen for jobs pushed to the queue and attempt to execute them'
    }

    /** Keyword(s) identifying this directive. */
    getKeywords(): string | string[] {
        return 'queue-listen'
    }

    /** This directive accepts no options. */
    getOptions(): OptionDefinition[] {
        return []
    }

    /**
     * Subscribe to queue-push events, start the periodic poll, and then block
     * until `untilInterrupt()` settles.
     *
     * The poll timer is always cleared on the way out, including when the wait
     * rejects, so a failure cannot leave the timer firing in the background.
     */
    async handle(): Promise<void> {
        this.info('Subscribing to queue events...')
        await this.bus.subscribe(PushedToQueue, async () => {
            // A new job has been pushed to the queue, so try to pop it and execute it.
            // We may get undefined if some other worker is running and picked up this job first.
            await this.tryExecuteJob()
        })

        this.info('Setting periodic poll...')
        const handle = setInterval(async () => {
            await this.tryExecuteJob()
        }, ListenDirective.POLL_INTERVAL_MS)

        try {
            this.info('Listening for jobs...')
            await this.untilInterrupt()
            this.info('Shutting down...')
        } finally {
            // Runs on both the normal and the failure path so the timer never leaks.
            clearInterval(handle)
        }
    }

    /**
     * Pop at most one job from the queue and execute it.
     *
     * Resolves without doing anything when the queue yields no job. Any error
     * thrown while popping or executing is logged rather than rethrown, so a
     * failing job does not reject the event callback or timer tick that
     * triggered this attempt.
     */
    protected async tryExecuteJob(): Promise<void> {
        try {
            const job = await this.queue.pop()
            if ( !job ) {
                return  // Some other worker already picked up this job
            }

            this.info(`Executing: ${job.constructor?.name || 'unknown job'}`)
            await job.execute()
            this.success('Execution finished.')
        } catch (e: unknown) {
            this.error('Failed to execute job.')
            this.error(e)
        }
    }
}