You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
lib/src/support/queue/Queue.ts

191 lines
6.0 KiB

import {Awaitable, ErrorWithContext, JSONState, Maybe, Rehydratable, Cache} from '../../util'
import {CanonicalItemClass} from '../CanonicalReceiver'
import {Container, Inject, Injectable, isInstantiable} from '../../di'
import {Canon} from '../../service/Canon'
/**
 * Type annotation for a Queueable that should be pushed onto a queue.
 *
 * Resolves to the intersection of `T` with `Queueable`, so a value of this type
 * keeps its own members and additionally exposes the `Queueable` contract.
 */
export type ShouldQueue<T> = T & Queueable
/**
 * Abstract parent for anything that can be placed on a queue and taken off again.
 *
 * Concrete subclasses supply the state (de)serialization and the work itself;
 * the queueing defaults below can be overridden as needed.
 */
export abstract class Queueable extends CanonicalItemClass implements Rehydratable {
    /** Produce the JSON state that represents this item. */
    abstract dehydrate(): Awaitable<JSONState>

    /** Restore this item from a previously dehydrated JSON state. */
    abstract rehydrate(state: JSONState): Awaitable<void>

    /**
     * The work this item performs. Intended to be invoked once the item
     * has been popped from the queue.
     */
    public abstract execute(): Awaitable<void>

    /**
     * Whether this object wants to be pushed onto the queue.
     * The base implementation always answers `true`.
     */
    public shouldQueue(): boolean {
        return true
    }

    /**
     * Name of the queue this object is pushed to by default.
     * The base implementation answers `'default'`.
     */
    public defaultQueue(): string {
        return 'default'
    }

    /**
     * Look up the canonical resolver string for this object's class, which is
     * what allows the class to be re-instantiated from a queue payload.
     *
     * @returns the fully-qualified canonical resolver reported by the constructor
     * @throws ErrorWithContext when the constructor reports no resolver
     */
    public getFullyQualifiedCanonicalResolver(): string {
        // The lookup itself is a static on the class, so go through the constructor.
        const ctor = this.constructor as typeof Queueable
        const canonicalResolver = ctor.getFullyQualifiedCanonicalResolver()

        if ( canonicalResolver ) {
            return canonicalResolver
        }

        throw new ErrorWithContext('Cannot push Queueable onto queue: missing canonical resolver.')
    }
}
/**
 * Type guard that reports whether a value can be treated as a Queueable.
 *
 * Real `Queueable` instances pass immediately. Anything else is duck-typed: it passes
 * when it exposes the Queueable methods checked below. This exists so that a class
 * which must extend some external library's base class (and therefore cannot extend
 * `Queueable`) can still be used with the queue.
 *
 * @param something the value to inspect
 * @returns `true` when `something` is a `Queueable` instance or structurally matches one
 */
export function isQueueable(something: unknown): something is Queueable {
    if ( something instanceof Queueable ) {
        return true
    }

    // A duck-typed candidate is normally a class *instance*, whose typeof is 'object'.
    // Requiring 'function' here rejected every such instance. Callable values are still
    // accepted so that nothing the previous check allowed is now refused.
    const kind = typeof something
    if ( something === null || (kind !== 'object' && kind !== 'function') ) {
        return false
    }

    // NOTE(review): `execute` is part of the Queueable contract but is not verified here,
    // to avoid rejecting values that were previously accepted — confirm whether it should be.
    const candidate = something as Record<string, unknown>
    const requiredMethods = [
        'dehydrate',
        'rehydrate',
        'shouldQueue',
        'defaultQueue',
        'getFullyQualifiedCanonicalResolver',
    ]

    return requiredMethods.every(method => typeof candidate[method] === 'function')
}
/**
 * Type guard that answers whether a value both qualifies as a Queueable
 * and reports (via its own `shouldQueue()`) that it wants to be queued.
 *
 * @param something the value to inspect
 * @returns `false` for anything that is not Queueable; otherwise whatever
 *          the value's `shouldQueue()` returns
 */
