mirror of
https://github.com/gristlabs/grist-core.git
synced 2024-10-27 20:44:07 +00:00
9fa5d4c9d6
Summary: The way linkId was set on actions to tie them together for undo bundling was incorrect. This diff fixes it by moves the setting of linkIds to Sharing.ts, which already serializes the processing of actions. Test Plan: Added a test case for submitting actions together while bundling (which fails without this change). Reviewers: paulfitz Reviewed By: paulfitz Differential Revision: https://phab.getgrist.com/D2716
424 lines
17 KiB
TypeScript
424 lines
17 KiB
TypeScript
import {ActionBundle, LocalActionBundle, UserActionBundle} from 'app/common/ActionBundle';
|
|
import {ActionInfo, Envelope, getEnvContent} from 'app/common/ActionBundle';
|
|
import {DocAction, UserAction} from 'app/common/DocActions';
|
|
import {allToken, Peer} from 'app/common/sharing';
|
|
import {timeFormat} from 'app/common/timeFormat';
|
|
import * as log from 'app/server/lib/log';
|
|
import {shortDesc} from 'app/server/lib/shortDesc';
|
|
import * as assert from 'assert';
|
|
import {Mutex} from 'async-mutex';
|
|
import * as Deque from 'double-ended-queue';
|
|
import {ActionHistory, asActionGroup} from './ActionHistory';
|
|
import {ActiveDoc} from './ActiveDoc';
|
|
import {makeExceptionalDocSession, OptDocSession} from './DocSession';
|
|
import {WorkCoordinator} from './WorkCoordinator';
|
|
|
|
// Describes the request to apply a UserActionBundle. It includes a Client (so that broadcast
|
|
// message can set `.fromSelf` property), and methods to resolve or reject the promise for when
|
|
// the action is applied. Note that it may not be immediate in case we are in the middle of
|
|
// processing hub actions or rebasing.
|
|
interface UserRequest {
|
|
action: UserActionBundle;
|
|
docSession: OptDocSession|null;
|
|
resolve(result: UserResult): void;
|
|
reject(err: Error): void;
|
|
}
|
|
|
|
// The result of applying a UserRequest, used to resolve the promise. It includes the retValues
|
|
// (one for each UserAction in the bundle) and the actionNum of the applied LocalActionBundle.
|
|
interface UserResult {
|
|
actionNum: number;
|
|
retValues: any[];
|
|
isModification: boolean;
|
|
}
|
|
|
|
// Internally-used enum to distinguish if applied actions should be logged as local or shared.
|
|
enum Branch { Local, Shared }
|
|
|
|
export class Sharing {
|
|
protected _activeDoc: ActiveDoc;
|
|
protected _actionHistory: ActionHistory;
|
|
protected _hubQueue: Deque<ActionBundle> = new Deque();
|
|
protected _pendingQueue: Deque<UserRequest> = new Deque();
|
|
protected _workCoordinator: WorkCoordinator;
|
|
|
|
constructor(activeDoc: ActiveDoc, actionHistory: ActionHistory, private _modificationLock: Mutex) {
|
|
// TODO actionHistory is currently unused (we use activeDoc.actionLog).
|
|
assert(actionHistory.isInitialized());
|
|
|
|
this._activeDoc = activeDoc;
|
|
this._actionHistory = actionHistory;
|
|
this._workCoordinator = new WorkCoordinator(() => this._doNextStep());
|
|
}
|
|
|
|
/** Initialize the sharing for a previously-shared doc. */
|
|
public async openSharedDoc(hub: any, docId: string): Promise<void> {
|
|
throw new Error('openSharedDoc not implemented');
|
|
}
|
|
|
|
/** Initialize the sharing for a newly-shared doc. */
|
|
public async createSharedDoc(hub: any, docId: string, docName: string, peers: Peer[]): Promise<void> {
|
|
throw new Error('openSharedDoc not implemented');
|
|
}
|
|
|
|
/**
|
|
* Returns whether this doc is shared. It's shared if and only if HubDocClient is set (though it
|
|
* may be disconnected).
|
|
*/
|
|
public isShared(): boolean { return false; }
|
|
|
|
public isSharingActivated(): boolean { return false; }
|
|
|
|
/** Returns the instanceId if the doc is shared or null otherwise. */
|
|
public get instanceId(): string|null { return null; }
|
|
|
|
public isOwnEnvelope(recipients: string[]): boolean { return true; }
|
|
|
|
public async sendLocalAction(): Promise<void> {
|
|
throw new Error('sendLocalAction not implemented');
|
|
}
|
|
|
|
public async shareDoc(docName: string, peers: Peer[]): Promise<void> {
|
|
throw new Error('shareDoc not implemented');
|
|
}
|
|
|
|
public async removeInstanceFromDoc(): Promise<string> {
|
|
throw new Error('removeInstanceFromDoc not implemented');
|
|
}
|
|
|
|
/**
|
|
* The only public interface. This may be called at any time, including while rebasing.
|
|
* WorkCoordinator ensures that actual work will only happen once other work finishes.
|
|
*/
|
|
public addUserAction(userRequest: UserRequest) {
|
|
this._pendingQueue.push(userRequest);
|
|
this._workCoordinator.ping();
|
|
}
|
|
|
|
// Returns a promise if there is some work happening, or null if there isn't.
|
|
private _doNextStep(): Promise<void>|null {
|
|
if (this._hubQueue.isEmpty()) {
|
|
if (!this._pendingQueue.isEmpty()) {
|
|
return this._applyLocalAction();
|
|
} else if (this.isSharingActivated() && this._actionHistory.haveLocalUnsent()) {
|
|
return this.sendLocalAction();
|
|
} else {
|
|
return null;
|
|
}
|
|
} else {
|
|
if (!this._actionHistory.haveLocalActions()) {
|
|
return this._applyHubAction();
|
|
} else {
|
|
return this._mergeInHubAction();
|
|
}
|
|
}
|
|
}
|
|
|
|
private async _applyLocalAction(): Promise<void> {
|
|
assert(this._hubQueue.isEmpty() && !this._pendingQueue.isEmpty());
|
|
const userRequest: UserRequest = this._pendingQueue.shift()!;
|
|
try {
|
|
const ret = await this.doApplyUserActionBundle(userRequest.action, userRequest.docSession);
|
|
userRequest.resolve(ret);
|
|
} catch (e) {
|
|
log.warn("Unable to apply action...", e);
|
|
userRequest.reject(e);
|
|
}
|
|
}
|
|
|
|
private async _applyHubAction(): Promise<void> {
|
|
assert(!this._hubQueue.isEmpty() && !this._actionHistory.haveLocalActions());
|
|
const action: ActionBundle = this._hubQueue.shift()!;
|
|
try {
|
|
await this.doApplySharedActionBundle(action);
|
|
} catch (e) {
|
|
log.error("Unable to apply hub action... skipping");
|
|
}
|
|
}
|
|
|
|
private async _mergeInHubAction(): Promise<void> {
|
|
assert(!this._hubQueue.isEmpty() && this._actionHistory.haveLocalActions());
|
|
|
|
const action: ActionBundle = this._hubQueue.peekFront()!;
|
|
try {
|
|
const accepted = await this._actionHistory.acceptNextSharedAction(action.actionHash);
|
|
if (accepted) {
|
|
this._hubQueue.shift();
|
|
} else {
|
|
await this._rebaseLocalActions();
|
|
}
|
|
} catch (e) {
|
|
log.error("Unable to apply hub action... skipping");
|
|
}
|
|
}
|
|
|
|
private async _rebaseLocalActions(): Promise<void> {
|
|
const rebaseQueue: Deque<UserActionBundle> = new Deque<UserActionBundle>();
|
|
try {
|
|
await this.createCheckpoint();
|
|
const actions: LocalActionBundle[] = await this._actionHistory.fetchAllLocal();
|
|
assert(actions.length > 0);
|
|
await this.doApplyUserActionBundle(this._createUndo(actions), null);
|
|
rebaseQueue.push(...actions.map((a) => getUserActionBundle(a)));
|
|
await this._actionHistory.clearLocalActions();
|
|
} catch (e) {
|
|
log.error("Can't undo local actions; sharing is off");
|
|
await this.rollbackToCheckpoint();
|
|
// TODO this.disconnect();
|
|
// TODO errorState = true;
|
|
return;
|
|
}
|
|
assert(!this._actionHistory.haveLocalActions());
|
|
|
|
while (!this._hubQueue.isEmpty()) {
|
|
await this._applyHubAction();
|
|
}
|
|
const rebaseFailures: Array<[UserActionBundle, UserActionBundle]> = [];
|
|
while (!rebaseQueue.isEmpty()) {
|
|
const action: UserActionBundle = rebaseQueue.shift()!;
|
|
const adjusted: UserActionBundle = this._mergeAdjust(action);
|
|
try {
|
|
await this.doApplyUserActionBundle(adjusted, null);
|
|
} catch (e) {
|
|
log.warn("Unable to apply rebased action...");
|
|
rebaseFailures.push([action, adjusted]);
|
|
}
|
|
}
|
|
if (rebaseFailures.length > 0) {
|
|
await this.createBackupAtCheckpoint();
|
|
// TODO we should notify the user too.
|
|
log.error('Rebase failed to reapply some of your actions, backup of local at...');
|
|
}
|
|
await this.releaseCheckpoint();
|
|
}
|
|
|
|
// ======================================================================
|
|
|
|
private doApplySharedActionBundle(action: ActionBundle): Promise<UserResult> {
|
|
const userActions: UserAction[] = [
|
|
['ApplyDocActions', action.stored.map(envContent => envContent[1])]
|
|
];
|
|
return this.doApplyUserActions(action.info[1], userActions, Branch.Shared, null);
|
|
}
|
|
|
|
private doApplyUserActionBundle(action: UserActionBundle, docSession: OptDocSession|null): Promise<UserResult> {
|
|
return this.doApplyUserActions(action.info, action.userActions, Branch.Local, docSession);
|
|
}
|
|
|
|
private async doApplyUserActions(info: ActionInfo, userActions: UserAction[],
|
|
branch: Branch, docSession: OptDocSession|null): Promise<UserResult> {
|
|
const client = docSession && docSession.client;
|
|
|
|
if (docSession?.linkId) {
|
|
info.linkId = docSession.linkId;
|
|
}
|
|
|
|
const {sandboxActionBundle, undo, docActions} =
|
|
await this._modificationLock.runExclusive(() => this._applyActionsToDataEngine(docSession, userActions));
|
|
|
|
// A trivial action does not merit allocating an actionNum,
|
|
// logging, and sharing. Since we currently don't store
|
|
// calculated values in the database, it is best not to log the
|
|
// action that initializes them when the document is opened cold
|
|
// (without cached ActiveDoc) - otherwise we'll end up with spam
|
|
// log entries for each time the document is opened cold.
|
|
|
|
const isCalculate = (userActions.length === 1 &&
|
|
userActions[0][0] === 'Calculate');
|
|
const trivial = isCalculate && sandboxActionBundle.stored.length === 0;
|
|
|
|
const actionNum = trivial ? 0 :
|
|
(branch === Branch.Shared ? this._actionHistory.getNextHubActionNum() :
|
|
this._actionHistory.getNextLocalActionNum());
|
|
|
|
const localActionBundle: LocalActionBundle = {
|
|
actionNum,
|
|
// The ActionInfo should go into the envelope that includes all recipients.
|
|
info: [findOrAddAllEnvelope(sandboxActionBundle.envelopes), info],
|
|
envelopes: sandboxActionBundle.envelopes,
|
|
stored: sandboxActionBundle.stored,
|
|
calc: sandboxActionBundle.calc,
|
|
undo,
|
|
userActions,
|
|
actionHash: null, // Gets set below by _actionHistory.recordNext...
|
|
parentActionHash: null, // Gets set below by _actionHistory.recordNext...
|
|
};
|
|
this._logActionBundle(`doApplyUserActions (${Branch[branch]})`, localActionBundle);
|
|
|
|
// TODO Note that the sandbox may produce actions which are not addressed to us (e.g. when we
|
|
// have EDIT permission without VIEW). These are not sent to the browser or the database. But
|
|
// today they are reflected in the sandbox. Should we (or the sandbox) immediately undo the
|
|
// full change, and then redo only the actions addressed to ourselves? Let's cross that bridge
|
|
// when we come to it. For now we only log skipped envelopes as "alien" in _logActionBundle().
|
|
const ownActionBundle: LocalActionBundle = this._filterOwnActions(localActionBundle);
|
|
|
|
// Apply the action to the database, and record in the action log.
|
|
if (!trivial) {
|
|
await this._activeDoc.docStorage.execTransaction(async () => {
|
|
await this._activeDoc.docStorage.applyStoredActions(getEnvContent(ownActionBundle.stored));
|
|
if (this.isShared() && branch === Branch.Local) {
|
|
// this call will compute an actionHash for localActionBundle
|
|
await this._actionHistory.recordNextLocalUnsent(localActionBundle);
|
|
} else {
|
|
// Before sharing is enabled, actions are immediately marked as "shared" (as if accepted
|
|
// by the hub). The alternative of keeping actions on the "local" branch until sharing is
|
|
// enabled is less suitable, because such actions could have empty envelopes, and cannot
|
|
// be shared. Once sharing is enabled, we would share a snapshot at that time.
|
|
await this._actionHistory.recordNextShared(localActionBundle);
|
|
}
|
|
// Check isCalculate because that's not an action we should allow undo/redo for (it's not
|
|
// considered as performed by a particular client).
|
|
if (client && client.clientId && !isCalculate) {
|
|
this._actionHistory.setActionClientId(localActionBundle.actionHash!, client.clientId);
|
|
}
|
|
});
|
|
}
|
|
await this._activeDoc.processActionBundle(ownActionBundle);
|
|
|
|
// Broadcast the action to connected browsers.
|
|
const actionGroup = asActionGroup(this._actionHistory, localActionBundle, {
|
|
client,
|
|
retValues: sandboxActionBundle.retValues,
|
|
summarize: true,
|
|
// Mark the on-open Calculate action as internal. In future, synchronizing fields to today's
|
|
// date and other changes from external values may count as internal.
|
|
internal: isCalculate,
|
|
});
|
|
try {
|
|
await this._activeDoc.appliedActions(docActions, undo);
|
|
await this._activeDoc.broadcastDocUpdate(client || null, 'docUserAction', {
|
|
actionGroup,
|
|
docActions,
|
|
});
|
|
} finally {
|
|
await this._activeDoc.finishedActions();
|
|
}
|
|
if (docSession) {
|
|
docSession.linkId = docSession.shouldBundleActions ? localActionBundle.actionNum : 0;
|
|
}
|
|
return {
|
|
actionNum: localActionBundle.actionNum,
|
|
retValues: sandboxActionBundle.retValues,
|
|
isModification: sandboxActionBundle.stored.length > 0
|
|
};
|
|
}
|
|
|
|
private _mergeAdjust(action: UserActionBundle): UserActionBundle {
|
|
// TODO: This is where we adjust actions after rebase, e.g. add delta to rowIds and such.
|
|
return action;
|
|
}
|
|
|
|
/**
|
|
* Creates a UserActionBundle with a single 'ApplyUndoActions' action, which combines the undo
|
|
* actions addressed to ourselves from all of the passed-in LocalActionBundles.
|
|
*/
|
|
private _createUndo(localActions: LocalActionBundle[]): UserActionBundle {
|
|
assert(localActions.length > 0);
|
|
const undo: DocAction[] = [];
|
|
for (const local of localActions) {
|
|
undo.push(...local.undo);
|
|
}
|
|
const first = localActions[0];
|
|
return {
|
|
info: {
|
|
time: Date.now(),
|
|
user: first.info[1].user,
|
|
inst: first.info[1].inst,
|
|
desc: "UNDO BEFORE REBASE",
|
|
otherId: 0,
|
|
linkId: 0,
|
|
},
|
|
userActions: [['ApplyUndoActions', undo]]
|
|
};
|
|
}
|
|
|
|
// Our beautiful little checkpointing interface, used to handle errors during rebase.
|
|
private createCheckpoint() { /* TODO */ }
|
|
private releaseCheckpoint() { /* TODO */ }
|
|
private rollbackToCheckpoint() { /* TODO */ }
|
|
private createBackupAtCheckpoint() { /* TODO */ }
|
|
|
|
/**
|
|
* Reduces a LocalActionBundle down to only those actions addressed to ourselves.
|
|
*/
|
|
private _filterOwnActions(localActionBundle: LocalActionBundle): LocalActionBundle {
|
|
const includeEnv: boolean[] = localActionBundle.envelopes.map(
|
|
(e) => this.isOwnEnvelope(e.recipients));
|
|
|
|
return Object.assign({}, localActionBundle, {
|
|
stored: localActionBundle.stored.filter((ea) => includeEnv[ea[0]]),
|
|
calc: localActionBundle.calc.filter((ea) => includeEnv[ea[0]]),
|
|
});
|
|
}
|
|
|
|
/** Log an action bundle to the debug log. */
|
|
private _logActionBundle(prefix: string, actionBundle: ActionBundle) {
|
|
const includeEnv = actionBundle.envelopes.map((e) => this.isOwnEnvelope(e.recipients));
|
|
log.debug("%s: ActionBundle #%s with #%s envelopes: %s",
|
|
prefix, actionBundle.actionNum, actionBundle.envelopes.length,
|
|
infoDesc(actionBundle.info[1]));
|
|
actionBundle.envelopes.forEach((env, i) =>
|
|
log.debug("%s: env #%s: %s", prefix, i, env.recipients.join(' ')));
|
|
actionBundle.stored.forEach((envAction, i) =>
|
|
log.debug("%s: stored #%s [%s%s]: %s", prefix, i, envAction[0],
|
|
(includeEnv[envAction[0]] ? "" : " alien"),
|
|
shortDesc(envAction[1])));
|
|
actionBundle.calc.forEach((envAction, i) =>
|
|
log.debug("%s: calc #%s [%s%s]: %s", prefix, i, envAction[0],
|
|
(includeEnv[envAction[0]] ? "" : " alien"),
|
|
shortDesc(envAction[1])));
|
|
}
|
|
|
|
private async _applyActionsToDataEngine(docSession: OptDocSession|null, userActions: UserAction[]) {
|
|
const sandboxActionBundle = await this._activeDoc.applyActionsToDataEngine(userActions);
|
|
const undo = getEnvContent(sandboxActionBundle.undo);
|
|
const docActions = getEnvContent(sandboxActionBundle.stored).concat(
|
|
getEnvContent(sandboxActionBundle.calc));
|
|
|
|
try {
|
|
// TODO: see if any of the code paths that have no docSession are relevant outside
|
|
// of tests.
|
|
await this._activeDoc.canApplyDocActions(docSession || makeExceptionalDocSession('share'),
|
|
docActions, undo);
|
|
} catch (e) {
|
|
// should not commit. Don't write to db. Remove changes from sandbox.
|
|
await this._activeDoc.applyActionsToDataEngine([['ApplyUndoActions', undo]]);
|
|
await this._activeDoc.finishedActions();
|
|
throw e;
|
|
}
|
|
return {sandboxActionBundle, undo, docActions};
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Returns the index of the envelope containing the '#ALL' recipient, adding such an envelope to
|
|
* the provided array if it wasn't already there.
|
|
*/
|
|
export function findOrAddAllEnvelope(envelopes: Envelope[]): number {
|
|
const i = envelopes.findIndex(e => e.recipients.includes(allToken));
|
|
if (i >= 0) { return i; }
|
|
envelopes.push({recipients: [allToken]});
|
|
return envelopes.length - 1;
|
|
}
|
|
|
|
/**
|
|
* Convert actionInfo to a concise human-readable description, for debugging.
|
|
*/
|
|
function infoDesc(info: ActionInfo): string {
|
|
const timestamp = timeFormat('A', new Date(info.time));
|
|
const desc = info.desc ? ` desc=[${info.desc}]` : '';
|
|
const otherId = info.otherId ? ` [otherId=${info.otherId}]` : '';
|
|
const linkId = info.linkId ? ` [linkId=${info.linkId}]` : '';
|
|
return `${timestamp} on ${info.inst} by ${info.user}${desc}${otherId}${linkId}`;
|
|
}
|
|
|
|
/**
|
|
* Extract a UserActionBundle from a LocalActionBundle, which contains a superset of data.
|
|
*/
|
|
function getUserActionBundle(localAction: LocalActionBundle): UserActionBundle {
|
|
return {
|
|
info: localAction.info[1],
|
|
userActions: localAction.userActions
|
|
};
|
|
}
|