From 16e5fa00aa1571ea32747cda2388a55dfbd022d0 Mon Sep 17 00:00:00 2001 From: garrettmills Date: Thu, 27 Jan 2022 10:34:01 -0600 Subject: [PATCH] Implement queue work and listen commands --- src/cli/Directive.ts | 18 ++++++ src/cli/directive/queue/ListenDirective.ts | 64 +++++++++++++++++++ src/cli/directive/queue/WorkDirective.ts | 4 ++ src/cli/index.ts | 3 + src/cli/service/CommandLineApplication.ts | 4 +- src/di/Container.ts | 10 +-- src/service/Queueables.ts | 3 +- src/support/bus/Bus.ts | 37 ++++++++++- src/support/bus/LocalBus.ts | 3 +- src/support/bus/RedisBus.ts | 21 +++++- src/support/bus/getEventName.ts | 35 ++++++++++ src/support/bus/index.ts | 2 + src/support/bus/queue/BaseJob.ts | 13 ++++ src/support/bus/queue/Queue.ts | 4 +- src/support/bus/queue/event/PushedToQueue.ts | 3 +- src/support/bus/queue/event/PushingToQueue.ts | 3 +- .../bus/queue/event/QueueEventSerializer.ts | 43 +++++++++++++ .../serial/SimpleCanonicalItemSerializer.ts | 2 + src/support/bus/types.ts | 16 ++++- 19 files changed, 271 insertions(+), 17 deletions(-) create mode 100644 src/cli/directive/queue/ListenDirective.ts create mode 100644 src/support/bus/getEventName.ts create mode 100644 src/support/bus/queue/BaseJob.ts create mode 100644 src/support/bus/queue/event/QueueEventSerializer.ts diff --git a/src/cli/Directive.ts b/src/cli/Directive.ts index 2c9c068..32de7c1 100644 --- a/src/cli/Directive.ts +++ b/src/cli/Directive.ts @@ -468,4 +468,22 @@ export abstract class Directive extends AppClass { protected nativeOutput(...outputs: any[]): void { console.log(...outputs) // eslint-disable-line no-console } + + /** + * Get a promise that resolves after SIGINT is received, executing a + * callback beforehand. + * @param callback + * @protected + */ + protected async untilInterrupt(callback?: () => unknown): Promise { + return new Promise(res => { + process.on('SIGINT', async () => { + if ( callback ) { + await callback() + } + + res() + }) + }) + } } diff --git a/src/cli/directive/queue/ListenDirective.ts b/src/cli/directive/queue/ListenDirective.ts new file mode 100644 index 0000000..c26b208 --- /dev/null +++ b/src/cli/directive/queue/ListenDirective.ts @@ -0,0 +1,64 @@ +import {Directive, OptionDefinition} from '../../Directive' +import {Inject, Injectable} from '../../../di' +import {Bus, PushedToQueue, Queue} from '../../../support/bus' +import {Queueables} from '../../../service/Queueables' + +@Injectable() +export class ListenDirective extends Directive { + @Inject() + protected readonly queue!: Queue + + @Inject() + protected readonly queueables!: Queueables + + @Inject() + protected readonly bus!: Bus + + getDescription(): string { + return 'listen for jobs pushed to the queue and attempt to execute them' + } + + getKeywords(): string | string[] { + return 'queue-listen' + } + + getOptions(): OptionDefinition[] { + return [] + } + + async handle(): Promise { + this.info('Subscribing to queue events...') + await this.bus.subscribe(PushedToQueue, async () => { + // A new job has been pushed to the queue, so try to pop it and execute it. + // We may get undefined if some other worker is running and picked up this job first. + await this.tryExecuteJob() + }) + + this.info('Setting periodic poll...') + const handle = setInterval(async () => { + await this.tryExecuteJob() + }, 5000) + + this.info('Listening for jobs...') + await this.untilInterrupt() + + this.info('Shutting down...') + clearInterval(handle) + } + + protected async tryExecuteJob(): Promise { + try { + const job = await this.queue.pop() + if ( !job ) { + return // Some other worker already picked up this job + } + + this.info(`Executing: ${job.constructor?.name || 'unknown job'}`) + await job.execute() + this.success('Execution finished.') + } catch (e: unknown) { + this.error('Failed to execute job.') + this.error(e) + } + } +} diff --git a/src/cli/directive/queue/WorkDirective.ts b/src/cli/directive/queue/WorkDirective.ts index 2fb6343..d0b3c0f 100644 --- a/src/cli/directive/queue/WorkDirective.ts +++ b/src/cli/directive/queue/WorkDirective.ts @@ -1,12 +1,16 @@ import {Directive, OptionDefinition} from '../../Directive' import {Inject, Injectable} from '../../../di' import {Queue} from '../../../support/bus' +import {Queueables} from '../../../service/Queueables' @Injectable() export class WorkDirective extends Directive { @Inject() protected readonly queue!: Queue + @Inject() + protected readonly queueables!: Queueables + getDescription(): string { return 'pop a single item from the queue and execute it' } diff --git a/src/cli/index.ts b/src/cli/index.ts index a1a5b07..ecb1e92 100644 --- a/src/cli/index.ts +++ b/src/cli/index.ts @@ -13,3 +13,6 @@ export * from './directive/TemplateDirective' export * from './directive/UsageDirective' export * from './decorators' + +export * from './directive/queue/ListenDirective' +export * from './directive/queue/WorkDirective' diff --git a/src/cli/service/CommandLineApplication.ts b/src/cli/service/CommandLineApplication.ts index d249c50..d604b93 100644 --- a/src/cli/service/CommandLineApplication.ts +++ b/src/cli/service/CommandLineApplication.ts @@ -1,6 +1,6 @@ import {Unit} from '../../lifecycle/Unit' import {Logging} from '../../service/Logging' -import {Singleton, Inject} from '../../di/decorator/injection' +import {Singleton, Inject} from '../../di' import {CommandLine} from './CommandLine' import {UsageDirective} from '../directive/UsageDirective' import {Directive} from '../Directive' @@ -10,6 +10,7 @@ import {RunDirective} from '../directive/RunDirective' import {RoutesDirective} from '../directive/RoutesDirective' import {RouteDirective} from '../directive/RouteDirective' import {WorkDirective} from '../directive/queue/WorkDirective' +import {ListenDirective} from '../directive/queue/ListenDirective' /** * Unit that takes the place of the final unit in the application that handles @@ -48,6 +49,7 @@ export class CommandLineApplication extends Unit { this.cli.registerDirective(RoutesDirective) this.cli.registerDirective(RouteDirective) this.cli.registerDirective(WorkDirective) + this.cli.registerDirective(ListenDirective) const argv = process.argv.slice(2) const match = this.cli.getDirectives() diff --git a/src/di/Container.ts b/src/di/Container.ts index 3c0dd14..3a01ee6 100644 --- a/src/di/Container.ts +++ b/src/di/Container.ts @@ -55,10 +55,6 @@ export class Container { .resolve() .map(factory => container.registerFactory(factory)) - ContainerBlueprint.getContainerBlueprint() - .resolveConstructable() - .map((factory: StaticClass, any>) => console.log(factory)) // eslint-disable-line no-console - ContainerBlueprint.getContainerBlueprint() .resolveConstructable() .map((factory: StaticClass, any>) => container.registerFactory(container.make(factory))) @@ -335,7 +331,7 @@ export class Container { if ( factory ) { return factory } else { - logIfDebugging('extollo.di.injector', 'unable to resolve factory', factory, this.factories) + logIfDebugging('extollo.di.injector', 'unable to resolve factory', key, factory, this.factories) } } @@ -388,6 +384,8 @@ export class Container { * @param {array} parameters */ protected produceFactory(factory: AbstractFactory, parameters: any[]): T { + logIfDebugging('extollo.di.injector', 'Make stack', Container.makeStack) + // Create the dependencies for the factory const keys = factory.getDependencyKeys().filter(req => this.hasKey(req.key)) const dependencies = keys.map(req => { @@ -415,7 +413,9 @@ export class Container { // Produce a new instance const inst = factory.produce(constructorArguments, params.reverse().all()) + logIfDebugging('extollo.di.injector', 'Resolving dependencies for factory', factory) factory.getInjectedProperties().each(dependency => { + logIfDebugging('extollo.di.injector', 'Resolving injected dependency:', dependency) if ( dependency.key && inst ) { (inst as any)[dependency.property] = this.resolveAndCreate(dependency.key) } diff --git a/src/service/Queueables.ts b/src/service/Queueables.ts index 4bdc76c..5a4d625 100644 --- a/src/service/Queueables.ts +++ b/src/service/Queueables.ts @@ -3,7 +3,8 @@ import {Singleton, Instantiable} from '../di' import {Queueable} from '../support/bus' /** - * A canonical unit that resolves Queueable classes from `app/jobs`. + * A canonical unit that resolves Queueable classes from `app/jobs` and sets up + * any non-default queues. */ @Singleton() export class Queueables extends CanonicalStatic> { diff --git a/src/support/bus/Bus.ts b/src/support/bus/Bus.ts index b64d5e2..fb3e9df 100644 --- a/src/support/bus/Bus.ts +++ b/src/support/bus/Bus.ts @@ -1,8 +1,19 @@ import {Inject, Singleton, StaticInstantiable} from '../../di' -import {BusSubscriber, Event, EventBus, EventHandler, EventHandlerReturn, EventHandlerSubscription} from './types' +import { + BusConnectorConfig, + BusSubscriber, + Event, + EventBus, + EventHandler, + EventHandlerReturn, + EventHandlerSubscription, +} from './types' import {Awaitable, Collection, Pipeline, uuid4} from '../../util' import {Logging} from '../../service/Logging' import {Unit} from '../../lifecycle/Unit' +import {Config} from '../../service/Config' +import {RedisBus} from './RedisBus' +import {getEventName} from './getEventName' export interface BusInternalSubscription { busUuid: string @@ -18,6 +29,9 @@ export class Bus extends Unit implements EventBus< @Inject() protected readonly logging!: Logging + @Inject() + protected readonly config!: Config + public readonly uuid = uuid4() /** Local listeners subscribed to events on this bus. */ @@ -37,6 +51,7 @@ export class Bus extends Unit implements EventBus< */ async push(event: TEvent): Promise { if ( event.originBusUuid === this.uuid ) { + this.logging.verbose('Skipping propagation of event, because it originated from this bus.') return } @@ -44,13 +59,19 @@ export class Bus extends Unit implements EventBus< event.originBusUuid = this.uuid } + this.logging.verbose('Raising event on process-local bus:') + this.logging.verbose(event) if ( await this.callSubscribers(event) ) { // One of the subscribers halted propagation of the event + this.logging.verbose('Process-local subscriber halted propagation of event.') return } if ( await this.shouldBroadcast(event) ) { + this.logging.verbose('Raising event on connected busses...') await this.connectors.awaitMapCall('push', event) + } else { + this.logging.verbose('Will not broadcast event.') } } @@ -91,7 +112,7 @@ export class Bus extends Unit implements EventBus< const uuid = uuid4() this.subscribers.push({ - eventName: eventKey.prototype.eventName, // FIXME this is not working + eventName: getEventName(eventKey), handler, eventKey, uuid, @@ -99,6 +120,7 @@ export class Bus extends Unit implements EventBus< this.subscriptions.concat(await this.connectors .promiseMap(async bus => { + this.logging.verbose(`Connecting subscriber to bus ${bus.constructor?.name}#${bus.uuid}...`) return { busUuid: bus.uuid, subscriberUuid: uuid, @@ -166,6 +188,17 @@ export class Bus extends Unit implements EventBus< return } + // Read the connectors from the server.bus config and set them up + const config = this.config.get('server.bus', {}) + if ( Array.isArray(config.connectors) ) { + for ( const connector of (config.connectors as BusConnectorConfig[]) ) { + this.logging.info(`Creating bus connection of type: ${connector.type}`) + if ( connector.type === 'redis' ) { + await this.connect(this.make(RedisBus)) + } + } + } + await this.connectors.awaitMapCall('up') this.isUp = true diff --git a/src/support/bus/LocalBus.ts b/src/support/bus/LocalBus.ts index 25d7648..270203f 100644 --- a/src/support/bus/LocalBus.ts +++ b/src/support/bus/LocalBus.ts @@ -4,6 +4,7 @@ import {Awaitable, Collection, Pipeline, uuid4} from '../../util' import {Logging} from '../../service/Logging' import {Bus, BusInternalSubscription} from './Bus' import {AppClass} from '../../lifecycle/AppClass' +import {getEventName} from './getEventName' /** * Non-connectable event bus implementation. Can forward events to the main Bus instance. @@ -86,7 +87,7 @@ export class LocalBus extends AppClass implements const uuid = uuid4() this.subscribers.push({ - eventName: eventKey.prototype.eventName, + eventName: getEventName(eventKey), handler, eventKey, uuid, diff --git a/src/support/bus/RedisBus.ts b/src/support/bus/RedisBus.ts index eeff920..ae961e3 100644 --- a/src/support/bus/RedisBus.ts +++ b/src/support/bus/RedisBus.ts @@ -1,9 +1,11 @@ import {BusSubscriber, Event, EventBus, EventHandler, EventHandlerReturn, EventHandlerSubscription} from './types' -import {Inject, Injectable, StaticInstantiable} from '../../di' +import {Container, Inject, Injectable, StaticInstantiable} from '../../di' import {Awaitable, Collection, Pipeline, uuid4} from '../../util' import {Redis} from '../redis/Redis' import {Serialization} from './serial/Serialization' import * as IORedis from 'ioredis' +import {Logging} from '../../service/Logging' +import {getEventName} from './getEventName' /** * Event bus implementation that does pub/sub over a Redis connection. @@ -16,6 +18,12 @@ export class RedisBus implements EventBus { @Inject() protected readonly serial!: Serialization + @Inject() + protected readonly logging!: Logging + + @Inject() + protected readonly injector!: Container + public readonly uuid = uuid4() /** List of events for which we have created Redis channel subscriptions. */ @@ -40,18 +48,24 @@ export class RedisBus implements EventBus { const channel = `ex-event-${event.eventName}` const json = await this.serial.encodeJSON(event) + this.logging.verbose(`Pushing event to channel: ${channel}`) + this.logging.verbose(json) + await this.publisherConnection.publish(channel, json) } async subscribe(eventKey: StaticInstantiable, handler: EventHandler): Promise { const uuid = uuid4() const subscriber: BusSubscriber = { - eventName: eventKey.prototype.eventName, + eventName: getEventName(eventKey), eventKey, handler, uuid, } as unknown as BusSubscriber + this.logging.verbose(`Creating subscriber ${uuid}...`) + this.logging.verbose(subscriber) + if ( !this.internalSubscriptions.includes(subscriber.eventName) ) { await new Promise((res, rej) => { if ( !this.subscriberConnection ) { @@ -63,6 +77,7 @@ export class RedisBus implements EventBus { return rej(err) } + this.logging.verbose('Successfully subscribed to channel on Redis...') res() }) }) @@ -95,10 +110,12 @@ export class RedisBus implements EventBus { this.subscriberConnection.on('message', (channel: string, message: string) => { if ( !channel.startsWith('ex-event-') ) { + this.logging.debug(`Ignoring message on invalid channel: ${channel}`) return } const name = channel.substr('ex-event-'.length) + this.logging.verbose(`Received event ${name} on channel ${channel}`) this.handleEvent(name, message) }) } diff --git a/src/support/bus/getEventName.ts b/src/support/bus/getEventName.ts new file mode 100644 index 0000000..bf9be54 --- /dev/null +++ b/src/support/bus/getEventName.ts @@ -0,0 +1,35 @@ +import {Event} from './types' +import {Container, StaticInstantiable} from '../../di' +import {ErrorWithContext} from '../../util' + +export function getEventName(eventKey: StaticInstantiable): string { + const protoName = eventKey.prototype.eventName + if ( protoName ) { + return protoName + } + + try { + const inst = Container.getContainer().make(eventKey) + if ( inst.eventName ) { + return inst.eventName + } + } catch (e: unknown) {} // eslint-disable-line no-empty + + let stringParseName = eventKey.toString() + .split('\n') + .map(x => x.trim()) + .filter(x => x.startsWith('this.eventName = \'') || x.startsWith('this.eventName = "'))?.[0] + ?.split('=')?.[1] + ?.trim() + + if ( stringParseName ) { + stringParseName = stringParseName.endsWith(';') ? stringParseName.slice(1, -2) : stringParseName.slice(1, -1) + if ( stringParseName ) { + return stringParseName + } + } + + throw new ErrorWithContext('Unable to determine eventName from eventKey', { + eventKey, + }) +} diff --git a/src/support/bus/index.ts b/src/support/bus/index.ts index 4fa10c6..18c6b0f 100644 --- a/src/support/bus/index.ts +++ b/src/support/bus/index.ts @@ -11,6 +11,8 @@ export * from './RedisBus' export * from './queue/event/PushingToQueue' export * from './queue/event/PushedToQueue' +export * from './queue/event/QueueEventSerializer' +export * from './queue/BaseJob' export * from './queue/Queue' export * from './queue/CacheQueue' export * from './queue/SyncQueue' diff --git a/src/support/bus/queue/BaseJob.ts b/src/support/bus/queue/BaseJob.ts new file mode 100644 index 0000000..8e3df65 --- /dev/null +++ b/src/support/bus/queue/BaseJob.ts @@ -0,0 +1,13 @@ +import {Queueable} from '../types' +import {Awaitable} from '../../../util' +import {Inject, Injectable} from '../../../di' +import {Logging} from '../../../service/Logging' +import {CanonicalItemClass} from '../../CanonicalReceiver' + +@Injectable() +export abstract class BaseJob extends CanonicalItemClass implements Queueable { + @Inject() + protected readonly logging!: Logging + + abstract execute(): Awaitable +} diff --git a/src/support/bus/queue/Queue.ts b/src/support/bus/queue/Queue.ts index f587242..686993b 100644 --- a/src/support/bus/queue/Queue.ts +++ b/src/support/bus/queue/Queue.ts @@ -16,9 +16,9 @@ export abstract class Queue implements BusQueue { async dispatch(item: T): Promise { if ( shouldQueue(item) ) { - await this.bus.push(new PushingToQueue(item)) + await this.bus.push(new PushingToQueue(item, this.name)) await this.push(item) - await this.bus.push(new PushedToQueue(item)) + await this.bus.push(new PushedToQueue(item, this.name)) return } diff --git a/src/support/bus/queue/event/PushedToQueue.ts b/src/support/bus/queue/event/PushedToQueue.ts index ff78cc3..49462d3 100644 --- a/src/support/bus/queue/event/PushedToQueue.ts +++ b/src/support/bus/queue/event/PushedToQueue.ts @@ -5,7 +5,7 @@ import {uuid4} from '../../../../util' * Event fired after an item is pushed to the queue. */ export class PushedToQueue> implements Event { - public readonly eventName = '@extollo/lib:PushedToQueue' + public readonly eventName = '@extollo/lib.PushedToQueue' public readonly eventUuid = uuid4() @@ -13,5 +13,6 @@ export class PushedToQueue> implements Event { constructor( public readonly item: T, + public readonly queueName: string, ) {} } diff --git a/src/support/bus/queue/event/PushingToQueue.ts b/src/support/bus/queue/event/PushingToQueue.ts index 5a32b1b..dc0ade6 100644 --- a/src/support/bus/queue/event/PushingToQueue.ts +++ b/src/support/bus/queue/event/PushingToQueue.ts @@ -5,7 +5,7 @@ import {uuid4} from '../../../../util' * Event fired before an item is pushed to the queue. */ export class PushingToQueue> implements Event { - public readonly eventName = '@extollo/lib:PushingToQueue' + public readonly eventName = '@extollo/lib.PushingToQueue' public readonly eventUuid = uuid4() @@ -13,5 +13,6 @@ export class PushingToQueue> implements Event { constructor( public readonly item: T, + public readonly queueName: string, ) {} } diff --git a/src/support/bus/queue/event/QueueEventSerializer.ts b/src/support/bus/queue/event/QueueEventSerializer.ts new file mode 100644 index 0000000..d83b914 --- /dev/null +++ b/src/support/bus/queue/event/QueueEventSerializer.ts @@ -0,0 +1,43 @@ +import {PushedToQueue} from './PushedToQueue' +import {Queueable, SerialPayload, ShouldQueue} from '../../types' +import {PushingToQueue} from './PushingToQueue' +import {BaseSerializer} from '../../serial/BaseSerializer' +import {JSONState} from '../../../../util' +import {ObjectSerializer} from '../../serial/decorators' + +export type QueueEvent = PushedToQueue> | PushingToQueue> + +export interface QueueEventSerialPayload extends JSONState { + eventName: '@extollo/lib.PushedToQueue' | '@extollo/lib.PushingToQueue' + itemPayload: SerialPayload + queueName: string +} + +@ObjectSerializer() +export class QueueEventSerializer extends BaseSerializer { + protected async decodeSerial(serial: QueueEventSerialPayload): Promise { + const item = await this.getSerialization().decode(serial.itemPayload) + + if ( serial.eventName === '@extollo/lib.PushedToQueue' ) { + return new PushedToQueue(item as ShouldQueue, serial.queueName) + } else { + return new PushingToQueue(item as ShouldQueue, serial.queueName) + } + } + + protected async encodeActual(actual: QueueEvent): Promise { + return { + eventName: actual.eventName, + queueName: actual.queueName, + itemPayload: await this.getSerialization().encode(actual.item), + } + } + + protected getName(): string { + return '@extollo/lib.QueueEventSerializer' + } + + matchActual(some: QueueEvent): boolean { + return some instanceof PushedToQueue || some instanceof PushingToQueue + } +} diff --git a/src/support/bus/serial/SimpleCanonicalItemSerializer.ts b/src/support/bus/serial/SimpleCanonicalItemSerializer.ts index 633aadd..df684f8 100644 --- a/src/support/bus/serial/SimpleCanonicalItemSerializer.ts +++ b/src/support/bus/serial/SimpleCanonicalItemSerializer.ts @@ -3,6 +3,7 @@ import {BaseSerializer} from './BaseSerializer' import {Awaitable, ErrorWithContext, JSONState, Rehydratable} from '../../../util' import {Container, Inject, Injectable} from '../../../di' import {Canon} from '../../../service/Canon' +import {ObjectSerializer} from './decorators' /** State encoded by this class. */ export interface SimpleCanonicalItemSerialState extends JSONState { @@ -15,6 +16,7 @@ export interface SimpleCanonicalItemSerialState extends JSONState { * These instances must be CanonicalItemClass instances and take no constructor parameters. * If the instance is Rehydratable, then the state will be (re-)stored. */ +@ObjectSerializer() @Injectable() export class SimpleCanonicalItemSerializer extends BaseSerializer { @Inject() diff --git a/src/support/bus/types.ts b/src/support/bus/types.ts index 8b2b132..6350fa8 100644 --- a/src/support/bus/types.ts +++ b/src/support/bus/types.ts @@ -1,5 +1,5 @@ import {Awaitable, JSONState, Maybe, Pipeline, TypeTag, uuid4} from '../../util' -import {StaticInstantiable} from '../../di' +import {Instantiable, StaticInstantiable} from '../../di' // eslint-disable-next-line @typescript-eslint/no-unused-vars export interface SerialPayload extends JSONState { @@ -92,3 +92,17 @@ export interface BusQueue { pop(): Promise> } + +export interface RedisBusConnectorConfig { + type: 'redis' +} + +export type BusConnectorConfig = RedisBusConnectorConfig + +export interface QueueConfig { + driver?: Instantiable, + /* queues?: ({ + name: string, + driver: Instantiable, + })[] */ +}