(core) make document reloading cleaner

Summary:
Currently when reloading a document, we may have two sqlite connections
to the document for a small period of time. This diff removes that
overlap.

Test Plan: added test

Reviewers: georgegevoian

Reviewed By: georgegevoian

Differential Revision: https://phab.getgrist.com/D3140
This commit is contained in:
Paul Fitzpatrick 2021-11-16 16:10:57 -05:00
parent 561e32fb44
commit c7331e2453
2 changed files with 154 additions and 139 deletions

View File

@ -307,16 +307,24 @@ export class ActiveDoc extends EventEmitter {
} }
/** /**
* Shut down the ActiveDoc, and (by default) remove it from the docManager. * Shut down the ActiveDoc, and remove it from the DocManager. An optional
* @returns {Promise} Promise for when database and data engine are done shutting down. * afterShutdown operation can be provided, which will be run once the ActiveDoc
* is completely shut down but before it is removed from the DocManager, ensuring
* that the operation will not overlap with a new ActiveDoc starting up for the
* same document.
*/ */
public async shutdown(removeThisActiveDoc: boolean = true): Promise<void> { public async shutdown(options: {
afterShutdown?: () => Promise<void>
} = {}): Promise<void> {
const docSession = makeExceptionalDocSession('system'); const docSession = makeExceptionalDocSession('system');
this._log.debug(docSession, "shutdown starting"); this._log.debug(docSession, "shutdown starting");
try {
this.setMuted();
this._inactivityTimer.disable(); this._inactivityTimer.disable();
if (this.docClients.clientCount() > 0) { if (this.docClients.clientCount() > 0) {
this._log.warn(docSession, `Doc being closed with ${this.docClients.clientCount()} clients left`); this._log.warn(docSession, `Doc being closed with ${this.docClients.clientCount()} clients left`);
await this.docClients.broadcastDocMessage(null, 'docShutdown', null); await this.docClients.broadcastDocMessage(null, 'docShutdown', null);
this.docClients.interruptAllClients();
this.docClients.removeAllClients(); this.docClients.removeAllClients();
} }
@ -325,7 +333,6 @@ export class ActiveDoc extends EventEmitter {
// Clear the MapWithTTL to remove all timers from the event loop. // Clear the MapWithTTL to remove all timers from the event loop.
this._fetchCache.clear(); this._fetchCache.clear();
if (removeThisActiveDoc) { await this._docManager.removeActiveDoc(this); }
try { try {
await this._docManager.storageManager.closeDocument(this.docName); await this._docManager.storageManager.closeDocument(this.docName);
} catch (err) { } catch (err) {
@ -354,10 +361,14 @@ export class ActiveDoc extends EventEmitter {
} catch (err) { } catch (err) {
// Initialization errors do not matter at this point. // Initialization errors do not matter at this point.
} }
this._log.debug(docSession, "shutdown complete");
} catch (err) { } catch (err) {
this._log.error(docSession, "failed to shutdown some resources", err); this._log.error(docSession, "failed to shutdown some resources", err);
} }
await options.afterShutdown?.();
} finally {
this._docManager.removeActiveDoc(this);
}
this._log.debug(docSession, "shutdown complete");
} }
/** /**
@ -450,32 +461,12 @@ export class ActiveDoc extends EventEmitter {
* DocManager. * DocManager.
*/ */
public async replace(source: DocReplacementOptions) { public async replace(source: DocReplacementOptions) {
// During replacement, it is important for all hands to be off the document. So: // During replacement, it is important for all hands to be off the document. So we
// - We set the "mute" flag. Setting this means that any operations in progress // ask the shutdown method to do the replacement when the ActiveDoc is shutdown but
// using this ActiveDoc should be ineffective (apart from the replacement). // before a new one could be opened.
// In other words, the operations shouldn't ultimately result in any changes in S3, return this.shutdown({
// and any related requests should result in a failure or be retried. TODO: afterShutdown: () => this._docManager.storageManager.replace(this.docName, source)
// review how well we do on meeting this goal. });
// - We close the ActiveDoc, retaining its listing in DocManager but shutting down
// all its component parts. We retain it in DocManager to delay another
// ActiveDoc being opened for the same document if someone is trying to operate
// on it.
// - We replace the document.
// - We remove the ActiveDoc from DocManager, opening the way for the document to be
// freshly opened.
// The "mute" flag is borrowed from worker shutdown. Note this scenario is a little
// different, since the worker is not withdrawing from service, so fresh work may get
// assigned to it at any time.
this.setMuted();
this.docClients.interruptAllClients();
try {
await this.shutdown(false);
await this._docManager.storageManager.replace(this.docName, source);
} finally {
// Whatever happened, success or failure, there is nothing further we can do
// with this ActiveDoc. Unlist it.
await this._docManager.removeActiveDoc(this);
}
} }
/** /**

View File

@ -280,13 +280,9 @@ export class DocManager extends EventEmitter {
// Fetch the document, and continue when we have the ActiveDoc (which may be immediately). // Fetch the document, and continue when we have the ActiveDoc (which may be immediately).
const docSessionPrecursor = makeOptDocSession(client); const docSessionPrecursor = makeOptDocSession(client);
docSessionPrecursor.authorizer = auth; docSessionPrecursor.authorizer = auth;
const activeDoc: ActiveDoc = await this.fetchDoc(docSessionPrecursor, docId);
if (activeDoc.muted) { return this._withUnmutedDoc(docSessionPrecursor, docId, async () => {
log.debug('DocManager.openDoc interrupting, called for a muted doc', docId); const activeDoc: ActiveDoc = await this.fetchDoc(docSessionPrecursor, docId);
client.interruptConnection();
throw new Error(`document ${docId} cannot be opened right now`);
}
// Get a fresh DocSession object. // Get a fresh DocSession object.
const docSession = activeDoc.addClient(client, auth); const docSession = activeDoc.addClient(client, auth);
@ -320,16 +316,21 @@ export class DocManager extends EventEmitter {
activeDoc.getRecentMinimalActions(docSession) activeDoc.getRecentMinimalActions(docSession)
]); ]);
this.emit('open-doc', this.storageManager.getPath(activeDoc.docName)); const result = {
return {
docFD: docSession.fd, docFD: docSession.fd,
clientId: docSession.client.clientId, clientId: docSession.client.clientId,
doc: metaTables, doc: metaTables,
log: recentActions, log: recentActions,
recoveryMode: activeDoc.recoveryMode, recoveryMode: activeDoc.recoveryMode,
userOverride: await activeDoc.getUserOverride(docSession), userOverride: await activeDoc.getUserOverride(docSession),
}; } as OpenLocalDocResult;
if (!activeDoc.muted) {
this.emit('open-doc', this.storageManager.getPath(activeDoc.docName));
}
return {activeDoc, result};
});
} }
/** /**
@ -353,7 +354,7 @@ export class DocManager extends EventEmitter {
return this._activeDocs.get(docName); return this._activeDocs.get(docName);
} }
public async removeActiveDoc(activeDoc: ActiveDoc): Promise<void> { public removeActiveDoc(activeDoc: ActiveDoc): void {
this._activeDocs.delete(activeDoc.docName); this._activeDocs.delete(activeDoc.docName);
} }
@ -395,37 +396,16 @@ export class DocManager extends EventEmitter {
} }
/** /**
* Fetches an ActiveDoc object. Used by openDoc. * Fetches an ActiveDoc object. Used by openDoc. If ActiveDoc is muted (for safe closing),
* wait for another.
*/ */
public async fetchDoc(docSession: OptDocSession, docName: string, public async fetchDoc(docSession: OptDocSession, docName: string,
wantRecoveryMode?: boolean): Promise<ActiveDoc> { wantRecoveryMode?: boolean): Promise<ActiveDoc> {
log.debug('DocManager.fetchDoc', docName); log.debug('DocManager.fetchDoc', docName);
// Repeat until we acquire an ActiveDoc that is not muted (shutting down). return this._withUnmutedDoc(docSession, docName, async () => {
for (;;) { const activeDoc = await this._fetchPossiblyMutedDoc(docSession, docName, wantRecoveryMode);
if (this._activeDocs.has(docName) && wantRecoveryMode !== undefined) { return {activeDoc, result: activeDoc};
const activeDoc = await this._activeDocs.get(docName);
if (activeDoc && activeDoc.recoveryMode !== wantRecoveryMode && await activeDoc.isOwner(docSession)) {
// shutting doc down to have a chance to re-open in the correct mode.
// TODO: there could be a battle with other users opening it in a different mode.
await activeDoc.shutdown();
}
}
if (!this._activeDocs.has(docName)) {
return mapSetOrClear(this._activeDocs, docName,
this._createActiveDoc(docSession, docName, wantRecoveryMode)
.then(newDoc => {
// Propagate backupMade events from newly opened activeDocs (consolidate all to DocMan)
newDoc.on('backupMade', (bakPath: string) => {
this.emit('backupMade', bakPath);
}); });
return newDoc.loadDoc(docSession);
}));
}
const activeDoc = await this._activeDocs.get(docName)!;
if (!activeDoc.muted) { return activeDoc; }
log.debug('DocManager.fetchDoc waiting because doc is muted', docName);
await bluebird.delay(1000);
}
} }
public makeAccessId(userId: number|null): string|null { public makeAccessId(userId: number|null): string|null {
@ -437,6 +417,50 @@ export class DocManager extends EventEmitter {
return userId === this._homeDbManager.getAnonymousUserId(); return userId === this._homeDbManager.getAnonymousUserId();
} }
/**
* Perform the supplied operation and return its result - unless the activeDoc it returns
* is found to be muted, in which case we retry.
*/
private async _withUnmutedDoc<T>(docSession: OptDocSession, docName: string,
op: () => Promise<{ result: T, activeDoc: ActiveDoc }>): Promise<T> {
// Repeat until we acquire an ActiveDoc that is not muted (shutting down).
for (;;) {
const { result, activeDoc } = await op();
if (!activeDoc.muted) { return result; }
log.debug('DocManager._withUnmutedDoc waiting because doc is muted', docName);
await bluebird.delay(1000);
}
}
// Like fetchDoc(), but doesn't check if ActiveDoc returned is unmuted.
private async _fetchPossiblyMutedDoc(docSession: OptDocSession, docName: string,
wantRecoveryMode?: boolean): Promise<ActiveDoc> {
if (this._activeDocs.has(docName) && wantRecoveryMode !== undefined) {
const activeDoc = await this._activeDocs.get(docName);
if (activeDoc && activeDoc.recoveryMode !== wantRecoveryMode && await activeDoc.isOwner(docSession)) {
// shutting doc down to have a chance to re-open in the correct mode.
// TODO: there could be a battle with other users opening it in a different mode.
await activeDoc.shutdown();
}
}
let activeDoc: ActiveDoc;
if (!this._activeDocs.has(docName)) {
activeDoc = await mapSetOrClear(
this._activeDocs, docName,
this._createActiveDoc(docSession, docName, wantRecoveryMode)
.then(newDoc => {
// Propagate backupMade events from newly opened activeDocs (consolidate all to DocMan)
newDoc.on('backupMade', (bakPath: string) => {
this.emit('backupMade', bakPath);
});
return newDoc.loadDoc(docSession);
}));
} else {
activeDoc = await this._activeDocs.get(docName)!;
}
return activeDoc;
}
private async _createActiveDoc(docSession: OptDocSession, docName: string, safeMode?: boolean) { private async _createActiveDoc(docSession: OptDocSession, docName: string, safeMode?: boolean) {
// Get URL for document for use with SELF_HYPERLINK(). // Get URL for document for use with SELF_HYPERLINK().
const cachedDoc = getDocSessionCachedDoc(docSession); const cachedDoc = getDocSessionCachedDoc(docSession);