diff --git a/app/common/ActionBundle.ts b/app/common/ActionBundle.ts index eb35f3a6..f98f70b1 100644 --- a/app/common/ActionBundle.ts +++ b/app/common/ActionBundle.ts @@ -62,6 +62,16 @@ export interface SandboxActionBundle { undo: Array>; // Inverse actions for all 'stored' actions. retValues: any[]; // Contains retValue for each of userActions. rowCount: number; + // Mapping of keys (hashes of request args) to all unique requests made in a round of calculation + requests?: Record; +} + +// Represents a unique call to the Python REQUEST function +export interface SandboxRequest { + url: string; + params: Record | null; + headers: Record | null; + deps: unknown; // pass back to the sandbox unchanged in the response } // Local action that's been applied. It now has an actionNum, and includes doc actions packaged diff --git a/app/common/DocActions.ts b/app/common/DocActions.ts index 209ce13b..fd75f5ef 100644 --- a/app/common/DocActions.ts +++ b/app/common/DocActions.ts @@ -144,6 +144,9 @@ export interface TableRecordValue { export type UserAction = Array; +// Actions that trigger formula calculations in the data engine +export const CALCULATING_USER_ACTIONS = new Set(['Calculate', 'UpdateCurrentTime', 'RespondToRequests']); + /** * Gives a description for an action which involves setting values to a selection. * @param {Array} action - The (Bulk)AddRecord/(Bulk)UpdateRecord action to describe. diff --git a/app/server/lib/ActiveDoc.ts b/app/server/lib/ActiveDoc.ts index 58d513e8..5310a2c2 100644 --- a/app/server/lib/ActiveDoc.ts +++ b/app/server/lib/ActiveDoc.ts @@ -4,7 +4,13 @@ * change events. */ -import {getEnvContent, LocalActionBundle, SandboxActionBundle, UserActionBundle} from 'app/common/ActionBundle'; +import { + getEnvContent, + LocalActionBundle, + SandboxActionBundle, + SandboxRequest, + UserActionBundle +} from 'app/common/ActionBundle'; import {ActionGroup, MinimalActionGroup} from 'app/common/ActionGroup'; import {ActionSummary} from "app/common/ActionSummary"; import { @@ -33,12 +39,7 @@ import { UserAction } from 'app/common/DocActions'; import {DocData} from 'app/common/DocData'; -import { - getDataLimitRatio, - getDataLimitStatus, - getSeverity, - LimitExceededError, -} from 'app/common/DocLimits'; +import {getDataLimitRatio, getDataLimitStatus, getSeverity, LimitExceededError} from 'app/common/DocLimits'; import {DocSnapshots} from 'app/common/DocSnapshot'; import {DocumentSettings} from 'app/common/DocumentSettings'; import { @@ -75,6 +76,7 @@ import {GRIST_DOC_SQL, GRIST_DOC_WITH_TABLE1_SQL} from 'app/server/lib/initialDo import {ISandbox} from 'app/server/lib/ISandbox'; import * as log from 'app/server/lib/log'; import {LogMethods} from "app/server/lib/LogMethods"; +import {DocRequests} from 'app/server/lib/Requests'; import {shortDesc} from 'app/server/lib/shortDesc'; import {TableMetadataLoader} from 'app/server/lib/TableMetadataLoader'; import {DocTriggers} from "app/server/lib/Triggers"; @@ -182,6 +184,7 @@ export class ActiveDoc extends EventEmitter { private _log = new LogMethods('ActiveDoc ', (s: OptDocSession) => this.getLogMeta(s)); private _triggers: DocTriggers; + private _requests: DocRequests; private _dataEngine: Promise|undefined; private _activeDocImport: ActiveDocImport; private _onDemandActions: OnDemandActions; @@ -270,6 +273,7 @@ export class ActiveDoc extends EventEmitter { this.docStorage = new DocStorage(docManager.storageManager, docName); this.docClients = new DocClients(this); this._triggers = new DocTriggers(this); + this._requests = new DocRequests(this); this._actionHistory = new ActionHistoryImpl(this.docStorage); this.docPluginManager = new DocPluginManager(docManager.pluginManager.getPlugins(), docManager.pluginManager.appRoot!, this, this._docManager.gristServer); @@ -1095,7 +1099,7 @@ export class ActiveDoc extends EventEmitter { this.dataLimitStatus === "deleteOnly" && !actions.every(action => [ 'RemoveTable', 'RemoveColumn', 'RemoveRecord', 'BulkRemoveRecord', - 'RemoveViewSection', 'RemoveView', 'ApplyUndoActions', + 'RemoveViewSection', 'RemoveView', 'ApplyUndoActions', 'RespondToRequests', ].includes(action[0] as string)) ) { throw new Error("Document is in delete-only mode"); @@ -1420,6 +1424,10 @@ export class ActiveDoc extends EventEmitter { } const user = docSession ? await this._granularAccess.getCachedUser(docSession) : undefined; sandboxActionBundle = await this._rawPyCall('apply_user_actions', normalActions, user?.toJSON()); + const {requests} = sandboxActionBundle; + if (requests) { + this._requests.handleRequestsBatchFromUserActions(requests).catch(e => console.error(e)); + } await this._reportDataEngineMemory(); } else { // Create default SandboxActionBundle to use if the data engine is not called. @@ -2087,6 +2095,7 @@ export class ActiveDoc extends EventEmitter { preferredPythonVersion, sandboxOptions: { exports: { + request: (key: string, args: SandboxRequest) => this._requests.handleSingleRequestWithCache(key, args), guessColInfo: (values: Array) => guessColInfoWithDocData(values, this.docData!), convertFromColumn: (...args: Parameters>) => diff --git a/app/server/lib/GranularAccess.ts b/app/server/lib/GranularAccess.ts index 2ff1febb..fb34afed 100644 --- a/app/server/lib/GranularAccess.ts +++ b/app/server/lib/GranularAccess.ts @@ -4,7 +4,13 @@ import { ActionGroup } from 'app/common/ActionGroup'; import { createEmptyActionSummary } from 'app/common/ActionSummary'; import { ServerQuery } from 'app/common/ActiveDocAPI'; import { ApiError } from 'app/common/ApiError'; -import { AddRecord, BulkAddRecord, BulkColValues, BulkRemoveRecord, BulkUpdateRecord } from 'app/common/DocActions'; +import { + AddRecord, + BulkAddRecord, + BulkColValues, + BulkRemoveRecord, + BulkUpdateRecord, +} from 'app/common/DocActions'; import { RemoveRecord, ReplaceTableData, UpdateRecord } from 'app/common/DocActions'; import { CellValue, ColValues, DocAction, getTableId, isSchemaAction } from 'app/common/DocActions'; import { TableDataAction, UserAction } from 'app/common/DocActions'; diff --git a/app/server/lib/Requests.ts b/app/server/lib/Requests.ts new file mode 100644 index 00000000..3835b1ff --- /dev/null +++ b/app/server/lib/Requests.ts @@ -0,0 +1,128 @@ +import {SandboxRequest} from 'app/common/ActionBundle'; +import {ActiveDoc} from 'app/server/lib/ActiveDoc'; +import {makeExceptionalDocSession} from 'app/server/lib/DocSession'; +import {httpEncoding} from 'app/server/lib/httpEncoding'; +import {HttpsProxyAgent} from 'https-proxy-agent'; +import {HttpProxyAgent} from 'http-proxy-agent'; +import fetch from 'node-fetch'; +import * as path from 'path'; +import * as tmp from 'tmp'; +import chunk = require('lodash/chunk'); +import fromPairs = require('lodash/fromPairs'); +import zipObject = require('lodash/zipObject'); +import * as fse from 'fs-extra'; +import * as log from 'app/server/lib/log'; + +export class DocRequests { + // Request responses are briefly cached in files only to handle multiple requests in a formula + // and only as long as needed to finish calculating all formulas. + // When _numPending reaches 0 again, _cacheDir is deleted. + private _numPending: number = 0; + private _cacheDir: tmp.SynchrounousResult | null = null; + + constructor(private readonly _activeDoc: ActiveDoc) {} + + public async handleRequestsBatchFromUserActions(requests: Record) { + const numRequests = Object.keys(requests).length; + this._numPending += numRequests; + try { + // Perform batches of requests in parallel for speed, and hope it doesn't cause rate limiting... + for (const keys of chunk(Object.keys(requests), 10)) { + const responses: Response[] = await Promise.all(keys.map(async key => { + const request = requests[key]; + const response = await this.handleSingleRequestWithCache(key, request); + return { + ...response, + // Tells the engine which cell(s) made the request and should be recalculated to use the response + deps: request.deps, + }; + })); + // Tell the sandbox which previous responses we have cached in files. + // This lets it know it can immediately and synchronously get those responses again. + const cachedRequestKeys = await fse.readdir(this._cacheDir!.name); + // Recalculate formulas using this batch of responses. + const action = ["RespondToRequests", zipObject(keys, responses), cachedRequestKeys]; + await this._activeDoc.applyUserActions(makeExceptionalDocSession("system"), [action]); + } + } finally { + this._numPending -= numRequests; + if (this._numPending === 0) { + log.debug(`Removing DocRequests._cacheDir: ${this._cacheDir!.name}`); + this._cacheDir!.removeCallback(); + this._cacheDir = null; + } + } + } + + public async handleSingleRequestWithCache(key: string, request: SandboxRequest): Promise { + if (!this._cacheDir) { + // Use the sync API because otherwise multiple requests being handled at the same time + // all reach this point, `await`, and create different dirs. + // `unsafeCleanup: true` means the directory can be deleted even if it's not empty, which is what we expect. + this._cacheDir = tmp.dirSync({unsafeCleanup: true}); + log.debug(`Created DocRequests._cacheDir: ${this._cacheDir.name}`); + } + + const cachePath = path.resolve(this._cacheDir.name, key); + try { + const result = await fse.readJSON(cachePath); + result.content = Buffer.from(result.content, "base64"); + return result; + } catch { + const result = await this._handleSingleRequestRaw(request); + const resultForJson = {...result} as any; + if ('content' in result) { + resultForJson.content = result.content.toString("base64"); + } + fse.writeJSON(cachePath, resultForJson).catch(e => log.warn(`Failed to save response to cache file: ${e}`)); + return result; + } + } + + private async _handleSingleRequestRaw(request: SandboxRequest): Promise { + try { + if (process.env.GRIST_EXPERIMENTAL_PLUGINS != '1') { + throw new Error("REQUEST is not enabled"); + } + const {url, params, headers} = request; + const urlObj = new URL(url); + log.rawInfo("Handling sandbox request", {host: urlObj.host, docId: this._activeDoc.docName}); + for (const [param, value] of Object.entries(params || {})) { + urlObj.searchParams.append(param, value); + } + const response = await fetch(urlObj.toString(), {headers: headers || {}, agent: proxyAgent(urlObj)}); + const content = await response.buffer(); + const {status, statusText} = response; + const encoding = httpEncoding(response.headers.get('content-type'), content); + return { + content, status, statusText, encoding, + headers: fromPairs([...response.headers]), + }; + } catch (e) { + return {error: String(e)}; + } + } +} + +interface SuccessfulResponse { + content: Buffer; + status: number; + statusText: string; + encoding?: string; + headers: Record; +} + +interface RequestError { + error: string; +} + +type Response = RequestError | SuccessfulResponse; + +function proxyAgent(requestUrl: URL) { + const proxy = process.env.GRIST_HTTPS_PROXY; + if (!proxy) { + return undefined; + } + const ProxyAgent = requestUrl.protocol === "https:" ? HttpsProxyAgent : HttpProxyAgent; + return new ProxyAgent(proxy); +} diff --git a/app/server/lib/Sharing.ts b/app/server/lib/Sharing.ts index 5ddd2370..f8383b60 100644 --- a/app/server/lib/Sharing.ts +++ b/app/server/lib/Sharing.ts @@ -6,7 +6,7 @@ import { LocalActionBundle, UserActionBundle } from 'app/common/ActionBundle'; -import {DocAction, getNumRows, UserAction} from 'app/common/DocActions'; +import {CALCULATING_USER_ACTIONS, DocAction, getNumRows, UserAction} from 'app/common/DocActions'; import {allToken} from 'app/common/sharing'; import * as log from 'app/server/lib/log'; import {LogMethods} from "app/server/lib/LogMethods"; @@ -215,8 +215,7 @@ export class Sharing { try { - const isCalculate = (userActions.length === 1 && - (userActions[0][0] === 'Calculate' || userActions[0][0] === 'UpdateCurrentTime')); + const isCalculate = (userActions.length === 1 && CALCULATING_USER_ACTIONS.has(userActions[0][0] as string)); // `internal` is true if users shouldn't be able to undo the actions. Applies to: // - Calculate/UpdateCurrentTime because it's not considered as performed by a particular client. // - Adding attachment metadata when uploading attachments, diff --git a/app/server/lib/httpEncoding.ts b/app/server/lib/httpEncoding.ts new file mode 100644 index 00000000..af485ff3 --- /dev/null +++ b/app/server/lib/httpEncoding.ts @@ -0,0 +1,43 @@ +// Based on the source code of the Body.textConverted method in node-fetch +export function httpEncoding(header: string | null, content: Buffer): string | undefined { + let res: RegExpExecArray | null = null; + + // header + if (header) { + res = /charset=([^;]*)/i.exec(header); + } + + // no charset in content type, peek at response body for at most 1024 bytes + const str = content.slice(0, 1024).toString(); + + // html5 + if (!res && str) { + res = /= 1.5.0 < 2" toidentifier "1.0.0" +http-proxy-agent@5.0.0: + version "5.0.0" + resolved "https://registry.yarnpkg.com/http-proxy-agent/-/http-proxy-agent-5.0.0.tgz#5129800203520d434f142bc78ff3c170800f2b43" + integrity sha512-n2hY8YdoRE1i7r6M0w9DIw5GgZN0G25P8zLCRQ8rjXtTU3vsNFBI/vWK/UIeE6g5MUUz6avwAPXmL6Fy9D/90w== + dependencies: + "@tootallnate/once" "2" + agent-base "6" + debug "4" + http-signature@~1.2.0: version "1.2.0" resolved "https://registry.yarnpkg.com/http-signature/-/http-signature-1.2.0.tgz#9aecd925114772f3d95b65a60abb8f7c18fbace1" @@ -3661,6 +3675,14 @@ https-browserify@^1.0.0: resolved "https://registry.yarnpkg.com/https-browserify/-/https-browserify-1.0.0.tgz#ec06c10e0a34c0f2faf199f7fd7fc78fffd03c73" integrity sha1-7AbBDgo0wPL68Zn3/X/Hj//QPHM= +https-proxy-agent@5.0.1: + version "5.0.1" + resolved "https://registry.yarnpkg.com/https-proxy-agent/-/https-proxy-agent-5.0.1.tgz#c59ef224a04fe8b754f3db0063a25ea30d0005d6" + integrity sha512-dFcAjpTQFgoLMzC2VwU+C/CbS7uRL0lWmxDITmqm7C+7F0Odmj6s9l6alZc6AELXhrnggM2CeWSXHGOdX2YtwA== + dependencies: + agent-base "6" + debug "4" + https-proxy-agent@5.0.0, https-proxy-agent@^5.0.0: version "5.0.0" resolved "https://registry.yarnpkg.com/https-proxy-agent/-/https-proxy-agent-5.0.0.tgz#e2a90542abb68a762e0a0850f6c9edadfd8506b2"