const WebSocket = require('ws')
const http = require('http')
const url = require('url')
const fs = require('fs')
const tmp = require('tmp-promise')
const websocketStream = require('websocket-stream')
const StreamSkip = require('stream-skip')
const StreamTake = require('./ws/StreamTake')
const Socket = require('./ws/Socket')
const { Unit } = require('libflitter')
const { NodeDescriptorType } = require('./enum')
const CombinedStream = require('combined-stream')
const { Buffer } = require('buffer')
const { Readable } = require('stream')

/**
 * Unit that starts two listeners: a WebSocket server for control sockets and
 * a websocket-stream server (on its own HTTP port) used to stream file
 * contents to and from clients.
 */
class ServerUnit extends Unit {
    /**
     * Names of the services this unit needs, appended to the parent's list.
     * @returns {string[]}
     */
    static get services() {
        return [...super.services, 'configs', 'app', 'models', 'upload']
    }

    /** Wrapped control sockets, in connection order. Looked up by `uuid`. */
    sockets = []

    /** @returns {string} identifier of this unit */
    static name() {
        return 'server'
    }

    /** @returns {string} identifier of this unit */
    name() {
        return 'server'
    }

    /**
     * Start both servers, then block until the process receives SIGINT.
     * @param {object} app - the application instance (not used directly here)
     * @returns {Promise<void>}
     */
    async go(app) {
        this.server = new WebSocket.Server({
            port: this.configs.get('server.port'),
        })

        this.server.on('connection', raw_socket => {
            const socket = this.app.di().make(Socket, raw_socket)
            this.sockets.push(socket)
            socket.ping()
        })

        this.stream_server = http.createServer().listen(this.configs.get('server.stream_port'))

        this.stream_socket = websocketStream.createServer({
            server: this.stream_server,
            perMessageDeflate: false,
            binary: true,
        }, (stream, request) => {
            const query = url.parse(request.url, true).query
            const handled = query.writing_file
                ? this.on_file_write_stream(stream, request)
                : this.on_file_stream(stream, request)

            // The handlers are async; without this, a failure would surface as an
            // unhandled rejection and the client stream would be left dangling.
            handled.catch(error => {
                console.error('File stream request failed:', error)
                stream.destroy()
            })
        })

        await new Promise(res => {
            process.on('SIGINT', res)
        })
    }

    /**
     * Parse an integer query-string parameter in base 10.
     * @param {*} value - raw value taken from the parsed query
     * @param {number} fallback - returned when the value is absent or not a number
     * @returns {number}
     */
    _parseIntParam(value, fallback) {
        const parsed = Number.parseInt(value, 10)
        return Number.isNaN(parsed) ? fallback : parsed
    }

    /**
     * Receive a base64-encoded chunk from the client and store the decoded bytes
     * in a fresh temporary file. The temp-file handle, tagged with the chunk's
     * `position` and `length`, is queued on the owning socket's session under
     * `temp_write_files[descriptor]`.
     *
     * Query parameters read from `request.url`: `socket_uuid`, `descriptor`,
     * `position` (default 0) and `length` (default 4096).
     *
     * @param {object} stream - duplex stream for the client connection
     * @param {object} request - the HTTP upgrade request
     * @returns {Promise<void>} resolves once the temp file has been fully written
     * @throws {Error} if no connected socket matches `socket_uuid`
     */
    async on_file_write_stream(stream, request) {
        const query = url.parse(request.url, true).query
        const { socket_uuid, descriptor } = query
        const position = this._parseIntParam(query.position, 0)
        const length = this._parseIntParam(query.length, 4096)

        const socket = this.sockets.find(x => x.uuid === socket_uuid)
        if ( !socket ) {
            throw new Error(`No connected socket with UUID: ${socket_uuid}`)
        }

        if ( !socket.session.temp_write_files ) socket.session.temp_write_files = {}
        if ( !socket.session.temp_write_files[descriptor] ) socket.session.temp_write_files[descriptor] = []

        const placeholder = await tmp.file()
        socket.session.temp_write_files[descriptor].push(placeholder)
        placeholder.position = position
        placeholder.length = length

        const encoded_buffer = await this._bufferStream(stream)
        const decoded_buffer = Buffer.from(encoded_buffer.toString(), 'base64')

        const combined_stream = CombinedStream.create()
        combined_stream.append(decoded_buffer)

        // Wait for the write to complete so callers never observe a half-written
        // placeholder, and so a disk error rejects instead of being thrown as an
        // unhandled 'error' event.
        await new Promise((resolve, reject) => {
            const target = fs.createWriteStream(placeholder.path)
            target.on('error', reject)
            target.on('finish', resolve)
            combined_stream.pipe(target)
        })
    }

    /**
     * Collect everything a readable stream emits into a single Buffer.
     * @param {object} stream - readable stream emitting Buffer chunks
     * @returns {Promise<Buffer>} resolves on 'end', rejects on 'error'
     */
    _bufferStream(stream) {
        const chunks = []
        return new Promise((resolve, reject) => {
            stream.on('data', chunk => {
                chunks.push(chunk)
            })
            stream.on('error', reject)
            stream.on('end', () => {
                resolve(Buffer.concat(chunks))
            })
        })
    }

    /**
     * Send a slice of a stored file to the client: `position` bytes are skipped,
     * then at most `length` bytes are written to `stream`. If the node has no
     * uploaded file, an empty payload is sent instead.
     *
     * Query parameters read from `request.url`: `node_uuid`, `position`
     * (default 0) and `length` (default 4096).
     *
     * @param {object} stream - duplex stream for the client connection
     * @param {object} request - the HTTP upgrade request
     * @returns {Promise<void>} resolves once piping has been set up
     * @throws {Error} if no non-deleted file node matches `node_uuid`
     */
    async on_file_stream(stream, request) {
        const query = url.parse(request.url, true).query
        const { node_uuid } = query
        const position = this._parseIntParam(query.position, 0)
        const length = this._parseIntParam(query.length, 4096)

        const Node = this.models.get('fs:Node')
        const node = await Node.findOne({
            uuid: node_uuid,
            deleted: false,
            descriptor_type: NodeDescriptorType.File,
        })

        if ( !node ) {
            throw new Error(`No file node with UUID: ${node_uuid}`)
        }

        const file = await node.uploaded_file()
        if ( file ) {
            const readable = this.upload.provider().read_stream(file)
            const slicer = new StreamSkip({ skip: position })
            const taker = new StreamTake({ take: length })

            // pipe() does not forward errors downstream, and an 'error' event with
            // no listener is thrown. Close the client stream instead.
            const abort = error => {
                console.error('File read stream failed:', error)
                stream.destroy()
            }
            readable.on('error', abort)
            slicer.on('error', abort)
            taker.on('error', abort)

            readable.pipe(slicer).pipe(taker).pipe(stream)
        } else {
            // If no data was written, just return an empty file
            const empty = new Readable()
            empty.push('')
            empty.push(null)
            empty.pipe(stream)
        }
    }

    /**
     * Close both servers.
     * @param {object} app - the application instance (not used directly here)
     * @returns {Promise<void>}
     */
    async cleanup(app) {
        this.server.close()
        this.stream_server.close()
        this.stream_socket.close()
    }
}

module.exports = exports = ServerUnit