import {AwareOfContainerLifecycle, Inject, Injectable, StaticInstantiable} from '../../di' import { EventBus, Event, EventHandler, EventHandlerSubscription, BusSubscriber, EventHandlerReturn, } from './types' import * as WebSocket from 'ws' import {NoSerializerError, Serialization} from './serial/Serialization' import {Logging} from '../../service/Logging' import {Awaitable, Collection, Pipeline, uuid4} from '../../util' import {getEventName} from './getEventName' import {Bus} from './Bus' import {WebSocketCloseEvent} from '../../http/lifecycle/WebSocketCloseEvent' import {apiEvent, error} from '../../http/response/api' import {AsyncResource, executionAsyncId} from 'async_hooks' import {Session} from '../../http/session/Session' @Injectable() export class WebSocketBus implements EventBus, AwareOfContainerLifecycle { awareOfContainerLifecycle: true = true /** * If true, the session will be loaded when an event is received and * persisted after the event's handlers have executed. * * If an event has no handlers, the session will NOT be loaded. * * Use `disableSessionLoad()` to disable. * * @see disableSessionLoad * @protected */ protected shouldLoadSessionOnEvent = true @Inject() protected readonly ws!: WebSocket.WebSocket @Inject() protected readonly bus!: Bus @Inject() protected readonly serial!: Serialization @Inject() protected readonly logging!: Logging @Inject() protected readonly session!: Session public readonly uuid = uuid4() private connected = false /** List of local subscriptions on this bus. */ protected subscriptions: Collection> = new Collection() /** * Disables re-loading & persisting the session when an event with listeners is received. * @see shouldLoadSessionOnEvent */ disableSessionLoad(): this { this.shouldLoadSessionOnEvent = false return this } /** Get a Promise that resolves then the socket closes. */ onClose(): Promise { return new Promise(res => { this.bus.subscribe(WebSocketCloseEvent, event => res(event)) }) } pipe(eventKey: StaticInstantiable, line: Pipeline): Awaitable { return this.subscribe(eventKey, event => line.apply(event)) } async push(event: Event): Promise { this.logging.verbose(`Pushing event to WebSocket: ${event.eventName}`) this.logging.verbose(event) await this.ws.send(await this.serial.encodeJSON(event)) } async subscribe(eventKey: StaticInstantiable, handler: EventHandler): Promise { const uuid = uuid4() const subscriber: BusSubscriber = { eventName: getEventName(eventKey), eventKey, handler, uuid, } as unknown as BusSubscriber this.logging.verbose(`Creating WebSocket subscriber ${uuid}...`) this.logging.verbose(subscriber) this.subscriptions.push(subscriber) return { unsubscribe: () => { this.subscriptions = this.subscriptions.where('uuid', '!=', uuid) }, } } protected async onMessage(message: string): Promise { const payload = await this.serial.decodeJSON(message) // FIXME validation const listeners = await this.subscriptions .where('eventName', '=', payload.eventName) // If configured, re-load the session data since it may have changed outside the // current socket's request. if ( this.shouldLoadSessionOnEvent && listeners.isNotEmpty() ) { await this.session.load() } await listeners.awaitMapCall('handler', payload) // Persist any changes to the session for other requests. if ( this.shouldLoadSessionOnEvent && listeners.isNotEmpty() ) { await this.session.persist() } } up(): void { const resource = new AsyncResource('WebSocketBus', { triggerAsyncId: executionAsyncId(), requireManualDestroy: false, }) this.ws.on('message', async data => { await resource.runInAsyncScope(async () => { this.logging.verbose(`Got data from websocket: ${data}`) try { await this.onMessage(`${data}`) } catch (e: unknown) { if ( e instanceof NoSerializerError ) { this.logging.warn(`Discarding message as no validator could be found to deserialize it: ${data}`) this.push(apiEvent(error('Invalid message format or serializer.'))) } else { throw e } } }) }) this.connected = true } down(): void { this.connected = false } isConnected(): boolean { return this.connected // FIXME: monitor for bad connections } }