Refactor event bus and queue system; detect cycles in DI realization and make
Some checks failed
continuous-integration/drone/push Build is failing

This commit is contained in:
2022-01-26 19:37:54 -06:00
parent 506fb55c74
commit 6d1cf18680
69 changed files with 1673 additions and 720 deletions

View File

@@ -4,8 +4,8 @@ import {Inject, Injectable, InjectParam} from '../di'
import {Application} from '../lifecycle/Application'
import {Logging} from '../service/Logging'
import {NodeModule, ExtolloAwareNodeModule} from './types'
import {EventBus} from '../event/EventBus'
import {PackageDiscovered} from './PackageDiscovered'
import {Bus} from './bus'
/**
* A helper class for discovering and interacting with
@@ -17,7 +17,7 @@ export class NodeModules {
protected readonly logging!: Logging
@Inject()
protected readonly bus!: EventBus
protected readonly bus!: Bus
constructor(
@InjectParam(Application.NODE_MODULES_INJECTION)
@@ -102,7 +102,7 @@ export class NodeModules {
this.logging.info(`Auto-discovering package: ${key}`)
seen.push(key)
await this.bus.dispatch(new PackageDiscovered(packageJsonData, packageJson.clone()))
await this.bus.push(new PackageDiscovered(packageJsonData, packageJson.clone()))
const packageNodeModules = packageJson.concat('..', 'node_modules')
if ( await packageNodeModules.exists() && packageJsonData?.extollo?.recursiveDependencies?.discover ) {

View File

@@ -1,6 +1,6 @@
import {Event} from '../event/Event'
import {Awaitable, JSONState, UniversalPath} from '../util'
import {Awaitable, UniversalPath} from '../util'
import {ExtolloAwareNodeModule} from './types'
import {BaseEvent} from './bus'
/**
* An event indicating that an NPM package has been discovered
@@ -9,7 +9,7 @@ import {ExtolloAwareNodeModule} from './types'
* Application services can listen for this event to register
* various discovery logic (e.g. automatically boot units
*/
export class PackageDiscovered extends Event {
export class PackageDiscovered extends BaseEvent {
constructor(
public packageConfig: ExtolloAwareNodeModule,
public packageJson: UniversalPath,
@@ -17,17 +17,9 @@ export class PackageDiscovered extends Event {
super()
}
dehydrate(): Awaitable<JSONState> {
return {
packageConfig: this.packageConfig as JSONState,
packageJson: this.packageJson.toString(),
}
}
eventName = '@extollo/lib.PackageDiscovered'
rehydrate(state: JSONState): Awaitable<void> {
if ( typeof state === 'object' ) {
this.packageConfig = (state.packageConfig as ExtolloAwareNodeModule)
this.packageJson = new UniversalPath(String(state.packageJson))
}
shouldBroadcast(): Awaitable<boolean> {
return false
}
}

189
src/support/bus/Bus.ts Normal file
View File

@@ -0,0 +1,189 @@
import {Inject, Singleton, StaticInstantiable} from '../../di'
import {BusSubscriber, Event, EventBus, EventHandler, EventHandlerReturn, EventHandlerSubscription} from './types'
import {Awaitable, Collection, Pipeline, uuid4} from '../../util'
import {Logging} from '../../service/Logging'
import {Unit} from '../../lifecycle/Unit'
export interface BusInternalSubscription {
busUuid: string
subscriberUuid: string
subscription: EventHandlerSubscription
}
/**
* Propagating event bus implementation.
*/
@Singleton()
export class Bus<TEvent extends Event = Event> extends Unit implements EventBus<TEvent> {
@Inject()
protected readonly logging!: Logging
public readonly uuid = uuid4()
/** Local listeners subscribed to events on this bus. */
protected subscribers: Collection<BusSubscriber<Event>> = new Collection()
/** Connections to other event busses to be propagated. */
protected connectors: Collection<EventBus> = new Collection()
protected subscriptions: Collection<BusInternalSubscription> = new Collection()
/** True if the bus has been initialized. */
private isUp = false
/**
* Push an event onto the bus.
* @param event
*/
async push(event: TEvent): Promise<void> {
if ( event.originBusUuid === this.uuid ) {
return
}
if ( !event.originBusUuid ) {
event.originBusUuid = this.uuid
}
if ( await this.callSubscribers(event) ) {
// One of the subscribers halted propagation of the event
return
}
if ( await this.shouldBroadcast(event) ) {
await this.connectors.awaitMapCall('push', event)
}
}
/** Returns true if the given event should be pushed to connected event busses. */
protected async shouldBroadcast(event: Event): Promise<boolean> {
if ( typeof event.shouldBroadcast === 'function' ) {
return event.shouldBroadcast()
}
return Boolean(event.shouldBroadcast)
}
/**
* Call all local listeners for the given event. Returns true if the propagation
* of the event should be halted.
* @param event
* @protected
*/
protected async callSubscribers(event: TEvent): Promise<boolean> {
return this.subscribers
.filter(sub => event instanceof sub.eventKey)
.pluck('handler')
.toAsync()
.some(handler => handler(event))
}
/** Register a pipeline as an event handler. */
pipe<T extends TEvent>(eventKey: StaticInstantiable<T>, line: Pipeline<T, EventHandlerReturn>): Awaitable<EventHandlerSubscription> {
return this.subscribe(eventKey, event => line.apply(event))
}
/**
* Subscribe to an event on the bus.
* @param eventKey
* @param handler
*/
async subscribe<T extends TEvent>(eventKey: StaticInstantiable<T>, handler: EventHandler<T>): Promise<EventHandlerSubscription> {
const uuid = uuid4()
this.subscribers.push({
eventName: eventKey.prototype.eventName, // FIXME this is not working
handler,
eventKey,
uuid,
} as unknown as BusSubscriber<Event>)
this.subscriptions.concat(await this.connectors
.promiseMap<BusInternalSubscription>(async bus => {
return {
busUuid: bus.uuid,
subscriberUuid: uuid,
subscription: await bus.subscribe(eventKey, (event: T) => {
if ( event.originBusUuid !== this.uuid ) {
return handler(event)
}
}),
}
}))
return {
unsubscribe: async () => {
this.subscribers = this.subscribers.where('uuid', '!=', uuid)
await this.subscriptions
.where('subscriberUuid', '=', uuid)
.tap(trashed => this.subscriptions.diffInPlace(trashed))
.pluck('subscription')
.awaitMapCall('unsubscribe')
},
}
}
/** Connect an external event bus to this bus. */
async connect(bus: EventBus): Promise<void> {
if ( this.isUp ) {
await bus.up()
}
this.connectors.push(bus)
await this.subscribers
.promiseMap<BusInternalSubscription>(async subscriber => {
return {
busUuid: bus.uuid,
subscriberUuid: subscriber.uuid,
subscription: await bus.subscribe(subscriber.eventKey, event => {
if ( event.originBusUuid !== this.uuid ) {
return subscriber.handler(event)
}
}),
}
})
}
async disconnect(bus: EventBus): Promise<void> {
await this.subscriptions
.where('busUuid', '=', bus.uuid)
.tap(trashed => this.subscriptions.diffInPlace(trashed))
.pluck('subscription')
.awaitMapCall('unsubscribe')
if ( this.isUp ) {
await bus.down()
}
this.connectors.diffInPlace([bus])
}
/** Initialize this event bus. */
async up(): Promise<void> {
if ( this.isUp ) {
this.logging.warn('Attempted to boot more than once. Skipping.')
return
}
await this.connectors.awaitMapCall('up')
this.isUp = true
}
/** Clean up this event bus. */
async down(): Promise<void> {
if ( !this.isUp ) {
this.logging.warn('Attempted to shut down but was never properly booted. Skipping.')
return
}
await this.subscriptions
.pluck('subscription')
.awaitMapCall('unsubscribe')
await this.connectors.awaitMapCall('down')
this.isUp = false
}
}

130
src/support/bus/LocalBus.ts Normal file
View File

@@ -0,0 +1,130 @@
import {Inject, Injectable, StaticInstantiable} from '../../di'
import {BusSubscriber, Event, EventBus, EventHandler, EventHandlerReturn, EventHandlerSubscription} from './types'
import {Awaitable, Collection, Pipeline, uuid4} from '../../util'
import {Logging} from '../../service/Logging'
import {Bus, BusInternalSubscription} from './Bus'
import {AppClass} from '../../lifecycle/AppClass'
/**
* Non-connectable event bus implementation. Can forward events to the main Bus instance.
*/
@Injectable()
export class LocalBus<TEvent extends Event = Event> extends AppClass implements EventBus<TEvent> {
@Inject()
protected readonly logging!: Logging
@Inject()
protected readonly bus!: Bus
public readonly uuid = uuid4()
/** Local listeners subscribed to events on this bus. */
protected subscribers: Collection<BusSubscriber<TEvent>> = new Collection()
protected subscriptions: Collection<BusInternalSubscription> = new Collection()
/** True if the bus has been initialized. */
private isUp = false
/**
* Push an event onto the bus.
* @param event
*/
async push(event: TEvent): Promise<void> {
if ( event.originBusUuid === this.uuid ) {
return
}
if ( !event.originBusUuid ) {
event.originBusUuid = this.uuid
}
if ( await this.callSubscribers(event) ) {
// One of the subscribers halted propagation of the event
return
}
if ( await this.shouldBroadcast(event) ) {
await this.bus.push(event)
}
}
/** Returns true if the given event should be pushed to connected event busses. */
protected async shouldBroadcast(event: TEvent): Promise<boolean> {
if ( typeof event.shouldBroadcast === 'function' ) {
return event.shouldBroadcast()
}
return Boolean(event.shouldBroadcast)
}
/**
* Call all local listeners for the given event. Returns true if the propagation
* of the event should be halted.
* @param event
* @protected
*/
protected async callSubscribers(event: TEvent): Promise<boolean> {
return this.subscribers
.filter(sub => event instanceof sub.eventKey)
.pluck('handler')
.toAsync()
.some(handler => handler(event))
}
/** Register a pipeline as an event handler. */
pipe<T extends TEvent>(eventKey: StaticInstantiable<T>, line: Pipeline<T, EventHandlerReturn>): Awaitable<EventHandlerSubscription> {
return this.subscribe(eventKey, event => line.apply(event))
}
/**
* Subscribe to an event on the bus.
* @param eventKey
* @param handler
*/
async subscribe<T extends TEvent>(eventKey: StaticInstantiable<T>, handler: EventHandler<T>): Promise<EventHandlerSubscription> {
const uuid = uuid4()
this.subscribers.push({
eventName: eventKey.prototype.eventName,
handler,
eventKey,
uuid,
} as unknown as BusSubscriber<TEvent>)
return {
unsubscribe: async () => {
this.subscribers = this.subscribers.where('uuid', '!=', uuid)
await this.subscriptions
.where('subscriberUuid', '=', uuid)
.tap(trashed => this.subscriptions.diffInPlace(trashed))
.pluck('subscription')
.awaitMapCall('unsubscribe')
},
}
}
async up(): Promise<void> {
if ( this.isUp ) {
this.logging.warn('Attempted to boot more than once. Skipping.')
return
}
this.isUp = true
}
/** Clean up this event bus. */
async down(): Promise<void> {
if ( !this.isUp ) {
this.logging.warn('Attempted to shut down but was never properly booted. Skipping.')
return
}
await this.subscriptions
.pluck('subscription')
.awaitMapCall('unsubscribe')
this.isUp = false
}
}

110
src/support/bus/RedisBus.ts Normal file
View File

@@ -0,0 +1,110 @@
import {BusSubscriber, Event, EventBus, EventHandler, EventHandlerReturn, EventHandlerSubscription} from './types'
import {Inject, Injectable, StaticInstantiable} from '../../di'
import {Awaitable, Collection, Pipeline, uuid4} from '../../util'
import {Redis} from '../redis/Redis'
import {Serialization} from './serial/Serialization'
import * as IORedis from 'ioredis'
/**
* Event bus implementation that does pub/sub over a Redis connection.
*/
@Injectable()
export class RedisBus implements EventBus {
@Inject()
protected readonly redis!: Redis
@Inject()
protected readonly serial!: Serialization
public readonly uuid = uuid4()
/** List of events for which we have created Redis channel subscriptions. */
protected internalSubscriptions: string[] = []
/** List of local subscriptions on this bus. */
protected subscriptions: Collection<BusSubscriber<Event>> = new Collection()
protected subscriberConnection?: IORedis.Redis
protected publisherConnection?: IORedis.Redis
pipe<T extends Event>(eventKey: StaticInstantiable<T>, line: Pipeline<T, EventHandlerReturn>): Awaitable<EventHandlerSubscription> {
return this.subscribe(eventKey, event => line.apply(event))
}
async push(event: Event): Promise<void> {
if ( !this.publisherConnection ) {
throw new Error('Cannot push to RedisQueue: publisher connection is not initialized')
}
const channel = `ex-event-${event.eventName}`
const json = await this.serial.encodeJSON(event)
await this.publisherConnection.publish(channel, json)
}
async subscribe<T extends Event>(eventKey: StaticInstantiable<T>, handler: EventHandler<T>): Promise<EventHandlerSubscription> {
const uuid = uuid4()
const subscriber: BusSubscriber<Event> = {
eventName: eventKey.prototype.eventName,
eventKey,
handler,
uuid,
} as unknown as BusSubscriber<Event>
if ( !this.internalSubscriptions.includes(subscriber.eventName) ) {
await new Promise<void>((res, rej) => {
if ( !this.subscriberConnection ) {
return rej(new Error('RedisBus not initialized on subscription.'))
}
this.subscriberConnection.subscribe(`ex-event-${subscriber.eventName}`, err => {
if ( err ) {
return rej(err)
}
res()
})
})
this.internalSubscriptions.push(subscriber.eventName)
}
this.subscriptions.push(subscriber)
return {
unsubscribe: () => {
this.subscriptions = this.subscriptions.where('uuid', '!=', uuid)
},
}
}
protected async handleEvent(name: string, payload: string): Promise<void> {
const event = await this.serial.decodeJSON<Event>(payload)
await this.subscriptions
.where('eventName', '=', name)
.pluck('handler')
.map(handler => handler(event))
.awaitAll()
}
async up(): Promise<void> {
this.subscriberConnection = await this.redis.getNewConnection()
this.publisherConnection = await this.redis.getNewConnection()
this.subscriberConnection.on('message', (channel: string, message: string) => {
if ( !channel.startsWith('ex-event-') ) {
return
}
const name = channel.substr('ex-event-'.length)
this.handleEvent(name, message)
})
}
down(): Awaitable<void> {
this.subscriberConnection?.disconnect()
this.publisherConnection?.disconnect()
}
}

17
src/support/bus/index.ts Normal file
View File

@@ -0,0 +1,17 @@
export * from './types'
export * from './serial/BaseSerializer'
export * from './serial/SimpleCanonicalItemSerializer'
export * from './serial/Serialization'
export * from './serial/decorators'
export * from './Bus'
export * from './LocalBus'
export * from './RedisBus'
export * from './queue/event/PushingToQueue'
export * from './queue/event/PushedToQueue'
export * from './queue/Queue'
export * from './queue/CacheQueue'
export * from './queue/SyncQueue'
export * from './queue/QueueFactory'

View File

@@ -0,0 +1,33 @@
import {Queue} from './Queue'
import {Inject, Injectable} from '../../../di'
import {Cache, Maybe} from '../../../util'
import {Queueable, ShouldQueue} from '../types'
import {Serialization} from '../serial/Serialization'
/**
* Queue implementation that uses the configured cache driver as a queue.
*/
@Injectable()
export class CacheQueue extends Queue {
@Inject()
protected readonly cache!: Cache
@Inject()
protected readonly serial!: Serialization
protected get queueIdentifier(): string {
return `extollo__queue__${this.name}`
}
protected async push<T extends Queueable>(item: ShouldQueue<T>): Promise<void> {
const json = await this.serial.encodeJSON(item)
await this.cache.arrayPush(this.queueIdentifier, json)
}
async pop(): Promise<Maybe<ShouldQueue<Queueable>>> {
const popped = await this.cache.arrayPop(this.queueIdentifier)
if ( popped ) {
return this.serial.decodeJSON(popped)
}
}
}

View File

@@ -0,0 +1,31 @@
import {BusQueue, Queueable, shouldQueue, ShouldQueue} from '../types'
import {Inject, Injectable} from '../../../di'
import {Awaitable, Maybe} from '../../../util'
import {Bus} from '../Bus'
import {PushingToQueue} from './event/PushingToQueue'
import {PushedToQueue} from './event/PushedToQueue'
@Injectable()
export abstract class Queue implements BusQueue {
@Inject()
protected readonly bus!: Bus
constructor(
public readonly name: string,
) {}
async dispatch<T extends Queueable>(item: T): Promise<void> {
if ( shouldQueue(item) ) {
await this.bus.push(new PushingToQueue(item))
await this.push(item)
await this.bus.push(new PushedToQueue(item))
return
}
await item.execute()
}
protected abstract push<T extends Queueable>(item: ShouldQueue<T>): Awaitable<void>
abstract pop(): Promise<Maybe<ShouldQueue<Queueable>>>
}

View File

@@ -0,0 +1,87 @@
import {
AbstractFactory,
Container,
DependencyRequirement,
PropertyDependency,
isInstantiable,
DEPENDENCY_KEYS_METADATA_KEY,
DEPENDENCY_KEYS_PROPERTY_METADATA_KEY,
StaticInstantiable,
FactoryProducer,
} from '../../../di'
import {Collection, ErrorWithContext} from '../../../util'
import {Logging} from '../../../service/Logging'
import {Config} from '../../../service/Config'
import {Queue} from './Queue'
import {SyncQueue} from './SyncQueue'
/**
* Dependency container factory that matches the abstract Queue token, but
* produces an instance of whatever Queue driver is configured in the `server.queue.driver` config.
*/
@FactoryProducer()
export class QueueFactory extends AbstractFactory<Queue> {
/** true if we have printed the synchronous queue driver warning once. */
private static loggedSyncQueueWarningOnce = false
private di(): [Logging, Config] {
return [
Container.getContainer().make(Logging),
Container.getContainer().make(Config),
]
}
produce(): Queue {
return new (this.getQueueClass())()
}
match(something: unknown): boolean {
return something === Queue
}
getDependencyKeys(): Collection<DependencyRequirement> {
const meta = Reflect.getMetadata(DEPENDENCY_KEYS_METADATA_KEY, this.getQueueClass())
if ( meta ) {
return meta
}
return new Collection<DependencyRequirement>()
}
getInjectedProperties(): Collection<PropertyDependency> {
const meta = new Collection<PropertyDependency>()
let currentToken = this.getQueueClass()
do {
const loadedMeta = Reflect.getMetadata(DEPENDENCY_KEYS_PROPERTY_METADATA_KEY, currentToken)
if ( loadedMeta ) {
meta.concat(loadedMeta)
}
currentToken = Object.getPrototypeOf(currentToken)
} while (Object.getPrototypeOf(currentToken) !== Function.prototype && Object.getPrototypeOf(currentToken) !== Object.prototype)
return meta
}
/**
* Get the configured queue driver and return some Instantiable<Queue>.
* @protected
*/
protected getQueueClass(): StaticInstantiable<Queue> {
const [logging, config] = this.di()
const QueueClass = config.get('server.queue.driver', SyncQueue)
if ( QueueClass === SyncQueue && !QueueFactory.loggedSyncQueueWarningOnce ) {
logging.warn(`You are using the default synchronous queue driver. It is recommended you configure a background queue driver instead.`)
QueueFactory.loggedSyncQueueWarningOnce = true
}
if ( !isInstantiable(QueueClass) || !(QueueClass.prototype instanceof Queue) ) {
const e = new ErrorWithContext('Provided queue class does not extend from @extollo/lib.Queue')
e.context = {
configKey: 'server.queue.driver',
class: QueueClass.toString(),
}
}
return QueueClass
}
}

View File

@@ -0,0 +1,22 @@
import {Queue} from './Queue'
import {Inject, Injectable} from '../../../di'
import {Logging} from '../../../service/Logging'
import {Queueable, ShouldQueue} from '../types'
import {Maybe} from '../../../util'
/**
* Simple queue implementation that executes items immediately in the current process.
*/
@Injectable()
export class SyncQueue extends Queue {
@Inject()
protected readonly logging!: Logging
protected async push<T extends Queueable>(item: ShouldQueue<T>): Promise<void> {
await item.execute()
}
async pop(): Promise<Maybe<ShouldQueue<Queueable>>> {
return undefined
}
}

View File

@@ -0,0 +1,17 @@
import {Event, Queueable, ShouldQueue} from '../../types'
import {uuid4} from '../../../../util'
/**
* Event fired after an item is pushed to the queue.
*/
export class PushedToQueue<T extends ShouldQueue<Queueable>> implements Event {
public readonly eventName = '@extollo/lib:PushedToQueue'
public readonly eventUuid = uuid4()
public readonly shouldBroadcast = true
constructor(
public readonly item: T,
) {}
}

View File

@@ -0,0 +1,17 @@
import {Event, Queueable, ShouldQueue} from '../../types'
import {uuid4} from '../../../../util'
/**
* Event fired before an item is pushed to the queue.
*/
export class PushingToQueue<T extends ShouldQueue<Queueable>> implements Event {
public readonly eventName = '@extollo/lib:PushingToQueue'
public readonly eventUuid = uuid4()
public readonly shouldBroadcast = true
constructor(
public readonly item: T,
) {}
}

View File

@@ -0,0 +1,84 @@
import {Awaitable, JSONState} from '../../../util'
import {SerialPayload} from '../types'
import {Serialization} from './Serialization'
import {Container, TypedDependencyKey} from '../../../di'
import {Request} from '../../../http/lifecycle/Request'
import {RequestLocalStorage} from '../../../http/RequestLocalStorage'
/**
* A core Serializer implementation.
*/
export abstract class BaseSerializer<TActual, TSerial extends JSONState> {
/**
* Return true if the value can be encoded by this serializer.
* @param some
*/
public abstract matchActual(some: TActual): boolean
/**
* Return true if the serial payload can be decoded by this serializer.
* @param serial
*/
public matchSerial(serial: SerialPayload<TActual, TSerial>): boolean {
return serial.serializer === this.getName()
}
/**
* Encode the payload as a JSON state.
* @param actual
* @protected
*/
protected abstract encodeActual(actual: TActual): Awaitable<TSerial>
/**
* Decode the payload back to the original object.
* @param serial
* @protected
*/
protected abstract decodeSerial(serial: TSerial): Awaitable<TActual>
/**
* Get the unique name of this serializer.
* @protected
*/
protected abstract getName(): string
/**
* Encode a value to a serial payload.
* @param actual
*/
public async encode(actual: TActual): Promise<SerialPayload<TActual, TSerial>> {
return {
serializer: this.getName(),
payload: await this.encodeActual(actual),
}
}
/**
* Decode a value from a serial payload.
* @param serial
*/
public async decode(serial: SerialPayload<TActual, TSerial>): Promise<TActual> {
return this.decodeSerial(serial.payload)
}
/** Helper to get an instance of the Serialization service. */
protected getSerialization(): Serialization {
return Container.getContainer()
.make(Serialization)
}
/** Helper to get an instance of the global Request. */
protected getRequest(): Request {
return Container.getContainer()
.make<RequestLocalStorage>(RequestLocalStorage)
.get()
}
/** Get a dependency from the container. */
protected make<T>(key: TypedDependencyKey<T>): T {
return Container.getContainer()
.make(key)
}
}

View File

@@ -0,0 +1,147 @@
import {Container, Inject, Instantiable, Singleton} from '../../../di'
import {Awaitable, Collection, ErrorWithContext, JSONState} from '../../../util'
import {Serializer, SerialPayload} from '../types'
import {Validator} from '../../../validation/Validator'
/**
* Error thrown when attempting to (de-)serialize an object and a serializer cannot be found.
*/
export class NoSerializerError extends ErrorWithContext {
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
constructor(object: any, context?: {[key: string]: any}) {
super('The object could not be (de-)serialized, as no compatible serializer has been registered.', {
object,
...(context || {}),
})
}
}
/**
* Encode a value to JSON using a registered serializer.
* @throws NoSerializerError
* @param value
*/
export function encode<T>(value: T): Promise<string> {
return Container.getContainer()
.make<Serialization>(Serialization)
.encodeJSON(value)
}
/**
* Decode a value from JSON using a registered serializer.
* @throws NoSerializerError
* @param payload
* @param validator
*/
export function decode<T>(payload: string, validator?: Validator<T>): Awaitable<T> {
return Container.getContainer()
.make<Serialization>(Serialization)
.decodeJSON(payload, validator)
}
interface RegisteredSerializer<T extends Serializer<unknown, JSONState>> {
key: Instantiable<T>,
instance?: T
}
/**
* Service that manages (de-)serialization of objects.
*/
@Singleton()
export class Serialization {
@Inject()
protected readonly injector!: Container
/**
* Serializers registered with the service.
* We store the DI keys and realize them as needed, rather than at register time
* since most registration is done via the @ObjectSerializer decorator and the
* ContainerBlueprint. Realizing them at that time can cause loops in the DI call
* to realizeContainer since getContainer() -> realizeContainer() -> make the serializer
* -> getContainer(). This largely defers the realization until after all the DI keys
* are registered with the global Container.
*/
protected serializers: Collection<RegisteredSerializer<Serializer<unknown, JSONState>>> = new Collection()
/** Register a new serializer with the service. */
public register(serializer: Instantiable<Serializer<unknown, JSONState>>): this {
// Prepend instead of push so that later-registered serializers are prioritized when matching
this.serializers.prepend({
key: serializer,
})
return this
}
protected matchActual<T>(actual: T): Serializer<T, JSONState> {
for ( const serializer of this.serializers ) {
if ( !serializer.instance ) {
serializer.instance = this.injector.make(serializer.key)
}
if ( serializer.instance?.matchActual(actual) ) {
return serializer.instance as Serializer<T, JSONState>
}
}
throw new NoSerializerError(actual)
}
protected matchSerial<TSerial extends JSONState>(serial: SerialPayload<unknown, TSerial>): Serializer<unknown, TSerial> {
for ( const serializer of this.serializers ) {
if ( !serializer.instance ) {
serializer.instance = this.injector.make(serializer.key)
}
if ( serializer.instance?.matchSerial(serial) ) {
return serializer.instance as Serializer<unknown, TSerial>
}
}
throw new NoSerializerError(serial)
}
/**
* Encode a value to its serial payload using a registered serializer, if one exists.
* @throws NoSerializerError
* @param value
*/
public encode<T>(value: T): Awaitable<SerialPayload<T, JSONState>> {
return this.matchActual(value).encode(value)
}
/**
* Encode a value to JSON using a registered serializer, if one exists.
* @throws NoSerializerError
* @param value
*/
public async encodeJSON<T>(value: T): Promise<string> {
return JSON.stringify(await this.encode(value))
}
/**
* Decode a serial payload to the original object using a registered serializer, if one exists.
* @throws NoSerializerError
* @param payload
* @param validator
*/
public decode<T>(payload: SerialPayload<unknown, JSONState>, validator?: Validator<T>): Awaitable<T> {
const matched = this.matchSerial(payload)
const decoded = matched.decode(payload) as Awaitable<T>
if ( validator ) {
return validator.parse(decoded)
}
return decoded
}
/**
* Decode a value from JSON using a registered serializer, if one exists.
* @throws NoSerializerError
* @param payload
* @param validator
*/
public async decodeJSON<T>(payload: string, validator?: Validator<T>): Promise<T> {
return this.decode(JSON.parse(payload), validator)
}
}

View File

@@ -0,0 +1,85 @@
import {CanonicalItemClass} from '../../CanonicalReceiver'
import {BaseSerializer} from './BaseSerializer'
import {Awaitable, ErrorWithContext, JSONState, Rehydratable} from '../../../util'
import {Container, Inject, Injectable} from '../../../di'
import {Canon} from '../../../service/Canon'
/** State encoded by this class. */
export interface SimpleCanonicalItemSerialState extends JSONState {
rehydrate?: JSONState
canonicalIdentifier: string
}
/**
* A serializer implementation that serializes class instances derived from the Canon loading system.
* These instances must be CanonicalItemClass instances and take no constructor parameters.
* If the instance is Rehydratable, then the state will be (re-)stored.
*/
@Injectable()
export class SimpleCanonicalItemSerializer<TActual extends CanonicalItemClass> extends BaseSerializer<TActual, SimpleCanonicalItemSerialState> {
@Inject()
protected readonly canon!: Canon
@Inject()
protected readonly container!: Container
protected decodeSerial(serial: SimpleCanonicalItemSerialState): Awaitable<TActual> {
const canon = this.canon.getFromFullyQualified(serial.canonicalIdentifier)
if ( !canon ) {
throw new ErrorWithContext('Unable to decode serialized payload: the canonical identifier was not found', {
serial,
})
}
if ( canon instanceof CanonicalItemClass ) {
if ( serial.rehydrate && typeof (canon as any).rehydrate === 'function' ) {
(canon as unknown as Rehydratable).rehydrate(serial.rehydrate)
}
return canon as TActual
} else if ( canon?.prototype instanceof CanonicalItemClass ) {
const inst = this.container.make(canon)
if ( serial.rehydrate && typeof (inst as any).rehydrate === 'function' ) {
(inst as unknown as Rehydratable).rehydrate(serial.rehydrate)
}
return inst as TActual
}
throw new ErrorWithContext('Attempted to instantiate serialized item into non-Canonical class', {
canon,
serial,
})
}
protected async encodeActual(actual: TActual): Promise<SimpleCanonicalItemSerialState> {
const ctor = actual.constructor as typeof CanonicalItemClass
const canonicalIdentifier = ctor.getFullyQualifiedCanonicalResolver()
if ( !canonicalIdentifier ) {
throw new ErrorWithContext('Unable to determine Canonical resolver for serialization.', [
actual,
])
}
const state: SimpleCanonicalItemSerialState = {
canonicalIdentifier,
}
if ( typeof (actual as any).dehydrate === 'function' ) {
state.rehydrate = await (actual as unknown as Rehydratable).dehydrate()
}
return state
}
protected getName(): string {
return '@extollo/lib:SimpleCanonicalItemSerializer'
}
matchActual(some: TActual): boolean {
return (
some instanceof CanonicalItemClass
&& some.constructor.length === 0
)
}
}

View File

@@ -0,0 +1,21 @@
import {ContainerBlueprint, Instantiable, isInstantiableOf} from '../../../di'
import {JSONState, logIfDebugging} from '../../../util'
import {BaseSerializer} from './BaseSerializer'
import {Serialization} from './Serialization'
import {Serializer} from '../types'
/**
* Register a class as an object serializer with the Serialization service.
* @constructor
*/
export const ObjectSerializer = (): <TFunction extends Instantiable<Serializer<unknown, JSONState>>>(target: TFunction) => TFunction | void => {
return (target: Instantiable<Serializer<unknown, JSONState>>) => {
if ( isInstantiableOf(target, BaseSerializer) ) {
logIfDebugging('extollo.bus.serial.decorators', 'Registering ObjectSerializer blueprint:', target)
ContainerBlueprint.getContainerBlueprint()
.onResolve<Serialization>(Serialization, serial => serial.register(target))
} else {
logIfDebugging('extollo.bus.serial.decorators', 'Skipping ObjectSerializer blueprint:', target)
}
}
}

94
src/support/bus/types.ts Normal file
View File

@@ -0,0 +1,94 @@
import {Awaitable, JSONState, Maybe, Pipeline, TypeTag, uuid4} from '../../util'
import {StaticInstantiable} from '../../di'
// eslint-disable-next-line @typescript-eslint/no-unused-vars
export interface SerialPayload<TActual, TSerial extends JSONState> extends JSONState {
serializer: string
payload: TSerial
}
export interface Serializer<TActual, TSerial extends JSONState> {
matchActual(some: TActual): boolean
matchSerial(serial: SerialPayload<TActual, TSerial>): boolean
encode(actual: TActual): Awaitable<SerialPayload<TActual, TSerial>>
decode(serial: SerialPayload<TActual, TSerial>): Awaitable<TActual>
}
export interface Event {
eventUuid: string
eventName: string
originBusUuid?: string
shouldBroadcast?: boolean | (() => Awaitable<boolean>)
}
export abstract class BaseEvent implements Event {
public readonly eventUuid = uuid4()
public abstract eventName: string
public shouldBroadcast(): Awaitable<boolean> {
return true
}
}
export type EventHandlerReturn = Awaitable<boolean | void | undefined>
export type EventHandler<T extends Event> = (event: T) => EventHandlerReturn
export interface EventHandlerSubscription {
unsubscribe(): Awaitable<void>
}
export interface Queueable {
execute(): Awaitable<void>
shouldQueue?: boolean | (() => boolean)
defaultQueue?: string | (() => string)
}
export type ShouldQueue<T extends Queueable> = T & TypeTag<'@extollo/lib:ShouldQueue'>
export function shouldQueue<T extends Queueable>(something: T): something is ShouldQueue<T> {
if ( typeof something.shouldQueue === 'function' ) {
return something.shouldQueue()
}
return (
typeof something.shouldQueue === 'undefined'
|| something.shouldQueue
)
}
export interface EventBus<TEvent extends Event = Event> {
readonly uuid: string
subscribe<T extends TEvent>(eventKey: StaticInstantiable<T>, handler: EventHandler<T>): Awaitable<EventHandlerSubscription>
pipe<T extends TEvent>(eventKey: StaticInstantiable<T>, line: Pipeline<T, EventHandlerReturn>): Awaitable<EventHandlerSubscription>
push(event: TEvent): Awaitable<void>
up(): Awaitable<void>
down(): Awaitable<void>
}
/** Internal storage format for local event bus subscribers. */
export interface BusSubscriber<T extends Event> {
uuid: string
eventName: string
eventKey: StaticInstantiable<T>
handler: EventHandler<T>
}
export interface BusQueue {
readonly name: string
dispatch<T extends Queueable>(item: T): Promise<void>
pop(): Promise<Maybe<Queueable>>
}

View File

@@ -1,190 +0,0 @@
import {Awaitable, ErrorWithContext, JSONState, Maybe, Rehydratable, Cache} from '../../util'
import {CanonicalItemClass} from '../CanonicalReceiver'
import {Container, Inject, Injectable, isInstantiable} from '../../di'
import {Canon} from '../../service/Canon'
/** Type annotation for a Queueable that should be pushed onto a queue. */
export type ShouldQueue<T> = T & Queueable
/**
* Base class for an object that can be pushed to/popped from a queue.
*/
export abstract class Queueable extends CanonicalItemClass implements Rehydratable {
abstract dehydrate(): Awaitable<JSONState>
abstract rehydrate(state: JSONState): Awaitable<void>
/**
* When the item is popped from the queue, this method is called.
*/
public abstract execute(): Awaitable<void>
/**
* Determine whether the object should be pushed to the queue or not.
*/
public shouldQueue(): boolean {
return true
}
/**
* The name of the queue where this object should be pushed by default.
*/
public defaultQueue(): string {
return 'default'
}
/**
* Get the canonical resolver so we can re-instantiate this class from the queue.
* Throw an error if it could not be determined.
*/
public getFullyQualifiedCanonicalResolver(): string {
const resolver = (this.constructor as typeof Queueable).getFullyQualifiedCanonicalResolver()
if ( !resolver ) {
throw new ErrorWithContext('Cannot push Queueable onto queue: missing canonical resolver.')
}
return resolver
}
}
/**
* Truth function that returns true if an object implements the same interface as Queueable.
* This is done in case some external library needs to be incorporated as the base class for
* a Queueable, and cannot be made to extend Queueable.
* @param something
*/
export function isQueueable(something: unknown): something is Queueable {
if ( something instanceof Queueable ) {
return true
}
return (
typeof something === 'function'
&& typeof (something as any).dehydrate === 'function'
&& typeof (something as any).rehydrate === 'function'
&& typeof (something as any).shouldQueue === 'function'
&& typeof (something as any).defaultQueue === 'function'
&& typeof (something as any).getFullyQualifiedCanonicalResolver === 'function'
)
}
/**
* Truth function that returns true if the given object is Queueable and wants to be
* pushed onto the queue.
* @param something
*/
export function shouldQueue<T>(something: T): something is ShouldQueue<T> {
return isQueueable(something) && something.shouldQueue()
}
/**
* A multi-node queue that accepts & reinstantiates Queueables.
*
* @example
* There are several queue backends your application may use. These are
* configured via the `queue` config. To get the default queue, however,
* use this class as a DI token:
* ```ts
* this.container().make<Queue>(Queue)
* ```
*
* This will resolve the concrete implementation configured by your app.
*/
@Injectable()
export class Queue {
@Inject()
protected readonly cache!: Cache
@Inject()
protected readonly canon!: Canon
@Inject('injector')
protected readonly injector!: Container
constructor(
public readonly name: string,
) { }
public get queueIdentifier(): string {
return `extollo__queue__${this.name}`
}
/** Get the number of items waiting in the queue. */
// public abstract length(): Awaitable<number>
/** Push a new queueable onto the queue. */
public async push(item: ShouldQueue<Queueable>): Promise<void> {
const data = {
q: true,
r: item.getFullyQualifiedCanonicalResolver(),
d: await item.dehydrate(),
}
await this.cache.arrayPush(this.queueIdentifier, JSON.stringify(data))
}
/** Remove and return a queueable from the queue. */
public async pop(): Promise<Maybe<Queueable>> {
const item = await this.cache.arrayPop(this.queueIdentifier)
if ( !item ) {
return
}
const data = JSON.parse(item)
if ( !data.q || !data.r ) {
throw new ErrorWithContext('Cannot pop Queueable: payload is invalid.', {
data,
queueName: this.name,
queueIdentifier: this.queueIdentifier,
})
}
const canonicalItem = this.canon.getFromFullyQualified(data.r)
if ( !canonicalItem ) {
throw new ErrorWithContext('Cannot pop Queueable: canonical name is not resolvable', {
data,
queueName: this.name,
queueIdentifier: this.queueIdentifier,
canonicalName: data.r,
})
}
if ( !isInstantiable(canonicalItem) ) {
throw new ErrorWithContext('Cannot pop Queueable: canonical item is not instantiable', {
data,
canonicalItem,
queueName: this.name,
queueIdentifier: this.queueIdentifier,
canonicalName: data.r,
})
}
const instance = this.injector.make(canonicalItem)
if ( !isQueueable(instance) ) {
throw new ErrorWithContext('Cannot pop Queueable: canonical item instance is not Queueable', {
data,
canonicalItem,
instance,
queueName: this.name,
queueIdentifier: this.queueIdentifier,
canonicalName: data.r,
})
}
await instance.rehydrate(data.d)
return instance
}
/** Push a raw payload onto the queue. */
public async pushRaw(item: JSONState): Promise<void> {
await this.cache.arrayPush(this.queueIdentifier, JSON.stringify(item))
}
/** Remove and return a raw payload from the queue. */
public async popRaw(): Promise<Maybe<JSONState>> {
const item = await this.cache.arrayPop(this.queueIdentifier)
if ( item ) {
return JSON.parse(item)
}
}
}

View File

@@ -48,14 +48,20 @@ export class Redis extends Unit {
*/
public async getConnection(): Promise<IORedis.Redis> {
if ( !this.connection ) {
const options = this.config.get('redis.connection') as RedisOptions
this.logging.verbose(options)
this.connection = new IORedis(options)
this.connection = await this.getNewConnection()
}
return this.connection
}
/**
* Get a new IORedis connection instance.
*/
public async getNewConnection(): Promise<IORedis.Redis> {
const options = this.config.get('redis.connection') as RedisOptions
return new IORedis(options)
}
/**
* Get the IORedis connection in an AsyncPipe.
*/