server/app/ServerUnit.js

148 lines
4.8 KiB
JavaScript
Raw Normal View History

2020-11-27 01:57:37 +00:00
const WebSocket = require('ws')
const http = require('http')
const url = require('url')
const fs = require('fs')
const tmp = require('tmp-promise')
const websocketStream = require('websocket-stream')
const StreamSkip = require('stream-skip')
const StreamTake = require('./ws/StreamTake')
const Socket = require('./ws/Socket')
const { Unit } = require('libflitter')
const { NodeDescriptorType } = require('./enum')
const { Buffer } = require('buffer')
const { Readable } = require('stream')
class ServerUnit extends Unit {
static get services() {
return [...super.services, 'configs', 'app', 'models', 'upload']
}
sockets = []
static name() {
return 'server'
}
name() {
return 'server'
}
async go(app) {
this.server = new WebSocket.Server({
port: this.configs.get('server.port')
})
this.server.on('connection', socket => {
socket = this.app.di().make(Socket, socket)
this.sockets.push(socket)
socket.ping()
})
this.stream_server = http.createServer().listen(this.configs.get('server.stream_port'))
this.stream_socket = websocketStream.createServer({
server: this.stream_server,
perMessageDeflate: false,
binary: true,
}, (stream, request) => {
const query = url.parse(request.url, true).query
if ( !query.writing_file ) this.on_file_stream(stream, request)
else this.on_file_write_stream(stream, request)
})
await new Promise(res => {
process.on('SIGINT', res)
})
}
async on_file_write_stream(stream, request) {
let { socket_uuid, node_uuid, length = 4096, position = 0 } = url.parse(request.url, true).query
if ( typeof position === 'string' ) position = parseInt(position)
if ( typeof length === 'string' ) length = parseInt(length)
const socket = this.sockets.find(x => x.uuid === socket_uuid)
const Node = this.models.get('fs:Node')
const node = await Node.findOne({
uuid: node_uuid,
deleted: false,
descriptor_type: NodeDescriptorType.File,
})
if ( !socket.session.temp_write_files ) socket.session.temp_write_files = {}
const placeholder = socket.session.temp_write_files?.[node.uuid] || await tmp.file()
socket.session.temp_write_files[node.uuid] = placeholder
console.log('Upload placeholder:', placeholder)
2020-11-27 01:57:37 +00:00
const old_file = await node.uploaded_file()
if ( old_file ) {
if ( position === 0 ) {
// This is a new write, so delete the old file
await old_file.delete()
delete node.uploaded_file_id
} else {
await this.upload.provider().download_file(old_file, placeholder.path)
}
}
console.log('write stream', stream)
console.log('write data', { placeholder, position, length })
stream.pipe(fs.createWriteStream(placeholder.path, { flags: 'r+', start: position }))
2020-11-27 01:57:37 +00:00
}
_bufferStream(stream) {
const chunks = []
return new Promise((resolve, reject) => {
stream.on('data', chunk => {
console.log('stream data', chunk)
chunks.push(chunk)
})
stream.on('error', reject)
stream.on('end', () => {
console.log('stream end!')
resolve(Buffer.concat(chunks))
})
})
}
async on_file_stream(stream, request) {
let { socket_uuid, node_uuid, length = 4096, position = 0 } = url.parse(request.url, true).query
if ( typeof position === 'string' ) position = parseInt(position)
if ( typeof length === 'string' ) length = parseInt(length)
// const socket = this.sockets.find(x => x.uuid === socket_uuid)
const Node = this.models.get('fs:Node')
const node = await Node.findOne({
uuid: node_uuid,
deleted: false,
descriptor_type: NodeDescriptorType.File,
})
const file = await node.uploaded_file()
if ( file ) {
const readable = this.upload.provider().read_stream(file)
const slicer = new StreamSkip({ skip: position })
const taker = new StreamTake({ take: length })
readable.pipe(slicer).pipe(taker).pipe(stream)
} else {
// If no data was written, just return an empty file
const empty = new Readable()
empty.push('')
empty.push(null)
empty.pipe(stream)
}
}
async cleanup(app) {
this.server.close()
this.stream_server.close()
this.stream_socket.close()
}
}
module.exports = exports = ServerUnit