You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
lib/src/support/bus/queue/CacheQueue.ts

34 lines
1003 B

import {Queue} from './Queue'
import {Inject, Injectable} from '../../../di'
import {Cache, Maybe} from '../../../util'
import {Queueable, ShouldQueue} from '../types'
import {Serialization} from '../serial/Serialization'
/**
 * Queue implementation backed by the injected cache.
 *
 * Each item is serialized to JSON and kept in one cache array whose key is
 * derived from this queue's name (see `queueIdentifier`).
 */
@Injectable()
export class CacheQueue extends Queue {
    /** Cache instance that stores the serialized queue items. */
    @Inject()
    protected readonly cache!: Cache

    /** Serializer used to encode items to JSON and decode them back. */
    @Inject()
    protected readonly serial!: Serialization

    /**
     * Cache key of the array holding this queue's items.
     * Built from `this.name`, which is not declared in this class
     * (presumably inherited from `Queue` -- confirm against the base class).
     */
    protected get queueIdentifier(): string {
        return `extollo__queue__${this.name}`
    }

    /**
     * Serialize the given item and append it to this queue's cache array.
     * @param item - the queueable item to store
     */
    protected async push<T extends Queueable>(item: ShouldQueue<T>): Promise<void> {
        const encoded = await this.serial.encodeJSON(item)
        await this.cache.arrayPush(this.queueIdentifier, encoded)
    }

    /**
     * Remove one entry from this queue's cache array and decode it.
     * @returns the decoded item, or `undefined` when the cache yields a falsy value
     */
    async pop(): Promise<Maybe<ShouldQueue<Queueable>>> {
        const serialized = await this.cache.arrayPop(this.queueIdentifier)

        // Nothing usable came back from the cache: report "no item".
        if ( !serialized ) {
            return undefined
        }

        return this.serial.decodeJSON(serialized)
    }
}