You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

132 lines
4.3 KiB

const WebSocket = require('ws')
const http = require('http')
const url = require('url')
const fs = require('fs')
const tmp = require('tmp-promise')
const websocketStream = require('websocket-stream')
const StreamSkip = require('stream-skip')
const StreamTake = require('./ws/StreamTake')
const Socket = require('./ws/Socket')
const { Unit } = require('libflitter')
const { NodeDescriptorType } = require('./enum')
const CombinedStream = require('combined-stream')
const { Buffer } = require('buffer')
const { Readable } = require('stream')
/**
 * Unit that runs the two network endpoints visible in this file:
 *
 *  - a WebSocket server (port from the `server.port` config) whose
 *    connections are each wrapped in a `Socket` and tracked in `this.sockets`;
 *  - a websocket-stream server on top of a plain HTTP server (port from the
 *    `server.stream_port` config) used to stream file contents to clients and
 *    to receive file writes from them.
 */
class ServerUnit extends Unit {
    /**
     * Names of the services this unit asks for, in addition to the parent's.
     * @returns {Array<string>}
     */
    static get services() {
        return [...super.services, 'configs', 'app', 'models', 'upload']
    }

    /**
     * Wrapped control sockets, in the order their connections arrived.
     * NOTE(review): nothing in this class removes entries when a connection
     * closes - confirm whether that is handled elsewhere.
     * @type {Array<Socket>}
     */
    sockets = []

    /**
     * Name of the unit (class-level accessor).
     * @returns {string}
     */
    static name() {
        return 'server'
    }

    /**
     * Name of the unit (instance-level accessor).
     * @returns {string}
     */
    name() {
        return 'server'
    }

    /**
     * Start both servers, then wait until the process receives SIGINT.
     * @param app - the application instance (not used directly here)
     * @returns {Promise<void>} resolves once SIGINT has been received
     */
    async go(app) {
        this.server = new WebSocket.Server({
            port: this.configs.get('server.port'),
        })

        this.server.on('connection', raw_socket => {
            const socket = this.app.di().make(Socket, raw_socket)
            this.sockets.push(socket)
            socket.ping()
        })

        this.stream_server = http.createServer().listen(this.configs.get('server.stream_port'))

        this.stream_socket = websocketStream.createServer({
            server: this.stream_server,
            perMessageDeflate: false,
            binary: true,
        }, (stream, request) => {
            const { writing_file } = url.parse(request.url, true).query
            const handled = writing_file
                ? this.on_file_write_stream(stream, request)
                : this.on_file_stream(stream, request)

            // The handlers are async and nobody awaits them, so a failure must
            // be caught here or it becomes an unhandled promise rejection.
            handled.catch(error => this._abort_streams(error, stream))
        })

        await new Promise(res => {
            process.on('SIGINT', res)
        })
    }

    /**
     * Receive one chunk of a file write. The stream carries base64 text; it is
     * decoded and stored in a fresh temporary file, which is recorded (with the
     * requested `position` and `length`) on the owning socket's session under
     * `temp_write_files[descriptor]`.
     *
     * Query parameters read from the request URL: `socket_uuid`, `descriptor`,
     * `position` (default 0) and `length` (default 4096).
     *
     * @param stream - the incoming websocket stream
     * @param request - the HTTP upgrade request
     * @returns {Promise<void>} resolves once the temporary file has been written
     * @throws {Error} if no tracked socket has the given `socket_uuid`
     */
    async on_file_write_stream(stream, request) {
        const query = url.parse(request.url, true).query
        const { socket_uuid, descriptor } = query
        const position = this._int_query_param(query.position, 0)
        const length = this._int_query_param(query.length, 4096)

        const socket = this.sockets.find(x => x.uuid === socket_uuid)
        if ( !socket ) {
            throw new Error(`No connected socket with UUID: ${socket_uuid}`)
        }

        const session = socket.session
        if ( !session.temp_write_files ) session.temp_write_files = {}
        if ( !session.temp_write_files[descriptor] ) session.temp_write_files[descriptor] = []

        // Register the placeholder before the payload is read so that the
        // per-descriptor list keeps the order in which write requests began.
        const placeholder = await tmp.file()
        session.temp_write_files[descriptor].push(placeholder)
        placeholder.position = position
        placeholder.length = length

        const encoded_buffer = await this._bufferStream(stream)
        const decoded_buffer = Buffer.from(encoded_buffer.toString(), 'base64')

        // Awaiting the write surfaces I/O errors as a rejection of this method
        // rather than as an unhandled 'error' event on a detached write stream.
        await fs.promises.writeFile(placeholder.path, decoded_buffer)
    }

