const WebSocket = require('ws') const http = require('http') const url = require('url') const fs = require('fs') const tmp = require('tmp-promise') const websocketStream = require('websocket-stream') const StreamSkip = require('stream-skip') const StreamTake = require('./ws/StreamTake') const Socket = require('./ws/Socket') const { Unit } = require('libflitter') const { NodeDescriptorType } = require('./enum') const { Buffer } = require('buffer') const { Readable } = require('stream') class ServerUnit extends Unit { static get services() { return [...super.services, 'configs', 'app', 'models', 'upload'] } sockets = [] static name() { return 'server' } name() { return 'server' } async go(app) { this.server = new WebSocket.Server({ port: this.configs.get('server.port') }) this.server.on('connection', socket => { socket = this.app.di().make(Socket, socket) this.sockets.push(socket) socket.ping() }) this.stream_server = http.createServer().listen(this.configs.get('server.stream_port')) this.stream_socket = websocketStream.createServer({ server: this.stream_server, perMessageDeflate: false, binary: true, }, (stream, request) => { const query = url.parse(request.url, true).query if ( !query.writing_file ) this.on_file_stream(stream, request) else this.on_file_write_stream(stream, request) }) await new Promise(res => { process.on('SIGINT', res) }) } async on_file_write_stream(stream, request) { let { socket_uuid, node_uuid, length = 4096, position = 0 } = url.parse(request.url, true).query if ( typeof position === 'string' ) position = parseInt(position) if ( typeof length === 'string' ) length = parseInt(length) const socket = this.sockets.find(x => x.uuid === socket_uuid) const Node = this.models.get('fs:Node') const node = await Node.findOne({ uuid: node_uuid, deleted: false, descriptor_type: NodeDescriptorType.File, }) if ( !socket.session.temp_write_files ) socket.session.temp_write_files = {} const placeholder = socket.session.temp_write_files?.[node.uuid] || await tmp.file() socket.session.temp_write_files[node.uuid] = placeholder console.log('Upload placeholder!', placeholder) const old_file = await node.uploaded_file() if ( old_file ) { if ( position === 0 ) { // This is a new write, so delete the old file await old_file.delete() delete node.uploaded_file_id } else { await this.upload.provider().download_file(old_file, placeholder.path) } } console.log('write stream', stream) console.log('write data', { placeholder, position, length }) stream.pipe(fs.createWriteStream(placeholder.path, { start: position })) } _bufferStream(stream) { const chunks = [] return new Promise((resolve, reject) => { stream.on('data', chunk => { console.log('stream data', chunk) chunks.push(chunk) }) stream.on('error', reject) stream.on('end', () => { console.log('stream end!') resolve(Buffer.concat(chunks)) }) }) } async on_file_stream(stream, request) { let { socket_uuid, node_uuid, length = 4096, position = 0 } = url.parse(request.url, true).query if ( typeof position === 'string' ) position = parseInt(position) if ( typeof length === 'string' ) length = parseInt(length) // const socket = this.sockets.find(x => x.uuid === socket_uuid) const Node = this.models.get('fs:Node') const node = await Node.findOne({ uuid: node_uuid, deleted: false, descriptor_type: NodeDescriptorType.File, }) const file = await node.uploaded_file() if ( file ) { const readable = this.upload.provider().read_stream(file) const slicer = new StreamSkip({ skip: position }) const taker = new StreamTake({ take: length }) readable.pipe(slicer).pipe(taker).pipe(stream) } else { // If no data was written, just return an empty file const empty = new Readable() empty.push('') empty.push(null) empty.pipe(stream) } } async cleanup(app) { this.server.close() this.stream_server.close() this.stream_socket.close() } } module.exports = exports = ServerUnit