From 33a64b99ff6375cc975b8882126e8e211767c146 Mon Sep 17 00:00:00 2001 From: garrettmills Date: Thu, 14 Jul 2022 01:15:16 -0500 Subject: [PATCH] Implement websocket server --- src/http/kernel/HTTPKernel.ts | 14 -- ...ExecuteResolvedRoutePreflightHTTPModule.ts | 5 +- ...ecuteResolvedWebSocketHandlerHTTPModule.ts | 35 +++++ .../MonitorWebSocketConnectionHTTPModule.ts | 96 ++++++++++++++ .../module/MountWebSocketRouteHTTPModule.ts | 51 ++++++++ src/http/lifecycle/WebSocketCloseEvent.ts | 11 ++ src/http/response/api.ts | 62 ++++++++- src/http/routing/Route.ts | 5 +- src/index.ts | 1 + src/service/WebsocketServer.ts | 92 ++++++++++++- src/support/bus/RedisBus.ts | 4 +- src/support/bus/WebSocketBus.ts | 123 ++++++++++++++++++ src/support/bus/index.ts | 3 + src/support/bus/serial/JSONMessageEvent.ts | 35 +++++ src/support/bus/serial/NamedEventPayload.ts | 36 +++++ 15 files changed, 546 insertions(+), 27 deletions(-) create mode 100644 src/http/kernel/module/ExecuteResolvedWebSocketHandlerHTTPModule.ts create mode 100644 src/http/kernel/module/MonitorWebSocketConnectionHTTPModule.ts create mode 100644 src/http/kernel/module/MountWebSocketRouteHTTPModule.ts create mode 100644 src/http/lifecycle/WebSocketCloseEvent.ts create mode 100644 src/support/bus/WebSocketBus.ts create mode 100644 src/support/bus/serial/JSONMessageEvent.ts create mode 100644 src/support/bus/serial/NamedEventPayload.ts diff --git a/src/http/kernel/HTTPKernel.ts b/src/http/kernel/HTTPKernel.ts index ed3f017..2b57260 100644 --- a/src/http/kernel/HTTPKernel.ts +++ b/src/http/kernel/HTTPKernel.ts @@ -35,16 +35,6 @@ export interface ModuleRegistrationFluency { core: () => HTTPKernel, } -/** - * Error thrown when a kernel module is requested that does not exist w/in the kernel. - * @extends Error - */ -export class KernelModuleNotFoundError extends Error { - constructor(name: string) { - super(`The kernel module ${name} is not registered with the kernel.`) - } -} - /** * A singleton class that handles requests, applying logic in modular layers. */ @@ -140,8 +130,6 @@ export class HTTPKernel extends AppClass { if ( typeof foundIdx !== 'undefined' ) { this.postflight = this.postflight.put(foundIdx, this.app().make(module)) - } else { - throw new KernelModuleNotFoundError(other.name) } return this @@ -162,8 +150,6 @@ export class HTTPKernel extends AppClass { if ( typeof foundIdx !== 'undefined' ) { this.postflight = this.postflight.put(foundIdx + 1, this.app().make(module)) - } else { - throw new KernelModuleNotFoundError(other.name) } return this diff --git a/src/http/kernel/module/ExecuteResolvedRoutePreflightHTTPModule.ts b/src/http/kernel/module/ExecuteResolvedRoutePreflightHTTPModule.ts index dad812c..cc17eb6 100644 --- a/src/http/kernel/module/ExecuteResolvedRoutePreflightHTTPModule.ts +++ b/src/http/kernel/module/ExecuteResolvedRoutePreflightHTTPModule.ts @@ -5,6 +5,7 @@ import {ActivatedRoute} from '../../routing/ActivatedRoute' import {ResponseObject} from '../../routing/Route' import {AbstractResolvedRouteHandlerHTTPModule} from './AbstractResolvedRouteHandlerHTTPModule' import {collect, isLeft, unleft, unright, withErrorContext} from '../../../util' +import {MountWebSocketRouteHTTPModule} from './MountWebSocketRouteHTTPModule' /** * HTTP Kernel module that executes the preflight handlers for the route. @@ -13,7 +14,9 @@ import {collect, isLeft, unleft, unright, withErrorContext} from '../../../util' */ export class ExecuteResolvedRoutePreflightHTTPModule extends AbstractResolvedRouteHandlerHTTPModule { public static register(kernel: HTTPKernel): void { - kernel.register(this).after(MountActivatedRouteHTTPModule) + const reg = kernel.register(this) + reg.after(MountWebSocketRouteHTTPModule) + reg.after(MountActivatedRouteHTTPModule) } public async apply(request: Request): Promise { diff --git a/src/http/kernel/module/ExecuteResolvedWebSocketHandlerHTTPModule.ts b/src/http/kernel/module/ExecuteResolvedWebSocketHandlerHTTPModule.ts new file mode 100644 index 0000000..a716204 --- /dev/null +++ b/src/http/kernel/module/ExecuteResolvedWebSocketHandlerHTTPModule.ts @@ -0,0 +1,35 @@ +import {HTTPKernel} from '../HTTPKernel' +import {Request} from '../../lifecycle/Request' +import {ActivatedRoute} from '../../routing/ActivatedRoute' +import {withErrorContext} from '../../../util' +import {AbstractResolvedRouteHandlerHTTPModule} from './AbstractResolvedRouteHandlerHTTPModule' +import {ExecuteResolvedRoutePreflightHTTPModule} from './ExecuteResolvedRoutePreflightHTTPModule' +import {WebSocketBus} from '../../../support/bus' + +/** + * HTTP kernel module that runs the web socket handler for the socket connection's route. + */ +export class ExecuteResolvedWebSocketHandlerHTTPModule extends AbstractResolvedRouteHandlerHTTPModule { + public static register(kernel: HTTPKernel): void { + kernel.register(this).after(ExecuteResolvedRoutePreflightHTTPModule) + } + + public async apply(request: Request): Promise { + const route = > request.make(ActivatedRoute) + const params = route.resolvedParameters + if ( !params ) { + throw new Error('Attempted to call route handler without resolved parameters.') + } + + await withErrorContext(async () => { + const ws = request.make(WebSocketBus) + await route.handler + .tap(handler => handler(ws, ...params)) + .apply(request) + }, { + route, + }) + + return request + } +} diff --git a/src/http/kernel/module/MonitorWebSocketConnectionHTTPModule.ts b/src/http/kernel/module/MonitorWebSocketConnectionHTTPModule.ts new file mode 100644 index 0000000..653278c --- /dev/null +++ b/src/http/kernel/module/MonitorWebSocketConnectionHTTPModule.ts @@ -0,0 +1,96 @@ +import {HTTPKernelModule} from '../HTTPKernelModule' +import {HTTPKernel} from '../HTTPKernel' +import {Request} from '../../lifecycle/Request' +import {Inject, Injectable} from '../../../di' +import {Config} from '../../../service/Config' +import {setInterval} from 'timers' +import {Logging} from '../../../service/Logging' +import {WebSocketCloseEvent} from '../../lifecycle/WebSocketCloseEvent' +import Timeout = NodeJS.Timeout; +import {Bus} from '../../../support/bus' +import * as WebSockets from 'ws' +import {Maybe} from '../../../util' + +@Injectable() +export class MonitorWebSocketConnectionHTTPModule extends HTTPKernelModule { + @Inject() + protected readonly config!: Config + + @Inject() + protected readonly logging!: Logging + + public static register(kernel: HTTPKernel): void { + kernel.register(this).core() + } + + async apply(request: Request): Promise { + const ws = request.make(WebSockets.WebSocket) + + // Time to wait between pings + const pollIntervalMs = this.config.safe('server.socket.pollIntervalMs') + .or(30000) + .integer() + + // Time to wait for a response + const pollResponseTimeoutMs = this.config.safe('server.socket.pollResponseTimeoutMs') + .or(3000) + .integer() + + // Max # of failures before the connection is closed + const maxFailedPolls = this.config.safe('server.socket.maxFailedPolls') + .or(5) + .integer() + + let failedPolls = 0 + let interval: Maybe = undefined + + await new Promise(res => { + let gotResponse = false + + // Listen for pong responses + ws.on('pong', () => { + this.logging.verbose('Got pong response from socket.') + gotResponse = true + }) + + // Listen for close event + ws.on('close', () => { + this.logging.debug('Got close event from socket.') + res() + }) + + interval = setInterval(async () => { + // Every interval, send a ping request and set a timeout for the response + this.logging.verbose('Sending ping request to socket...') + gotResponse = false + ws.ping() + + await new Promise(res2 => setTimeout(res2, pollResponseTimeoutMs)) + + // If no pong response is received before the timeout occurs, tick the # of failed response + if ( !gotResponse ) { + this.logging.verbose('Socket failed to respond in time.') + failedPolls += 1 + } else { + // Otherwise, reset the failure counter + failedPolls = 0 + } + + // Once the failed responses exceeds the threshold, kill the connection + if ( failedPolls > maxFailedPolls ) { + this.logging.debug('Socket exceeded maximum # of failed pings. Killing.') + res() + } + }, pollIntervalMs) + }) + + if ( interval ) { + clearInterval(interval) + } + + // Tell the server to close the socket connection + const bus = request.make(Bus) + await bus.push(new WebSocketCloseEvent()) + return request + } +} diff --git a/src/http/kernel/module/MountWebSocketRouteHTTPModule.ts b/src/http/kernel/module/MountWebSocketRouteHTTPModule.ts new file mode 100644 index 0000000..df1ceee --- /dev/null +++ b/src/http/kernel/module/MountWebSocketRouteHTTPModule.ts @@ -0,0 +1,51 @@ +import {Injectable, Inject} from '../../../di' +import {HTTPKernelModule} from '../HTTPKernelModule' +import {HTTPKernel} from '../HTTPKernel' +import {Request} from '../../lifecycle/Request' +import {Routing} from '../../../service/Routing' +import {ActivatedRoute} from '../../routing/ActivatedRoute' +import {Logging} from '../../../service/Logging' +import {apiEvent, error} from '../../response/api' +import {Bus, WebSocketBus} from '../../../support/bus' +import {WebSocketCloseEvent} from '../../lifecycle/WebSocketCloseEvent' + +/** + * HTTP kernel middleware that tries to find a registered route matching the request's + * path and creates an ActivatedRoute instance from it, limited to websocket handling + * routes. + */ +@Injectable() +export class MountWebSocketRouteHTTPModule extends HTTPKernelModule { + public readonly executeWithBlockingWriteback = true + + @Inject() + protected readonly routing!: Routing + + @Inject() + protected readonly logging!: Logging + + public static register(kernel: HTTPKernel): void { + kernel.register(this).before() + } + + public async apply(request: Request): Promise { + const route = this.routing.match('ws', request.path) + if ( route ) { + this.logging.verbose(`Mounting activated WebSocket route: ${request.path} -> ${route}`) + const activated = > request.make(ActivatedRoute, route, request.path) + request.registerSingletonInstance>(ActivatedRoute, activated) + } else { + this.logging.debug(`No matching WebSocket route found for: ${request.method} -> ${request.path}`) + + // Send an error response on the socket to the client + const ws = request.make(WebSocketBus) + await ws.push(apiEvent(error('Endpoint is not a configured socket listener.'))) + + // Then, terminate the request & socket connections + await request.make(Bus).push(new WebSocketCloseEvent()) + request.response.blockingWriteback(true) + } + + return request + } +} diff --git a/src/http/lifecycle/WebSocketCloseEvent.ts b/src/http/lifecycle/WebSocketCloseEvent.ts new file mode 100644 index 0000000..19e60b3 --- /dev/null +++ b/src/http/lifecycle/WebSocketCloseEvent.ts @@ -0,0 +1,11 @@ +import {Event} from '../../support/bus' +import {uuid4} from '../../util' + +/** Event used to tell the server to close the websocket connection. */ +export class WebSocketCloseEvent implements Event { + eventName = '@extollo/lib:WebSocketCloseEvent' + + eventUuid = uuid4() + + shouldBroadcast = false +} diff --git a/src/http/response/api.ts b/src/http/response/api.ts index 41e83d6..8cb2e5d 100644 --- a/src/http/response/api.ts +++ b/src/http/response/api.ts @@ -1,7 +1,12 @@ /** * Base type for an API response format. */ +import {BaseSerializer, Event, ObjectSerializer} from '../../support/bus' +import {Awaitable, ErrorWithContext, hasOwnProperty, JSONState, uuid4} from '../../util' + export interface APIResponse { + eventName?: string, + eventUuid?: string, success: boolean, message?: string, data?: T, @@ -12,6 +17,61 @@ export interface APIResponse { } } +export function isAPIResponse(what: unknown): what is APIResponse { + return typeof what === 'object' && what !== null + && hasOwnProperty(what, 'success') + && typeof what.success === 'boolean' + && (!hasOwnProperty(what, 'message') || typeof what.message === 'string') + && (!hasOwnProperty(what, 'error') || ( + typeof what.error === 'object' && what.error !== null + && hasOwnProperty(what.error, 'name') && typeof what.error.name === 'string' + && hasOwnProperty(what.error, 'message') && typeof what.error.message === 'string' + && (!hasOwnProperty(what.error, 'stack') || ( + Array.isArray(what.error.stack) && what.error.stack.every(x => typeof x === 'string') + ) + ) + ) + ) +} + +export function apiEvent(response: APIResponse): APIResponse & Event { + if ( !response.eventName ) { + response.eventName = '@extollo/lib:APIResponse' + } + + if ( !response.eventUuid ) { + response.eventUuid = uuid4() + } + + return response as APIResponse & Event +} + +/** + * Serializer implementation that can encode/decode APIResponse objects. + */ +@ObjectSerializer() +export class APIResponseSerializer extends BaseSerializer, JSONState> { + protected decodeSerial(serial: JSONState): Awaitable> { + if ( isAPIResponse(serial) ) { + return serial + } + + throw new ErrorWithContext('Could not decode API response: object is malformed') + } + + protected encodeActual(actual: APIResponse): Awaitable { + return actual as unknown as JSONState + } + + protected getName(): string { + return '@extollo/lib:APIResponseSerializer' + } + + matchActual(some: APIResponse): boolean { + return isAPIResponse(some) + } +} + /** * Formats a mesage as a successful API response. * @param {string} displayMessage @@ -56,7 +116,7 @@ export function many(records: T[]): APIResponse<{records: T[], total: number} * @return APIResponse * @param thrownError */ -export function error(thrownError: string | Error): APIResponse { +export function error(thrownError: string | Error): APIResponse { if ( typeof thrownError === 'string' ) { return { success: false, diff --git a/src/http/routing/Route.ts b/src/http/routing/Route.ts index ab3f5cf..7c54de6 100644 --- a/src/http/routing/Route.ts +++ b/src/http/routing/Route.ts @@ -9,6 +9,7 @@ import {RouteGroup} from './RouteGroup' import {Config} from '../../service/Config' import {Application} from '../../lifecycle/Application' import {Logging} from '../../service/Logging' +import {WebSocketBus} from '../../support/bus/WebSocketBus' /** * Type alias for an item that is a valid response object, or lack thereof. @@ -140,8 +141,8 @@ export class Route { - return new Route('ws', endpoint) + public static socket(endpoint: string): Route, [WebSocketBus]> { + return new Route, [WebSocketBus]>('ws', endpoint) } /** diff --git a/src/index.ts b/src/index.ts index 625caa3..5bbf252 100644 --- a/src/index.ts +++ b/src/index.ts @@ -35,6 +35,7 @@ export * from './http/kernel/HTTPCookieJar' export * from './http/lifecycle/Request' export * from './http/lifecycle/Response' +export * from './http/lifecycle/WebSocketCloseEvent' export * from './http/RequestLocalStorage' export * from './make' diff --git a/src/service/WebsocketServer.ts b/src/service/WebsocketServer.ts index 95ed5c3..73072a1 100644 --- a/src/service/WebsocketServer.ts +++ b/src/service/WebsocketServer.ts @@ -5,6 +5,20 @@ import {HTTPServer} from './HTTPServer' import {Logging} from './Logging' import {ErrorWithContext} from '../util' import {Request} from '../http/lifecycle/Request' +import {AsyncResource, executionAsyncId} from 'async_hooks' +import {HTTPKernel} from '../http/kernel/HTTPKernel' +import {InjectSessionHTTPModule} from '../http/kernel/module/InjectSessionHTTPModule' +import {ExecuteResolvedRoutePreflightHTTPModule} from '../http/kernel/module/ExecuteResolvedRoutePreflightHTTPModule' +import {ParseIncomingBodyHTTPModule} from '../http/kernel/module/ParseIncomingBodyHTTPModule' +import {InjectRequestEventBusHTTPModule} from '../http/kernel/module/InjectRequestEventBusHTTPModule' +import {RequestLocalStorage} from '../http/RequestLocalStorage' +import {MountWebSocketRouteHTTPModule} from '../http/kernel/module/MountWebSocketRouteHTTPModule' +import {SetSessionCookieHTTPModule} from '../http/kernel/module/SetSessionCookieHTTPModule' +import {MonitorWebSocketConnectionHTTPModule} from '../http/kernel/module/MonitorWebSocketConnectionHTTPModule' +import {Bus, WebSocketBus} from '../support/bus' +import {WebSocketCloseEvent} from '../http/lifecycle/WebSocketCloseEvent' +import {ExecuteResolvedWebSocketHandlerHTTPModule} from '../http/kernel/module/ExecuteResolvedWebSocketHandlerHTTPModule' +import {InjectRequestLocale} from '../i18n' @Singleton() export class WebsocketServer extends Unit { @@ -14,8 +28,36 @@ export class WebsocketServer extends Unit { @Inject() protected readonly logging!: Logging + @Inject() + protected readonly requestLocalStorage!: RequestLocalStorage + + @Inject() + protected readonly bus!: Bus + + protected kernel?: HTTPKernel + protected server?: WebSocket.Server + protected getKernel(): HTTPKernel { + if ( !this.kernel ) { + const kernel = this.app().makeNew(HTTPKernel) + + SetSessionCookieHTTPModule.register(kernel) + InjectSessionHTTPModule.register(kernel) + MountWebSocketRouteHTTPModule.register(kernel) + ExecuteResolvedRoutePreflightHTTPModule.register(kernel) + ExecuteResolvedWebSocketHandlerHTTPModule.register(kernel) + ParseIncomingBodyHTTPModule.register(kernel) + InjectRequestEventBusHTTPModule.register(kernel) + MonitorWebSocketConnectionHTTPModule.register(kernel) + InjectRequestLocale.register(kernel) + + this.kernel = kernel + } + + return this.kernel + } + public async up(): Promise { // Make sure the HTTP server is started. Otherwise, this is going to fail anyway if ( this.http.status !== UnitStatus.Started ) { @@ -30,20 +72,58 @@ export class WebsocketServer extends Unit { server: this.http.getServer(), }) + // Turns out that the websocket server was causing context loss bc of the way + // its callback structure works. So, to allow us to access the global container + // from w/in the server handler, we need to give Node some guidance on which + // async context we're using. + const resource = new AsyncResource('WebsocketServer', { + triggerAsyncId: executionAsyncId(), + requireManualDestroy: false, + }) + // Register the websocket handler this.server.on('connection', (ws, request) => { - this.logging.info('Got WebSocket connection! ' + request.method) - const extolloReq = new Request(request) - this.logging.debug(ws) - this.logging.debug(request) + resource.runInAsyncScope(() => { + this.logging.info('Got WebSocket connection! ' + request.method) + const extolloReq = new Request(request) + + this.requestLocalStorage.run(extolloReq, async () => { + this.logging.info(`WebSocket connection: ${extolloReq.path}`) + + // Register the websocket with the request container + extolloReq.registerSingletonInstance(WebSocket.WebSocket, ws) + + // Register the websocket bus with the request container + const wsBus = extolloReq.makeNew(WebSocketBus) + await wsBus.up() + extolloReq.registerSingletonInstance(WebSocketBus, wsBus) + + // Register the listener to clean up this connection when it dies + extolloReq.onResolve(Bus) + .then(bus => bus.subscribe(WebSocketCloseEvent, () => { + extolloReq.destroy() + ws.terminate() + })) + + // Run the request through the kernel to get the setup + await this.getKernel().handle(extolloReq) + }) + }) }) } public down(): Promise { + // Stop the websocket server, if it exists return new Promise(res => { - // Stop the websocket server, if it exists if ( this.server ) { - this.server.close(() => res()) + // Since all the request busses connect to the global app bus, + // we can broadcast a global close event to get all in-progress + // connections to close their sockets. + const event = new WebSocketCloseEvent() + event.shouldBroadcast = true + + this.bus.push(event) + .then(() => this.server?.close?.(() => res())) } }) } diff --git a/src/support/bus/RedisBus.ts b/src/support/bus/RedisBus.ts index 754147a..bc977be 100644 --- a/src/support/bus/RedisBus.ts +++ b/src/support/bus/RedisBus.ts @@ -101,9 +101,7 @@ export class RedisBus implements EventBus, AwareOfContainerLifecycle { await this.subscriptions .where('eventName', '=', name) - .pluck('handler') - .map(handler => handler(event)) - .awaitAll() + .awaitMapCall('handler', event) } isConnected(): boolean { diff --git a/src/support/bus/WebSocketBus.ts b/src/support/bus/WebSocketBus.ts new file mode 100644 index 0000000..83d2778 --- /dev/null +++ b/src/support/bus/WebSocketBus.ts @@ -0,0 +1,123 @@ +import {AwareOfContainerLifecycle, Container, Inject, Injectable, StaticInstantiable} from '../../di' +import { + EventBus, + Event, + EventHandler, + EventHandlerSubscription, + BusSubscriber, + EventHandlerReturn, +} from './types' +import * as WebSocket from 'ws' +import {NoSerializerError, Serialization} from './serial/Serialization' +import {Logging} from '../../service/Logging' +import {Awaitable, Collection, Pipeline, uuid4} from '../../util' +import {getEventName} from './getEventName' +import {Bus} from './Bus' +import {WebSocketCloseEvent} from '../../http/lifecycle/WebSocketCloseEvent' +import {apiEvent, error} from '../../http/response/api' +import {AsyncResource, executionAsyncId} from 'async_hooks' + +@Injectable() +export class WebSocketBus implements EventBus, AwareOfContainerLifecycle { + awareOfContainerLifecycle: true = true + + @Inject() + protected readonly ws!: WebSocket.WebSocket + + @Inject() + protected readonly bus!: Bus + + @Inject() + protected readonly serial!: Serialization + + @Inject() + protected readonly logging!: Logging + + public readonly uuid = uuid4() + + private connected = false + + /** List of local subscriptions on this bus. */ + protected subscriptions: Collection> = new Collection() + + /** Get a Promise that resolves then the socket closes. */ + onClose(): Promise { + return new Promise(res => { + this.bus.subscribe(WebSocketCloseEvent, event => res(event)) + }) + } + + pipe(eventKey: StaticInstantiable, line: Pipeline): Awaitable { + return this.subscribe(eventKey, event => line.apply(event)) + } + + async push(event: Event): Promise { + this.logging.verbose(`Pushing event to WebSocket: ${event.eventName}`) + this.logging.verbose(event) + + await this.ws.send(await this.serial.encodeJSON(event)) + } + + async subscribe(eventKey: StaticInstantiable, handler: EventHandler): Promise { + const uuid = uuid4() + const subscriber: BusSubscriber = { + eventName: getEventName(eventKey), + eventKey, + handler, + uuid, + } as unknown as BusSubscriber + + this.logging.verbose(`Creating WebSocket subscriber ${uuid}...`) + this.logging.verbose(subscriber) + + this.subscriptions.push(subscriber) + + return { + unsubscribe: () => { + this.subscriptions = this.subscriptions.where('uuid', '!=', uuid) + }, + } + } + + protected async onMessage(message: string): Promise { + const payload = await this.serial.decodeJSON(message) // FIXME validation + + await this.subscriptions + .where('eventName', '=', payload.eventName) + .awaitMapCall('handler', payload) + } + + up(): void { + const resource = new AsyncResource('WebSocketBus', { + triggerAsyncId: executionAsyncId(), + requireManualDestroy: false, + }) + + this.ws.on('message', async data => { + await resource.runInAsyncScope(async () => { + this.logging.verbose(`Got data from websocket: ${data}`) + try { + Container.getContainer() + await this.onMessage(`${data}`) + } catch (e: unknown) { + if ( e instanceof NoSerializerError ) { + this.logging.warn(`Discarding message as no validator could be found to deserialize it: ${data}`) + this.push(apiEvent(error('Invalid message format or serializer.'))) + } else { + throw e + } + } + }) + }) + + this.connected = true + } + + down(): void { + this.connected = false + } + + isConnected(): boolean { + return this.connected // FIXME: monitor for bad connections + } +} diff --git a/src/support/bus/index.ts b/src/support/bus/index.ts index 18c6b0f..570a3d6 100644 --- a/src/support/bus/index.ts +++ b/src/support/bus/index.ts @@ -4,10 +4,13 @@ export * from './serial/BaseSerializer' export * from './serial/SimpleCanonicalItemSerializer' export * from './serial/Serialization' export * from './serial/decorators' +export * from './serial/NamedEventPayload' +export * from './serial/JSONMessageEvent' export * from './Bus' export * from './LocalBus' export * from './RedisBus' +export * from './WebSocketBus' export * from './queue/event/PushingToQueue' export * from './queue/event/PushedToQueue' diff --git a/src/support/bus/serial/JSONMessageEvent.ts b/src/support/bus/serial/JSONMessageEvent.ts new file mode 100644 index 0000000..b897e94 --- /dev/null +++ b/src/support/bus/serial/JSONMessageEvent.ts @@ -0,0 +1,35 @@ +import {BaseSerializer} from './BaseSerializer' +import {Awaitable, JSONState, uuid4} from '../../../util' +import {ObjectSerializer} from './decorators' +import {Event} from '../types' + +export class JSONMessageEvent implements Event { + constructor( + public readonly value: T, + ) {} + + eventName = '@extollo/lib:JSONMessageEvent' + + eventUuid = uuid4() +} + +@ObjectSerializer() +export class JSONMessageEventSerializer extends BaseSerializer, { value: JSONState }> { + protected decodeSerial(serial: { value: JSONState }): Awaitable> { + return new JSONMessageEvent(serial.value) + } + + protected encodeActual(actual: JSONMessageEvent): Awaitable<{ value: JSONState }> { + return { + value: actual.value, + } + } + + protected getName(): string { + return '@extollo/lib:JSONMessageEventSerializer' + } + + matchActual(some: JSONMessageEvent): boolean { + return some instanceof JSONMessageEvent + } +} diff --git a/src/support/bus/serial/NamedEventPayload.ts b/src/support/bus/serial/NamedEventPayload.ts new file mode 100644 index 0000000..4973dd1 --- /dev/null +++ b/src/support/bus/serial/NamedEventPayload.ts @@ -0,0 +1,36 @@ +import {Event, SerialPayload} from '../types' +import {ObjectSerializer} from './decorators' +import {BaseSerializer} from './BaseSerializer' +import {JSONState} from '../../../util' + +export class NamedEventPayload { + constructor( + public readonly eventName: string, + public readonly event: Event, + ) {} +} + +@ObjectSerializer() +export class NamedEventPayloadSerializer extends BaseSerializer }> { + protected async decodeSerial(serial: { eventName: string; event: SerialPayload }): Promise { + return new NamedEventPayload( + serial.eventName, + await this.getSerialization().decode(serial.event), + ) + } + + protected async encodeActual(actual: NamedEventPayload): Promise<{ eventName: string; event: SerialPayload }> { + return { + eventName: actual.eventName, + event: await this.getSerialization().encode(actual.event), + } + } + + protected getName(): string { + return '@extollo/lib:NamedEventPayloadSerializer' + } + + matchActual(some: NamedEventPayload): boolean { + return some instanceof NamedEventPayload + } +}