Initial import

master
Garrett Mills 3 years ago
commit fc59005da3
Signed by: garrettmills
GPG Key ID: D2BF5FBA8298F246

1
.gitignore vendored

@ -0,0 +1 @@
.idea*

@ -0,0 +1,60 @@
const WebSocketStream = require('websocket-stream')
const ConcatStream = require('concat-stream')
const { Duplex, Readable } = require('stream')
const { Buffer } = require('buffer')
class Streamer {
constructor(socket_uuid, node_uuid, length = 4096, position = 0) {
this.socket_uuid = socket_uuid
this.node_uuid = node_uuid
this.length = length
this.position = position
}
write(buffer) {
console.log('write buffer pre slice', buffer)
// buffer = buffer.slice(this.position, this.position + this.length)
// console.log('write buffer post slice', buffer)
const write_stream = WebSocketStream(`ws://localhost:5746/?socket_uuid=${this.socket_uuid}&node_uuid=${this.node_uuid}&length=${this.length}&position=${this.position}&writing_file=true`, {
perMessageDeflate: false,
binary: true,
})
console.log(write_stream)
console.log('writing buffer', buffer.toString(), buffer)
const read_stream = new Readable()
read_stream.push(buffer.toString())
read_stream.push(null)
read_stream.pipe(write_stream)
}
stream() {
this.ws = WebSocketStream(`ws://localhost:5746/?socket_uuid=${this.socket_uuid}&node_uuid=${this.node_uuid}&length=${this.length}&position=${this.position}`, {
perMessageDeflate: false,
binary: true,
})
return this.ws
}
async buffer() {
return this._bufferStream(this.stream())
}
_bufferStream(stream) {
const chunks = []
return new Promise((resolve, reject) => {
stream.on('data', chunk => chunks.push(chunk))
stream.on('error', reject)
stream.on('end', () => resolve(Buffer.concat(chunks)))
})
}
close() {
this.ws.close()
}
}
module.exports = exports = Streamer

@ -0,0 +1,98 @@
const Message = require('../shared/Message')
const WebSocket = require('ws')
const Errors = require('../shared/Errors')
class Connector {
constructor() {
this.client = new WebSocket('ws://localhost:8100')
this.messages = []
this.token;
this.client.on('message', msg => this.on_message(msg))
}
async on_message(msg) {
const message = new Message(msg)
const response = new Message()
message.socket = this
if ( message.is_response() ) {
// Try to find the message that sent the request
const request = this.messages.find(x => x.uuid() === message.response_to())
if ( request ) {
await request._response_callback(message)
request.has_response = true;
} else {
this.send(
response.response_to(request.uuid())
.error(Errors.InvalidReplyUUID)
)
}
this.messages = this.messages.filter(x => !x.has_response)
} else {
let handler;
try {
handler = require(`./routes/${message.route()}`)
} catch (e) {}
if ( !handler ) {
return this.send(response.error(Errors.InvalidMessageRoute))
}
await handler(message, this)
}
}
async open() {
await new Promise(res => this.client.once('open', res))
}
close() {
this.client.close()
}
send(message) {
if ( typeof message === 'string' ) {
this.client.send(message);
return;
}
if ( message.needs_response ) {
this.messages.push(message);
}
const serial = message.serialize()
this.client.send(serial);
}
async simple_request(route, data) {
return new Promise((res, rej) => {
this.send(
Message.route(route)
.data(data)
.expect_response(message => {
if ( message.error() ) rej(new Error(message.error()))
else res(message.data())
})
)
})
}
async authenticate(token_value) {
return new Promise((res, rej) => {
this.send(
Message.route('meta.authenticate')
.data({ token_value })
.expect_response(message => {
const { is_auth } = message.data()
if ( is_auth ) res()
else rej(new Error('Unable to authenticate.'))
})
)
})
}
}
module.exports = exports = new Connector;

