import {AwareOfContainerLifecycle, Inject, Singleton, StaticInstantiable} from '../../di'
import {
    BusConnectorConfig,
    BusSubscriber,
    Event,
    EventBus,
    EventHandler,
    EventHandlerReturn,
    EventHandlerSubscription,
} from './types'
import {Awaitable, Collection, ifDebugging, Pipeline, uuid4} from '../../util'
import {Logging} from '../../service/Logging'
import {Unit} from '../../lifecycle/Unit'
import {Config} from '../../service/Config'
import {RedisBus} from './RedisBus'
import {getEventName} from './getEventName'

/**
 * Bookkeeping record tying a local subscriber to the subscription that was
 * created for it on one connected bus.
 */
export interface BusInternalSubscription {
    /** UUID of the connected bus the subscription was registered on. */
    busUuid: string

    /** UUID of the local subscriber the subscription was created for. */
    subscriberUuid: string

    /** Handle returned by the connected bus' `subscribe()`; used to unsubscribe later. */
    subscription: EventHandlerSubscription
}

/**
 * Propagating event bus implementation.
 *
 * Events pushed onto this bus are first delivered to local subscribers and then,
 * if the event asks to be broadcast, pushed to every connected bus. Local
 * subscribers are also registered on every connected bus, wrapped so that events
 * which originated from this bus are not handled a second time.
 */
@Singleton()
export class Bus extends Unit implements EventBus, AwareOfContainerLifecycle {
    awareOfContainerLifecycle: true = true

    @Inject()
    protected readonly logging!: Logging

    @Inject()
    protected readonly config!: Config

    /** Unique identifier of this bus; stamped onto events as `originBusUuid`. */
    public readonly uuid = uuid4()

    /** Local listeners subscribed to events on this bus. */
    protected subscribers: Collection<BusSubscriber<Event>> = new Collection()

    /** Connections to other event busses to be propagated. */
    protected connectors: Collection<EventBus> = new Collection()

    /** Subscriptions registered on connected busses on behalf of local subscribers. */
    protected subscriptions: Collection<BusInternalSubscription> = new Collection()

    /** True if the bus has been initialized. */
    private isUp = false

    /**
     * Push an event onto the bus.
     *
     * Events whose `originBusUuid` already equals this bus' UUID are skipped
     * entirely. Events without an origin are stamped with this bus' UUID before
     * being handed to local subscribers and, if applicable, connected busses.
     * @param event
     */
    async push<TEvent extends Event>(event: TEvent): Promise<void> {
        if ( event.originBusUuid === this.uuid ) {
            this.logging.verbose('Skipping propagation of event, because it originated from this bus.')
            return
        }

        if ( !event.originBusUuid ) {
            event.originBusUuid = this.uuid
        }

        this.logging.verbose('Raising event on process-local bus:')
        this.logging.verbose(event)

        if ( await this.callSubscribers(event) ) {
            // One of the subscribers halted propagation of the event
            this.logging.verbose('Process-local subscriber halted propagation of event.')
            return
        }

        if ( await this.shouldBroadcast(event) ) {
            this.logging.verbose('Raising event on connected busses...')
            await this.connectors.awaitMapCall('push', event)
        } else {
            this.logging.verbose('Will not broadcast event.')
        }
    }

    /**
     * Returns true if the given event should be pushed to connected event busses.
     *
     * `event.shouldBroadcast` may be either a function, whose result is returned,
     * or a plain value, which is coerced to a boolean.
     * @param event
     * @protected
     */
    protected async shouldBroadcast(event: Event): Promise<boolean> {
        if ( typeof event.shouldBroadcast === 'function' ) {
            return event.shouldBroadcast()
        }

        return Boolean(event.shouldBroadcast)
    }

    /**
     * Call all local listeners for the given event. Returns true if the propagation
     * of the event should be halted.
     * @param event
     * @protected
     */
    protected async callSubscribers<TEvent extends Event>(event: TEvent): Promise<boolean> {
        return this.subscribers
            .filter(sub => event instanceof sub.eventKey)
            .pluck('handler')
            .toAsync()
            .some(handler => handler(event))
    }

    /**
     * Register a pipeline as an event handler.
     * @param eventKey
     * @param line - pipeline applied to every matching event
     */
    pipe<T extends Event>(eventKey: StaticInstantiable<T>, line: Pipeline<T, EventHandlerReturn>): Awaitable<EventHandlerSubscription> {
        return this.subscribe(eventKey, event => line.apply(event))
    }

