import {AwareOfContainerLifecycle, Inject, Injectable, StaticInstantiable} from '../../di'
import {
    EventBus,
    Event,
    EventHandler,
    EventHandlerSubscription,
    BusSubscriber,
    EventHandlerReturn,
} from './types'
import * as WebSocket from 'ws'
import {NoSerializerError, Serialization} from './serial/Serialization'
import {Logging} from '../../service/Logging'
import {Awaitable, Collection, Pipeline, uuid4} from '../../util'
import {getEventName} from './getEventName'
import {Bus} from './Bus'
import {WebSocketCloseEvent} from '../../http/lifecycle/WebSocketCloseEvent'
import {apiEvent, error} from '../../http/response/api'
import {AsyncResource, executionAsyncId} from 'async_hooks'
import {Session} from '../../http/session/Session'
import {Config} from '../../service/Config'
import {WebSocketHealthCheckEvent} from '../../http/lifecycle/WebSocketHealthCheckEvent'
import {Request} from '../../http/lifecycle/Request'

/**
 * Event bus backed by a single WebSocket connection.
 *
 * Events pushed onto this bus are serialized and sent over the socket.
 * Messages received from the socket are deserialized and dispatched to the
 * local subscribers registered for the matching event name.
 */
@Injectable()
export class WebSocketBus implements EventBus, AwareOfContainerLifecycle {
    awareOfContainerLifecycle = true as const

    /**
     * If true, the session will be loaded when an event is received and
     * persisted after the event's handlers have executed.
     *
     * If an event has no handlers, the session will NOT be loaded.
     *
     * Use `disableSessionLoad()` to disable.
     *
     * @see disableSessionLoad
     * @protected
     */
    protected shouldLoadSessionOnEvent = true

    @Inject()
    protected readonly ws!: WebSocket.WebSocket

    @Inject()
    protected readonly request!: Request

    @Inject()
    protected readonly bus!: Bus

    @Inject()
    protected readonly serial!: Serialization

    @Inject()
    protected readonly logging!: Logging

    @Inject()
    protected readonly config!: Config

    /** Unique identifier of this bus instance. */
    public readonly uuid = uuid4()

    /** True between a successful `up()` and the socket closing or `down()` being called. */
    private connected = false

    /** List of local subscriptions on this bus. */
    protected subscriptions: Collection<BusSubscriber<Event>> = new Collection()

    /**
     * Disables re-loading & persisting the session when an event with listeners is received.
     * @see shouldLoadSessionOnEvent
     */
    disableSessionLoad(): this {
        this.shouldLoadSessionOnEvent = false
        return this
    }

    /** Get a Promise that resolves when the socket closes. */
    onClose(): Promise<WebSocketCloseEvent> {
        return new Promise<WebSocketCloseEvent>(res => {
            this.bus.subscribe(WebSocketCloseEvent, event => res(event))
        })
    }

    /**
     * Subscribe to the given event and feed every received instance through a pipeline.
     * @param eventKey - the event class to listen for
     * @param line - the pipeline applied to each received event
     */
    pipe<T extends Event>(eventKey: StaticInstantiable<T>, line: Pipeline<T, EventHandlerReturn>): Awaitable<EventHandlerSubscription> {
        return this.subscribe(eventKey, event => line.apply(event))
    }

    /**
     * Serialize the given event and send it over the socket.
     *
     * NOTE(review): `ws.send()` reports asynchronous failures through its optional
     * callback, which is not supplied here -- confirm whether send failures should
     * reject the returned Promise.
     *
     * @param event - the event to send to the client
     */
    async push(event: Event): Promise<void> {
        this.logging.verbose(`Pushing event to WebSocket: ${event.eventName}`)
        this.logging.verbose(event)

        await this.ws.send(await this.serial.encodeJSON(event))
    }

    /**
     * Register a local handler that is called for incoming messages whose
     * event name matches the given event class.
     * @param eventKey - the event class to listen for
     * @param handler - called with each matching, deserialized event
     * @returns a subscription whose `unsubscribe()` removes the handler again
     */
    async subscribe<T extends Event>(eventKey: StaticInstantiable<T>, handler: EventHandler<T>): Promise<EventHandlerSubscription> {
        const uuid = uuid4()

        // Handlers for specific event types are stored in a single collection typed
        // on the base Event, hence the cast.
        const subscriber: BusSubscriber<Event> = {
            eventName: getEventName(eventKey),
            eventKey,
            handler,
            uuid,
        } as unknown as BusSubscriber<Event>

        this.logging.verbose(`Creating WebSocket subscriber ${uuid}...`)
        this.logging.verbose(subscriber)

        this.subscriptions.push(subscriber)

        return {
            unsubscribe: () => {
                this.subscriptions = this.subscriptions.where('uuid', '!=', uuid)
            },
        }
    }

