You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
lib/src/service/WebsocketServer.ts

131 lines
5.6 KiB

import {Unit, UnitStatus} from '../lifecycle/Unit'
import {Inject, Singleton} from '../di'
import * as WebSocket from 'ws'
import {HTTPServer} from './HTTPServer'
import {Logging} from './Logging'
import {ErrorWithContext} from '../util'
import {Request} from '../http/lifecycle/Request'
import {AsyncResource, executionAsyncId} from 'async_hooks'
import {HTTPKernel} from '../http/kernel/HTTPKernel'
import {InjectSessionHTTPModule} from '../http/kernel/module/InjectSessionHTTPModule'
import {ExecuteResolvedRoutePreflightHTTPModule} from '../http/kernel/module/ExecuteResolvedRoutePreflightHTTPModule'
import {ParseIncomingBodyHTTPModule} from '../http/kernel/module/ParseIncomingBodyHTTPModule'
import {InjectRequestEventBusHTTPModule} from '../http/kernel/module/InjectRequestEventBusHTTPModule'
import {RequestLocalStorage} from '../http/RequestLocalStorage'
import {MountWebSocketRouteHTTPModule} from '../http/kernel/module/MountWebSocketRouteHTTPModule'
import {SetSessionCookieHTTPModule} from '../http/kernel/module/SetSessionCookieHTTPModule'
import {MonitorWebSocketConnectionHTTPModule} from '../http/kernel/module/MonitorWebSocketConnectionHTTPModule'
import {Bus, WebSocketBus} from '../support/bus'
import {WebSocketCloseEvent} from '../http/lifecycle/WebSocketCloseEvent'
import {ExecuteResolvedWebSocketHandlerHTTPModule} from '../http/kernel/module/ExecuteResolvedWebSocketHandlerHTTPModule'
import {InjectRequestLocale} from '../i18n'
/**
 * Application unit that attaches a WebSocket server to the already-running
 * HTTP server and feeds every incoming connection through a dedicated
 * HTTP kernel.
 *
 * For each connection, a `Request` is created, the raw socket and a
 * `WebSocketBus` are registered with that request's container, and the
 * request is then handled by the kernel returned from `getKernel()`.
 */
@Singleton()
export class WebsocketServer extends Unit {
    /** HTTP server whose underlying server instance the WebSocket server attaches to. */
    @Inject()
    protected readonly http!: HTTPServer

    /** Application logger. */
    @Inject()
    protected readonly logging!: Logging

    /** Storage used to run each connection's handler in the scope of its `Request`. */
    @Inject()
    protected readonly requestLocalStorage!: RequestLocalStorage

    /** Bus onto which the global close event is pushed during shutdown. */
    @Inject()
    protected readonly bus!: Bus

    /** Lazily-created kernel used for WebSocket connections. See `getKernel()`. */
    protected kernel?: HTTPKernel

    /** The underlying `ws` server. Only set once `up()` has created it. */
    protected server?: WebSocket.Server

    /**
     * Get the kernel used to process WebSocket connections, creating and
     * caching it on first use.
     *
     * The modules are registered in a fixed sequence.
     * NOTE(review): whether registration order affects the resulting pipeline
     * depends on each module's `register()` implementation, which is not
     * visible here -- do not reorder without checking.
     *
     * @returns the cached HTTPKernel instance
     */
    protected getKernel(): HTTPKernel {
        if ( !this.kernel ) {
            const kernel = this.app().makeNew<HTTPKernel>(HTTPKernel)

            SetSessionCookieHTTPModule.register(kernel)
            InjectSessionHTTPModule.register(kernel)
            MountWebSocketRouteHTTPModule.register(kernel)
            ExecuteResolvedRoutePreflightHTTPModule.register(kernel)
            ExecuteResolvedWebSocketHandlerHTTPModule.register(kernel)
            ParseIncomingBodyHTTPModule.register(kernel)
            InjectRequestEventBusHTTPModule.register(kernel)
            MonitorWebSocketConnectionHTTPModule.register(kernel)
            InjectRequestLocale.register(kernel)

            this.kernel = kernel
        }

        return this.kernel
    }

