Add support for jobs & queueables, migrations
Some checks failed
continuous-integration/drone/push Build is passing
continuous-integration/drone Build is failing

- Create migration directives & migrators
- Modify Cache classes to support array manipulation
- Create Redis unit and RedisCache implementation
- Create Queueable base class and Queue class that uses Cache backend
This commit is contained in:
2021-08-23 23:51:53 -05:00
parent 26e0444e40
commit 074a3187eb
28 changed files with 962 additions and 56 deletions

View File

@@ -0,0 +1,61 @@
import {AppClass} from '../lifecycle/AppClass'
/**
* Interface for a class that receives its canonical resolver names upon load.
*/
export interface CanonicalReceiver {
setCanonicalResolver(fullyQualifiedResolver: string, unqualifiedResolver: string): void
getCanonicalResolver(): string | undefined
getFullyQualifiedCanonicalResolver(): string | undefined
}
/**
* Function that checks whether a given value satisfies the CanonicalReceiver interface.
* @param something
*/
export function isCanonicalReceiver(something: unknown): something is CanonicalReceiver {
return (
typeof something === 'function'
&& typeof (something as any).setCanonicalResolver === 'function'
&& (something as any).setCanonicalResolver.length >= 1
&& typeof (something as any).getCanonicalResolver === 'function'
&& (something as any).getCanonicalResolver.length === 0
)
}
/**
* Base class for canonical items that implements the CanonicalReceiver interface.
* That is, `isCanonicalReceiver(CanonicalItemClass) === true`.
*/
export class CanonicalItemClass extends AppClass {
/** The type-prefixed canonical resolver of this class, set by the startup unit. */
private static canonFullyQualifiedResolver?: string
/** The unqualified canonical resolver of this class, set by the startup unit. */
private static canonUnqualifiedResolver?: string
/**
* Sets the fully- and un-qualified canonical resolver strings. Intended for use
* by the Canonical unit.
* @param fullyQualifiedResolver
* @param unqualifiedResolver
*/
public static setCanonicalResolver(fullyQualifiedResolver: string, unqualifiedResolver: string): void {
this.canonFullyQualifiedResolver = fullyQualifiedResolver
this.canonUnqualifiedResolver = unqualifiedResolver
}
/**
* Get the fully-qualified canonical resolver of this class, if one has been set.
*/
public static getFullyQualifiedCanonicalResolver(): string | undefined {
return this.canonFullyQualifiedResolver
}
/**
* Get the unqualified canonical resolver of this class, if one has been set.
*/
public static getCanonicalResolver(): string | undefined {
return this.canonUnqualifiedResolver
}
}

View File

