lib/src/support/bus/Bus.ts

242 lines
7.7 KiB
TypeScript

import {AwareOfContainerLifecycle, Inject, Singleton, StaticInstantiable} from '../../di'
import {
BusConnectorConfig,
BusSubscriber,
Event,
EventBus,
EventHandler,
EventHandlerReturn,
EventHandlerSubscription,
} from './types'
import {Awaitable, Collection, ifDebugging, Pipeline, uuid4} from '../../util'
import {Logging} from '../../service/Logging'
import {Unit} from '../../lifecycle/Unit'
import {Config} from '../../service/Config'
import {RedisBus} from './RedisBus'
import {getEventName} from './getEventName'
/**
 * Bookkeeping record tying a local subscriber to the subscription that was
 * created for it on a connected bus.
 */
export interface BusInternalSubscription {
    /** UUID of the connected bus that the subscription was registered on. */
    busUuid: string

    /** UUID of the local subscriber that the subscription was created for. */
    subscriberUuid: string

    /** Handle used to cancel the subscription on the connected bus. */
    subscription: EventHandlerSubscription
}
/**
 * Propagating event bus implementation.
 *
 * An event pushed onto this bus is first offered to the local subscribers and
 * then, if the event opts in to broadcasting, pushed to every connected bus.
 * Events that do not yet carry an `originBusUuid` are stamped with this bus's
 * UUID, and events already stamped with this bus's UUID are ignored by `push()`.
 */
@Singleton()
export class Bus<TEvent extends Event = Event> extends Unit implements EventBus<TEvent>, AwareOfContainerLifecycle {
    awareOfContainerLifecycle: true = true

    @Inject()
    protected readonly logging!: Logging

    @Inject()
    protected readonly config!: Config

    /** Unique identifier of this bus instance. */
    public readonly uuid = uuid4()

    /** Local listeners subscribed to events on this bus. */
    protected subscribers: Collection<BusSubscriber<Event>> = new Collection()

    /** Connections to other event busses to be propagated. */
    protected connectors: Collection<EventBus> = new Collection()

    /** Subscriptions registered on connected busses on behalf of local subscribers. */
    protected subscriptions: Collection<BusInternalSubscription> = new Collection()

    /** True if the bus has been initialized. */
    private isUp = false

    /**
     * Push an event onto the bus.
     *
     * The event is skipped entirely when it is already stamped with this bus's
     * UUID. Otherwise it is delivered to local subscribers and, unless one of
     * them halts propagation, forwarded to the connected busses when
     * `shouldBroadcast()` allows it.
     * @param event
     */
    async push(event: TEvent): Promise<void> {
        if ( event.originBusUuid === this.uuid ) {
            this.logging.verbose('Skipping propagation of event, because it originated from this bus.')
            return
        }

        // First time this event is seen by any bus: mark this bus as its origin.
        if ( !event.originBusUuid ) {
            event.originBusUuid = this.uuid
        }

        this.logging.verbose('Raising event on process-local bus:')
        this.logging.verbose(event)

        if ( await this.callSubscribers(event) ) {
            // One of the subscribers halted propagation of the event
            this.logging.verbose('Process-local subscriber halted propagation of event.')
            return
        }

        if ( await this.shouldBroadcast(event) ) {
            this.logging.verbose('Raising event on connected busses...')
            await this.connectors.awaitMapCall('push', event)
        } else {
            this.logging.verbose('Will not broadcast event.')
        }
    }

    /**
     * Returns true if the given event should be pushed to connected event busses.
     *
     * `event.shouldBroadcast` may be either a function (which is called) or a
     * plain value (which is coerced to a boolean).
     * @param event
     */
    protected async shouldBroadcast(event: Event): Promise<boolean> {
        if ( typeof event.shouldBroadcast === 'function' ) {
            return event.shouldBroadcast()
        }

        return Boolean(event.shouldBroadcast)
    }

    /**
     * Call all local listeners for the given event. Returns true if the propagation
     * of the event should be halted.
     *
     * Only subscribers whose `eventKey` the event is an `instanceof` are considered.
     * @param event
     * @protected
     */
    protected async callSubscribers(event: TEvent): Promise<boolean> {
        return this.subscribers
            .filter(sub => event instanceof sub.eventKey)
            .pluck('handler')
            .toAsync()
            .some(handler => handler(event))
    }

    /**
     * Register a pipeline as an event handler.
     * @param eventKey
     * @param line
     */
    pipe<T extends TEvent>(eventKey: StaticInstantiable<T>, line: Pipeline<T, EventHandlerReturn>): Awaitable<EventHandlerSubscription> {
        return this.subscribe(eventKey, event => line.apply(event))
    }

