diff --git a/Streamer.js b/Streamer.js index 1473acc..2092131 100644 --- a/Streamer.js +++ b/Streamer.js @@ -21,6 +21,12 @@ class Streamer { const combined_stream = CombinedStream.create() combined_stream.append(buffer.toString('base64')) combined_stream.pipe(write_stream) + + return new Promise(res => { + write_stream.socket.on('close', () => { + res() + }) + }) } stream() { diff --git a/index.js b/index.js index 745cb09..0616a90 100644 --- a/index.js +++ b/index.js @@ -17,6 +17,7 @@ const ops = { unlink: require('./ops/unlink'), rename: require('./ops/rename'), write: require('./ops/write'), + flush: require('./ops/flush'), } ;(async () => { diff --git a/ops/flush.js b/ops/flush.js new file mode 100644 index 0000000..d964dd7 --- /dev/null +++ b/ops/flush.js @@ -0,0 +1,20 @@ +const Fuse = require('fuse-native') +const Errors = require('../../shared/Errors') +const Message = require('../../shared/Message') +const connector = require('../connector') + +module.exports = exports = function (path, fd, cb) { +console.log('[FLUSH]') + connector.send( + Message.route('fs.flush') + .data({ path, descriptor: fd }) + .expect_response(msg => { + if ( msg.error() ) { + return Errors.toCallback(cb, msg.error()) + } + +console.log('[/FLUSH]') + return process.nextTick(cb, 0) + }) + ) +} diff --git a/ops/release.js b/ops/release.js index 8f52f2a..7cdceb1 100644 --- a/ops/release.js +++ b/ops/release.js @@ -4,6 +4,7 @@ const Message = require('../../shared/Message') const connector = require('../connector') module.exports = exports = function (path, fd, cb) { +console.log('[RELEASE]') connector.send( Message.route('fs.release') .data({ path, descriptor: fd }) @@ -12,6 +13,7 @@ module.exports = exports = function (path, fd, cb) { return Errors.toCallback(cb, msg.error()) } +console.log('[/RELEASE]') return process.nextTick(cb, 0) }) ) diff --git a/ops/write.js b/ops/write.js index f8385c9..0605ada 100644 --- a/ops/write.js +++ b/ops/write.js @@ -10,9 +10,9 @@ module.exports = exports = async function (path, fd, buffer, length, position, c console.log({socket_uuid, node_uuid}) const streamer = new Streamer(socket_uuid, node_uuid, length, position) - streamer.write(buffer) - - return process.nextTick(cb, length) + streamer.write(buffer).then(() => { + return process.nextTick(cb, length) + }) } catch (e) { console.error(e) }