From 074a3187ebb835dcf3b8219383f7f863ea386a2c Mon Sep 17 00:00:00 2001 From: garrettmills Date: Mon, 23 Aug 2021 23:51:53 -0500 Subject: [PATCH] Add support for jobs & queueables, migrations - Create migration directives & migrators - Modify Cache classes to support array manipulation - Create Redis unit and RedisCache implementation - Create Queueable base class and Queue class that uses Cache backend --- package.json | 2 + pnpm-lock.yaml | 92 +++++++++++ src/http/Controller.ts | 4 +- src/http/routing/Middleware.ts | 4 +- src/index.ts | 8 + src/orm/connection/Connection.ts | 9 +- src/orm/connection/PostgresConnection.ts | 13 +- src/orm/directive/MigrateDirective.ts | 18 +-- src/orm/directive/RollbackDirective.ts | 18 +-- src/orm/index.ts | 1 + src/orm/migrations/Migrator.ts | 24 +-- src/orm/model/Model.ts | 24 +++ src/orm/model/ModelBuilder.ts | 48 +++++- src/orm/services/Migrations.ts | 11 +- src/orm/support/CacheModel.ts | 13 ++ src/orm/support/ORMCache.ts | 107 +++++++++++-- src/orm/types.ts | 5 + src/service/Canon.ts | 12 ++ src/service/Canonical.ts | 12 +- src/service/Queueables.ts | 25 +++ src/support/CanonicalReceiver.ts | 61 ++++++++ src/support/cache/MemoryCache.ts | 42 ++++- src/support/cache/RedisCache.ts | 85 ++++++++++ src/support/queue/Queue.ts | 190 +++++++++++++++++++++++ src/support/redis/Redis.ts | 75 +++++++++ src/util/cache/Cache.ts | 37 ++++- src/util/cache/InMemCache.ts | 59 +++++++ src/util/support/Pipe.ts | 19 +++ 28 files changed, 962 insertions(+), 56 deletions(-) create mode 100644 src/service/Queueables.ts create mode 100644 src/support/CanonicalReceiver.ts create mode 100644 src/support/cache/RedisCache.ts create mode 100644 src/support/queue/Queue.ts create mode 100644 src/support/redis/Redis.ts diff --git a/package.json b/package.json index 1e2c3ef..9809cba 100644 --- a/package.json +++ b/package.json @@ -12,6 +12,7 @@ "@types/bcrypt": "^5.0.0", "@types/busboy": "^0.2.3", "@types/cli-table": "^0.3.0", + "@types/ioredis": "^4.26.6", "@types/mime-types": "^2.1.0", "@types/mkdirp": "^1.0.1", "@types/negotiator": "^0.6.1", @@ -27,6 +28,7 @@ "cli-table": "^0.3.6", "colors": "^1.4.0", "dotenv": "^8.2.0", + "ioredis": "^4.27.6", "mime-types": "^2.1.31", "mkdirp": "^1.0.4", "negotiator": "^0.6.2", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 64e7d04..9f69eb5 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -3,6 +3,7 @@ dependencies: '@types/bcrypt': 5.0.0 '@types/busboy': 0.2.3 '@types/cli-table': 0.3.0 + '@types/ioredis': 4.26.6 '@types/mime-types': 2.1.0 '@types/mkdirp': 1.0.1 '@types/negotiator': 0.6.1 @@ -18,6 +19,7 @@ dependencies: cli-table: 0.3.6 colors: 1.4.0 dotenv: 8.2.0 + ioredis: 4.27.6 mime-types: 2.1.31 mkdirp: 1.0.4 negotiator: 0.6.2 @@ -182,6 +184,12 @@ packages: dev: false resolution: integrity: sha512-SEYeGAIQIQX8NN6LDKprLjbrd5dARM5EXsd8GI/A5l0apYI1fGMWgPHSe4ZKL4eozlAyI+doUE9XbYS4xCkQ1w== + /@types/ioredis/4.26.6: + dependencies: + '@types/node': 14.17.6 + dev: false + resolution: + integrity: sha512-Q9ydXL/5Mot751i7WLCm9OGTj5jlW3XBdkdEW21SkXZ8Y03srbkluFGbM3q8c+vzPW30JOLJ+NsZWHoly0+13A== /@types/json-schema/7.0.7: dev: true resolution: @@ -220,6 +228,10 @@ packages: dev: false resolution: integrity: sha512-8kQ3+wKGRNN0ghtEn7EGps/B8CzuBz1nXZEIGGLP2GnwbqYn4dbTs7k+VKLTq1HvZLRCIDtN3Snx1Ege8B7L5A== + /@types/node/14.17.6: + dev: false + resolution: + integrity: sha512-iBxsxU7eswQDGhlr3AiamBxOssaYxbM+NKXVil8jg9yFXvrfEFbDumLD/2dMTB+zYyg7w+Xjt8yuxfdbUHAtcQ== /@types/pg/8.6.0: dependencies: '@types/node': 14.17.1 @@ -673,6 +685,12 @@ packages: node: '>=0.8' resolution: integrity: sha1-2jCcwmPfFZlMaIypAheco8fNfH4= + /cluster-key-slot/1.1.0: + dev: false + engines: + node: '>=0.10.0' + resolution: + integrity: sha512-2Nii8p3RwAPiFwsnZvukotvow2rIHM+yQ6ZcBXGHdniadkYGZYiGmkHJIbZPIV9nfv7m/U1IPMVVcAhoWFeklw== /code-point-at/1.1.0: dev: false engines: @@ -774,6 +792,19 @@ packages: optional: true resolution: integrity: sha512-doEwdvm4PCeK4K3RQN2ZC2BYUBaxwLARCqZmMjtF8a51J2Rb0xpVloFRnCODwqjpwnAoao4pelN8l3RJdv3gRQ== + /debug/4.3.2: + dependencies: + ms: 2.1.2 + dev: false + engines: + node: '>=6.0' + peerDependencies: + supports-color: '*' + peerDependenciesMeta: + supports-color: + optional: true + resolution: + integrity: sha512-mOp8wKcvj7XxC78zLgw/ZA+6TSgkoE2C/ienthhRD298T7UNwAg9diBpLRxC0mOezLl4B0xV7M0cCO6P/O0Xhw== /deep-is/0.1.3: dev: true resolution: @@ -788,6 +819,12 @@ packages: dev: false resolution: integrity: sha1-hMbhWbgZBP3KWaDvRM2HDTElD5o= + /denque/1.5.0: + dev: false + engines: + node: '>=0.10' + resolution: + integrity: sha512-CYiCSgIF1p6EUByQPlGkKnP1M9g0ZV3qMIrqMqZqdwazygIA/YP2vrbcyl1h/WppKJTdl1F85cXIle+394iDAQ== /detect-libc/1.0.3: dev: false engines: @@ -1312,6 +1349,23 @@ packages: node: '>= 0.10' resolution: integrity: sha512-agE4QfB2Lkp9uICn7BAqoscw4SZP9kTE2hxiFI3jBPmXJfdqiahTbUuKGsMoN2GtqL9AxhYioAcVvgsb1HvRbA== + /ioredis/4.27.6: + dependencies: + cluster-key-slot: 1.1.0 + debug: 4.3.2 + denque: 1.5.0 + lodash.defaults: 4.2.0 + lodash.flatten: 4.4.0 + p-map: 2.1.0 + redis-commands: 1.7.0 + redis-errors: 1.2.0 + redis-parser: 3.0.0 + standard-as-callback: 2.1.0 + dev: false + engines: + node: '>=6' + resolution: + integrity: sha512-6W3ZHMbpCa8ByMyC1LJGOi7P2WiOKP9B3resoZOVLDhi+6dDBOW+KNsRq3yI36Hmnb2sifCxHX+YSarTeXh48A== /is-core-module/2.4.0: dependencies: has: 1.0.3 @@ -1447,6 +1501,14 @@ packages: dev: true resolution: integrity: sha1-4j8/nE+Pvd6HJSnBBxhXoIblzO8= + /lodash.defaults/4.2.0: + dev: false + resolution: + integrity: sha1-0JF4cW/+pN3p5ft7N/bwgCJ0WAw= + /lodash.flatten/4.4.0: + dev: false + resolution: + integrity: sha1-8xwiIlqWMtK7+OSt2+8kCqdlph8= /lodash.merge/4.6.2: dev: true resolution: @@ -1691,6 +1753,12 @@ packages: node: '>=0.10.0' resolution: integrity: sha1-u+Z0BseaqFxc/sdm/lc0VV36EnQ= + /p-map/2.1.0: + dev: false + engines: + node: '>=6' + resolution: + integrity: sha512-y3b8Kpd8OAN444hxfBbFfj1FY/RjtTd8tzYwhUqNYXx0fXx2iX4maP4Qr6qhIKbQXI02wTLAda4fYUbDagTUFw== /packet-reader/1.0.0: dev: false resolution: @@ -1973,6 +2041,24 @@ packages: node: '>= 0.10' resolution: integrity: sha1-hSBLVNuoLVdC4oyWdW70OvUOM4Q= + /redis-commands/1.7.0: + dev: false + resolution: + integrity: sha512-nJWqw3bTFy21hX/CPKHth6sfhZbdiHP6bTawSgQBlKOVRG7EZkfHbbHwQJnrE4vsQf0CMNE+3gJ4Fmm16vdVlQ== + /redis-errors/1.2.0: + dev: false + engines: + node: '>=4' + resolution: + integrity: sha1-62LSrbFeTq9GEMBK/hUpOEJQq60= + /redis-parser/3.0.0: + dependencies: + redis-errors: 1.2.0 + dev: false + engines: + node: '>=4' + resolution: + integrity: sha1-tm2CjNyv5rS4pCin3vTGvKwxyLQ= /reflect-metadata/0.1.13: dev: false resolution: @@ -2165,6 +2251,10 @@ packages: requiresBuild: true resolution: integrity: sha512-CidQLG2ZacoT0Z7O6dOyisj4JdrOrLVJ4KbHjVNz9yI1vO08FAYQPcnkXY9BP8zeYo+J/nBgY6Gg4R7w4WFWtg== + /standard-as-callback/2.1.0: + dev: false + resolution: + integrity: sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A== /streamsearch/0.1.2: dev: false engines: @@ -2557,6 +2647,7 @@ specifiers: '@types/bcrypt': ^5.0.0 '@types/busboy': ^0.2.3 '@types/cli-table': ^0.3.0 + '@types/ioredis': ^4.26.6 '@types/mime-types': ^2.1.0 '@types/mkdirp': ^1.0.1 '@types/negotiator': ^0.6.1 @@ -2575,6 +2666,7 @@ specifiers: colors: ^1.4.0 dotenv: ^8.2.0 eslint: ^7.27.0 + ioredis: ^4.27.6 mime-types: ^2.1.31 mkdirp: ^1.0.4 negotiator: ^0.6.2 diff --git a/src/http/Controller.ts b/src/http/Controller.ts index c4f6dba..b9aa5e3 100644 --- a/src/http/Controller.ts +++ b/src/http/Controller.ts @@ -1,12 +1,12 @@ -import {AppClass} from '../lifecycle/AppClass' import {Request} from './lifecycle/Request' import {Container} from '../di' +import {CanonicalItemClass} from '../support/CanonicalReceiver' /** * Base class for controllers that define methods that * handle HTTP requests. */ -export class Controller extends AppClass { +export class Controller extends CanonicalItemClass { constructor( protected readonly request: Request, ) { diff --git a/src/http/routing/Middleware.ts b/src/http/routing/Middleware.ts index b1718c2..5376822 100644 --- a/src/http/routing/Middleware.ts +++ b/src/http/routing/Middleware.ts @@ -1,12 +1,12 @@ -import {AppClass} from '../../lifecycle/AppClass' import {Request} from '../lifecycle/Request' import {ResponseObject} from './Route' import {Container} from '../../di' +import {CanonicalItemClass} from '../../support/CanonicalReceiver' /** * Base class representing a middleware handler that can be applied to routes. */ -export abstract class Middleware extends AppClass { +export abstract class Middleware extends CanonicalItemClass { constructor( /** The request that will be handled by this middleware. */ protected readonly request: Request, diff --git a/src/index.ts b/src/index.ts index 202e9cb..76285f4 100644 --- a/src/index.ts +++ b/src/index.ts @@ -62,6 +62,9 @@ export * from './http/Controller' export * from './http/servers/static' +export * from './support/CanonicalReceiver' + +export * from './service/Canon' export * from './service/Canonical' export * from './service/CanonicalInstantiable' export * from './service/CanonicalRecursive' @@ -74,9 +77,14 @@ export * from './service/HTTPServer' export * from './service/Routing' export * from './service/Middlewares' +export * from './support/redis/Redis' export * from './support/cache/MemoryCache' +export * from './support/cache/RedisCache' export * from './support/cache/CacheFactory' export * from './support/NodeModules' +export * from './support/queue/Queue' + +export * from './service/Queueables' export * from './views/ViewEngine' export * from './views/ViewEngineFactory' diff --git a/src/orm/connection/Connection.ts b/src/orm/connection/Connection.ts index 3022b39..e336ef5 100644 --- a/src/orm/connection/Connection.ts +++ b/src/orm/connection/Connection.ts @@ -1,4 +1,4 @@ -import {ErrorWithContext} from '../../util' +import {Awaitable, ErrorWithContext} from '../../util' import {QueryResult} from '../types' import {SQLDialect} from '../dialect/SQLDialect' import {AppClass} from '../../lifecycle/AppClass' @@ -68,6 +68,13 @@ export abstract class Connection extends AppClass { */ public abstract schema(name?: string): Schema + /** + * Execute all queries logged to this connection during the closure + * as a transaction in the database. + * @param closure + */ + public abstract asTransaction(closure: () => Awaitable): Awaitable + /** * Fire a QueryExecutedEvent for the given query string. * @param query diff --git a/src/orm/connection/PostgresConnection.ts b/src/orm/connection/PostgresConnection.ts index c44b388..cfce90a 100644 --- a/src/orm/connection/PostgresConnection.ts +++ b/src/orm/connection/PostgresConnection.ts @@ -2,7 +2,7 @@ import {Connection, ConnectionNotReadyError} from './Connection' import {Client} from 'pg' import {Inject} from '../../di' import {QueryResult} from '../types' -import {collect} from '../../util' +import {Awaitable, collect} from '../../util' import {SQLDialect} from '../dialect/SQLDialect' import {PostgreSQLDialect} from '../dialect/PostgreSQLDialect' import {Logging} from '../../service/Logging' @@ -70,6 +70,17 @@ export class PostgresConnection extends Connection { } } + public async asTransaction(closure: () => Awaitable): Promise { + if ( !this.client ) { + throw new ConnectionNotReadyError(this.name, { config: JSON.stringify(this.config) }) + } + + await this.client.query('BEGIN') + const result = await closure() + await this.client.query('COMMIT') + return result + } + public schema(name?: string): Schema { return new PostgresSchema(this, name) } diff --git a/src/orm/directive/MigrateDirective.ts b/src/orm/directive/MigrateDirective.ts index 4697f50..985f680 100644 --- a/src/orm/directive/MigrateDirective.ts +++ b/src/orm/directive/MigrateDirective.ts @@ -3,8 +3,8 @@ import {Container, Inject, Injectable} from '../../di' import {EventBus} from '../../event/EventBus' import {Migrator} from '../migrations/Migrator' import {Migrations} from '../services/Migrations' -import {ApplyingMigrationEvent} from '../migrations/events/ApplyingMigrationEvent' -import {AppliedMigrationEvent} from '../migrations/events/AppliedMigrationEvent' +// import {ApplyingMigrationEvent} from '../migrations/events/ApplyingMigrationEvent' +// import {AppliedMigrationEvent} from '../migrations/events/AppliedMigrationEvent' import {EventSubscription} from '../../event/types' import {NothingToMigrateError} from '../migrations/NothingToMigrateError' @@ -100,13 +100,13 @@ export class MigrateDirective extends Directive { * @protected */ protected async registerListeners(): Promise { - this.subscriptions.push(await this.bus.subscribe(ApplyingMigrationEvent, event => { - this.info(`Applying migration ${event.migration.identifier}...`) - })) - - this.subscriptions.push(await this.bus.subscribe(AppliedMigrationEvent, event => { - this.success(`Applied migration: ${event.migration.identifier}`) - })) + // this.subscriptions.push(await this.bus.subscribe(ApplyingMigrationEvent, event => { + // this.info(`Applying migration ${event.migration.identifier}...`) + // })) + // + // this.subscriptions.push(await this.bus.subscribe(AppliedMigrationEvent, event => { + // this.success(`Applied migration: ${event.migration.identifier}`) + // })) } /** Remove event bus listeners before finish. */ diff --git a/src/orm/directive/RollbackDirective.ts b/src/orm/directive/RollbackDirective.ts index 6f97e48..2cb1656 100644 --- a/src/orm/directive/RollbackDirective.ts +++ b/src/orm/directive/RollbackDirective.ts @@ -3,8 +3,8 @@ import {Container, Inject, Injectable} from '../../di' import {EventBus} from '../../event/EventBus' import {Migrator} from '../migrations/Migrator' import {Migrations} from '../services/Migrations' -import {RollingBackMigrationEvent} from '../migrations/events/RollingBackMigrationEvent' -import {RolledBackMigrationEvent} from '../migrations/events/RolledBackMigrationEvent' +// import {RollingBackMigrationEvent} from '../migrations/events/RollingBackMigrationEvent' +// import {RolledBackMigrationEvent} from '../migrations/events/RolledBackMigrationEvent' import {EventSubscription} from '../../event/types' import {NothingToMigrateError} from '../migrations/NothingToMigrateError' @@ -85,13 +85,13 @@ export class RollbackDirective extends Directive { * @protected */ protected async registerListeners(): Promise { - this.subscriptions.push(await this.bus.subscribe(RollingBackMigrationEvent, event => { - this.info(`Rolling-back migration ${event.migration.identifier}...`) - })) - - this.subscriptions.push(await this.bus.subscribe(RolledBackMigrationEvent, event => { - this.success(`Rolled-back migration: ${event.migration.identifier}`) - })) + // this.subscriptions.push(await this.bus.subscribe(RollingBackMigrationEvent, event => { + // this.info(`Rolling-back migration ${event.migration.identifier}...`) + // })) + // + // this.subscriptions.push(await this.bus.subscribe(RolledBackMigrationEvent, event => { + // this.success(`Rolled-back migration: ${event.migration.identifier}`) + // })) } /** Remove event bus listeners before finish. */ diff --git a/src/orm/index.ts b/src/orm/index.ts index cc5af20..5f99f98 100644 --- a/src/orm/index.ts +++ b/src/orm/index.ts @@ -31,6 +31,7 @@ export * from './schema/Schema' export * from './schema/PostgresSchema' export * from './migrations/NothingToMigrateError' +export * from './migrations/events/MigrationEvent' export * from './migrations/events/ApplyingMigrationEvent' export * from './migrations/events/AppliedMigrationEvent' export * from './migrations/events/RollingBackMigrationEvent' diff --git a/src/orm/migrations/Migrator.ts b/src/orm/migrations/Migrator.ts index e51a749..4dae0e5 100644 --- a/src/orm/migrations/Migrator.ts +++ b/src/orm/migrations/Migrator.ts @@ -3,10 +3,10 @@ import {Awaitable, collect, ErrorWithContext} from '../../util' import {Migration} from './Migration' import {Migrations} from '../services/Migrations' import {EventBus} from '../../event/EventBus' -import {ApplyingMigrationEvent} from './events/ApplyingMigrationEvent' -import {AppliedMigrationEvent} from './events/AppliedMigrationEvent' -import {RollingBackMigrationEvent} from './events/RollingBackMigrationEvent' -import {RolledBackMigrationEvent} from './events/RolledBackMigrationEvent' +// import {ApplyingMigrationEvent} from './events/ApplyingMigrationEvent' +// import {AppliedMigrationEvent} from './events/AppliedMigrationEvent' +// import {RollingBackMigrationEvent} from './events/RollingBackMigrationEvent' +// import {RolledBackMigrationEvent} from './events/RolledBackMigrationEvent' import {NothingToMigrateError} from './NothingToMigrateError' /** @@ -259,8 +259,8 @@ export abstract class Migrator { * @protected */ protected async applying(migration: Migration): Promise { - const event = this.injector.make(ApplyingMigrationEvent, migration) - await this.bus.dispatch(event) + // const event = this.injector.make(ApplyingMigrationEvent, migration) + // await this.bus.dispatch(event) } /** @@ -269,8 +269,8 @@ export abstract class Migrator { * @protected */ protected async applied(migration: Migration): Promise { - const event = this.injector.make(AppliedMigrationEvent, migration) - await this.bus.dispatch(event) + // const event = this.injector.make(AppliedMigrationEvent, migration) + // await this.bus.dispatch(event) } /** @@ -279,8 +279,8 @@ export abstract class Migrator { * @protected */ protected async rollingBack(migration: Migration): Promise { - const event = this.injector.make(RollingBackMigrationEvent, migration) - await this.bus.dispatch(event) + // const event = this.injector.make(RollingBackMigrationEvent, migration) + // await this.bus.dispatch(event) } /** @@ -289,7 +289,7 @@ export abstract class Migrator { * @protected */ protected async rolledBack(migration: Migration): Promise { - const event = this.injector.make(RolledBackMigrationEvent, migration) - await this.bus.dispatch(event) + // const event = this.injector.make(RolledBackMigrationEvent, migration) + // await this.bus.dispatch(event) } } diff --git a/src/orm/model/Model.ts b/src/orm/model/Model.ts index 095fe27..06d517b 100644 --- a/src/orm/model/Model.ts +++ b/src/orm/model/Model.ts @@ -635,6 +635,30 @@ export abstract class Model> extends AppClass implements Bus return this } + /** + * Delete the current model from the database, if it exists. + */ + async delete(): Promise { + if ( !this.exists() ) { + return + } + + await this.query() + .where(this.qualifyKey(), '=', this.key()) + .delete() + + const ctor = this.constructor as typeof Model + const field = getFieldsMeta(this) + .firstWhere('databaseKey', '=', ctor.key) + + if ( field ) { + delete (this as any)[field.modelKey] + return + } + + delete (this as any)[ctor.key] + } + /** * Cast this model to a simple object mapping model fields to their values. * diff --git a/src/orm/model/ModelBuilder.ts b/src/orm/model/ModelBuilder.ts index 899b415..c91c544 100644 --- a/src/orm/model/ModelBuilder.ts +++ b/src/orm/model/ModelBuilder.ts @@ -1,8 +1,11 @@ import {Model} from './Model' import {AbstractBuilder} from '../builder/AbstractBuilder' import {AbstractResultIterable} from '../builder/result/AbstractResultIterable' -import {Instantiable} from '../../di' +import {Instantiable, StaticClass} from '../../di' import {ModelResultIterable} from './ModelResultIterable' +import {Collection} from '../../util' +import {ConstraintOperator, ModelKey, ModelKeys} from '../types' +import {EscapeValue} from '../dialect/SQLDialect' /** * Implementation of the abstract builder whose results yield instances of a given Model, `T`. @@ -10,7 +13,7 @@ import {ModelResultIterable} from './ModelResultIterable' export class ModelBuilder> extends AbstractBuilder { constructor( /** The model class that is created for results of this query. */ - protected readonly ModelClass: Instantiable, + protected readonly ModelClass: StaticClass & Instantiable, ) { super() } @@ -22,4 +25,45 @@ export class ModelBuilder> extends AbstractBuilder { public getResultIterable(): AbstractResultIterable { return this.app().make>(ModelResultIterable, this, this.registeredConnection, this.ModelClass) } + + /** + * Apply a WHERE...IN... constraint on the primary key of the model. + * @param keys + */ + public whereKey(keys: ModelKeys): this { + return this.whereIn( + this.ModelClass.qualifyKey(), + this.normalizeModelKeys(keys), + ) + } + + /** + * Apply a where constraint on the column corresponding the the specified + * property on the model. + * @param propertyName + * @param operator + * @param operand + */ + public whereProperty(propertyName: string, operator: ConstraintOperator, operand?: EscapeValue): this { + return this.where( + this.ModelClass.propertyToColumn(propertyName), + operator, + operand, + ) + } + + /** + * Given some format of keys of the model, try to normalize them to a flat array. + * @param keys + * @protected + */ + protected normalizeModelKeys(keys: ModelKeys): ModelKey[] { + if ( Array.isArray(keys) ) { + return keys + } else if ( keys instanceof Collection ) { + return keys.all() + } + + return [keys] + } } diff --git a/src/orm/services/Migrations.ts b/src/orm/services/Migrations.ts index 9a3d821..4015cb7 100644 --- a/src/orm/services/Migrations.ts +++ b/src/orm/services/Migrations.ts @@ -9,14 +9,14 @@ import {CommandLine} from '../../cli' import {MigrateDirective} from '../directive/MigrateDirective' import {RollbackDirective} from '../directive/RollbackDirective' import {CreateMigrationDirective} from '../directive/CreateMigrationDirective' +import {MigratorFactory} from '../migrations/MigratorFactory' /** * Service unit that loads and instantiates migration classes. */ @Singleton() export class Migrations extends CanonicalInstantiable { - @Inject() - protected readonly migrator!: Migrator + protected migrator!: Migrator @Inject() protected readonly cli!: CommandLine @@ -34,6 +34,13 @@ export class Migrations extends CanonicalInstantiable { this.logging.debug(`Base migration path does not exist, or has no files: ${this.path}`) } + // Register the migrator factory + this.container().registerFactory( + this.container().make(MigratorFactory), + ) + + this.migrator = this.container().make(Migrator) + // Register the migrations for @extollo/lib const basePath = lib().concat('migrations') const resolver = await this.buildMigrationNamespaceResolver('@extollo', basePath) diff --git a/src/orm/support/CacheModel.ts b/src/orm/support/CacheModel.ts index de9f6a8..d2f33f8 100644 --- a/src/orm/support/CacheModel.ts +++ b/src/orm/support/CacheModel.ts @@ -1,6 +1,8 @@ import {Model} from '../model/Model' import {Field} from '../model/Field' import {FieldType} from '../types' +import {Maybe} from '../../util' +import {ModelBuilder} from '../model/ModelBuilder' /** * A model instance which stores records from the ORMCache driver. @@ -18,4 +20,15 @@ export class CacheModel extends Model { @Field(FieldType.timestamp, 'cache_expires') public cacheExpires?: Date; + + public static withCacheKey(key: string): ModelBuilder { + return this.query() + .whereKey(key) + .whereProperty('cacheExpires', '>', new Date()) + } + + public static getCacheKey(key: string): Promise> { + return this.withCacheKey(key) + .first() + } } diff --git a/src/orm/support/ORMCache.ts b/src/orm/support/ORMCache.ts index 2606d81..232d434 100644 --- a/src/orm/support/ORMCache.ts +++ b/src/orm/support/ORMCache.ts @@ -1,5 +1,5 @@ import {Container} from '../../di' -import {Cache} from '../../util' +import {Awaitable, Cache, ErrorWithContext, Maybe} from '../../util' import {CacheModel} from './CacheModel' /** @@ -7,14 +7,7 @@ import {CacheModel} from './CacheModel' */ export class ORMCache extends Cache { public async fetch(key: string): Promise { - const model = await CacheModel.query() - .where(CacheModel.qualifyKey(), '=', key) - .where(CacheModel.propertyToColumn('cacheExpires'), '>', new Date()) - .first() - - if ( model ) { - return model.cacheValue - } + return (await CacheModel.getCacheKey(key))?.cacheValue } public async put(key: string, value: string, expires?: Date): Promise { @@ -31,15 +24,103 @@ export class ORMCache extends Cache { } public async has(key: string): Promise { - return CacheModel.query() - .where(CacheModel.qualifyKey(), '=', key) - .where(CacheModel.propertyToColumn('cacheExpires'), '>', new Date()) + return CacheModel.withCacheKey(key) .exists() } public async drop(key: string): Promise { await CacheModel.query() - .where(CacheModel.qualifyKey(), '=', key) + .whereKey(key) .delete() } + + public async pop(key: string): Promise { + return CacheModel.getConnection() + .asTransaction(async () => { + const model = await CacheModel.getCacheKey(key) + if ( !model ) { + throw new ErrorWithContext('Cannot pop cache value: key does not exist.', { + key, + }) + } + + await model.delete() + return model.cacheValue + }) + } + + public increment(key: string, amount = 1): Awaitable { + return CacheModel.getConnection() + .asTransaction(async () => { + const model = await CacheModel.getCacheKey(key) + if ( !model ) { + await this.put(key, String(amount)) + return amount + } + + model.cacheValue = String(parseInt(model.cacheValue, 10) + amount) + await model.save() + return parseInt(model.cacheValue, 10) + }) + } + + public decrement(key: string, amount = 1): Awaitable { + return CacheModel.getConnection() + .asTransaction(async () => { + const model = await CacheModel.getCacheKey(key) + if ( !model ) { + await this.put(key, String(-amount)) + return amount + } + + model.cacheValue = String(parseInt(model.cacheValue, 10) - amount) + await model.save() + return parseInt(model.cacheValue, 10) + }) + } + + public async arrayPush(key: string, value: string): Promise { + await CacheModel.getConnection() + .asTransaction(async () => { + const model = await CacheModel.getCacheKey(key) + if ( !model ) { + await this.put(key, JSON.stringify([value])) + return + } + + const cacheValue = JSON.parse(model.cacheValue) + if ( !Array.isArray(cacheValue) ) { + throw new ErrorWithContext('Cannot push value to non-array.', { + key, + }) + } + + cacheValue.push(value) + model.cacheValue = JSON.stringify(cacheValue) + }) + + throw new Error('Method not implemented.') + } + + public async arrayPop(key: string): Promise> { + return CacheModel.getConnection() + .asTransaction>(async () => { + const model = await CacheModel.getCacheKey(key) + if ( !model ) { + return + } + + const cacheValue = JSON.parse(model.cacheValue) + if ( !Array.isArray(cacheValue) ) { + throw new ErrorWithContext('Cannot pop value from non-array.', { + key, + }) + } + + const value = cacheValue.pop() + model.cacheValue = JSON.stringify(cacheValue) + await model.save() + return value + }) + } } diff --git a/src/orm/types.ts b/src/orm/types.ts index f721c1d..934913d 100644 --- a/src/orm/types.ts +++ b/src/orm/types.ts @@ -11,6 +11,11 @@ export type QueryRow = { [key: string]: any } */ export type ModelKey = string | number +/** + * Collection of keys of a set of models. + */ +export type ModelKeys = ModelKey | ModelKey[] | Collection + /** * Interface for the result of a query execution. */ diff --git a/src/service/Canon.ts b/src/service/Canon.ts index 7dc5733..2584f8a 100644 --- a/src/service/Canon.ts +++ b/src/service/Canon.ts @@ -1,5 +1,6 @@ import {Canonical} from './Canonical' import {Singleton} from '../di' +import {Maybe} from '../util' /** * Error throw when a duplicate canonical key is registered. @@ -46,6 +47,17 @@ export class Canon { return this.resources[key] as Canonical } + /** + * Get a canonical item from a fully-qualified canonical name. + * This is just a quality-of-life wrapper around `this.resource(...).get(...)`. + * @param key + */ + getFromFullyQualified(key: string): Maybe { + const [namespace, ...parts] = key.split('::') + const unqualified = parts.join('::') + return this.resource(namespace).get(unqualified) + } + /** * Register a canonical resource. * @param {Canonical} unit diff --git a/src/service/Canonical.ts b/src/service/Canonical.ts index 90a4aa4..e911409 100644 --- a/src/service/Canonical.ts +++ b/src/service/Canonical.ts @@ -7,6 +7,7 @@ import {Logging} from './Logging' import {Inject} from '../di' import * as nodePath from 'path' import {Unit} from '../lifecycle/Unit' +import {isCanonicalReceiver} from '../support/CanonicalReceiver' /** * Interface describing a definition of a single canonical item loaded from the app. @@ -228,7 +229,16 @@ export abstract class Canonical extends Unit { const definition = await this.buildCanonicalDefinition(entry) this.logging.verbose(`Registering canonical ${this.canonicalItem} "${definition.canonicalName}" from ${entry}`) - this.loadedItems[definition.canonicalName] = await this.initCanonicalItem(definition) + const resolvedItem = await this.initCanonicalItem(definition) + + if ( isCanonicalReceiver(resolvedItem) ) { + resolvedItem.setCanonicalResolver( + `${this.canonicalItems}::${definition.canonicalName}`, + definition.canonicalName, + ) + } + + this.loadedItems[definition.canonicalName] = resolvedItem } this.canon.registerCanonical(this) diff --git a/src/service/Queueables.ts b/src/service/Queueables.ts new file mode 100644 index 0000000..f4ea45a --- /dev/null +++ b/src/service/Queueables.ts @@ -0,0 +1,25 @@ +import {CanonicalStatic} from './CanonicalStatic' +import {Singleton, Instantiable, StaticClass} from '../di' +import {CanonicalDefinition} from './Canonical' +import {Queueable} from '../support/queue/Queue' + +/** + * A canonical unit that resolves Queueable classes from `app/queueables`. + */ +@Singleton() +export class Queueables extends CanonicalStatic> { + protected appPath = ['queueables'] + + protected canonicalItem = 'job' + + protected suffix = '.job.js' + + public async initCanonicalItem(definition: CanonicalDefinition): Promise>> { + const item = await super.initCanonicalItem(definition) + if ( !(item.prototype instanceof Queueable) ) { + throw new TypeError(`Invalid middleware definition: ${definition.originalName}. Controllers must extend from @extollo/lib.Queueable.`) + } + + return item + } +} diff --git a/src/support/CanonicalReceiver.ts b/src/support/CanonicalReceiver.ts new file mode 100644 index 0000000..43884e4 --- /dev/null +++ b/src/support/CanonicalReceiver.ts @@ -0,0 +1,61 @@ +import {AppClass} from '../lifecycle/AppClass' + +/** + * Interface for a class that receives its canonical resolver names upon load. + */ +export interface CanonicalReceiver { + setCanonicalResolver(fullyQualifiedResolver: string, unqualifiedResolver: string): void + getCanonicalResolver(): string | undefined + getFullyQualifiedCanonicalResolver(): string | undefined +} + +/** + * Function that checks whether a given value satisfies the CanonicalReceiver interface. + * @param something + */ +export function isCanonicalReceiver(something: unknown): something is CanonicalReceiver { + return ( + typeof something === 'function' + && typeof (something as any).setCanonicalResolver === 'function' + && (something as any).setCanonicalResolver.length >= 1 + && typeof (something as any).getCanonicalResolver === 'function' + && (something as any).getCanonicalResolver.length === 0 + ) +} + +/** + * Base class for canonical items that implements the CanonicalReceiver interface. + * That is, `isCanonicalReceiver(CanonicalItemClass) === true`. + */ +export class CanonicalItemClass extends AppClass { + /** The type-prefixed canonical resolver of this class, set by the startup unit. */ + private static canonFullyQualifiedResolver?: string + + /** The unqualified canonical resolver of this class, set by the startup unit. */ + private static canonUnqualifiedResolver?: string + + /** + * Sets the fully- and un-qualified canonical resolver strings. Intended for use + * by the Canonical unit. + * @param fullyQualifiedResolver + * @param unqualifiedResolver + */ + public static setCanonicalResolver(fullyQualifiedResolver: string, unqualifiedResolver: string): void { + this.canonFullyQualifiedResolver = fullyQualifiedResolver + this.canonUnqualifiedResolver = unqualifiedResolver + } + + /** + * Get the fully-qualified canonical resolver of this class, if one has been set. + */ + public static getFullyQualifiedCanonicalResolver(): string | undefined { + return this.canonFullyQualifiedResolver + } + + /** + * Get the unqualified canonical resolver of this class, if one has been set. + */ + public static getCanonicalResolver(): string | undefined { + return this.canonUnqualifiedResolver + } +} diff --git a/src/support/cache/MemoryCache.ts b/src/support/cache/MemoryCache.ts index 9988b13..dc68e89 100644 --- a/src/support/cache/MemoryCache.ts +++ b/src/support/cache/MemoryCache.ts @@ -8,7 +8,10 @@ export class MemoryCache extends Cache { /** Static collection of in-memory cache items. */ private static cacheItems: Collection<{key: string, value: string, expires?: Date}> = new Collection<{key: string; value: string, expires?: Date}>() - public fetch(key: string): Awaitable { + /** Static collection of in-memory arrays. */ + private static cacheArrays: Collection<{key: string, values: string[]}> = new Collection<{key: string; values: string[]}>() + + public fetch(key: string): string|undefined { const now = new Date() return MemoryCache.cacheItems .where('key', '=', key) @@ -41,4 +44,41 @@ export class MemoryCache extends Cache { public drop(key: string): Awaitable { MemoryCache.cacheItems = MemoryCache.cacheItems.where('key', '!=', key) } + + public decrement(key: string, amount = 1): Awaitable { + const nextValue = (parseInt(this.fetch(key) ?? '0', 10) ?? 0) - amount + this.put(key, String(nextValue)) + return nextValue + } + + public increment(key: string, amount = 1): Awaitable { + const nextValue = (parseInt(this.fetch(key) ?? '0', 10) ?? 0) + amount + this.put(key, String(nextValue)) + return nextValue + } + + public pop(key: string): Awaitable { + const value = this.fetch(key) + this.drop(key) + return value + } + + public arrayPop(key: string): Awaitable { + const arr = MemoryCache.cacheArrays.firstWhere('key', '=', key) + if ( arr ) { + return arr.values.shift() + } + } + + public arrayPush(key: string, value: string): Awaitable { + const arr = MemoryCache.cacheArrays.firstWhere('key', '=', key) + if ( arr ) { + arr.values.push(value) + } else { + MemoryCache.cacheArrays.push({ + key, + values: [value], + }) + } + } } diff --git a/src/support/cache/RedisCache.ts b/src/support/cache/RedisCache.ts new file mode 100644 index 0000000..0f73c53 --- /dev/null +++ b/src/support/cache/RedisCache.ts @@ -0,0 +1,85 @@ +import {Cache, Maybe} from '../../util' +import {Inject, Injectable} from '../../di' +import {Redis} from '../redis/Redis' + +/** + * Redis-driven Cache implementation. + */ +@Injectable() +export class RedisCache extends Cache { + /** The Redis service. */ + @Inject() + protected readonly redis!: Redis + + async arrayPop(key: string): Promise { + return this.redis.pipe() + .tap(redis => redis.lpop(key)) + .resolve() + } + + async arrayPush(key: string, value: string): Promise { + await this.redis.pipe() + .tap(redis => redis.rpush(key, value)) + .resolve() + } + + async decrement(key: string, amount?: number): Promise { + return this.redis.pipe() + .tap(redis => redis.decrby(key, amount ?? 1)) + .resolve() + } + + async increment(key: string, amount?: number): Promise { + return this.redis.pipe() + .tap(redis => redis.incrby(key, amount ?? 1)) + .resolve() + } + + async drop(key: string): Promise { + await this.redis.pipe() + .tap(redis => redis.del(key)) + .resolve() + } + + async fetch(key: string): Promise { + return this.redis.pipe() + .tap(redis => redis.get(key)) + .tap(value => value ?? undefined) + .resolve() + } + + async has(key: string): Promise { + return this.redis.pipe() + .tap(redis => redis.exists(key)) + .tap(numExisting => numExisting > 0) + .resolve() + } + + pop(key: string): Promise> { + return new Promise>((res, rej) => { + this.redis.pipe() + .tap(redis => { + redis.multi() + .get(key, (err, value) => { + if ( err ) { + rej(err) + } else { + res(value) + } + }) + .del(key) + }) + }) + } + + async put(key: string, value: string, expires?: Date): Promise { + await this.redis.multi() + .tap(redis => redis.set(key, value)) + .when(Boolean(expires), redis => { + const seconds = Math.round(((new Date()).getTime() - expires!.getTime()) / 1000) // eslint-disable-line @typescript-eslint/no-non-null-assertion + return redis.expire(key, seconds) + }) + .tap(pipeline => pipeline.exec()) + .resolve() + } +} diff --git a/src/support/queue/Queue.ts b/src/support/queue/Queue.ts new file mode 100644 index 0000000..d3f9982 --- /dev/null +++ b/src/support/queue/Queue.ts @@ -0,0 +1,190 @@ +import {Awaitable, ErrorWithContext, JSONState, Maybe, Rehydratable, Cache} from '../../util' +import {CanonicalItemClass} from '../CanonicalReceiver' +import {Container, Inject, Injectable, isInstantiable} from '../../di' +import {Canon} from '../../service/Canon' + +/** Type annotation for a Queueable that should be pushed onto a queue. */ +export type ShouldQueue = T & Queueable + +/** + * Base class for an object that can be pushed to/popped from a queue. + */ +export abstract class Queueable extends CanonicalItemClass implements Rehydratable { + abstract dehydrate(): Awaitable + + abstract rehydrate(state: JSONState): Awaitable + + /** + * When the item is popped from the queue, this method is called. + */ + public abstract execute(): Awaitable + + /** + * Determine whether the object should be pushed to the queue or not. + */ + public shouldQueue(): boolean { + return true + } + + /** + * The name of the queue where this object should be pushed by default. + */ + public defaultQueue(): string { + return 'default' + } + + /** + * Get the canonical resolver so we can re-instantiate this class from the queue. + * Throw an error if it could not be determined. + */ + public getFullyQualifiedCanonicalResolver(): string { + const resolver = (this.constructor as typeof Queueable).getFullyQualifiedCanonicalResolver() + if ( !resolver ) { + throw new ErrorWithContext('Cannot push Queueable onto queue: missing canonical resolver.') + } + + return resolver + } +} + +/** + * Truth function that returns true if an object implements the same interface as Queueable. + * This is done in case some external library needs to be incorporated as the base class for + * a Queueable, and cannot be made to extend Queueable. + * @param something + */ +export function isQueueable(something: unknown): something is Queueable { + if ( something instanceof Queueable ) { + return true + } + + return ( + typeof something === 'function' + && typeof (something as any).dehydrate === 'function' + && typeof (something as any).rehydrate === 'function' + && typeof (something as any).shouldQueue === 'function' + && typeof (something as any).defaultQueue === 'function' + && typeof (something as any).getFullyQualifiedCanonicalResolver === 'function' + ) +} + +/** + * Truth function that returns true if the given object is Queueable and wants to be + * pushed onto the queue. + * @param something + */ +export function shouldQueue(something: T): something is ShouldQueue { + return isQueueable(something) && something.shouldQueue() +} + +/** + * A multi-node queue that accepts & reinstantiates Queueables. + * + * @example + * There are several queue backends your application may use. These are + * configured via the `queue` config. To get the default queue, however, + * use this class as a DI token: + * ```ts + * this.container().make(Queue) + * ``` + * + * This will resolve the concrete implementation configured by your app. + */ +@Injectable() +export class Queue { + @Inject() + protected readonly cache!: Cache + + @Inject() + protected readonly canon!: Canon + + @Inject('injector') + protected readonly injector!: Container + + constructor( + public readonly name: string, + ) { } + + public get queueIdentifier(): string { + return `extollo__queue__${this.name}` + } + + /** Get the number of items waiting in the queue. */ + // public abstract length(): Awaitable + + /** Push a new queueable onto the queue. */ + public async push(item: ShouldQueue): Promise { + const data = { + q: true, + r: item.getFullyQualifiedCanonicalResolver(), + d: await item.dehydrate(), + } + + await this.cache.arrayPush(this.queueIdentifier, JSON.stringify(data)) + } + + /** Remove and return a queueable from the queue. */ + public async pop(): Promise> { + const item = await this.cache.arrayPop(this.queueIdentifier) + if ( !item ) { + return + } + + const data = JSON.parse(item) + if ( !data.q || !data.r ) { + throw new ErrorWithContext('Cannot pop Queueable: payload is invalid.', { + data, + queueName: this.name, + queueIdentifier: this.queueIdentifier, + }) + } + + const canonicalItem = this.canon.getFromFullyQualified(data.r) + if ( !canonicalItem ) { + throw new ErrorWithContext('Cannot pop Queueable: canonical name is not resolvable', { + data, + queueName: this.name, + queueIdentifier: this.queueIdentifier, + canonicalName: data.r, + }) + } + + if ( !isInstantiable(canonicalItem) ) { + throw new ErrorWithContext('Cannot pop Queueable: canonical item is not instantiable', { + data, + canonicalItem, + queueName: this.name, + queueIdentifier: this.queueIdentifier, + canonicalName: data.r, + }) + } + + const instance = this.injector.make(canonicalItem) + if ( !isQueueable(instance) ) { + throw new ErrorWithContext('Cannot pop Queueable: canonical item instance is not Queueable', { + data, + canonicalItem, + instance, + queueName: this.name, + queueIdentifier: this.queueIdentifier, + canonicalName: data.r, + }) + } + + await instance.rehydrate(data.d) + return instance + } + + /** Push a raw payload onto the queue. */ + public async pushRaw(item: JSONState): Promise { + await this.cache.arrayPush(this.queueIdentifier, JSON.stringify(item)) + } + + /** Remove and return a raw payload from the queue. */ + public async popRaw(): Promise> { + const item = await this.cache.arrayPop(this.queueIdentifier) + if ( item ) { + return JSON.parse(item) + } + } +} diff --git a/src/support/redis/Redis.ts b/src/support/redis/Redis.ts new file mode 100644 index 0000000..73e390c --- /dev/null +++ b/src/support/redis/Redis.ts @@ -0,0 +1,75 @@ +import {Inject, Singleton} from '../../di' +import {Config} from '../../service/Config' +import * as IORedis from 'ioredis' +import {RedisOptions} from 'ioredis' +import {Logging} from '../../service/Logging' +import {Unit} from '../../lifecycle/Unit' +import {AsyncPipe} from '../../util' + +export {RedisOptions} from 'ioredis' + +/** + * Unit that loads configuration for and manages instantiation + * of an IORedis connection. + */ +@Singleton() +export class Redis extends Unit { + /** The config service. */ + @Inject() + protected readonly config!: Config + + /** The loggers. */ + @Inject() + protected readonly logging!: Logging + + /** + * The instantiated connection, if one exists. + * @private + */ + private connection?: IORedis.Redis + + async up(): Promise { + this.logging.info('Attempting initial connection to Redis...') + this.logging.debug('Config:') + this.logging.debug(Config) + this.logging.debug(this.config) + await this.getConnection() + } + + async down(): Promise { + this.logging.info('Disconnecting Redis...') + if ( this.connection?.status === 'ready' ) { + await this.connection.disconnect() + } + } + + /** + * Get the IORedis connection instance. + */ + public async getConnection(): Promise { + if ( !this.connection ) { + const options = this.config.get('redis.connection') as RedisOptions + this.logging.verbose(options) + this.connection = new IORedis(options) + } + + return this.connection + } + + /** + * Get the IORedis connection in an AsyncPipe. + */ + public pipe(): AsyncPipe { + return new AsyncPipe(() => this.getConnection()) + } + + /** + * Get an IORedis.Pipeline instance in an AsyncPipe. + */ + public multi(): AsyncPipe { + return this.pipe() + .tap(redis => { + return redis.multi() + }) + } +} diff --git a/src/util/cache/Cache.ts b/src/util/cache/Cache.ts index e244052..8cbdc48 100644 --- a/src/util/cache/Cache.ts +++ b/src/util/cache/Cache.ts @@ -15,8 +15,9 @@ export abstract class Cache { * Store the given value in the cache by key. * @param {string} key * @param {string} value + * @param expires */ - public abstract put(key: string, value: string): Awaitable; + public abstract put(key: string, value: string, expires?: Date): Awaitable; /** * Check if the cache has the given key. @@ -30,4 +31,38 @@ export abstract class Cache { * @param {string} key */ public abstract drop(key: string): Awaitable; + + /** + * Fetch an item from the cache by key, and then remove it. + * @param key + */ + public abstract pop(key: string): Awaitable; + + /** + * Increment a key in the cache by a given amount. + * @param key + * @param amount + */ + public abstract increment(key: string, amount?: number): Awaitable; + + /** + * Decrement a key in the cache by a given amount. + * @param key + * @param amount + */ + public abstract decrement(key: string, amount?: number): Awaitable; + + /** + * Push an item onto the end an array-like key. + * @param key + * @param value + */ + public abstract arrayPush(key: string, value: string): Awaitable; + + /** + * Remove and return an item from the beginning of an array-like key. + * @param key + * @param value + */ + public abstract arrayPop(key: string): Awaitable; } diff --git a/src/util/cache/InMemCache.ts b/src/util/cache/InMemCache.ts index 2694a4b..748a948 100644 --- a/src/util/cache/InMemCache.ts +++ b/src/util/cache/InMemCache.ts @@ -1,5 +1,7 @@ import { Cache } from './Cache' import { Collection } from '../collection/Collection' +import {Awaitable, Maybe} from '../support/types' +import {ErrorWithContext} from '../error/ErrorWithContext' /** * Base interface for an item stored in a memory cache. @@ -44,4 +46,61 @@ export class InMemCache extends Cache { public async drop(key: string): Promise { this.items = this.items.whereNot('key', '=', key) } + + public pop(key: string): Awaitable> { + const existing = this.items.firstWhere('key', '=', key) + this.items = this.items.where('key', '!=', key) + return existing?.item + } + + public async increment(key: string, amount?: number): Promise { + const next = parseInt((await this.fetch(key)) ?? '0', 10) + (amount ?? 1) + await this.put(key, String(next)) + return next + } + + public async decrement(key: string, amount?: number): Promise { + const next = parseInt((await this.fetch(key)) ?? '0', 10) - (amount ?? 1) + await this.put(key, String(next)) + return next + } + + public arrayPush(key: string, value: string): Awaitable { + const existing = this.items.where('key', '=', key).first() + const arr = JSON.parse(existing?.item ?? '[]') + + if ( !Array.isArray(arr) ) { + throw new ErrorWithContext('Unable to arrayPush: key is not an array', { + key, + value, + }) + } + + arr.push(value) + if ( existing ) { + existing.item = JSON.stringify(arr) + } else { + this.items.push({ + key, + item: JSON.stringify(arr), + }) + } + } + + public arrayPop(key: string): Awaitable> { + const existing = this.items.where('key', '=', key).first() + const arr = JSON.parse(existing?.item ?? '[]') + + const value = arr.pop() + if ( existing ) { + existing.item = JSON.stringify(arr) + } else { + this.items.push({ + key, + item: JSON.stringify(arr), + }) + } + + return value + } } diff --git a/src/util/support/Pipe.ts b/src/util/support/Pipe.ts index a74a1a8..6700071 100644 --- a/src/util/support/Pipe.ts +++ b/src/util/support/Pipe.ts @@ -158,6 +158,8 @@ export type AsyncPipeResolver = () => Awaitable */ export type AsyncPipeOperator = (subject: T) => Awaitable +export type PromisePipeOperator = (subject: T, resolve: (val: T2) => unknown, reject: (err: Error) => unknown) => Awaitable + /** * A closure that maps a given pipe item to an item of the same type. */ @@ -193,6 +195,23 @@ export class AsyncPipe { return new AsyncPipe(async () => op(await this.subject())) } + /** + * Apply a transformative operator to the pipe, wrapping it + * in a Promise and passing the resolve/reject callbacks to the + * closure. + * @param op + */ + promise(op: PromisePipeOperator): AsyncPipe { + return new AsyncPipe(() => { + return new Promise((res, rej) => { + (async () => this.subject())() + .then(subject => { + op(subject, res, rej) + }) + }) + }) + } + /** * Apply an operator to the pipe, but return the reference * to the current pipe. The operator is resolved when the