(core) Proxy Agent moved to the separate file, Triggers are using proxy now to perform fetch

Summary:
- Webhooks from Triggers.ts should now use the proxy if it's configured
- Proxy handling code separated to ProxyAgent.ts
- Tests for ProxyAgent
- Integration/API Tests for using Proxy in webhooks
- a bit of refactoring - the proxy test uses mostly the same codebase as DocApi.ts, but because the latter is over 4000 lines long, I've put it into a separate file and extracted some common parts (there are some duplicates, though)
- some cleanup in files that I've touched

Test Plan:
Manual test to check if proxy is used on the staging env

Automatic test checking if (fake) proxy was called

Reviewers: paulfitz

Reviewed By: paulfitz

Subscribers: paulfitz

Differential Revision: https://phab.getgrist.com/D3860
This commit is contained in:
Jakub Serafin 2023-05-08 11:49:53 +02:00
parent 37347a79c0
commit 440d5b935a
11 changed files with 1058 additions and 517 deletions

View File

@ -2,16 +2,15 @@ import {SandboxRequest} from 'app/common/ActionBundle';
import {ActiveDoc} from 'app/server/lib/ActiveDoc'; import {ActiveDoc} from 'app/server/lib/ActiveDoc';
import {makeExceptionalDocSession} from 'app/server/lib/DocSession'; import {makeExceptionalDocSession} from 'app/server/lib/DocSession';
import {httpEncoding} from 'app/server/lib/httpEncoding'; import {httpEncoding} from 'app/server/lib/httpEncoding';
import {HttpsProxyAgent} from 'https-proxy-agent';
import {HttpProxyAgent} from 'http-proxy-agent';
import fetch from 'node-fetch'; import fetch from 'node-fetch';
import * as path from 'path'; import * as path from 'path';
import * as tmp from 'tmp'; import * as tmp from 'tmp';
import * as fse from 'fs-extra';
import log from 'app/server/lib/log';
import {proxyAgent} from "app/server/utils/ProxyAgent";
import chunk = require('lodash/chunk'); import chunk = require('lodash/chunk');
import fromPairs = require('lodash/fromPairs'); import fromPairs = require('lodash/fromPairs');
import zipObject = require('lodash/zipObject'); import zipObject = require('lodash/zipObject');
import * as fse from 'fs-extra';
import log from 'app/server/lib/log';
export class DocRequests { export class DocRequests {
// Request responses are briefly cached in files only to handle multiple requests in a formula // Request responses are briefly cached in files only to handle multiple requests in a formula
@ -118,11 +117,3 @@ interface RequestError {
type Response = RequestError | SuccessfulResponse; type Response = RequestError | SuccessfulResponse;
/**
 * Picks the agent used to send a request through the proxy named by the
 * GRIST_HTTPS_PROXY environment variable.
 *
 * Returns undefined when the variable is unset or empty. Otherwise returns an
 * HttpsProxyAgent for "https:" request URLs and an HttpProxyAgent for any other protocol.
 */
function proxyAgent(requestUrl: URL) {
  const proxyUrl = process.env.GRIST_HTTPS_PROXY;
  if (proxyUrl) {
    const AgentClass = requestUrl.protocol === "https:" ? HttpsProxyAgent : HttpProxyAgent;
    return new AgentClass(proxyUrl);
  }
  return undefined;
}

View File

@ -13,6 +13,7 @@ import {makeExceptionalDocSession} from 'app/server/lib/DocSession';
import log from 'app/server/lib/log'; import log from 'app/server/lib/log';
import {matchesBaseDomain} from 'app/server/lib/requestUtils'; import {matchesBaseDomain} from 'app/server/lib/requestUtils';
import {delayAbort} from 'app/server/lib/serverUtils'; import {delayAbort} from 'app/server/lib/serverUtils';
import {proxyAgent} from 'app/server/utils/ProxyAgent';
import {promisifyAll} from 'bluebird'; import {promisifyAll} from 'bluebird';
import * as _ from 'lodash'; import * as _ from 'lodash';
import {AbortController, AbortSignal} from 'node-abort-controller'; import {AbortController, AbortSignal} from 'node-abort-controller';
@ -738,6 +739,7 @@ export class DocTriggers {
'Content-Type': 'application/json', 'Content-Type': 'application/json',
}, },
signal, signal,
agent: proxyAgent(new URL(url)),
}); });
if (response.status === 200) { if (response.status === 200) {
await this._stats.logBatch(id, 'success', { size, httpStatus: 200, error: null, attempts: attempt + 1 }); await this._stats.logBatch(id, 'success', { size, httpStatus: 200, error: null, attempts: attempt + 1 });

View File

@ -0,0 +1,11 @@
import {HttpsProxyAgent} from "https-proxy-agent";
import {HttpProxyAgent} from "http-proxy-agent";
/**
 * Returns the proxy agent to use for a request to `requestUrl`, based on the
 * GRIST_HTTPS_PROXY environment variable.
 *
 * @param requestUrl - URL of the outgoing request; only its protocol is inspected.
 * @returns undefined when GRIST_HTTPS_PROXY is unset or empty; an HttpsProxyAgent when the
 *   request protocol is "https:"; an HttpProxyAgent for every other protocol.
 */
export function proxyAgent(requestUrl: URL): HttpProxyAgent | HttpsProxyAgent | undefined {
  const configuredProxy = process.env.GRIST_HTTPS_PROXY;
  if (!configuredProxy) {
    return undefined;
  }
  if (requestUrl.protocol === "https:") {
    return new HttpsProxyAgent(configuredProxy);
  }
  return new HttpProxyAgent(configuredProxy);
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,38 @@
import {proxyAgent} from "app/server/utils/ProxyAgent";
import {assert} from "chai";
import {HttpsProxyAgent} from "https-proxy-agent";
import {HttpProxyAgent} from "http-proxy-agent";
import {EnvironmentSnapshot} from "test/server/testUtils";
// Unit tests for proxyAgent(): which agent (if any) is returned for a given request URL,
// depending on the GRIST_HTTPS_PROXY environment variable.
describe("ProxyAgent", function () {
  let oldEnv: EnvironmentSnapshot;

  // The tests below mutate process.env.GRIST_HTTPS_PROXY; snapshot it once and restore at the end.
  before(() => {
    oldEnv = new EnvironmentSnapshot();
  });

  after(() => {
    oldEnv.restore();
  });

  it("should be undefined if no proxy is configured", async function () {
    delete process.env.GRIST_HTTPS_PROXY;
    const httpProxy = proxyAgent(new URL("http://localhost:3000"));
    const httpsProxy = proxyAgent(new URL("https://localhost:3000"));
    assert.equal(httpProxy, undefined);
    assert.equal(httpsProxy, undefined);
  });

  it("should be https proxy if proxy is configured and address is https", async function () {
    process.env.GRIST_HTTPS_PROXY = "https://localhost:9000";
    const httpsProxy = proxyAgent(new URL("https://localhost:3000"));
    assert.instanceOf(httpsProxy, HttpsProxyAgent);
  });

  // The title used to duplicate the https case above, which made the two tests
  // indistinguishable in reports even though this one covers plain http targets.
  it("should be http proxy if proxy is configured and address is http", async function () {
    process.env.GRIST_HTTPS_PROXY = "https://localhost:9000";
    const httpProxy = proxyAgent(new URL("http://localhost:3000"));
    assert.instanceOf(httpProxy, HttpProxyAgent);
  });
});

View File

@ -0,0 +1,341 @@
import {UserAPIImpl} from 'app/common/UserAPI';
import {WebhookSubscription} from 'app/server/lib/DocApi';
import log from 'app/server/lib/log';
import axios from 'axios';
import * as bodyParser from 'body-parser';
import {assert} from 'chai';
import FormData from 'form-data';
import fetch from 'node-fetch';
import {tmpdir} from 'os';
import * as path from 'path';
import {createClient} from 'redis';
import {configForUser} from 'test/gen-server/testUtils';
import {serveSomething, Serving} from 'test/server/customUtil';
import {prepareDatabase} from 'test/server/lib/helpers/PrepareDatabase';
import {prepareFilesystemDirectoryForTests} from 'test/server/lib/helpers/PrepareFilesystemDirectoryForTests';
import {signal} from 'test/server/lib/helpers/Signal';
import {TestProxyServer} from 'test/server/lib/helpers/TestProxyServer';
import {TestServer} from 'test/server/lib/helpers/TestServer';
import * as testUtils from 'test/server/testUtils';
import clone = require('lodash/clone');
// Request configuration for the 'Chimpy' user; passed as the config argument of the axios calls below.
const chimpy = configForUser('Chimpy');
// some doc ids
// (a 'TestDoc' entry is added and removed at runtime by setupMockServers)
const docIds: { [name: string]: string } = {
ApiDataRecordsTest: 'sample_7',
Timesheets: 'sample_13',
Bananas: 'sample_6',
Antartic: 'sample_11'
};
// Shared mutable state for the suites below.
// NOTE(review): dataDir is never assigned in the visible part of this file, yet it is read when
// building GRIST_DATA_DIR in runServerConfigurations — confirm whether that is intended.
let dataDir: string;
// Name of the server layout currently under test; set by setupMockServers.
let suitename: string;
// Base URL that the test requests are sent to; set by each server-layout setup callback.
let serverUrl: string;
// API client created in setupMockServers and used by the test case.
let userApi: UserAPIImpl;
/**
 * Connects to the redis instance at TEST_REDIS_URL, flushes its current database,
 * and closes the connection again.
 */
async function cleanRedisDatabase() {
  const redisClient = createClient(process.env.TEST_REDIS_URL);
  await redisClient.flushdbAsync();
  await redisClient.quitAsync();
}
/**
 * Registers mocha hooks that snapshot process.env before the enclosing suite runs and
 * put it back afterwards.
 *
 * Restoring covers both directions: variables changed during the tests get their old value
 * back, and variables that were first introduced during the tests are removed.
 */
function backupEnvironmentVariables() {
  let oldEnv: NodeJS.ProcessEnv;
  before(() => {
    oldEnv = clone(process.env);
  });
  after(() => {
    // Object.assign only overwrites keys present in the snapshot, so anything added while
    // the tests ran has to be deleted explicitly or it would leak into later suites.
    for (const name of Object.keys(process.env)) {
      if (!Object.prototype.hasOwnProperty.call(oldEnv, name)) {
        delete process.env[name];
      }
    }
    Object.assign(process.env, oldEnv);
  });
}
/*
TODO: these hardcoded port numbers might cause conflicts when tests are executed in parallel.
Replace them with something more generic.
*/
const webhooksTestPort = 34365; // port of the test webhook endpoint server
const webhooksTestProxyPort = 22335; // port of the fake proxy (TestProxyServer)
describe('Webhooks proxy configuration', function () {
// A testDir of the form grist_test_{USER}_{SERVER_NAME}
// - its a directory that will be base for all test related files and activities
const username = process.env.USER || "nobody";
const tmpDir = path.join(tmpdir(), `grist_test_${username}_docapi_webhooks_proxy`);
let home: TestServer;
let docs: TestServer;
this.timeout(30000);
testUtils.setTmpLogLevel('debug');
// test might override environment values, therefore we need to backup current ones to restore them later
backupEnvironmentVariables();
/**
 * Registers before/after hooks for one server layout.
 *
 * Before: records `name` as the current suite name, runs `cb` (which is expected to start
 * the servers and assign `home`/`docs`), then creates an empty 'TestDoc' in the 'Private'
 * workspace and stores its id in docIds.TestDoc.
 * After: deletes that document and stops both servers.
 *
 * The `tmpDir` parameter is not used by this function's body.
 */
function setupMockServers(name: string, tmpDir: string, cb: () => Promise<void>) {
  let api: UserAPIImpl;

  before(async function () {
    suitename = name;
    await cb();

    // The same client is kept locally for cleanup and shared with the tests via userApi.
    api = makeUserApi(ORG_NAME, home.serverUrl);
    userApi = api;

    const privateWorkspaceId = await getWorkspaceId(api, 'Private');
    docIds.TestDoc = await api.newDoc({name: 'TestDoc'}, privateWorkspaceId);
  });

  after(async function () {
    // Drop the document created above before shutting the servers down.
    await api.deleteDoc(docIds.TestDoc);
    delete docIds.TestDoc;

    await home.stop();
    await docs.stop();
  });
}
// Servers are started with GRIST_HTTPS_PROXY pointing at the fake proxy port;
// the test then asserts that the fake proxy was called.
describe('Proxy is configured', function () {
runServerConfigurations({GRIST_HTTPS_PROXY:`http://localhost:${webhooksTestProxyPort}`}, ()=>testWebhookProxy(true));
});
// Servers are started with GRIST_HTTPS_PROXY explicitly undefined;
// the test then asserts that the fake proxy was not called.
describe('Proxy not configured', function () {
runServerConfigurations({GRIST_HTTPS_PROXY:undefined}, ()=>testWebhookProxy(false));
});
/**
 * Registers `subTestCall` once for every server layout the tests can run against:
 *  - a merged server: a single server serving both as a home and doc worker
 *  - two separated servers: requests are sent to a home server which then forwards them
 *    to a doc worker
 *  - a doc worker: requests are sent directly to the doc worker (a home server is still
 *    started, as it is needed for setting up the test cases)
 * The last two layouts are only registered when TEST_REDIS_URL is set.
 *
 * @param additionaEnvConfiguration - extra environment entries for the spawned servers;
 *   they take precedence over the defaults defined here.
 * @param subTestCall - called synchronously inside each layout's describe block to
 *   register the actual tests.
 */
function runServerConfigurations(additionaEnvConfiguration: object, subTestCall: Function) {
  // Defaults first, so that the caller-supplied entries win on conflict.
  // NOTE(review): dataDir is not assigned anywhere in the visible part of this file, so
  // GRIST_DATA_DIR appears to be undefined here — confirm.
  const serverEnv = {
    ALLOWED_WEBHOOK_DOMAINS: `example.com,localhost:${webhooksTestPort}`,
    GRIST_DATA_DIR: dataDir,
    ...additionaEnvConfiguration
  };

  before(async function () {
    // Clear redis test database if redis is in use.
    if (process.env.TEST_REDIS_URL) {
      await cleanRedisDatabase();
    }
    await prepareFilesystemDirectoryForTests(tmpDir);
    await prepareDatabase(tmpDir);
  });

  // Describe callbacks are deliberately synchronous: mocha collects hooks and tests while the
  // callback runs and ignores any promise it returns.
  describe("should work with a merged server", () => {
    setupMockServers('merged', tmpDir, async () => {
      home = docs = await TestServer.startServer('home,docs', tmpDir, suitename, serverEnv);
      serverUrl = home.serverUrl;
    });
    subTestCall();
  });

  // the way these tests are written, non-merged server requires redis.
  if (process.env.TEST_REDIS_URL) {
    describe("should work with a home server and a docworker", () => {
      setupMockServers('separated', tmpDir, async () => {
        home = await TestServer.startServer('home', tmpDir, suitename, serverEnv);
        docs = await TestServer.startServer('docs', tmpDir, suitename, serverEnv, home.serverUrl);
        // Requests go through the home server.
        serverUrl = home.serverUrl;
      });
      subTestCall();
    });

    describe("should work directly with a docworker", () => {
      setupMockServers('docs', tmpDir, async () => {
        home = await TestServer.startServer('home', tmpDir, suitename, serverEnv);
        docs = await TestServer.startServer('docs', tmpDir, suitename, serverEnv, home.serverUrl);
        // Requests go straight to the doc worker.
        serverUrl = docs.serverUrl;
      });
      subTestCall();
    });
  }
}
/**
 * Registers a suite that subscribes two webhooks (one answering 200, one answering 404),
 * triggers them by adding a row, and then checks whether the fake proxy server saw any call.
 *
 * @param shouldProxyBeCalled - when true the test asserts the proxy was called; when false
 *   it asserts the proxy was not called.
 */
function testWebhookProxy(shouldProxyBeCalled: boolean) {
describe('calling registered webhooks after data update', function () {
let serving: Serving; // manages the test webhook server
let testProxyServer: TestProxyServer; // manages the fake proxy server
let redisMonitor: any;
// Create couple of promises that can be used to monitor
// if the endpoint was called.
const successCalled = signal();
const notFoundCalled = signal();
async function autoSubscribe(
endpoint: string, docId: string, options?: {
tableId?: string,
isReadyColumn?: string | null,
eventTypes?: string[]
}) {
// Subscribe helper that returns a method to unsubscribe.
const data = await subscribe(endpoint, docId, options);
return () => unsubscribe(docId, data, options?.tableId ?? 'Table1');
}
// Posts the subscription data back to the table's _unsubscribe endpoint.
function unsubscribe(docId: string, data: any, tableId = 'Table1') {
return axios.post(
`${serverUrl}/api/docs/${docId}/tables/${tableId}/_unsubscribe`,
data, chimpy
);
}
async function subscribe(endpoint: string, docId: string, options?: {
tableId?: string,
isReadyColumn?: string | null,
eventTypes?: string[]
}) {
// Subscribe helper that registers a webhook pointing at the test webhook server and
// returns the subscription data (needed later to unsubscribe).
// Defaults: table 'Table1', event types 'add' and 'update', ready column 'B'
// (an explicit null for isReadyColumn is passed through as-is).
const {data, status} = await axios.post(
`${serverUrl}/api/docs/${docId}/tables/${options?.tableId ?? 'Table1'}/_subscribe`,
{
eventTypes: options?.eventTypes ?? ['add', 'update'],
url: `${serving.url}/${endpoint}`,
isReadyColumn: options?.isReadyColumn === undefined ? 'B' : options?.isReadyColumn
}, chimpy
);
assert.equal(status, 200);
return data as WebhookSubscription;
}
// Empties the document's webhook queue and checks that the request succeeded.
async function clearQueue(docId: string) {
const deleteResult = await axios.delete(
`${serverUrl}/api/docs/${docId}/webhooks/queue`, chimpy
);
assert.equal(deleteResult.status, 200);
}
// Start the webhook endpoint server and the fake proxy on their fixed ports.
before(async function () {
this.timeout(30000);
serving = await serveSomething(app => {
app.use(bodyParser.json());
app.post('/200', ({body}, res) => {
successCalled.emit(body[0].A);
res.sendStatus(200);
res.end();
});
app.post('/404', ({body}, res) => {
notFoundCalled.emit(body[0].A);
res.sendStatus(404); // Webhooks treats it as an error and will retry. Probably it shouldn't work this way.
res.end();
});
}, webhooksTestPort);
testProxyServer = await TestProxyServer.Prepare(webhooksTestProxyPort);
});
after(async function () {
await serving.shutdown();
await testProxyServer.dispose();
});
// A redis client is opened only when TEST_REDIS_URL is set, and closed again afterwards.
before(async function () {
this.timeout(30000);
if (process.env.TEST_REDIS_URL) {
redisMonitor = createClient(process.env.TEST_REDIS_URL);
}
});
after(async function () {
if (process.env.TEST_REDIS_URL) {
await redisMonitor.quitAsync();
}
});
if (shouldProxyBeCalled) {
it("Should call proxy", async function () {
// Run the standard subscribe - modify data - check response - unsubscribe scenario. We are
// not much interested in the scenario itself, we only want to check that the proxy was used.
await runTestCase();
assert.isTrue(testProxyServer.wasProxyCalled());
});
} else {
it("Should not call proxy", async function () {
// Run the standard subscribe - modify data - check response - unsubscribe scenario. We are
// not much interested in the scenario itself, we only want to check that the proxy was not used.
await runTestCase();
assert.isFalse(testProxyServer.wasProxyCalled());
});
}
async function runTestCase() {
//Create a test document.
const ws1 = (await userApi.getOrgWorkspaces('current'))[0].id;
const docId = await userApi.newDoc({name: 'testdoc2'}, ws1);
const doc = userApi.getDocAPI(docId);
// Column B is the default "ready" column used by subscribe(); make it a boolean.
await axios.post(`${serverUrl}/api/docs/${docId}/apply`, [
['ModifyColumn', 'Table1', 'B', {type: 'Bool'}],
], chimpy);
// Try to clear the queue, even if it is empty.
await clearQueue(docId);
const cleanup: (() => Promise<any>)[] = [];
// Subscribe a valid webhook endpoint.
cleanup.push(await autoSubscribe('200', docId));
// Subscribe an invalid webhook endpoint.
cleanup.push(await autoSubscribe('404', docId));
// Prepare signals, we will be waiting for those two to be called.
successCalled.reset();
notFoundCalled.reset();
// Trigger both events.
await doc.addRows("Table1", {
A: [1],
B: [true],
});
// Wait for both of them to be called (this is correct order)
await successCalled.waitAndReset();
await notFoundCalled.waitAndReset();
// Broken endpoint will be called multiple times here, and any subsequent triggers for working
// endpoint won't be called.
await notFoundCalled.waitAndReset();
// But the working endpoint won't be called more than once.
assert.isFalse(successCalled.called());
//Cleanup all
await Promise.all(cleanup.map(fn => fn())).finally(() => cleanup.length = 0);
await clearQueue(docId);
}
});
}
});
// Organization that the API client used by these tests is scoped to.
const ORG_NAME = 'docs-1';

/**
 * Builds a UserAPIImpl client for the given org on the given home server.
 *
 * @param org - organization name, appended to the server URL as `/o/<org>`.
 * @param homeServerUrl - base URL of the home server.
 * @param user - user whose API key is sent in the Authorization header; 'chimpy' when omitted
 *   or empty.
 */
function makeUserApi(org: string, homeServerUrl: string, user?: string) {
  const apiUser = user || 'chimpy';
  const orgUrl = `${homeServerUrl}/o/${org}`;
  return new UserAPIImpl(orgUrl, {
    headers: {Authorization: `Bearer api_key_for_${apiUser}`},
    fetch: fetch as any,
    newFormData: () => new FormData() as any,
    logger: log
  });
}
/**
 * Looks up a workspace of the current org by name and returns its id.
 *
 * @param api - client used to list the workspaces of the 'current' org.
 * @param name - exact workspace name to look for.
 * @returns the id of the first workspace whose name matches.
 * @throws Error when no workspace with that name exists, so that a broken fixture is
 *   reported by name instead of surfacing as an opaque "cannot read property 'id'" TypeError.
 */
async function getWorkspaceId(api: UserAPIImpl, name: string) {
  const workspaces = await api.getOrgWorkspaces('current');
  const workspace = workspaces.find((w) => w.name === name);
  if (!workspace) {
    throw new Error(`Workspace "${name}" not found`);
  }
  return workspace.id;
}

View File

@ -0,0 +1,17 @@
import path from "path";
import * as testUtils from "test/server/testUtils";
import {execFileSync} from "child_process";
/**
 * Points TYPEORM_DATABASE at `<tempDirectory>/landing.db` and initializes that database
 * by running the seed script in a separate node process.
 *
 * A file-backed sqlite db is used so it can be shared with servers running in other
 * processes. The seed script is spawned rather than called directly because ormconfig.js
 * may already have been loaded in this process with a different configuration (in-memory
 * for instance) and would then not see the most recent TYPEORM_DATABASE value.
 */
export async function prepareDatabase(tempDirectory: string) {
  const databasePath = path.join(tempDirectory, 'landing.db');
  process.env.TYPEORM_DATABASE = databasePath;

  const seedScript = await testUtils.getBuildFile('test/gen-server/seed.js');
  const spawnOptions = {env: process.env, stdio: 'inherit' as const};
  execFileSync('node', [seedScript, 'init'], spawnOptions);
}

View File

@ -0,0 +1,9 @@
import * as fse from "fs-extra";
import log from "app/server/lib/log";
/**
 * Gives the tests a fresh directory: removes `directory` if it exists, creates it again,
 * and logs its location.
 */
export async function prepareFilesystemDirectoryForTests(directory: string) {
  // Start from scratch so that leftovers of a previous run cannot interfere.
  await fse.remove(directory);
  await fse.mkdirs(directory);

  const notice = `Test logs and data are at: ${directory}/`;
  log.warn(notice);
}

View File

@ -0,0 +1,44 @@
import {delay} from "bluebird";
/**
 * Creates a re-armable one-shot notification backed by a promise that is resolved
 * from outside.
 *
 * Usage: call reset() to arm it, emit(data) from the producer side, and wait() /
 * waitAndReset() from the consumer side. emit() and wait() throw when used before the
 * first reset(). wait() gives up after 2000ms.
 */
export function signal() {
  let resolver: null | ((data: any) => void) = null;
  let pending: null | Promise<any> = null;
  let emitted = false;

  return {
    /** Marks the signal as called and resolves the pending promise with `data`. */
    emit(data: any) {
      if (resolver === null) {
        throw new Error("signal.emit() called before signal.reset()");
      }
      emitted = true;
      resolver(data);
    },

    /** Resolves with the emitted data, or rejects if nothing is emitted within 2000ms. */
    async wait() {
      if (pending === null) {
        throw new Error("signal.wait() called before signal.reset()");
      }
      const timeout = delay(2000).then(() => {
        throw new Error("signal.wait() timed out");
      });
      const firstSettled = Promise.race([pending, timeout]);
      return await firstSettled;
    },

    /** Like wait(), but always re-arms the signal afterwards, even when wait() fails. */
    async waitAndReset() {
      try {
        return await this.wait();
      } finally {
        this.reset();
      }
    },

    /** Whether emit() has been called since the last reset(). */
    called() {
      return emitted;
    },

    /** Clears the called flag and arms a fresh promise for the next emit(). */
    reset() {
      emitted = false;
      pending = new Promise((res) => {
        resolver = res;
      });
    }
  };
}

View File

@ -0,0 +1,46 @@
import {serveSomething, Serving} from "test/server/customUtil";
import * as bodyParser from "body-parser";
import {Request, Response} from "express-serve-static-core";
import axios from "axios";
/**
 * Minimal fake HTTP proxy for tests: counts every request it receives, re-sends the
 * request body as a POST to the requested URL, and answers with the resulting status code.
 *
 * Instances are created through TestProxyServer.Prepare().
 */
export class TestProxyServer {
  /** Starts a fake proxy listening on `portNumber` and returns it once it is serving. */
  public static async Prepare(portNumber: number): Promise<TestProxyServer> {
    const server = new TestProxyServer();
    await server._prepare(portNumber);
    return server;
  }

  private _proxyCallsCounter: number = 0;
  private _proxyServing: Serving;

  private constructor() {
  }

  /** True once at least one request has reached the proxy. */
  public wasProxyCalled(): boolean {
    return this._proxyCallsCounter > 0;
  }

  /** Shuts the underlying server down. */
  public async dispose() {
    await this._proxyServing.shutdown();
  }

  private async _prepare(portNumber: number) {
    this._proxyServing = await serveSomething(app => {
      app.use(bodyParser.json());
      app.all('*', async (req: Request, res: Response) => {
        this._proxyCallsCounter += 1;
        let responseCode: number;
        try {
          const axiosResponse = await axios.post(req.url, req.body);
          responseCode = axiosResponse.status;
        } catch (error: any) {
          // Non-2xx answers carry the upstream status in error.response. Failures without any
          // response (e.g. connection refused) have none; answer 502 in that case instead of
          // throwing inside the handler and leaving the request hanging.
          responseCode = error?.response?.status ?? 502;
        }
        // sendStatus() both sets the status and ends the response.
        res.sendStatus(responseCode);
      });
    }, portNumber);
  }
}

View File

@ -0,0 +1,143 @@
import {connectTestingHooks, TestingHooksClient} from "app/server/lib/TestingHooks";
import {ChildProcess, execFileSync, spawn} from "child_process";
import path from "path";
import * as fse from "fs-extra";
import * as testUtils from "test/server/testUtils";
import {exitPromise} from "app/server/lib/serverUtils";
import log from "app/server/lib/log";
import {delay} from "bluebird";
import fetch from "node-fetch";
/**
 * Runs a Grist server (app/server/mergedServerMain.js) in a child process for tests.
 *
 * The server is started with GRIST_PORT '0'; the actual port is discovered through the
 * testing-hooks socket and exposed as `serverUrl`.
 */
export class TestServer {
  /**
   * Creates a TestServer and starts it.
   *
   * @param serverTypes - value for GRIST_SERVERS, e.g. 'home,docs', 'home' or 'docs'.
   * @param tempDirectory - directory used for GRIST_INST_DIR, log files and the testing socket.
   * @param suitename - used as a prefix of the log file and socket names.
   * @param additionalConfig - extra environment entries; they override all defaults.
   * @param _homeUrl - passed to the server as APP_HOME_URL.
   */
  public static async startServer
  (serverTypes: string,
   tempDirectory: string,
   suitename: string,
   additionalConfig?: Object,
   _homeUrl?: string): Promise<TestServer> {
    const server = new TestServer(serverTypes, tempDirectory, suitename);
    // Override some env variables in server configuration to serve our test purpose:
    const customEnv = {
      ...additionalConfig};
    await server.start(_homeUrl, customEnv);
    return server;
  }

  public testingSocket: string;
  public testingHooks: TestingHooksClient;
  public serverUrl: string;
  public stopped = false;

  private _server: ChildProcess;
  private _exitPromise: Promise<number | string>;
  private readonly _defaultEnv;

  constructor(private _serverTypes: string, private _tmpDir: string, private _suiteName: string) {
    // process.env is spread last, so variables already set in the test process take
    // precedence over the defaults listed here.
    this._defaultEnv = {
      GRIST_INST_DIR: this._tmpDir,
      GRIST_SERVERS: this._serverTypes,
      // with port '0' no need to hard code a port number (we can use testing hooks to find out what
      // port server is listening on).
      GRIST_PORT: '0',
      GRIST_DISABLE_S3: 'true',
      REDIS_URL: process.env.TEST_REDIS_URL,
      GRIST_ALLOWED_HOSTS: `example.com,localhost`,
      GRIST_TRIGGER_WAIT_DELAY: '100',
      // this is calculated value, some tests expect 4 attempts and some will try 3 times
      GRIST_TRIGGER_MAX_ATTEMPTS: '4',
      GRIST_MAX_QUEUE_SIZE: '10',
      ...process.env
    };
  }

  /**
   * Spawns the server process and waits until it is ready (see _waitServerReady).
   *
   * @param _homeUrl - passed to the server as APP_HOME_URL.
   * @param customEnv - environment entries applied on top of the defaults.
   */
  public async start(_homeUrl?: string, customEnv?: object) {
    // put node logs into files with meaningful name that relate to the suite name and server type
    // (every comma is replaced, so the names stay comma-free for any list of server types)
    const fixedName = this._serverTypes.replace(/,/g, '_');
    const nodeLogPath = path.join(this._tmpDir, `${this._suiteName}-${fixedName}-node.log`);
    const nodeLogFd = await fse.open(nodeLogPath, 'a');
    try {
      const serverLog = process.env.VERBOSE ? 'inherit' : nodeLogFd;
      // use a path for socket that relates to suite name and server types
      this.testingSocket = path.join(this._tmpDir, `${this._suiteName}-${fixedName}.socket`);
      // Later entries win: defaults (including process.env) override the two entries above,
      // and customEnv overrides everything.
      const env = {
        APP_HOME_URL: _homeUrl,
        GRIST_TESTING_SOCKET: this.testingSocket,
        ...this._defaultEnv,
        ...customEnv
      };
      const main = await testUtils.getBuildFile('app/server/mergedServerMain.js');
      this._server = spawn('node', [main, '--testingHooks'], {
        env,
        stdio: ['inherit', serverLog, serverLog]
      });
    } finally {
      // Once spawn() has returned the child holds its own copy of the descriptor (or it was
      // never used because of VERBOSE / a failure); close ours so it does not leak per start.
      await fse.close(nodeLogFd);
    }
    this._exitPromise = exitPromise(this._server);

    // Try to be more helpful when server exits by printing out the tail of its log.
    this._exitPromise.then((code) => {
      if (this._server.killed) {
        return;
      }
      log.error("Server died unexpectedly, with code", code);
      const output = execFileSync('tail', ['-30', nodeLogPath]);
      log.info(`\n===== BEGIN SERVER OUTPUT ====\n${output}\n===== END SERVER OUTPUT =====`);
    })
      .catch(() => undefined);

    await this._waitServerReady();
    log.info(`server ${this._serverTypes} up and listening on ${this.serverUrl}`);
  }

  /** Kills the server process, closes the testing hooks and waits for the process to exit. */
  public async stop() {
    if (this.stopped) {
      return;
    }
    log.info("Stopping node server: " + this._serverTypes);
    this.stopped = true;
    this._server.kill();
    this.testingHooks.close();
    await this._exitPromise;
  }

  /**
   * Resolves to true when the server answers its status check, false on any error.
   * As a side effect it sets `testingHooks` and `serverUrl`.
   */
  public async isServerReady(): Promise<boolean> {
    // Let's wait for the testingSocket to be created, then get the port the server is listening on,
    // and then do an api check. This approach allow us to start server with GRIST_PORT set to '0',
    // which will listen on first available port, removing the need to hard code a port number.
    try {
      // wait for testing socket
      while (!(await fse.pathExists(this.testingSocket))) {
        await delay(200);
      }

      // create testing hooks and get own port
      this.testingHooks = await connectTestingHooks(this.testingSocket);
      const port: number = await this.testingHooks.getOwnPort();
      this.serverUrl = `http://localhost:${port}`;

      // wait for check
      return (await fetch(`${this.serverUrl}/status/hooks`, {timeout: 1000})).ok;
    } catch (err) {
      return false;
    }
  }

  /**
   * Waits until the readiness check settles, the server exits (which throws), or 30 seconds
   * pass, whichever comes first. Note that the timeout and a failed readiness check both
   * resolve without an error.
   */
  private async _waitServerReady() {
    // It's important to clear the timeout, because it can prevent node from exiting otherwise,
    // which is annoying when running only this test for debugging.
    let timeout: any;
    const maxDelay = new Promise((resolve) => {
      timeout = setTimeout(resolve, 30000);
    });
    try {
      await Promise.race([
        this.isServerReady(),
        this._exitPromise.then(() => {
          throw new Error("Server exited while waiting for it");
        }),
        maxDelay,
      ]);
    } finally {
      clearTimeout(timeout);
    }
  }
}