@ -0,0 +1,65 @@
const uuid = require('uuid').v4
const Fuse = require('fuse-native')
const fs = require('fs').promises
const conn = require('./connector')
const token = '0257387b959a4cbfa7b3a36fea7d8cfb97e435e2d6454a96992470e5ba107d29b35bed8c21dc4272b4d8d6eebc9a0f5f17b707c37e57442db9ab56775f449128'
const ops = {
readdir: require('./ops/readdir'),
getattr: require('./ops/getattr'),
mkdir: require('./ops/mkdir'),
rmdir: require('./ops/rmdir'),
open: require('./ops/open'),
read: require('./ops/read'),
release: require('./ops/release'),
create: require('./ops/create'),
unlink: require('./ops/unlink'),
rename: require('./ops/rename'),
write: require('./ops/write'),
}
;(async () => {
await conn.open()
await conn.authenticate(token)
const mnt = '/tmp/piedev-' + uuid()
await fs.mkdir(mnt)
const fuse = new Fuse(mnt, ops, {debug: true, displayFolder: true})
await new Promise((res, rej) => {
fuse.mount(err => {
if (err) {
console.error(err)
rej(err)
} else res()
})
})
console.log('Mounted at ' + mnt)
await new Promise(res => {
process.on('SIGINT', () => {
res()
})
})
await new Promise((res, rej) => {
fuse.unmount(err => {
if (err) {
console.error(err)
rej(err)
} else {
res()
}
})
})
console.log('Unmounted.')
conn.close()
})().catch((e) => {
console.error(e)
conn.close()
})

@ -0,0 +1,25 @@
const Fuse = require('fuse-native')
const Errors = require('../../shared/Errors')
const Message = require('../../shared/Message')
const connector = require('../connector')
module.exports = exports = function (path, mode, cb) {
connector.send(
Message.route('fs.create')
.data({ path, mode })
.expect_response(msg => {
if ( msg.error() ) {
return Errors.toCallback(cb, msg.error())
}
const { node, descriptor } = msg.data()
delete node.pied_name
node.mtime = new Date(node.mtime)
node.atime = new Date(node.atime)
node.ctime = new Date(node.ctime)
return process.nextTick(cb, 0, descriptor, node)
})
)
}

@ -0,0 +1,29 @@
const Fuse = require('fuse-native')
const Errors = require('../../shared/Errors')
const Message = require('../../shared/Message')
const connector = require('../connector')
module.exports = exports = function (path, cb) {
connector.send(
Message.route('fs.getattr')
.data({ path })
.expect_response(msg => {
if ( msg.error() ) {
if ( msg.error() === Errors.NodeDoesNotExist ) {
return process.nextTick(cb, Fuse.ENOENT)
} else {
return process.nextTick(cb, Fuse.ENODATA)
}
}
const { node } = msg.data()
delete node.pied_name
node.mtime = new Date(node.mtime)
node.atime = new Date(node.atime)
node.ctime = new Date(node.ctime)
return process.nextTick(cb, 0, node)
})
)
}

@ -0,0 +1,25 @@
const Fuse = require('fuse-native')
const Errors = require('../../shared/Errors')
const Message = require('../../shared/Message')
const connector = require('../connector')
module.exports = exports = function (path, mode, cb) {
connector.send(
Message.route('fs.mkdir')
.data({ path, mode })
.expect_response(msg => {
if ( msg.error() ) {
return Errors.toCallback(cb, msg.error())
}
const { node } = msg.data()
delete node.pied_name
node.mtime = new Date(node.mtime)
node.atime = new Date(node.atime)
node.ctime = new Date(node.ctime)
return process.nextTick(cb, 0, node)
})
)
}

@ -0,0 +1,19 @@
const Fuse = require('fuse-native')
const Errors = require('../../shared/Errors')
const Message = require('../../shared/Message')
const connector = require('../connector')
module.exports = exports = function (path, flags, cb) {
connector.send(
Message.route('fs.open')
.data({ path, flags })
.expect_response(msg => {
if ( msg.error() ) {
return Errors.toCallback(cb, msg.error())
}
const { descriptor } = msg.data()
return process.nextTick(cb, 0, descriptor)
})
)
}

