Implement queue work and listen commands
Some checks failed
continuous-integration/drone/push Build is failing

This commit is contained in:
2022-01-27 10:34:01 -06:00
parent e098a5edb7
commit 16e5fa00aa
19 changed files with 271 additions and 17 deletions

View File

@@ -1,8 +1,19 @@
import {Inject, Singleton, StaticInstantiable} from '../../di'
import {BusSubscriber, Event, EventBus, EventHandler, EventHandlerReturn, EventHandlerSubscription} from './types'
import {
BusConnectorConfig,
BusSubscriber,
Event,
EventBus,
EventHandler,
EventHandlerReturn,
EventHandlerSubscription,
} from './types'
import {Awaitable, Collection, Pipeline, uuid4} from '../../util'
import {Logging} from '../../service/Logging'
import {Unit} from '../../lifecycle/Unit'
import {Config} from '../../service/Config'
import {RedisBus} from './RedisBus'
import {getEventName} from './getEventName'
export interface BusInternalSubscription {
busUuid: string
@@ -18,6 +29,9 @@ export class Bus<TEvent extends Event = Event> extends Unit implements EventBus<
@Inject()
protected readonly logging!: Logging
@Inject()
protected readonly config!: Config
public readonly uuid = uuid4()
/** Local listeners subscribed to events on this bus. */
@@ -37,6 +51,7 @@ export class Bus<TEvent extends Event = Event> extends Unit implements EventBus<
*/
async push(event: TEvent): Promise<void> {
if ( event.originBusUuid === this.uuid ) {
this.logging.verbose('Skipping propagation of event, because it originated from this bus.')
return
}
@@ -44,13 +59,19 @@ export class Bus<TEvent extends Event = Event> extends Unit implements EventBus<
event.originBusUuid = this.uuid
}
this.logging.verbose('Raising event on process-local bus:')
this.logging.verbose(event)
if ( await this.callSubscribers(event) ) {
// One of the subscribers halted propagation of the event
this.logging.verbose('Process-local subscriber halted propagation of event.')
return
}
if ( await this.shouldBroadcast(event) ) {
this.logging.verbose('Raising event on connected busses...')
await this.connectors.awaitMapCall('push', event)
} else {
this.logging.verbose('Will not broadcast event.')
}
}
@@ -91,7 +112,7 @@ export class Bus<TEvent extends Event = Event> extends Unit implements EventBus<
const uuid = uuid4()
this.subscribers.push({
eventName: eventKey.prototype.eventName, // FIXME this is not working
eventName: getEventName(eventKey),
handler,
eventKey,
uuid,
@@ -99,6 +120,7 @@ export class Bus<TEvent extends Event = Event> extends Unit implements EventBus<
this.subscriptions.concat(await this.connectors
.promiseMap<BusInternalSubscription>(async bus => {
this.logging.verbose(`Connecting subscriber to bus ${bus.constructor?.name}#${bus.uuid}...`)
return {
busUuid: bus.uuid,
subscriberUuid: uuid,
@@ -166,6 +188,17 @@ export class Bus<TEvent extends Event = Event> extends Unit implements EventBus<
return
}
// Read the connectors from the server.bus config and set them up
const config = this.config.get('server.bus', {})
if ( Array.isArray(config.connectors) ) {
for ( const connector of (config.connectors as BusConnectorConfig[]) ) {
this.logging.info(`Creating bus connection of type: ${connector.type}`)
if ( connector.type === 'redis' ) {
await this.connect(this.make(RedisBus))
}
}
}
await this.connectors.awaitMapCall('up')
this.isUp = true

View File

@@ -4,6 +4,7 @@ import {Awaitable, Collection, Pipeline, uuid4} from '../../util'
import {Logging} from '../../service/Logging'
import {Bus, BusInternalSubscription} from './Bus'
import {AppClass} from '../../lifecycle/AppClass'
import {getEventName} from './getEventName'
/**
* Non-connectable event bus implementation. Can forward events to the main Bus instance.
@@ -86,7 +87,7 @@ export class LocalBus<TEvent extends Event = Event> extends AppClass implements
const uuid = uuid4()
this.subscribers.push({
eventName: eventKey.prototype.eventName,
eventName: getEventName(eventKey),
handler,
eventKey,
uuid,

View File

@@ -1,9 +1,11 @@
import {BusSubscriber, Event, EventBus, EventHandler, EventHandlerReturn, EventHandlerSubscription} from './types'
import {Inject, Injectable, StaticInstantiable} from '../../di'
import {Container, Inject, Injectable, StaticInstantiable} from '../../di'
import {Awaitable, Collection, Pipeline, uuid4} from '../../util'
import {Redis} from '../redis/Redis'
import {Serialization} from './serial/Serialization'
import * as IORedis from 'ioredis'
import {Logging} from '../../service/Logging'
import {getEventName} from './getEventName'
/**
* Event bus implementation that does pub/sub over a Redis connection.
@@ -16,6 +18,12 @@ export class RedisBus implements EventBus {
@Inject()
protected readonly serial!: Serialization
@Inject()
protected readonly logging!: Logging
@Inject()
protected readonly injector!: Container
public readonly uuid = uuid4()
/** List of events for which we have created Redis channel subscriptions. */
@@ -40,18 +48,24 @@ export class RedisBus implements EventBus {
const channel = `ex-event-${event.eventName}`
const json = await this.serial.encodeJSON(event)
this.logging.verbose(`Pushing event to channel: ${channel}`)
this.logging.verbose(json)
await this.publisherConnection.publish(channel, json)
}
async subscribe<T extends Event>(eventKey: StaticInstantiable<T>, handler: EventHandler<T>): Promise<EventHandlerSubscription> {
const uuid = uuid4()
const subscriber: BusSubscriber<Event> = {
eventName: eventKey.prototype.eventName,
eventName: getEventName(eventKey),
eventKey,
handler,
uuid,
} as unknown as BusSubscriber<Event>
this.logging.verbose(`Creating subscriber ${uuid}...`)
this.logging.verbose(subscriber)
if ( !this.internalSubscriptions.includes(subscriber.eventName) ) {
await new Promise<void>((res, rej) => {
if ( !this.subscriberConnection ) {
@@ -63,6 +77,7 @@ export class RedisBus implements EventBus {
return rej(err)
}
this.logging.verbose('Successfully subscribed to channel on Redis...')
res()
})
})
@@ -95,10 +110,12 @@ export class RedisBus implements EventBus {
this.subscriberConnection.on('message', (channel: string, message: string) => {
if ( !channel.startsWith('ex-event-') ) {
this.logging.debug(`Ignoring message on invalid channel: ${channel}`)
return
}
const name = channel.substr('ex-event-'.length)
this.logging.verbose(`Received event ${name} on channel ${channel}`)
this.handleEvent(name, message)
})
}

View File

@@ -0,0 +1,35 @@
import {Event} from './types'
import {Container, StaticInstantiable} from '../../di'
import {ErrorWithContext} from '../../util'
export function getEventName<T extends Event>(eventKey: StaticInstantiable<T>): string {
const protoName = eventKey.prototype.eventName
if ( protoName ) {
return protoName
}
try {
const inst = Container.getContainer().make<T>(eventKey)
if ( inst.eventName ) {
return inst.eventName
}
} catch (e: unknown) {} // eslint-disable-line no-empty
let stringParseName = eventKey.toString()
.split('\n')
.map(x => x.trim())
.filter(x => x.startsWith('this.eventName = \'') || x.startsWith('this.eventName = "'))?.[0]
?.split('=')?.[1]
?.trim()
if ( stringParseName ) {
stringParseName = stringParseName.endsWith(';') ? stringParseName.slice(1, -2) : stringParseName.slice(1, -1)
if ( stringParseName ) {
return stringParseName
}
}
throw new ErrorWithContext('Unable to determine eventName from eventKey', {
eventKey,
})
}

View File

@@ -11,6 +11,8 @@ export * from './RedisBus'
export * from './queue/event/PushingToQueue'
export * from './queue/event/PushedToQueue'
export * from './queue/event/QueueEventSerializer'
export * from './queue/BaseJob'
export * from './queue/Queue'
export * from './queue/CacheQueue'
export * from './queue/SyncQueue'

View File

@@ -0,0 +1,13 @@
import {Queueable} from '../types'
import {Awaitable} from '../../../util'
import {Inject, Injectable} from '../../../di'
import {Logging} from '../../../service/Logging'
import {CanonicalItemClass} from '../../CanonicalReceiver'
@Injectable()
export abstract class BaseJob extends CanonicalItemClass implements Queueable {
@Inject()
protected readonly logging!: Logging
abstract execute(): Awaitable<void>
}

View File

@@ -16,9 +16,9 @@ export abstract class Queue implements BusQueue {
async dispatch<T extends Queueable>(item: T): Promise<void> {
if ( shouldQueue(item) ) {
await this.bus.push(new PushingToQueue(item))
await this.bus.push(new PushingToQueue(item, this.name))
await this.push(item)
await this.bus.push(new PushedToQueue(item))
await this.bus.push(new PushedToQueue(item, this.name))
return
}

View File

@@ -5,7 +5,7 @@ import {uuid4} from '../../../../util'
* Event fired after an item is pushed to the queue.
*/
export class PushedToQueue<T extends ShouldQueue<Queueable>> implements Event {
public readonly eventName = '@extollo/lib:PushedToQueue'
public readonly eventName = '@extollo/lib.PushedToQueue'
public readonly eventUuid = uuid4()
@@ -13,5 +13,6 @@ export class PushedToQueue<T extends ShouldQueue<Queueable>> implements Event {
constructor(
public readonly item: T,
public readonly queueName: string,
) {}
}

View File

@@ -5,7 +5,7 @@ import {uuid4} from '../../../../util'
* Event fired before an item is pushed to the queue.
*/
export class PushingToQueue<T extends ShouldQueue<Queueable>> implements Event {
public readonly eventName = '@extollo/lib:PushingToQueue'
public readonly eventName = '@extollo/lib.PushingToQueue'
public readonly eventUuid = uuid4()
@@ -13,5 +13,6 @@ export class PushingToQueue<T extends ShouldQueue<Queueable>> implements Event {
constructor(
public readonly item: T,
public readonly queueName: string,
) {}
}

View File

@@ -0,0 +1,43 @@
import {PushedToQueue} from './PushedToQueue'
import {Queueable, SerialPayload, ShouldQueue} from '../../types'
import {PushingToQueue} from './PushingToQueue'
import {BaseSerializer} from '../../serial/BaseSerializer'
import {JSONState} from '../../../../util'
import {ObjectSerializer} from '../../serial/decorators'
export type QueueEvent = PushedToQueue<ShouldQueue<Queueable>> | PushingToQueue<ShouldQueue<Queueable>>
export interface QueueEventSerialPayload extends JSONState {
eventName: '@extollo/lib.PushedToQueue' | '@extollo/lib.PushingToQueue'
itemPayload: SerialPayload<Queueable, JSONState>
queueName: string
}
@ObjectSerializer()
export class QueueEventSerializer extends BaseSerializer<QueueEvent, QueueEventSerialPayload> {
protected async decodeSerial(serial: QueueEventSerialPayload): Promise<QueueEvent> {
const item = await this.getSerialization().decode(serial.itemPayload)
if ( serial.eventName === '@extollo/lib.PushedToQueue' ) {
return new PushedToQueue(item as ShouldQueue<Queueable>, serial.queueName)
} else {
return new PushingToQueue(item as ShouldQueue<Queueable>, serial.queueName)
}
}
protected async encodeActual(actual: QueueEvent): Promise<QueueEventSerialPayload> {
return {
eventName: actual.eventName,
queueName: actual.queueName,
itemPayload: await this.getSerialization().encode(actual.item),
}
}
protected getName(): string {
return '@extollo/lib.QueueEventSerializer'
}
matchActual(some: QueueEvent): boolean {
return some instanceof PushedToQueue || some instanceof PushingToQueue
}
}

View File

@@ -3,6 +3,7 @@ import {BaseSerializer} from './BaseSerializer'
import {Awaitable, ErrorWithContext, JSONState, Rehydratable} from '../../../util'
import {Container, Inject, Injectable} from '../../../di'
import {Canon} from '../../../service/Canon'
import {ObjectSerializer} from './decorators'
/** State encoded by this class. */
export interface SimpleCanonicalItemSerialState extends JSONState {
@@ -15,6 +16,7 @@ export interface SimpleCanonicalItemSerialState extends JSONState {
* These instances must be CanonicalItemClass instances and take no constructor parameters.
* If the instance is Rehydratable, then the state will be (re-)stored.
*/
@ObjectSerializer()
@Injectable()
export class SimpleCanonicalItemSerializer<TActual extends CanonicalItemClass> extends BaseSerializer<TActual, SimpleCanonicalItemSerialState> {
@Inject()

View File

@@ -1,5 +1,5 @@
import {Awaitable, JSONState, Maybe, Pipeline, TypeTag, uuid4} from '../../util'
import {StaticInstantiable} from '../../di'
import {Instantiable, StaticInstantiable} from '../../di'
// eslint-disable-next-line @typescript-eslint/no-unused-vars
export interface SerialPayload<TActual, TSerial extends JSONState> extends JSONState {
@@ -92,3 +92,17 @@ export interface BusQueue {
pop(): Promise<Maybe<Queueable>>
}
export interface RedisBusConnectorConfig {
type: 'redis'
}
export type BusConnectorConfig = RedisBusConnectorConfig
export interface QueueConfig {
driver?: Instantiable<BusQueue>,
/* queues?: ({
name: string,
driver: Instantiable<BusQueue>,
})[] */
}