You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
lib/src/support/bus/RedisBus.ts

136 lines
4.6 KiB

import {BusSubscriber, Event, EventBus, EventHandler, EventHandlerReturn, EventHandlerSubscription} from './types'
import {AwareOfContainerLifecycle, Container, Inject, Injectable, StaticInstantiable} from '../../di'
import {Awaitable, Collection, Pipeline, uuid4} from '../../util'
import {Redis} from '../redis/Redis'
import {Serialization} from './serial/Serialization'
import * as IORedis from 'ioredis'
import {Logging} from '../../service/Logging'
import {getEventName} from './getEventName'
/**
* Event bus implementation that does pub/sub over a Redis connection.
*/
@Injectable()
export class RedisBus implements EventBus, AwareOfContainerLifecycle {
awareOfContainerLifecycle = true as const
@Inject()
protected readonly redis!: Redis
@Inject()
protected readonly serial!: Serialization
@Inject()
protected readonly logging!: Logging
@Inject()
protected readonly injector!: Container
public readonly uuid = uuid4()
/** List of events for which we have created Redis channel subscriptions. */
protected internalSubscriptions: string[] = []
/** List of local subscriptions on this bus. */
protected subscriptions: Collection<BusSubscriber<Event>> = new Collection()
protected subscriberConnection?: IORedis.Redis
protected publisherConnection?: IORedis.Redis
pipe<T extends Event>(eventKey: StaticInstantiable<T>, line: Pipeline<T, EventHandlerReturn>): Awaitable<EventHandlerSubscription> {
return this.subscribe(eventKey, event => line.apply(event))
}
async push(event: Event): Promise<void> {
if ( !this.publisherConnection ) {
throw new Error('Cannot push to RedisQueue: publisher connection is not initialized')
}
const channel = `ex-event-${event.eventName}`
const json = await this.serial.encodeJSON(event)
this.logging.verbose(`Pushing event to channel: ${channel}`)
this.logging.verbose(json)
await this.publisherConnection.publish(channel, json)
}
async subscribe<T extends Event>(eventKey: StaticInstantiable<T>, handler: EventHandler<T>): Promise<EventHandlerSubscription> {
const uuid = uuid4()
const subscriber: BusSubscriber<Event> = {
eventName: getEventName(eventKey),
eventKey,
handler,
uuid,
} as unknown as BusSubscriber<Event>
this.logging.verbose(`Creating subscriber ${uuid}...`)
this.logging.verbose(subscriber)
if ( !this.internalSubscriptions.includes(subscriber.eventName) ) {
await new Promise<void>((res, rej) => {
if ( !this.subscriberConnection ) {
return rej(new Error('RedisBus not initialized on subscription.'))
}
this.subscriberConnection.subscribe(`ex-event-${subscriber.eventName}`, err => {
if ( err ) {
return rej(err)
}
this.logging.verbose('Successfully subscribed to channel on Redis...')
res()
})
})
this.internalSubscriptions.push(subscriber.eventName)
}
this.subscriptions.push(subscriber)
return {
unsubscribe: () => {
this.subscriptions = this.subscriptions.where('uuid', '!=', uuid)
},
}
}
protected async handleEvent(name: string, payload: string): Promise<void> {
const event = await this.serial.decodeJSON<Event>(payload)
await this.subscriptions
.where('eventName', '=', name)
.awaitMapCall('handler', event)
}
isConnected(): boolean {
return Boolean(this.subscriberConnection && this.publisherConnection)
}
async up(): Promise<void> {
this.subscriberConnection = await this.redis.getNewConnection()
this.publisherConnection = await this.redis.getNewConnection()
this.subscriberConnection.on('message', (channel: string, message: string) => {
if ( !channel.startsWith('ex-event-') ) {
this.logging.debug(`Ignoring message on invalid channel: ${channel}`)
return
}
const name = channel.substr('ex-event-'.length)
this.logging.verbose(`Received event ${name} on channel ${channel}`)
this.handleEvent(name, message)
})
}
down(): Awaitable<void> {
this.subscriberConnection?.disconnect()
this.publisherConnection?.disconnect()
}
onContainerRelease(): Awaitable<void> {
this.down()
}
}