213 lines
6.9 KiB
TypeScript
213 lines
6.9 KiB
TypeScript
import {AwareOfContainerLifecycle, Inject, Injectable, StaticInstantiable} from '../../di'
|
|
import {
|
|
EventBus,
|
|
Event,
|
|
EventHandler,
|
|
EventHandlerSubscription,
|
|
BusSubscriber,
|
|
EventHandlerReturn,
|
|
} from './types'
|
|
import * as WebSocket from 'ws'
|
|
import {NoSerializerError, Serialization} from './serial/Serialization'
|
|
import {Logging} from '../../service/Logging'
|
|
import {Awaitable, Collection, Pipeline, uuid4} from '../../util'
|
|
import {getEventName} from './getEventName'
|
|
import {Bus} from './Bus'
|
|
import {WebSocketCloseEvent} from '../../http/lifecycle/WebSocketCloseEvent'
|
|
import {apiEvent, error} from '../../http/response/api'
|
|
import {AsyncResource, executionAsyncId} from 'async_hooks'
|
|
import {Session} from '../../http/session/Session'
|
|
import {Config} from '../../service/Config'
|
|
import {WebSocketHealthCheckEvent} from '../../http/lifecycle/WebSocketHealthCheckEvent'
|
|
import {Request} from '../../http/lifecycle/Request'
|
|
|
|
@Injectable()
|
|
export class WebSocketBus implements EventBus, AwareOfContainerLifecycle {
|
|
awareOfContainerLifecycle = true as const
|
|
|
|
/**
|
|
* If true, the session will be loaded when an event is received and
|
|
* persisted after the event's handlers have executed.
|
|
*
|
|
* If an event has no handlers, the session will NOT be loaded.
|
|
*
|
|
* Use `disableSessionLoad()` to disable.
|
|
*
|
|
* @see disableSessionLoad
|
|
* @protected
|
|
*/
|
|
protected shouldLoadSessionOnEvent = true
|
|
|
|
@Inject()
|
|
protected readonly ws!: WebSocket.WebSocket
|
|
|
|
@Inject()
|
|
protected readonly request!: Request
|
|
|
|
@Inject()
|
|
protected readonly bus!: Bus
|
|
|
|
@Inject()
|
|
protected readonly serial!: Serialization
|
|
|
|
@Inject()
|
|
protected readonly logging!: Logging
|
|
|
|
@Inject()
|
|
protected readonly config!: Config
|
|
|
|
public readonly uuid = uuid4()
|
|
|
|
private connected = false
|
|
|
|
/** List of local subscriptions on this bus. */
|
|
protected subscriptions: Collection<BusSubscriber<Event>> = new Collection()
|
|
|
|
/**
|
|
* Disables re-loading & persisting the session when an event with listeners is received.
|
|
* @see shouldLoadSessionOnEvent
|
|
*/
|
|
disableSessionLoad(): this {
|
|
this.shouldLoadSessionOnEvent = false
|
|
return this
|
|
}
|
|
|
|
/** Get a Promise that resolves then the socket closes. */
|
|
onClose(): Promise<WebSocketCloseEvent> {
|
|
return new Promise<WebSocketCloseEvent>(res => {
|
|
this.bus.subscribe(WebSocketCloseEvent, event => res(event))
|
|
})
|
|
}
|
|
|
|
pipe<T extends Event>(eventKey: StaticInstantiable<T>, line: Pipeline<T, EventHandlerReturn>): Awaitable<EventHandlerSubscription> {
|
|
return this.subscribe(eventKey, event => line.apply(event))
|
|
}
|
|
|
|
async push(event: Event): Promise<void> {
|
|
this.logging.verbose(`Pushing event to WebSocket: ${event.eventName}`)
|
|
this.logging.verbose(event)
|
|
|
|
await this.ws.send(await this.serial.encodeJSON(event))
|
|
}
|
|
|
|
async subscribe<T extends Event>(eventKey: StaticInstantiable<T>, handler: EventHandler<T>): Promise<EventHandlerSubscription> {
|
|
const uuid = uuid4()
|
|
const subscriber: BusSubscriber<Event> = {
|
|
eventName: getEventName(eventKey),
|
|
eventKey,
|
|
handler,
|
|
uuid,
|
|
} as unknown as BusSubscriber<Event>
|
|
|
|
this.logging.verbose(`Creating WebSocket subscriber ${uuid}...`)
|
|
this.logging.verbose(subscriber)
|
|
|
|
this.subscriptions.push(subscriber)
|
|
|
|
return {
|
|
unsubscribe: () => {
|
|
this.subscriptions = this.subscriptions.where('uuid', '!=', uuid)
|
|
},
|
|
}
|
|
}
|
|
|
|
protected async onMessage(message: string): Promise<void> {
|
|
const payload = await this.serial.decodeJSON<Event>(message) // FIXME validation
|
|
|
|
const listeners = await this.subscriptions
|
|
.where('eventName', '=', payload.eventName)
|
|
|
|
// If configured, re-load the session data since it may have changed outside the
|
|
// current socket's request.
|
|
const session = this.request.hasKey(Session) ? this.request.make<Session>(Session) : undefined
|
|
if ( this.shouldLoadSessionOnEvent && session && listeners.isNotEmpty() ) {
|
|
await session.load()
|
|
}
|
|
|
|
await listeners.awaitMapCall('handler', payload)
|
|
|
|
// Persist any changes to the session for other requests.
|
|
if ( this.shouldLoadSessionOnEvent && session && listeners.isNotEmpty() ) {
|
|
await session.persist()
|
|
}
|
|
}
|
|
|
|
async up(): Promise<void> {
|
|
const resource = new AsyncResource('WebSocketBus', {
|
|
triggerAsyncId: executionAsyncId(),
|
|
requireManualDestroy: false,
|
|
})
|
|
|
|
this.ws.on('message', async data => {
|
|
await resource.runInAsyncScope(async () => {
|
|
this.logging.verbose(`Got data from websocket: ${data}`)
|
|
try {
|
|
await this.onMessage(`${data}`)
|
|
} catch (e: unknown) {
|
|
if ( e instanceof NoSerializerError ) {
|
|
this.logging.warn(`Discarding message as no validator could be found to deserialize it: ${data}`)
|
|
this.push(apiEvent(error('Invalid message format or serializer.')))
|
|
} else {
|
|
throw e
|
|
}
|
|
}
|
|
})
|
|
})
|
|
|
|
this.ws.on('close', () =>
|
|
resource.runInAsyncScope(() => this.bus.push(new WebSocketCloseEvent())))
|
|
this.ws.on('error', () =>
|
|
resource.runInAsyncScope(() => this.bus.push(new WebSocketCloseEvent())))
|
|
|
|
await this.registerHealthCheck()
|
|
|
|
this.connected = true
|
|
}
|
|
|
|
/**
|
|
* Set up an interval that fires a `WebSocketHealthCheckEvent` on the request bus.
|
|
* This can be used, e.g., to check if the user is still logged in.
|
|
* You can control the interval by setting the `server.socket.healthCheckIntervalSeconds`
|
|
* config option. Set the option to 0 to disable the interval.
|
|
* @protected
|
|
*/
|
|
protected async registerHealthCheck(): Promise<void> {
|
|
const interval = this.config.safe('server.socket.healthCheckIntervalSeconds')
|
|
.or(60)
|
|
.integer()
|
|
|
|
if ( interval === 0 ) {
|
|
return
|
|
}
|
|
|
|
const handle = setInterval(() => {
|
|
if ( !this.isConnected() ) {
|
|
return
|
|
}
|
|
|
|
return this.bus.push(new WebSocketHealthCheckEvent())
|
|
}, interval * 1000)
|
|
|
|
await this.bus.subscribe(WebSocketCloseEvent, () => {
|
|
clearInterval(handle)
|
|
})
|
|
}
|
|
|
|
down(): void {
|
|
if ( this.connected ) {
|
|
try {
|
|
this.ws.close()
|
|
} catch (e) {
|
|
this.logging.error('Error closing socket:')
|
|
this.logging.error(e)
|
|
}
|
|
}
|
|
|
|
this.connected = false
|
|
}
|
|
|
|
isConnected(): boolean {
|
|
return this.connected // FIXME: monitor for bad connections
|
|
}
|
|
}
|