mirror of
https://github.com/gristlabs/grist-core.git
synced 2024-10-27 20:44:07 +00:00
(core) Remove expired attachments every hour and on shutdown
Summary: Call ActiveDoc.removeUnusedAttachments every hour using setInterval, and in ActiveDoc.shutdown (which also clears said interval). Unrelated: small fix to my webhooks code which was creating a redis client on shutdown just to quit it. Test Plan: Tweaked DocApi test to remove expired attachments by force-reloading the doc, so that it removes them during shutdown. Extracted a new testing endpoint /verifyFiles to support this test (previously running that code only happened with `/removeUnused?verifyfiles=1`). Tested the setInterval part manually. Reviewers: paulfitz, dsagal Reviewed By: paulfitz Subscribers: dsagal Differential Revision: https://phab.getgrist.com/D3387
This commit is contained in:
parent
890c550fc3
commit
a701b4bf13
@ -115,6 +115,9 @@ const ACTIVEDOC_TIMEOUT = (process.env.NODE_ENV === 'production') ? 30 : 5;
|
||||
// We'll wait this long between re-measuring sandbox memory.
|
||||
const MEMORY_MEASUREMENT_INTERVAL_MS = 60 * 1000;
|
||||
|
||||
// Cleanup expired attachments every hour (also happens when shutting down)
|
||||
const REMOVE_UNUSED_ATTACHMENTS_INTERVAL_MS = 60 * 60 * 1000;
|
||||
|
||||
// A hook for dependency injection.
|
||||
export const Deps = {ACTIVEDOC_TIMEOUT};
|
||||
|
||||
@ -179,6 +182,12 @@ export class ActiveDoc extends EventEmitter {
|
||||
private _recoveryMode: boolean = false;
|
||||
private _shuttingDown: boolean = false;
|
||||
|
||||
// Cleanup expired attachments every hour (also happens when shutting down)
|
||||
private _removeUnusedAttachmentsInterval = setInterval(
|
||||
() => this.removeUnusedAttachments(true),
|
||||
REMOVE_UNUSED_ATTACHMENTS_INTERVAL_MS,
|
||||
);
|
||||
|
||||
constructor(docManager: DocManager, docName: string, private _options?: ICreateActiveDocOptions) {
|
||||
super();
|
||||
if (_options?.safeMode) { this._recoveryMode = true; }
|
||||
@ -389,6 +398,16 @@ export class ActiveDoc extends EventEmitter {
|
||||
// Clear the MapWithTTL to remove all timers from the event loop.
|
||||
this._fetchCache.clear();
|
||||
|
||||
clearInterval(this._removeUnusedAttachmentsInterval);
|
||||
try {
|
||||
// Remove expired attachments, i.e. attachments that were soft deleted a while ago.
|
||||
// This needs to happen periodically, and doing it here means we can guarantee that it happens even if
|
||||
// the doc is only ever opened briefly, without having to slow down startup.
|
||||
await this.removeUnusedAttachments(true);
|
||||
} catch (e) {
|
||||
this._log.error(docSession, "Failed to remove expired attachments", e);
|
||||
}
|
||||
|
||||
try {
|
||||
await this._docManager.storageManager.closeDocument(this.docName);
|
||||
} catch (err) {
|
||||
|
@ -241,13 +241,21 @@ export class DocWorkerApi {
|
||||
const verifyFiles = isAffirmative(req.query.verifyfiles);
|
||||
await activeDoc.removeUnusedAttachments(expiredOnly);
|
||||
if (verifyFiles) {
|
||||
assert.deepStrictEqual(
|
||||
await activeDoc.docStorage.all(`SELECT DISTINCT fileIdent AS ident FROM _grist_Attachments ORDER BY ident`),
|
||||
await activeDoc.docStorage.all(`SELECT ident FROM _gristsys_Files ORDER BY ident`),
|
||||
);
|
||||
await verifyAttachmentFiles(activeDoc);
|
||||
}
|
||||
res.json(null);
|
||||
}));
|
||||
this._app.post('/api/docs/:docId/attachments/verifyFiles', isOwner, withDoc(async (activeDoc, req, res) => {
|
||||
await verifyAttachmentFiles(activeDoc);
|
||||
res.json(null);
|
||||
}));
|
||||
|
||||
async function verifyAttachmentFiles(activeDoc: ActiveDoc) {
|
||||
assert.deepStrictEqual(
|
||||
await activeDoc.docStorage.all(`SELECT DISTINCT fileIdent AS ident FROM _grist_Attachments ORDER BY ident`),
|
||||
await activeDoc.docStorage.all(`SELECT ident FROM _gristsys_Files ORDER BY ident`),
|
||||
);
|
||||
}
|
||||
|
||||
// Adds records given in a column oriented format,
|
||||
// returns an array of row IDs
|
||||
|
@ -22,6 +22,7 @@ import * as log from 'app/server/lib/log';
|
||||
import * as assert from 'assert';
|
||||
import * as bluebird from 'bluebird';
|
||||
import * as fse from 'fs-extra';
|
||||
import {RunResult} from 'sqlite3';
|
||||
import * as _ from 'underscore';
|
||||
import * as util from 'util';
|
||||
import * as uuidv4 from "uuid/v4";
|
||||
@ -1037,7 +1038,7 @@ export class DocStorage implements ISQLiteDB, OnDemandStorage {
|
||||
* @param {String} rowId - Row ID.
|
||||
* @returns {Promise} - A promise for the SQL execution.
|
||||
*/
|
||||
public _process_RemoveRecord(tableId: string, rowId: string): Promise<void> {
|
||||
public _process_RemoveRecord(tableId: string, rowId: string): Promise<RunResult> {
|
||||
const sql = "DELETE FROM " + quoteIdent(tableId) + " WHERE id=?";
|
||||
debuglog("RemoveRecord SQL: " + sql, [rowId]);
|
||||
return this.run(sql, [rowId]);
|
||||
@ -1060,8 +1061,8 @@ export class DocStorage implements ISQLiteDB, OnDemandStorage {
|
||||
* @param {Array[Integer]} rowIds - Array of row IDs to be deleted.
|
||||
* @returns {Promise} - Promise for SQL execution.
|
||||
*/
|
||||
public _process_BulkRemoveRecord(tableId: string, rowIds: number[]): Promise<void> {
|
||||
if (rowIds.length === 0) { return Promise.resolve(); }// If we have nothing to remove, done.
|
||||
public async _process_BulkRemoveRecord(tableId: string, rowIds: number[]): Promise<void> {
|
||||
if (rowIds.length === 0) { return; }// If we have nothing to remove, done.
|
||||
|
||||
const chunkSize = 10;
|
||||
const preSql = "DELETE FROM " + quoteIdent(tableId) + " WHERE id IN (";
|
||||
@ -1071,12 +1072,10 @@ export class DocStorage implements ISQLiteDB, OnDemandStorage {
|
||||
const numChunks = Math.floor(rowIds.length / chunkSize);
|
||||
const numLeftovers = rowIds.length % chunkSize;
|
||||
|
||||
let chunkPromise;
|
||||
|
||||
if (numChunks > 0) {
|
||||
debuglog("DocStorage.BulkRemoveRecord: splitting " + rowIds.length +
|
||||
" deletes into chunks of size " + chunkSize);
|
||||
chunkPromise = this.prepare(preSql + chunkParams + postSql)
|
||||
await this.prepare(preSql + chunkParams + postSql)
|
||||
.then(function(stmt) {
|
||||
return bluebird.Promise.each(_.range(0, numChunks * chunkSize, chunkSize), function(index: number) {
|
||||
debuglog("DocStorage.BulkRemoveRecord: chunk delete " + index + "-" + (index + chunkSize - 1));
|
||||
@ -1086,18 +1085,14 @@ export class DocStorage implements ISQLiteDB, OnDemandStorage {
|
||||
return bluebird.Promise.fromCallback((cb: any) => stmt.finalize(cb));
|
||||
});
|
||||
});
|
||||
} else {
|
||||
chunkPromise = Promise.resolve();
|
||||
}
|
||||
|
||||
return chunkPromise.then(() => {
|
||||
if (numLeftovers > 0) {
|
||||
debuglog("DocStorage.BulkRemoveRecord: leftover delete " + (numChunks * chunkSize) + "-" + (rowIds.length - 1));
|
||||
const leftoverParams = _.range(numLeftovers).map(q).join(',');
|
||||
return this.run(preSql + leftoverParams + postSql,
|
||||
rowIds.slice(numChunks * chunkSize, rowIds.length));
|
||||
}
|
||||
});
|
||||
if (numLeftovers > 0) {
|
||||
debuglog("DocStorage.BulkRemoveRecord: leftover delete " + (numChunks * chunkSize) + "-" + (rowIds.length - 1));
|
||||
const leftoverParams = _.range(numLeftovers).map(q).join(',');
|
||||
await this.run(preSql + leftoverParams + postSql,
|
||||
rowIds.slice(numChunks * chunkSize, rowIds.length));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1333,7 +1328,7 @@ export class DocStorage implements ISQLiteDB, OnDemandStorage {
|
||||
* Delete attachments from _gristsys_Files that have no matching metadata row in _grist_Attachments.
|
||||
*/
|
||||
public async removeUnusedAttachments() {
|
||||
await this.run(`
|
||||
const result = await this._getDB().run(`
|
||||
DELETE FROM _gristsys_Files
|
||||
WHERE ident IN (
|
||||
SELECT ident
|
||||
@ -1343,13 +1338,16 @@ export class DocStorage implements ISQLiteDB, OnDemandStorage {
|
||||
WHERE fileIdent IS NULL
|
||||
)
|
||||
`);
|
||||
if (result.changes > 0) {
|
||||
await this._markAsChanged(Promise.resolve());
|
||||
}
|
||||
}
|
||||
|
||||
public all(sql: string, ...args: any[]): Promise<ResultRow[]> {
|
||||
return this._getDB().all(sql, ...args);
|
||||
}
|
||||
|
||||
public run(sql: string, ...args: any[]): Promise<void> {
|
||||
public run(sql: string, ...args: any[]): Promise<RunResult> {
|
||||
return this._markAsChanged(this._getDB().run(sql, ...args));
|
||||
}
|
||||
|
||||
@ -1393,17 +1391,17 @@ export class DocStorage implements ISQLiteDB, OnDemandStorage {
|
||||
return typeof row !== 'undefined';
|
||||
}
|
||||
|
||||
public setPluginDataItem(pluginId: string, key: string, value: string): Promise<void> {
|
||||
return this.run('INSERT OR REPLACE into _gristsys_PluginData (pluginId, key, value) values (?, ?, ?)',
|
||||
public async setPluginDataItem(pluginId: string, key: string, value: string): Promise<void> {
|
||||
await this.run('INSERT OR REPLACE into _gristsys_PluginData (pluginId, key, value) values (?, ?, ?)',
|
||||
pluginId, key, value);
|
||||
}
|
||||
|
||||
public removePluginDataItem(pluginId: string, key: string): Promise<void> {
|
||||
return this.run('DELETE from _gristsys_PluginData where pluginId = ? and key = ?', pluginId, key);
|
||||
public async removePluginDataItem(pluginId: string, key: string): Promise<void> {
|
||||
await this.run('DELETE from _gristsys_PluginData where pluginId = ? and key = ?', pluginId, key);
|
||||
}
|
||||
|
||||
public clearPluginDataItem(pluginId: string): Promise<void> {
|
||||
return this.run('DELETE from _gristsys_PluginData where pluginId = ?', pluginId);
|
||||
public async clearPluginDataItem(pluginId: string): Promise<void> {
|
||||
await this.run('DELETE from _gristsys_PluginData where pluginId = ?', pluginId);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1486,9 +1484,9 @@ export class DocStorage implements ISQLiteDB, OnDemandStorage {
|
||||
/**
|
||||
* Internal helper for applying Bulk Update or Add Record sql
|
||||
*/
|
||||
private _applyMaybeBulkUpdateOrAddSql(sql: string, sqlParams: any[]): Promise<void> {
|
||||
private async _applyMaybeBulkUpdateOrAddSql(sql: string, sqlParams: any[]): Promise<void> {
|
||||
if (sqlParams.length === 1) {
|
||||
return this.run(sql, sqlParams[0]);
|
||||
await this.run(sql, sqlParams[0]);
|
||||
} else {
|
||||
return this.prepare(sql)
|
||||
.then(function(stmt) {
|
||||
|
@ -26,7 +26,7 @@ export interface ExternalStorage {
|
||||
head(key: string, snapshotId?: string): Promise<ObjSnapshotWithMetadata|null>;
|
||||
|
||||
// Upload content from file to the given key. Returns a snapshotId if store supports that.
|
||||
upload(key: string, fname: string, metadata?: ObjMetadata): Promise<string|null>;
|
||||
upload(key: string, fname: string, metadata?: ObjMetadata): Promise<string|null|typeof Unchanged>;
|
||||
|
||||
// Download content from key to given file. Can download a specific version of the key
|
||||
// if store supports that (should throw a fatal exception if not).
|
||||
@ -162,11 +162,11 @@ export class ChecksummedExternalStorage implements ExternalStorage {
|
||||
const snapshotId = await this._options.latestVersion.load(key);
|
||||
log.info("ext %s upload: %s unchanged, not sending (checksum %s, version %s)", this.label, key,
|
||||
checksum, snapshotId);
|
||||
return snapshotId;
|
||||
return Unchanged;
|
||||
}
|
||||
const snapshotId = await this._ext.upload(key, fname, metadata);
|
||||
log.info("ext %s upload: %s checksum %s version %s", this.label, this._ext.url(key), checksum, snapshotId);
|
||||
if (snapshotId) { await this._options.latestVersion.save(key, snapshotId); }
|
||||
if (typeof snapshotId === "string") { await this._options.latestVersion.save(key, snapshotId); }
|
||||
await this._options.localHash.save(key, checksum);
|
||||
await this._options.sharedHash.save(key, checksum);
|
||||
return snapshotId;
|
||||
@ -364,3 +364,5 @@ export interface PropStorage {
|
||||
save(key: string, val: string): Promise<void>;
|
||||
load(key: string): Promise<string|null>;
|
||||
}
|
||||
|
||||
export const Unchanged = Symbol('Unchanged');
|
||||
|
@ -10,7 +10,7 @@ import {HomeDBManager} from 'app/gen-server/lib/HomeDBManager';
|
||||
import {checksumFile} from 'app/server/lib/checksumFile';
|
||||
import {DocSnapshotInventory, DocSnapshotPruner} from 'app/server/lib/DocSnapshots';
|
||||
import {IDocWorkerMap} from 'app/server/lib/DocWorkerMap';
|
||||
import {ChecksummedExternalStorage, DELETED_TOKEN, ExternalStorage} from 'app/server/lib/ExternalStorage';
|
||||
import {ChecksummedExternalStorage, DELETED_TOKEN, ExternalStorage, Unchanged} from 'app/server/lib/ExternalStorage';
|
||||
import {HostedMetadataManager} from 'app/server/lib/HostedMetadataManager';
|
||||
import {ICreate} from 'app/server/lib/ICreate';
|
||||
import {IDocStorageManager} from 'app/server/lib/IDocStorageManager';
|
||||
@ -707,6 +707,10 @@ export class HostedStorageManager implements IDocStorageManager {
|
||||
};
|
||||
const prevSnapshotId = this._latestVersions.get(docId) || null;
|
||||
const newSnapshotId = await this._ext.upload(docId, tmpPath, metadata);
|
||||
if (newSnapshotId === Unchanged) {
|
||||
// Nothing uploaded because nothing changed
|
||||
return;
|
||||
}
|
||||
if (!newSnapshotId) {
|
||||
// This is unexpected.
|
||||
throw new Error('No snapshotId allocated after upload');
|
||||
|
@ -77,6 +77,7 @@ import * as sqlite3 from '@gristlabs/sqlite3';
|
||||
import * as assert from 'assert';
|
||||
import {each} from 'bluebird';
|
||||
import * as fse from 'fs-extra';
|
||||
import {RunResult} from 'sqlite3';
|
||||
import fromPairs = require('lodash/fromPairs');
|
||||
import isEqual = require('lodash/isEqual');
|
||||
import noop = require('lodash/noop');
|
||||
@ -132,7 +133,7 @@ export interface MigrationHooks {
|
||||
*/
|
||||
export interface ISQLiteDB {
|
||||
exec(sql: string): Promise<void>;
|
||||
run(sql: string, ...params: any[]): Promise<void>;
|
||||
run(sql: string, ...params: any[]): Promise<RunResult>;
|
||||
get(sql: string, ...params: any[]): Promise<ResultRow|undefined>;
|
||||
all(sql: string, ...params: any[]): Promise<ResultRow[]>;
|
||||
prepare(sql: string, ...params: any[]): Promise<sqlite3.Statement>;
|
||||
@ -288,8 +289,17 @@ export class SQLiteDB implements ISQLiteDB {
|
||||
return fromCallback(cb => this._db.exec(sql, cb));
|
||||
}
|
||||
|
||||
public run(sql: string, ...params: any[]): Promise<void> {
|
||||
return fromCallback(cb => this._db.run(sql, ...params, cb));
|
||||
public run(sql: string, ...params: any[]): Promise<RunResult> {
|
||||
return new Promise((resolve, reject) => {
|
||||
function callback(this: RunResult, err: Error | null) {
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
resolve(this);
|
||||
}
|
||||
}
|
||||
this._db.run(sql, ...params, callback);
|
||||
});
|
||||
}
|
||||
|
||||
public get(sql: string, ...params: any[]): Promise<ResultRow|undefined> {
|
||||
@ -339,11 +349,19 @@ export class SQLiteDB implements ISQLiteDB {
|
||||
* to db.run, e.g. [sqlString, [params...]].
|
||||
*/
|
||||
public runEach(...statements: Array<string | [string, any[]]>): Promise<void> {
|
||||
return each(statements, (stmt: any) => {
|
||||
return (Array.isArray(stmt) ? this.run(stmt[0], ...stmt[1]) :
|
||||
this.exec(stmt))
|
||||
.catch(err => { log.warn(`SQLiteDB: Failed to run ${stmt}`); throw err; });
|
||||
});
|
||||
return each(statements,
|
||||
async (stmt: any) => {
|
||||
try {
|
||||
return await (Array.isArray(stmt) ?
|
||||
this.run(stmt[0], ...stmt[1]) :
|
||||
this.exec(stmt)
|
||||
);
|
||||
} catch (err) {
|
||||
log.warn(`SQLiteDB: Failed to run ${stmt}`);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
public close(): Promise<void> {
|
||||
@ -356,16 +374,9 @@ export class SQLiteDB implements ISQLiteDB {
|
||||
* is sqlite's rowid for the last insert made on this database connection. This method
|
||||
* is only useful if the sql is actually an INSERT operation, but we don't check this.
|
||||
*/
|
||||
public runAndGetId(sql: string, ...params: any[]): Promise<number> {
|
||||
return new Promise<number>((resolve, reject) => {
|
||||
this._db.run(sql, ...params, function(this: any, err: any) {
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
resolve(this.lastID);
|
||||
}
|
||||
});
|
||||
});
|
||||
public async runAndGetId(sql: string, ...params: any[]): Promise<number> {
|
||||
const result = await this.run(sql, ...params);
|
||||
return result.lastID;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -116,7 +116,7 @@ export class DocTriggers {
|
||||
public shutdown() {
|
||||
this._shuttingDown = true;
|
||||
if (!this._sending) {
|
||||
this._redisClient?.quitAsync();
|
||||
this._redisClientField?.quitAsync();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -699,6 +699,12 @@ def migration17(tdset):
|
||||
actions.ModifyColumn(tables_map[c.parentId].tableId, c.colId, {'type': 'Attachments'})
|
||||
for c in affected_cols
|
||||
)
|
||||
# Update the types in the metadata tables
|
||||
doc_actions.append(actions.BulkUpdateRecord(
|
||||
'_grist_Tables_column',
|
||||
[c.id for c in affected_cols],
|
||||
{'type': ['Attachments' for c in affected_cols]}
|
||||
))
|
||||
# Update the values to lists
|
||||
for c in affected_cols:
|
||||
if c.isFormula:
|
||||
@ -710,12 +716,6 @@ def migration17(tdset):
|
||||
actions.BulkUpdateRecord(table_id, table.row_ids,
|
||||
{c.colId: [conv(val) for val in table.columns[c.colId]]})
|
||||
)
|
||||
# Update the types in the metadata tables
|
||||
doc_actions.append(actions.BulkUpdateRecord(
|
||||
'_grist_Tables_column',
|
||||
[c.id for c in affected_cols],
|
||||
{'type': ['Attachments' for c in affected_cols]}
|
||||
))
|
||||
|
||||
return tdset.apply_doc_actions(doc_actions)
|
||||
|
||||
|
@ -1707,17 +1707,20 @@ function testDocApi() {
|
||||
assert.equal(resp.status, 200);
|
||||
await checkAttachmentIds([1, 2, 3]);
|
||||
|
||||
// Remove the expired attachment (1)
|
||||
// Remove the expired attachment (1) by force-reloading, so it removes it during shutdown.
|
||||
// It has a duplicate (3) that hasn't expired and thus isn't removed,
|
||||
// although they share the same fileIdent and row in _gristsys_Files.
|
||||
// So for now only the metadata is removed.
|
||||
resp = await axios.post(`${docUrl}/attachments/removeUnused?verifyfiles=1&expiredonly=1`, null, chimpy);
|
||||
resp = await axios.post(`${docUrl}/force-reload`, null, chimpy);
|
||||
assert.equal(resp.status, 200);
|
||||
await checkAttachmentIds([2, 3]);
|
||||
resp = await axios.post(`${docUrl}/attachments/verifyFiles`, null, chimpy);
|
||||
assert.equal(resp.status, 200);
|
||||
|
||||
// Remove the not expired attachments (2 and 3).
|
||||
// We didn't set a timeDeleted for 3, but it gets set automatically by updateUsedAttachments.
|
||||
resp = await axios.post(`${docUrl}/attachments/removeUnused?verifyfiles=1`, null, chimpy);
|
||||
assert.equal(resp.status, 200);
|
||||
await checkAttachmentIds([]);
|
||||
});
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user