import {BusSubscriber, Event, EventBus, EventHandler, EventHandlerReturn, EventHandlerSubscription} from './types' import {AwareOfContainerLifecycle, Container, Inject, Injectable, StaticInstantiable} from '../../di' import {Awaitable, Collection, Pipeline, uuid4} from '../../util' import {Redis} from '../redis/Redis' import {Serialization} from './serial/Serialization' import * as IORedis from 'ioredis' import {Logging} from '../../service/Logging' import {getEventName} from './getEventName' /** * Event bus implementation that does pub/sub over a Redis connection. */ @Injectable() export class RedisBus implements EventBus, AwareOfContainerLifecycle { awareOfContainerLifecycle = true as const @Inject() protected readonly redis!: Redis @Inject() protected readonly serial!: Serialization @Inject() protected readonly logging!: Logging @Inject() protected readonly injector!: Container public readonly uuid = uuid4() /** List of events for which we have created Redis channel subscriptions. */ protected internalSubscriptions: string[] = [] /** List of local subscriptions on this bus. */ protected subscriptions: Collection> = new Collection() protected subscriberConnection?: IORedis.Redis protected publisherConnection?: IORedis.Redis pipe(eventKey: StaticInstantiable, line: Pipeline): Awaitable { return this.subscribe(eventKey, event => line.apply(event)) } async push(event: Event): Promise { if ( !this.publisherConnection ) { throw new Error('Cannot push to RedisQueue: publisher connection is not initialized') } const channel = `ex-event-${event.eventName}` const json = await this.serial.encodeJSON(event) this.logging.verbose(`Pushing event to channel: ${channel}`) this.logging.verbose(json) await this.publisherConnection.publish(channel, json) } async subscribe(eventKey: StaticInstantiable, handler: EventHandler): Promise { const uuid = uuid4() const subscriber: BusSubscriber = { eventName: getEventName(eventKey), eventKey, handler, uuid, } as unknown as BusSubscriber this.logging.verbose(`Creating subscriber ${uuid}...`) this.logging.verbose(subscriber) if ( !this.internalSubscriptions.includes(subscriber.eventName) ) { await new Promise((res, rej) => { if ( !this.subscriberConnection ) { return rej(new Error('RedisBus not initialized on subscription.')) } this.subscriberConnection.subscribe(`ex-event-${subscriber.eventName}`, err => { if ( err ) { return rej(err) } this.logging.verbose('Successfully subscribed to channel on Redis...') res() }) }) this.internalSubscriptions.push(subscriber.eventName) } this.subscriptions.push(subscriber) return { unsubscribe: () => { this.subscriptions = this.subscriptions.where('uuid', '!=', uuid) }, } } protected async handleEvent(name: string, payload: string): Promise { const event = await this.serial.decodeJSON(payload) await this.subscriptions .where('eventName', '=', name) .awaitMapCall('handler', event) } isConnected(): boolean { return Boolean(this.subscriberConnection && this.publisherConnection) } async up(): Promise { this.subscriberConnection = await this.redis.getNewConnection() this.publisherConnection = await this.redis.getNewConnection() this.subscriberConnection.on('message', (channel: string, message: string) => { if ( !channel.startsWith('ex-event-') ) { this.logging.debug(`Ignoring message on invalid channel: ${channel}`) return } const name = channel.substr('ex-event-'.length) this.logging.verbose(`Received event ${name} on channel ${channel}`) this.handleEvent(name, message) }) } down(): Awaitable { this.subscriberConnection?.disconnect() this.publisherConnection?.disconnect() } onContainerRelease(): Awaitable { this.down() } }