const WebSocketStream = require('websocket-stream')
const CombinedStream = require('combined-stream')
const { Buffer } = require('buffer')
const config = require('./config')

/**
 * Thin client around a WebSocket connection to the data server configured in
 * `config.config.data_server`. One instance describes a single byte range
 * (`position`, `length`) of a node and can either read it (`stream` /
 * `buffer`) or send data for it (`write`).
 */
class Streamer {
  /**
   * @param {string} socket_uuid - Sent as the `socket_uuid` query parameter.
   * @param {string} node_uuid - Sent as the `node_uuid` query parameter.
   * @param {number} [length=4096] - Sent as the `length` query parameter.
   * @param {number} [position=0] - Sent as the `position` query parameter.
   * @param {*} [descriptor] - Sent as the `descriptor` query parameter on
   *   writes only. NOTE(review): when left undefined the literal string
   *   "undefined" is transmitted (unchanged from the original behavior) --
   *   confirm the server expects that.
   */
  constructor(socket_uuid, node_uuid, length = 4096, position = 0, descriptor = undefined) {
    this.socket_uuid = socket_uuid
    this.node_uuid = node_uuid
    this.length = length
    this.position = position
    this.descriptor = descriptor
  }

  /**
   * Build the data-server URL for this streamer.
   *
   * Values are percent-encoded so that a reserved character (`&`, `=`, `#`,
   * space, ...) in any field cannot corrupt the query string. Parameter order
   * is: socket_uuid, node_uuid, length, position, then `extra` in insertion
   * order.
   *
   * @param {Object} [extra] - Additional query parameters to append.
   * @returns {string} A `ws://` URL.
   */
  _buildUrl(extra = {}) {
    const params = Object.assign(
      {
        socket_uuid: this.socket_uuid,
        node_uuid: this.node_uuid,
        length: this.length,
        position: this.position,
      },
      extra
    )
    const query = Object.keys(params)
      .map(key => `${key}=${encodeURIComponent(params[key])}`)
      .join('&')
    return `ws://${config.config.data_server}/?${query}`
  }

  /**
   * Open a binary, uncompressed WebSocket stream to the data server.
   *
   * @param {Object} [extra] - Additional query parameters for the URL.
   * @returns {*} The duplex stream returned by `websocket-stream`.
   */
  _connect(extra) {
    // A fresh options object per connection, exactly as the original did.
    return WebSocketStream(this._buildUrl(extra), {
      perMessageDeflate: false,
      binary: true,
    })
  }

  /**
   * Send `buffer` to the data server over a dedicated write connection.
   *
   * The payload is transmitted as a base64 string and the connection is ended
   * once it has been piped through.
   *
   * @param {Buffer} buffer - Data to send.
   * @returns {Promise<void>} Resolves when the underlying socket emits
   *   `close`; rejects if the stream emits `error` first.
   */
  write(buffer) {
    const write_stream = this._connect({
      writing_file: true,
      descriptor: this.descriptor,
    })

    const combined_stream = CombinedStream.create()
    combined_stream.append(buffer.toString('base64'))

    return new Promise((resolve, reject) => {
      // Without an error listener a failed connection would either crash the
      // process (unhandled 'error' event) or leave this promise pending.
      write_stream.on('error', reject)
      write_stream.socket.on('close', () => {
        resolve()
      })
      combined_stream.pipe(write_stream)
    })
  }

  /**
   * Open a read connection for this streamer's range and remember it on
   * `this.ws` so that `close()` can act on it later.
   *
   * @returns {*} The duplex stream returned by `websocket-stream`.
   */
  stream() {
    this.ws = this._connect()
    return this.ws
  }

  /**
   * Open a read connection and collect everything it delivers.
   *
   * @returns {Promise<Buffer>} The concatenated chunks received before `end`.
   */
  async buffer() {
    return this._bufferStream(this.stream())
  }

  /**
   * Collect every `data` chunk of `stream` into a single Buffer.
   *
   * @param {*} stream - A readable stream emitting Buffer chunks.
   * @returns {Promise<Buffer>} Resolves on `end` with the concatenated data;
   *   rejects on `error`.
   */
  _bufferStream(stream) {
    const chunks = []
    return new Promise((resolve, reject) => {
      stream.on('data', chunk => chunks.push(chunk))
      stream.on('error', reject)
      stream.on('end', () => resolve(Buffer.concat(chunks)))
    })
  }

  /**
   * Close the read connection opened by `stream()`, if any.
   * Calling this before `stream()` is a no-op instead of a TypeError.
   */
  close() {
    if (!this.ws) {
      return
    }
    this.ws.close()
  }
}

module.exports = exports = Streamer