import {Inject, Singleton} from '../../di' import {Config} from '../../service/Config' import * as IORedis from 'ioredis' import {RedisOptions} from 'ioredis' import {Logging} from '../../service/Logging' import {Unit} from '../../lifecycle/Unit' import {AsyncPipe, Collection} from '../../util' export {RedisOptions} from 'ioredis' /** * Unit that loads configuration for and manages instantiation * of an IORedis connection. */ @Singleton() export class Redis extends Unit { /** The config service. */ @Inject() protected readonly config!: Config /** The loggers. */ @Inject() protected readonly logging!: Logging /** * The instantiated connection, if one exists. * @private */ private connection?: IORedis.Redis /** * Collection of all Redis connections opened by this service. * We keep track of these here so we can make sure -all- of them get closed * when the framework tries to shut down. * @private */ private spawnedConnections: Collection = new Collection() async up(): Promise { this.logging.info('Attempting initial connection to Redis...') this.logging.debug(this.config) await this.getConnection() } async down(): Promise { this.logging.info('Disconnecting Redis...') await this.spawnedConnections .where('status', '=', 'ready') .awaitMapCall('disconnect') } /** * Get the IORedis connection instance. */ public async getConnection(): Promise { if ( !this.connection ) { this.connection = await this.getNewConnection() } return this.connection } /** * Get a new IORedis connection instance. */ public async getNewConnection(): Promise { const options = this.config.get('redis.connection') as RedisOptions const inst = new IORedis(options) this.spawnedConnections.push(inst) return inst } /** * Get the IORedis connection in an AsyncPipe. */ public pipe(): AsyncPipe { return new AsyncPipe(() => this.getConnection()) } /** * Get an IORedis.Pipeline instance in an AsyncPipe. */ public multi(): AsyncPipe { return this.pipe() .tap(redis => { return redis.multi() }) } }