@ -0,0 +1,20 @@
const Fuse = require('fuse-native')
const Errors = require('../../shared/Errors')
const Message = require('../../shared/Message')
const connector = require('../connector')
const Streamer = require('../Streamer')
module.exports = exports = async function (path, fd, buffer, length, position, cb) {
try {
const { socket_uuid, node_uuid } = await connector.simple_request('stream.getfd', { descriptor: fd })
const streamer = new Streamer(socket_uuid, node_uuid, length, position)
const stream_buffer = await streamer.buffer()
if ( stream_buffer.length < 1 ) return process.nextTick(cb, 0)
stream_buffer.copy(buffer)
return process.nextTick(cb, stream_buffer.length)
} catch (e) {
console.error(e)
}
}

@ -0,0 +1,36 @@
const Fuse = require('fuse-native')
const Errors = require('../../shared/Errors')
const Message = require('../../shared/Message')
const connector = require('../connector')
module.exports = exports = function (path, cb) {
connector.send(
Message.route('fs.readdir')
.data({ path })
.expect_response(msg => {
if ( msg.error() ) {
if ( msg.error() === Errors.NodeDoesNotExist ) {
return process.nextTick(cb, Fuse.ENOENT)
} else {
return process.nextTick(cb, Fuse.ENODATA)
}
}
const { nodes } = msg.data()
const names = nodes.map(x => {
const name = x.pied_name
x.mtime = new Date(x.mtime)
x.atime = new Date(x.atime)
x.ctime = new Date(x.ctime)
delete x.pied_name
return name
})
return process.nextTick(cb, 0, names, nodes)
})
)
}

@ -0,0 +1,18 @@
const Fuse = require('fuse-native')
const Errors = require('../../shared/Errors')
const Message = require('../../shared/Message')
const connector = require('../connector')
module.exports = exports = function (path, fd, cb) {
connector.send(
Message.route('fs.release')
.data({ path, descriptor: fd })
.expect_response(msg => {
if ( msg.error() ) {
return Errors.toCallback(cb, msg.error())
}
return process.nextTick(cb, 0)
})
)
}

@ -0,0 +1,18 @@
const Fuse = require('fuse-native')
const Errors = require('../../shared/Errors')
const Message = require('../../shared/Message')
const connector = require('../connector')
module.exports = exports = function (source, destination, cb) {
connector.send(
Message.route('fs.rename')
.data({ source, destination })
.expect_response(msg => {
if ( msg.error() ) {
return Errors.toCallback(cb, msg.error())
}
return process.nextTick(cb, 0)
})
)
}

@ -0,0 +1,18 @@
const Fuse = require('fuse-native')
const Errors = require('../../shared/Errors')
const Message = require('../../shared/Message')
const connector = require('../connector')
module.exports = exports = function (path, cb) {
connector.send(
Message.route('fs.rmdir')
.data({ path })
.expect_response(msg => {
if ( msg.error() ) {
return Errors.toCallback(cb, msg.error())
}
return process.nextTick(cb, 0)
})
)
}

@ -0,0 +1,18 @@
const Fuse = require('fuse-native')
const Errors = require('../../shared/Errors')
const Message = require('../../shared/Message')
const connector = require('../connector')
module.exports = exports = function (path, cb) {
connector.send(
Message.route('fs.unlink')
.data({ path })
.expect_response(msg => {
if ( msg.error() ) {
return Errors.toCallback(cb, msg.error())
}
return process.nextTick(cb, 0)
})
)
}

@ -0,0 +1,19 @@
const Fuse = require('fuse-native')
const Errors = require('../../shared/Errors')
const Message = require('../../shared/Message')
const connector = require('../connector')
const Streamer = require('../Streamer')
module.exports = exports = async function (path, fd, buffer, length, position, cb) {
try {
const { socket_uuid, node_uuid } = await connector.simple_request('stream.getfd', { descriptor: fd })
console.log({socket_uuid, node_uuid})
const streamer = new Streamer(socket_uuid, node_uuid, length, position)
streamer.write(buffer)
return process.nextTick(cb, length)
} catch (e) {
console.error(e)
}
}

@ -0,0 +1,12 @@
module.exports = exports = async (message, connector) => {
console.log('Received ping!');
message.send_response(message.fresh())
connector.send(
message.fresh()
.route('meta.ping')
.expect_response(res => {
console.log('Received pong!');
})
)
}
Loading…
Cancel
Save