    /**
     * Subscribe to an event on the bus.
     *
     * The handler is registered locally and on every currently connected bus.
     * The returned handle removes both the local subscriber and the
     * subscriptions created for it on connected busses.
     * @param eventKey
     * @param handler
     */
    async subscribe<T extends Event>(eventKey: StaticInstantiable<T>, handler: EventHandler<T>): Promise<EventHandlerSubscription> {
        const uuid = uuid4()

        this.subscribers.push({
            eventName: getEventName(eventKey),
            handler,
            eventKey,
            uuid,
        } as unknown as BusSubscriber<Event>)

        this.subscriptions.concat(await this.connectors
            .promiseMap(async bus => {
                this.logging.verbose(`Connecting subscriber to bus ${bus.constructor?.name}#${bus.uuid}...`)

                return {
                    busUuid: bus.uuid,
                    subscriberUuid: uuid,
                    subscription: await bus.subscribe(eventKey, (event: T) => {
                        // Events that originated here were already handled by push()
                        if ( event.originBusUuid !== this.uuid ) {
                            return handler(event)
                        }
                    }),
                }
            }))

        return {
            unsubscribe: async () => {
                this.subscribers = this.subscribers.where('uuid', '!=', uuid)

                await this.subscriptions
                    .where('subscriberUuid', '=', uuid)
                    .tap(trashed => this.subscriptions.diffInPlace(trashed))
                    .pluck('subscription')
                    .awaitMapCall('unsubscribe')
            },
        }
    }

    /**
     * Connect an external event bus to this bus.
     *
     * If this bus is already up, the external bus is brought up first when it is
     * not yet connected. Every existing local subscriber is then registered on
     * the new bus.
     * @param bus
     */
    async connect(bus: EventBus): Promise<void> {
        if ( this.isUp && !bus.isConnected() ) {
            await bus.up()
        }

        this.connectors.push(bus)

        // Record the created subscriptions the same way subscribe() does, so that
        // disconnect(), down() and the unsubscribe handles can release them later.
        this.subscriptions.concat(await this.subscribers
            .promiseMap(async subscriber => {
                return {
                    busUuid: bus.uuid,
                    subscriberUuid: subscriber.uuid,
                    subscription: await bus.subscribe(subscriber.eventKey, event => {
                        // Events that originated here were already handled by push()
                        if ( event.originBusUuid !== this.uuid ) {
                            return subscriber.handler(event)
                        }
                    }),
                }
            }))
    }

    /** Returns true if this bus has been initialized. */
    isConnected(): boolean {
        return this.isUp
    }

    /**
     * Disconnect an external event bus from this bus.
     *
     * Releases the subscriptions tracked for the given bus, shuts the bus down
     * if this bus is up, and removes it from the list of connectors.
     * @param bus
     */
    async disconnect(bus: EventBus): Promise<void> {
        await this.subscriptions
            .where('busUuid', '=', bus.uuid)
            .tap(trashed => this.subscriptions.diffInPlace(trashed))
            .pluck('subscription')
            .awaitMapCall('unsubscribe')

        if ( this.isUp ) {
            await bus.down()
        }

        this.connectors.diffInPlace([bus])
    }

    /**
     * Initialize this event bus.
     *
     * Booting twice is a no-op that logs a warning (and throws from within the
     * `ifDebugging('extollo.bus', ...)` callback).
     * @param createUpstreamFromConfig - when true, connectors listed under the
     * `server.bus` config key are created and connected before booting
     */
    async up(createUpstreamFromConfig = true): Promise<void> {
        if ( this.isUp ) {
            this.logging.warn('Attempted to boot more than once. Skipping.')
            ifDebugging('extollo.bus', () => {
                throw new Error('Attempted to boot more than once. Skipping.')
            })
            return
        }

        // Read the connectors from the server.bus config and set them up
        if ( createUpstreamFromConfig ) {
            const config = this.config.get('server.bus', {})
            if ( Array.isArray(config.connectors) ) {
                for ( const connector of (config.connectors as BusConnectorConfig[]) ) {
                    this.logging.verbose(`Creating bus connection of type: ${connector.type}`)
                    if ( connector.type === 'redis' ) {
                        await this.connect(this.make(RedisBus))
                    }
                }
            }
        }

        // connect() does not boot busses while this bus is down, so boot them now
        await this.connectors
            .filter(bus => !bus.isConnected())
            .awaitMapCall('up')

        this.isUp = true
    }

    /**
     * Clean up this event bus.
     *
     * Releases every tracked subscription on connected busses and then shuts
     * the connected busses down. Does nothing but warn if the bus is not up.
     */
    async down(): Promise<void> {
        if ( !this.isUp ) {
            this.logging.warn('Attempted to shut down but was never properly booted. Skipping.')
            return
        }

        await this.subscriptions
            .pluck('subscription')
            .awaitMapCall('unsubscribe')

        await this.connectors.awaitMapCall('down')

        this.isUp = false
    }

    /**
     * Container lifecycle hook: shut the bus down if it is still up.
     *
     * The shutdown promise is returned so the caller can wait for it and
     * observe a failure instead of it becoming an unhandled rejection.
     */
    onContainerDestroy(): Awaitable<void> {
        if ( !this.isUp ) {
            return
        }

        return this.down()
    }
}