mirror of
https://github.com/gristlabs/grist-core.git
synced 2024-10-27 20:44:07 +00:00
Removing dependency on REDIS in webhook tests
This commit is contained in:
parent
7dc49f3c85
commit
0b6b8feb2b
@ -12,7 +12,7 @@
|
||||
"install:python3": "buildtools/prepare_python3.sh",
|
||||
"build:prod": "buildtools/build.sh",
|
||||
"start:prod": "sandbox/run.sh",
|
||||
"test": "GRIST_SESSION_COOKIE=grist_test_cookie GRIST_TEST_LOGIN=1 TEST_SUPPORT_API_KEY=api_key_for_support TEST_CLEAN_DATABASE=true NODE_PATH=_build:_build/stubs:_build/ext mocha ${DEBUG:+-b --no-exit} ${DEBUG:---forbid-only} -g ${GREP_TESTS:-''} _build/test/common/*.js _build/test/client/*.js _build/test/nbrowser/*.js _build/test/server/**/*.js _build/test/gen-server/**/*.js",
|
||||
"test": "GRIST_SESSION_COOKIE=grist_test_cookie GRIST_TEST_LOGIN=1 TEST_SUPPORT_API_KEY=api_key_for_support TEST_CLEAN_DATABASE=true NODE_PATH=_build:_build/stubs:_build/ext mocha ${DEBUG:+-b --no-exit} --slow 8000 ${DEBUG:---forbid-only} -g ${GREP_TESTS:-''} _build/test/common/*.js _build/test/client/*.js _build/test/nbrowser/*.js _build/test/server/**/*.js _build/test/gen-server/**/*.js",
|
||||
"test:nbrowser": "GRIST_SESSION_COOKIE=grist_test_cookie GRIST_TEST_LOGIN=1 TEST_SUPPORT_API_KEY=api_key_for_support TEST_CLEAN_DATABASE=true NODE_PATH=_build:_build/stubs:_build/ext mocha ${DEBUG:+-b --no-exit} ${DEBUG:---forbid-only} -g ${GREP_TESTS:-''} --slow 8000 _build/test/nbrowser/**/*.js",
|
||||
"test:client": "GRIST_SESSION_COOKIE=grist_test_cookie NODE_PATH=_build:_build/stubs:_build/ext mocha ${DEBUG:+'-b'} _build/test/client/**/*.js",
|
||||
"test:common": "GRIST_SESSION_COOKIE=grist_test_cookie NODE_PATH=_build:_build/stubs:_build/ext mocha ${DEBUG:+'-b'} _build/test/common/**/*.js",
|
||||
|
@ -2990,7 +2990,6 @@ function testDocApi() {
|
||||
|
||||
before(async function() {
|
||||
this.timeout(30000);
|
||||
if (!process.env.TEST_REDIS_URL) { this.skip(); }
|
||||
requests = {
|
||||
"add,update": [],
|
||||
"add": [],
|
||||
@ -3059,358 +3058,173 @@ function testDocApi() {
|
||||
}
|
||||
});
|
||||
}, webhooksTestPort);
|
||||
|
||||
redisCalls = [];
|
||||
redisMonitor = createClient(process.env.TEST_REDIS_URL);
|
||||
redisMonitor.monitor();
|
||||
redisMonitor.on("monitor", (_time: any, args: any, _rawReply: any) => {
|
||||
redisCalls.push(args);
|
||||
});
|
||||
});
|
||||
|
||||
after(async function() {
|
||||
if (!process.env.TEST_REDIS_URL) { this.skip(); }
|
||||
await serving.shutdown();
|
||||
await redisMonitor.quitAsync();
|
||||
});
|
||||
|
||||
it("delivers expected payloads from combinations of changes, with retrying and batching", async function() {
|
||||
// Create a test document.
|
||||
const ws1 = (await userApi.getOrgWorkspaces('current'))[0].id;
|
||||
const docId = await userApi.newDoc({name: 'testdoc'}, ws1);
|
||||
const doc = userApi.getDocAPI(docId);
|
||||
describe('table endpoints', function() {
|
||||
before(async function() {
|
||||
this.timeout(30000);
|
||||
// We rely on the REDIS server in this test.
|
||||
if (!process.env.TEST_REDIS_URL) { this.skip(); }
|
||||
requests = {
|
||||
"add,update": [],
|
||||
"add": [],
|
||||
"update": [],
|
||||
};
|
||||
|
||||
// For some reason B is turned into Numeric even when given bools
|
||||
await axios.post(`${serverUrl}/api/docs/${docId}/apply`, [
|
||||
['ModifyColumn', 'Table1', 'B', {type: 'Bool'}],
|
||||
], chimpy);
|
||||
redisCalls = [];
|
||||
redisMonitor = createClient(process.env.TEST_REDIS_URL);
|
||||
redisMonitor.monitor();
|
||||
redisMonitor.on("monitor", (_time: any, args: any, _rawReply: any) => {
|
||||
redisCalls.push(args);
|
||||
});
|
||||
});
|
||||
|
||||
// Make a webhook for every combination of event types
|
||||
const subscribeResponses = [];
|
||||
const webhookIds: Record<string, string> = {};
|
||||
for (const eventTypes of [
|
||||
["add"],
|
||||
["update"],
|
||||
["add", "update"],
|
||||
]) {
|
||||
const {data, status} = await axios.post(
|
||||
`${serverUrl}/api/docs/${docId}/tables/Table1/_subscribe`,
|
||||
{eventTypes, url: `${serving.url}/${eventTypes}`, isReadyColumn: "B"}, chimpy
|
||||
after(async function() {
|
||||
if (!process.env.TEST_REDIS_URL) { this.skip(); }
|
||||
await redisMonitor.quitAsync();
|
||||
});
|
||||
|
||||
it("delivers expected payloads from combinations of changes, with retrying and batching", async function() {
|
||||
// Create a test document.
|
||||
const ws1 = (await userApi.getOrgWorkspaces('current'))[0].id;
|
||||
const docId = await userApi.newDoc({name: 'testdoc'}, ws1);
|
||||
const doc = userApi.getDocAPI(docId);
|
||||
|
||||
// For some reason B is turned into Numeric even when given bools
|
||||
await axios.post(`${serverUrl}/api/docs/${docId}/apply`, [
|
||||
['ModifyColumn', 'Table1', 'B', {type: 'Bool'}],
|
||||
], chimpy);
|
||||
|
||||
// Make a webhook for every combination of event types
|
||||
const subscribeResponses = [];
|
||||
const webhookIds: Record<string, string> = {};
|
||||
for (const eventTypes of [
|
||||
["add"],
|
||||
["update"],
|
||||
["add", "update"],
|
||||
]) {
|
||||
const {data, status} = await axios.post(
|
||||
`${serverUrl}/api/docs/${docId}/tables/Table1/_subscribe`,
|
||||
{eventTypes, url: `${serving.url}/${eventTypes}`, isReadyColumn: "B"}, chimpy
|
||||
);
|
||||
assert.equal(status, 200);
|
||||
subscribeResponses.push(data);
|
||||
webhookIds[data.webhookId] = String(eventTypes);
|
||||
}
|
||||
|
||||
// Add and update some rows, trigger some events
|
||||
// Values of A where B is true and thus the record is ready are [1, 4, 7, 8]
|
||||
// So those are the values seen in expectedEvents
|
||||
await doc.addRows("Table1", {
|
||||
A: [1, 2],
|
||||
B: [true, false], // 1 is ready, 2 is not ready yet
|
||||
});
|
||||
await doc.updateRows("Table1", {id: [2], A: [3]}); // still not ready
|
||||
await doc.updateRows("Table1", {id: [2], A: [4], B: [true]}); // ready!
|
||||
await doc.updateRows("Table1", {id: [2], A: [5], B: [false]}); // not ready again
|
||||
await doc.updateRows("Table1", {id: [2], A: [6]}); // still not ready
|
||||
await doc.updateRows("Table1", {id: [2], A: [7], B: [true]}); // ready!
|
||||
await doc.updateRows("Table1", {id: [2], A: [8]}); // still ready!
|
||||
|
||||
// The end result here is additions for column A (now A3) with values [13, 15, 18]
|
||||
// and an update for 101
|
||||
await axios.post(`${serverUrl}/api/docs/${docId}/apply`, [
|
||||
['BulkAddRecord', 'Table1', [3, 4, 5, 6], {A: [9, 10, 11, 12], B: [true, true, false, false]}],
|
||||
['BulkUpdateRecord', 'Table1', [1, 2, 3, 4, 5, 6], {
|
||||
A: [101, 102, 13, 14, 15, 16],
|
||||
B: [true, false, true, false, true, false],
|
||||
}],
|
||||
|
||||
['RenameColumn', 'Table1', 'A', 'A3'],
|
||||
['RenameColumn', 'Table1', 'B', 'B3'],
|
||||
|
||||
['RenameTable', 'Table1', 'Table12'],
|
||||
|
||||
// FIXME a double rename A->A2->A3 doesn't seem to get summarised correctly
|
||||
// ['RenameColumn', 'Table12', 'A2', 'A3'],
|
||||
// ['RenameColumn', 'Table12', 'B2', 'B3'],
|
||||
|
||||
['RemoveColumn', 'Table12', 'C'],
|
||||
], chimpy);
|
||||
|
||||
// FIXME record changes after a RenameTable in the same bundle
|
||||
// don't appear in the action summary
|
||||
await axios.post(`${serverUrl}/api/docs/${docId}/apply`, [
|
||||
['AddRecord', 'Table12', 7, {A3: 17, B3: false}],
|
||||
['UpdateRecord', 'Table12', 7, {A3: 18, B3: true}],
|
||||
|
||||
['AddRecord', 'Table12', 8, {A3: 19, B3: true}],
|
||||
['UpdateRecord', 'Table12', 8, {A3: 20, B3: false}],
|
||||
|
||||
['AddRecord', 'Table12', 9, {A3: 20, B3: true}],
|
||||
['RemoveRecord', 'Table12', 9],
|
||||
], chimpy);
|
||||
|
||||
// Add 200 rows. These become the `expected200AddEvents`
|
||||
await doc.addRows("Table12", {
|
||||
A3: _.range(200, 400),
|
||||
B3: arrayRepeat(200, true),
|
||||
});
|
||||
|
||||
await receivedLastEvent;
|
||||
|
||||
// Unsubscribe
|
||||
await Promise.all(subscribeResponses.map(async subscribeResponse => {
|
||||
const unsubscribeResponse = await axios.post(
|
||||
`${serverUrl}/api/docs/${docId}/tables/Table12/_unsubscribe`,
|
||||
subscribeResponse, chimpy
|
||||
);
|
||||
assert.equal(unsubscribeResponse.status, 200);
|
||||
assert.deepEqual(unsubscribeResponse.data, {success: true});
|
||||
}));
|
||||
|
||||
// Further changes should generate no events because the triggers are gone
|
||||
await doc.addRows("Table12", {
|
||||
A3: [88, 99],
|
||||
B3: [true, false],
|
||||
});
|
||||
|
||||
assert.deepEqual(requests, expectedRequests);
|
||||
|
||||
// Check that the events were all pushed to the redis queue
|
||||
const queueRedisCalls = redisCalls.filter(args => args[1] === "webhook-queue-" + docId);
|
||||
const redisPushes = _.chain(queueRedisCalls)
|
||||
.filter(args => args[0] === "rpush") // Array<["rpush", key, ...events: string[]]>
|
||||
.flatMap(args => args.slice(2)) // events: string[]
|
||||
.map(JSON.parse) // events: WebhookEvent[]
|
||||
.groupBy('id') // {[webHookId: string]: WebhookEvent[]}
|
||||
.mapKeys((_value, key) => webhookIds[key]) // {[eventTypes: 'add'|'update'|'add,update']: WebhookEvent[]}
|
||||
.mapValues(group => _.map(group, 'payload')) // {[eventTypes: 'add'|'update'|'add,update']: RowRecord[]}
|
||||
.value();
|
||||
const expectedPushes = _.mapValues(expectedRequests, value => _.flatten(value));
|
||||
assert.deepEqual(redisPushes, expectedPushes);
|
||||
|
||||
// Check that the events were all removed from the redis queue
|
||||
const redisTrims = queueRedisCalls.filter(args => args[0] === "ltrim")
|
||||
.map(([,, start, end]) => {
|
||||
assert.equal(end, '-1');
|
||||
start = Number(start);
|
||||
assert.isTrue(start > 0);
|
||||
return start;
|
||||
});
|
||||
const expectedTrims = Object.values(redisPushes).map(value => value.length);
|
||||
assert.equal(
|
||||
_.sum(redisTrims),
|
||||
_.sum(expectedTrims),
|
||||
);
|
||||
assert.equal(status, 200);
|
||||
subscribeResponses.push(data);
|
||||
webhookIds[data.webhookId] = String(eventTypes);
|
||||
}
|
||||
|
||||
// Add and update some rows, trigger some events
|
||||
// Values of A where B is true and thus the record is ready are [1, 4, 7, 8]
|
||||
// So those are the values seen in expectedEvents
|
||||
await doc.addRows("Table1", {
|
||||
A: [1, 2],
|
||||
B: [true, false], // 1 is ready, 2 is not ready yet
|
||||
});
|
||||
await doc.updateRows("Table1", {id: [2], A: [3]}); // still not ready
|
||||
await doc.updateRows("Table1", {id: [2], A: [4], B: [true]}); // ready!
|
||||
await doc.updateRows("Table1", {id: [2], A: [5], B: [false]}); // not ready again
|
||||
await doc.updateRows("Table1", {id: [2], A: [6]}); // still not ready
|
||||
await doc.updateRows("Table1", {id: [2], A: [7], B: [true]}); // ready!
|
||||
await doc.updateRows("Table1", {id: [2], A: [8]}); // still ready!
|
||||
|
||||
// The end result here is additions for column A (now A3) with values [13, 15, 18]
|
||||
// and an update for 101
|
||||
await axios.post(`${serverUrl}/api/docs/${docId}/apply`, [
|
||||
['BulkAddRecord', 'Table1', [3, 4, 5, 6], {A: [9, 10, 11, 12], B: [true, true, false, false]}],
|
||||
['BulkUpdateRecord', 'Table1', [1, 2, 3, 4, 5, 6], {
|
||||
A: [101, 102, 13, 14, 15, 16],
|
||||
B: [true, false, true, false, true, false],
|
||||
}],
|
||||
|
||||
['RenameColumn', 'Table1', 'A', 'A3'],
|
||||
['RenameColumn', 'Table1', 'B', 'B3'],
|
||||
|
||||
['RenameTable', 'Table1', 'Table12'],
|
||||
|
||||
// FIXME a double rename A->A2->A3 doesn't seem to get summarised correctly
|
||||
// ['RenameColumn', 'Table12', 'A2', 'A3'],
|
||||
// ['RenameColumn', 'Table12', 'B2', 'B3'],
|
||||
|
||||
['RemoveColumn', 'Table12', 'C'],
|
||||
], chimpy);
|
||||
|
||||
// FIXME record changes after a RenameTable in the same bundle
|
||||
// don't appear in the action summary
|
||||
await axios.post(`${serverUrl}/api/docs/${docId}/apply`, [
|
||||
['AddRecord', 'Table12', 7, {A3: 17, B3: false}],
|
||||
['UpdateRecord', 'Table12', 7, {A3: 18, B3: true}],
|
||||
|
||||
['AddRecord', 'Table12', 8, {A3: 19, B3: true}],
|
||||
['UpdateRecord', 'Table12', 8, {A3: 20, B3: false}],
|
||||
|
||||
['AddRecord', 'Table12', 9, {A3: 20, B3: true}],
|
||||
['RemoveRecord', 'Table12', 9],
|
||||
], chimpy);
|
||||
|
||||
// Add 200 rows. These become the `expected200AddEvents`
|
||||
await doc.addRows("Table12", {
|
||||
A3: _.range(200, 400),
|
||||
B3: arrayRepeat(200, true),
|
||||
});
|
||||
|
||||
await receivedLastEvent;
|
||||
|
||||
// Unsubscribe
|
||||
await Promise.all(subscribeResponses.map(async subscribeResponse => {
|
||||
const unsubscribeResponse = await axios.post(
|
||||
`${serverUrl}/api/docs/${docId}/tables/Table12/_unsubscribe`,
|
||||
subscribeResponse, chimpy
|
||||
);
|
||||
assert.equal(unsubscribeResponse.status, 200);
|
||||
assert.deepEqual(unsubscribeResponse.data, {success: true});
|
||||
}));
|
||||
|
||||
// Further changes should generate no events because the triggers are gone
|
||||
await doc.addRows("Table12", {
|
||||
A3: [88, 99],
|
||||
B3: [true, false],
|
||||
});
|
||||
|
||||
assert.deepEqual(requests, expectedRequests);
|
||||
|
||||
// Check that the events were all pushed to the redis queue
|
||||
const queueRedisCalls = redisCalls.filter(args => args[1] === "webhook-queue-" + docId);
|
||||
const redisPushes = _.chain(queueRedisCalls)
|
||||
.filter(args => args[0] === "rpush") // Array<["rpush", key, ...events: string[]]>
|
||||
.flatMap(args => args.slice(2)) // events: string[]
|
||||
.map(JSON.parse) // events: WebhookEvent[]
|
||||
.groupBy('id') // {[webHookId: string]: WebhookEvent[]}
|
||||
.mapKeys((_value, key) => webhookIds[key]) // {[eventTypes: 'add'|'update'|'add,update']: WebhookEvent[]}
|
||||
.mapValues(group => _.map(group, 'payload')) // {[eventTypes: 'add'|'update'|'add,update']: RowRecord[]}
|
||||
.value();
|
||||
const expectedPushes = _.mapValues(expectedRequests, value => _.flatten(value));
|
||||
assert.deepEqual(redisPushes, expectedPushes);
|
||||
|
||||
// Check that the events were all removed from the redis queue
|
||||
const redisTrims = queueRedisCalls.filter(args => args[0] === "ltrim")
|
||||
.map(([,, start, end]) => {
|
||||
assert.equal(end, '-1');
|
||||
start = Number(start);
|
||||
assert.isTrue(start > 0);
|
||||
return start;
|
||||
});
|
||||
const expectedTrims = Object.values(redisPushes).map(value => value.length);
|
||||
assert.equal(
|
||||
_.sum(redisTrims),
|
||||
_.sum(expectedTrims),
|
||||
);
|
||||
|
||||
});
|
||||
|
||||
it("should clear the outgoing queue", async() => {
|
||||
// Create a test document.
|
||||
const ws1 = (await userApi.getOrgWorkspaces('current'))[0].id;
|
||||
const docId = await userApi.newDoc({name: 'testdoc2'}, ws1);
|
||||
const doc = userApi.getDocAPI(docId);
|
||||
await axios.post(`${serverUrl}/api/docs/${docId}/apply`, [
|
||||
['ModifyColumn', 'Table1', 'B', {type: 'Bool'}],
|
||||
], chimpy);
|
||||
|
||||
// Try to clear the queue, even if it is empty.
|
||||
await clearQueue(docId);
|
||||
|
||||
const cleanup: (() => Promise<any>)[] = [];
|
||||
|
||||
// Subscribe a valid webhook endpoint.
|
||||
cleanup.push(await autoSubscribe('200', docId));
|
||||
// Subscribe an invalid webhook endpoint.
|
||||
cleanup.push(await autoSubscribe('404', docId));
|
||||
|
||||
// Prepare signals, we will be waiting for those two to be called.
|
||||
successCalled.reset();
|
||||
notFoundCalled.reset();
|
||||
// Trigger both events.
|
||||
await doc.addRows("Table1", {
|
||||
A: [1],
|
||||
B: [true],
|
||||
});
|
||||
|
||||
// Wait for both of them to be called (this is correct order)
|
||||
await successCalled.waitAndReset();
|
||||
await notFoundCalled.waitAndReset();
|
||||
|
||||
// Broken endpoint will be called multiple times here, and any subsequent triggers for working
|
||||
// endpoint won't be called.
|
||||
await notFoundCalled.waitAndReset();
|
||||
|
||||
// But the working endpoint won't be called more then once.
|
||||
assert.isFalse(successCalled.called());
|
||||
|
||||
// Trigger second event.
|
||||
await doc.addRows("Table1", {
|
||||
A: [2],
|
||||
B: [true],
|
||||
});
|
||||
// Error endpoint will be called with the first row (still).
|
||||
const firstRow = await notFoundCalled.waitAndReset();
|
||||
assert.deepEqual(firstRow, 1);
|
||||
|
||||
// But the working endpoint won't be called till we reset the queue.
|
||||
assert.isFalse(successCalled.called());
|
||||
|
||||
// Now reset the queue.
|
||||
await clearQueue(docId);
|
||||
|
||||
assert.isFalse(successCalled.called());
|
||||
assert.isFalse(notFoundCalled.called());
|
||||
|
||||
// Prepare for new calls.
|
||||
successCalled.reset();
|
||||
notFoundCalled.reset();
|
||||
// Trigger them.
|
||||
await doc.addRows("Table1", {
|
||||
A: [3],
|
||||
B: [true],
|
||||
});
|
||||
// We will receive data from the 3rd row only (the second one was omitted).
|
||||
let thirdRow = await successCalled.waitAndReset();
|
||||
assert.deepEqual(thirdRow, 3);
|
||||
thirdRow = await notFoundCalled.waitAndReset();
|
||||
assert.deepEqual(thirdRow, 3);
|
||||
// And the situation will be the same, the working endpoint won't be called till we reset the queue, but
|
||||
// the error endpoint will be called with the third row multiple times.
|
||||
await notFoundCalled.waitAndReset();
|
||||
assert.isFalse(successCalled.called());
|
||||
|
||||
// Cleanup everything, we will now test request timeouts.
|
||||
await Promise.all(cleanup.map(fn => fn())).finally(() => cleanup.length = 0);
|
||||
await clearQueue(docId);
|
||||
|
||||
// Create 2 webhooks, one that is very long.
|
||||
cleanup.push(await autoSubscribe('200', docId));
|
||||
cleanup.push(await autoSubscribe('long', docId));
|
||||
successCalled.reset();
|
||||
longFinished.reset();
|
||||
longStarted.reset();
|
||||
// Trigger them.
|
||||
await doc.addRows("Table1", {
|
||||
A: [4],
|
||||
B: [true],
|
||||
});
|
||||
// 200 will be called immediately.
|
||||
await successCalled.waitAndReset();
|
||||
// Long will be started immediately.
|
||||
await longStarted.waitAndReset();
|
||||
// But it won't be finished.
|
||||
assert.isFalse(longFinished.called());
|
||||
// It will be aborted.
|
||||
controller.abort();
|
||||
assert.deepEqual(await longFinished.waitAndReset(), [408, 4]);
|
||||
|
||||
// Trigger another event.
|
||||
await doc.addRows("Table1", {
|
||||
A: [5],
|
||||
B: [true],
|
||||
});
|
||||
// We are stuck once again on the long call. But this time we won't
|
||||
// abort it till the end of this test.
|
||||
assert.deepEqual(await successCalled.waitAndReset(), 5);
|
||||
assert.deepEqual(await longStarted.waitAndReset(), 5);
|
||||
assert.isFalse(longFinished.called());
|
||||
|
||||
// Remember this controller for cleanup.
|
||||
const controller5 = controller;
|
||||
// Trigger another event.
|
||||
await doc.addRows("Table1", {
|
||||
A: [6],
|
||||
B: [true],
|
||||
});
|
||||
// We are now completely stuck on the 5th row webhook.
|
||||
assert.isFalse(successCalled.called());
|
||||
assert.isFalse(longFinished.called());
|
||||
// Clear the queue, it will free webhooks requests, but it won't cancel long handler on the external server
|
||||
// so it is still waiting.
|
||||
assert.isTrue((await axios.delete(
|
||||
`${serverUrl}/api/docs/${docId}/webhooks/queue`, chimpy
|
||||
)).status === 200);
|
||||
// Now we can release the stuck request.
|
||||
controller5.abort();
|
||||
// We will be cancelled from the 5th row.
|
||||
assert.deepEqual(await longFinished.waitAndReset(), [408, 5]);
|
||||
|
||||
// We won't be called for the 6th row at all, as it was stuck and the queue was purged.
|
||||
assert.isFalse(successCalled.called());
|
||||
assert.isFalse(longStarted.called());
|
||||
|
||||
// Trigger next event.
|
||||
await doc.addRows("Table1", {
|
||||
A: [7],
|
||||
B: [true],
|
||||
});
|
||||
// We will be called once again with a new 7th row.
|
||||
assert.deepEqual(await successCalled.waitAndReset(), 7);
|
||||
assert.deepEqual(await longStarted.waitAndReset(), 7);
|
||||
// But we are stuck again.
|
||||
assert.isFalse(longFinished.called());
|
||||
// And we can abort current request from 7th row (6th row was skipped).
|
||||
controller.abort();
|
||||
assert.deepEqual(await longFinished.waitAndReset(), [408, 7]);
|
||||
|
||||
// Cleanup all
|
||||
await Promise.all(cleanup.map(fn => fn())).finally(() => cleanup.length = 0);
|
||||
await clearQueue(docId);
|
||||
});
|
||||
|
||||
it("should not call to a deleted webhook", async() => {
|
||||
// Create a test document.
|
||||
const ws1 = (await userApi.getOrgWorkspaces('current'))[0].id;
|
||||
const docId = await userApi.newDoc({name: 'testdoc4'}, ws1);
|
||||
const doc = userApi.getDocAPI(docId);
|
||||
await axios.post(`${serverUrl}/api/docs/${docId}/apply`, [
|
||||
['ModifyColumn', 'Table1', 'B', {type: 'Bool'}],
|
||||
], chimpy);
|
||||
|
||||
// Subscribe to 2 webhooks, we will remove the second one.
|
||||
const webhook1 = await autoSubscribe('probe', docId);
|
||||
const webhook2 = await autoSubscribe('200', docId);
|
||||
|
||||
probeStatus = 200;
|
||||
successCalled.reset();
|
||||
longFinished.reset();
|
||||
// Trigger them.
|
||||
await doc.addRows("Table1", {
|
||||
A: [1],
|
||||
B: [true],
|
||||
});
|
||||
|
||||
// Wait for the first one to be called.
|
||||
await longStarted.waitAndReset();
|
||||
// Now why we are on the call remove the second one.
|
||||
// Check that it is queued.
|
||||
const stats = await readStats(docId);
|
||||
assert.equal(2, _.sum(stats.map(x => x.usage?.numWaiting ?? 0)));
|
||||
await webhook2();
|
||||
// Let the first one finish.
|
||||
controller.abort();
|
||||
await longFinished.waitAndReset();
|
||||
// The second one is not called.
|
||||
assert.isFalse(successCalled.called());
|
||||
// Triggering next event, we will get only calls to the probe (first webhook).
|
||||
await doc.addRows("Table1", {
|
||||
A: [2],
|
||||
B: [true],
|
||||
});
|
||||
await longStarted.waitAndReset();
|
||||
controller.abort();
|
||||
await longFinished.waitAndReset();
|
||||
|
||||
// Unsubscribe.
|
||||
await webhook1();
|
||||
});
|
||||
|
||||
describe("/webhooks endpoint", function() {
|
||||
let docId: string;
|
||||
let doc: DocAPI;
|
||||
let stats: WebhookSummary[];
|
||||
before(async() => {
|
||||
before(async function() {
|
||||
// Create a test document.
|
||||
const ws1 = (await userApi.getOrgWorkspaces('current'))[0].id;
|
||||
docId = await userApi.newDoc({name: 'testdoc2'}, ws1);
|
||||
@ -3427,6 +3241,207 @@ function testDocApi() {
|
||||
}, 1000, 200);
|
||||
};
|
||||
|
||||
it("should clear the outgoing queue", async() => {
|
||||
// Create a test document.
|
||||
const ws1 = (await userApi.getOrgWorkspaces('current'))[0].id;
|
||||
const docId = await userApi.newDoc({name: 'testdoc2'}, ws1);
|
||||
const doc = userApi.getDocAPI(docId);
|
||||
await axios.post(`${serverUrl}/api/docs/${docId}/apply`, [
|
||||
['ModifyColumn', 'Table1', 'B', {type: 'Bool'}],
|
||||
], chimpy);
|
||||
|
||||
// Try to clear the queue, even if it is empty.
|
||||
await clearQueue(docId);
|
||||
|
||||
const cleanup: (() => Promise<any>)[] = [];
|
||||
|
||||
// Subscribe a valid webhook endpoint.
|
||||
cleanup.push(await autoSubscribe('200', docId));
|
||||
// Subscribe an invalid webhook endpoint.
|
||||
cleanup.push(await autoSubscribe('404', docId));
|
||||
|
||||
// Prepare signals, we will be waiting for those two to be called.
|
||||
successCalled.reset();
|
||||
notFoundCalled.reset();
|
||||
// Trigger both events.
|
||||
await doc.addRows("Table1", {
|
||||
A: [1],
|
||||
B: [true],
|
||||
});
|
||||
|
||||
// Wait for both of them to be called (this is correct order)
|
||||
await successCalled.waitAndReset();
|
||||
await notFoundCalled.waitAndReset();
|
||||
|
||||
// Broken endpoint will be called multiple times here, and any subsequent triggers for working
|
||||
// endpoint won't be called.
|
||||
await notFoundCalled.waitAndReset();
|
||||
|
||||
// But the working endpoint won't be called more then once.
|
||||
assert.isFalse(successCalled.called());
|
||||
|
||||
// Trigger second event.
|
||||
await doc.addRows("Table1", {
|
||||
A: [2],
|
||||
B: [true],
|
||||
});
|
||||
// Error endpoint will be called with the first row (still).
|
||||
const firstRow = await notFoundCalled.waitAndReset();
|
||||
assert.deepEqual(firstRow, 1);
|
||||
|
||||
// But the working endpoint won't be called till we reset the queue.
|
||||
assert.isFalse(successCalled.called());
|
||||
|
||||
// Now reset the queue.
|
||||
await clearQueue(docId);
|
||||
|
||||
assert.isFalse(successCalled.called());
|
||||
assert.isFalse(notFoundCalled.called());
|
||||
|
||||
// Prepare for new calls.
|
||||
successCalled.reset();
|
||||
notFoundCalled.reset();
|
||||
// Trigger them.
|
||||
await doc.addRows("Table1", {
|
||||
A: [3],
|
||||
B: [true],
|
||||
});
|
||||
// We will receive data from the 3rd row only (the second one was omitted).
|
||||
let thirdRow = await successCalled.waitAndReset();
|
||||
assert.deepEqual(thirdRow, 3);
|
||||
thirdRow = await notFoundCalled.waitAndReset();
|
||||
assert.deepEqual(thirdRow, 3);
|
||||
// And the situation will be the same, the working endpoint won't be called till we reset the queue, but
|
||||
// the error endpoint will be called with the third row multiple times.
|
||||
await notFoundCalled.waitAndReset();
|
||||
assert.isFalse(successCalled.called());
|
||||
|
||||
// Cleanup everything, we will now test request timeouts.
|
||||
await Promise.all(cleanup.map(fn => fn())).finally(() => cleanup.length = 0);
|
||||
await clearQueue(docId);
|
||||
|
||||
// Create 2 webhooks, one that is very long.
|
||||
cleanup.push(await autoSubscribe('200', docId));
|
||||
cleanup.push(await autoSubscribe('long', docId));
|
||||
successCalled.reset();
|
||||
longFinished.reset();
|
||||
longStarted.reset();
|
||||
// Trigger them.
|
||||
await doc.addRows("Table1", {
|
||||
A: [4],
|
||||
B: [true],
|
||||
});
|
||||
// 200 will be called immediately.
|
||||
await successCalled.waitAndReset();
|
||||
// Long will be started immediately.
|
||||
await longStarted.waitAndReset();
|
||||
// But it won't be finished.
|
||||
assert.isFalse(longFinished.called());
|
||||
// It will be aborted.
|
||||
controller.abort();
|
||||
assert.deepEqual(await longFinished.waitAndReset(), [408, 4]);
|
||||
|
||||
// Trigger another event.
|
||||
await doc.addRows("Table1", {
|
||||
A: [5],
|
||||
B: [true],
|
||||
});
|
||||
// We are stuck once again on the long call. But this time we won't
|
||||
// abort it till the end of this test.
|
||||
assert.deepEqual(await successCalled.waitAndReset(), 5);
|
||||
assert.deepEqual(await longStarted.waitAndReset(), 5);
|
||||
assert.isFalse(longFinished.called());
|
||||
|
||||
// Remember this controller for cleanup.
|
||||
const controller5 = controller;
|
||||
// Trigger another event.
|
||||
await doc.addRows("Table1", {
|
||||
A: [6],
|
||||
B: [true],
|
||||
});
|
||||
// We are now completely stuck on the 5th row webhook.
|
||||
assert.isFalse(successCalled.called());
|
||||
assert.isFalse(longFinished.called());
|
||||
// Clear the queue, it will free webhooks requests, but it won't cancel long handler on the external server
|
||||
// so it is still waiting.
|
||||
assert.isTrue((await axios.delete(
|
||||
`${serverUrl}/api/docs/${docId}/webhooks/queue`, chimpy
|
||||
)).status === 200);
|
||||
// Now we can release the stuck request.
|
||||
controller5.abort();
|
||||
// We will be cancelled from the 5th row.
|
||||
assert.deepEqual(await longFinished.waitAndReset(), [408, 5]);
|
||||
|
||||
// We won't be called for the 6th row at all, as it was stuck and the queue was purged.
|
||||
assert.isFalse(successCalled.called());
|
||||
assert.isFalse(longStarted.called());
|
||||
|
||||
// Trigger next event.
|
||||
await doc.addRows("Table1", {
|
||||
A: [7],
|
||||
B: [true],
|
||||
});
|
||||
// We will be called once again with a new 7th row.
|
||||
assert.deepEqual(await successCalled.waitAndReset(), 7);
|
||||
assert.deepEqual(await longStarted.waitAndReset(), 7);
|
||||
// But we are stuck again.
|
||||
assert.isFalse(longFinished.called());
|
||||
// And we can abort current request from 7th row (6th row was skipped).
|
||||
controller.abort();
|
||||
assert.deepEqual(await longFinished.waitAndReset(), [408, 7]);
|
||||
|
||||
// Cleanup all
|
||||
await Promise.all(cleanup.map(fn => fn())).finally(() => cleanup.length = 0);
|
||||
await clearQueue(docId);
|
||||
});
|
||||
|
||||
it("should not call to a deleted webhook", async() => {
|
||||
// Create a test document.
|
||||
const ws1 = (await userApi.getOrgWorkspaces('current'))[0].id;
|
||||
const docId = await userApi.newDoc({name: 'testdoc4'}, ws1);
|
||||
const doc = userApi.getDocAPI(docId);
|
||||
await axios.post(`${serverUrl}/api/docs/${docId}/apply`, [
|
||||
['ModifyColumn', 'Table1', 'B', {type: 'Bool'}],
|
||||
], chimpy);
|
||||
|
||||
// Subscribe to 2 webhooks, we will remove the second one.
|
||||
const webhook1 = await autoSubscribe('probe', docId);
|
||||
const webhook2 = await autoSubscribe('200', docId);
|
||||
|
||||
probeStatus = 200;
|
||||
successCalled.reset();
|
||||
longFinished.reset();
|
||||
// Trigger them.
|
||||
await doc.addRows("Table1", {
|
||||
A: [1],
|
||||
B: [true],
|
||||
});
|
||||
|
||||
// Wait for the first one to be called.
|
||||
await longStarted.waitAndReset();
|
||||
// Now why we are on the call remove the second one.
|
||||
// Check that it is queued.
|
||||
const stats = await readStats(docId);
|
||||
assert.equal(2, _.sum(stats.map(x => x.usage?.numWaiting ?? 0)));
|
||||
await webhook2();
|
||||
// Let the first one finish.
|
||||
controller.abort();
|
||||
await longFinished.waitAndReset();
|
||||
// The second one is not called.
|
||||
assert.isFalse(successCalled.called());
|
||||
// Triggering next event, we will get only calls to the probe (first webhook).
|
||||
await doc.addRows("Table1", {
|
||||
A: [2],
|
||||
B: [true],
|
||||
});
|
||||
await longStarted.waitAndReset();
|
||||
controller.abort();
|
||||
await longFinished.waitAndReset();
|
||||
|
||||
// Unsubscribe.
|
||||
await webhook1();
|
||||
});
|
||||
|
||||
it("should return statistics", async() => {
|
||||
await clearQueue(docId);
|
||||
// Read stats, it should be empty.
|
||||
@ -3757,7 +3772,6 @@ function testDocApi() {
|
||||
await unsubscribe(docId, webhook4);
|
||||
});
|
||||
});
|
||||
|
||||
});
|
||||
|
||||
describe("Allowed Origin", () => {
|
||||
|
Loading…
Reference in New Issue
Block a user