(core) revamp snapshot inventory

Summary:
Deliberate changes:
 * save snapshots to s3 prior to migrations.
 * label migration snapshots in s3 metadata.
 * avoid pruning migration snapshots for a month.

Opportunistic changes:
 * Associate document timezone with snapshots, so pruning can respect timezones.
 * Associate actionHash/Num with snapshots.
 * Record time of last change in snapshots (rather than just s3 upload time, which could be a while later).

This ended up being a biggish change, because there was nowhere ideal to put tags (list of possibilities in diff).

Test Plan: added tests

Reviewers: dsagal

Reviewed By: dsagal

Differential Revision: https://phab.getgrist.com/D2646
This commit is contained in:
Paul Fitzpatrick
2020-10-30 12:53:23 -04:00
parent ce824aad34
commit 71519d9e5c
20 changed files with 699 additions and 246 deletions

View File

@@ -102,6 +102,14 @@ class DummyDocWorkerMap implements IDocWorkerMap {
this._elections.delete(name);
}
}
public async updateChecksum(family: string, key: string, checksum: string) {
// nothing to do
}
public async getChecksum(family: string, key: string) {
return null;
}
}
/**
@@ -367,7 +375,16 @@ export class DocWorkerMap implements IDocWorkerMap {
}
public async updateDocStatus(docId: string, checksum: string): Promise<void> {
await this._client.setexAsync(`doc-${docId}-checksum`, CHECKSUM_TTL_MSEC / 1000.0, checksum);
this.updateChecksum('doc', docId, checksum);
}
public async updateChecksum(family: string, key: string, checksum: string) {
await this._client.setexAsync(`${family}-${key}-checksum`, CHECKSUM_TTL_MSEC / 1000.0, checksum);
}
public async getChecksum(family: string, key: string) {
const checksum = await this._client.getAsync(`${family}-${key}-checksum`);
return checksum === 'null' ? null : checksum;
}
public async setPermit(permit: Permit): Promise<string> {

View File

@@ -4,7 +4,7 @@
// TypeORM Sqlite driver does not support using transactions in async code, if it is possible
// for two transactions to get called (one of the whole point of transactions). This
// patch adds support for that, based on a monkey patch published in:
// https://gist.github.com/keenondrums/556f8c61d752eff730841170cd2bc3f1
// https://gist.github.com/aigoncharov/556f8c61d752eff730841170cd2bc3f1
// Explanation at https://github.com/typeorm/typeorm/issues/1884#issuecomment-380767213
// Patch 2:
@@ -13,6 +13,7 @@
// changed during construction of a query.
import * as sqlite3 from '@gristlabs/sqlite3';
import {Mutex, MutexInterface} from 'async-mutex';
import isEqual = require('lodash/isEqual');
import {EntityManager, QueryRunner} from 'typeorm';
import {SqliteDriver} from 'typeorm/driver/sqlite/SqliteDriver';
@@ -27,73 +28,11 @@ import {QueryBuilder} from 'typeorm/query-builder/QueryBuilder';
* Patch 1
**********************/
type Releaser = () => void;
type Worker<T> = () => Promise<T>|T;
interface MutexInterface {
acquire(): Promise<Releaser>;
runExclusive<T>(callback: Worker<T>): Promise<T>;
isLocked(): boolean;
}
class Mutex implements MutexInterface {
private _queue: Array<(release: Releaser) => void> = [];
private _pending = false;
public isLocked(): boolean {
return this._pending;
}
public acquire(): Promise<Releaser> {
const ticket = new Promise<Releaser>(resolve => this._queue.push(resolve));
if (!this._pending) {
this._dispatchNext();
}
return ticket;
}
public runExclusive<T>(callback: Worker<T>): Promise<T> {
return this
.acquire()
.then(release => {
let result: T|Promise<T>;
try {
result = callback();
} catch (e) {
release();
throw(e);
}
return Promise
.resolve(result)
.then(
(x: T) => (release(), x),
e => {
release();
throw e;
}
);
}
);
}
private _dispatchNext(): void {
if (this._queue.length > 0) {
this._pending = true;
this._queue.shift()!(this._dispatchNext.bind(this));
} else {
this._pending = false;
}
}
}
// A singleton mutex for all sqlite transactions.
const mutex = new Mutex();
class SqliteQueryRunnerPatched extends SqliteQueryRunner {
private _releaseMutex: Releaser | null;
private _releaseMutex: MutexInterface.Releaser | null;
public async startTransaction(level?: any): Promise<void> {
this._releaseMutex = await mutex.acquire();