From 86f274e6d2861b00e4375b09cd99b76deb38e8f9 Mon Sep 17 00:00:00 2001
From: garrettmills
Date: Sun, 29 Nov 2020 11:53:55 -0600
Subject: [PATCH] Fix file write streaming logic

---
 app/ServerUnit.js             | 33 ++++---------------
 app/ws/Socket.js              |  2 +-
 app/ws/routes/fs.chmod.js     |  1 -
 app/ws/routes/fs.flush.js     | 60 ++++++++++++++++-------------------
 app/ws/routes/fs.release.js   | 15 +++++----
 app/ws/routes/stream.getfd.js |  1 +
 6 files changed, 46 insertions(+), 66 deletions(-)

diff --git a/app/ServerUnit.js b/app/ServerUnit.js
index ceae555..3b2767a 100644
--- a/app/ServerUnit.js
+++ b/app/ServerUnit.js
@@ -57,54 +57,35 @@ class ServerUnit extends Unit {
     }
 
     async on_file_write_stream(stream, request) {
-        let { socket_uuid, node_uuid, length = 4096, position = 0 } = url.parse(request.url, true).query
+        let { socket_uuid, node_uuid, length = 4096, position = 0, descriptor } = url.parse(request.url, true).query
         if ( typeof position === 'string' ) position = parseInt(position)
         if ( typeof length === 'string' ) length = parseInt(length)
 
         const socket = this.sockets.find(x => x.uuid === socket_uuid)
-        const Node = this.models.get('fs:Node')
-        const node = await Node.findOne({
-            uuid: node_uuid,
-            deleted: false,
-            descriptor_type: NodeDescriptorType.File,
-        })
-
         if ( !socket.session.temp_write_files ) socket.session.temp_write_files = {}
+        if ( !socket.session.temp_write_files[descriptor] ) socket.session.temp_write_files[descriptor] = []
 
-        const placeholder = socket.session.temp_write_files?.[node.uuid] || await tmp.file()
-        socket.session.temp_write_files[node.uuid] = placeholder
-
-        console.log('Upload placeholder:', placeholder)
-
-        const old_file = await node.uploaded_file()
-        if ( old_file ) {
-            if ( position === 0 ) {
-                // This is a new write, so delete the old file
-                await old_file.delete()
-                delete node.uploaded_file_id
-            } else {
-                await this.upload.provider().download_file(old_file, placeholder.path)
-            }
-        }
+        const placeholder = await tmp.file()
+        socket.session.temp_write_files[descriptor].push(placeholder)
+        placeholder.position = position
+        placeholder.length = length
 
         const encoded_buffer = await this._bufferStream(stream)
         const decoded_buffer = Buffer.from(encoded_buffer.toString(), 'base64')
 
         const combined_stream = CombinedStream.create()
         combined_stream.append(decoded_buffer)
-        combined_stream.pipe(fs.createWriteStream(placeholder.path, { flags: 'r+', start: position }))
+        combined_stream.pipe(fs.createWriteStream(placeholder.path))
     }
 
     _bufferStream(stream) {
         const chunks = []
         return new Promise((resolve, reject) => {
             stream.on('data', chunk => {
-                console.log('stream data', chunk)
                 chunks.push(chunk)
             })
 
             stream.on('error', reject)
             stream.on('end', () => {
-                console.log('stream end!')
                 resolve(Buffer.concat(chunks))
             })
         })
diff --git a/app/ws/Socket.js b/app/ws/Socket.js
index 3900dd2..bbbf832 100644
--- a/app/ws/Socket.js
+++ b/app/ws/Socket.js
@@ -39,7 +39,7 @@ class Socket extends Injectable {
     }
 
     async on_message(msg) {
-        this.output.info(msg)
+        this.output.debug(msg)
         const message = new Message(msg)
         const response = new Message()
         message.socket = this
diff --git a/app/ws/routes/fs.chmod.js b/app/ws/routes/fs.chmod.js
index 277e6b9..204ff38 100644
--- a/app/ws/routes/fs.chmod.js
+++ b/app/ws/routes/fs.chmod.js
@@ -1,7 +1,6 @@
 const { Errors } = require('../../shared')
 const { NodeDescriptorType } = require('../../enum')
 
-// TODO should this return size info for the temp write file?
 module.exports = exports = async (message, di) => {
     const Node = di.models.get('fs:Node')
     const { path, mode } = message.data()
diff --git a/app/ws/routes/fs.flush.js b/app/ws/routes/fs.flush.js
index 92c281c..759e824 100644
--- a/app/ws/routes/fs.flush.js
+++ b/app/ws/routes/fs.flush.js
@@ -1,42 +1,24 @@
 const { Errors } = require('../../shared')
 const { NodeDescriptorType } = require('../../enum')
 const fs = require('fs')
-
-const exists = async file => {
-    return new Promise(res => {
-        fs.promises.stat(file).then(() => res(true)).catch(() => res(false))
-    })
-}
+const tmp = require('tmp-promise')
+const CombinedStream = require('combined-stream')
 
 module.exports = exports = async (message, di) => {
-console.log('[FLUSH]')
     const Node = di.models.get('fs:Node')
     const { descriptor } = message.data()
 
     const node_uuid = message.socket.session.file_descriptors?.[descriptor]
-console.log({node_uuid})
     if ( !node_uuid ) {
         return message.send_response(
             message.fresh().error(Errors.NodeDoesNotExist)
         )
     }
-
-
-    // Check if we have a temporary file which has been written to.
-    // If so, we should "flush" the temporary file to the proper storage.
-    const placeholder = message.socket.session.temp_write_files?.[node_uuid]
-console.log({placeholder})
-    if ( !placeholder ) {
-        return message.send_response(
-            message.fresh()
-        )
-    }
-
-    let stat;
-    try {
-        stat = await fs.promises.stat(placeholder.path)
-    } catch (e) {
+    // Check if we have temporary files which have been written to.
+    // If so, we should "flush" the temporary files to the proper storage.
+    const placeholders = message.socket.session.temp_write_files?.[descriptor]
+    if ( !placeholders ) {
         return message.send_response(
             message.fresh()
         )
     }
@@ -48,7 +30,6 @@ console.log({placeholder})
         descriptor_type: NodeDescriptorType.File,
     })
 
-console.log({node})
     if ( !node ) {
         return message.send_response(
             message.fresh().error(Errors.NodeDoesNotExist)
@@ -56,30 +37,45 @@ console.log({node})
     }
 
     const existing_file = await node.uploaded_file()
-console.log({existing_file})
     if ( existing_file ) {
         // This is a new write, so delete the old file
        await existing_file.delete()
         delete node.uploaded_file_id
     }
 
+    // Sort the placeholders by the byte starting position
+    placeholders.sort((a, b) => {
+        if ( a.position < b.position ) return -1
+        if ( a.position > b.position ) return 1
+        return 0
+    })
+
+    // Combine them into a single stream
+    const stream = CombinedStream.create()
+    const combined_file = await tmp.file()
+
+    for ( const placeholder of placeholders ) {
+        stream.append(fs.createReadStream(placeholder.path))
+    }
+
+    const pipe = stream.pipe(fs.createWriteStream(combined_file.path))
+
+    await new Promise(res => pipe.on('finish', res))
+
     // Store the temporary file
     const new_file = await di.upload.provider().store({
-        temp_path: placeholder.path,
+        temp_path: combined_file.path,
         original_name: node.pied_name,
         mime_type: 'application/octet-stream', // TODO determine from file extension?
     })
 
-console.log({new_file})
+    const stat = await fs.promises.stat(combined_file.path)
+
     node.uploaded_file_id = new_file.id
     node.size = stat.size
 
     await node.save()
-console.log({saved_node: node})
-
     message.send_response(
         message.fresh()
     )
-
-console.log('[/FLUSH]')
 }
diff --git a/app/ws/routes/fs.release.js b/app/ws/routes/fs.release.js
index 6c9fc3d..4b0d46f 100644
--- a/app/ws/routes/fs.release.js
+++ b/app/ws/routes/fs.release.js
@@ -7,7 +7,6 @@ const exists = async file => {
 }
 
 module.exports = exports = async (message, di) => {
-console.log('[RELEASE]')
     const { descriptor } = message.data()
 
     const node_uuid = message.socket.session.file_descriptors?.[descriptor]
@@ -15,14 +14,18 @@ console.log('[RELEASE]')
         delete message.socket.session.file_descriptors[descriptor]
     }
 
-    const placeholder = message.socket.session.temp_write_files?.[node_uuid]
-    if ( placeholder && (await exists(placeholder.path)) ) {
-        await fs.promises.unlink(placeholder.path)
+    const placeholders = message.socket.session.temp_write_files?.[descriptor]
+    if ( placeholders ) {
+        for ( const placeholder of placeholders ) {
+            if ( await exists(placeholder.path) ) {
+                await fs.promises.unlink(placeholder.path)
+            }
+        }
+
+        delete message.socket.session.temp_write_files[descriptor]
     }
 
     message.send_response(
         message.fresh()
     )
-
-console.log('[/RELEASE]')
 }
diff --git a/app/ws/routes/stream.getfd.js b/app/ws/routes/stream.getfd.js
index 15a0dea..859d91e 100644
--- a/app/ws/routes/stream.getfd.js
+++ b/app/ws/routes/stream.getfd.js
@@ -26,6 +26,7 @@ module.exports = exports = async (message, di) => {
     const data = {
         node_uuid: node.uuid,
         socket_uuid: message.socket.uuid,
+        descriptor,
     }
 
     message.send_response(
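
For reference, below is a minimal standalone sketch (not part of the patch) of the chunk-reassembly step the new fs.flush.js performs: the per-descriptor placeholder files created in ServerUnit.on_file_write_stream are sorted by the byte position they were uploaded for, concatenated with combined-stream, and piped into a fresh tmp-promise file that can then be handed to the upload provider. The helper name reassemble_chunks and the { path, position } placeholder shape are assumptions drawn from the diff above, not an API the project exposes.

// Sketch only, assuming placeholders is an array of { path, position }
// records like the ones pushed in on_file_write_stream above.
const fs = require('fs')
const tmp = require('tmp-promise')
const CombinedStream = require('combined-stream')

async function reassemble_chunks(placeholders) {
    // Order the chunk files by the byte offset each write targeted
    placeholders.sort((a, b) => a.position - b.position)

    // Append each chunk file to a single combined stream, in order
    const stream = CombinedStream.create()
    for ( const placeholder of placeholders ) {
        stream.append(fs.createReadStream(placeholder.path))
    }

    // Pipe the combined stream into a fresh temporary file and wait for the write to finish
    const combined_file = await tmp.file()
    const pipe = stream.pipe(fs.createWriteStream(combined_file.path))
    await new Promise((resolve, reject) => {
        pipe.on('finish', resolve)
        pipe.on('error', reject)
    })

    // The caller can now stat combined_file.path and pass it to the upload provider
    return combined_file
}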