You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
97 lines
3.3 KiB
97 lines
3.3 KiB
import {HTTPKernelModule} from '../HTTPKernelModule'
|
|
import {HTTPKernel} from '../HTTPKernel'
|
|
import {Request} from '../../lifecycle/Request'
|
|
import {Inject, Injectable} from '../../../di'
|
|
import {Config} from '../../../service/Config'
|
|
import {setInterval} from 'timers'
|
|
import {Logging} from '../../../service/Logging'
|
|
import {WebSocketCloseEvent} from '../../lifecycle/WebSocketCloseEvent'
|
|
import Timeout = NodeJS.Timeout;
|
|
import {Bus} from '../../../support/bus'
|
|
import * as WebSockets from 'ws'
|
|
import {Maybe} from '../../../util'
|
|
|
|
/**
 * HTTP kernel module that watches an open WebSocket connection.
 *
 * The socket resolved from the request is pinged on a fixed interval. The module
 * stops waiting when either the socket emits `close`, or the count of consecutive
 * unanswered pings rises above the configured limit. It then pushes a
 * `WebSocketCloseEvent` onto the request's bus.
 *
 * Configuration keys read (with the fallbacks used when they are unset):
 *  - `server.socket.pollIntervalMs` (30000)
 *  - `server.socket.pollResponseTimeoutMs` (3000)
 *  - `server.socket.maxFailedPolls` (5)
 */
@Injectable()
export class MonitorWebSocketConnectionHTTPModule extends HTTPKernelModule {
    @Inject()
    protected readonly config!: Config

    @Inject()
    protected readonly logging!: Logging

    /**
     * Register this module class on the given kernel via `kernel.register(this).core()`.
     * @param kernel the HTTP kernel to register with
     */
    public static register(kernel: HTTPKernel): void {
        kernel.register(this).core()
    }

    /**
     * Monitor the request's WebSocket until it closes or stops answering pings,
     * then push a `WebSocketCloseEvent` onto the request's bus.
     * @param request the request whose container provides the socket and the bus
     * @returns the same request, once monitoring has finished
     */
    async apply(request: Request): Promise<Request> {
        const socket = request.make<WebSockets.WebSocket>(WebSockets.WebSocket)

        // Delay between two consecutive pings
        const pingEveryMs = this.config.safe('server.socket.pollIntervalMs')
            .or(30000)
            .integer()

        // How long each ping waits before checking whether a pong arrived
        const pongDeadlineMs = this.config.safe('server.socket.pollResponseTimeoutMs')
            .or(3000)
            .integer()

        // The connection is given up once consecutive misses rise ABOVE this number
        const missedPongLimit = this.config.safe('server.socket.maxFailedPolls')
            .or(5)
            .integer()

        let missedPongs = 0
        let pingTimer: Maybe<Timeout> = undefined

        // Resolves after the pong deadline has elapsed
        const awaitPongDeadline = (): Promise<void> => {
            return new Promise<void>(wake => setTimeout(wake, pongDeadlineMs))
        }

        await new Promise<void>(finish => {
            // Set by the 'pong' listener, cleared at the start of every ping cycle
            let pongSeen = false

            // One ping cycle: send a ping, wait out the deadline, then score the result
            const poll = async (): Promise<void> => {
                this.logging.verbose('Sending ping request to socket...')
                pongSeen = false
                socket.ping()

                await awaitPongDeadline()

                if ( pongSeen ) {
                    // A pong arrived in time, so the run of misses starts over
                    missedPongs = 0
                } else {
                    this.logging.verbose('Socket failed to respond in time.')
                    missedPongs += 1
                }

                if ( missedPongs <= missedPongLimit ) {
                    return
                }

                // Too many consecutive misses: stop waiting on this socket
                this.logging.debug('Socket exceeded maximum # of failed pings. Killing.')
                finish()
            }

            // Record pong responses for the cycle currently in flight
            socket.on('pong', () => {
                this.logging.verbose('Got pong response from socket.')
                pongSeen = true
            })

            // A close event ends the monitoring immediately
            socket.on('close', () => {
                this.logging.debug('Got close event from socket.')
                finish()
            })

            pingTimer = setInterval(poll, pingEveryMs)
        })

        // Monitoring is over: stop scheduling further pings
        if ( pingTimer ) {
            clearInterval(pingTimer)
        }

        // Publish the close event on the request's bus (per the original author's
        // note, this is what tells the server to close the socket connection)
        await request.make<Bus>(Bus).push(new WebSocketCloseEvent())
        return request
    }
}
|