import {FileMetadata, Filesystem, Stat} from './Filesystem'
import * as ssh2 from 'ssh2'
import * as path from 'path'
import * as fs from 'fs'
import {Readable, Writable} from 'stream'
import {Collection} from '../../collection/Collection'
import {UniversalPath} from '../path'
import {Maybe} from '../types'

/**
 * A Filesystem implementation that stores files on remote hosts via SFTP/SSH.
 *
 * File contents are kept under `<baseDir>/data/<storePath>`; a JSON side-car with the
 * file's metadata (mime type, tags) is kept under `<baseDir>/meta/<storePath>.json`.
 */
export class SSHFilesystem extends Filesystem {
    /** Cached SSH connection, created lazily by `getSSH()`. */
    private sshClient?: ssh2.Client

    /** Cached SFTP session opened on `sshClient`, created lazily by `getSFTP()`. */
    private sftpClient?: ssh2.SFTPWrapper

    constructor(
        protected readonly baseConfig: { ssh: ssh2.ConnectConfig, baseDir: string },
    ) {
        super()
    }

    /**
     * Get the URI-style prefix identifying this filesystem, built from the configured SSH host.
     */
    getPrefix(): string {
        return `sftp+${this.baseConfig.ssh.host}://`
    }

    /**
     * Download the stored file to a local temporary file.
     * @param args
     * @returns the path of the local temporary copy
     */
    async getStoreFileAsTemp(args: { storePath: string }): Promise<UniversalPath> {
        // Obtain the remote stream first: if that fails we have not yet opened a local file to leak.
        const read = await this.getStoreFileAsStream(args)
        const temp = this.tempName()
        const write = fs.createWriteStream(temp)

        await this.pipeStreams(read, write)
        return new UniversalPath(temp)
    }

    /**
     * Open a readable stream over the contents of the stored file.
     * @param args
     */
    async getStoreFileAsStream(args: { storePath: string }): Promise<Readable> {
        const sftp = await this.getSFTP()
        return sftp.createReadStream(this.storePath(args.storePath))
    }

    /**
     * Upload a local file to the store, together with its metadata side-car.
     * Rejects if either the metadata or the file contents cannot be written.
     * @param args
     */
    async putLocalFile(args: { localPath: string; storePath: string; mimeType?: string; tags?: string[]; tag?: string }): Promise<void> {
        // write the metadata first
        // (goes through setMetadata so the write is actually awaited and its errors surface)
        await this.setMetadata(args.storePath, {
            mimeType: args.mimeType,
            tags: this.normalizeTags(args.tag, args.tags),
        })

        // Open both streams only now, right before their error handlers are attached,
        // so no 'error' event can fire while we are still awaiting something else.
        const write = await this.putStoreFileAsStream({storePath: args.storePath})
        const read = fs.createReadStream(args.localPath)

        // pipe the local file to the store
        await this.pipeStreams(read, write)
    }

    /**
     * Open a writable stream that stores its input at the given store path.
     * @param args
     */
    async putStoreFileAsStream(args: { storePath: string }): Promise<Writable> {
        const sftp = await this.getSFTP()
        return sftp.createWriteStream(this.storePath(args.storePath))
    }

    /**
     * Create a directory at the given store path.
     * Rejects with the SFTP error if the directory cannot be created.
     * @param args
     */
    async mkdir(args: { storePath: string }): Promise<void> {
        const sftp = await this.getSFTP()
        await new Promise<void>((res, rej) => {
            sftp.mkdir(this.storePath(args.storePath), err => {
                if ( err ) {
                    rej(err)
                } else {
                    res()
                }
            })
        })
    }

    /**
     * Remove the stored file and then its metadata side-car.
     * Rejects with the first SFTP error encountered.
     *
     * NOTE(review): `args.recursive` is accepted but not acted upon here; only a
     * single `unlink` of the data file and of the metadata file is performed.
     * @param args
     */
    async remove(args: { storePath: string; recursive?: boolean }): Promise<void> {
        const sftp = await this.getSFTP()
        const unlink = (target: string): Promise<void> => new Promise<void>((res, rej) => {
            sftp.unlink(target, err => {
                if ( err ) {
                    rej(err)
                } else {
                    res()
                }
            })
        })

        await unlink(this.storePath(args.storePath))
        await unlink(this.metadataPath(args.storePath))
    }