@@ -8,7 +8,10 @@ export class MemoryCache extends Cache {
/** Static collection of in-memory cache items. */
private static cacheItems: Collection<{key: string, value: string, expires?: Date}> = new Collection<{key: string; value: string, expires?: Date}>()
public fetch(key: string): Awaitable<string|undefined> {
/** Static collection of in-memory arrays. */
private static cacheArrays: Collection<{key: string, values: string[]}> = new Collection<{key: string; values: string[]}>()
public fetch(key: string): string|undefined {
const now = new Date()
return MemoryCache.cacheItems
.where('key', '=', key)
@@ -41,4 +44,41 @@ export class MemoryCache extends Cache {
public drop(key: string): Awaitable<void> {
MemoryCache.cacheItems = MemoryCache.cacheItems.where('key', '!=', key)
}
public decrement(key: string, amount = 1): Awaitable<number | undefined> {
const nextValue = (parseInt(this.fetch(key) ?? '0', 10) ?? 0) - amount
this.put(key, String(nextValue))
return nextValue
}
public increment(key: string, amount = 1): Awaitable<number | undefined> {
const nextValue = (parseInt(this.fetch(key) ?? '0', 10) ?? 0) + amount
this.put(key, String(nextValue))
return nextValue
}
public pop(key: string): Awaitable<string | undefined> {
const value = this.fetch(key)
this.drop(key)
return value
}
public arrayPop(key: string): Awaitable<string | undefined> {
const arr = MemoryCache.cacheArrays.firstWhere('key', '=', key)
if ( arr ) {
return arr.values.shift()
}
}
public arrayPush(key: string, value: string): Awaitable<void> {
const arr = MemoryCache.cacheArrays.firstWhere('key', '=', key)
if ( arr ) {
arr.values.push(value)
} else {
MemoryCache.cacheArrays.push({
key,
values: [value],
})
}
}
}

85
src/support/cache/RedisCache.ts vendored Normal file
View File

@@ -0,0 +1,85 @@
import {Cache, Maybe} from '../../util'
import {Inject, Injectable} from '../../di'
import {Redis} from '../redis/Redis'
/**
* Redis-driven Cache implementation.
*/
@Injectable()
export class RedisCache extends Cache {
/** The Redis service. */
@Inject()
protected readonly redis!: Redis
async arrayPop(key: string): Promise<string | undefined> {
return this.redis.pipe()
.tap(redis => redis.lpop(key))
.resolve()
}
async arrayPush(key: string, value: string): Promise<void> {
await this.redis.pipe()
.tap(redis => redis.rpush(key, value))
.resolve()
}
async decrement(key: string, amount?: number): Promise<number | undefined> {
return this.redis.pipe()
.tap(redis => redis.decrby(key, amount ?? 1))
.resolve()
}
async increment(key: string, amount?: number): Promise<number | undefined> {
return this.redis.pipe()
.tap(redis => redis.incrby(key, amount ?? 1))
.resolve()
}
async drop(key: string): Promise<void> {
await this.redis.pipe()
.tap(redis => redis.del(key))
.resolve()
}
async fetch(key: string): Promise<string | undefined> {
return this.redis.pipe()
.tap(redis => redis.get(key))
.tap(value => value ?? undefined)
.resolve()
}
async has(key: string): Promise<boolean> {
return this.redis.pipe()
.tap(redis => redis.exists(key))
.tap(numExisting => numExisting > 0)
.resolve()
}
pop(key: string): Promise<Maybe<string>> {
return new Promise<Maybe<string>>((res, rej) => {
this.redis.pipe()
.tap(redis => {
redis.multi()
.get(key, (err, value) => {
if ( err ) {
rej(err)
} else {
res(value)
}
})
.del(key)
})
})
}
async put(key: string, value: string, expires?: Date): Promise<void> {
await this.redis.multi()
.tap(redis => redis.set(key, value))
.when(Boolean(expires), redis => {
const seconds = Math.round(((new Date()).getTime() - expires!.getTime()) / 1000) // eslint-disable-line @typescript-eslint/no-non-null-assertion
return redis.expire(key, seconds)
})
.tap(pipeline => pipeline.exec())
.resolve()
}
}

190
src/support/queue/Queue.ts Normal file
View File

@@ -0,0 +1,190 @@
import {Awaitable, ErrorWithContext, JSONState, Maybe, Rehydratable, Cache} from '../../util'
import {CanonicalItemClass} from '../CanonicalReceiver'
import {Container, Inject, Injectable, isInstantiable} from '../../di'
import {Canon} from '../../service/Canon'
/** Type annotation for a Queueable that should be pushed onto a queue. */
export type ShouldQueue<T> = T & Queueable
/**
* Base class for an object that can be pushed to/popped from a queue.
*/
export abstract class Queueable extends CanonicalItemClass implements Rehydratable {
abstract dehydrate(): Awaitable<JSONState>
abstract rehydrate(state: JSONState): Awaitable<void>
/**
* When the item is popped from the queue, this method is called.
*/
public abstract execute(): Awaitable<void>
/**
* Determine whether the object should be pushed to the queue or not.
*/
public shouldQueue(): boolean {
return true
}
/**
* The name of the queue where this object should be pushed by default.
*/
public defaultQueue(): string {
return 'default'
}
/**
* Get the canonical resolver so we can re-instantiate this class from the queue.
* Throw an error if it could not be determined.
*/
public getFullyQualifiedCanonicalResolver(): string {
const resolver = (this.constructor as typeof Queueable).getFullyQualifiedCanonicalResolver()
if ( !resolver ) {
throw new ErrorWithContext('Cannot push Queueable onto queue: missing canonical resolver.')
}
return resolver
}
}
/**
* Truth function that returns true if an object implements the same interface as Queueable.
* This is done in case some external library needs to be incorporated as the base class for
* a Queueable, and cannot be made to extend Queueable.
* @param something
*/
export function isQueueable(something: unknown): something is Queueable {
if ( something instanceof Queueable ) {
return true
}
return (
typeof something === 'function'
&& typeof (something as any).dehydrate === 'function'
&& typeof (something as any).rehydrate === 'function'
&& typeof (something as any).shouldQueue === 'function'
&& typeof (something as any).defaultQueue === 'function'
&& typeof (something as any).getFullyQualifiedCanonicalResolver === 'function'
)
}
/**
* Truth function that returns true if the given object is Queueable and wants to be
* pushed onto the queue.
* @param something
*/
export function shouldQueue<T>(something: T): something is ShouldQueue<T> {
return isQueueable(something) && something.shouldQueue()
}
/**
* A multi-node queue that accepts & reinstantiates Queueables.
*
* @example
* There are several queue backends your application may use. These are
* configured via the `queue` config. To get the default queue, however,
* use this class as a DI token:
* ```ts
* this.container().make<Queue>(Queue)
* ```
*
* This will resolve the concrete implementation configured by your app.
*/
@Injectable()
export class Queue {
@Inject()
protected readonly cache!: Cache
@Inject()
protected readonly canon!: Canon
@Inject('injector')
protected readonly injector!: Container
constructor(
public readonly name: string,
) { }
public get queueIdentifier(): string {
return `extollo__queue__${this.name}`
}
/** Get the number of items waiting in the queue. */
// public abstract length(): Awaitable<number>
/** Push a new queueable onto the queue. */
public async push(item: ShouldQueue<Queueable>): Promise<void> {
const data = {
q: true,
r: item.getFullyQualifiedCanonicalResolver(),
d: await item.dehydrate(),
}
await this.cache.arrayPush(this.queueIdentifier, JSON.stringify(data))
}
/** Remove and return a queueable from the queue. */
public async pop(): Promise<Maybe<Queueable>> {
const item = await this.cache.arrayPop(this.queueIdentifier)
if ( !item ) {
return
}
const data = JSON.parse(item)
if ( !data.q || !data.r ) {
throw new ErrorWithContext('Cannot pop Queueable: payload is invalid.', {
data,
queueName: this.name,
queueIdentifier: this.queueIdentifier,
})
}
const canonicalItem = this.canon.getFromFullyQualified(data.r)
if ( !canonicalItem ) {
throw new ErrorWithContext('Cannot pop Queueable: canonical name is not resolvable', {
data,
queueName: this.name,
queueIdentifier: this.queueIdentifier,
canonicalName: data.r,
})
}
if ( !isInstantiable(canonicalItem) ) {
throw new ErrorWithContext('Cannot pop Queueable: canonical item is not instantiable', {
data,
canonicalItem,
queueName: this.name,
queueIdentifier: this.queueIdentifier,
canonicalName: data.r,
})
}
const instance = this.injector.make(canonicalItem)
if ( !isQueueable(instance) ) {
throw new ErrorWithContext('Cannot pop Queueable: canonical item instance is not Queueable', {
data,
canonicalItem,
instance,
queueName: this.name,
queueIdentifier: this.queueIdentifier,
canonicalName: data.r,
})
}
await instance.rehydrate(data.d)
return instance
}
/** Push a raw payload onto the queue. */
public async pushRaw(item: JSONState): Promise<void> {
await this.cache.arrayPush(this.queueIdentifier, JSON.stringify(item))
}
/** Remove and return a raw payload from the queue. */
public async popRaw(): Promise<Maybe<JSONState>> {
const item = await this.cache.arrayPop(this.queueIdentifier)
if ( item ) {
return JSON.parse(item)
}
}
}

View File

@@ -0,0 +1,75 @@
import {Inject, Singleton} from '../../di'
import {Config} from '../../service/Config'
import * as IORedis from 'ioredis'
import {RedisOptions} from 'ioredis'
import {Logging} from '../../service/Logging'
import {Unit} from '../../lifecycle/Unit'
import {AsyncPipe} from '../../util'
export {RedisOptions} from 'ioredis'
/**
* Unit that loads configuration for and manages instantiation
* of an IORedis connection.
*/
@Singleton()
export class Redis extends Unit {
/** The config service. */
@Inject()
protected readonly config!: Config
/** The loggers. */
@Inject()
protected readonly logging!: Logging
/**
* The instantiated connection, if one exists.
* @private
*/
private connection?: IORedis.Redis
async up(): Promise<void> {
this.logging.info('Attempting initial connection to Redis...')
this.logging.debug('Config:')
this.logging.debug(Config)
this.logging.debug(this.config)
await this.getConnection()
}
async down(): Promise<void> {
this.logging.info('Disconnecting Redis...')
if ( this.connection?.status === 'ready' ) {
await this.connection.disconnect()
}
}
/**
* Get the IORedis connection instance.
*/
public async getConnection(): Promise<IORedis.Redis> {
if ( !this.connection ) {
const options = this.config.get('redis.connection') as RedisOptions
this.logging.verbose(options)
this.connection = new IORedis(options)
}
return this.connection
}
/**
* Get the IORedis connection in an AsyncPipe.
*/
public pipe(): AsyncPipe<IORedis.Redis> {
return new AsyncPipe<IORedis.Redis>(() => this.getConnection())
}
/**
* Get an IORedis.Pipeline instance in an AsyncPipe.
*/
public multi(): AsyncPipe<IORedis.Pipeline> {
return this.pipe()
.tap(redis => {
return redis.multi()
})
}
}