You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
lib/src/util/support/path/SSHFilesystem.ts

291 lines
9.0 KiB

import {FileMetadata, Filesystem, Stat} from './Filesystem'
import * as ssh2 from 'ssh2'
import * as path from 'path'
import * as fs from 'fs'
import {Readable, Writable} from 'stream'
import {Collection} from '../../collection/Collection'
import {UniversalPath} from '../path'
/**
* A Filesystem implementation that stores files on remote hosts via SFTP/SSH.
*/
export class SSHFilesystem extends Filesystem {
private sshClient?: ssh2.Client
constructor(
protected readonly baseConfig: { ssh: ssh2.ConnectConfig, baseDir: string },
) {
super()
}
getPrefix(): string {
return `sftp+${this.baseConfig.ssh.host}://`
}
async getStoreFileAsTemp(args: { storePath: string }): Promise<UniversalPath> {
const temp = this.tempName()
const write = fs.createWriteStream(temp)
const read = await this.getStoreFileAsStream(args)
return new Promise<UniversalPath>((res, rej) => {
write.on('finish', () => {
res(new UniversalPath(temp))
})
write.on('error', rej)
read.on('error', rej)
read.pipe(write)
})
}
async getStoreFileAsStream(args: { storePath: string }): Promise<Readable> {
const sftp = await this.getSFTP()
return sftp.createReadStream(this.storePath(args.storePath))
}
async putLocalFile(args: { localPath: string; storePath: string; mimeType?: string; tags?: string[]; tag?: string }): Promise<void> {
const read = fs.createReadStream(args.localPath)
const write = await this.putStoreFileAsStream({storePath: args.storePath})
// write the metadata first
const sftp = await this.getSFTP()
await sftp.writeFile(this.metadataPath(args.storePath), JSON.stringify({
mimeType: args.mimeType,
tags: this.normalizeTags(args.tag, args.tags),
}))
// pipe the local file to the store
await new Promise<void>((res, rej) => {
write.on('finish', () => res())
write.on('error', rej)
read.on('error', rej)
read.pipe(write)
})
}
async putStoreFileAsStream(args: { storePath: string }): Promise<Writable> {
const sftp = await this.getSFTP()
return sftp.createWriteStream(this.storePath(args.storePath))
}
async mkdir(args: { storePath: string }): Promise<void> {
const sftp = await this.getSFTP()
await new Promise<void>((res, rej) => {
sftp.mkdir(this.storePath(args.storePath), err => {
if ( err ) {
rej(err)
} else {
res()
}
})
})
}
async remove(args: { storePath: string; recursive?: boolean }): Promise<void> {
const sftp = await this.getSFTP()
await new Promise<void>((res, rej) => {
sftp.unlink(this.storePath(args.storePath), err => {
if ( err ) {
return rej(err)
} else {
sftp.unlink(this.metadataPath(args.storePath), err2 => {
if ( err2 ) {
rej(err2)
} else {
res()
}
})
}
})
})
}
async stat(args: { storePath: string }): Promise<Stat> {
const sftp = await this.getSFTP()
try {
const stat = await new Promise<any>((res, rej) => {
sftp.stat(this.storePath(args.storePath), (err, sftpStats) => {
if ( err ) {
return rej(err)
}
res(sftpStats)
})
})
const jsonStream = sftp.createReadStream(this.metadataPath(args.storePath))
const json = await this.streamToString(jsonStream)
const meta = JSON.parse(json)
return {
path: new UniversalPath(args.storePath, this),
exists: true,
sizeInBytes: stat.size,
mimeType: meta.mimeType,
tags: meta.tags,
accessed: stat.atime,
modified: stat.mtime,
created: stat.ctime,
isFile: stat.isFile(),
isDirectory: stat.isDirectory(),
}
} catch (e) {
return {
path: new UniversalPath(args.storePath, this),
exists: false,
sizeInBytes: 0,
tags: [],
isFile: false,
isDirectory: false,
}
}
}
async touch(args: { storePath: string }): Promise<void> {
const sftp = await this.getSFTP()
return new Promise<void>((res, rej) => {
const storePath = this.storePath(args.storePath)
const time = new Date()
sftp.utimes(storePath, time, time, err => {
if ( err ) {
sftp.open(storePath, 'w', (err2, fd) => {
if ( err2 ) {
return rej(err2)
}
sftp.close(fd, err3 => {
if ( err3 ) {
return rej(err3)
}
res()
})
})
} else {
res()
}
})
})
}
async getMetadata(storePath: string): Promise<FileMetadata> {
try {
const sftp = await this.getSFTP()
return new Promise((res, rej) => {
sftp.readFile(this.metadataPath(storePath), (err, buffer) => {
if ( err ) {
rej(err)
}
res(JSON.parse(buffer.toString('utf-8')))
})
})
} catch (e) {
return {
tags: [],
}
}
}
async setMetadata(storePath: string, meta: FileMetadata): Promise<void> {
const sftp = await this.getSFTP()
const metaPath = this.metadataPath(storePath)
await new Promise<void>((res, rej) => {
sftp.writeFile(metaPath, JSON.stringify(meta), err => {
if ( err ) {
rej(err)
} else {
res()
}
})
})
}
async close(): Promise<void> {
await this.sshClient?.end()
}
/**
* Using the provided `hostConfig`, create and cache the SSH connection to the host.
* If a connection already exists, re-use it.
*/
async getSSH(): Promise<ssh2.Client> {
if ( this.sshClient ) {
return this.sshClient
}
return new Promise((res, rej) => {
const client = new ssh2.Client()
client.on('ready', () => {
this.sshClient = client
res(client)
}).connect(this.baseConfig.ssh)
client.on('error', rej)
})
}
/**
* Using `getSSH()`, create a new SFTP helper based on that connection.
*/
async getSFTP(): Promise<ssh2.SFTPWrapper> {
const ssh = await this.getSSH()
return new Promise((res, rej) => {
ssh.sftp((err, sftp) => {
if ( err ) {
rej(err)
} else {
res(sftp)
}
})
})
}
/**
* Resolve the given store path to an absolute path on the remote filesystem.
* @param storePath
* @protected
*/
protected storePath(storePath: string): string {
return path.join(this.baseConfig.baseDir, 'data', storePath)
}
/**
* Resolve the given store path to an absolute path of a metadata file on the remote filesystem.
* @param storePath
* @protected
*/
protected metadataPath(storePath: string): string {
return path.join(this.baseConfig.baseDir, 'meta', storePath + '.json')
}
/**
* Given a readable stream, cast it to a string.
* @param stream
* @protected
*/
protected streamToString(stream: NodeJS.ReadableStream): Promise<string> {
const chunks: Buffer[] = []
return new Promise((resolve, reject) => {
stream.on('data', (chunk) => chunks.push(Buffer.from(chunk)))
stream.on('error', (err) => reject(err))
stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf8')))
})
}
async list(storePath: string): Promise<Collection<string>> {
const sftp = await this.getSFTP()
return new Promise<Collection<string>>((res, rej) => {
sftp.readdir(this.storePath(storePath), (error, files) => {
if ( error ) {
rej(error)
} else {
res(Collection.collect(files).map(x => x.filename))
}
})
})
}
}