diff --git a/src/event/Event.ts b/src/event/Event.ts new file mode 100644 index 0000000..958c4fa --- /dev/null +++ b/src/event/Event.ts @@ -0,0 +1,11 @@ +import {Dispatchable} from './types' +import {JSONState} from '../util' + +/** + * Abstract class representing an event that may be fired. + */ +export abstract class Event implements Dispatchable { + abstract dehydrate(): Promise + + abstract rehydrate(state: JSONState): void | Promise +} diff --git a/src/event/EventBus.ts b/src/event/EventBus.ts new file mode 100644 index 0000000..5620ab9 --- /dev/null +++ b/src/event/EventBus.ts @@ -0,0 +1,53 @@ +import {Singleton, StaticClass} from '../di' +import {Bus, Dispatchable, EventSubscriber, EventSubscriberEntry, EventSubscription} from './types' +import {Awaitable, Collection, uuid4} from '../util' + +/** + * A non-queued bus implementation that executes subscribers immediately in the main thread. + */ +@Singleton() +export class EventBus implements Bus { + /** + * Collection of subscribers, by their events. + * @protected + */ + protected subscribers: Collection> = new Collection>() + + subscribe(event: StaticClass, subscriber: EventSubscriber): Awaitable { + const entry: EventSubscriberEntry = { + id: uuid4(), + event, + subscriber, + } + + this.subscribers.push(entry) + return this.buildSubscription(entry.id) + } + + unsubscribe(subscriber: EventSubscriber): Awaitable { + this.subscribers = this.subscribers.where('subscriber', '!=', subscriber) + } + + async dispatch(event: Dispatchable): Promise { + const eventClass: StaticClass = event.constructor as StaticClass + await this.subscribers.where('event', '=', eventClass) + .promiseMap(entry => entry.subscriber(event)) + } + + /** + * Build an EventSubscription object for the subscriber of the given ID. + * @param id + * @protected + */ + protected buildSubscription(id: string): EventSubscription { + let subscribed = true + return { + unsubscribe: (): Awaitable => { + if ( subscribed ) { + this.subscribers = this.subscribers.where('id', '!=', id) + subscribed = false + } + }, + } + } +} diff --git a/src/event/PropagatingEventBus.ts b/src/event/PropagatingEventBus.ts new file mode 100644 index 0000000..dc703c3 --- /dev/null +++ b/src/event/PropagatingEventBus.ts @@ -0,0 +1,28 @@ +import {EventBus} from './EventBus' +import {Collection} from '../util' +import {Bus, Dispatchable} from './types' + +/** + * A non-queued bus implementation that executes subscribers immediately in the main thread. + * This bus also supports "propagating" events along to any other connected buses. + * Such behavior is useful, e.g., if we want to have a semi-isolated request- + * level bus whose events still reach the global EventBus instance. + */ +export class PropagatingEventBus extends EventBus { + protected recipients: Collection = new Collection() + + async dispatch(event: Dispatchable): Promise { + await super.dispatch(event) + await this.recipients.promiseMap(bus => bus.dispatch(event)) + } + + /** + * Register the given bus to receive events fired on this bus. + * @param recipient + */ + connect(recipient: Bus): void { + if ( !this.recipients.includes(recipient) ) { + this.recipients.push(recipient) + } + } +} diff --git a/src/event/types.ts b/src/event/types.ts new file mode 100644 index 0000000..57f5d0e --- /dev/null +++ b/src/event/types.ts @@ -0,0 +1,47 @@ +import {Awaitable, Rehydratable} from '../util' +import {StaticClass} from '../di' + +/** + * A closure that should be executed with the given event is fired. + */ +export type EventSubscriber = (event: T) => Awaitable + +/** + * An object used to track event subscriptions internally. + */ +export interface EventSubscriberEntry { + /** Globally unique ID of this subscription. */ + id: string + + /** The event class subscribed to. */ + event: StaticClass + + /** The closure to execute when the event is fired. */ + subscriber: EventSubscriber +} + +/** + * An object returned upon subscription, used to unsubscribe. + */ +export interface EventSubscription { + /** + * Unsubscribe the associated listener from the event bus. + */ + unsubscribe(): Awaitable +} + +/** + * An instance of something that can be fired on an event bus. + */ +export interface Dispatchable extends Rehydratable { + shouldQueue?: boolean +} + +/** + * An event-driven bus that manages subscribers and dispatched items. + */ +export interface Bus { + subscribe(eventClass: StaticClass, subscriber: EventSubscriber): Awaitable + unsubscribe(subscriber: EventSubscriber): Awaitable + dispatch(event: Dispatchable): Awaitable +} diff --git a/src/http/kernel/module/InjectRequestEventBusHTTPModule.ts b/src/http/kernel/module/InjectRequestEventBusHTTPModule.ts new file mode 100644 index 0000000..4479912 --- /dev/null +++ b/src/http/kernel/module/InjectRequestEventBusHTTPModule.ts @@ -0,0 +1,28 @@ +import {HTTPKernelModule} from '../HTTPKernelModule' +import {Inject, Injectable} from '../../../di' +import {HTTPKernel} from '../HTTPKernel' +import {Request} from '../../lifecycle/Request' +import {EventBus} from '../../../event/EventBus' +import {PropagatingEventBus} from '../../../event/PropagatingEventBus' + +/** + * HTTP kernel module that creates a request-specific event bus + * and injects it into the request container. + */ +@Injectable() +export class InjectRequestEventBusHTTPModule extends HTTPKernelModule { + @Inject() + protected bus!: EventBus + + public static register(kernel: HTTPKernel): void { + kernel.register(this).first() + } + + public async apply(request: Request): Promise { + const bus = this.make(PropagatingEventBus) + bus.connect(this.bus) + + request.purge(EventBus).registerProducer(EventBus, () => bus) + return request + } +} diff --git a/src/index.ts b/src/index.ts index 95cd9b4..957a53b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,6 +1,11 @@ export * from './util' export * from './di' +export * from './event/types' +export * from './event/Event' +export * from './event/EventBus' +export * from './event/PropagatingEventBus' + export * from './service/Logging' export * from './lifecycle/RunLevelErrorHandler' diff --git a/src/orm/index.ts b/src/orm/index.ts index b2b051e..b568f12 100644 --- a/src/orm/index.ts +++ b/src/orm/index.ts @@ -15,6 +15,7 @@ export * from './model/Field' export * from './model/ModelBuilder' export * from './model/ModelBuilder' export * from './model/ModelResultIterable' +export * from './model/events' export * from './model/Model' export * from './services/Database' diff --git a/src/orm/model/Model.ts b/src/orm/model/Model.ts index 2a3728b..d627967 100644 --- a/src/orm/model/Model.ts +++ b/src/orm/model/Model.ts @@ -1,18 +1,26 @@ import {ModelKey, QueryRow, QuerySource} from '../types' -import {Container, Inject} from '../../di' +import {Container, Inject, StaticClass} from '../../di' import {DatabaseService} from '../DatabaseService' import {ModelBuilder} from './ModelBuilder' import {getFieldsMeta, ModelField} from './Field' -import {deepCopy, BehaviorSubject, Pipe, Collection} from '../../util' +import {deepCopy, Pipe, Collection, Awaitable, uuid4} from '../../util' import {EscapeValueObject} from '../dialect/SQLDialect' import {AppClass} from '../../lifecycle/AppClass' import {Logging} from '../../service/Logging' import {Connection} from '../connection/Connection' +import {Bus, Dispatchable, EventSubscriber, EventSubscriberEntry, EventSubscription} from '../../event/types' +import {ModelRetrievedEvent} from './events/ModelRetrievedEvent' +import {ModelSavingEvent} from './events/ModelSavingEvent' +import {ModelSavedEvent} from './events/ModelSavedEvent' +import {ModelUpdatingEvent} from './events/ModelUpdatingEvent' +import {ModelUpdatedEvent} from './events/ModelUpdatedEvent' +import {ModelCreatingEvent} from './events/ModelCreatingEvent' +import {ModelCreatedEvent} from './events/ModelCreatedEvent' /** * Base for classes that are mapped to tables in a database. */ -export abstract class Model> extends AppClass { +export abstract class Model> extends AppClass implements Bus { @Inject() protected readonly logging!: Logging; @@ -78,49 +86,10 @@ export abstract class Model> extends AppClass { protected originalSourceRow?: QueryRow /** - * Behavior subject that fires after the model is populated. - */ - protected retrieved$ = new BehaviorSubject>() - - /** - * Behavior subject that fires right before the model is saved. - */ - protected saving$ = new BehaviorSubject>() - - /** - * Behavior subject that fires right after the model is saved. - */ - protected saved$ = new BehaviorSubject>() - - /** - * Behavior subject that fires right before the model is updated. - */ - protected updating$ = new BehaviorSubject>() - - /** - * Behavior subject that fires right after the model is updated. - */ - protected updated$ = new BehaviorSubject>() - - /** - * Behavior subject that fires right before the model is inserted. - */ - protected creating$ = new BehaviorSubject>() - - /** - * Behavior subject that fires right after the model is inserted. - */ - protected created$ = new BehaviorSubject>() - - /** - * Behavior subject that fires right before the model is deleted. - */ - protected deleting$ = new BehaviorSubject>() - - /** - * Behavior subject that fires right after the model is deleted. + * Collection of event subscribers, by their events. + * @protected */ - protected deleted$ = new BehaviorSubject>() + protected modelEventBusSubscribers: Collection> = new Collection>() /** * Get the table name for this model. @@ -193,9 +162,16 @@ export abstract class Model> extends AppClass { values?: {[key: string]: any}, ) { super() + this.initialize() this.boot(values) } + /** + * Called when the model is instantiated. Use for any setup of events, &c. + * @protected + */ + protected initialize(): void {} // eslint-disable-line @typescript-eslint/no-empty-function + /** * Initialize the model's properties from the given values and do any other initial setup. * @@ -228,7 +204,7 @@ export abstract class Model> extends AppClass { this.setFieldFromObject(field.modelKey, field.databaseKey, row) }) - await this.retrieved$.next(this) + await this.dispatch(new ModelRetrievedEvent(this as any)) return this } @@ -592,11 +568,11 @@ export abstract class Model> extends AppClass { * @param withoutTimestamps */ public async save({ withoutTimestamps = false } = {}): Promise> { - await this.saving$.next(this) + await this.dispatch(new ModelSavingEvent(this as any)) const ctor = this.constructor as typeof Model if ( this.exists() && this.isDirty() ) { - await this.updating$.next(this) + await this.dispatch(new ModelUpdatingEvent(this as any)) if ( !withoutTimestamps && ctor.timestamps && ctor.UPDATED_AT ) { (this as any)[ctor.UPDATED_AT] = new Date() @@ -617,9 +593,9 @@ export abstract class Model> extends AppClass { await this.assumeFromSource(data) } - await this.updated$.next(this) + await this.dispatch(new ModelUpdatedEvent(this as any)) } else if ( !this.exists() ) { - await this.creating$.next(this) + await this.dispatch(new ModelCreatingEvent(this as any)) if ( !withoutTimestamps ) { if ( ctor.timestamps && ctor.CREATED_AT ) { @@ -647,10 +623,11 @@ export abstract class Model> extends AppClass { if ( data ) { await this.assumeFromSource(result) } - await this.created$.next(this) + + await this.dispatch(new ModelCreatedEvent(this as any)) } - await this.saved$.next(this) + await this.dispatch(new ModelSavedEvent(this as any)) return this } @@ -822,4 +799,42 @@ export abstract class Model> extends AppClass { protected setFieldFromObject(thisFieldName: string | symbol, objectFieldName: string, object: QueryRow): void { (this as any)[thisFieldName] = object[objectFieldName] } + + subscribe(event: StaticClass, subscriber: EventSubscriber): Awaitable { + const entry: EventSubscriberEntry = { + id: uuid4(), + event, + subscriber, + } + + this.modelEventBusSubscribers.push(entry) + return this.buildSubscription(entry.id) + } + + unsubscribe(subscriber: EventSubscriber): Awaitable { + this.modelEventBusSubscribers = this.modelEventBusSubscribers.where('subscriber', '!=', subscriber) + } + + async dispatch(event: Dispatchable): Promise { + const eventClass: StaticClass = event.constructor as StaticClass + await this.modelEventBusSubscribers.where('event', '=', eventClass) + .promiseMap(entry => entry.subscriber(event)) + } + + /** + * Build an EventSubscription object for the subscriber of the given ID. + * @param id + * @protected + */ + protected buildSubscription(id: string): EventSubscription { + let subscribed = true + return { + unsubscribe: (): Awaitable => { + if ( subscribed ) { + this.modelEventBusSubscribers = this.modelEventBusSubscribers.where('id', '!=', id) + subscribed = false + } + }, + } + } } diff --git a/src/orm/model/events/ModelCreatedEvent.ts b/src/orm/model/events/ModelCreatedEvent.ts new file mode 100644 index 0000000..177164d --- /dev/null +++ b/src/orm/model/events/ModelCreatedEvent.ts @@ -0,0 +1,9 @@ +import {Model} from '../Model' +import {ModelEvent} from './ModelEvent' + +/** + * Event fired right after a model is inserted. + */ +export class ModelCreatedEvent> extends ModelEvent { + +} diff --git a/src/orm/model/events/ModelCreatingEvent.ts b/src/orm/model/events/ModelCreatingEvent.ts new file mode 100644 index 0000000..0b2d6b6 --- /dev/null +++ b/src/orm/model/events/ModelCreatingEvent.ts @@ -0,0 +1,9 @@ +import {Model} from '../Model' +import {ModelEvent} from './ModelEvent' + +/** + * Event fired right before a model is inserted. + */ +export class ModelCreatingEvent> extends ModelEvent { + +} diff --git a/src/orm/model/events/ModelDeletedEvent.ts b/src/orm/model/events/ModelDeletedEvent.ts new file mode 100644 index 0000000..8ba6cdc --- /dev/null +++ b/src/orm/model/events/ModelDeletedEvent.ts @@ -0,0 +1,9 @@ +import {Model} from '../Model' +import {ModelEvent} from './ModelEvent' + +/** + * Event fired right after a model is deleted. + */ +export class ModelDeletedEvent> extends ModelEvent { + +} diff --git a/src/orm/model/events/ModelDeletingEvent.ts b/src/orm/model/events/ModelDeletingEvent.ts new file mode 100644 index 0000000..51ac53a --- /dev/null +++ b/src/orm/model/events/ModelDeletingEvent.ts @@ -0,0 +1,9 @@ +import {Model} from '../Model' +import {ModelEvent} from './ModelEvent' + +/** + * Event fired right before a model is deleted. + */ +export class ModelDeletingEvent> extends ModelEvent { + +} diff --git a/src/orm/model/events/ModelEvent.ts b/src/orm/model/events/ModelEvent.ts new file mode 100644 index 0000000..26512e3 --- /dev/null +++ b/src/orm/model/events/ModelEvent.ts @@ -0,0 +1,31 @@ +import {Model} from '../Model' +import {Event} from '../../../event/Event' +import {JSONState} from '../../../util' + +/** + * Base class for events that concern an instance of a model. + */ +export abstract class ModelEvent> extends Event { + /** + * The instance of the model. + */ + public instance!: T + + constructor( + instance?: T, + ) { + super() + if ( instance ) { + this.instance = instance + } + } + + // TODO implement serialization here + dehydrate(): Promise { + return Promise.resolve({}) + } + + rehydrate(/* state: JSONState */): void | Promise { + return undefined + } +} diff --git a/src/orm/model/events/ModelRetrievedEvent.ts b/src/orm/model/events/ModelRetrievedEvent.ts new file mode 100644 index 0000000..d52edc4 --- /dev/null +++ b/src/orm/model/events/ModelRetrievedEvent.ts @@ -0,0 +1,9 @@ +import {Model} from '../Model' +import {ModelEvent} from './ModelEvent' + +/** + * Event fired right after a model's data is loaded from the source. + */ +export class ModelRetrievedEvent> extends ModelEvent { + +} diff --git a/src/orm/model/events/ModelSavedEvent.ts b/src/orm/model/events/ModelSavedEvent.ts new file mode 100644 index 0000000..2fb54ee --- /dev/null +++ b/src/orm/model/events/ModelSavedEvent.ts @@ -0,0 +1,9 @@ +import {Model} from '../Model' +import {ModelEvent} from './ModelEvent' + +/** + * Event fired right after a model is persisted to the source. + */ +export class ModelSavedEvent> extends ModelEvent { + +} diff --git a/src/orm/model/events/ModelSavingEvent.ts b/src/orm/model/events/ModelSavingEvent.ts new file mode 100644 index 0000000..45883ee --- /dev/null +++ b/src/orm/model/events/ModelSavingEvent.ts @@ -0,0 +1,9 @@ +import {Model} from '../Model' +import {ModelEvent} from './ModelEvent' + +/** + * Event fired right before a model is persisted to the source. + */ +export class ModelSavingEvent> extends ModelEvent { + +} diff --git a/src/orm/model/events/ModelUpdatedEvent.ts b/src/orm/model/events/ModelUpdatedEvent.ts new file mode 100644 index 0000000..88a84d4 --- /dev/null +++ b/src/orm/model/events/ModelUpdatedEvent.ts @@ -0,0 +1,9 @@ +import {Model} from '../Model' +import {ModelEvent} from './ModelEvent' + +/** + * Event fired right after a model's data is updated. + */ +export class ModelUpdatedEvent> extends ModelEvent { + +} diff --git a/src/orm/model/events/ModelUpdatingEvent.ts b/src/orm/model/events/ModelUpdatingEvent.ts new file mode 100644 index 0000000..19f28d1 --- /dev/null +++ b/src/orm/model/events/ModelUpdatingEvent.ts @@ -0,0 +1,9 @@ +import {Model} from '../Model' +import {ModelEvent} from './ModelEvent' + +/** + * Event fired right before a model's data is updated. + */ +export class ModelUpdatingEvent> extends ModelEvent { + +} diff --git a/src/orm/model/events/index.ts b/src/orm/model/events/index.ts new file mode 100644 index 0000000..f334246 --- /dev/null +++ b/src/orm/model/events/index.ts @@ -0,0 +1,21 @@ +import {ModelCreatedEvent} from './ModelCreatedEvent' +import {ModelUpdatingEvent} from './ModelUpdatingEvent' +import {ModelCreatingEvent} from './ModelCreatingEvent' +import {ModelSavedEvent} from './ModelSavedEvent' +import {ModelDeletedEvent} from './ModelDeletedEvent' +import {ModelDeletingEvent} from './ModelDeletingEvent' +import {ModelRetrievedEvent} from './ModelRetrievedEvent' +import {ModelUpdatedEvent} from './ModelUpdatedEvent' +import {ModelEvent} from './ModelEvent' + +export const ModelEvents = { + ModelCreatedEvent, + ModelCreatingEvent, + ModelDeletedEvent, + ModelDeletingEvent, + ModelEvent, + ModelRetrievedEvent, + ModelSavedEvent, + ModelUpdatedEvent, + ModelUpdatingEvent, +} diff --git a/src/service/HTTPServer.ts b/src/service/HTTPServer.ts index 485c962..e72f049 100644 --- a/src/service/HTTPServer.ts +++ b/src/service/HTTPServer.ts @@ -16,6 +16,7 @@ import {ExecuteResolvedRoutePreflightHTTPModule} from '../http/kernel/module/Exe import {ExecuteResolvedRoutePostflightHTTPModule} from '../http/kernel/module/ExecuteResolvedRoutePostflightHTTPModule' import {ParseIncomingBodyHTTPModule} from '../http/kernel/module/ParseIncomingBodyHTTPModule' import {Config} from './Config' +import {InjectRequestEventBusHTTPModule} from '../http/kernel/module/InjectRequestEventBusHTTPModule' /** * Application unit that starts the HTTP/S server, creates Request and Response objects @@ -48,6 +49,7 @@ export class HTTPServer extends Unit { ExecuteResolvedRoutePreflightHTTPModule.register(this.kernel) ExecuteResolvedRoutePostflightHTTPModule.register(this.kernel) ParseIncomingBodyHTTPModule.register(this.kernel) + InjectRequestEventBusHTTPModule.register(this.kernel) await new Promise(res => { this.server = createServer(this.handler)