Fix file write streaming logic

This commit is contained in:
Garrett Mills 2020-11-29 11:53:55 -06:00
parent 63d105c752
commit 86f274e6d2
Signed by: garrettmills
GPG Key ID: D2BF5FBA8298F246
6 changed files with 46 additions and 66 deletions

View File

@ -57,54 +57,35 @@ class ServerUnit extends Unit {
} }
async on_file_write_stream(stream, request) { async on_file_write_stream(stream, request) {
let { socket_uuid, node_uuid, length = 4096, position = 0 } = url.parse(request.url, true).query let { socket_uuid, node_uuid, length = 4096, position = 0, descriptor } = url.parse(request.url, true).query
if ( typeof position === 'string' ) position = parseInt(position) if ( typeof position === 'string' ) position = parseInt(position)
if ( typeof length === 'string' ) length = parseInt(length) if ( typeof length === 'string' ) length = parseInt(length)
const socket = this.sockets.find(x => x.uuid === socket_uuid) const socket = this.sockets.find(x => x.uuid === socket_uuid)
const Node = this.models.get('fs:Node')
const node = await Node.findOne({
uuid: node_uuid,
deleted: false,
descriptor_type: NodeDescriptorType.File,
})
if ( !socket.session.temp_write_files ) socket.session.temp_write_files = {} if ( !socket.session.temp_write_files ) socket.session.temp_write_files = {}
if ( !socket.session.temp_write_files[descriptor] ) socket.session.temp_write_files[descriptor] = []
const placeholder = socket.session.temp_write_files?.[node.uuid] || await tmp.file() const placeholder = await tmp.file()
socket.session.temp_write_files[node.uuid] = placeholder socket.session.temp_write_files[descriptor].push(placeholder)
placeholder.position = position
console.log('Upload placeholder:', placeholder) placeholder.length = length
const old_file = await node.uploaded_file()
if ( old_file ) {
if ( position === 0 ) {
// This is a new write, so delete the old file
await old_file.delete()
delete node.uploaded_file_id
} else {
await this.upload.provider().download_file(old_file, placeholder.path)
}
}
const encoded_buffer = await this._bufferStream(stream) const encoded_buffer = await this._bufferStream(stream)
const decoded_buffer = Buffer.from(encoded_buffer.toString(), 'base64') const decoded_buffer = Buffer.from(encoded_buffer.toString(), 'base64')
const combined_stream = CombinedStream.create() const combined_stream = CombinedStream.create()
combined_stream.append(decoded_buffer) combined_stream.append(decoded_buffer)
combined_stream.pipe(fs.createWriteStream(placeholder.path, { flags: 'r+', start: position })) combined_stream.pipe(fs.createWriteStream(placeholder.path))
} }
_bufferStream(stream) { _bufferStream(stream) {
const chunks = [] const chunks = []
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
stream.on('data', chunk => { stream.on('data', chunk => {
console.log('stream data', chunk)
chunks.push(chunk) chunks.push(chunk)
}) })
stream.on('error', reject) stream.on('error', reject)
stream.on('end', () => { stream.on('end', () => {
console.log('stream end!')
resolve(Buffer.concat(chunks)) resolve(Buffer.concat(chunks))
}) })
}) })

View File

