From fc59005da33ad58dbb47b417efa76f69a6aa2a67 Mon Sep 17 00:00:00 2001 From: garrettmills Date: Thu, 26 Nov 2020 20:02:41 -0600 Subject: [PATCH] Initial import --- .gitignore | 1 + Streamer.js | 60 +++++++++++++++++++++++++++ connector.js | 98 +++++++++++++++++++++++++++++++++++++++++++++ index.js | 65 ++++++++++++++++++++++++++++++ ops/create.js | 25 ++++++++++++ ops/getattr.js | 29 ++++++++++++++ ops/mkdir.js | 25 ++++++++++++ ops/open.js | 19 +++++++++ ops/read.js | 20 +++++++++ ops/readdir.js | 36 +++++++++++++++++ ops/release.js | 18 +++++++++ ops/rename.js | 18 +++++++++ ops/rmdir.js | 18 +++++++++ ops/unlink.js | 18 +++++++++ ops/write.js | 19 +++++++++ routes/meta.ping.js | 12 ++++++ 16 files changed, 481 insertions(+) create mode 100644 .gitignore create mode 100644 Streamer.js create mode 100644 connector.js create mode 100644 index.js create mode 100644 ops/create.js create mode 100644 ops/getattr.js create mode 100644 ops/mkdir.js create mode 100644 ops/open.js create mode 100644 ops/read.js create mode 100644 ops/readdir.js create mode 100644 ops/release.js create mode 100644 ops/rename.js create mode 100644 ops/rmdir.js create mode 100644 ops/unlink.js create mode 100644 ops/write.js create mode 100644 routes/meta.ping.js diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2d4daa4 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.idea* diff --git a/Streamer.js b/Streamer.js new file mode 100644 index 0000000..10a13ea --- /dev/null +++ b/Streamer.js @@ -0,0 +1,60 @@ +const WebSocketStream = require('websocket-stream') +const ConcatStream = require('concat-stream') +const { Duplex, Readable } = require('stream') +const { Buffer } = require('buffer') + +class Streamer { + constructor(socket_uuid, node_uuid, length = 4096, position = 0) { + this.socket_uuid = socket_uuid + this.node_uuid = node_uuid + this.length = length + this.position = position + } + + write(buffer) { + console.log('write buffer pre slice', buffer) + // buffer = buffer.slice(this.position, this.position + this.length) + // console.log('write buffer post slice', buffer) + + const write_stream = WebSocketStream(`ws://localhost:5746/?socket_uuid=${this.socket_uuid}&node_uuid=${this.node_uuid}&length=${this.length}&position=${this.position}&writing_file=true`, { + perMessageDeflate: false, + binary: true, + }) + + console.log(write_stream) + console.log('writing buffer', buffer.toString(), buffer) + + const read_stream = new Readable() + read_stream.push(buffer.toString()) + read_stream.push(null) + read_stream.pipe(write_stream) + } + + stream() { + this.ws = WebSocketStream(`ws://localhost:5746/?socket_uuid=${this.socket_uuid}&node_uuid=${this.node_uuid}&length=${this.length}&position=${this.position}`, { + perMessageDeflate: false, + binary: true, + }) + + return this.ws + } + + async buffer() { + return this._bufferStream(this.stream()) + } + + _bufferStream(stream) { + const chunks = [] + return new Promise((resolve, reject) => { + stream.on('data', chunk => chunks.push(chunk)) + stream.on('error', reject) + stream.on('end', () => resolve(Buffer.concat(chunks))) + }) + } + + close() { + this.ws.close() + } +} + +module.exports = exports = Streamer \ No newline at end of file diff --git a/connector.js b/connector.js new file mode 100644 index 0000000..2908f09 --- /dev/null +++ b/connector.js @@ -0,0 +1,98 @@ +const Message = require('../shared/Message') +const WebSocket = require('ws') +const Errors = require('../shared/Errors') + +class Connector { + constructor() { + this.client = new WebSocket('ws://localhost:8100') + this.messages = [] + this.token; + + this.client.on('message', msg => this.on_message(msg)) + } + + async on_message(msg) { + const message = new Message(msg) + const response = new Message() + + message.socket = this + + if ( message.is_response() ) { + // Try to find the message that sent the request + const request = this.messages.find(x => x.uuid() === message.response_to()) + if ( request ) { + await request._response_callback(message) + request.has_response = true; + } else { + this.send( + response.response_to(request.uuid()) + .error(Errors.InvalidReplyUUID) + ) + } + + this.messages = this.messages.filter(x => !x.has_response) + } else { + let handler; + try { + handler = require(`./routes/${message.route()}`) + } catch (e) {} + + if ( !handler ) { + return this.send(response.error(Errors.InvalidMessageRoute)) + } + + await handler(message, this) + } + } + + async open() { + await new Promise(res => this.client.once('open', res)) + } + + close() { + this.client.close() + } + + send(message) { + if ( typeof message === 'string' ) { + this.client.send(message); + return; + } + + if ( message.needs_response ) { + this.messages.push(message); + } + + const serial = message.serialize() + this.client.send(serial); + } + + async simple_request(route, data) { + return new Promise((res, rej) => { + this.send( + Message.route(route) + .data(data) + .expect_response(message => { + if ( message.error() ) rej(new Error(message.error())) + else res(message.data()) + }) + ) + }) + } + + async authenticate(token_value) { + return new Promise((res, rej) => { + this.send( + Message.route('meta.authenticate') + .data({ token_value }) + .expect_response(message => { + const { is_auth } = message.data() + if ( is_auth ) res() + else rej(new Error('Unable to authenticate.')) + }) + ) + }) + } +} + +module.exports = exports = new Connector; diff --git a/index.js b/index.js new file mode 100644 index 0000000..745cb09 --- /dev/null +++ b/index.js @@ -0,0 +1,65 @@ +const uuid = require('uuid').v4 +const Fuse = require('fuse-native') +const fs = require('fs').promises +const conn = require('./connector') + +const token = '0257387b959a4cbfa7b3a36fea7d8cfb97e435e2d6454a96992470e5ba107d29b35bed8c21dc4272b4d8d6eebc9a0f5f17b707c37e57442db9ab56775f449128' + +const ops = { + readdir: require('./ops/readdir'), + getattr: require('./ops/getattr'), + mkdir: require('./ops/mkdir'), + rmdir: require('./ops/rmdir'), + open: require('./ops/open'), + read: require('./ops/read'), + release: require('./ops/release'), + create: require('./ops/create'), + unlink: require('./ops/unlink'), + rename: require('./ops/rename'), + write: require('./ops/write'), +} + +;(async () => { + await conn.open() + await conn.authenticate(token) + const mnt = '/tmp/piedev-' + uuid() + + await fs.mkdir(mnt) + + const fuse = new Fuse(mnt, ops, {debug: true, displayFolder: true}) + + await new Promise((res, rej) => { + fuse.mount(err => { + if (err) { + console.error(err) + rej(err) + } else res() + }) + }) + + console.log('Mounted at ' + mnt) + + await new Promise(res => { + process.on('SIGINT', () => { + res() + }) + }) + + await new Promise((res, rej) => { + fuse.unmount(err => { + if (err) { + console.error(err) + rej(err) + } else { + res() + } + }) + }) + + console.log('Unmounted.') + + conn.close() +})().catch((e) => { + console.error(e) + conn.close() +}) diff --git a/ops/create.js b/ops/create.js new file mode 100644 index 0000000..c6a0be0 --- /dev/null +++ b/ops/create.js @@ -0,0 +1,25 @@ +const Fuse = require('fuse-native') +const Errors = require('../../shared/Errors') +const Message = require('../../shared/Message') +const connector = require('../connector') + +module.exports = exports = function (path, mode, cb) { + connector.send( + Message.route('fs.create') + .data({ path, mode }) + .expect_response(msg => { + if ( msg.error() ) { + return Errors.toCallback(cb, msg.error()) + } + + const { node, descriptor } = msg.data() + + delete node.pied_name + node.mtime = new Date(node.mtime) + node.atime = new Date(node.atime) + node.ctime = new Date(node.ctime) + + return process.nextTick(cb, 0, descriptor, node) + }) + ) +} diff --git a/ops/getattr.js b/ops/getattr.js new file mode 100644 index 0000000..f5c1102 --- /dev/null +++ b/ops/getattr.js @@ -0,0 +1,29 @@ +const Fuse = require('fuse-native') +const Errors = require('../../shared/Errors') +const Message = require('../../shared/Message') +const connector = require('../connector') + +module.exports = exports = function (path, cb) { + connector.send( + Message.route('fs.getattr') + .data({ path }) + .expect_response(msg => { + if ( msg.error() ) { + if ( msg.error() === Errors.NodeDoesNotExist ) { + return process.nextTick(cb, Fuse.ENOENT) + } else { + return process.nextTick(cb, Fuse.ENODATA) + } + } + + const { node } = msg.data() + + delete node.pied_name + node.mtime = new Date(node.mtime) + node.atime = new Date(node.atime) + node.ctime = new Date(node.ctime) + + return process.nextTick(cb, 0, node) + }) + ) +} diff --git a/ops/mkdir.js b/ops/mkdir.js new file mode 100644 index 0000000..9e1432f --- /dev/null +++ b/ops/mkdir.js @@ -0,0 +1,25 @@ +const Fuse = require('fuse-native') +const Errors = require('../../shared/Errors') +const Message = require('../../shared/Message') +const connector = require('../connector') + +module.exports = exports = function (path, mode, cb) { + connector.send( + Message.route('fs.mkdir') + .data({ path, mode }) + .expect_response(msg => { + if ( msg.error() ) { + return Errors.toCallback(cb, msg.error()) + } + + const { node } = msg.data() + + delete node.pied_name + node.mtime = new Date(node.mtime) + node.atime = new Date(node.atime) + node.ctime = new Date(node.ctime) + + return process.nextTick(cb, 0, node) + }) + ) +} diff --git a/ops/open.js b/ops/open.js new file mode 100644 index 0000000..4ed0083 --- /dev/null +++ b/ops/open.js @@ -0,0 +1,19 @@ +const Fuse = require('fuse-native') +const Errors = require('../../shared/Errors') +const Message = require('../../shared/Message') +const connector = require('../connector') + +module.exports = exports = function (path, flags, cb) { + connector.send( + Message.route('fs.open') + .data({ path, flags }) + .expect_response(msg => { + if ( msg.error() ) { + return Errors.toCallback(cb, msg.error()) + } + + const { descriptor } = msg.data() + return process.nextTick(cb, 0, descriptor) + }) + ) +} diff --git a/ops/read.js b/ops/read.js new file mode 100644 index 0000000..1207189 --- /dev/null +++ b/ops/read.js @@ -0,0 +1,20 @@ +const Fuse = require('fuse-native') +const Errors = require('../../shared/Errors') +const Message = require('../../shared/Message') +const connector = require('../connector') +const Streamer = require('../Streamer') + +module.exports = exports = async function (path, fd, buffer, length, position, cb) { + try { + const { socket_uuid, node_uuid } = await connector.simple_request('stream.getfd', { descriptor: fd }) + + const streamer = new Streamer(socket_uuid, node_uuid, length, position) + const stream_buffer = await streamer.buffer() + + if ( stream_buffer.length < 1 ) return process.nextTick(cb, 0) + stream_buffer.copy(buffer) + return process.nextTick(cb, stream_buffer.length) + } catch (e) { + console.error(e) + } +} diff --git a/ops/readdir.js b/ops/readdir.js new file mode 100644 index 0000000..3d4d2ca --- /dev/null +++ b/ops/readdir.js @@ -0,0 +1,36 @@ +const Fuse = require('fuse-native') +const Errors = require('../../shared/Errors') +const Message = require('../../shared/Message') +const connector = require('../connector') + +module.exports = exports = function (path, cb) { + + connector.send( + Message.route('fs.readdir') + .data({ path }) + .expect_response(msg => { + if ( msg.error() ) { + if ( msg.error() === Errors.NodeDoesNotExist ) { + return process.nextTick(cb, Fuse.ENOENT) + } else { + return process.nextTick(cb, Fuse.ENODATA) + } + } + + const { nodes } = msg.data() + + const names = nodes.map(x => { + const name = x.pied_name + + x.mtime = new Date(x.mtime) + x.atime = new Date(x.atime) + x.ctime = new Date(x.ctime) + delete x.pied_name + + return name + }) + + return process.nextTick(cb, 0, names, nodes) + }) + ) +} diff --git a/ops/release.js b/ops/release.js new file mode 100644 index 0000000..8f52f2a --- /dev/null +++ b/ops/release.js @@ -0,0 +1,18 @@ +const Fuse = require('fuse-native') +const Errors = require('../../shared/Errors') +const Message = require('../../shared/Message') +const connector = require('../connector') + +module.exports = exports = function (path, fd, cb) { + connector.send( + Message.route('fs.release') + .data({ path, descriptor: fd }) + .expect_response(msg => { + if ( msg.error() ) { + return Errors.toCallback(cb, msg.error()) + } + + return process.nextTick(cb, 0) + }) + ) +} diff --git a/ops/rename.js b/ops/rename.js new file mode 100644 index 0000000..992713c --- /dev/null +++ b/ops/rename.js @@ -0,0 +1,18 @@ +const Fuse = require('fuse-native') +const Errors = require('../../shared/Errors') +const Message = require('../../shared/Message') +const connector = require('../connector') + +module.exports = exports = function (source, destination, cb) { + connector.send( + Message.route('fs.rename') + .data({ source, destination }) + .expect_response(msg => { + if ( msg.error() ) { + return Errors.toCallback(cb, msg.error()) + } + + return process.nextTick(cb, 0) + }) + ) +} diff --git a/ops/rmdir.js b/ops/rmdir.js new file mode 100644 index 0000000..0bc920f --- /dev/null +++ b/ops/rmdir.js @@ -0,0 +1,18 @@ +const Fuse = require('fuse-native') +const Errors = require('../../shared/Errors') +const Message = require('../../shared/Message') +const connector = require('../connector') + +module.exports = exports = function (path, cb) { + connector.send( + Message.route('fs.rmdir') + .data({ path }) + .expect_response(msg => { + if ( msg.error() ) { + return Errors.toCallback(cb, msg.error()) + } + + return process.nextTick(cb, 0) + }) + ) +} diff --git a/ops/unlink.js b/ops/unlink.js new file mode 100644 index 0000000..26bee29 --- /dev/null +++ b/ops/unlink.js @@ -0,0 +1,18 @@ +const Fuse = require('fuse-native') +const Errors = require('../../shared/Errors') +const Message = require('../../shared/Message') +const connector = require('../connector') + +module.exports = exports = function (path, cb) { + connector.send( + Message.route('fs.unlink') + .data({ path }) + .expect_response(msg => { + if ( msg.error() ) { + return Errors.toCallback(cb, msg.error()) + } + + return process.nextTick(cb, 0) + }) + ) +} diff --git a/ops/write.js b/ops/write.js new file mode 100644 index 0000000..f8385c9 --- /dev/null +++ b/ops/write.js @@ -0,0 +1,19 @@ +const Fuse = require('fuse-native') +const Errors = require('../../shared/Errors') +const Message = require('../../shared/Message') +const connector = require('../connector') +const Streamer = require('../Streamer') + +module.exports = exports = async function (path, fd, buffer, length, position, cb) { + try { + const { socket_uuid, node_uuid } = await connector.simple_request('stream.getfd', { descriptor: fd }) + console.log({socket_uuid, node_uuid}) + + const streamer = new Streamer(socket_uuid, node_uuid, length, position) + streamer.write(buffer) + + return process.nextTick(cb, length) + } catch (e) { + console.error(e) + } +} diff --git a/routes/meta.ping.js b/routes/meta.ping.js new file mode 100644 index 0000000..eeabe10 --- /dev/null +++ b/routes/meta.ping.js @@ -0,0 +1,12 @@ +module.exports = exports = async (message, connector) => { + console.log('Received ping!'); + message.send_response(message.fresh()) + + connector.send( + message.fresh() + .route('meta.ping') + .expect_response(res => { + console.log('Received pong!'); + }) + ) +}