    /**
     * Subscribe to an event on the bus.
     *
     * The handler is registered locally and also subscribed on every currently
     * connected bus. The returned handle removes the local registration and
     * cancels every connector subscription recorded for this subscriber.
     * @param eventKey
     * @param handler
     */
    async subscribe<T extends TEvent>(eventKey: StaticInstantiable<T>, handler: EventHandler<T>): Promise<EventHandlerSubscription> {
        const uuid = uuid4()

        this.subscribers.push({
            eventName: getEventName(eventKey),
            handler,
            eventKey,
            uuid,
        } as unknown as BusSubscriber<Event>)

        this.subscriptions.concat(await this.connectors
            .promiseMap<BusInternalSubscription>(async bus => {
                this.logging.verbose(`Connecting subscriber to bus ${bus.constructor?.name}#${bus.uuid}...`)

                return {
                    busUuid: bus.uuid,
                    subscriberUuid: uuid,
                    subscription: await bus.subscribe(eventKey, (event: T) => {
                        // Events that originated here were already handled locally by push().
                        if ( event.originBusUuid !== this.uuid ) {
                            return handler(event)
                        }
                    }),
                }
            }))

        return {
            unsubscribe: async () => {
                this.subscribers = this.subscribers.where('uuid', '!=', uuid)

                await this.subscriptions
                    .where('subscriberUuid', '=', uuid)
                    .tap(trashed => this.subscriptions.diffInPlace(trashed))
                    .pluck('subscription')
                    .awaitMapCall('unsubscribe')
            },
        }
    }

    /**
     * Connect an external event bus to this bus.
     *
     * If this bus is already up and the external bus is not connected, the
     * external bus is brought up first. Every existing local subscriber is then
     * subscribed on the external bus.
     * @param bus
     */
    async connect(bus: EventBus): Promise<void> {
        if ( this.isUp && !bus.isConnected() ) {
            await bus.up()
        }

        this.connectors.push(bus)

        // Record the bridged subscriptions, exactly as subscribe() does. Without this,
        // disconnect(), down() and the unsubscribe handles returned by subscribe()
        // cannot find them, and the handlers stay attached to the external bus.
        this.subscriptions.concat(await this.subscribers
            .promiseMap<BusInternalSubscription>(async subscriber => {
                return {
                    busUuid: bus.uuid,
                    subscriberUuid: subscriber.uuid,
                    subscription: await bus.subscribe(subscriber.eventKey, event => {
                        // Events that originated here were already handled locally by push().
                        if ( event.originBusUuid !== this.uuid ) {
                            return subscriber.handler(event)
                        }
                    }),
                }
            }))
    }

    /** Returns true if this bus has been initialized via `up()` and not yet shut down. */
    isConnected(): boolean {
        return this.isUp
    }

    /**
     * Disconnect an external event bus from this bus.
     *
     * Cancels and forgets every subscription recorded for the given bus, shuts
     * the bus down if this bus is up, and removes it from the connectors.
     * @param bus
     */
    async disconnect(bus: EventBus): Promise<void> {
        await this.subscriptions
            .where('busUuid', '=', bus.uuid)
            .tap(trashed => this.subscriptions.diffInPlace(trashed))
            .pluck('subscription')
            .awaitMapCall('unsubscribe')

        if ( this.isUp ) {
            await bus.down()
        }

        this.connectors.diffInPlace([bus])
    }

    /**
     * Initialize this event bus.
     *
     * Booting twice logs a warning and returns early; the `ifDebugging` callback
     * additionally throws.
     * @param createUpstreamFromConfig when true, connectors listed under the
     * `server.bus` config key are created and connected before the bus comes up
     */
    async up(createUpstreamFromConfig = true): Promise<void> {
        if ( this.isUp ) {
            this.logging.warn('Attempted to boot more than once. Skipping.')
            ifDebugging('extollo.bus', () => {
                throw new Error('Attempted to boot more than once. Skipping.')
            })
            return
        }

        // Read the connectors from the server.bus config and set them up
        if ( createUpstreamFromConfig ) {
            const config = this.config.get('server.bus', {})
            if ( Array.isArray(config.connectors) ) {
                for ( const connector of (config.connectors as BusConnectorConfig[]) ) {
                    this.logging.verbose(`Creating bus connection of type: ${connector.type}`)
                    if ( connector.type === 'redis' ) {
                        await this.connect(this.make(RedisBus))
                    }
                }
            }
        }

        // connect() does not boot busses while this bus is still down, so boot them here.
        await this.connectors
            .filter(bus => !bus.isConnected())
            .awaitMapCall('up')

        this.isUp = true
    }

    /**
     * Clean up this event bus.
     *
     * Cancels every recorded connector subscription and then shuts down all
     * connected busses. Logs a warning and does nothing if the bus is not up.
     */
    async down(): Promise<void> {
        if ( !this.isUp ) {
            this.logging.warn('Attempted to shut down but was never properly booted. Skipping.')
            return
        }

        await this.subscriptions
            .pluck('subscription')
            .awaitMapCall('unsubscribe')

        await this.connectors.awaitMapCall('down')

        this.isUp = false
    }

    /**
     * Container lifecycle hook: shut the bus down if it is still up.
     *
     * The shutdown promise is returned so that the caller can await the teardown
     * and observe any failure, rather than leaving it as a floating promise.
     */
    onContainerDestroy(): Awaitable<void> {
        if ( this.isUp ) {
            return this.down()
        }
    }
}