You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
lib/src/support/bus/WebSocketBus.ts

124 lines
4.0 KiB

import {AwareOfContainerLifecycle, Container, Inject, Injectable, StaticInstantiable} from '../../di'
import {
EventBus,
Event,
EventHandler,
EventHandlerSubscription,
BusSubscriber,
EventHandlerReturn,
} from './types'
import * as WebSocket from 'ws'
import {NoSerializerError, Serialization} from './serial/Serialization'
import {Logging} from '../../service/Logging'
import {Awaitable, Collection, Pipeline, uuid4} from '../../util'
import {getEventName} from './getEventName'
import {Bus} from './Bus'
import {WebSocketCloseEvent} from '../../http/lifecycle/WebSocketCloseEvent'
import {apiEvent, error} from '../../http/response/api'
import {AsyncResource, executionAsyncId} from 'async_hooks'
/**
 * Event bus that relays events over a single WebSocket connection.
 *
 * Events given to `push` are JSON-encoded and sent over the socket. Messages
 * received from the socket are decoded and dispatched to the local subscribers
 * whose event name matches the decoded payload.
 */
@Injectable()
export class WebSocketBus implements EventBus, AwareOfContainerLifecycle {
    awareOfContainerLifecycle: true = true

    /** The socket this bus sends events over and receives messages from. */
    @Inject()
    protected readonly ws!: WebSocket.WebSocket

    /** Bus used to listen for the `WebSocketCloseEvent`. */
    @Inject()
    protected readonly bus!: Bus

    /** Serializer used to encode outgoing events and decode incoming messages. */
    @Inject()
    protected readonly serial!: Serialization

    @Inject()
    protected readonly logging!: Logging

    /** Identifier generated for this bus instance. */
    public readonly uuid = uuid4()

    /** Flag toggled by `up()` and `down()`; it does not reflect the socket's actual state. */
    private connected = false

    /** List of local subscriptions on this bus. */
    protected subscriptions: Collection<BusSubscriber<Event>> = new Collection()

    /** Get a Promise that resolves when the socket closes. */
    onClose(): Promise<WebSocketCloseEvent> {
        return new Promise<WebSocketCloseEvent>(res => {
            this.bus.subscribe(WebSocketCloseEvent, event => res(event))
        })
    }

    /**
     * Subscribe the given pipeline to events of the given type.
     * @param eventKey the event class to listen for
     * @param line pipeline applied to each matching event
     */
    pipe<T extends Event>(eventKey: StaticInstantiable<T>, line: Pipeline<T, EventHandlerReturn>): Awaitable<EventHandlerSubscription> {
        return this.subscribe(eventKey, event => line.apply(event))
    }

    /**
     * Encode the event as JSON and send it over the socket.
     *
     * The returned Promise settles once the socket reports the outcome of the
     * send: it resolves on success and rejects with the reported error otherwise.
     * @param event the event to send
     */
    async push(event: Event): Promise<void> {
        this.logging.verbose(`Pushing event to WebSocket: ${event.eventName}`)
        this.logging.verbose(event)

        const encoded = await this.serial.encodeJSON(event)

        // `send` does not return a Promise; completion and failure are reported
        // through its callback, so adapt it to make `await push(...)` meaningful.
        await new Promise<void>((resolve, reject) => {
            this.ws.send(encoded, err => {
                if ( err ) {
                    reject(err)
                } else {
                    resolve()
                }
            })
        })
    }

    /**
     * Register a local handler for events of the given type received from the socket.
     * @param eventKey the event class to listen for
     * @param handler called with each matching event
     * @returns a subscription whose `unsubscribe()` removes this handler
     */
    async subscribe<T extends Event>(eventKey: StaticInstantiable<T>, handler: EventHandler<T>): Promise<EventHandlerSubscription> {
        const uuid = uuid4()
        const subscriber: BusSubscriber<Event> = {
            eventName: getEventName(eventKey),
            eventKey,
            handler,
            uuid,
        } as unknown as BusSubscriber<Event>

        this.logging.verbose(`Creating WebSocket subscriber ${uuid}...`)
        this.logging.verbose(subscriber)
        this.subscriptions.push(subscriber)

        return {
            unsubscribe: () => {
                // Replace the collection with one that excludes this subscriber
                this.subscriptions = this.subscriptions.where('uuid', '!=', uuid)
            },
        }
    }

    /**
     * Decode a raw message from the socket and pass the resulting event to the
     * handler of every local subscriber registered under its event name.
     * @param message the raw message text
     */
    protected async onMessage(message: string): Promise<void> {
        const payload = await this.serial.decodeJSON<Event>(message) // FIXME validation

        await this.subscriptions
            .where('eventName', '=', payload.eventName)
            .awaitMapCall('handler', payload)
    }

    /**
     * Start listening for messages on the socket and mark the bus as connected.
     *
     * Messages that fail to decode with a `NoSerializerError` are discarded and an
     * error event is sent back over the socket. Any other error is re-thrown.
     */
    up(): void {
        // Message handlers run inside the async scope of this resource, which is
        // created in the execution context that called `up()`.
        const resource = new AsyncResource('WebSocketBus', {
            triggerAsyncId: executionAsyncId(),
            requireManualDestroy: false,
        })

        this.ws.on('message', async data => {
            await resource.runInAsyncScope(async () => {
                this.logging.verbose(`Got data from websocket: ${data}`)

                try {
                    // NOTE(review): return value is discarded; presumably this checks that
                    // a container is reachable from this async scope -- confirm.
                    Container.getContainer()
                    await this.onMessage(`${data}`)
                } catch (e: unknown) {
                    if ( !(e instanceof NoSerializerError) ) {
                        throw e
                    }

                    this.logging.warn(`Discarding message as no validator could be found to deserialize it: ${data}`)

                    // Await the reply so a failed send is handled here instead of
                    // surfacing later as an unhandled promise rejection.
                    try {
                        await this.push(apiEvent(error('Invalid message format or serializer.')))
                    } catch (sendError: unknown) {
                        const reason = sendError instanceof Error ? sendError.message : String(sendError)
                        this.logging.warn(`Unable to send error response over WebSocket: ${reason}`)
                    }
                }
            })
        })

        this.connected = true
    }

    /**
     * Mark the bus as disconnected.
     *
     * NOTE(review): the `message` listener registered by `up()` is not removed here.
     */
    down(): void {
        this.connected = false
    }

    /** Whether `up()` has been called more recently than `down()`. */
    isConnected(): boolean {
        return this.connected // FIXME: monitor for bad connections
    }
}