diff --git a/src/auth/context/SecurityContext.ts b/src/auth/context/SecurityContext.ts index 8c2ab35..407ad35 100644 --- a/src/auth/context/SecurityContext.ts +++ b/src/auth/context/SecurityContext.ts @@ -1,10 +1,10 @@ import {Inject, Injectable} from '../../di' -import {EventBus} from '../../event/EventBus' import {Awaitable, Maybe} from '../../util' import {Authenticatable, AuthenticatableRepository} from '../types' import {Logging} from '../../service/Logging' import {UserAuthenticatedEvent} from '../event/UserAuthenticatedEvent' import {UserFlushedEvent} from '../event/UserFlushedEvent' +import {Bus} from '../../support/bus' /** * Base-class for a context that authenticates users and manages security. @@ -12,7 +12,7 @@ import {UserFlushedEvent} from '../event/UserFlushedEvent' @Injectable() export abstract class SecurityContext { @Inject() - protected readonly bus!: EventBus + protected readonly bus!: Bus @Inject() protected readonly logging!: Logging @@ -40,7 +40,7 @@ export abstract class SecurityContext { */ async authenticateOnce(user: Authenticatable): Promise { this.authenticatedUser = user - await this.bus.dispatch(new UserAuthenticatedEvent(user, this)) + await this.bus.push(new UserAuthenticatedEvent(user, this)) } /** @@ -50,7 +50,7 @@ export abstract class SecurityContext { async authenticate(user: Authenticatable): Promise { this.authenticatedUser = user await this.persist() - await this.bus.dispatch(new UserAuthenticatedEvent(user, this)) + await this.bus.push(new UserAuthenticatedEvent(user, this)) } /** @@ -60,7 +60,7 @@ export abstract class SecurityContext { const user = this.authenticatedUser if ( user ) { this.authenticatedUser = undefined - await this.bus.dispatch(new UserFlushedEvent(user, this)) + await this.bus.push(new UserFlushedEvent(user, this)) } } @@ -72,7 +72,7 @@ export abstract class SecurityContext { if ( user ) { this.authenticatedUser = undefined await this.persist() - await this.bus.dispatch(new UserFlushedEvent(user, this)) + await this.bus.push(new UserFlushedEvent(user, this)) } } diff --git a/src/auth/context/SessionSecurityContext.ts b/src/auth/context/SessionSecurityContext.ts index 4a17d09..137cfcc 100644 --- a/src/auth/context/SessionSecurityContext.ts +++ b/src/auth/context/SessionSecurityContext.ts @@ -32,7 +32,7 @@ export class SessionSecurityContext extends SecurityContext { const user = await this.repository.getByIdentifier(identifier) if ( user ) { this.authenticatedUser = user - await this.bus.dispatch(new UserAuthenticationResumedEvent(user, this)) + await this.bus.push(new UserAuthenticationResumedEvent(user, this)) } } } diff --git a/src/auth/event/AuthenticationEvent.ts b/src/auth/event/AuthenticationEvent.ts index 960a84d..df717a9 100644 --- a/src/auth/event/AuthenticationEvent.ts +++ b/src/auth/event/AuthenticationEvent.ts @@ -1,27 +1,12 @@ -import {Event} from '../../event/Event' import {SecurityContext} from '../context/SecurityContext' -import {Awaitable, JSONState} from '../../util' import {Authenticatable} from '../types' +import {BaseEvent} from '../../support/bus' -/** - * Event fired when a user is authenticated. - */ -export class AuthenticationEvent extends Event { +export abstract class AuthenticationEvent extends BaseEvent { constructor( public readonly user: Authenticatable, public readonly context: SecurityContext, ) { super() } - - async dehydrate(): Promise { - return { - user: await this.user.dehydrate(), - contextName: this.context.name, - } - } - - rehydrate(state: JSONState): Awaitable { // eslint-disable-line @typescript-eslint/no-unused-vars - // TODO fill this in - } } diff --git a/src/auth/event/UserAuthenticatedEvent.ts b/src/auth/event/UserAuthenticatedEvent.ts index f999f91..9c5d3bf 100644 --- a/src/auth/event/UserAuthenticatedEvent.ts +++ b/src/auth/event/UserAuthenticatedEvent.ts @@ -3,4 +3,6 @@ import {AuthenticationEvent} from './AuthenticationEvent' /** * Event fired when a user is authenticated. */ -export class UserAuthenticatedEvent extends AuthenticationEvent {} +export class UserAuthenticatedEvent extends AuthenticationEvent { + public readonly eventName = '@extollo/lib:UserAuthenticatedEvent' +} diff --git a/src/auth/event/UserAuthenticationResumedEvent.ts b/src/auth/event/UserAuthenticationResumedEvent.ts index 180cc41..4717a4f 100644 --- a/src/auth/event/UserAuthenticationResumedEvent.ts +++ b/src/auth/event/UserAuthenticationResumedEvent.ts @@ -3,4 +3,6 @@ import {AuthenticationEvent} from './AuthenticationEvent' /** * Event raised when a user is re-authenticated to a security context */ -export class UserAuthenticationResumedEvent extends AuthenticationEvent {} +export class UserAuthenticationResumedEvent extends AuthenticationEvent { + public readonly eventName = '@extollo/lib:UserAuthenticationResumedEvent' +} diff --git a/src/auth/event/UserFlushedEvent.ts b/src/auth/event/UserFlushedEvent.ts index 601d3dc..0fec173 100644 --- a/src/auth/event/UserFlushedEvent.ts +++ b/src/auth/event/UserFlushedEvent.ts @@ -3,4 +3,6 @@ import {AuthenticationEvent} from './AuthenticationEvent' /** * Event fired when a user is unauthenticated. */ -export class UserFlushedEvent extends AuthenticationEvent {} +export class UserFlushedEvent extends AuthenticationEvent { + public readonly eventName = '@extollo/lib:UserFlushedEvent' +} diff --git a/src/auth/index.ts b/src/auth/index.ts index 2474f0e..d5c2919 100644 --- a/src/auth/index.ts +++ b/src/auth/index.ts @@ -11,6 +11,8 @@ export * from './event/UserAuthenticatedEvent' export * from './event/UserAuthenticationResumedEvent' export * from './event/UserFlushedEvent' +export * from './serial/AuthenticationEventSerializer' + export * from './repository/orm/ORMUser' export * from './repository/orm/ORMUserRepository' diff --git a/src/auth/serial/AuthenticationEventSerializer.ts b/src/auth/serial/AuthenticationEventSerializer.ts new file mode 100644 index 0000000..9fd653a --- /dev/null +++ b/src/auth/serial/AuthenticationEventSerializer.ts @@ -0,0 +1,54 @@ +import {BaseSerializer, ObjectSerializer, SerialPayload} from '../../support/bus' +import {AuthenticationEvent} from '../event/AuthenticationEvent' +import {ErrorWithContext, JSONState} from '../../util' +import {Authenticatable} from '../types' +import {StaticInstantiable} from '../../di' +import {SecurityContext} from '../context/SecurityContext' +import {UserAuthenticatedEvent} from '../event/UserAuthenticatedEvent' +import {UserAuthenticationResumedEvent} from '../event/UserAuthenticationResumedEvent' +import {UserFlushedEvent} from '../event/UserFlushedEvent' + +export interface AuthenticationEventSerialPayload extends JSONState { + user: SerialPayload + eventName: string +} + +@ObjectSerializer() +export class AuthenticationEventSerializer extends BaseSerializer { + protected async decodeSerial(serial: AuthenticationEventSerialPayload): Promise { + const user = await this.getSerialization().decode(serial.user) + const context = await this.getRequest().make(SecurityContext) + + const EventClass = this.getEventClass(serial.eventName) + return new EventClass(user, context) + } + + protected async encodeActual(actual: AuthenticationEvent): Promise { + return { + eventName: actual.eventName, + user: await this.getSerialization().encode(actual.user), + } + } + + protected getName(): string { + return '@extollo/lib:AuthenticationEventSerializer' + } + + matchActual(some: AuthenticationEvent): boolean { + return some instanceof AuthenticationEvent + } + + protected getEventClass(name: string): StaticInstantiable { + if ( name === '@extollo/lib:UserAuthenticatedEvent' ) { + return UserAuthenticatedEvent + } else if ( name === '@extollo/lib:UserAuthenticationResumedEvent' ) { + return UserAuthenticationResumedEvent + } else if ( name === '@extollo/lib:UserFlushedEvent' ) { + return UserFlushedEvent + } + + throw new ErrorWithContext('Unable to map event name to AuthenticationEvent implementation', { + eventName: name, + }) + } +} diff --git a/src/cli/decorators.ts b/src/cli/decorators.ts index 9cf706c..9a03b87 100644 --- a/src/cli/decorators.ts +++ b/src/cli/decorators.ts @@ -13,7 +13,7 @@ export const CLIDirective = (): ClassDecorator => { if ( isInstantiableOf(target, Directive) ) { logIfDebugging('extollo.cli.decorators', 'Registering CLIDirective blueprint:', target) ContainerBlueprint.getContainerBlueprint() - .onResolve(CommandLine, cli => { + .onResolve(CommandLine, (cli: CommandLine) => { cli.registerDirective(target as Instantiable) }) } else { diff --git a/src/cli/directive/queue/WorkDirective.ts b/src/cli/directive/queue/WorkDirective.ts new file mode 100644 index 0000000..2fb6343 --- /dev/null +++ b/src/cli/directive/queue/WorkDirective.ts @@ -0,0 +1,39 @@ +import {Directive, OptionDefinition} from '../../Directive' +import {Inject, Injectable} from '../../../di' +import {Queue} from '../../../support/bus' + +@Injectable() +export class WorkDirective extends Directive { + @Inject() + protected readonly queue!: Queue + + getDescription(): string { + return 'pop a single item from the queue and execute it' + } + + getKeywords(): string | string[] { + return 'queue-work' + } + + getOptions(): OptionDefinition[] { + return [] + } + + async handle(): Promise { + try { + const queueable = await this.queue.pop() + if ( !queueable ) { + this.info('There are no items in the queue.') + return + } + + this.info(`Fetched 1 item from the queue`) + await queueable.execute() + this.success('Executed 1 item from the queue') + } catch (e: unknown) { + this.error('Failed to execute queueable:') + this.error(e) + process.exitCode = 1 + } + } +} diff --git a/src/cli/service/CommandLineApplication.ts b/src/cli/service/CommandLineApplication.ts index 3fa9761..d249c50 100644 --- a/src/cli/service/CommandLineApplication.ts +++ b/src/cli/service/CommandLineApplication.ts @@ -9,6 +9,7 @@ import {TemplateDirective} from '../directive/TemplateDirective' import {RunDirective} from '../directive/RunDirective' import {RoutesDirective} from '../directive/RoutesDirective' import {RouteDirective} from '../directive/RouteDirective' +import {WorkDirective} from '../directive/queue/WorkDirective' /** * Unit that takes the place of the final unit in the application that handles @@ -46,6 +47,7 @@ export class CommandLineApplication extends Unit { this.cli.registerDirective(RunDirective) this.cli.registerDirective(RoutesDirective) this.cli.registerDirective(RouteDirective) + this.cli.registerDirective(WorkDirective) const argv = process.argv.slice(2) const match = this.cli.getDirectives() diff --git a/src/di/Container.ts b/src/di/Container.ts index c4f6a6a..3c0dd14 100644 --- a/src/di/Container.ts +++ b/src/di/Container.ts @@ -8,7 +8,7 @@ import { TypedDependencyKey, } from './types' import {AbstractFactory} from './factory/AbstractFactory' -import {collect, Collection, globalRegistry, logIfDebugging} from '../util' +import {collect, Collection, ErrorWithContext, globalRegistry, logIfDebugging} from '../util' import {Factory} from './factory/Factory' import {DuplicateFactoryKeyError} from './error/DuplicateFactoryKeyError' import {ClosureFactory} from './factory/ClosureFactory' @@ -25,6 +25,27 @@ export type ResolvedDependency = { paramIndex: number, key: DependencyKey, resol * A container of resolve-able dependencies that are created via inversion-of-control. */ export class Container { + /** + * Set to true when we're realizing a container. + * Used to prevent infinite recursion when `getContainer()` is accidentally called + * from somewhere within the `realizeContainer()` call. + */ + protected static realizingContainer = false + + /** + * List of dependency keys currently being `make`'d as a reverse stack. + * This is used to detect dependency cycles. + * @protected + */ + protected static makeStack?: Collection + + /** + * The 100 most recent dependency keys that were `make`'d. Used to help with + * debugging cyclic dependency errors. + * @protected + */ + protected static makeHistory?: Collection + /** * Given a Container instance, apply the ContainerBlueprint to it. * @param container @@ -34,6 +55,10 @@ export class Container { .resolve() .map(factory => container.registerFactory(factory)) + ContainerBlueprint.getContainerBlueprint() + .resolveConstructable() + .map((factory: StaticClass, any>) => console.log(factory)) // eslint-disable-line no-console + ContainerBlueprint.getContainerBlueprint() .resolveConstructable() .map((factory: StaticClass, any>) => container.registerFactory(container.make(factory))) @@ -54,8 +79,14 @@ export class Container { public static getContainer(): Container { const existing = globalRegistry.getGlobal('extollo/injector') if ( !existing ) { + if ( this.realizingContainer ) { + throw new ErrorWithContext('Attempted getContainer call during container realization.') + } + + this.realizingContainer = true const container = Container.realizeContainer(new Container()) globalRegistry.setGlobal('extollo/injector', container) + this.realizingContainer = false return container } @@ -403,13 +434,92 @@ export class Container { * @param {...any} parameters */ make(target: DependencyKey, ...parameters: any[]): T { + if ( !Container.makeStack ) { + Container.makeStack = new Collection() + } + + if ( !Container.makeHistory ) { + Container.makeHistory = new Collection() + } + + Container.makeStack.push(target) + + if ( Container.makeHistory.length > 100 ) { + Container.makeHistory = Container.makeHistory.slice(1, 100) + } + Container.makeHistory.push(target) + + this.checkForMakeCycles() + if ( this.hasKey(target) ) { - return this.resolveAndCreate(target, ...parameters) + const realized = this.resolveAndCreate(target, ...parameters) + Container.makeStack.pop() + return realized } else if ( typeof target !== 'string' && isInstantiable(target) ) { - return this.produceFactory(new Factory(target), parameters) - } else { - throw new TypeError(`Invalid or unknown make target: ${target}`) + const realized = this.produceFactory(new Factory(target), parameters) + Container.makeStack.pop() + return realized + } + + Container.makeStack.pop() + throw new TypeError(`Invalid or unknown make target: ${target}`) + } + + /** + * Check the `makeStack` for duplicates and throw an error if a dependency cycle is + * detected. This is used to prevent infinite mutual recursion when cyclic dependencies + * occur. + * @protected + */ + protected checkForMakeCycles(): void { + if ( !Container.makeStack ) { + Container.makeStack = new Collection() + } + + if ( !Container.makeHistory ) { + Container.makeHistory = new Collection() } + + if ( Container.makeStack.unique().length !== Container.makeStack.length ) { + const displayKey = (key: DependencyKey) => { + if ( typeof key === 'string' ) { + return 'key: `' + key + '`' + } else { + return `key: ${key.name}` + } + } + + const makeStack = Container.makeStack + .reverse() + .map(displayKey) + + const makeHistory = Container.makeHistory + .reverse() + .map(displayKey) + + console.error('Make Stack:') // eslint-disable-line no-console + console.error(makeStack.join('\n')) // eslint-disable-line no-console + console.error('Make History:') // eslint-disable-line no-console + console.error(makeHistory.join('\n')) // eslint-disable-line no-console + throw new ErrorWithContext('Cyclic dependency chain detected', { + makeStack, + makeHistory, + }) + } + } + + /** + * Create a new instance of the dependency key using this container, ignoring any pre-existing instances + * in this container. + * @param key + * @param parameters + */ + makeNew(key: TypedDependencyKey, ...parameters: any[]): T { + if ( isInstantiable(key) ) { + return this.produceFactory(new Factory(key), parameters) + } + + throw new TypeError(`Invalid or unknown make target: ${key}`) } /** diff --git a/src/event/Event.ts b/src/event/Event.ts deleted file mode 100644 index 71e492a..0000000 --- a/src/event/Event.ts +++ /dev/null @@ -1,13 +0,0 @@ -import {Dispatchable} from './types' -import {Awaitable, JSONState} from '../util' - -/** - * Abstract class representing an event that may be fired. - */ -export abstract class Event implements Dispatchable { - - - abstract dehydrate(): Awaitable - - abstract rehydrate(state: JSONState): Awaitable -} diff --git a/src/event/EventBus.ts b/src/event/EventBus.ts deleted file mode 100644 index 76d6e49..0000000 --- a/src/event/EventBus.ts +++ /dev/null @@ -1,53 +0,0 @@ -import {Instantiable, Singleton, StaticClass} from '../di' -import {Bus, Dispatchable, EventSubscriber, EventSubscriberEntry, EventSubscription} from './types' -import {Awaitable, Collection, uuid4} from '../util' - -/** - * A non-queued bus implementation that executes subscribers immediately in the main thread. - */ -@Singleton() -export class EventBus implements Bus { - /** - * Collection of subscribers, by their events. - * @protected - */ - protected subscribers: Collection> = new Collection>() - - subscribe(event: StaticClass>, subscriber: EventSubscriber): Awaitable { - const entry: EventSubscriberEntry = { - id: uuid4(), - event, - subscriber, - } - - this.subscribers.push(entry) - return this.buildSubscription(entry.id) - } - - unsubscribe(subscriber: EventSubscriber): Awaitable { - this.subscribers = this.subscribers.where('subscriber', '!=', subscriber) - } - - async dispatch(event: Dispatchable): Promise { - const eventClass: StaticClass = event.constructor as StaticClass - await this.subscribers.where('event', '=', eventClass) - .promiseMap(entry => entry.subscriber(event)) - } - - /** - * Build an EventSubscription object for the subscriber of the given ID. - * @param id - * @protected - */ - protected buildSubscription(id: string): EventSubscription { - let subscribed = true - return { - unsubscribe: (): Awaitable => { - if ( subscribed ) { - this.subscribers = this.subscribers.where('id', '!=', id) - subscribed = false - } - }, - } - } -} diff --git a/src/event/PropagatingEventBus.ts b/src/event/PropagatingEventBus.ts deleted file mode 100644 index dc703c3..0000000 --- a/src/event/PropagatingEventBus.ts +++ /dev/null @@ -1,28 +0,0 @@ -import {EventBus} from './EventBus' -import {Collection} from '../util' -import {Bus, Dispatchable} from './types' - -/** - * A non-queued bus implementation that executes subscribers immediately in the main thread. - * This bus also supports "propagating" events along to any other connected buses. - * Such behavior is useful, e.g., if we want to have a semi-isolated request- - * level bus whose events still reach the global EventBus instance. - */ -export class PropagatingEventBus extends EventBus { - protected recipients: Collection = new Collection() - - async dispatch(event: Dispatchable): Promise { - await super.dispatch(event) - await this.recipients.promiseMap(bus => bus.dispatch(event)) - } - - /** - * Register the given bus to receive events fired on this bus. - * @param recipient - */ - connect(recipient: Bus): void { - if ( !this.recipients.includes(recipient) ) { - this.recipients.push(recipient) - } - } -} diff --git a/src/event/types.ts b/src/event/types.ts deleted file mode 100644 index d62d7b0..0000000 --- a/src/event/types.ts +++ /dev/null @@ -1,47 +0,0 @@ -import {Awaitable, Rehydratable} from '../util' -import {Instantiable, StaticClass} from '../di' - -/** - * A closure that should be executed with the given event is fired. - */ -export type EventSubscriber = (event: T) => Awaitable - -/** - * An object used to track event subscriptions internally. - */ -export interface EventSubscriberEntry { - /** Globally unique ID of this subscription. */ - id: string - - /** The event class subscribed to. */ - event: StaticClass> - - /** The closure to execute when the event is fired. */ - subscriber: EventSubscriber -} - -/** - * An object returned upon subscription, used to unsubscribe. - */ -export interface EventSubscription { - /** - * Unsubscribe the associated listener from the event bus. - */ - unsubscribe(): Awaitable -} - -/** - * An instance of something that can be fired on an event bus. - */ -export interface Dispatchable extends Rehydratable { - shouldQueue?: boolean -} - -/** - * An event-driven bus that manages subscribers and dispatched items. - */ -export interface Bus { - subscribe(eventClass: StaticClass>, subscriber: EventSubscriber): Awaitable - unsubscribe(subscriber: EventSubscriber): Awaitable - dispatch(event: Dispatchable): Awaitable -} diff --git a/src/http/kernel/module/ClearRequestEventBusHTTPModule.ts b/src/http/kernel/module/ClearRequestEventBusHTTPModule.ts new file mode 100644 index 0000000..ff62a41 --- /dev/null +++ b/src/http/kernel/module/ClearRequestEventBusHTTPModule.ts @@ -0,0 +1,28 @@ +import {HTTPKernelModule} from '../HTTPKernelModule' +import {Inject, Injectable} from '../../../di' +import {HTTPKernel} from '../HTTPKernel' +import {Request} from '../../lifecycle/Request' +import {Bus} from '../../../support/bus' + +/** + * HTTP kernel module that creates a request-specific event bus + * and injects it into the request container. + */ +@Injectable() +export class ClearRequestEventBusHTTPModule extends HTTPKernelModule { + @Inject() + protected bus!: Bus + + public static register(kernel: HTTPKernel): void { + kernel.register(this).first() + } + + public async apply(request: Request): Promise { + const requestBus = request.make(Bus) + await requestBus.down() + + // FIXME disconnect request bus from global event bus + + return request + } +} diff --git a/src/http/kernel/module/InjectRequestEventBusHTTPModule.ts b/src/http/kernel/module/InjectRequestEventBusHTTPModule.ts index 4479912..0e538f3 100644 --- a/src/http/kernel/module/InjectRequestEventBusHTTPModule.ts +++ b/src/http/kernel/module/InjectRequestEventBusHTTPModule.ts @@ -2,8 +2,7 @@ import {HTTPKernelModule} from '../HTTPKernelModule' import {Inject, Injectable} from '../../../di' import {HTTPKernel} from '../HTTPKernel' import {Request} from '../../lifecycle/Request' -import {EventBus} from '../../../event/EventBus' -import {PropagatingEventBus} from '../../../event/PropagatingEventBus' +import {Bus} from '../../../support/bus' /** * HTTP kernel module that creates a request-specific event bus @@ -12,17 +11,18 @@ import {PropagatingEventBus} from '../../../event/PropagatingEventBus' @Injectable() export class InjectRequestEventBusHTTPModule extends HTTPKernelModule { @Inject() - protected bus!: EventBus + protected bus!: Bus public static register(kernel: HTTPKernel): void { kernel.register(this).first() } public async apply(request: Request): Promise { - const bus = this.make(PropagatingEventBus) - bus.connect(this.bus) + const requestBus = this.container().makeNew(Bus) + await requestBus.up() + await requestBus.connect(this.bus) - request.purge(EventBus).registerProducer(EventBus, () => bus) + request.purge(Bus).registerProducer(Bus, () => requestBus) return request } } diff --git a/src/index.ts b/src/index.ts index 30e2461..9172a45 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,13 +2,10 @@ export * from './util' export * from './lib' export * from './di' -export * from './event/types' -export * from './event/Event' -export * from './event/EventBus' -export * from './event/PropagatingEventBus' - export * from './service/Logging' +export * from './support/bus/index' + export * from './lifecycle/RunLevelErrorHandler' export * from './lifecycle/Application' export * from './lifecycle/AppClass' @@ -30,6 +27,7 @@ export * from './http/kernel/module/AbstractResolvedRouteHandlerHTTPModule' export * from './http/kernel/module/ExecuteResolvedRoutePreflightHTTPModule' export * from './http/kernel/module/ExecuteResolvedRouteHandlerHTTPModule' export * from './http/kernel/module/ExecuteResolvedRoutePostflightHTTPModule' +export * from './http/kernel/module/ClearRequestEventBusHTTPModule' export * from './http/kernel/HTTPKernel' export * from './http/kernel/HTTPKernelModule' @@ -89,7 +87,6 @@ export * from './support/cache/MemoryCache' export * from './support/cache/RedisCache' export * from './support/cache/CacheFactory' export * from './support/NodeModules' -export * from './support/queue/Queue' export * from './service/Queueables' diff --git a/src/orm/connection/Connection.ts b/src/orm/connection/Connection.ts index e336ef5..209932d 100644 --- a/src/orm/connection/Connection.ts +++ b/src/orm/connection/Connection.ts @@ -3,9 +3,9 @@ import {QueryResult} from '../types' import {SQLDialect} from '../dialect/SQLDialect' import {AppClass} from '../../lifecycle/AppClass' import {Inject, Injectable} from '../../di' -import {EventBus} from '../../event/EventBus' import {QueryExecutedEvent} from './event/QueryExecutedEvent' import {Schema} from '../schema/Schema' +import {Bus} from '../../support/bus' /** * Error thrown when a connection is used before it is ready. @@ -25,7 +25,7 @@ export class ConnectionNotReadyError extends ErrorWithContext { @Injectable() export abstract class Connection extends AppClass { @Inject() - protected bus!: EventBus + protected readonly bus!: Bus constructor( /** @@ -82,6 +82,6 @@ export abstract class Connection extends AppClass { */ protected async queryExecuted(query: string): Promise { const event = new QueryExecutedEvent(this.name, this, query) - await this.bus.dispatch(event) + await this.bus.push(event) } } diff --git a/src/orm/connection/event/QueryExecutedEvent.ts b/src/orm/connection/event/QueryExecutedEvent.ts index 9f1f8c9..59f2086 100644 --- a/src/orm/connection/event/QueryExecutedEvent.ts +++ b/src/orm/connection/event/QueryExecutedEvent.ts @@ -1,67 +1,17 @@ -import {Event} from '../../../event/Event' -import {Inject, Injectable} from '../../../di' -import {InvalidJSONStateError, JSONState} from '../../../util' import {Connection} from '../Connection' -import {DatabaseService} from '../../DatabaseService' +import {BaseEvent} from '../../../support/bus' /** * Event fired when a query is executed. */ -@Injectable() -export class QueryExecutedEvent extends Event { - @Inject() - protected database!: DatabaseService - - /** - * The name of the connection where the query was executed. - * @protected - */ - public connectionName!: string - - /** - * The connection where the query was executed. - */ - public connection!: Connection - - /** - * The query that was executed. - */ - public query!: string - +export class QueryExecutedEvent extends BaseEvent { constructor( - connectionName?: string, - connection?: Connection, - query?: string, + public readonly connectionName: string, + public readonly connection: Connection, + public readonly query: string, ) { super() - if ( connectionName ) { - this.connectionName = connectionName - } - - if ( connection ) { - this.connection = connection - } - - if ( query ) { - this.query = query - } - } - - async dehydrate(): Promise { - return { - connectionName: this.connectionName, - query: this.query, - } - } - - rehydrate(state: JSONState): void { - if ( !state.connectionName || !state.query ) { - throw new InvalidJSONStateError('Missing connectionName or query from QueryExecutedEvent state.') - } - - this.query = String(state.query) - this.connectionName = String(state.connectionName) - this.connection = this.database.get(this.connectionName) } + eventName = '@extollo/lib.QueryExecutedEvent' } diff --git a/src/orm/connection/event/QueryExecutedEventSerializer.ts b/src/orm/connection/event/QueryExecutedEventSerializer.ts new file mode 100644 index 0000000..002ee0c --- /dev/null +++ b/src/orm/connection/event/QueryExecutedEventSerializer.ts @@ -0,0 +1,38 @@ +import {BaseSerializer} from '../../../support/bus' +import {QueryExecutedEvent} from './QueryExecutedEvent' +import {Awaitable, JSONState} from '../../../util' +import {Container, Inject, Injectable} from '../../../di' +import {DatabaseService} from '../../DatabaseService' +import {ObjectSerializer} from '../../../support/bus/serial/decorators' + +export interface QueryExecutedEventSerialPayload extends JSONState { + connectionName: string + query: string +} + +@ObjectSerializer() +@Injectable() +export class QueryExecutedEventSerializer extends BaseSerializer { + @Inject() + protected readonly injector!: Container + + protected decodeSerial(serial: QueryExecutedEventSerialPayload): Awaitable { + const connection = this.injector.make(DatabaseService).get(serial.connectionName) + return new QueryExecutedEvent(serial.connectionName, connection, serial.query) + } + + protected encodeActual(actual: QueryExecutedEvent): Awaitable { + return { + connectionName: actual.connectionName, + query: actual.query, + } + } + + protected getName(): string { + return '@extollo/lib.QueryExecutedEventSerializer' + } + + matchActual(some: QueryExecutedEvent): boolean { + return some instanceof QueryExecutedEvent + } +} diff --git a/src/orm/directive/MigrateDirective.ts b/src/orm/directive/MigrateDirective.ts index 125e234..0a9dfd5 100644 --- a/src/orm/directive/MigrateDirective.ts +++ b/src/orm/directive/MigrateDirective.ts @@ -1,13 +1,12 @@ -import {Directive, OptionDefinition} from '../../cli' +import {Directive, OptionDefinition, CLIDirective} from '../../cli' import {Container, Inject, Injectable} from '../../di' -import {EventBus} from '../../event/EventBus' +import {Collection} from '../../util' +import {Bus, EventHandlerSubscription} from '../../support/bus' import {Migrations} from '../services/Migrations' import {Migrator} from '../migrations/Migrator' import {ApplyingMigrationEvent} from '../migrations/events/ApplyingMigrationEvent' import {AppliedMigrationEvent} from '../migrations/events/AppliedMigrationEvent' -import {EventSubscription} from '../../event/types' import {NothingToMigrateError} from '../migrations/NothingToMigrateError' -import {CLIDirective} from '../../cli/decorators' /** * CLI directive that applies migrations using the default Migrator. @@ -17,13 +16,13 @@ import {CLIDirective} from '../../cli/decorators' @CLIDirective() export class MigrateDirective extends Directive { @Inject() - protected readonly bus!: EventBus + protected readonly bus!: Bus @Inject('injector') protected readonly injector!: Container /** Event bus subscriptions. */ - protected subscriptions: EventSubscription[] = [] + protected subscriptions: Collection = new Collection() getKeywords(): string | string[] { return ['migrate'] @@ -113,7 +112,6 @@ export class MigrateDirective extends Directive { /** Remove event bus listeners before finish. */ protected async removeListeners(): Promise { - await Promise.all(this.subscriptions.map(x => x.unsubscribe())) - this.subscriptions = [] + await this.subscriptions.awaitMapCall('unsubscribe') } } diff --git a/src/orm/directive/RollbackDirective.ts b/src/orm/directive/RollbackDirective.ts index 9370d7a..a942556 100644 --- a/src/orm/directive/RollbackDirective.ts +++ b/src/orm/directive/RollbackDirective.ts @@ -1,13 +1,12 @@ -import {Directive, OptionDefinition} from '../../cli' +import {Directive, OptionDefinition, CLIDirective} from '../../cli' import {Container, Inject, Injectable} from '../../di' -import {EventBus} from '../../event/EventBus' +import {Collection} from '../../util' +import {Bus, EventHandlerSubscription} from '../../support/bus' import {Migrator} from '../migrations/Migrator' import {Migrations} from '../services/Migrations' import {RollingBackMigrationEvent} from '../migrations/events/RollingBackMigrationEvent' import {RolledBackMigrationEvent} from '../migrations/events/RolledBackMigrationEvent' -import {EventSubscription} from '../../event/types' import {NothingToMigrateError} from '../migrations/NothingToMigrateError' -import {CLIDirective} from '../../cli/decorators' /** * CLI directive that undoes applied migrations using the default Migrator. @@ -17,7 +16,7 @@ import {CLIDirective} from '../../cli/decorators' @CLIDirective() export class RollbackDirective extends Directive { @Inject() - protected readonly bus!: EventBus + protected readonly bus!: Bus @Inject('injector') protected readonly injector!: Container @@ -26,7 +25,7 @@ export class RollbackDirective extends Directive { protected readonly migrations!: Migrations /** Event bus subscriptions. */ - protected subscriptions: EventSubscription[] = [] + protected subscriptions: Collection = new Collection() getKeywords(): string | string[] { return ['rollback'] @@ -98,7 +97,6 @@ export class RollbackDirective extends Directive { /** Remove event bus listeners before finish. */ protected async removeListeners(): Promise { - await Promise.all(this.subscriptions.map(x => x.unsubscribe())) - this.subscriptions = [] + await this.subscriptions.awaitMapCall('unsubscribe') } } diff --git a/src/orm/migrations/Migrator.ts b/src/orm/migrations/Migrator.ts index 754c043..cf643d0 100644 --- a/src/orm/migrations/Migrator.ts +++ b/src/orm/migrations/Migrator.ts @@ -2,12 +2,12 @@ import {Container, Inject, Injectable} from '../../di' import {Awaitable, collect, ErrorWithContext} from '../../util' import {Migration} from './Migration' import {Migrations} from '../services/Migrations' -import {EventBus} from '../../event/EventBus' import {ApplyingMigrationEvent} from './events/ApplyingMigrationEvent' import {AppliedMigrationEvent} from './events/AppliedMigrationEvent' import {RollingBackMigrationEvent} from './events/RollingBackMigrationEvent' import {RolledBackMigrationEvent} from './events/RolledBackMigrationEvent' import {NothingToMigrateError} from './NothingToMigrateError' +import {Bus} from '../../support/bus' /** * Manages single-run patches/migrations. @@ -18,7 +18,7 @@ export abstract class Migrator { protected readonly migrations!: Migrations @Inject() - protected readonly bus!: EventBus + protected readonly bus!: Bus @Inject('injector') protected readonly injector!: Container @@ -193,31 +193,11 @@ export abstract class Migrator { * @protected */ protected async filterAppliedMigrations(identifiers: string[]): Promise { - return collect(identifiers) - .partialMap(identifier => { - const migration = this.migrations.get(identifier) - if ( migration ) { - return { - identifier, - migration, - } - } - }) - .asyncPipe() - .tap(coll => { - return coll.promiseMap(async group => { - return { - ...group, - has: await this.has(group.migration), - } - }) - }) - .tap(coll => { - return coll.filter(group => !group.has) - .pluck('identifier') - .all() - }) - .resolve() + const ids = await collect(identifiers) + .toAsync() + .filterOut(async id => this.has(this.migrations.getOrFail(id))) + + return ids.all() } /** @@ -226,31 +206,11 @@ export abstract class Migrator { * @protected */ protected async filterPendingMigrations(identifiers: string[]): Promise { - return collect(identifiers) - .partialMap(identifier => { - const migration = this.migrations.get(identifier) - if ( migration ) { - return { - identifier, - migration, - } - } - }) - .asyncPipe() - .tap(coll => { - return coll.promiseMap(async group => { - return { - ...group, - has: await this.has(group.migration), - } - }) - }) - .tap(coll => { - return coll.filter(group => group.has) - .pluck('identifier') - .all() - }) - .resolve() + const ids = await collect(identifiers) + .toAsync() + .filter(async id => this.has(this.migrations.getOrFail(id))) + + return ids.all() } /** @@ -260,7 +220,7 @@ export abstract class Migrator { */ protected async applying(migration: Migration): Promise { const event = this.injector.make(ApplyingMigrationEvent, migration) - await this.bus.dispatch(event) + await this.bus.push(event) } /** @@ -270,7 +230,7 @@ export abstract class Migrator { */ protected async applied(migration: Migration): Promise { const event = this.injector.make(AppliedMigrationEvent, migration) - await this.bus.dispatch(event) + await this.bus.push(event) } /** @@ -280,7 +240,7 @@ export abstract class Migrator { */ protected async rollingBack(migration: Migration): Promise { const event = this.injector.make(RollingBackMigrationEvent, migration) - await this.bus.dispatch(event) + await this.bus.push(event) } /** @@ -290,6 +250,6 @@ export abstract class Migrator { */ protected async rolledBack(migration: Migration): Promise { const event = this.injector.make(RolledBackMigrationEvent, migration) - await this.bus.dispatch(event) + await this.bus.push(event) } } diff --git a/src/orm/migrations/events/AppliedMigrationEvent.ts b/src/orm/migrations/events/AppliedMigrationEvent.ts index 145c66c..6fdd0d2 100644 --- a/src/orm/migrations/events/AppliedMigrationEvent.ts +++ b/src/orm/migrations/events/AppliedMigrationEvent.ts @@ -5,4 +5,6 @@ import {MigrationEvent} from './MigrationEvent' * Event fired after a migration is applied. */ @Injectable() -export class AppliedMigrationEvent extends MigrationEvent {} +export class AppliedMigrationEvent extends MigrationEvent { + eventName = '@extollo/lib.AppliedMigrationEvent' +} diff --git a/src/orm/migrations/events/ApplyingMigrationEvent.ts b/src/orm/migrations/events/ApplyingMigrationEvent.ts index 6149423..546ec1b 100644 --- a/src/orm/migrations/events/ApplyingMigrationEvent.ts +++ b/src/orm/migrations/events/ApplyingMigrationEvent.ts @@ -5,4 +5,6 @@ import {MigrationEvent} from './MigrationEvent' * Event fired before a migration is applied. */ @Injectable() -export class ApplyingMigrationEvent extends MigrationEvent {} +export class ApplyingMigrationEvent extends MigrationEvent { + eventName = '@extollo/lib.ApplyingMigrationEvent' +} diff --git a/src/orm/migrations/events/MigrationEvent.ts b/src/orm/migrations/events/MigrationEvent.ts index aa78cfd..e5ae29c 100644 --- a/src/orm/migrations/events/MigrationEvent.ts +++ b/src/orm/migrations/events/MigrationEvent.ts @@ -1,49 +1,13 @@ -import {Event} from '../../../event/Event' import {Migration} from '../Migration' -import {Inject, Injectable} from '../../../di' -import {Migrations} from '../../services/Migrations' -import {ErrorWithContext} from '../../../util' +import {BaseEvent} from '../../../support/bus' /** * Generic base-class for migration-related events. */ -@Injectable() -export abstract class MigrationEvent extends Event { - @Inject() - protected readonly migrations!: Migrations - - /** The migration relevant to this event. */ - private internalMigration: Migration - - /** - * Get the relevant migration. - */ - public get migration(): Migration { - return this.internalMigration - } - +export abstract class MigrationEvent extends BaseEvent { constructor( - migration: Migration, + public readonly migration: Migration, ) { super() - this.internalMigration = migration - } - - dehydrate(): {identifier: string} { - return { - identifier: this.migration.identifier, - } - } - - rehydrate(state: {identifier: string}): void { - const migration = this.migrations.get(state.identifier) - - if ( !migration ) { - throw new ErrorWithContext(`Unable to find migration with identifier: ${state.identifier}`, { - identifier: state.identifier, - }) - } - - this.internalMigration = migration } } diff --git a/src/orm/migrations/events/MigrationEventSerializer.ts b/src/orm/migrations/events/MigrationEventSerializer.ts new file mode 100644 index 0000000..eb6a845 --- /dev/null +++ b/src/orm/migrations/events/MigrationEventSerializer.ts @@ -0,0 +1,59 @@ +import {BaseSerializer} from '../../../support/bus' +import {ObjectSerializer} from '../../../support/bus/serial/decorators' +import {Injectable, Instantiable} from '../../../di' +import {Awaitable, ErrorWithContext, JSONState} from '../../../util' +import {MigrationEvent} from './MigrationEvent' +import {Migrations} from '../../services/Migrations' +import {AppliedMigrationEvent} from './AppliedMigrationEvent' +import {ApplyingMigrationEvent} from './ApplyingMigrationEvent' +import {RolledBackMigrationEvent} from './RolledBackMigrationEvent' +import {RollingBackMigrationEvent} from './RollingBackMigrationEvent' + +export interface MigrationEventSerialPayload extends JSONState { + identifier: string + eventType: string +} + +@ObjectSerializer() +@Injectable() +export class MigrationEventSerializer extends BaseSerializer { + protected decodeSerial(serial: MigrationEventSerialPayload): Awaitable { + const migration = this.make(Migrations).get(serial.identifier) + if ( !migration ) { + throw new ErrorWithContext(`Invalid serialized migration identifier: ${serial.identifier}`) + } + + return (new (this.stringToEvent(serial.eventType))(migration)) + } + + protected encodeActual(actual: MigrationEvent): Awaitable { + return { + identifier: actual.migration.identifier, + eventType: actual.eventName, + } + } + + protected getName(): string { + return '@extollo/lib.MigrationEventSerializer' + } + + matchActual(some: MigrationEvent): boolean { + return some instanceof MigrationEvent + } + + private stringToEvent(name: string): Instantiable { + if ( name === '@extollo/lib.AppliedMigrationEvent' ) { + return AppliedMigrationEvent + } else if ( name === '@extollo/lib.ApplyingMigrationEvent' ) { + return ApplyingMigrationEvent + } else if ( name === '@extollo/lib.RollingBackMigrationEvent' ) { + return RollingBackMigrationEvent + } else if ( name === '@extollo/lib.RolledBackMigrationEvent' ) { + return RolledBackMigrationEvent + } + + throw new ErrorWithContext(`Invalid migration event name: ${name}`, { + name, + }) + } +} diff --git a/src/orm/migrations/events/RolledBackMigrationEvent.ts b/src/orm/migrations/events/RolledBackMigrationEvent.ts index 71b0c15..ef003ef 100644 --- a/src/orm/migrations/events/RolledBackMigrationEvent.ts +++ b/src/orm/migrations/events/RolledBackMigrationEvent.ts @@ -5,4 +5,6 @@ import {MigrationEvent} from './MigrationEvent' * Event fired after a migration has been rolled-back. */ @Injectable() -export class RolledBackMigrationEvent extends MigrationEvent {} +export class RolledBackMigrationEvent extends MigrationEvent { + eventName = '@extollo/lib.RolledBackMigrationEvent' +} diff --git a/src/orm/migrations/events/RollingBackMigrationEvent.ts b/src/orm/migrations/events/RollingBackMigrationEvent.ts index 8504d15..73ccceb 100644 --- a/src/orm/migrations/events/RollingBackMigrationEvent.ts +++ b/src/orm/migrations/events/RollingBackMigrationEvent.ts @@ -5,4 +5,6 @@ import {MigrationEvent} from './MigrationEvent' * Event fired before a migration is rolled back. */ @Injectable() -export class RollingBackMigrationEvent extends MigrationEvent {} +export class RollingBackMigrationEvent extends MigrationEvent { + eventName = '@extollo/lib.RollingBackMigrationEvent' +} diff --git a/src/orm/model/Model.ts b/src/orm/model/Model.ts index 3f7f0ac..5b913dc 100644 --- a/src/orm/model/Model.ts +++ b/src/orm/model/Model.ts @@ -1,14 +1,12 @@ import {ModelKey, QueryRow, QuerySource} from '../types' -import {Container, Inject, Instantiable, isInstantiable, StaticClass} from '../../di' +import {Container, Inject, Instantiable, isInstantiable} from '../../di' import {DatabaseService} from '../DatabaseService' import {ModelBuilder} from './ModelBuilder' import {getFieldsMeta, ModelField} from './Field' -import {deepCopy, Collection, Awaitable, uuid4, isKeyof, Pipeline} from '../../util' +import {deepCopy, Collection, uuid4, isKeyof, Pipeline} from '../../util' import {EscapeValueObject} from '../dialect/SQLDialect' -import {AppClass} from '../../lifecycle/AppClass' import {Logging} from '../../service/Logging' import {Connection} from '../connection/Connection' -import {Bus, Dispatchable, EventSubscriber, EventSubscriberEntry, EventSubscription} from '../../event/types' import {ModelRetrievedEvent} from './events/ModelRetrievedEvent' import {ModelSavingEvent} from './events/ModelSavingEvent' import {ModelSavedEvent} from './events/ModelSavedEvent' @@ -16,23 +14,21 @@ import {ModelUpdatingEvent} from './events/ModelUpdatingEvent' import {ModelUpdatedEvent} from './events/ModelUpdatedEvent' import {ModelCreatingEvent} from './events/ModelCreatingEvent' import {ModelCreatedEvent} from './events/ModelCreatedEvent' -import {EventBus} from '../../event/EventBus' import {Relation, RelationValue} from './relation/Relation' import {HasOne} from './relation/HasOne' import {HasMany} from './relation/HasMany' import {HasOneOrMany} from './relation/HasOneOrMany' import {Scope, ScopeClosure} from './scope/Scope' +import {LocalBus} from '../../support/bus/LocalBus' +import {ModelEvent} from './events/ModelEvent' /** * Base for classes that are mapped to tables in a database. */ -export abstract class Model> extends AppClass implements Bus { +export abstract class Model> extends LocalBus> { @Inject() protected readonly logging!: Logging - @Inject() - protected readonly bus!: EventBus - /** * The name of the connection this model should run through. * @type string @@ -100,12 +96,6 @@ export abstract class Model> extends AppClass implements Bus */ protected originalSourceRow?: QueryRow - /** - * Collection of event subscribers, by their events. - * @protected - */ - protected modelEventBusSubscribers: Collection> = new Collection>() - /** * Cache of relation instances by property accessor. * This is used by the `@Relation()` decorator to cache Relation instances. @@ -257,7 +247,7 @@ export abstract class Model> extends AppClass implements Bus this.setFieldFromObject(field.modelKey, field.databaseKey, row) }) - await this.dispatch(new ModelRetrievedEvent(this as any)) + await this.push(new ModelRetrievedEvent(this as any)) return this } @@ -627,11 +617,11 @@ export abstract class Model> extends AppClass implements Bus * @param withoutTimestamps */ public async save({ withoutTimestamps = false } = {}): Promise> { - await this.dispatch(new ModelSavingEvent(this as any)) + await this.push(new ModelSavingEvent(this as any)) const ctor = this.constructor as typeof Model if ( this.exists() && this.isDirty() ) { - await this.dispatch(new ModelUpdatingEvent(this as any)) + await this.push(new ModelUpdatingEvent(this as any)) if ( !withoutTimestamps && ctor.timestamps && ctor.UPDATED_AT ) { (this as any)[ctor.UPDATED_AT] = new Date() @@ -652,9 +642,9 @@ export abstract class Model> extends AppClass implements Bus await this.assumeFromSource(data) } - await this.dispatch(new ModelUpdatedEvent(this as any)) + await this.push(new ModelUpdatedEvent(this as any)) } else if ( !this.exists() ) { - await this.dispatch(new ModelCreatingEvent(this as any)) + await this.push(new ModelCreatingEvent(this as any)) if ( !withoutTimestamps ) { if ( ctor.timestamps && ctor.CREATED_AT ) { @@ -685,10 +675,10 @@ export abstract class Model> extends AppClass implements Bus await this.assumeFromSource(data) } - await this.dispatch(new ModelCreatedEvent(this as any)) + await this.push(new ModelCreatedEvent(this as any)) } - await this.dispatch(new ModelSavedEvent(this as any)) + await this.push(new ModelSavedEvent(this as any)) return this } @@ -825,13 +815,6 @@ export abstract class Model> extends AppClass implements Bus return !this.is(other) } - /** - * Creates a new Pipe instance containing this model instance. - */ - public pipe(pipeline: Pipeline): TOut { - return pipeline.apply(this) - } - /** * Get a wrapped function that compares whether the given model field * on the current instance differs from the originally fetched value. @@ -886,46 +869,6 @@ export abstract class Model> extends AppClass implements Bus (this as any)[thisFieldName] = object[objectFieldName] } - subscribe(event: StaticClass>, subscriber: EventSubscriber): Awaitable { - const entry: EventSubscriberEntry = { - id: uuid4(), - event, - subscriber, - } - - this.modelEventBusSubscribers.push(entry) - return this.buildSubscription(entry.id) - } - - unsubscribe(subscriber: EventSubscriber): Awaitable { - this.modelEventBusSubscribers = this.modelEventBusSubscribers.where('subscriber', '!=', subscriber) - } - - async dispatch(event: Dispatchable): Promise { - const eventClass: StaticClass = event.constructor as StaticClass - await this.modelEventBusSubscribers.where('event', '=', eventClass) - .promiseMap(entry => entry.subscriber(event)) - - await this.bus.dispatch(event) - } - - /** - * Build an EventSubscription object for the subscriber of the given ID. - * @param id - * @protected - */ - protected buildSubscription(id: string): EventSubscription { - let subscribed = true - return { - unsubscribe: (): Awaitable => { - if ( subscribed ) { - this.modelEventBusSubscribers = this.modelEventBusSubscribers.where('id', '!=', id) - subscribed = false - } - }, - } - } - /** * Create a new one-to-one relation instance. Should be called from a method on the model: * diff --git a/src/orm/model/events/ModelCreatedEvent.ts b/src/orm/model/events/ModelCreatedEvent.ts index 177164d..0f45bb9 100644 --- a/src/orm/model/events/ModelCreatedEvent.ts +++ b/src/orm/model/events/ModelCreatedEvent.ts @@ -5,5 +5,5 @@ import {ModelEvent} from './ModelEvent' * Event fired right after a model is inserted. */ export class ModelCreatedEvent> extends ModelEvent { - + eventName = '@extollo/lib.ModelCreatedEvent' } diff --git a/src/orm/model/events/ModelCreatingEvent.ts b/src/orm/model/events/ModelCreatingEvent.ts index 0b2d6b6..a200b1a 100644 --- a/src/orm/model/events/ModelCreatingEvent.ts +++ b/src/orm/model/events/ModelCreatingEvent.ts @@ -5,5 +5,5 @@ import {ModelEvent} from './ModelEvent' * Event fired right before a model is inserted. */ export class ModelCreatingEvent> extends ModelEvent { - + eventName = '@extollo/lib.ModelCreatingEvent' } diff --git a/src/orm/model/events/ModelDeletedEvent.ts b/src/orm/model/events/ModelDeletedEvent.ts index 8ba6cdc..1fbde62 100644 --- a/src/orm/model/events/ModelDeletedEvent.ts +++ b/src/orm/model/events/ModelDeletedEvent.ts @@ -5,5 +5,5 @@ import {ModelEvent} from './ModelEvent' * Event fired right after a model is deleted. */ export class ModelDeletedEvent> extends ModelEvent { - + eventName = '@extollo/lib.ModelDeletedEvent' } diff --git a/src/orm/model/events/ModelDeletingEvent.ts b/src/orm/model/events/ModelDeletingEvent.ts index 51ac53a..c0bb1fc 100644 --- a/src/orm/model/events/ModelDeletingEvent.ts +++ b/src/orm/model/events/ModelDeletingEvent.ts @@ -5,5 +5,5 @@ import {ModelEvent} from './ModelEvent' * Event fired right before a model is deleted. */ export class ModelDeletingEvent> extends ModelEvent { - + eventName = '@extollo/lib.ModelDeletingEvent' } diff --git a/src/orm/model/events/ModelEvent.ts b/src/orm/model/events/ModelEvent.ts index 26512e3..6d495e6 100644 --- a/src/orm/model/events/ModelEvent.ts +++ b/src/orm/model/events/ModelEvent.ts @@ -1,31 +1,19 @@ import {Model} from '../Model' -import {Event} from '../../../event/Event' -import {JSONState} from '../../../util' +import {BaseEvent} from '../../../support/bus' +import {Awaitable} from '../../../util' /** * Base class for events that concern an instance of a model. + * @fixme support serialization */ -export abstract class ModelEvent> extends Event { - /** - * The instance of the model. - */ - public instance!: T - +export abstract class ModelEvent> extends BaseEvent { constructor( - instance?: T, + public readonly instance: T, ) { super() - if ( instance ) { - this.instance = instance - } - } - - // TODO implement serialization here - dehydrate(): Promise { - return Promise.resolve({}) } - rehydrate(/* state: JSONState */): void | Promise { - return undefined + shouldBroadcast(): Awaitable { + return false } } diff --git a/src/orm/model/events/ModelRetrievedEvent.ts b/src/orm/model/events/ModelRetrievedEvent.ts index d52edc4..e93d7f0 100644 --- a/src/orm/model/events/ModelRetrievedEvent.ts +++ b/src/orm/model/events/ModelRetrievedEvent.ts @@ -5,5 +5,5 @@ import {ModelEvent} from './ModelEvent' * Event fired right after a model's data is loaded from the source. */ export class ModelRetrievedEvent> extends ModelEvent { - + eventName = '@extollo/lib.ModelRetrievedEvent' } diff --git a/src/orm/model/events/ModelSavedEvent.ts b/src/orm/model/events/ModelSavedEvent.ts index 2fb54ee..0eec102 100644 --- a/src/orm/model/events/ModelSavedEvent.ts +++ b/src/orm/model/events/ModelSavedEvent.ts @@ -5,5 +5,5 @@ import {ModelEvent} from './ModelEvent' * Event fired right after a model is persisted to the source. */ export class ModelSavedEvent> extends ModelEvent { - + eventName = '@extollo/lib.ModelSavedEvent' } diff --git a/src/orm/model/events/ModelSavingEvent.ts b/src/orm/model/events/ModelSavingEvent.ts index 45883ee..1d902aa 100644 --- a/src/orm/model/events/ModelSavingEvent.ts +++ b/src/orm/model/events/ModelSavingEvent.ts @@ -5,5 +5,5 @@ import {ModelEvent} from './ModelEvent' * Event fired right before a model is persisted to the source. */ export class ModelSavingEvent> extends ModelEvent { - + eventName = '@extollo/lib.ModelSavingEvent' } diff --git a/src/orm/model/events/ModelUpdatedEvent.ts b/src/orm/model/events/ModelUpdatedEvent.ts index 88a84d4..287c8da 100644 --- a/src/orm/model/events/ModelUpdatedEvent.ts +++ b/src/orm/model/events/ModelUpdatedEvent.ts @@ -5,5 +5,5 @@ import {ModelEvent} from './ModelEvent' * Event fired right after a model's data is updated. */ export class ModelUpdatedEvent> extends ModelEvent { - + eventName = '@extollo/lib.ModelUpdatedEvent' } diff --git a/src/orm/model/events/ModelUpdatingEvent.ts b/src/orm/model/events/ModelUpdatingEvent.ts index 19f28d1..2018865 100644 --- a/src/orm/model/events/ModelUpdatingEvent.ts +++ b/src/orm/model/events/ModelUpdatingEvent.ts @@ -5,5 +5,5 @@ import {ModelEvent} from './ModelEvent' * Event fired right before a model's data is updated. */ export class ModelUpdatingEvent> extends ModelEvent { - + eventName = '@extollo/lib.ModelUpdatingEvent' } diff --git a/src/orm/schema/PostgresSchema.ts b/src/orm/schema/PostgresSchema.ts index 44d2b8a..8e17437 100644 --- a/src/orm/schema/PostgresSchema.ts +++ b/src/orm/schema/PostgresSchema.ts @@ -94,7 +94,7 @@ export class PostgresSchema extends Schema { .type(ConstraintType.Unique) .tap(constraint => { collect<{column_name: string}>(uniques[key]) // eslint-disable-line camelcase - .pluck('column_name') + .pluck('column_name') .each(column => constraint.field(column)) }) .flagAsExistingInSchema() @@ -125,7 +125,7 @@ export class PostgresSchema extends Schema { } }) .whereNotIn('column', nonNullable.pluck('column_name')) - .pluck('column') + .pluck('column') .each(column => { table.column(column) .nullable() @@ -161,7 +161,7 @@ export class PostgresSchema extends Schema { return builder .peek(idx => { collect<{column_name: string}>(groupedIndexes[key]) // eslint-disable-line camelcase - .pluck('column_name') + .pluck('column_name') .each(col => idx.field(col)) }) .when(groupedIndexes[key]?.[0]?.indisprimary, idx => idx.primary()) diff --git a/src/service/Canonical.ts b/src/service/Canonical.ts index e911409..a4bf973 100644 --- a/src/service/Canonical.ts +++ b/src/service/Canonical.ts @@ -155,6 +155,18 @@ export abstract class Canonical extends Unit { return `${this.canonicalItem}s` } + /** Get a canonical item by key, throwing an error if it could not be found. */ + public getOrFail(key: string): T { + const result = this.get(key) + if ( !result ) { + throw new ErrorWithContext(`Unable to resolve Canonical key: ${key}`, { + key, + }) + } + + return result + } + /** Get a canonical item by key. */ public get(key: string): T | undefined { if ( key.startsWith('@') ) { @@ -221,24 +233,26 @@ export abstract class Canonical extends Unit { } public async up(): Promise { - for await ( const entry of this.path.walk() ) { - if ( !entry.endsWith(this.suffix) ) { - this.logging.debug(`Skipping file with invalid suffix: ${entry}`) - continue + if ( await this.path.exists() ) { + for await ( const entry of this.path.walk() ) { + if ( !entry.endsWith(this.suffix) ) { + this.logging.debug(`Skipping file with invalid suffix: ${entry}`) + continue + } + + const definition = await this.buildCanonicalDefinition(entry) + this.logging.verbose(`Registering canonical ${this.canonicalItem} "${definition.canonicalName}" from ${entry}`) + const resolvedItem = await this.initCanonicalItem(definition) + + if ( isCanonicalReceiver(resolvedItem) ) { + resolvedItem.setCanonicalResolver( + `${this.canonicalItems}::${definition.canonicalName}`, + definition.canonicalName, + ) + } + + this.loadedItems[definition.canonicalName] = resolvedItem } - - const definition = await this.buildCanonicalDefinition(entry) - this.logging.verbose(`Registering canonical ${this.canonicalItem} "${definition.canonicalName}" from ${entry}`) - const resolvedItem = await this.initCanonicalItem(definition) - - if ( isCanonicalReceiver(resolvedItem) ) { - resolvedItem.setCanonicalResolver( - `${this.canonicalItems}::${definition.canonicalName}`, - definition.canonicalName, - ) - } - - this.loadedItems[definition.canonicalName] = resolvedItem } this.canon.registerCanonical(this) diff --git a/src/service/HTTPServer.ts b/src/service/HTTPServer.ts index ea237d5..c50a2cd 100644 --- a/src/service/HTTPServer.ts +++ b/src/service/HTTPServer.ts @@ -18,8 +18,8 @@ import {ParseIncomingBodyHTTPModule} from '../http/kernel/module/ParseIncomingBo import {Config} from './Config' import {InjectRequestEventBusHTTPModule} from '../http/kernel/module/InjectRequestEventBusHTTPModule' import {Routing} from './Routing' -import {EventBus} from '../event/EventBus' import {RequestLocalStorage} from '../http/RequestLocalStorage' +import {Bus} from '../support/bus' /** * Application unit that starts the HTTP/S server, creates Request and Response objects @@ -40,7 +40,7 @@ export class HTTPServer extends Unit { protected readonly routing!: Routing @Inject() - protected readonly bus!: EventBus + protected readonly bus!: Bus @Inject() protected readonly requestLocalStorage!: RequestLocalStorage diff --git a/src/service/Queueables.ts b/src/service/Queueables.ts index 8d2543a..4bdc76c 100644 --- a/src/service/Queueables.ts +++ b/src/service/Queueables.ts @@ -1,25 +1,15 @@ import {CanonicalStatic} from './CanonicalStatic' -import {Singleton, Instantiable, StaticClass} from '../di' -import {CanonicalDefinition} from './Canonical' -import {Queueable} from '../support/queue/Queue' +import {Singleton, Instantiable} from '../di' +import {Queueable} from '../support/bus' /** - * A canonical unit that resolves Queueable classes from `app/queueables`. + * A canonical unit that resolves Queueable classes from `app/jobs`. */ @Singleton() -export class Queueables extends CanonicalStatic> { +export class Queueables extends CanonicalStatic> { protected appPath = ['jobs'] protected canonicalItem = 'job' protected suffix = '.job.js' - - public async initCanonicalItem(definition: CanonicalDefinition): Promise>> { - const item = await super.initCanonicalItem(definition) - if ( !(item.prototype instanceof Queueable) ) { - throw new TypeError(`Invalid middleware definition: ${definition.originalName}. Controllers must extend from @extollo/lib.Queueable.`) - } - - return item - } } diff --git a/src/service/Routing.ts b/src/service/Routing.ts index 6affbf4..26e1f42 100644 --- a/src/service/Routing.ts +++ b/src/service/Routing.ts @@ -8,9 +8,9 @@ import {ViewEngineFactory} from '../views/ViewEngineFactory' import {ViewEngine} from '../views/ViewEngine' import {lib} from '../lib' import {Config} from './Config' -import {EventBus} from '../event/EventBus' import {PackageDiscovered} from '../support/PackageDiscovered' import {staticServer} from '../http/servers/static' +import {Bus} from '../support/bus' /** * Application unit that loads the various route files from `app/http/routes` and pre-compiles the route handlers. @@ -24,7 +24,7 @@ export class Routing extends Unit { protected readonly config!: Config @Inject() - protected readonly bus!: EventBus + protected readonly bus!: Bus protected compiledRoutes: Collection> = new Collection>() @@ -54,7 +54,7 @@ export class Routing extends Unit { this.logging.verbose(`${route}`) }) - this.bus.subscribe(PackageDiscovered, async (event: PackageDiscovered) => { + await this.bus.subscribe(PackageDiscovered, async (event: PackageDiscovered) => { const loadFrom = event.packageConfig?.extollo?.routes?.loadFrom if ( Array.isArray(loadFrom) ) { for ( const path of loadFrom ) { diff --git a/src/support/NodeModules.ts b/src/support/NodeModules.ts index 83489c1..e0b4ff2 100644 --- a/src/support/NodeModules.ts +++ b/src/support/NodeModules.ts @@ -4,8 +4,8 @@ import {Inject, Injectable, InjectParam} from '../di' import {Application} from '../lifecycle/Application' import {Logging} from '../service/Logging' import {NodeModule, ExtolloAwareNodeModule} from './types' -import {EventBus} from '../event/EventBus' import {PackageDiscovered} from './PackageDiscovered' +import {Bus} from './bus' /** * A helper class for discovering and interacting with @@ -17,7 +17,7 @@ export class NodeModules { protected readonly logging!: Logging @Inject() - protected readonly bus!: EventBus + protected readonly bus!: Bus constructor( @InjectParam(Application.NODE_MODULES_INJECTION) @@ -102,7 +102,7 @@ export class NodeModules { this.logging.info(`Auto-discovering package: ${key}`) seen.push(key) - await this.bus.dispatch(new PackageDiscovered(packageJsonData, packageJson.clone())) + await this.bus.push(new PackageDiscovered(packageJsonData, packageJson.clone())) const packageNodeModules = packageJson.concat('..', 'node_modules') if ( await packageNodeModules.exists() && packageJsonData?.extollo?.recursiveDependencies?.discover ) { diff --git a/src/support/PackageDiscovered.ts b/src/support/PackageDiscovered.ts index 5e0fb95..935ec34 100644 --- a/src/support/PackageDiscovered.ts +++ b/src/support/PackageDiscovered.ts @@ -1,6 +1,6 @@ -import {Event} from '../event/Event' -import {Awaitable, JSONState, UniversalPath} from '../util' +import {Awaitable, UniversalPath} from '../util' import {ExtolloAwareNodeModule} from './types' +import {BaseEvent} from './bus' /** * An event indicating that an NPM package has been discovered @@ -9,7 +9,7 @@ import {ExtolloAwareNodeModule} from './types' * Application services can listen for this event to register * various discovery logic (e.g. automatically boot units */ -export class PackageDiscovered extends Event { +export class PackageDiscovered extends BaseEvent { constructor( public packageConfig: ExtolloAwareNodeModule, public packageJson: UniversalPath, @@ -17,17 +17,9 @@ export class PackageDiscovered extends Event { super() } - dehydrate(): Awaitable { - return { - packageConfig: this.packageConfig as JSONState, - packageJson: this.packageJson.toString(), - } - } + eventName = '@extollo/lib.PackageDiscovered' - rehydrate(state: JSONState): Awaitable { - if ( typeof state === 'object' ) { - this.packageConfig = (state.packageConfig as ExtolloAwareNodeModule) - this.packageJson = new UniversalPath(String(state.packageJson)) - } + shouldBroadcast(): Awaitable { + return false } } diff --git a/src/support/bus/Bus.ts b/src/support/bus/Bus.ts new file mode 100644 index 0000000..b64d5e2 --- /dev/null +++ b/src/support/bus/Bus.ts @@ -0,0 +1,189 @@ +import {Inject, Singleton, StaticInstantiable} from '../../di' +import {BusSubscriber, Event, EventBus, EventHandler, EventHandlerReturn, EventHandlerSubscription} from './types' +import {Awaitable, Collection, Pipeline, uuid4} from '../../util' +import {Logging} from '../../service/Logging' +import {Unit} from '../../lifecycle/Unit' + +export interface BusInternalSubscription { + busUuid: string + subscriberUuid: string + subscription: EventHandlerSubscription +} + +/** + * Propagating event bus implementation. + */ +@Singleton() +export class Bus extends Unit implements EventBus { + @Inject() + protected readonly logging!: Logging + + public readonly uuid = uuid4() + + /** Local listeners subscribed to events on this bus. */ + protected subscribers: Collection> = new Collection() + + /** Connections to other event busses to be propagated. */ + protected connectors: Collection = new Collection() + + protected subscriptions: Collection = new Collection() + + /** True if the bus has been initialized. */ + private isUp = false + + /** + * Push an event onto the bus. + * @param event + */ + async push(event: TEvent): Promise { + if ( event.originBusUuid === this.uuid ) { + return + } + + if ( !event.originBusUuid ) { + event.originBusUuid = this.uuid + } + + if ( await this.callSubscribers(event) ) { + // One of the subscribers halted propagation of the event + return + } + + if ( await this.shouldBroadcast(event) ) { + await this.connectors.awaitMapCall('push', event) + } + } + + /** Returns true if the given event should be pushed to connected event busses. */ + protected async shouldBroadcast(event: Event): Promise { + if ( typeof event.shouldBroadcast === 'function' ) { + return event.shouldBroadcast() + } + + return Boolean(event.shouldBroadcast) + } + + /** + * Call all local listeners for the given event. Returns true if the propagation + * of the event should be halted. + * @param event + * @protected + */ + protected async callSubscribers(event: TEvent): Promise { + return this.subscribers + .filter(sub => event instanceof sub.eventKey) + .pluck('handler') + .toAsync() + .some(handler => handler(event)) + } + + /** Register a pipeline as an event handler. */ + pipe(eventKey: StaticInstantiable, line: Pipeline): Awaitable { + return this.subscribe(eventKey, event => line.apply(event)) + } + + /** + * Subscribe to an event on the bus. + * @param eventKey + * @param handler + */ + async subscribe(eventKey: StaticInstantiable, handler: EventHandler): Promise { + const uuid = uuid4() + + this.subscribers.push({ + eventName: eventKey.prototype.eventName, // FIXME this is not working + handler, + eventKey, + uuid, + } as unknown as BusSubscriber) + + this.subscriptions.concat(await this.connectors + .promiseMap(async bus => { + return { + busUuid: bus.uuid, + subscriberUuid: uuid, + subscription: await bus.subscribe(eventKey, (event: T) => { + if ( event.originBusUuid !== this.uuid ) { + return handler(event) + } + }), + } + })) + + return { + unsubscribe: async () => { + this.subscribers = this.subscribers.where('uuid', '!=', uuid) + + await this.subscriptions + .where('subscriberUuid', '=', uuid) + .tap(trashed => this.subscriptions.diffInPlace(trashed)) + .pluck('subscription') + .awaitMapCall('unsubscribe') + }, + } + } + + /** Connect an external event bus to this bus. */ + async connect(bus: EventBus): Promise { + if ( this.isUp ) { + await bus.up() + } + + this.connectors.push(bus) + + await this.subscribers + .promiseMap(async subscriber => { + return { + busUuid: bus.uuid, + subscriberUuid: subscriber.uuid, + subscription: await bus.subscribe(subscriber.eventKey, event => { + if ( event.originBusUuid !== this.uuid ) { + return subscriber.handler(event) + } + }), + } + }) + } + + async disconnect(bus: EventBus): Promise { + await this.subscriptions + .where('busUuid', '=', bus.uuid) + .tap(trashed => this.subscriptions.diffInPlace(trashed)) + .pluck('subscription') + .awaitMapCall('unsubscribe') + + if ( this.isUp ) { + await bus.down() + } + + this.connectors.diffInPlace([bus]) + } + + /** Initialize this event bus. */ + async up(): Promise { + if ( this.isUp ) { + this.logging.warn('Attempted to boot more than once. Skipping.') + return + } + + await this.connectors.awaitMapCall('up') + + this.isUp = true + } + + /** Clean up this event bus. */ + async down(): Promise { + if ( !this.isUp ) { + this.logging.warn('Attempted to shut down but was never properly booted. Skipping.') + return + } + + await this.subscriptions + .pluck('subscription') + .awaitMapCall('unsubscribe') + + await this.connectors.awaitMapCall('down') + + this.isUp = false + } +} diff --git a/src/support/bus/LocalBus.ts b/src/support/bus/LocalBus.ts new file mode 100644 index 0000000..25d7648 --- /dev/null +++ b/src/support/bus/LocalBus.ts @@ -0,0 +1,130 @@ +import {Inject, Injectable, StaticInstantiable} from '../../di' +import {BusSubscriber, Event, EventBus, EventHandler, EventHandlerReturn, EventHandlerSubscription} from './types' +import {Awaitable, Collection, Pipeline, uuid4} from '../../util' +import {Logging} from '../../service/Logging' +import {Bus, BusInternalSubscription} from './Bus' +import {AppClass} from '../../lifecycle/AppClass' + +/** + * Non-connectable event bus implementation. Can forward events to the main Bus instance. + */ +@Injectable() +export class LocalBus extends AppClass implements EventBus { + @Inject() + protected readonly logging!: Logging + + @Inject() + protected readonly bus!: Bus + + public readonly uuid = uuid4() + + /** Local listeners subscribed to events on this bus. */ + protected subscribers: Collection> = new Collection() + + protected subscriptions: Collection = new Collection() + + /** True if the bus has been initialized. */ + private isUp = false + + /** + * Push an event onto the bus. + * @param event + */ + async push(event: TEvent): Promise { + if ( event.originBusUuid === this.uuid ) { + return + } + + if ( !event.originBusUuid ) { + event.originBusUuid = this.uuid + } + + if ( await this.callSubscribers(event) ) { + // One of the subscribers halted propagation of the event + return + } + + if ( await this.shouldBroadcast(event) ) { + await this.bus.push(event) + } + } + + /** Returns true if the given event should be pushed to connected event busses. */ + protected async shouldBroadcast(event: TEvent): Promise { + if ( typeof event.shouldBroadcast === 'function' ) { + return event.shouldBroadcast() + } + + return Boolean(event.shouldBroadcast) + } + + /** + * Call all local listeners for the given event. Returns true if the propagation + * of the event should be halted. + * @param event + * @protected + */ + protected async callSubscribers(event: TEvent): Promise { + return this.subscribers + .filter(sub => event instanceof sub.eventKey) + .pluck('handler') + .toAsync() + .some(handler => handler(event)) + } + + /** Register a pipeline as an event handler. */ + pipe(eventKey: StaticInstantiable, line: Pipeline): Awaitable { + return this.subscribe(eventKey, event => line.apply(event)) + } + + /** + * Subscribe to an event on the bus. + * @param eventKey + * @param handler + */ + async subscribe(eventKey: StaticInstantiable, handler: EventHandler): Promise { + const uuid = uuid4() + + this.subscribers.push({ + eventName: eventKey.prototype.eventName, + handler, + eventKey, + uuid, + } as unknown as BusSubscriber) + + return { + unsubscribe: async () => { + this.subscribers = this.subscribers.where('uuid', '!=', uuid) + + await this.subscriptions + .where('subscriberUuid', '=', uuid) + .tap(trashed => this.subscriptions.diffInPlace(trashed)) + .pluck('subscription') + .awaitMapCall('unsubscribe') + }, + } + } + + async up(): Promise { + if ( this.isUp ) { + this.logging.warn('Attempted to boot more than once. Skipping.') + return + } + + this.isUp = true + } + + /** Clean up this event bus. */ + async down(): Promise { + if ( !this.isUp ) { + this.logging.warn('Attempted to shut down but was never properly booted. Skipping.') + return + } + + await this.subscriptions + .pluck('subscription') + .awaitMapCall('unsubscribe') + + this.isUp = false + } +} diff --git a/src/support/bus/RedisBus.ts b/src/support/bus/RedisBus.ts new file mode 100644 index 0000000..eeff920 --- /dev/null +++ b/src/support/bus/RedisBus.ts @@ -0,0 +1,110 @@ +import {BusSubscriber, Event, EventBus, EventHandler, EventHandlerReturn, EventHandlerSubscription} from './types' +import {Inject, Injectable, StaticInstantiable} from '../../di' +import {Awaitable, Collection, Pipeline, uuid4} from '../../util' +import {Redis} from '../redis/Redis' +import {Serialization} from './serial/Serialization' +import * as IORedis from 'ioredis' + +/** + * Event bus implementation that does pub/sub over a Redis connection. + */ +@Injectable() +export class RedisBus implements EventBus { + @Inject() + protected readonly redis!: Redis + + @Inject() + protected readonly serial!: Serialization + + public readonly uuid = uuid4() + + /** List of events for which we have created Redis channel subscriptions. */ + protected internalSubscriptions: string[] = [] + + /** List of local subscriptions on this bus. */ + protected subscriptions: Collection> = new Collection() + + protected subscriberConnection?: IORedis.Redis + + protected publisherConnection?: IORedis.Redis + + pipe(eventKey: StaticInstantiable, line: Pipeline): Awaitable { + return this.subscribe(eventKey, event => line.apply(event)) + } + + async push(event: Event): Promise { + if ( !this.publisherConnection ) { + throw new Error('Cannot push to RedisQueue: publisher connection is not initialized') + } + + const channel = `ex-event-${event.eventName}` + const json = await this.serial.encodeJSON(event) + + await this.publisherConnection.publish(channel, json) + } + + async subscribe(eventKey: StaticInstantiable, handler: EventHandler): Promise { + const uuid = uuid4() + const subscriber: BusSubscriber = { + eventName: eventKey.prototype.eventName, + eventKey, + handler, + uuid, + } as unknown as BusSubscriber + + if ( !this.internalSubscriptions.includes(subscriber.eventName) ) { + await new Promise((res, rej) => { + if ( !this.subscriberConnection ) { + return rej(new Error('RedisBus not initialized on subscription.')) + } + + this.subscriberConnection.subscribe(`ex-event-${subscriber.eventName}`, err => { + if ( err ) { + return rej(err) + } + + res() + }) + }) + + this.internalSubscriptions.push(subscriber.eventName) + } + + this.subscriptions.push(subscriber) + + return { + unsubscribe: () => { + this.subscriptions = this.subscriptions.where('uuid', '!=', uuid) + }, + } + } + + protected async handleEvent(name: string, payload: string): Promise { + const event = await this.serial.decodeJSON(payload) + + await this.subscriptions + .where('eventName', '=', name) + .pluck('handler') + .map(handler => handler(event)) + .awaitAll() + } + + async up(): Promise { + this.subscriberConnection = await this.redis.getNewConnection() + this.publisherConnection = await this.redis.getNewConnection() + + this.subscriberConnection.on('message', (channel: string, message: string) => { + if ( !channel.startsWith('ex-event-') ) { + return + } + + const name = channel.substr('ex-event-'.length) + this.handleEvent(name, message) + }) + } + + down(): Awaitable { + this.subscriberConnection?.disconnect() + this.publisherConnection?.disconnect() + } +} diff --git a/src/support/bus/index.ts b/src/support/bus/index.ts new file mode 100644 index 0000000..4fa10c6 --- /dev/null +++ b/src/support/bus/index.ts @@ -0,0 +1,17 @@ +export * from './types' + +export * from './serial/BaseSerializer' +export * from './serial/SimpleCanonicalItemSerializer' +export * from './serial/Serialization' +export * from './serial/decorators' + +export * from './Bus' +export * from './LocalBus' +export * from './RedisBus' + +export * from './queue/event/PushingToQueue' +export * from './queue/event/PushedToQueue' +export * from './queue/Queue' +export * from './queue/CacheQueue' +export * from './queue/SyncQueue' +export * from './queue/QueueFactory' diff --git a/src/support/bus/queue/CacheQueue.ts b/src/support/bus/queue/CacheQueue.ts new file mode 100644 index 0000000..b42b3e3 --- /dev/null +++ b/src/support/bus/queue/CacheQueue.ts @@ -0,0 +1,33 @@ +import {Queue} from './Queue' +import {Inject, Injectable} from '../../../di' +import {Cache, Maybe} from '../../../util' +import {Queueable, ShouldQueue} from '../types' +import {Serialization} from '../serial/Serialization' + +/** + * Queue implementation that uses the configured cache driver as a queue. + */ +@Injectable() +export class CacheQueue extends Queue { + @Inject() + protected readonly cache!: Cache + + @Inject() + protected readonly serial!: Serialization + + protected get queueIdentifier(): string { + return `extollo__queue__${this.name}` + } + + protected async push(item: ShouldQueue): Promise { + const json = await this.serial.encodeJSON(item) + await this.cache.arrayPush(this.queueIdentifier, json) + } + + async pop(): Promise>> { + const popped = await this.cache.arrayPop(this.queueIdentifier) + if ( popped ) { + return this.serial.decodeJSON(popped) + } + } +} diff --git a/src/support/bus/queue/Queue.ts b/src/support/bus/queue/Queue.ts new file mode 100644 index 0000000..f587242 --- /dev/null +++ b/src/support/bus/queue/Queue.ts @@ -0,0 +1,31 @@ +import {BusQueue, Queueable, shouldQueue, ShouldQueue} from '../types' +import {Inject, Injectable} from '../../../di' +import {Awaitable, Maybe} from '../../../util' +import {Bus} from '../Bus' +import {PushingToQueue} from './event/PushingToQueue' +import {PushedToQueue} from './event/PushedToQueue' + +@Injectable() +export abstract class Queue implements BusQueue { + @Inject() + protected readonly bus!: Bus + + constructor( + public readonly name: string, + ) {} + + async dispatch(item: T): Promise { + if ( shouldQueue(item) ) { + await this.bus.push(new PushingToQueue(item)) + await this.push(item) + await this.bus.push(new PushedToQueue(item)) + return + } + + await item.execute() + } + + protected abstract push(item: ShouldQueue): Awaitable + + abstract pop(): Promise>> +} diff --git a/src/support/bus/queue/QueueFactory.ts b/src/support/bus/queue/QueueFactory.ts new file mode 100644 index 0000000..b3a851c --- /dev/null +++ b/src/support/bus/queue/QueueFactory.ts @@ -0,0 +1,87 @@ +import { + AbstractFactory, + Container, + DependencyRequirement, + PropertyDependency, + isInstantiable, + DEPENDENCY_KEYS_METADATA_KEY, + DEPENDENCY_KEYS_PROPERTY_METADATA_KEY, + StaticInstantiable, + FactoryProducer, +} from '../../../di' +import {Collection, ErrorWithContext} from '../../../util' +import {Logging} from '../../../service/Logging' +import {Config} from '../../../service/Config' +import {Queue} from './Queue' +import {SyncQueue} from './SyncQueue' + +/** + * Dependency container factory that matches the abstract Queue token, but + * produces an instance of whatever Queue driver is configured in the `server.queue.driver` config. + */ +@FactoryProducer() +export class QueueFactory extends AbstractFactory { + /** true if we have printed the synchronous queue driver warning once. */ + private static loggedSyncQueueWarningOnce = false + + private di(): [Logging, Config] { + return [ + Container.getContainer().make(Logging), + Container.getContainer().make(Config), + ] + } + + produce(): Queue { + return new (this.getQueueClass())() + } + + match(something: unknown): boolean { + return something === Queue + } + + getDependencyKeys(): Collection { + const meta = Reflect.getMetadata(DEPENDENCY_KEYS_METADATA_KEY, this.getQueueClass()) + if ( meta ) { + return meta + } + return new Collection() + } + + getInjectedProperties(): Collection { + const meta = new Collection() + let currentToken = this.getQueueClass() + + do { + const loadedMeta = Reflect.getMetadata(DEPENDENCY_KEYS_PROPERTY_METADATA_KEY, currentToken) + if ( loadedMeta ) { + meta.concat(loadedMeta) + } + currentToken = Object.getPrototypeOf(currentToken) + } while (Object.getPrototypeOf(currentToken) !== Function.prototype && Object.getPrototypeOf(currentToken) !== Object.prototype) + + return meta + } + + /** + * Get the configured queue driver and return some Instantiable. + * @protected + */ + protected getQueueClass(): StaticInstantiable { + const [logging, config] = this.di() + const QueueClass = config.get('server.queue.driver', SyncQueue) + if ( QueueClass === SyncQueue && !QueueFactory.loggedSyncQueueWarningOnce ) { + logging.warn(`You are using the default synchronous queue driver. It is recommended you configure a background queue driver instead.`) + QueueFactory.loggedSyncQueueWarningOnce = true + } + + if ( !isInstantiable(QueueClass) || !(QueueClass.prototype instanceof Queue) ) { + const e = new ErrorWithContext('Provided queue class does not extend from @extollo/lib.Queue') + e.context = { + configKey: 'server.queue.driver', + class: QueueClass.toString(), + } + } + + return QueueClass + } +} diff --git a/src/support/bus/queue/SyncQueue.ts b/src/support/bus/queue/SyncQueue.ts new file mode 100644 index 0000000..288c42c --- /dev/null +++ b/src/support/bus/queue/SyncQueue.ts @@ -0,0 +1,22 @@ +import {Queue} from './Queue' +import {Inject, Injectable} from '../../../di' +import {Logging} from '../../../service/Logging' +import {Queueable, ShouldQueue} from '../types' +import {Maybe} from '../../../util' + +/** + * Simple queue implementation that executes items immediately in the current process. + */ +@Injectable() +export class SyncQueue extends Queue { + @Inject() + protected readonly logging!: Logging + + protected async push(item: ShouldQueue): Promise { + await item.execute() + } + + async pop(): Promise>> { + return undefined + } +} diff --git a/src/support/bus/queue/event/PushedToQueue.ts b/src/support/bus/queue/event/PushedToQueue.ts new file mode 100644 index 0000000..ff78cc3 --- /dev/null +++ b/src/support/bus/queue/event/PushedToQueue.ts @@ -0,0 +1,17 @@ +import {Event, Queueable, ShouldQueue} from '../../types' +import {uuid4} from '../../../../util' + +/** + * Event fired after an item is pushed to the queue. + */ +export class PushedToQueue> implements Event { + public readonly eventName = '@extollo/lib:PushedToQueue' + + public readonly eventUuid = uuid4() + + public readonly shouldBroadcast = true + + constructor( + public readonly item: T, + ) {} +} diff --git a/src/support/bus/queue/event/PushingToQueue.ts b/src/support/bus/queue/event/PushingToQueue.ts new file mode 100644 index 0000000..5a32b1b --- /dev/null +++ b/src/support/bus/queue/event/PushingToQueue.ts @@ -0,0 +1,17 @@ +import {Event, Queueable, ShouldQueue} from '../../types' +import {uuid4} from '../../../../util' + +/** + * Event fired before an item is pushed to the queue. + */ +export class PushingToQueue> implements Event { + public readonly eventName = '@extollo/lib:PushingToQueue' + + public readonly eventUuid = uuid4() + + public readonly shouldBroadcast = true + + constructor( + public readonly item: T, + ) {} +} diff --git a/src/support/bus/serial/BaseSerializer.ts b/src/support/bus/serial/BaseSerializer.ts new file mode 100644 index 0000000..fc99161 --- /dev/null +++ b/src/support/bus/serial/BaseSerializer.ts @@ -0,0 +1,84 @@ +import {Awaitable, JSONState} from '../../../util' +import {SerialPayload} from '../types' +import {Serialization} from './Serialization' +import {Container, TypedDependencyKey} from '../../../di' +import {Request} from '../../../http/lifecycle/Request' +import {RequestLocalStorage} from '../../../http/RequestLocalStorage' + +/** + * A core Serializer implementation. + */ +export abstract class BaseSerializer { + + /** + * Return true if the value can be encoded by this serializer. + * @param some + */ + public abstract matchActual(some: TActual): boolean + + /** + * Return true if the serial payload can be decoded by this serializer. + * @param serial + */ + public matchSerial(serial: SerialPayload): boolean { + return serial.serializer === this.getName() + } + + /** + * Encode the payload as a JSON state. + * @param actual + * @protected + */ + protected abstract encodeActual(actual: TActual): Awaitable + + /** + * Decode the payload back to the original object. + * @param serial + * @protected + */ + protected abstract decodeSerial(serial: TSerial): Awaitable + + /** + * Get the unique name of this serializer. + * @protected + */ + protected abstract getName(): string + + /** + * Encode a value to a serial payload. + * @param actual + */ + public async encode(actual: TActual): Promise> { + return { + serializer: this.getName(), + payload: await this.encodeActual(actual), + } + } + + /** + * Decode a value from a serial payload. + * @param serial + */ + public async decode(serial: SerialPayload): Promise { + return this.decodeSerial(serial.payload) + } + + /** Helper to get an instance of the Serialization service. */ + protected getSerialization(): Serialization { + return Container.getContainer() + .make(Serialization) + } + + /** Helper to get an instance of the global Request. */ + protected getRequest(): Request { + return Container.getContainer() + .make(RequestLocalStorage) + .get() + } + + /** Get a dependency from the container. */ + protected make(key: TypedDependencyKey): T { + return Container.getContainer() + .make(key) + } +} diff --git a/src/support/bus/serial/Serialization.ts b/src/support/bus/serial/Serialization.ts new file mode 100644 index 0000000..93db927 --- /dev/null +++ b/src/support/bus/serial/Serialization.ts @@ -0,0 +1,147 @@ +import {Container, Inject, Instantiable, Singleton} from '../../../di' +import {Awaitable, Collection, ErrorWithContext, JSONState} from '../../../util' +import {Serializer, SerialPayload} from '../types' +import {Validator} from '../../../validation/Validator' + +/** + * Error thrown when attempting to (de-)serialize an object and a serializer cannot be found. + */ +export class NoSerializerError extends ErrorWithContext { + // eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types + constructor(object: any, context?: {[key: string]: any}) { + super('The object could not be (de-)serialized, as no compatible serializer has been registered.', { + object, + ...(context || {}), + }) + } +} + +/** + * Encode a value to JSON using a registered serializer. + * @throws NoSerializerError + * @param value + */ +export function encode(value: T): Promise { + return Container.getContainer() + .make(Serialization) + .encodeJSON(value) +} + +/** + * Decode a value from JSON using a registered serializer. + * @throws NoSerializerError + * @param payload + * @param validator + */ +export function decode(payload: string, validator?: Validator): Awaitable { + return Container.getContainer() + .make(Serialization) + .decodeJSON(payload, validator) +} + +interface RegisteredSerializer> { + key: Instantiable, + instance?: T +} + +/** + * Service that manages (de-)serialization of objects. + */ +@Singleton() +export class Serialization { + @Inject() + protected readonly injector!: Container + + /** + * Serializers registered with the service. + * We store the DI keys and realize them as needed, rather than at register time + * since most registration is done via the @ObjectSerializer decorator and the + * ContainerBlueprint. Realizing them at that time can cause loops in the DI call + * to realizeContainer since getContainer() -> realizeContainer() -> make the serializer + * -> getContainer(). This largely defers the realization until after all the DI keys + * are registered with the global Container. + */ + protected serializers: Collection>> = new Collection() + + /** Register a new serializer with the service. */ + public register(serializer: Instantiable>): this { + // Prepend instead of push so that later-registered serializers are prioritized when matching + this.serializers.prepend({ + key: serializer, + }) + + return this + } + + protected matchActual(actual: T): Serializer { + for ( const serializer of this.serializers ) { + if ( !serializer.instance ) { + serializer.instance = this.injector.make(serializer.key) + } + + if ( serializer.instance?.matchActual(actual) ) { + return serializer.instance as Serializer + } + } + + throw new NoSerializerError(actual) + } + + protected matchSerial(serial: SerialPayload): Serializer { + for ( const serializer of this.serializers ) { + if ( !serializer.instance ) { + serializer.instance = this.injector.make(serializer.key) + } + + if ( serializer.instance?.matchSerial(serial) ) { + return serializer.instance as Serializer + } + } + + throw new NoSerializerError(serial) + } + + /** + * Encode a value to its serial payload using a registered serializer, if one exists. + * @throws NoSerializerError + * @param value + */ + public encode(value: T): Awaitable> { + return this.matchActual(value).encode(value) + } + + /** + * Encode a value to JSON using a registered serializer, if one exists. + * @throws NoSerializerError + * @param value + */ + public async encodeJSON(value: T): Promise { + return JSON.stringify(await this.encode(value)) + } + + /** + * Decode a serial payload to the original object using a registered serializer, if one exists. + * @throws NoSerializerError + * @param payload + * @param validator + */ + public decode(payload: SerialPayload, validator?: Validator): Awaitable { + const matched = this.matchSerial(payload) + const decoded = matched.decode(payload) as Awaitable + if ( validator ) { + return validator.parse(decoded) + } + + return decoded + } + + /** + * Decode a value from JSON using a registered serializer, if one exists. + * @throws NoSerializerError + * @param payload + * @param validator + */ + public async decodeJSON(payload: string, validator?: Validator): Promise { + return this.decode(JSON.parse(payload), validator) + } +} diff --git a/src/support/bus/serial/SimpleCanonicalItemSerializer.ts b/src/support/bus/serial/SimpleCanonicalItemSerializer.ts new file mode 100644 index 0000000..633aadd --- /dev/null +++ b/src/support/bus/serial/SimpleCanonicalItemSerializer.ts @@ -0,0 +1,85 @@ +import {CanonicalItemClass} from '../../CanonicalReceiver' +import {BaseSerializer} from './BaseSerializer' +import {Awaitable, ErrorWithContext, JSONState, Rehydratable} from '../../../util' +import {Container, Inject, Injectable} from '../../../di' +import {Canon} from '../../../service/Canon' + +/** State encoded by this class. */ +export interface SimpleCanonicalItemSerialState extends JSONState { + rehydrate?: JSONState + canonicalIdentifier: string +} + +/** + * A serializer implementation that serializes class instances derived from the Canon loading system. + * These instances must be CanonicalItemClass instances and take no constructor parameters. + * If the instance is Rehydratable, then the state will be (re-)stored. + */ +@Injectable() +export class SimpleCanonicalItemSerializer extends BaseSerializer { + @Inject() + protected readonly canon!: Canon + + @Inject() + protected readonly container!: Container + + protected decodeSerial(serial: SimpleCanonicalItemSerialState): Awaitable { + const canon = this.canon.getFromFullyQualified(serial.canonicalIdentifier) + if ( !canon ) { + throw new ErrorWithContext('Unable to decode serialized payload: the canonical identifier was not found', { + serial, + }) + } + + if ( canon instanceof CanonicalItemClass ) { + if ( serial.rehydrate && typeof (canon as any).rehydrate === 'function' ) { + (canon as unknown as Rehydratable).rehydrate(serial.rehydrate) + } + + return canon as TActual + } else if ( canon?.prototype instanceof CanonicalItemClass ) { + const inst = this.container.make(canon) + if ( serial.rehydrate && typeof (inst as any).rehydrate === 'function' ) { + (inst as unknown as Rehydratable).rehydrate(serial.rehydrate) + } + + return inst as TActual + } + + throw new ErrorWithContext('Attempted to instantiate serialized item into non-Canonical class', { + canon, + serial, + }) + } + + protected async encodeActual(actual: TActual): Promise { + const ctor = actual.constructor as typeof CanonicalItemClass + const canonicalIdentifier = ctor.getFullyQualifiedCanonicalResolver() + if ( !canonicalIdentifier ) { + throw new ErrorWithContext('Unable to determine Canonical resolver for serialization.', [ + actual, + ]) + } + + const state: SimpleCanonicalItemSerialState = { + canonicalIdentifier, + } + + if ( typeof (actual as any).dehydrate === 'function' ) { + state.rehydrate = await (actual as unknown as Rehydratable).dehydrate() + } + + return state + } + + protected getName(): string { + return '@extollo/lib:SimpleCanonicalItemSerializer' + } + + matchActual(some: TActual): boolean { + return ( + some instanceof CanonicalItemClass + && some.constructor.length === 0 + ) + } +} diff --git a/src/support/bus/serial/decorators.ts b/src/support/bus/serial/decorators.ts new file mode 100644 index 0000000..2dcf75c --- /dev/null +++ b/src/support/bus/serial/decorators.ts @@ -0,0 +1,21 @@ +import {ContainerBlueprint, Instantiable, isInstantiableOf} from '../../../di' +import {JSONState, logIfDebugging} from '../../../util' +import {BaseSerializer} from './BaseSerializer' +import {Serialization} from './Serialization' +import {Serializer} from '../types' + +/** + * Register a class as an object serializer with the Serialization service. + * @constructor + */ +export const ObjectSerializer = (): >>(target: TFunction) => TFunction | void => { + return (target: Instantiable>) => { + if ( isInstantiableOf(target, BaseSerializer) ) { + logIfDebugging('extollo.bus.serial.decorators', 'Registering ObjectSerializer blueprint:', target) + ContainerBlueprint.getContainerBlueprint() + .onResolve(Serialization, serial => serial.register(target)) + } else { + logIfDebugging('extollo.bus.serial.decorators', 'Skipping ObjectSerializer blueprint:', target) + } + } +} diff --git a/src/support/bus/types.ts b/src/support/bus/types.ts new file mode 100644 index 0000000..8b2b132 --- /dev/null +++ b/src/support/bus/types.ts @@ -0,0 +1,94 @@ +import {Awaitable, JSONState, Maybe, Pipeline, TypeTag, uuid4} from '../../util' +import {StaticInstantiable} from '../../di' + +// eslint-disable-next-line @typescript-eslint/no-unused-vars +export interface SerialPayload extends JSONState { + serializer: string + payload: TSerial +} + +export interface Serializer { + matchActual(some: TActual): boolean + + matchSerial(serial: SerialPayload): boolean + + encode(actual: TActual): Awaitable> + + decode(serial: SerialPayload): Awaitable +} + +export interface Event { + eventUuid: string + eventName: string + originBusUuid?: string + shouldBroadcast?: boolean | (() => Awaitable) +} + +export abstract class BaseEvent implements Event { + public readonly eventUuid = uuid4() + + public abstract eventName: string + + public shouldBroadcast(): Awaitable { + return true + } +} + +export type EventHandlerReturn = Awaitable + +export type EventHandler = (event: T) => EventHandlerReturn + +export interface EventHandlerSubscription { + unsubscribe(): Awaitable +} + +export interface Queueable { + execute(): Awaitable + + shouldQueue?: boolean | (() => boolean) + + defaultQueue?: string | (() => string) +} + +export type ShouldQueue = T & TypeTag<'@extollo/lib:ShouldQueue'> + +export function shouldQueue(something: T): something is ShouldQueue { + if ( typeof something.shouldQueue === 'function' ) { + return something.shouldQueue() + } + + return ( + typeof something.shouldQueue === 'undefined' + || something.shouldQueue + ) +} + +export interface EventBus { + readonly uuid: string + + subscribe(eventKey: StaticInstantiable, handler: EventHandler): Awaitable + + pipe(eventKey: StaticInstantiable, line: Pipeline): Awaitable + + push(event: TEvent): Awaitable + + up(): Awaitable + + down(): Awaitable +} + +/** Internal storage format for local event bus subscribers. */ +export interface BusSubscriber { + uuid: string + eventName: string + eventKey: StaticInstantiable + handler: EventHandler +} + +export interface BusQueue { + readonly name: string + + dispatch(item: T): Promise + + pop(): Promise> +} diff --git a/src/support/queue/Queue.ts b/src/support/queue/Queue.ts deleted file mode 100644 index d3f9982..0000000 --- a/src/support/queue/Queue.ts +++ /dev/null @@ -1,190 +0,0 @@ -import {Awaitable, ErrorWithContext, JSONState, Maybe, Rehydratable, Cache} from '../../util' -import {CanonicalItemClass} from '../CanonicalReceiver' -import {Container, Inject, Injectable, isInstantiable} from '../../di' -import {Canon} from '../../service/Canon' - -/** Type annotation for a Queueable that should be pushed onto a queue. */ -export type ShouldQueue = T & Queueable - -/** - * Base class for an object that can be pushed to/popped from a queue. - */ -export abstract class Queueable extends CanonicalItemClass implements Rehydratable { - abstract dehydrate(): Awaitable - - abstract rehydrate(state: JSONState): Awaitable - - /** - * When the item is popped from the queue, this method is called. - */ - public abstract execute(): Awaitable - - /** - * Determine whether the object should be pushed to the queue or not. - */ - public shouldQueue(): boolean { - return true - } - - /** - * The name of the queue where this object should be pushed by default. - */ - public defaultQueue(): string { - return 'default' - } - - /** - * Get the canonical resolver so we can re-instantiate this class from the queue. - * Throw an error if it could not be determined. - */ - public getFullyQualifiedCanonicalResolver(): string { - const resolver = (this.constructor as typeof Queueable).getFullyQualifiedCanonicalResolver() - if ( !resolver ) { - throw new ErrorWithContext('Cannot push Queueable onto queue: missing canonical resolver.') - } - - return resolver - } -} - -/** - * Truth function that returns true if an object implements the same interface as Queueable. - * This is done in case some external library needs to be incorporated as the base class for - * a Queueable, and cannot be made to extend Queueable. - * @param something - */ -export function isQueueable(something: unknown): something is Queueable { - if ( something instanceof Queueable ) { - return true - } - - return ( - typeof something === 'function' - && typeof (something as any).dehydrate === 'function' - && typeof (something as any).rehydrate === 'function' - && typeof (something as any).shouldQueue === 'function' - && typeof (something as any).defaultQueue === 'function' - && typeof (something as any).getFullyQualifiedCanonicalResolver === 'function' - ) -} - -/** - * Truth function that returns true if the given object is Queueable and wants to be - * pushed onto the queue. - * @param something - */ -export function shouldQueue(something: T): something is ShouldQueue { - return isQueueable(something) && something.shouldQueue() -} - -/** - * A multi-node queue that accepts & reinstantiates Queueables. - * - * @example - * There are several queue backends your application may use. These are - * configured via the `queue` config. To get the default queue, however, - * use this class as a DI token: - * ```ts - * this.container().make(Queue) - * ``` - * - * This will resolve the concrete implementation configured by your app. - */ -@Injectable() -export class Queue { - @Inject() - protected readonly cache!: Cache - - @Inject() - protected readonly canon!: Canon - - @Inject('injector') - protected readonly injector!: Container - - constructor( - public readonly name: string, - ) { } - - public get queueIdentifier(): string { - return `extollo__queue__${this.name}` - } - - /** Get the number of items waiting in the queue. */ - // public abstract length(): Awaitable - - /** Push a new queueable onto the queue. */ - public async push(item: ShouldQueue): Promise { - const data = { - q: true, - r: item.getFullyQualifiedCanonicalResolver(), - d: await item.dehydrate(), - } - - await this.cache.arrayPush(this.queueIdentifier, JSON.stringify(data)) - } - - /** Remove and return a queueable from the queue. */ - public async pop(): Promise> { - const item = await this.cache.arrayPop(this.queueIdentifier) - if ( !item ) { - return - } - - const data = JSON.parse(item) - if ( !data.q || !data.r ) { - throw new ErrorWithContext('Cannot pop Queueable: payload is invalid.', { - data, - queueName: this.name, - queueIdentifier: this.queueIdentifier, - }) - } - - const canonicalItem = this.canon.getFromFullyQualified(data.r) - if ( !canonicalItem ) { - throw new ErrorWithContext('Cannot pop Queueable: canonical name is not resolvable', { - data, - queueName: this.name, - queueIdentifier: this.queueIdentifier, - canonicalName: data.r, - }) - } - - if ( !isInstantiable(canonicalItem) ) { - throw new ErrorWithContext('Cannot pop Queueable: canonical item is not instantiable', { - data, - canonicalItem, - queueName: this.name, - queueIdentifier: this.queueIdentifier, - canonicalName: data.r, - }) - } - - const instance = this.injector.make(canonicalItem) - if ( !isQueueable(instance) ) { - throw new ErrorWithContext('Cannot pop Queueable: canonical item instance is not Queueable', { - data, - canonicalItem, - instance, - queueName: this.name, - queueIdentifier: this.queueIdentifier, - canonicalName: data.r, - }) - } - - await instance.rehydrate(data.d) - return instance - } - - /** Push a raw payload onto the queue. */ - public async pushRaw(item: JSONState): Promise { - await this.cache.arrayPush(this.queueIdentifier, JSON.stringify(item)) - } - - /** Remove and return a raw payload from the queue. */ - public async popRaw(): Promise> { - const item = await this.cache.arrayPop(this.queueIdentifier) - if ( item ) { - return JSON.parse(item) - } - } -} diff --git a/src/support/redis/Redis.ts b/src/support/redis/Redis.ts index 73e390c..f6cb364 100644 --- a/src/support/redis/Redis.ts +++ b/src/support/redis/Redis.ts @@ -48,14 +48,20 @@ export class Redis extends Unit { */ public async getConnection(): Promise { if ( !this.connection ) { - const options = this.config.get('redis.connection') as RedisOptions - this.logging.verbose(options) - this.connection = new IORedis(options) + this.connection = await this.getNewConnection() } return this.connection } + /** + * Get a new IORedis connection instance. + */ + public async getNewConnection(): Promise { + const options = this.config.get('redis.connection') as RedisOptions + return new IORedis(options) + } + /** * Get the IORedis connection in an AsyncPipe. */ diff --git a/src/util/collection/AsyncCollection.ts b/src/util/collection/AsyncCollection.ts index ca85531..0accd4d 100644 --- a/src/util/collection/AsyncCollection.ts +++ b/src/util/collection/AsyncCollection.ts @@ -8,6 +8,7 @@ import { import {Iterable, StopIteration} from './Iterable' import {applyWhere, WhereOperator} from './where' import {AsyncPipe, Pipeline} from '../support/Pipe' +import {Awaitable} from '../support/types' type AsyncCollectionComparable = CollectionItem[] | Collection | AsyncCollection type AsyncKeyFunction = (item: CollectionItem, index: number) => CollectionItem | Promise> type AsyncCollectionFunction = (items: AsyncCollection) => T2 @@ -39,14 +40,26 @@ export class AsyncCollection { private async inChunksAll(key: KeyOperator, callback: (items: Collection) => any): Promise { await this.storedItems.chunk(this.iteratorChunkSize, async items => { - await callback(items.pluck(key)) + if ( typeof key !== 'function' ) { + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + key = x => x[key] + } + + await callback(items.map(key)) }) await this.storedItems.reset() } private async inChunksAllNumbers(key: KeyOperator, callback: (items: number[]) => any): Promise { await this.storedItems.chunk(this.iteratorChunkSize, async items => { - await callback(items.pluck(key).map(x => Number(x)) + if ( typeof key !== 'function' ) { + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + key = x => x[key] + } + + await callback(items.map(key).map(x => Number(x)) .all()) }) await this.storedItems.reset() @@ -275,12 +288,12 @@ export class AsyncCollection { * @param {function} operator - item => boolean * @return Promise */ - async some(operator: (item: T) => boolean): Promise { + async some(operator: (item: T) => Awaitable): Promise { let contains = false - await this.inChunks(items => { + await this.inChunks(async items => { for ( const item of items.all() ) { - if ( operator(item) ) { + if ( await operator(item) ) { contains = true throw new StopIteration() } @@ -394,6 +407,14 @@ export class AsyncCollection { return new Collection(newItems) } + /** + * Like filter, but inverted. That is, filters out items that DO match the criterion. + * @param func + */ + async filterOut(func: KeyFunction): Promise> { + return this.filter(async (...args) => !(await func(...args))) + } + /** * Calls the passed in function if the boolean condition is true. Allows for functional syntax. * @param {boolean} bool @@ -677,14 +698,16 @@ export class AsyncCollection { * @param {KeyOperator} key * @return Promise */ - async pluck(key: KeyOperator): Promise> { - let newItems: CollectionItem[] = [] + async pluck(key: T2): Promise> { + let newItems: CollectionItem[] = [] await this.inChunksAll(key, async items => { + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore newItems = newItems.concat(items.all()) }) - return new Collection(newItems) + return new Collection(newItems) } /** diff --git a/src/util/collection/Collection.ts b/src/util/collection/Collection.ts index 0224bd1..18defbf 100644 --- a/src/util/collection/Collection.ts +++ b/src/util/collection/Collection.ts @@ -14,7 +14,9 @@ type MaybeCollectionIndex = CollectionIndex | undefined type ComparisonFunction = (item: CollectionItem, otherItem: CollectionItem) => number import { WhereOperator, applyWhere, whereMatch } from './where' -import {Awaitable, Either, isLeft, Maybe, MethodsOf, right, unright} from '../support/types' +import {Awaitable, Awaited, Either, isLeft, Maybe, MethodsOf, right, unright} from '../support/types' +import {AsyncCollection} from './AsyncCollection' +import {ArrayIterable} from './ArrayIterable' const collect = (items: CollectionItem[]): Collection => Collection.collect(items) const toString = (item: unknown): string => String(item) @@ -265,6 +267,16 @@ class Collection { return new Collection(matches) } + /** + * Like diff, but mutates the current collection. + * @param items + */ + diffInPlace(items: CollectionComparable): this { + const exclude = items instanceof Collection ? items.all() : items + this.storedItems = this.storedItems.filter(item => !exclude.includes(item)) + return this + } + /** * Return a collection of items that ARE in this collection, but NOT In the `items` collection * using a helper function to determine whether two items are equal. @@ -355,6 +367,14 @@ class Collection { return right(new Collection(newItems)) } + /** + * Get the collection as an AsyncCollection. + */ + toAsync(): AsyncCollection { + const iter = new ArrayIterable([...this.storedItems]) + return new AsyncCollection(iter) + } + /** * Map a method on the underlying type, passing it any required parameters. * This is delightfully type-safe. @@ -365,6 +385,22 @@ class Collection { return this.map(x => x[method](...params)) } + /** + * Shortcut for .mapCall(...).awaitAll(). + * @param method + * @param params + */ + async awaitMapCall>(method: T2, ...params: Parameters): Promise>>> { + return this.mapCall(method, ...params).awaitAll() + } + + /** + * Await all values in the collection. + */ + async awaitAll(): Promise>> { + return this.promiseMap(async x => x as Awaited) + } + /** * Map each element in the collection to a string. */ @@ -462,6 +498,14 @@ class Collection { return new Collection(this.storedItems.filter(func ?? Boolean)) } + /** + * Like filter, but inverted. That is, removes items that DO match the criterion. + * @param func + */ + filterOut(func?: KeyFunction): Collection { + return this.filter((...args) => !(func ?? Boolean)(...args)) + } + whereDefined(): Collection> { return this.filter() as unknown as Collection> } @@ -793,8 +837,8 @@ class Collection { * * @param key */ - pluck(key: KeyOperator): Collection { - return new Collection(this.allOperator(key)) + pluck(key: T2): Collection { + return new Collection(this.allOperator(key)) } /** @@ -1188,7 +1232,7 @@ class Collection { * * @param func */ - tap(func: CollectionFunction): Collection { + tap(func: CollectionFunction): this { func(this) return this } diff --git a/src/util/support/types.ts b/src/util/support/types.ts index 7033ed9..8fe5ced 100644 --- a/src/util/support/types.ts +++ b/src/util/support/types.ts @@ -72,3 +72,5 @@ export type TypeArraySignature = (...params: TA export type MethodsOf any> = { [K in keyof T]: T[K] extends TMethod ? K : never }[keyof T] + +export type Awaited = T extends PromiseLike ? U : T