    /**
     * Describe the resource at the given store path.
     *
     * If the remote `stat` fails, a record with `exists: false` is returned instead
     * of rejecting. A missing or unreadable metadata side-car does NOT make the
     * resource count as missing; it only results in empty tags and no mime type.
     * @param args
     */
    async stat(args: { storePath: string }): Promise<Stat> {
        const sftp = await this.getSFTP()
        const location = new UniversalPath(args.storePath, this)

        // Typed loosely on purpose: the fields are forwarded as-is into the project `Stat` type.
        // NOTE(review): ssh2 documents atime/mtime as epoch seconds and no ctime attribute;
        // confirm these values are what `Stat` expects for accessed/modified/created.
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        let remote: any
        try {
            // eslint-disable-next-line @typescript-eslint/no-explicit-any
            remote = await new Promise<any>((res, rej) => {
                sftp.stat(this.storePath(args.storePath), (err, sftpStats) => {
                    if ( err ) {
                        return rej(err)
                    }

                    res(sftpStats)
                })
            })
        } catch (e) {
            return {
                path: location,
                exists: false,
                sizeInBytes: 0,
                tags: [],
                isFile: false,
                isDirectory: false,
            }
        }

        // getMetadata falls back to `{ tags: [] }` when the side-car cannot be read.
        const meta = await this.getMetadata(args.storePath)

        return {
            path: location,
            exists: true,
            sizeInBytes: remote.size,
            mimeType: meta.mimeType,
            tags: meta.tags,
            accessed: remote.atime,
            modified: remote.mtime,
            created: remote.ctime,
            isFile: remote.isFile(),
            isDirectory: remote.isDirectory(),
        }
    }

    /**
     * Update the access/modification time of the stored file to "now".
     * If the timestamps cannot be set, fall back to opening (and thereby creating)
     * the file and closing it again.
     * @param args
     */
    async touch(args: { storePath: string }): Promise<void> {
        const sftp = await this.getSFTP()
        const storePath = this.storePath(args.storePath)
        const time = new Date()

        return new Promise<void>((res, rej) => {
            sftp.utimes(storePath, time, time, err => {
                if ( !err ) {
                    return res()
                }

                // Open in append mode: creates the file when absent, but never truncates
                // existing contents should utimes have failed for some other reason.
                sftp.open(storePath, 'a', (err2, fd) => {
                    if ( err2 ) {
                        return rej(err2)
                    }

                    sftp.close(fd, err3 => {
                        if ( err3 ) {
                            return rej(err3)
                        }

                        res()
                    })
                })
            })
        })
    }

    /**
     * Read the metadata side-car for the given store path.
     * If it cannot be read or parsed, metadata with an empty tag list is returned.
     * @param storePath
     */
    async getMetadata(storePath: string): Promise<FileMetadata> {
        try {
            const sftp = await this.getSFTP()

            // `await` is required here so that a rejection is handled by the catch below.
            return await new Promise<FileMetadata>((res, rej) => {
                sftp.readFile(this.metadataPath(storePath), (err, buffer) => {
                    if ( err ) {
                        return rej(err)
                    }

                    // Parse inside a try so malformed JSON rejects the promise
                    // instead of throwing out of the callback.
                    try {
                        res(JSON.parse(buffer.toString('utf-8')))
                    } catch (parseError) {
                        rej(parseError)
                    }
                })
            })
        } catch (e) {
            return {
                tags: [],
            }
        }
    }

    /**
     * Serialize the given metadata as JSON and write it to the side-car for the store path.
     * Rejects with the SFTP error if the write fails.
     * @param storePath
     * @param meta
     */
    async setMetadata(storePath: string, meta: FileMetadata): Promise<void> {
        const sftp = await this.getSFTP()
        const metaPath = this.metadataPath(storePath)
        await new Promise<void>((res, rej) => {
            sftp.writeFile(metaPath, JSON.stringify(meta), (err: Maybe<Error>) => {
                if ( err ) {
                    rej(err)
                } else {
                    res()
                }
            })
        })
    }

