gristlabs_grist-core/app/server/lib/Sharing.ts

441 lines
18 KiB
TypeScript
Raw Normal View History

import {
ActionBundle,
ActionInfo,
Envelope,
getEnvContent,
LocalActionBundle,
UserActionBundle
} from 'app/common/ActionBundle';
import {DocAction, getNumRows, UserAction} from 'app/common/DocActions';
import {allToken} from 'app/common/sharing';
import * as log from 'app/server/lib/log';
import {LogMethods} from "app/server/lib/LogMethods";
import {shortDesc} from 'app/server/lib/shortDesc';
import * as assert from 'assert';
import {Mutex} from 'async-mutex';
import * as Deque from 'double-ended-queue';
import {ActionHistory, asActionGroup, getActionUndoInfo} from './ActionHistory';
import {ActiveDoc} from './ActiveDoc';
import {makeExceptionalDocSession, OptDocSession} from './DocSession';
import {WorkCoordinator} from './WorkCoordinator';
// Describes the request to apply a UserActionBundle. It includes a Client (so that broadcast
// message can set `.fromSelf` property), and methods to resolve or reject the promise for when
// the action is applied. Note that it may not be immediate in case we are in the middle of
// processing hub actions or rebasing.
interface UserRequest {
action: UserActionBundle;
docSession: OptDocSession|null;
resolve(result: UserResult): void;
reject(err: Error): void;
}
// The result of applying a UserRequest, used to resolve the promise. It includes the retValues
// (one for each UserAction in the bundle) and the actionNum of the applied LocalActionBundle.
interface UserResult {
actionNum: number;
retValues: any[];
isModification: boolean;
}
// Internally-used enum to distinguish if applied actions should be logged as local or shared.
enum Branch { Local, Shared }
// Don't log details of action bundles in production.
const LOG_ACTION_BUNDLE = (process.env.NODE_ENV !== 'production');
export class Sharing {
protected _activeDoc: ActiveDoc;
protected _actionHistory: ActionHistory;
protected _hubQueue: Deque<ActionBundle> = new Deque();
protected _pendingQueue: Deque<UserRequest> = new Deque();
protected _workCoordinator: WorkCoordinator;
private _log = new LogMethods('Sharing ', (s: OptDocSession|null) => this._activeDoc.getLogMeta(s));
constructor(activeDoc: ActiveDoc, actionHistory: ActionHistory, private _modificationLock: Mutex) {
// TODO actionHistory is currently unused (we use activeDoc.actionLog).
assert(actionHistory.isInitialized());
this._activeDoc = activeDoc;
this._actionHistory = actionHistory;
this._workCoordinator = new WorkCoordinator(() => this._doNextStep());
}
/**
* Returns whether this doc is shared. It's shared if and only if HubDocClient is set (though it
* may be disconnected).
*/
public isShared(): boolean { return false; }
public isSharingActivated(): boolean { return false; }
/** Returns the instanceId if the doc is shared or null otherwise. */
public get instanceId(): string|null { return null; }
public isOwnEnvelope(recipients: string[]): boolean { return true; }
public async sendLocalAction(): Promise<void> {
throw new Error('sendLocalAction not implemented');
}
public async removeInstanceFromDoc(): Promise<string> {
throw new Error('removeInstanceFromDoc not implemented');
}
/**
* The only public interface. This may be called at any time, including while rebasing.
* WorkCoordinator ensures that actual work will only happen once other work finishes.
*/
public addUserAction(userRequest: UserRequest) {
this._pendingQueue.push(userRequest);
this._workCoordinator.ping();
}
// Returns a promise if there is some work happening, or null if there isn't.
private _doNextStep(): Promise<void>|null {
if (this._hubQueue.isEmpty()) {
if (!this._pendingQueue.isEmpty()) {
return this._applyLocalAction();
} else if (this.isSharingActivated() && this._actionHistory.haveLocalUnsent()) {
return this.sendLocalAction();
} else {
return null;
}
} else {
if (!this._actionHistory.haveLocalActions()) {
return this._applyHubAction();
} else {
return this._mergeInHubAction();
}
}
}
private async _applyLocalAction(): Promise<void> {
assert(this._hubQueue.isEmpty() && !this._pendingQueue.isEmpty());
const userRequest: UserRequest = this._pendingQueue.shift()!;
try {
const ret = await this._doApplyUserActionBundle(userRequest.action, userRequest.docSession);
userRequest.resolve(ret);
} catch (e) {
this._log.warn(userRequest.docSession, "Unable to apply action...", e);
userRequest.reject(e);
}
}
private async _applyHubAction(): Promise<void> {
assert(!this._hubQueue.isEmpty() && !this._actionHistory.haveLocalActions());
const action: ActionBundle = this._hubQueue.shift()!;
try {
await this._doApplySharedActionBundle(action);
} catch (e) {
this._log.error(null, "Unable to apply hub action... skipping");
}
}
private async _mergeInHubAction(): Promise<void> {
assert(!this._hubQueue.isEmpty() && this._actionHistory.haveLocalActions());
const action: ActionBundle = this._hubQueue.peekFront()!;
try {
const accepted = await this._actionHistory.acceptNextSharedAction(action.actionHash);
if (accepted) {
this._hubQueue.shift();
} else {
await this._rebaseLocalActions();
}
} catch (e) {
this._log.error(null, "Unable to apply hub action... skipping");
}
}
private async _rebaseLocalActions(): Promise<void> {
const rebaseQueue: Deque<UserActionBundle> = new Deque<UserActionBundle>();
try {
this._createCheckpoint();
const actions: LocalActionBundle[] = await this._actionHistory.fetchAllLocal();
assert(actions.length > 0);
await this._doApplyUserActionBundle(this._createUndo(actions), null);
rebaseQueue.push(...actions.map((a) => getUserActionBundle(a)));
await this._actionHistory.clearLocalActions();
} catch (e) {
this._log.error(null, "Can't undo local actions; sharing is off");
this._rollbackToCheckpoint();
// TODO this.disconnect();
// TODO errorState = true;
return;
}
assert(!this._actionHistory.haveLocalActions());
while (!this._hubQueue.isEmpty()) {
await this._applyHubAction();
}
const rebaseFailures: Array<[UserActionBundle, UserActionBundle]> = [];
while (!rebaseQueue.isEmpty()) {
const action: UserActionBundle = rebaseQueue.shift()!;
const adjusted: UserActionBundle = this._mergeAdjust(action);
try {
await this._doApplyUserActionBundle(adjusted, null);
} catch (e) {
this._log.warn(null, "Unable to apply rebased action...");
rebaseFailures.push([action, adjusted]);
}
}
if (rebaseFailures.length > 0) {
this._createBackupAtCheckpoint();
// TODO we should notify the user too.
this._log.error(null, 'Rebase failed to reapply some of your actions, backup of local at...');
}
this._releaseCheckpoint();
}
// ======================================================================
private _doApplySharedActionBundle(action: ActionBundle): Promise<UserResult> {
const userActions: UserAction[] = [
['ApplyDocActions', action.stored.map(envContent => envContent[1])]
];
return this._doApplyUserActions(action.info[1], userActions, Branch.Shared, null);
}
private _doApplyUserActionBundle(action: UserActionBundle, docSession: OptDocSession|null): Promise<UserResult> {
return this._doApplyUserActions(action.info, action.userActions, Branch.Local, docSession);
}
private async _doApplyUserActions(info: ActionInfo, userActions: UserAction[],
branch: Branch, docSession: OptDocSession|null): Promise<UserResult> {
const client = docSession && docSession.client;
if (docSession?.linkId) {
info.linkId = docSession.linkId;
}
const {sandboxActionBundle, undo, accessControl} =
await this._modificationLock.runExclusive(() => this._applyActionsToDataEngine(docSession, userActions));
try {
// A trivial action does not merit allocating an actionNum,
// logging, and sharing. Since we currently don't store
// calculated values in the database, it is best not to log the
// action that initializes them when the document is opened cold
// (without cached ActiveDoc) - otherwise we'll end up with spam
// log entries for each time the document is opened cold.
const isCalculate = (userActions.length === 1 &&
userActions[0][0] === 'Calculate');
const trivial = isCalculate && sandboxActionBundle.stored.length === 0;
const actionNum = trivial ? 0 :
(branch === Branch.Shared ? this._actionHistory.getNextHubActionNum() :
this._actionHistory.getNextLocalActionNum());
const localActionBundle: LocalActionBundle = {
actionNum,
// The ActionInfo should go into the envelope that includes all recipients.
info: [findOrAddAllEnvelope(sandboxActionBundle.envelopes), info],
envelopes: sandboxActionBundle.envelopes,
stored: sandboxActionBundle.stored,
calc: sandboxActionBundle.calc,
undo,
userActions,
actionHash: null, // Gets set below by _actionHistory.recordNext...
parentActionHash: null, // Gets set below by _actionHistory.recordNext...
};
const altSessionId = client?.getAltSessionId();
const logMeta = {
actionNum,
linkId: info.linkId,
otherId: info.otherId,
numDocActions: localActionBundle.stored.length,
numRows: localActionBundle.stored.reduce((n, env) => n + getNumRows(env[1]), 0),
author: info.user,
...(altSessionId ? {session: altSessionId}: {}),
};
this._log.rawLog('debug', docSession, '_doApplyUserActions', logMeta);
if (LOG_ACTION_BUNDLE) {
this._logActionBundle(`_doApplyUserActions (${Branch[branch]})`, localActionBundle);
}
// TODO Note that the sandbox may produce actions which are not addressed to us (e.g. when we
// have EDIT permission without VIEW). These are not sent to the browser or the database. But
// today they are reflected in the sandbox. Should we (or the sandbox) immediately undo the
// full change, and then redo only the actions addressed to ourselves? Let's cross that bridge
// when we come to it. For now we only log skipped envelopes as "alien" in _logActionBundle().
const ownActionBundle: LocalActionBundle = this._filterOwnActions(localActionBundle);
// If the document has shut down in the meantime, and this was just a "Calculate" action,
// return a trivial result. This is just to reduce noisy warnings in migration tests.
if (this._activeDoc.isShuttingDown && isCalculate) {
return {
actionNum: localActionBundle.actionNum,
retValues: [],
isModification: false
};
}
// Apply the action to the database, and record in the action log.
if (!trivial) {
await this._activeDoc.docStorage.execTransaction(async () => {
await this._activeDoc.docStorage.applyStoredActions(getEnvContent(ownActionBundle.stored));
if (this.isShared() && branch === Branch.Local) {
// this call will compute an actionHash for localActionBundle
await this._actionHistory.recordNextLocalUnsent(localActionBundle);
} else {
// Before sharing is enabled, actions are immediately marked as "shared" (as if accepted
// by the hub). The alternative of keeping actions on the "local" branch until sharing is
// enabled is less suitable, because such actions could have empty envelopes, and cannot
// be shared. Once sharing is enabled, we would share a snapshot at that time.
await this._actionHistory.recordNextShared(localActionBundle);
}
// Check isCalculate because that's not an action we should allow undo/redo for (it's not
// considered as performed by a particular client).
if (client && client.clientId && !isCalculate) {
this._actionHistory.setActionUndoInfo(
localActionBundle.actionHash!,
getActionUndoInfo(localActionBundle, client.clientId, sandboxActionBundle.retValues));
}
});
}
await this._activeDoc.processActionBundle(ownActionBundle);
const actionSummary = await this._activeDoc.handleTriggers(localActionBundle);
(core) Initial webhooks implementation Summary: See https://grist.quip.com/VKd3ASF99ezD/Outgoing-Webhooks - 2 new DocApi endpoints: _subscribe and _unsubscribe, not meant to be user friendly or publicly documented. _unsubscribe should be given the response from _subscribe in the body, e.g: ``` $ curl -X POST -H "Authorization: Bearer 8fd4dc59ecb05ab29ae5a183c03101319b8e6ca9" "http://localhost:8080/api/docs/6WYa23FqWxGNe3AR6DLjCJ/tables/Table2/_subscribe" -H "Content-type: application/json" -d '{"url": "https://webhook.site/a916b526-8afc-46e6-aa8f-a625d0d83ec3", "eventTypes": ["add"], "isReadyColumn": "C"}' {"unsubscribeKey":"3246f158-55b5-4fc7-baa5-093b75ffa86c","triggerId":2,"webhookId":"853b4bfa-9d39-4639-aa33-7d45354903c0"} $ curl -X POST -H "Authorization: Bearer 8fd4dc59ecb05ab29ae5a183c03101319b8e6ca9" "http://localhost:8080/api/docs/6WYa23FqWxGNe3AR6DLjCJ/tables/Table2/_unsubscribe" -H "Content-type: application/json" -d '{"unsubscribeKey":"3246f158-55b5-4fc7-baa5-093b75ffa86c","triggerId":2,"webhookId":"853b4bfa-9d39-4639-aa33-7d45354903c0"}' {"success":true} ``` - New DB entity Secret to hold the webhook URL and unsubscribe key - New document metatable _grist_Triggers subscribes to table changes and points to a secret to use for a webhook - New file Triggers.ts processes action summaries and uses the two new tables to send webhooks. - Also went on a bit of a diversion and made a typesafe subclass of TableData for metatables. I think this is essentially good enough for a first diff, to keep the diffs manageable and to talk about the overall structure. Future diffs can add tests and more robustness using redis etc. After this diff I can also start building the Zapier integration privately. Test Plan: Tested manually: see curl commands in summary for an example. Payloads can be seen in https://webhook.site/#!/a916b526-8afc-46e6-aa8f-a625d0d83ec3/0b9fe335-33f7-49fe-b90b-2db5ba53382d/1 . Great site for testing webhooks btw. Reviewers: dsagal, paulfitz Reviewed By: paulfitz Differential Revision: https://phab.getgrist.com/D3019
2021-09-22 23:06:23 +00:00
await this._activeDoc.updateRowCount(sandboxActionBundle.rowCount, docSession);
// Broadcast the action to connected browsers.
const actionGroup = asActionGroup(this._actionHistory, localActionBundle, {
clientId: client?.clientId,
retValues: sandboxActionBundle.retValues,
// Mark the on-open Calculate action as internal. In future, synchronizing fields to today's
// date and other changes from external values may count as internal.
internal: isCalculate,
});
(core) Initial webhooks implementation Summary: See https://grist.quip.com/VKd3ASF99ezD/Outgoing-Webhooks - 2 new DocApi endpoints: _subscribe and _unsubscribe, not meant to be user friendly or publicly documented. _unsubscribe should be given the response from _subscribe in the body, e.g: ``` $ curl -X POST -H "Authorization: Bearer 8fd4dc59ecb05ab29ae5a183c03101319b8e6ca9" "http://localhost:8080/api/docs/6WYa23FqWxGNe3AR6DLjCJ/tables/Table2/_subscribe" -H "Content-type: application/json" -d '{"url": "https://webhook.site/a916b526-8afc-46e6-aa8f-a625d0d83ec3", "eventTypes": ["add"], "isReadyColumn": "C"}' {"unsubscribeKey":"3246f158-55b5-4fc7-baa5-093b75ffa86c","triggerId":2,"webhookId":"853b4bfa-9d39-4639-aa33-7d45354903c0"} $ curl -X POST -H "Authorization: Bearer 8fd4dc59ecb05ab29ae5a183c03101319b8e6ca9" "http://localhost:8080/api/docs/6WYa23FqWxGNe3AR6DLjCJ/tables/Table2/_unsubscribe" -H "Content-type: application/json" -d '{"unsubscribeKey":"3246f158-55b5-4fc7-baa5-093b75ffa86c","triggerId":2,"webhookId":"853b4bfa-9d39-4639-aa33-7d45354903c0"}' {"success":true} ``` - New DB entity Secret to hold the webhook URL and unsubscribe key - New document metatable _grist_Triggers subscribes to table changes and points to a secret to use for a webhook - New file Triggers.ts processes action summaries and uses the two new tables to send webhooks. - Also went on a bit of a diversion and made a typesafe subclass of TableData for metatables. I think this is essentially good enough for a first diff, to keep the diffs manageable and to talk about the overall structure. Future diffs can add tests and more robustness using redis etc. After this diff I can also start building the Zapier integration privately. Test Plan: Tested manually: see curl commands in summary for an example. Payloads can be seen in https://webhook.site/#!/a916b526-8afc-46e6-aa8f-a625d0d83ec3/0b9fe335-33f7-49fe-b90b-2db5ba53382d/1 . Great site for testing webhooks btw. Reviewers: dsagal, paulfitz Reviewed By: paulfitz Differential Revision: https://phab.getgrist.com/D3019
2021-09-22 23:06:23 +00:00
actionGroup.actionSummary = actionSummary;
actionGroup.rowCount = sandboxActionBundle.rowCount;
await accessControl.appliedBundle();
await accessControl.sendDocUpdateForBundle(actionGroup);
if (docSession) {
docSession.linkId = docSession.shouldBundleActions ? localActionBundle.actionNum : 0;
}
return {
actionNum: localActionBundle.actionNum,
retValues: sandboxActionBundle.retValues,
isModification: sandboxActionBundle.stored.length > 0
};
} finally {
2022-02-19 09:46:49 +00:00
// Make sure the bundle is marked as complete, even if some miscellaneous error occurred.
await accessControl.finishedBundle();
}
}
private _mergeAdjust(action: UserActionBundle): UserActionBundle {
// TODO: This is where we adjust actions after rebase, e.g. add delta to rowIds and such.
return action;
}
/**
* Creates a UserActionBundle with a single 'ApplyUndoActions' action, which combines the undo
* actions addressed to ourselves from all of the passed-in LocalActionBundles.
*/
private _createUndo(localActions: LocalActionBundle[]): UserActionBundle {
assert(localActions.length > 0);
const undo: DocAction[] = [];
for (const local of localActions) {
undo.push(...local.undo);
}
const first = localActions[0];
return {
info: {
time: Date.now(),
user: first.info[1].user,
inst: first.info[1].inst,
desc: "UNDO BEFORE REBASE",
otherId: 0,
linkId: 0,
},
userActions: [['ApplyUndoActions', undo]]
};
}
// Our beautiful little checkpointing interface, used to handle errors during rebase.
private _createCheckpoint() { /* TODO */ }
private _releaseCheckpoint() { /* TODO */ }
private _rollbackToCheckpoint() { /* TODO */ }
private _createBackupAtCheckpoint() { /* TODO */ }
/**
* Reduces a LocalActionBundle down to only those actions addressed to ourselves.
*/
private _filterOwnActions(localActionBundle: LocalActionBundle): LocalActionBundle {
const includeEnv: boolean[] = localActionBundle.envelopes.map(
(e) => this.isOwnEnvelope(e.recipients));
return Object.assign({}, localActionBundle, {
stored: localActionBundle.stored.filter((ea) => includeEnv[ea[0]]),
calc: localActionBundle.calc.filter((ea) => includeEnv[ea[0]]),
});
}
/** Log an action bundle to the debug log. */
private _logActionBundle(prefix: string, actionBundle: ActionBundle) {
const includeEnv = actionBundle.envelopes.map((e) => this.isOwnEnvelope(e.recipients));
actionBundle.stored.forEach((envAction, i) =>
log.debug("%s: stored #%s [%s%s]: %s", prefix, i, envAction[0],
(includeEnv[envAction[0]] ? "" : " alien"),
shortDesc(envAction[1])));
actionBundle.calc.forEach((envAction, i) =>
log.debug("%s: calc #%s [%s%s]: %s", prefix, i, envAction[0],
(includeEnv[envAction[0]] ? "" : " alien"),
shortDesc(envAction[1])));
}
private async _applyActionsToDataEngine(docSession: OptDocSession|null, userActions: UserAction[]) {
const sandboxActionBundle = await this._activeDoc.applyActionsToDataEngine(docSession, userActions);
const undo = getEnvContent(sandboxActionBundle.undo);
const docActions = getEnvContent(sandboxActionBundle.stored).concat(
getEnvContent(sandboxActionBundle.calc));
const isDirect = getEnvContent(sandboxActionBundle.direct);
const accessControl = this._activeDoc.getGranularAccessForBundle(
docSession || makeExceptionalDocSession('share'), docActions, undo, userActions, isDirect
);
try {
// TODO: see if any of the code paths that have no docSession are relevant outside
// of tests.
await accessControl.canApplyBundle();
} catch (e) {
// should not commit. Don't write to db. Remove changes from sandbox.
try {
await this._activeDoc.applyActionsToDataEngine(docSession, [['ApplyUndoActions', undo]]);
} finally {
await accessControl.finishedBundle();
}
throw e;
}
return {sandboxActionBundle, undo, docActions, accessControl};
}
}
/**
* Returns the index of the envelope containing the '#ALL' recipient, adding such an envelope to
* the provided array if it wasn't already there.
*/
export function findOrAddAllEnvelope(envelopes: Envelope[]): number {
const i = envelopes.findIndex(e => e.recipients.includes(allToken));
if (i >= 0) { return i; }
envelopes.push({recipients: [allToken]});
return envelopes.length - 1;
}
/**
* Extract a UserActionBundle from a LocalActionBundle, which contains a superset of data.
*/
function getUserActionBundle(localAction: LocalActionBundle): UserActionBundle {
return {
info: localAction.info[1],
userActions: localAction.userActions
};
}