    /**
     * Collect everything a readable stream emits into a single Buffer.
     * @param stream - readable stream emitting Buffer chunks
     * @returns {Promise<Buffer>} resolves on 'end', rejects on 'error'
     */
    _bufferStream(stream) {
        const chunks = []
        return new Promise((resolve, reject) => {
            stream.on('data', chunk => {
                chunks.push(chunk)
            })
            stream.on('error', reject)
            stream.on('end', () => {
                resolve(Buffer.concat(chunks))
            })
        })
    }

    /**
     * Send a slice of a stored file to the client: skip `position` bytes, then
     * pass on at most `length` bytes. If the node has no uploaded file, an
     * empty stream is sent instead.
     *
     * Query parameters read from the request URL: `node_uuid`, `position`
     * (default 0) and `length` (default 4096).
     * NOTE(review): this path does not look up the requesting socket, so no
     * per-session check happens here - confirm that is intended.
     *
     * @param stream - the outgoing websocket stream
     * @param request - the HTTP upgrade request
     * @returns {Promise<void>} resolves once the piping has been set up
     * @throws {Error} if no non-deleted file node has the given `node_uuid`
     */
    async on_file_stream(stream, request) {
        const query = url.parse(request.url, true).query
        const { node_uuid } = query
        const position = this._int_query_param(query.position, 0)
        const length = this._int_query_param(query.length, 4096)

        const Node = this.models.get('fs:Node')
        const node = await Node.findOne({
            uuid: node_uuid,
            deleted: false,
            descriptor_type: NodeDescriptorType.File,
        })
        if ( !node ) {
            throw new Error(`No file node with UUID: ${node_uuid}`)
        }

        const file = await node.uploaded_file()
        if ( !file ) {
            // If no data was written, just return an empty file
            const empty = new Readable({ read() {} })
            empty.push('')
            empty.push(null)
            empty.pipe(stream)
            return
        }

        const readable = this.upload.provider().read_stream(file)
        const slicer = new StreamSkip({ skip: position })
        const taker = new StreamTake({ take: length })

        // .pipe() does not forward errors; without listeners, an 'error' on any
        // stage would be thrown as an uncaught exception.
        const stages = [readable, slicer, taker, stream]
        const abort = error => this._abort_streams(error, ...stages)
        for ( const stage of stages ) stage.on('error', abort)

        readable.pipe(slicer).pipe(taker).pipe(stream)
    }

    /**
     * Stop all servers that were started by go().
     * @param app - the application instance (not used directly here)
     * @returns {Promise<void>}
     */
    async cleanup(app) {
        // Guarded so cleanup is safe even if go() did not get far enough to
        // create every server.
        if ( this.server ) this.server.close()
        if ( this.stream_server ) this.stream_server.close()
        if ( this.stream_socket ) this.stream_socket.close()
    }

    /**
     * Read a non-negative integer from a parsed query-string value.
     * @param value - raw value: a string from the URL, a number, or anything else
     * @param {number} fallback - returned when value is missing or not a non-negative integer
     * @returns {number}
     */
    _int_query_param(value, fallback) {
        let parsed = fallback
        if ( typeof value === 'number' ) parsed = value
        else if ( typeof value === 'string' ) parsed = Number.parseInt(value, 10)

        return Number.isInteger(parsed) && parsed >= 0 ? parsed : fallback
    }

    /**
     * Report a failed stream request and tear down the streams involved.
     * @param {Error} error - the failure to report
     * @param streams - streams to destroy (entries without destroy() are skipped)
     */
    _abort_streams(error, ...streams) {
        console.error('[ServerUnit] stream request failed:', error)

        for ( const item of streams ) {
            // destroy() is called without the error on purpose: passing it
            // would re-emit 'error' on a stream that may have no listener.
            if ( item && typeof item.destroy === 'function' && !item.destroyed ) item.destroy()
        }
    }
}
// Export the unit class as the module's value; the local `exports` alias is
// rebound to the same class.
module.exports = exports = ServerUnit