    /**
     * End the cached SSH connection, if any, and forget it so that the next
     * operation establishes a fresh connection.
     */
    async close(): Promise<void> {
        const client = this.sshClient
        this.sshClient = undefined
        this.sftpClient = undefined
        client?.end()
    }

    /**
     * Using the configured `baseConfig.ssh`, create and cache the SSH connection to the host.
     * If a connection already exists, re-use it.
     */
    async getSSH(): Promise<ssh2.Client> {
        if ( this.sshClient ) {
            return this.sshClient
        }

        return new Promise<ssh2.Client>((res, rej) => {
            const client = new ssh2.Client()

            // Register every handler before connecting so that no event is missed.
            client.on('error', rej)

            client.on('ready', () => {
                this.sshClient = client
                res(client)
            })

            client.on('close', () => {
                // Forget a connection that has gone away so that it is not handed out again.
                if ( this.sshClient === client ) {
                    this.sshClient = undefined
                    this.sftpClient = undefined
                }

                // No effect once the promise has settled; prevents hanging forever otherwise.
                rej(new Error('SSH connection closed before it became ready'))
            })

            client.connect(this.baseConfig.ssh)
        })
    }

    /**
     * Using `getSSH()`, get an SFTP helper based on that connection.
     * The helper is cached and re-used so that repeated operations do not each
     * open another SFTP channel on the connection.
     */
    async getSFTP(): Promise<ssh2.SFTPWrapper> {
        if ( this.sftpClient ) {
            return this.sftpClient
        }

        const ssh = await this.getSSH()
        return new Promise<ssh2.SFTPWrapper>((res, rej) => {
            ssh.sftp((err, sftp) => {
                if ( err ) {
                    return rej(err)
                }

                // Drop the cached session if it reports being closed.
                sftp.on('close', () => {
                    if ( this.sftpClient === sftp ) {
                        this.sftpClient = undefined
                    }
                })

                this.sftpClient = sftp
                res(sftp)
            })
        })
    }

    /**
     * Resolve the given store path to an absolute path on the remote filesystem.
     * Always joined with forward slashes, independent of the local platform.
     * @param storePath
     * @protected
     */
    protected storePath(storePath: string): string {
        return path.posix.join(this.baseConfig.baseDir, 'data', storePath)
    }

    /**
     * Resolve the given store path to an absolute path of a metadata file on the remote filesystem.
     * Always joined with forward slashes, independent of the local platform.
     * @param storePath
     * @protected
     */
    protected metadataPath(storePath: string): string {
        return path.posix.join(this.baseConfig.baseDir, 'meta', storePath + '.json')
    }

    /**
     * Given a readable stream, cast it to a string.
     * @param stream
     * @protected
     */
    protected streamToString(stream: NodeJS.ReadableStream): Promise<string> {
        const chunks: Buffer[] = []
        return new Promise<string>((resolve, reject) => {
            stream.on('data', (chunk) => chunks.push(Buffer.from(chunk)))
            stream.on('error', (err) => reject(err))
            stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf8')))
        })
    }

    /**
     * Pipe `read` into `write`. Resolves when the destination emits `finish`;
     * if either stream errors, both are destroyed and the promise rejects with that error.
     * @param read
     * @param write
     * @protected
     */
    protected pipeStreams(read: Readable, write: Writable): Promise<void> {
        return new Promise<void>((res, rej) => {
            const fail = (err: Error) => {
                // Release both ends so that no handle stays open after a failed transfer.
                read.destroy()
                write.destroy()
                rej(err)
            }

            write.on('finish', () => res())
            write.on('error', fail)
            read.on('error', fail)
            read.pipe(write)
        })
    }

    /**
     * List the names of the entries in the directory at the given store path.
     * @param storePath
     */
    async list(storePath: string): Promise<Collection<string>> {
        const sftp = await this.getSFTP()
        return new Promise<Collection<string>>((res, rej) => {
            sftp.readdir(this.storePath(storePath), (error, files) => {
                if ( error ) {
                    rej(error)
                } else {
                    res(Collection.collect(files).map(x => x.filename))
                }
            })
        })
    }
}