|
|
|
import {FileMetadata, Filesystem, Stat} from './Filesystem'
|
|
|
|
import * as ssh2 from 'ssh2'
|
|
|
|
import * as path from 'path'
|
|
|
|
import * as fs from 'fs'
|
|
|
|
import {Readable, Writable} from 'stream'
|
|
|
|
import {Collection} from '../../collection/Collection'
|
|
|
|
import {UniversalPath} from '../path'
|
|
|
|
import {Maybe} from '../types'
|
|
|
|
|
|
|
|
/**
 * A Filesystem implementation that stores files on remote hosts via SFTP/SSH.
 *
 * File contents live under `<baseDir>/data/<storePath>` on the remote host and
 * a JSON metadata sidecar (MIME type and tags) lives under
 * `<baseDir>/meta/<storePath>.json`.
 */
export class SSHFilesystem extends Filesystem {
    /** The connected SSH client, cached once the connection is ready. */
    private sshClient?: ssh2.Client

    /** The in-flight connection attempt, shared by concurrent `getSSH()` callers. */
    private sshConnecting?: Promise<ssh2.Client>

    /**
     * @param baseConfig the SSH connection settings (`ssh`) and the remote
     * directory under which data and metadata are stored (`baseDir`)
     */
    constructor(
        protected readonly baseConfig: { ssh: ssh2.ConnectConfig, baseDir: string },
    ) {
        super()
    }

    /**
     * Get the URI-style prefix identifying this filesystem, which embeds the configured host.
     */
    getPrefix(): string {
        return `sftp+${this.baseConfig.ssh.host}://`
    }

    /**
     * Download the file at the given store path to a local temporary file.
     * @param args
     * @returns the path of the local temporary copy
     */
    async getStoreFileAsTemp(args: { storePath: string }): Promise<UniversalPath> {
        // Open the remote stream first so that a connection failure does not
        // leave an orphaned local write stream behind.
        const read = await this.getStoreFileAsStream(args)
        const temp = this.tempName()
        const write = fs.createWriteStream(temp)

        return new Promise<UniversalPath>((res, rej) => {
            // pipe() does not close the destination when the source errors,
            // so tear down both ends explicitly before rejecting.
            const fail = (err: Error) => {
                read.destroy()
                write.destroy()
                rej(err)
            }

            write.on('finish', () => {
                res(new UniversalPath(temp))
            })

            write.on('error', fail)
            read.on('error', fail)
            read.pipe(write)
        })
    }

    /**
     * Open a readable stream of the contents of the file at the given store path.
     * @param args
     */
    async getStoreFileAsStream(args: { storePath: string }): Promise<Readable> {
        const sftp = await this.getSFTP()
        return sftp.createReadStream(this.storePath(args.storePath))
    }

    /**
     * Upload a local file to the given store path, along with its metadata sidecar.
     *
     * Rejects if either the metadata or the file contents cannot be written.
     * @param args
     */
    async putLocalFile(args: { localPath: string; storePath: string; mimeType?: string; tags?: string[]; tag?: string }): Promise<void> {
        // write the metadata first
        // (sftp.writeFile is callback-based, so it is wrapped in a promise
        // rather than awaited directly)
        await this.writeRemoteFile(this.metadataPath(args.storePath), JSON.stringify({
            mimeType: args.mimeType,
            tags: this.normalizeTags(args.tag, args.tags),
        }))

        // pipe the local file to the store
        const write = await this.putStoreFileAsStream({storePath: args.storePath})
        const read = fs.createReadStream(args.localPath)

        await new Promise<void>((res, rej) => {
            const fail = (err: Error) => {
                read.destroy()
                write.destroy()
                rej(err)
            }

            write.on('finish', () => res())
            write.on('error', fail)
            read.on('error', fail)
            read.pipe(write)
        })
    }

    /**
     * Open a writable stream to the file at the given store path.
     * No metadata sidecar is written by this method.
     * @param args
     */
    async putStoreFileAsStream(args: { storePath: string }): Promise<Writable> {
        const sftp = await this.getSFTP()
        return sftp.createWriteStream(this.storePath(args.storePath))
    }

    /**
     * Create the directory at the given store path on the remote host.
     * @param args
     */
    async mkdir(args: { storePath: string }): Promise<void> {
        const sftp = await this.getSFTP()

        await new Promise<void>((res, rej) => {
            sftp.mkdir(this.storePath(args.storePath), err => {
                if ( err ) {
                    rej(err)
                } else {
                    res()
                }
            })
        })
    }