    /**
     * Deserialize a raw message from the socket and dispatch it to the matching
     * local subscribers.
     *
     * When session loading is enabled, the request has a session, and at least one
     * subscriber matches, the session is loaded before the handlers run and
     * persisted after they complete. If deserialization or a handler throws, the
     * error propagates to the caller and the session is not persisted.
     *
     * @param message - the raw message text received from the socket
     * @protected
     */
    protected async onMessage(message: string): Promise<void> {
        const payload: Event = await this.serial.decodeJSON(message)  // FIXME validation

        const listeners = await this.subscriptions
            .where('eventName', '=', payload.eventName)

        // If configured, re-load the session data since it may have changed outside the
        // current socket's request.
        const session: Session | undefined = this.request.hasKey(Session) ? this.request.make(Session) : undefined
        const syncSession = Boolean(this.shouldLoadSessionOnEvent && session && listeners.isNotEmpty())

        if ( syncSession && session ) {
            await session.load()
        }

        await listeners.awaitMapCall('handler', payload)

        // Persist any changes to the session for other requests.
        if ( syncSession && session ) {
            await session.persist()
        }
    }

    /**
     * Attach the socket listeners, start the health check, and mark the bus as connected.
     *
     * All socket listeners run inside an `AsyncResource` created here, so they
     * execute in the async context that called `up()`.
     */
    async up(): Promise<void> {
        const resource = new AsyncResource('WebSocketBus', {
            triggerAsyncId: executionAsyncId(),
            requireManualDestroy: false,
        })

        this.ws.on('message', async data => {
            try {
                await resource.runInAsyncScope(async () => {
                    this.logging.verbose(`Got data from websocket: ${data}`)

                    try {
                        await this.onMessage(`${data}`)
                    } catch (e: unknown) {
                        if ( !(e instanceof NoSerializerError) ) {
                            throw e
                        }

                        this.logging.warn(`Discarding message as no validator could be found to deserialize it: ${data}`)
                        await this.push(apiEvent(error('Invalid message format or serializer.')))
                    }
                })
            } catch (e: unknown) {
                // The emitter ignores the Promise returned by this listener, so anything
                // rejected here would become an unhandled rejection. Log it instead.
                this.logging.error('Error handling WebSocket message:')
                this.logging.error(e)
            }
        })

        this.ws.on('close', () => resource.runInAsyncScope(() => {
            // The socket is gone, so stop reporting the bus as connected.
            this.connected = false
            return this.bus.push(new WebSocketCloseEvent())
        }))

        this.ws.on('error', () => resource.runInAsyncScope(() => this.bus.push(new WebSocketCloseEvent())))

        await this.registerHealthCheck()

        this.connected = true
    }

    /**
     * Set up an interval that fires a `WebSocketHealthCheckEvent` on the request bus.
     * This can be used, e.g., to check if the user is still logged in.
     * You can control the interval by setting the `server.socket.healthCheckIntervalSeconds`
     * config option. Set the option to 0 (or a negative value) to disable the interval.
     * @protected
     */
    protected async registerHealthCheck(): Promise<void> {
        const interval = this.config.safe('server.socket.healthCheckIntervalSeconds')
            .or(60)
            .integer()

        // A non-positive delay would make the timer fire continuously.
        if ( interval <= 0 ) {
            return
        }

        const handle = setInterval(() => {
            if ( !this.isConnected() ) {
                return
            }

            return this.bus.push(new WebSocketHealthCheckEvent())
        }, interval * 1000)

        // Stop the timer once the socket has closed.
        await this.bus.subscribe(WebSocketCloseEvent, () => {
            clearInterval(handle)
        })
    }

    /** Close the socket if the bus is connected, logging (not throwing) any error from closing. */
    down(): void {
        if ( this.connected ) {
            try {
                this.ws.close()
            } catch (e: unknown) {
                this.logging.error('Error closing socket:')
                this.logging.error(e)
            }
        }

        this.connected = false
    }

    /** True if `up()` has completed and the socket has not closed or been shut down since. */
    isConnected(): boolean {
        return this.connected  // FIXME: monitor for bad connections
    }
}