import {Unit, UnitStatus} from '../lifecycle/Unit'
import {Inject, Singleton} from '../di'
import * as WebSocket from 'ws'
import {HTTPServer} from './HTTPServer'
import {Logging} from './Logging'
import {ErrorWithContext} from '../util'
import {Request} from '../http/lifecycle/Request'
import {AsyncResource, executionAsyncId} from 'async_hooks'
import {HTTPKernel} from '../http/kernel/HTTPKernel'
import {InjectSessionHTTPModule} from '../http/kernel/module/InjectSessionHTTPModule'
import {ExecuteResolvedRoutePreflightHTTPModule} from '../http/kernel/module/ExecuteResolvedRoutePreflightHTTPModule'
import {ParseIncomingBodyHTTPModule} from '../http/kernel/module/ParseIncomingBodyHTTPModule'
import {InjectRequestEventBusHTTPModule} from '../http/kernel/module/InjectRequestEventBusHTTPModule'
import {RequestLocalStorage} from '../http/RequestLocalStorage'
import {MountWebSocketRouteHTTPModule} from '../http/kernel/module/MountWebSocketRouteHTTPModule'
import {SetSessionCookieHTTPModule} from '../http/kernel/module/SetSessionCookieHTTPModule'
import {MonitorWebSocketConnectionHTTPModule} from '../http/kernel/module/MonitorWebSocketConnectionHTTPModule'
import {Bus, WebSocketBus} from '../support/bus'
import {WebSocketCloseEvent} from '../http/lifecycle/WebSocketCloseEvent'
import {ExecuteResolvedWebSocketHandlerHTTPModule} from '../http/kernel/module/ExecuteResolvedWebSocketHandlerHTTPModule'
import {InjectRequestLocale} from '../i18n'

/**
 * Application unit that attaches a WebSocket server to the already-running
 * HTTP server and runs each incoming connection through its own HTTPKernel
 * instance, configured with the WebSocket-specific kernel modules.
 */
@Singleton()
export class WebsocketServer extends Unit {
    @Inject()
    protected readonly http!: HTTPServer

    @Inject()
    protected readonly logging!: Logging

    @Inject()
    protected readonly requestLocalStorage!: RequestLocalStorage

    @Inject()
    protected readonly bus!: Bus

    /** Lazily-created kernel used for WebSocket connections. See getKernel(). */
    protected kernel?: HTTPKernel

    /** The underlying `ws` server. Only set once up() has run successfully. */
    protected server?: WebSocket.Server

    /**
     * Get the kernel used to handle WebSocket connections, creating it and
     * registering its modules on first use. Later calls return the same instance.
     */
    protected getKernel(): HTTPKernel {
        if ( !this.kernel ) {
            const kernel = this.app().makeNew<HTTPKernel>(HTTPKernel)

            SetSessionCookieHTTPModule.register(kernel)
            InjectSessionHTTPModule.register(kernel)
            MountWebSocketRouteHTTPModule.register(kernel)
            ExecuteResolvedRoutePreflightHTTPModule.register(kernel)
            ExecuteResolvedWebSocketHandlerHTTPModule.register(kernel)
            ParseIncomingBodyHTTPModule.register(kernel)
            InjectRequestEventBusHTTPModule.register(kernel)
            MonitorWebSocketConnectionHTTPModule.register(kernel)
            InjectRequestLocale.register(kernel)

            this.kernel = kernel
        }

        return this.kernel
    }

    /**
     * Start the WebSocket server on top of the HTTP server and register the
     * connection handler.
     * @throws ErrorWithContext if the HTTPServer unit has not been started
     */
    public async up(): Promise<void> {
        // Make sure the HTTP server is started. Otherwise, this is going to fail anyway
        if ( this.http.status !== UnitStatus.Started ) {
            throw new ErrorWithContext('Cannot start WebsocketServer without HTTPServer.', {
                suggestion: 'Make sure the HTTPServer is registered in your Units.extollo.ts file, and it is listed before the WebsocketServer.',
            })
        }

        // Start the websocket server
        this.logging.info('Starting WebSocket server...')
        this.server = new WebSocket.Server({
            server: this.http.getServer(),
        })

        // Turns out that the websocket server was causing context loss bc of the way
        // its callback structure works. So, to allow us to access the global container
        // from w/in the server handler, we need to give Node some guidance on which
        // async context we're using.
        const resource = new AsyncResource('WebsocketServer', {
            triggerAsyncId: executionAsyncId(),
            requireManualDestroy: false,
        })

        // Register the websocket handler
        this.server.on('connection', (ws, request) => {
            resource.runInAsyncScope(() => {
                this.logging.info(`Got WebSocket connection! ${request.method}`)

                const extolloReq = new Request(request)
                this.requestLocalStorage.run(extolloReq, async () => {
                    // Nothing awaits this callback, so any failure must be handled
                    // here. Otherwise it surfaces as an unhandled promise rejection
                    // and the socket is left open with nothing servicing it.
                    try {
                        this.logging.info(`WebSocket connection: ${extolloReq.path}`)

                        // Register the websocket with the request container
                        extolloReq.registerSingletonInstance(WebSocket.WebSocket, ws)

                        // Register the websocket bus with the request container
                        const wsBus = extolloReq.makeNew<WebSocketBus>(WebSocketBus)
                        await wsBus.up()
                        extolloReq.registerSingletonInstance(WebSocketBus, wsBus)

                        // Register the listener to clean up this connection when it dies
                        extolloReq.onResolve<Bus>(Bus)
                            .then(bus => bus.subscribe(WebSocketCloseEvent, () => {
                                extolloReq.destroy()
                                ws.terminate()
                            }))
                            .catch(e => {
                                this.logging.error('Unable to register WebSocket close listener:')
                                this.logging.error(e)
                            })

                        // Run the request through the kernel to get the setup
                        await this.getKernel().handle(extolloReq)
                    } catch (e: unknown) {
                        this.logging.error('Error while handling WebSocket connection:')
                        this.logging.error(e)
                        ws.terminate()
                    }
                })
            })
        })
    }

    /**
     * Stop the WebSocket server, if it was started. Resolves immediately when
     * there is no server to stop, and rejects if broadcasting the close event
     * fails (rather than never settling).
     */
    public async down(): Promise<void> {
        const server = this.server
        if ( !server ) {
            // up() never ran (or failed before creating the server): nothing to close.
            return
        }

        // Since all the request busses connect to the global app bus,
        // we can broadcast a global close event to get all in-progress
        // connections to close their sockets.
        const event = new WebSocketCloseEvent()
        event.shouldBroadcast = true
        await this.bus.push(event)

        // ws uses a callback API here; adapt it to a promise. As before, any
        // error passed to the callback is ignored so shutdown can complete.
        await new Promise<void>(res => server.close(() => res()))
    }
}