mirror of
https://github.com/gristlabs/grist-core.git
synced 2024-10-27 20:44:07 +00:00
165 lines
5.9 KiB
TypeScript
165 lines
5.9 KiB
TypeScript
|
import {ApiError} from 'app/common/ApiError';
|
||
|
import {ObjMetadata, ObjSnapshotWithMetadata, toExternalMetadata, toGristMetadata} from 'app/common/DocSnapshot';
|
||
|
import {ExternalStorage} from 'app/server/lib/ExternalStorage';
|
||
|
import {IncomingMessage} from 'http';
|
||
|
import * as fse from 'fs-extra';
|
||
|
import * as minio from 'minio';
|
||
|
|
||
|
// The minio typings appear to be quite stale. Extend them here to avoid
|
||
|
// lots of casts to any.
|
||
|
type MinIOClient = minio.Client & {
|
||
|
statObject(bucket: string, key: string, options: {versionId?: string}): Promise<MinIOBucketItemStat>;
|
||
|
getObject(bucket: string, key: string, options: {versionId?: string}): Promise<IncomingMessage>;
|
||
|
listObjects(bucket: string, key: string, recursive: boolean,
|
||
|
options: {IncludeVersion?: boolean}): minio.BucketStream<minio.BucketItem>;
|
||
|
removeObjects(bucket: string, versions: Array<{name: string, versionId: string}>): Promise<void>;
|
||
|
};
|
||
|
|
||
|
type MinIOBucketItemStat = minio.BucketItemStat & {
|
||
|
versionId?: string;
|
||
|
metaData?: Record<string, string>;
|
||
|
};
|
||
|
|
||
|
/**
|
||
|
* An external store implemented using the MinIO client, which
|
||
|
* will work with MinIO and other S3-compatible storage.
|
||
|
*/
|
||
|
export class MinIOExternalStorage implements ExternalStorage {
|
||
|
private _s3: MinIOClient;
|
||
|
|
||
|
// Specify bucket to use, and optionally the max number of keys to request
|
||
|
// in any call to listObjectVersions (used for testing)
|
||
|
constructor(public bucket: string, public options: {
|
||
|
endPoint: string,
|
||
|
port?: number,
|
||
|
useSSL?: boolean,
|
||
|
accessKey: string,
|
||
|
secretKey: string,
|
||
|
}, private _batchSize?: number) {
|
||
|
this._s3 = new minio.Client(options) as MinIOClient;
|
||
|
}
|
||
|
|
||
|
public async exists(key: string, snapshotId?: string) {
|
||
|
return Boolean(await this.head(key, snapshotId));
|
||
|
}
|
||
|
|
||
|
public async head(key: string, snapshotId?: string): Promise<ObjSnapshotWithMetadata|null> {
|
||
|
try {
|
||
|
const head = await this._s3.statObject(
|
||
|
this.bucket, key,
|
||
|
snapshotId ? {versionId: snapshotId} : {},
|
||
|
);
|
||
|
if (!head.lastModified || !head.versionId) {
|
||
|
// AWS documentation says these fields will be present.
|
||
|
throw new Error('MinIOExternalStorage.head did not get expected fields');
|
||
|
}
|
||
|
return {
|
||
|
lastModified: head.lastModified.toISOString(),
|
||
|
snapshotId: head.versionId,
|
||
|
...head.metaData && { metadata: toGristMetadata(head.metaData) },
|
||
|
};
|
||
|
} catch (err) {
|
||
|
if (!this.isFatalError(err)) { return null; }
|
||
|
throw err;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
public async upload(key: string, fname: string, metadata?: ObjMetadata) {
|
||
|
const stream = fse.createReadStream(fname);
|
||
|
const result = await this._s3.putObject(
|
||
|
this.bucket, key, stream,
|
||
|
metadata ? {Metadata: toExternalMetadata(metadata)} : undefined
|
||
|
);
|
||
|
// Empirically VersionId is available in result for buckets with versioning enabled.
|
||
|
return result.versionId || null;
|
||
|
}
|
||
|
|
||
|
public async download(key: string, fname: string, snapshotId?: string) {
|
||
|
const stream = fse.createWriteStream(fname);
|
||
|
const request = await this._s3.getObject(
|
||
|
this.bucket, key,
|
||
|
snapshotId ? {versionId: snapshotId} : {}
|
||
|
);
|
||
|
const statusCode = request.statusCode || 500;
|
||
|
if (statusCode >= 300) {
|
||
|
throw new ApiError('download error', statusCode);
|
||
|
}
|
||
|
// See https://docs.aws.amazon.com/sdk-for-javascript/v2/developer-guide/requests-using-stream-objects.html
|
||
|
// for an example of streaming data.
|
||
|
const headers = request.headers;
|
||
|
// For a versioned bucket, the header 'x-amz-version-id' contains a version id.
|
||
|
const downloadedSnapshotId = String(headers['x-amz-version-id'] || '');
|
||
|
return new Promise<string>((resolve, reject) => {
|
||
|
request
|
||
|
.on('error', reject) // handle errors on the read stream
|
||
|
.pipe(stream)
|
||
|
.on('error', reject) // handle errors on the write stream
|
||
|
.on('finish', () => resolve(downloadedSnapshotId));
|
||
|
});
|
||
|
}
|
||
|
|
||
|
public async remove(key: string, snapshotIds?: string[]) {
|
||
|
if (snapshotIds) {
|
||
|
await this._deleteBatch(key, snapshotIds);
|
||
|
} else {
|
||
|
await this._deleteAllVersions(key);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
public async versions(key: string, options?: { includeDeleteMarkers?: boolean }) {
|
||
|
const results: minio.BucketItem[] = [];
|
||
|
await new Promise((resolve, reject) => {
|
||
|
const stream = this._s3.listObjects(this.bucket, key, false, {IncludeVersion: true});
|
||
|
stream
|
||
|
.on('error', reject)
|
||
|
.on('end', () => {
|
||
|
resolve(results);
|
||
|
})
|
||
|
.on('data', data => {
|
||
|
results.push(data);
|
||
|
});
|
||
|
});
|
||
|
return results
|
||
|
.filter(v => v.name === key &&
|
||
|
v.lastModified && (v as any).versionId &&
|
||
|
(options?.includeDeleteMarkers || !(v as any).isDeleteMarker))
|
||
|
.map(v => ({
|
||
|
lastModified: v.lastModified.toISOString(),
|
||
|
snapshotId: (v as any).versionId!,
|
||
|
}));
|
||
|
}
|
||
|
|
||
|
public url(key: string) {
|
||
|
return `minio://${this.bucket}/${key}`;
|
||
|
}
|
||
|
|
||
|
public isFatalError(err: any) {
|
||
|
return err.code !== 'NotFound' && err.code !== 'NoSuchKey';
|
||
|
}
|
||
|
|
||
|
public async close() {
|
||
|
// nothing to do
|
||
|
}
|
||
|
|
||
|
// Delete all versions of an object.
|
||
|
public async _deleteAllVersions(key: string) {
|
||
|
const vs = await this.versions(key, {includeDeleteMarkers: true});
|
||
|
await this._deleteBatch(key, vs.map(v => v.snapshotId));
|
||
|
}
|
||
|
|
||
|
// Delete a batch of versions for an object.
|
||
|
private async _deleteBatch(key: string, versions: Array<string | undefined>) {
|
||
|
// Max number of keys per request for AWS S3 is 1000, see:
|
||
|
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
|
||
|
// Stick to this maximum in case we are using this client to talk to AWS.
|
||
|
const N = this._batchSize || 1000;
|
||
|
for (let i = 0; i < versions.length; i += N) {
|
||
|
const iVersions = versions.slice(i, i + N).filter(v => v) as string[];
|
||
|
if (iVersions.length === 0) { continue; }
|
||
|
await this._s3.removeObjects(this.bucket, iVersions.map(versionId => {
|
||
|
return { name: key, versionId };
|
||
|
}));
|
||
|
}
|
||
|
}
|
||
|
}
|