@ -39,7 +39,7 @@ class Socket extends Injectable {
} }
async on_message(msg) { async on_message(msg) {
this.output.info(msg) this.output.debug(msg)
const message = new Message(msg) const message = new Message(msg)
const response = new Message() const response = new Message()
message.socket = this message.socket = this

View File

@ -1,7 +1,6 @@
const { Errors } = require('../../shared') const { Errors } = require('../../shared')
const { NodeDescriptorType } = require('../../enum') const { NodeDescriptorType } = require('../../enum')
// TODO should this return size info for the temp write file?
module.exports = exports = async (message, di) => { module.exports = exports = async (message, di) => {
const Node = di.models.get('fs:Node') const Node = di.models.get('fs:Node')
const { path, mode } = message.data() const { path, mode } = message.data()

View File

@ -1,42 +1,24 @@
const { Errors } = require('../../shared') const { Errors } = require('../../shared')
const { NodeDescriptorType } = require('../../enum') const { NodeDescriptorType } = require('../../enum')
const fs = require('fs') const fs = require('fs')
const tmp = require('tmp-promise')
const exists = async file => { const CombinedStream = require('combined-stream')
return new Promise(res => {
fs.promises.stat(file).then(() => res(true)).catch(() => res(false))
})
}
module.exports = exports = async (message, di) => { module.exports = exports = async (message, di) => {
console.log('[FLUSH]')
const Node = di.models.get('fs:Node') const Node = di.models.get('fs:Node')
const { descriptor } = message.data() const { descriptor } = message.data()
const node_uuid = message.socket.session.file_descriptors?.[descriptor] const node_uuid = message.socket.session.file_descriptors?.[descriptor]
console.log({node_uuid})
if ( !node_uuid ) { if ( !node_uuid ) {
return message.send_response( return message.send_response(
message.fresh().error(Errors.NodeDoesNotExist) message.fresh().error(Errors.NodeDoesNotExist)
) )
} }
// Check if we have a temporary files which have been written to.
// If so, we should "flush" the temporary files to the proper storage.
// Check if we have a temporary file which has been written to. const placeholders = message.socket.session.temp_write_files?.[descriptor]
// If so, we should "flush" the temporary file to the proper storage. if ( !placeholders ) {
const placeholder = message.socket.session.temp_write_files?.[node_uuid]
console.log({placeholder})
if ( !placeholder ) {
return message.send_response(
message.fresh()
)
}
let stat;
try {
stat = await fs.promises.stat(placeholder.path)
} catch (e) {
return message.send_response( return message.send_response(
message.fresh() message.fresh()
) )
@ -48,7 +30,6 @@ console.log({placeholder})
descriptor_type: NodeDescriptorType.File, descriptor_type: NodeDescriptorType.File,
}) })
console.log({node})
if ( !node ) { if ( !node ) {
return message.send_response( return message.send_response(
message.fresh().error(Errors.NodeDoesNotExist) message.fresh().error(Errors.NodeDoesNotExist)
@ -56,30 +37,45 @@ console.log({node})
} }
const existing_file = await node.uploaded_file() const existing_file = await node.uploaded_file()
console.log({existing_file})
if ( existing_file ) { if ( existing_file ) {
// This is a new write, so delete the old file // This is a new write, so delete the old file
await existing_file.delete() await existing_file.delete()
delete node.uploaded_file_id delete node.uploaded_file_id
} }
// Sort the placeholders by the byte starting position
placeholders.sort((a, b) => {
if ( a.position < b.position ) return -1
if ( a.position > b.position ) return 1
return 0
})
// Combine them into a single stream
const stream = CombinedStream.create()
const combined_file = await tmp.file()
for ( const placeholder of placeholders ) {
stream.append(fs.createReadStream(placeholder.path))
}
const pipe = stream.pipe(fs.createWriteStream(combined_file.path))
await new Promise(res => pipe.on('finish', res))
// Store the temporary file // Store the temporary file
const new_file = await di.upload.provider().store({ const new_file = await di.upload.provider().store({
temp_path: placeholder.path, temp_path: combined_file.path,
original_name: node.pied_name, original_name: node.pied_name,
mime_type: 'application/octet-stream', // TODO determine from file extension? mime_type: 'application/octet-stream', // TODO determine from file extension?
}) })
console.log({new_file}) const stat = await fs.promises.stat(combined_file.path)
node.uploaded_file_id = new_file.id node.uploaded_file_id = new_file.id
node.size = stat.size node.size = stat.size
await node.save() await node.save()
console.log({saved_node: node})
message.send_response( message.send_response(
message.fresh() message.fresh()
) )
console.log('[/FLUSH]')
} }

View File

@ -7,7 +7,6 @@ const exists = async file => {
} }
module.exports = exports = async (message, di) => { module.exports = exports = async (message, di) => {
console.log('[RELEASE]')
const { descriptor } = message.data() const { descriptor } = message.data()
const node_uuid = message.socket.session.file_descriptors?.[descriptor] const node_uuid = message.socket.session.file_descriptors?.[descriptor]
@ -15,14 +14,18 @@ console.log('[RELEASE]')
delete message.socket.session.file_descriptors[descriptor] delete message.socket.session.file_descriptors[descriptor]
} }
const placeholder = message.socket.session.temp_write_files?.[node_uuid] const placeholders = message.socket.session.temp_write_files?.[descriptor]
if ( placeholder && (await exists(placeholder.path)) ) { if ( placeholders ) {
for ( const placeholder of placeholders ) {
if ( await exists(placeholder.path) ) {
await fs.promises.unlink(placeholder.path) await fs.promises.unlink(placeholder.path)
} }
}
delete message.socket.session.temp_write_files[descriptor]
}
message.send_response( message.send_response(
message.fresh() message.fresh()
) )
console.log('[/RELEASE]')
} }

View File

@ -26,6 +26,7 @@ module.exports = exports = async (message, di) => {
const data = { const data = {
node_uuid: node.uuid, node_uuid: node.uuid,
socket_uuid: message.socket.uuid, socket_uuid: message.socket.uuid,
descriptor,
} }
message.send_response( message.send_response(