Add basic concepts for event bus, and implement in request and model
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2021-06-04 01:03:31 -05:00
parent dab3d006c8
commit 61731c4ebd
20 changed files with 375 additions and 52 deletions

11
src/event/Event.ts Normal file
View File

@@ -0,0 +1,11 @@
import {Dispatchable} from './types'
import {JSONState} from '../util'
/**
* Abstract class representing an event that may be fired.
*/
export abstract class Event implements Dispatchable {
abstract dehydrate(): Promise<JSONState>
abstract rehydrate(state: JSONState): void | Promise<void>
}

53
src/event/EventBus.ts Normal file
View File

@@ -0,0 +1,53 @@
import {Singleton, StaticClass} from '../di'
import {Bus, Dispatchable, EventSubscriber, EventSubscriberEntry, EventSubscription} from './types'
import {Awaitable, Collection, uuid4} from '../util'
/**
* A non-queued bus implementation that executes subscribers immediately in the main thread.
*/
@Singleton()
export class EventBus implements Bus {
/**
* Collection of subscribers, by their events.
* @protected
*/
protected subscribers: Collection<EventSubscriberEntry<any>> = new Collection<EventSubscriberEntry<any>>()
subscribe<T extends Dispatchable>(event: StaticClass<T, T>, subscriber: EventSubscriber<T>): Awaitable<EventSubscription> {
const entry: EventSubscriberEntry<T> = {
id: uuid4(),
event,
subscriber,
}
this.subscribers.push(entry)
return this.buildSubscription(entry.id)
}
unsubscribe<T extends Dispatchable>(subscriber: EventSubscriber<T>): Awaitable<void> {
this.subscribers = this.subscribers.where('subscriber', '!=', subscriber)
}
async dispatch(event: Dispatchable): Promise<void> {
const eventClass: StaticClass<typeof event, typeof event> = event.constructor as StaticClass<Dispatchable, Dispatchable>
await this.subscribers.where('event', '=', eventClass)
.promiseMap(entry => entry.subscriber(event))
}
/**
* Build an EventSubscription object for the subscriber of the given ID.
* @param id
* @protected
*/
protected buildSubscription(id: string): EventSubscription {
let subscribed = true
return {
unsubscribe: (): Awaitable<void> => {
if ( subscribed ) {
this.subscribers = this.subscribers.where('id', '!=', id)
subscribed = false
}
},
}
}
}

View File

@@ -0,0 +1,28 @@
import {EventBus} from './EventBus'
import {Collection} from '../util'
import {Bus, Dispatchable} from './types'
/**
* A non-queued bus implementation that executes subscribers immediately in the main thread.
* This bus also supports "propagating" events along to any other connected buses.
* Such behavior is useful, e.g., if we want to have a semi-isolated request-
* level bus whose events still reach the global EventBus instance.
*/
export class PropagatingEventBus extends EventBus {
protected recipients: Collection<Bus> = new Collection<Bus>()
async dispatch(event: Dispatchable): Promise<void> {
await super.dispatch(event)
await this.recipients.promiseMap(bus => bus.dispatch(event))
}
/**
* Register the given bus to receive events fired on this bus.
* @param recipient
*/
connect(recipient: Bus): void {
if ( !this.recipients.includes(recipient) ) {
this.recipients.push(recipient)
}
}
}

47
src/event/types.ts Normal file
View File

@@ -0,0 +1,47 @@
import {Awaitable, Rehydratable} from '../util'
import {StaticClass} from '../di'
/**
* A closure that should be executed with the given event is fired.
*/
export type EventSubscriber<T extends Dispatchable> = (event: T) => Awaitable<void>
/**
* An object used to track event subscriptions internally.
*/
export interface EventSubscriberEntry<T extends Dispatchable> {
/** Globally unique ID of this subscription. */
id: string
/** The event class subscribed to. */
event: StaticClass<T, T>
/** The closure to execute when the event is fired. */
subscriber: EventSubscriber<T>
}
/**
* An object returned upon subscription, used to unsubscribe.
*/
export interface EventSubscription {
/**
* Unsubscribe the associated listener from the event bus.
*/
unsubscribe(): Awaitable<void>
}
/**
* An instance of something that can be fired on an event bus.
*/
export interface Dispatchable extends Rehydratable {
shouldQueue?: boolean
}
/**
* An event-driven bus that manages subscribers and dispatched items.
*/
export interface Bus {
subscribe<T extends Dispatchable>(eventClass: StaticClass<T, T>, subscriber: EventSubscriber<T>): Awaitable<EventSubscription>
unsubscribe<T extends Dispatchable>(subscriber: EventSubscriber<T>): Awaitable<void>
dispatch(event: Dispatchable): Awaitable<void>
}