You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
lib/src/support/redis/Redis.ts

90 lines
2.4 KiB

import {Inject, Singleton} from '../../di'
import {Config} from '../../service/Config'
import * as IORedis from 'ioredis'
import {RedisOptions} from 'ioredis'
import {Logging} from '../../service/Logging'
import {Unit} from '../../lifecycle/Unit'
import {AsyncPipe, Collection} from '../../util'
export {RedisOptions} from 'ioredis'
/**
 * Unit that loads configuration for and manages instantiation
 * of IORedis connections.
 *
 * One shared connection is created lazily by `getConnection()` and cached.
 * Additional, independent connections can be opened with `getNewConnection()`.
 * Every connection opened through this unit is tracked so that `down()` can
 * close them when the framework shuts down.
 */
@Singleton()
export class Redis extends Unit {
    /** The config service. */
    @Inject()
    protected readonly config!: Config

    /** The loggers. */
    @Inject()
    protected readonly logging!: Logging

    /**
     * The shared connection, if one has been established.
     * @private
     */
    private connection?: IORedis.Redis

    /**
     * The in-flight creation of the shared connection, if any.
     * Concurrent `getConnection()` callers wait on this promise instead of
     * each opening (and leaking) a connection of their own.
     * @private
     */
    private pendingConnection?: Promise<IORedis.Redis>

    /**
     * Collection of all Redis connections opened by this service.
     * We keep track of these here so we can make sure -all- of them get closed
     * when the framework tries to shut down.
     * @private
     */
    private spawnedConnections: Collection<IORedis.Redis> = new Collection()

    /**
     * Start the unit by eagerly opening the shared connection.
     */
    async up(): Promise<void> {
        this.logging.info('Attempting initial connection to Redis...')

        // Deliberately do not dump the config service to the log here:
        // it may contain credentials (e.g. the Redis password).
        await this.getConnection()
    }

    /**
     * Stop the unit by disconnecting the tracked connections and forgetting
     * the cached shared connection.
     */
    async down(): Promise<void> {
        this.logging.info('Disconnecting Redis...')

        // NOTE(review): only connections whose status is 'ready' are disconnected.
        // Connections in any other state (e.g. still connecting) are skipped here;
        // confirm that this is intended.
        await this.spawnedConnections
            .where('status', '=', 'ready')
            .awaitMapCall('disconnect')

        // Drop the cached instance so that a later up()/getConnection() opens a
        // fresh connection instead of handing out the one we just closed.
        this.connection = undefined
    }

    /**
     * Get the shared IORedis connection instance, creating it on first use.
     *
     * Calls made while the connection is still being created all resolve to
     * the same instance.
     *
     * @returns the shared connection
     */
    public async getConnection(): Promise<IORedis.Redis> {
        if ( this.connection ) {
            return this.connection
        }

        if ( this.pendingConnection ) {
            return this.pendingConnection
        }

        const opening = this.getNewConnection().then(
            inst => {
                this.connection = inst
                this.pendingConnection = undefined
                return inst
            },
            (err: unknown) => {
                // Clear the failed attempt so that the next caller can retry.
                this.pendingConnection = undefined
                throw err
            },
        )

        this.pendingConnection = opening
        return opening
    }

    /**
     * Get a new IORedis connection instance.
     *
     * The connection options are read from the `redis.connection` config key.
     * The new instance is tracked so that `down()` can close it.
     *
     * @returns a newly created connection, independent of the shared one
     */
    public async getNewConnection(): Promise<IORedis.Redis> {
        const options = this.config.get('redis.connection') as RedisOptions
        const inst = new IORedis(options)
        this.spawnedConnections.push(inst)
        return inst
    }

    /**
     * Get the shared IORedis connection in an AsyncPipe.
     */
    public pipe(): AsyncPipe<IORedis.Redis> {
        return new AsyncPipe<IORedis.Redis>(() => this.getConnection())
    }

    /**
     * Get an IORedis.Pipeline instance in an AsyncPipe.
     * The pipeline is obtained by calling `multi()` on the shared connection.
     */
    public multi(): AsyncPipe<IORedis.Pipeline> {
        return this.pipe()
            .tap(redis => {
                return redis.multi()
            })
    }
}