    /**
     * Remove the file at the given store path, then its metadata sidecar.
     *
     * Rejects if either unlink fails. The `recursive` flag is accepted for
     * interface compatibility but is not read by this implementation.
     * @param args
     */
    async remove(args: { storePath: string; recursive?: boolean }): Promise<void> {
        const sftp = await this.getSFTP()

        await new Promise<void>((res, rej) => {
            sftp.unlink(this.storePath(args.storePath), err => {
                if ( err ) {
                    return rej(err)
                }

                sftp.unlink(this.metadataPath(args.storePath), err2 => {
                    if ( err2 ) {
                        rej(err2)
                    } else {
                        res()
                    }
                })
            })
        })
    }

    /**
     * Look up information about the entry at the given store path.
     *
     * If the remote stat fails, the entry is reported as not existing. A
     * missing or unreadable metadata sidecar does NOT make an existing entry
     * "not exist": it only leaves `mimeType` unset and `tags` empty.
     * @param args
     */
    async stat(args: { storePath: string }): Promise<Stat> {
        const sftp = await this.getSFTP()

        let stat: any
        try {
            stat = await new Promise<any>((res, rej) => {
                sftp.stat(this.storePath(args.storePath), (err, sftpStats) => {
                    if ( err ) {
                        return rej(err)
                    }

                    res(sftpStats)
                })
            })
        } catch (e) {
            return {
                path: new UniversalPath(args.storePath, this),
                exists: false,
                sizeInBytes: 0,
                tags: [],
                isFile: false,
                isDirectory: false,
            }
        }

        // Directories and files written without a sidecar have no metadata,
        // so fall back to empty metadata instead of failing the whole stat.
        let meta: { mimeType?: string, tags?: string[] } = {}
        try {
            const jsonStream = sftp.createReadStream(this.metadataPath(args.storePath))
            const parsed = JSON.parse(await this.streamToString(jsonStream))
            if ( parsed && typeof parsed === 'object' ) {
                meta = parsed
            }
        } catch (e) {
            // No readable metadata for this entry; keep the empty defaults.
        }

        // NOTE(review): ssh2 documents atime/mtime as numeric timestamps and
        // lists no ctime attribute - confirm these match what `Stat` expects.
        return {
            path: new UniversalPath(args.storePath, this),
            exists: true,
            sizeInBytes: stat.size,
            mimeType: meta.mimeType,
            tags: meta.tags ?? [],
            accessed: stat.atime,
            modified: stat.mtime,
            created: stat.ctime,
            isFile: stat.isFile(),
            isDirectory: stat.isDirectory(),
        }
    }

    /**
     * Update the access/modify times of the file at the given store path to
     * now, creating the file if the times could not be set.
     * @param args
     */
    async touch(args: { storePath: string }): Promise<void> {
        const sftp = await this.getSFTP()

        return new Promise<void>((res, rej) => {
            const storePath = this.storePath(args.storePath)
            const time = new Date()

            sftp.utimes(storePath, time, time, err => {
                if ( !err ) {
                    return res()
                }

                // Open in append mode: this creates the file when it is
                // missing, but never truncates an existing file if utimes
                // failed for some other reason.
                sftp.open(storePath, 'a', (err2, fd) => {
                    if ( err2 ) {
                        return rej(err2)
                    }

                    sftp.close(fd, err3 => {
                        if ( err3 ) {
                            return rej(err3)
                        }

                        res()
                    })
                })
            })
        })
    }

    /**
     * Read the metadata sidecar for the given store path.
     *
     * If the sidecar cannot be fetched or parsed, empty metadata
     * (`{tags: []}`) is returned instead of rejecting.
     * @param storePath
     */
    async getMetadata(storePath: string): Promise<FileMetadata> {
        try {
            const sftp = await this.getSFTP()

            // Await here (rather than returning the bare promise) so that a
            // rejection is handled by the catch block below.
            return await new Promise<FileMetadata>((res, rej) => {
                sftp.readFile(this.metadataPath(storePath), (err, buffer) => {
                    if ( err ) {
                        return rej(err)
                    }

                    // A throw inside this callback would escape the promise,
                    // so convert parse failures into rejections.
                    try {
                        res(JSON.parse(buffer.toString('utf-8')))
                    } catch (parseError) {
                        rej(parseError)
                    }
                })
            })
        } catch (e) {
            return {
                tags: [],
            }
        }
    }

