2020-11-27 01:57:37 +00:00
|
|
|
const WebSocket = require('ws')
|
|
|
|
const http = require('http')
|
|
|
|
const url = require('url')
|
|
|
|
const fs = require('fs')
|
|
|
|
const tmp = require('tmp-promise')
|
|
|
|
const websocketStream = require('websocket-stream')
|
|
|
|
const StreamSkip = require('stream-skip')
|
|
|
|
const StreamTake = require('./ws/StreamTake')
|
|
|
|
const Socket = require('./ws/Socket')
|
|
|
|
const { Unit } = require('libflitter')
|
|
|
|
const { NodeDescriptorType } = require('./enum')
|
2020-11-27 04:47:28 +00:00
|
|
|
const CombinedStream = require('combined-stream')
|
2020-11-27 01:57:37 +00:00
|
|
|
|
|
|
|
const { Buffer } = require('buffer')
|
|
|
|
const { Readable } = require('stream')
|
|
|
|
|
|
|
|
class ServerUnit extends Unit {
|
|
|
|
static get services() {
|
|
|
|
return [...super.services, 'configs', 'app', 'models', 'upload']
|
|
|
|
}
|
|
|
|
|
|
|
|
sockets = []
|
|
|
|
|
|
|
|
static name() {
|
|
|
|
return 'server'
|
|
|
|
}
|
|
|
|
|
|
|
|
name() {
|
|
|
|
return 'server'
|
|
|
|
}
|
|
|
|
|
|
|
|
async go(app) {
|
|
|
|
this.server = new WebSocket.Server({
|
|
|
|
port: this.configs.get('server.port')
|
|
|
|
})
|
|
|
|
|
|
|
|
this.server.on('connection', socket => {
|
|
|
|
socket = this.app.di().make(Socket, socket)
|
|
|
|
this.sockets.push(socket)
|
|
|
|
socket.ping()
|
|
|
|
})
|
|
|
|
|
|
|
|
this.stream_server = http.createServer().listen(this.configs.get('server.stream_port'))
|
|
|
|
this.stream_socket = websocketStream.createServer({
|
|
|
|
server: this.stream_server,
|
|
|
|
perMessageDeflate: false,
|
|
|
|
binary: true,
|
|
|
|
}, (stream, request) => {
|
|
|
|
const query = url.parse(request.url, true).query
|
|
|
|
if ( !query.writing_file ) this.on_file_stream(stream, request)
|
|
|
|
else this.on_file_write_stream(stream, request)
|
|
|
|
})
|
|
|
|
|
|
|
|
await new Promise(res => {
|
|
|
|
process.on('SIGINT', res)
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
async on_file_write_stream(stream, request) {
|
|
|
|
let { socket_uuid, node_uuid, length = 4096, position = 0 } = url.parse(request.url, true).query
|
|
|
|
if ( typeof position === 'string' ) position = parseInt(position)
|
|
|
|
if ( typeof length === 'string' ) length = parseInt(length)
|
|
|
|
const socket = this.sockets.find(x => x.uuid === socket_uuid)
|
|
|
|
|
|
|
|
const Node = this.models.get('fs:Node')
|
|
|
|
const node = await Node.findOne({
|
|
|
|
uuid: node_uuid,
|
|
|
|
deleted: false,
|
|
|
|
descriptor_type: NodeDescriptorType.File,
|
|
|
|
})
|
|
|
|
|
|
|
|
if ( !socket.session.temp_write_files ) socket.session.temp_write_files = {}
|
|
|
|
|
|
|
|
const placeholder = socket.session.temp_write_files?.[node.uuid] || await tmp.file()
|
|
|
|
socket.session.temp_write_files[node.uuid] = placeholder
|
|
|
|
|
2020-11-27 02:53:50 +00:00
|
|
|
console.log('Upload placeholder:', placeholder)
|
2020-11-27 01:57:37 +00:00
|
|
|
|
|
|
|
const old_file = await node.uploaded_file()
|
|
|
|
if ( old_file ) {
|
|
|
|
if ( position === 0 ) {
|
|
|
|
// This is a new write, so delete the old file
|
|
|
|
await old_file.delete()
|
|
|
|
delete node.uploaded_file_id
|
|
|
|
} else {
|
|
|
|
await this.upload.provider().download_file(old_file, placeholder.path)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-11-27 04:47:28 +00:00
|
|
|
const encoded_buffer = await this._bufferStream(stream)
|
2020-11-27 05:37:06 +00:00
|
|
|
const decoded_buffer = Buffer.from(encoded_buffer.toString(), 'base64')
|
2020-11-27 01:57:37 +00:00
|
|
|
|
2020-11-27 04:47:28 +00:00
|
|
|
const combined_stream = CombinedStream.create()
|
|
|
|
combined_stream.append(decoded_buffer)
|
|
|
|
combined_stream.pipe(fs.createWriteStream(placeholder.path, { flags: 'r+', start: position }))
|
2020-11-27 01:57:37 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
_bufferStream(stream) {
|
|
|
|
const chunks = []
|
|
|
|
return new Promise((resolve, reject) => {
|
|
|
|
stream.on('data', chunk => {
|
|
|
|
console.log('stream data', chunk)
|
|
|
|
chunks.push(chunk)
|
|
|
|
})
|
|
|
|
stream.on('error', reject)
|
|
|
|
stream.on('end', () => {
|
|
|
|
console.log('stream end!')
|
|
|
|
resolve(Buffer.concat(chunks))
|
|
|
|
})
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
async on_file_stream(stream, request) {
|
|
|
|
let { socket_uuid, node_uuid, length = 4096, position = 0 } = url.parse(request.url, true).query
|
|
|
|
if ( typeof position === 'string' ) position = parseInt(position)
|
|
|
|
if ( typeof length === 'string' ) length = parseInt(length)
|
|
|
|
// const socket = this.sockets.find(x => x.uuid === socket_uuid)
|
|
|
|
|
|
|
|
const Node = this.models.get('fs:Node')
|
|
|
|
const node = await Node.findOne({
|
|
|
|
uuid: node_uuid,
|
|
|
|
deleted: false,
|
|
|
|
descriptor_type: NodeDescriptorType.File,
|
|
|
|
})
|
|
|
|
|
|
|
|
const file = await node.uploaded_file()
|
|
|
|
|
|
|
|
if ( file ) {
|
|
|
|
const readable = this.upload.provider().read_stream(file)
|
|
|
|
const slicer = new StreamSkip({ skip: position })
|
|
|
|
const taker = new StreamTake({ take: length })
|
|
|
|
|
|
|
|
readable.pipe(slicer).pipe(taker).pipe(stream)
|
|
|
|
} else {
|
|
|
|
// If no data was written, just return an empty file
|
|
|
|
const empty = new Readable()
|
|
|
|
empty.push('')
|
|
|
|
empty.push(null)
|
|
|
|
empty.pipe(stream)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
async cleanup(app) {
|
|
|
|
this.server.close()
|
|
|
|
this.stream_server.close()
|
|
|
|
this.stream_socket.close()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
module.exports = exports = ServerUnit
|