export function shouldQueue<T>(something: T): something is ShouldQueue<T> {
    if ( !isQueueable(something) ) {
        return false
    }

    return something.shouldQueue()
}
/**
 * A named, multi-node queue that accepts Queueables and re-instantiates them on the way out.
 *
 * Items are stored as JSON strings in the injected cache, under a key derived
 * from the queue's name (see `queueIdentifier`).
 *
 * @example
 * Your application may use one of several queue backends, selected through
 * the `queue` config. To obtain the default queue, use this class as the DI token:
 * ```ts
 * this.container().make<Queue>(Queue)
 * ```
 *
 * That resolves the concrete implementation your app has configured.
 */
@Injectable()
export class Queue {
    @Inject()
    protected readonly cache!: Cache

    @Inject()
    protected readonly canon!: Canon

    @Inject('injector')
    protected readonly injector!: Container

    constructor(
        public readonly name: string,
    ) { }

    /** Cache key under which this queue's items are stored. */
    public get queueIdentifier(): string {
        return `extollo__queue__${this.name}`
    }

    // Disabled declaration for reporting the number of items waiting in the queue:
    // public abstract length(): Awaitable<number>

    /**
     * Serialize a queueable and append it to the queue.
     *
     * The stored envelope carries a marker flag (`q`), the item's canonical
     * resolver (`r`) and its dehydrated state (`d`).
     */
    public async push(item: ShouldQueue<Queueable>): Promise<void> {
        const resolver = item.getFullyQualifiedCanonicalResolver()
        const state = await item.dehydrate()
        const envelope = {
            q: true,
            r: resolver,
            d: state,
        }

        await this.cache.arrayPush(this.queueIdentifier, JSON.stringify(envelope))
    }

    /**
     * Take one entry off the queue and rebuild the Queueable it describes.
     *
     * @returns the rehydrated instance, or nothing when the cache yields no entry
     * @throws ErrorWithContext when the payload lacks its marker/resolver, when the
     *         resolver cannot be resolved or instantiated, or when the resulting
     *         instance is not Queueable
     */
    public async pop(): Promise<Maybe<Queueable>> {
        const serialized = await this.cache.arrayPop(this.queueIdentifier)
        if ( !serialized ) {
            return undefined
        }

        const payload = JSON.parse(serialized)

        // Only envelopes written by push() carry both the marker and a resolver.
        if ( !payload.q || !payload.r ) {
            throw new ErrorWithContext('Cannot pop Queueable: payload is invalid.', {
                data: payload,
                queueName: this.name,
                queueIdentifier: this.queueIdentifier,
            })
        }

        // Map the stored resolver string back to whatever it canonically names.
        const resolved = this.canon.getFromFullyQualified(payload.r)
        if ( !resolved ) {
            throw new ErrorWithContext('Cannot pop Queueable: canonical name is not resolvable', {
                data: payload,
                queueName: this.name,
                queueIdentifier: this.queueIdentifier,
                canonicalName: payload.r,
            })
        }

        if ( !isInstantiable(resolved) ) {
            throw new ErrorWithContext('Cannot pop Queueable: canonical item is not instantiable', {
                data: payload,
                canonicalItem: resolved,
                queueName: this.name,
                queueIdentifier: this.queueIdentifier,
                canonicalName: payload.r,
            })
        }

        // Build the instance through the container, then make sure it really is queueable.
        const rebuilt = this.injector.make(resolved)
        if ( !isQueueable(rebuilt) ) {
            throw new ErrorWithContext('Cannot pop Queueable: canonical item instance is not Queueable', {
                data: payload,
                canonicalItem: resolved,
                instance: rebuilt,
                queueName: this.name,
                queueIdentifier: this.queueIdentifier,
                canonicalName: payload.r,
            })
        }

        await rebuilt.rehydrate(payload.d)
        return rebuilt
    }

    /** Append an arbitrary JSON payload to the queue, without the Queueable envelope. */
    public async pushRaw(item: JSONState): Promise<void> {
        await this.cache.arrayPush(
            this.queueIdentifier,
            JSON.stringify(item),
        )
    }

    /**
     * Take one entry off the queue and return it as parsed JSON.
     *
     * @returns the parsed payload, or nothing when the cache yields no entry
     */
    public async popRaw(): Promise<Maybe<JSONState>> {
        const serialized = await this.cache.arrayPop(this.queueIdentifier)
        if ( !serialized ) {
            return undefined
        }

        return JSON.parse(serialized)
    }
}