    /**
     * Write the metadata sidecar for the given store path.
     * @param storePath
     * @param meta
     */
    async setMetadata(storePath: string, meta: FileMetadata): Promise<void> {
        await this.writeRemoteFile(this.metadataPath(storePath), JSON.stringify(meta))
    }

    /**
     * End the cached SSH connection, if any, and forget it so that a later
     * call to `getSSH()` opens a fresh connection.
     */
    async close(): Promise<void> {
        const pending = this.sshConnecting
        this.sshConnecting = undefined

        if ( pending ) {
            try {
                // Let an in-flight connection finish so it can be ended below.
                await pending
            } catch (e) {
                // The connection never came up, so there is nothing to end.
            }
        }

        const client = this.sshClient
        this.sshClient = undefined
        client?.end()
    }

    /**
     * Using the provided `baseConfig.ssh`, create and cache the SSH connection to the host.
     * If a connection already exists (or is currently being established), re-use it.
     */
    async getSSH(): Promise<ssh2.Client> {
        if ( this.sshClient ) {
            return this.sshClient
        }

        if ( !this.sshConnecting ) {
            const connecting = new Promise<ssh2.Client>((res, rej) => {
                const client = new ssh2.Client()

                client.on('ready', () => {
                    this.sshClient = client
                    res(client)
                })

                client.on('error', rej)

                // Drop the cached client once its connection closes so that
                // the next call reconnects instead of re-using a dead client.
                client.on('close', () => {
                    if ( this.sshClient === client ) {
                        this.sshClient = undefined
                    }
                })

                client.connect(this.baseConfig.ssh)
            })

            // Whether the attempt succeeds or fails, stop sharing it: success
            // is served from `sshClient`, and failure should allow a retry.
            const settle = () => {
                if ( this.sshConnecting === connecting ) {
                    this.sshConnecting = undefined
                }
            }
            void connecting.then(settle, settle)

            this.sshConnecting = connecting
        }

        return this.sshConnecting
    }

    /**
     * Using `getSSH()`, create a new SFTP helper based on that connection.
     * Each call requests a new SFTP session from the client.
     */
    async getSFTP(): Promise<ssh2.SFTPWrapper> {
        const ssh = await this.getSSH()

        return new Promise((res, rej) => {
            ssh.sftp((err, sftp) => {
                if ( err ) {
                    rej(err)
                } else {
                    res(sftp)
                }
            })
        })
    }

    /**
     * Resolve the given store path to an absolute path on the remote filesystem.
     * @param storePath
     * @protected
     */
    protected storePath(storePath: string): string {
        // Remote SFTP paths are '/'-separated regardless of the local platform.
        return path.posix.join(this.baseConfig.baseDir, 'data', storePath)
    }

    /**
     * Resolve the given store path to an absolute path of a metadata file on the remote filesystem.
     * @param storePath
     * @protected
     */
    protected metadataPath(storePath: string): string {
        // Remote SFTP paths are '/'-separated regardless of the local platform.
        return path.posix.join(this.baseConfig.baseDir, 'meta', storePath + '.json')
    }

    /**
     * Given a readable stream, cast it to a string.
     * @param stream
     * @protected
     */
    protected streamToString(stream: NodeJS.ReadableStream): Promise<string> {
        const chunks: Buffer[] = []
        return new Promise((resolve, reject) => {
            stream.on('data', (chunk) => chunks.push(Buffer.from(chunk)))
            stream.on('error', (err) => reject(err))
            stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf8')))
        })
    }

    /**
     * Write the given string to a file on the remote filesystem, resolving
     * once the write completes and rejecting if it fails.
     * @param remotePath absolute path on the remote host
     * @param contents
     * @protected
     */
    protected async writeRemoteFile(remotePath: string, contents: string): Promise<void> {
        const sftp = await this.getSFTP()

        await new Promise<void>((res, rej) => {
            sftp.writeFile(remotePath, contents, (err: Maybe<Error>) => {
                if ( err ) {
                    rej(err)
                } else {
                    res()
                }
            })
        })
    }

    /**
     * List the names of the entries in the directory at the given store path.
     * @param storePath
     */
    async list(storePath: string): Promise<Collection<string>> {
        const sftp = await this.getSFTP()

        return new Promise<Collection<string>>((res, rej) => {
            sftp.readdir(this.storePath(storePath), (error, files) => {
                if ( error ) {
                    rej(error)
                } else {
                    res(Collection.collect(files).map(x => x.filename))
                }
            })
        })
    }
}
|