import {HTTPKernelModule} from '../HTTPKernelModule'
import {HTTPKernel} from '../HTTPKernel'
import {Request} from '../../lifecycle/Request'
import {Inject, Injectable} from '../../../di'
import {Config} from '../../../service/Config'
import {setInterval} from 'timers'
import {Logging} from '../../../service/Logging'
import {WebSocketCloseEvent} from '../../lifecycle/WebSocketCloseEvent'
import Timeout = NodeJS.Timeout;
import {Bus} from '../../../support/bus'
import * as WebSockets from 'ws'
import {Maybe} from '../../../util'

/**
 * HTTP kernel module that watches the WebSocket attached to a request.
 *
 * It pings the socket on a fixed interval and waits for the matching pong.
 * Once the socket emits `close`, or the number of consecutive unanswered pings
 * exceeds the configured threshold, it pushes a `WebSocketCloseEvent` onto the
 * request's bus.
 *
 * Configuration keys read (with the defaults used when unset):
 * - `server.socket.pollIntervalMs` (30000): time between pings.
 * - `server.socket.pollResponseTimeoutMs` (3000): time to wait for each pong.
 * - `server.socket.maxFailedPolls` (5): consecutive failures tolerated.
 */
@Injectable()
export class MonitorWebSocketConnectionHTTPModule extends HTTPKernelModule {
    @Inject()
    protected readonly config!: Config

    @Inject()
    protected readonly logging!: Logging

    /**
     * Register this module with the given kernel, marking the registration
     * with `.core()`.
     * @param kernel the HTTP kernel to register with
     */
    public static register(kernel: HTTPKernel): void {
        kernel.register(this).core()
    }

    /**
     * Monitor the request's WebSocket until it closes or stops answering pings,
     * then push a `WebSocketCloseEvent` onto the request's bus.
     *
     * The returned promise does not settle until monitoring has finished.
     *
     * @param request the request whose container provides the socket and bus
     * @returns the same request that was passed in
     */
    async apply(request: Request): Promise<Request> {
        const ws: WebSockets.WebSocket = request.make(WebSockets.WebSocket)

        // Time to wait between pings
        const pollIntervalMs = this.config.safe('server.socket.pollIntervalMs')
            .or(30000)
            .integer()

        // Time to wait for a response
        const pollResponseTimeoutMs = this.config.safe('server.socket.pollResponseTimeoutMs')
            .or(3000)
            .integer()

        // Max # of failures before the connection is closed
        const maxFailedPolls = this.config.safe('server.socket.maxFailedPolls')
            .or(5)
            .integer()

        let failedPolls = 0
        let interval: Maybe<Timeout> = undefined

        // Set once monitoring is over so that a ping whose response timeout is
        // still pending does not keep counting failures or logging afterwards.
        let finished = false

        await new Promise<void>(res => {
            let gotResponse = false

            // Listen for pong responses
            ws.on('pong', () => {
                this.logging.verbose('Got pong response from socket.')
                gotResponse = true
            })

            // Listen for close event
            ws.on('close', () => {
                this.logging.debug('Got close event from socket.')
                res()
            })

            interval = setInterval(async () => {
                // Every interval, send a ping request and set a timeout for the response
                this.logging.verbose('Sending ping request to socket...')
                gotResponse = false

                try {
                    ws.ping()
                } catch (error) {
                    // ping() can throw synchronously (e.g. when the socket is not open).
                    // Inside this async callback that would become an unhandled promise
                    // rejection, so swallow it here: gotResponse stays false and the
                    // attempt is counted as a failed poll below.
                    const reason = error instanceof Error ? error.message : String(error)
                    this.logging.debug(`Unable to send ping request to socket: ${reason}`)
                }

                await new Promise<void>(res2 => setTimeout(res2, pollResponseTimeoutMs))

                // The socket closed (or was already killed) while we were waiting.
                if ( finished ) {
                    return
                }

                // If no pong response is received before the timeout occurs, tick the # of failed response
                if ( !gotResponse ) {
                    this.logging.verbose('Socket failed to respond in time.')
                    failedPolls += 1
                } else {
                    // Otherwise, reset the failure counter
                    failedPolls = 0
                }

                // Once the failed responses exceeds the threshold, kill the connection
                if ( failedPolls > maxFailedPolls ) {
                    this.logging.debug('Socket exceeded maximum # of failed pings. Killing.')
                    res()
                }
            }, pollIntervalMs)
        })

        finished = true

        if ( interval ) {
            clearInterval(interval)
        }

        // Tell the server to close the socket connection
        const bus: Bus = request.make(Bus)
        await bus.push(new WebSocketCloseEvent())

        return request
    }
}