2022-03-31 04:04:00 +00:00
|
|
|
import {AwareOfContainerLifecycle, Inject, Injectable, StaticInstantiable} from '../../di'
|
2022-01-27 01:37:54 +00:00
|
|
|
import {BusSubscriber, Event, EventBus, EventHandler, EventHandlerReturn, EventHandlerSubscription} from './types'
|
2022-03-30 22:24:45 +00:00
|
|
|
import {Awaitable, Collection, ifDebugging, Pipeline, uuid4} from '../../util'
|
2022-01-27 01:37:54 +00:00
|
|
|
import {Logging} from '../../service/Logging'
|
|
|
|
import {Bus, BusInternalSubscription} from './Bus'
|
2022-01-27 16:34:01 +00:00
|
|
|
import {getEventName} from './getEventName'
|
2022-02-23 21:15:02 +00:00
|
|
|
import {CanonicalItemClass} from '../CanonicalReceiver'
|
2022-01-27 01:37:54 +00:00
|
|
|
|
|
|
|
/**
 * Non-connectable event bus implementation. Can forward events to the main Bus instance.
 *
 * An event pushed onto this bus is first offered to the local subscribers. If none
 * of them halts propagation and the event opts in to broadcasting, the event is
 * then pushed onto the injected `Bus`.
 */
@Injectable()
export class LocalBus<TEvent extends Event = Event> extends CanonicalItemClass implements EventBus<TEvent>, AwareOfContainerLifecycle {
    /**
     * Literal-`true` marker declared alongside `AwareOfContainerLifecycle`.
     * NOTE(review): presumably how the container detects lifecycle-aware instances — confirm.
     */
    awareOfContainerLifecycle: true = true

    @Inject()
    protected readonly logging!: Logging

    @Inject()
    protected readonly bus!: Bus

    /** Identifier of this bus instance; stamped onto events that have no origin yet. */
    public readonly uuid = uuid4()

    /** Local listeners subscribed to events on this bus. */
    protected subscribers: Collection<BusSubscriber<TEvent>> = new Collection()

    /**
     * Internal subscriptions, looked up by `subscriberUuid`. They are unsubscribed
     * when the matching subscriber is removed and when the bus goes down.
     */
    protected subscriptions: Collection<BusInternalSubscription> = new Collection()

    /** True if the bus has been initialized. */
    private isUp = false

    /**
     * Push an event onto the bus.
     *
     * The event is dropped if it already carries this bus's UUID as its origin.
     * Otherwise it is stamped with this bus's UUID (when it has no origin yet),
     * handed to the local subscribers and, unless one of them halts propagation,
     * forwarded to the main bus when the event asks to be broadcast.
     *
     * @param event the event to dispatch; its `originBusUuid` may be set by this call
     */
    async push(event: TEvent): Promise<void> {
        if ( event.originBusUuid === this.uuid ) {
            // The event originated here: handling it again would loop it back.
            return
        }

        if ( !event.originBusUuid ) {
            event.originBusUuid = this.uuid
        }

        if ( await this.callSubscribers(event) ) {
            // One of the subscribers halted propagation of the event
            return
        }

        if ( await this.shouldBroadcast(event) ) {
            await this.bus.push(event)
        }
    }

    /**
     * Returns true if the given event should be pushed to connected event busses.
     *
     * `event.shouldBroadcast` may be a function (its result is returned) or a
     * plain value (which is coerced to a boolean).
     *
     * @param event the event being dispatched
     */
    protected async shouldBroadcast(event: TEvent): Promise<boolean> {
        if ( typeof event.shouldBroadcast === 'function' ) {
            return event.shouldBroadcast()
        }

        return Boolean(event.shouldBroadcast)
    }

    /**
     * Call all local listeners for the given event. Returns true if the propagation
     * of the event should be halted.
     *
     * Only subscribers whose `eventKey` the event is an `instanceof` are considered.
     * NOTE(review): presumably the async `some` stops at the first handler that
     * resolves truthy, so later handlers may not run — confirm against Collection.
     *
     * @param event the event to hand to the matching handlers
     * @protected
     */
    protected async callSubscribers(event: TEvent): Promise<boolean> {
        return this.subscribers
            .filter(sub => event instanceof sub.eventKey)
            .pluck('handler')
            .toAsync()
            .some(handler => handler(event))
    }

    /**
     * Register a pipeline as an event handler.
     *
     * @param eventKey the event class to listen for
     * @param line the pipeline applied to each matching event
     */
    pipe<T extends TEvent>(eventKey: StaticInstantiable<T>, line: Pipeline<T, EventHandlerReturn>): Awaitable<EventHandlerSubscription> {
        return this.subscribe(eventKey, event => line.apply(event))
    }

    /**
     * Subscribe to an event on the bus.
     *
     * @param eventKey the event class to listen for
     * @param handler the callback invoked with each matching event
     * @returns a subscription whose `unsubscribe()` removes the handler and
     *          unsubscribes any internal subscriptions registered under the same UUID
     */
    async subscribe<T extends TEvent>(eventKey: StaticInstantiable<T>, handler: EventHandler<T>): Promise<EventHandlerSubscription> {
        const uuid = uuid4()

        // NOTE(review): the double cast presumably bridges the handler typed for
        // the narrower `T` and the collection typed for `TEvent` — confirm.
        this.subscribers.push({
            eventName: getEventName(eventKey),
            handler,
            eventKey,
            uuid,
        } as unknown as BusSubscriber<TEvent>)

        return {
            unsubscribe: async () => {
                this.subscribers = this.subscribers.where('uuid', '!=', uuid)

                await this.subscriptions
                    .where('subscriberUuid', '=', uuid)
                    .tap(trashed => this.subscriptions.diffInPlace(trashed))
                    .pluck('subscription')
                    .awaitMapCall('unsubscribe')
            },
        }
    }

    /**
     * Mark this event bus as initialized.
     *
     * A repeated call logs a warning and is otherwise a no-op. The callback given
     * to `ifDebugging('extollo.bus', ...)` throws an Error for the same condition;
     * NOTE(review): presumably it only runs when that debug scope is enabled — confirm.
     */
    async up(): Promise<void> {
        if ( this.isUp ) {
            this.logging.warn('Attempted to boot more than once. Skipping.')
            ifDebugging('extollo.bus', () => {
                throw new Error('Attempted to boot more than once. Skipping.')
            })
            return
        }

        this.isUp = true
    }

    /** Returns true once `up()` has completed and `down()` has not since run. */
    isConnected(): boolean {
        return this.isUp
    }

    /**
     * Clean up this event bus.
     *
     * Unsubscribes every internal subscription and then marks the bus as down.
     * Logs a warning and does nothing if the bus was never brought up.
     */
    async down(): Promise<void> {
        if ( !this.isUp ) {
            this.logging.warn('Attempted to shut down but was never properly booted. Skipping.')
            return
        }

        await this.subscriptions
            .pluck('subscription')
            .awaitMapCall('unsubscribe')

        this.isUp = false
    }

    /**
     * Container lifecycle hook: shut the bus down if it is currently up.
     *
     * The promise from `down()` is returned, not dropped, so the caller can await
     * the teardown and a failed unsubscribe is not lost as an unhandled rejection.
     */
    onContainerRelease(): Awaitable<void> {
        if ( !this.isUp ) {
            return
        }

        return this.down()
    }
}
|