    /**
     * Start the WebSocket server on top of the HTTP server and register the
     * connection handler.
     *
     * @throws ErrorWithContext if the HTTPServer unit has not been started
     */
    public async up(): Promise<void> {
        // Make sure the HTTP server is started. Otherwise, this is going to fail anyway
        if ( this.http.status !== UnitStatus.Started ) {
            throw new ErrorWithContext('Cannot start WebsocketServer without HTTPServer.', {
                suggestion: 'Make sure the HTTPServer is registered in your Units.extollo.ts file, and it is listed before the WebsocketServer.',
            })
        }

        // Start the websocket server
        this.logging.info('Starting WebSocket server...')
        this.server = new WebSocket.Server<WebSocket.WebSocket>({
            server: this.http.getServer(),
        })

        // Turns out that the websocket server was causing context loss bc of the way
        // its callback structure works. So, to allow us to access the global container
        // from w/in the server handler, we need to give Node some guidance on which
        // async context we're using. The resource is created here, in the async
        // context of up(), and every connection callback is run inside its scope.
        const resource = new AsyncResource('WebsocketServer', {
            triggerAsyncId: executionAsyncId(),
            requireManualDestroy: false,
        })

        // Register the websocket handler
        this.server.on('connection', (ws, request) => {
            resource.runInAsyncScope(() => {
                this.logging.info('Got WebSocket connection! ' + request.method)
                const extolloReq = new Request(request)

                // NOTE(review): the result of run() and the onResolve() chain below are
                // not awaited or caught here, so a failure during connection setup may
                // surface as an unhandled rejection -- confirm how run() treats a
                // rejected callback before relying on this.
                this.requestLocalStorage.run(extolloReq, async () => {
                    this.logging.info(`WebSocket connection: ${extolloReq.path}`)

                    // Register the websocket with the request container
                    extolloReq.registerSingletonInstance(WebSocket.WebSocket, ws)

                    // Register the websocket bus with the request container
                    const wsBus = extolloReq.makeNew<WebSocketBus>(WebSocketBus)
                    await wsBus.up()
                    extolloReq.registerSingletonInstance(WebSocketBus, wsBus)

                    // Register the listener to clean up this connection when it dies
                    extolloReq.onResolve<Bus>(Bus)
                        .then(bus => bus.subscribe(WebSocketCloseEvent, () => {
                            extolloReq.destroy()
                            ws.terminate()
                        }))

                    // Run the request through the kernel to get the setup
                    await this.getKernel().handle(extolloReq)
                })
            })
        })
    }

    /**
     * Stop the WebSocket server, if it was started.
     *
     * Broadcasts a `WebSocketCloseEvent` on the bus and then closes the
     * underlying server. Resolves immediately when no server exists, so
     * shutdown cannot hang on a unit that never finished starting.
     *
     * @throws whatever `bus.push()` rejects with, so a failed broadcast is
     *         reported to the caller instead of leaving the promise pending
     */
    public async down(): Promise<void> {
        // Capture the reference once so the check and the close() call below
        // operate on the same instance across the awaits.
        const server = this.server
        if ( !server ) {
            // No server was ever created -- nothing to close.
            return
        }

        // Since all the request busses connect to the global app bus,
        // we can broadcast a global close event to get all in-progress
        // connections to close their sockets.
        const event = new WebSocketCloseEvent()
        event.shouldBroadcast = true
        await this.bus.push(event)

        // close() is callback-based, so wrap just that call. Any error passed to
        // the callback is deliberately ignored (as before): shutdown is best-effort.
        await new Promise<void>(resolve => {
            server.close(() => resolve())
        })
    }
}