mirror of
https://github.com/gristlabs/grist-core.git
synced 2024-10-27 20:44:07 +00:00
(core) Add BulkAddOrUpdateRecord action for efficiency
Summary: This diff adds a new `BulkAddOrUpdateRecord` user action which is what is sounds like: - A bulk version of the existing `AddOrUpdateRecord` action. - Much more efficient for operating on many records than applying many individual actions. - Column values are specified as maps from `colId` to arrays of values as usual. - Produces bulk versions of `AddRecord` and `UpdateRecord` actions instead of many individual actions. Examples of users wanting to use something like `AddOrUpdateRecord` with large numbers of records: - https://grist.slack.com/archives/C0234CPPXPA/p1651789710290879 - https://grist.slack.com/archives/C0234CPPXPA/p1660743493480119 - https://grist.slack.com/archives/C0234CPPXPA/p1660333148491559 - https://grist.slack.com/archives/C0234CPPXPA/p1663069291726159 I tested what made many `AddOrUpdateRecord` actions slow in the first place. It was almost entirely due to producing many individual `AddRecord` user actions. About half of that time was for processing the resulting `AddRecord` doc actions. Lookups and updates were not a problem. With these changes, the slowness is gone. The Python user action implementation is more complex but there are no surprises. The JS API now groups `records` based on the keys of `require` and `fields` so that `BulkAddOrUpdateRecord` can be applied to each group. Test Plan: Update and extend Python and DocApi tests. Reviewers: jarek, paulfitz Reviewed By: jarek, paulfitz Subscribers: jarek Differential Revision: https://phab.getgrist.com/D3642
This commit is contained in:
parent
df65219729
commit
1864b7ba5d
@ -355,6 +355,10 @@ export function parseUserAction(ua: UserAction, docData: DocData): UserAction {
|
|||||||
ua = _parseUserActionColValues(ua, docData, false, 2);
|
ua = _parseUserActionColValues(ua, docData, false, 2);
|
||||||
ua = _parseUserActionColValues(ua, docData, false, 3);
|
ua = _parseUserActionColValues(ua, docData, false, 3);
|
||||||
return ua;
|
return ua;
|
||||||
|
case 'BulkAddOrUpdateRecord':
|
||||||
|
ua = _parseUserActionColValues(ua, docData, true, 2);
|
||||||
|
ua = _parseUserActionColValues(ua, docData, true, 3);
|
||||||
|
return ua;
|
||||||
default:
|
default:
|
||||||
return ua;
|
return ua;
|
||||||
}
|
}
|
||||||
|
@ -5,6 +5,7 @@ import { arrayRepeat } from './gutil';
|
|||||||
import flatMap = require('lodash/flatMap');
|
import flatMap = require('lodash/flatMap');
|
||||||
import isEqual = require('lodash/isEqual');
|
import isEqual = require('lodash/isEqual');
|
||||||
import pick = require('lodash/pick');
|
import pick = require('lodash/pick');
|
||||||
|
import groupBy = require('lodash/groupBy');
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An implementation of the TableOperations interface, given a platform
|
* An implementation of the TableOperations interface, given a platform
|
||||||
@ -59,8 +60,21 @@ export class TableOperationsImpl implements TableOperations {
|
|||||||
allow_empty_require: upsertOptions?.allowEmptyRequire
|
allow_empty_require: upsertOptions?.allowEmptyRequire
|
||||||
};
|
};
|
||||||
const recordOptions: OpOptions = pick(upsertOptions, 'parseStrings');
|
const recordOptions: OpOptions = pick(upsertOptions, 'parseStrings');
|
||||||
const actions = records.map(rec =>
|
|
||||||
["AddOrUpdateRecord", tableId, rec.require, rec.fields || {}, options]);
|
// Group records based on having the same keys in `require` and `fields`.
|
||||||
|
// A single bulk action will be applied to each group.
|
||||||
|
// We don't want one bulk action for all records that might have different shapes,
|
||||||
|
// because that would require filling arrays with null values.
|
||||||
|
const recGroups = groupBy(records, rec => {
|
||||||
|
const requireKeys = Object.keys(rec.require).sort().join(',');
|
||||||
|
const fieldsKeys = Object.keys(rec.fields || {}).sort().join(',');
|
||||||
|
return `${requireKeys}:${fieldsKeys}`;
|
||||||
|
});
|
||||||
|
const actions = Object.values(recGroups).map(group => {
|
||||||
|
const require = convertToBulkColValues(group.map(r => ({fields: r.require})));
|
||||||
|
const fields = convertToBulkColValues(group.map(r => ({fields: r.fields || {}})));
|
||||||
|
return ["BulkAddOrUpdateRecord", tableId, require, fields, options];
|
||||||
|
});
|
||||||
await this._applyUserActions(tableId, [...fieldNames(records)],
|
await this._applyUserActions(tableId, [...fieldNames(records)],
|
||||||
actions, recordOptions);
|
actions, recordOptions);
|
||||||
return [];
|
return [];
|
||||||
|
@ -77,6 +77,10 @@ function isAclTable(tableId: string): boolean {
|
|||||||
return ['_grist_ACLRules', '_grist_ACLResources'].includes(tableId);
|
return ['_grist_ACLRules', '_grist_ACLResources'].includes(tableId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function isAddOrUpdateRecordAction(a: UserAction): boolean {
|
||||||
|
return ['AddOrUpdateRecord', 'BulkAddOrUpdateRecord'].includes(String(a[0]));
|
||||||
|
}
|
||||||
|
|
||||||
// A list of key metadata tables that need special handling. Other metadata tables may
|
// A list of key metadata tables that need special handling. Other metadata tables may
|
||||||
// refer to material in some of these tables but don't need special handling.
|
// refer to material in some of these tables but don't need special handling.
|
||||||
// TODO: there are other metadata tables that would need access control, or redesign -
|
// TODO: there are other metadata tables that would need access control, or redesign -
|
||||||
@ -128,8 +132,9 @@ const OTHER_RECOGNIZED_ACTIONS = new Set([
|
|||||||
'BulkRemoveRecord',
|
'BulkRemoveRecord',
|
||||||
'ReplaceTableData',
|
'ReplaceTableData',
|
||||||
|
|
||||||
// A data action handled specially because of read needs.
|
// Data actions handled specially because of read needs.
|
||||||
'AddOrUpdateRecord',
|
'AddOrUpdateRecord',
|
||||||
|
'BulkAddOrUpdateRecord',
|
||||||
|
|
||||||
// Groups of actions.
|
// Groups of actions.
|
||||||
'ApplyDocActions',
|
'ApplyDocActions',
|
||||||
@ -979,21 +984,22 @@ export class GranularAccess implements GranularAccessForBundle {
|
|||||||
// way to do that within the data engine as currently
|
// way to do that within the data engine as currently
|
||||||
// formulated. Could perhaps be done for on-demand tables though.
|
// formulated. Could perhaps be done for on-demand tables though.
|
||||||
private async _checkAddOrUpdateAccess(docSession: OptDocSession, actions: UserAction[]) {
|
private async _checkAddOrUpdateAccess(docSession: OptDocSession, actions: UserAction[]) {
|
||||||
if (!scanActionsRecursively(actions, (a) => a[0] === 'AddOrUpdateRecord')) {
|
if (!scanActionsRecursively(actions, isAddOrUpdateRecordAction)) {
|
||||||
// Don't need to apply this particular check.
|
// Don't need to apply this particular check.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// Fail if being combined with anything fancy.
|
// Fail if being combined with anything fancy.
|
||||||
if (scanActionsRecursively(actions, (a) => {
|
if (scanActionsRecursively(actions, (a) => {
|
||||||
const name = a[0];
|
const name = a[0];
|
||||||
return !['ApplyUndoActions', 'ApplyDocActions', 'AddOrUpdateRecord'].includes(String(name)) &&
|
return !['ApplyUndoActions', 'ApplyDocActions'].includes(String(name)) &&
|
||||||
|
!isAddOrUpdateRecordAction(a) &&
|
||||||
!(isDataAction(a) && !getTableId(a).startsWith('_grist_'));
|
!(isDataAction(a) && !getTableId(a).startsWith('_grist_'));
|
||||||
})) {
|
})) {
|
||||||
throw new Error('Can only combine AddOrUpdate with simple data changes');
|
throw new Error('Can only combine AddOrUpdate with simple data changes');
|
||||||
}
|
}
|
||||||
// Check for read access, and that we're not touching metadata.
|
// Check for read access, and that we're not touching metadata.
|
||||||
await applyToActionsRecursively(actions, async (a) => {
|
await applyToActionsRecursively(actions, async (a) => {
|
||||||
if (a[0] !== 'AddOrUpdateRecord') { return; }
|
if (!isAddOrUpdateRecordAction(a)) { return; }
|
||||||
const tableId = validTableIdString(a[1]);
|
const tableId = validTableIdString(a[1]);
|
||||||
if (tableId.startsWith('_grist_')) {
|
if (tableId.startsWith('_grist_')) {
|
||||||
throw new Error(`AddOrUpdate cannot yet be used on metadata tables`);
|
throw new Error(`AddOrUpdate cannot yet be used on metadata tables`);
|
||||||
|
@ -1033,8 +1033,7 @@ class TestUserActions(test_engine.EngineTestCase):
|
|||||||
{"color": "green"},
|
{"color": "green"},
|
||||||
{"on_many": "all"},
|
{"on_many": "all"},
|
||||||
[
|
[
|
||||||
["UpdateRecord", "Table1", 1, {"color": "green"}],
|
["BulkUpdateRecord", "Table1", [1, 2], {"color": ["green", "green"]}],
|
||||||
["UpdateRecord", "Table1", 2, {"color": "green"}],
|
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -1044,8 +1043,7 @@ class TestUserActions(test_engine.EngineTestCase):
|
|||||||
{"color": "greener"},
|
{"color": "greener"},
|
||||||
{"on_many": "all", "allow_empty_require": True},
|
{"on_many": "all", "allow_empty_require": True},
|
||||||
[
|
[
|
||||||
["UpdateRecord", "Table1", 1, {"color": "greener"}],
|
["BulkUpdateRecord", "Table1", [1, 2], {"color": ["greener", "greener"]}],
|
||||||
["UpdateRecord", "Table1", 2, {"color": "greener"}],
|
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -1159,6 +1157,128 @@ class TestUserActions(test_engine.EngineTestCase):
|
|||||||
[["UpdateRecord", "Table1", 102, {"date": 1900800}]],
|
[["UpdateRecord", "Table1", 102, {"date": 1900800}]],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Empty both does nothing
|
||||||
|
check(
|
||||||
|
{},
|
||||||
|
{},
|
||||||
|
{"allow_empty_require": True},
|
||||||
|
[],
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_bulk_add_or_update(self):
|
||||||
|
sample = testutil.parse_test_sample({
|
||||||
|
"SCHEMA": [
|
||||||
|
[1, "Table1", [
|
||||||
|
[1, "first_name", "Text", False, "", "first_name", ""],
|
||||||
|
[2, "last_name", "Text", False, "", "last_name", ""],
|
||||||
|
[4, "color", "Text", False, "", "color", ""],
|
||||||
|
]],
|
||||||
|
],
|
||||||
|
"DATA": {
|
||||||
|
"Table1": [
|
||||||
|
["id", "first_name", "last_name"],
|
||||||
|
[1, "John", "Doe"],
|
||||||
|
[2, "John", "Smith"],
|
||||||
|
],
|
||||||
|
}
|
||||||
|
})
|
||||||
|
self.load_sample(sample)
|
||||||
|
|
||||||
|
def check(require, values, options, stored):
|
||||||
|
self.assertPartialOutActions(
|
||||||
|
self.apply_user_action(["BulkAddOrUpdateRecord", "Table1", require, values, options]),
|
||||||
|
{"stored": stored},
|
||||||
|
)
|
||||||
|
|
||||||
|
check(
|
||||||
|
{
|
||||||
|
"first_name": [
|
||||||
|
"John",
|
||||||
|
"John",
|
||||||
|
"John",
|
||||||
|
"Bob",
|
||||||
|
],
|
||||||
|
"last_name": [
|
||||||
|
"Doe",
|
||||||
|
"Smith",
|
||||||
|
"Johnson",
|
||||||
|
"Johnson",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"color": [
|
||||||
|
"red",
|
||||||
|
"blue",
|
||||||
|
"green",
|
||||||
|
"yellow",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
{},
|
||||||
|
[
|
||||||
|
["BulkAddRecord", "Table1", [3, 4], {
|
||||||
|
"color": ["green", "yellow"],
|
||||||
|
"first_name": ["John", "Bob"],
|
||||||
|
"last_name": ["Johnson", "Johnson"],
|
||||||
|
}],
|
||||||
|
["BulkUpdateRecord", "Table1", [1, 2], {"color": ["red", "blue"]}],
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
with self.assertRaises(ValueError) as cm:
|
||||||
|
check(
|
||||||
|
{"color": ["yellow"]},
|
||||||
|
{"color": ["red", "blue", "green"]},
|
||||||
|
{},
|
||||||
|
[],
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
str(cm.exception),
|
||||||
|
'Value lists must all have the same length, '
|
||||||
|
'got {"col_values color": 3, "require color": 1}',
|
||||||
|
)
|
||||||
|
|
||||||
|
with self.assertRaises(ValueError) as cm:
|
||||||
|
check(
|
||||||
|
{
|
||||||
|
"first_name": [
|
||||||
|
"John",
|
||||||
|
"John",
|
||||||
|
],
|
||||||
|
"last_name": [
|
||||||
|
"Doe",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
{},
|
||||||
|
{},
|
||||||
|
[],
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
str(cm.exception),
|
||||||
|
'Value lists must all have the same length, '
|
||||||
|
'got {"require first_name": 2, "require last_name": 1}',
|
||||||
|
)
|
||||||
|
|
||||||
|
with self.assertRaises(ValueError) as cm:
|
||||||
|
check(
|
||||||
|
{
|
||||||
|
"first_name": [
|
||||||
|
"John",
|
||||||
|
"John",
|
||||||
|
],
|
||||||
|
"last_name": [
|
||||||
|
"Doe",
|
||||||
|
"Doe",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
{},
|
||||||
|
{},
|
||||||
|
[],
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
str(cm.exception),
|
||||||
|
"require values must be unique",
|
||||||
|
)
|
||||||
|
|
||||||
def test_reference_lookup(self):
|
def test_reference_lookup(self):
|
||||||
sample = testutil.parse_test_sample({
|
sample = testutil.parse_test_sample({
|
||||||
"SCHEMA": [
|
"SCHEMA": [
|
||||||
|
@ -918,14 +918,15 @@ class UserActions(object):
|
|||||||
return values
|
return values
|
||||||
|
|
||||||
@useraction
|
@useraction
|
||||||
def AddOrUpdateRecord(self, table_id, require, col_values, options):
|
def BulkAddOrUpdateRecord(self, table_id, require, col_values, options):
|
||||||
"""
|
"""
|
||||||
Add or Update ('upsert') a single record depending on `options`
|
Add or Update ('upsert') records depending on `options`
|
||||||
and on whether a record matching `require` already exists.
|
and on whether records matching `require` already exist.
|
||||||
|
|
||||||
`require` and `col_values` are dictionaries mapping column IDs to single cell values.
|
`require` and `col_values` are dictionaries mapping column IDs to lists of cell values.
|
||||||
|
All lists across both dictionaries must have the same length.
|
||||||
|
|
||||||
By default, if `table.lookupRecords(**require)` returns any records,
|
By default, for a single record, if `table.lookupRecords(**require)` returns any records,
|
||||||
update the first one with the values in `col_values`.
|
update the first one with the values in `col_values`.
|
||||||
Otherwise create a new record with values `{**require, **col_values}`.
|
Otherwise create a new record with values `{**require, **col_values}`.
|
||||||
|
|
||||||
@ -940,39 +941,97 @@ class UserActions(object):
|
|||||||
"""
|
"""
|
||||||
table = self._engine.tables[table_id]
|
table = self._engine.tables[table_id]
|
||||||
|
|
||||||
if not require and not options.get("allow_empty_require", False):
|
update = options.get("update", True)
|
||||||
raise ValueError("require is empty but allow_empty_require isn't set")
|
add = options.get("add", True)
|
||||||
|
|
||||||
# Decode `require` before looking up, but let AddRecord/UpdateRecord decode the final
|
|
||||||
# values when adding/updating
|
|
||||||
decoded_require = {k: decode_object(v) for k, v in six.iteritems(require)}
|
|
||||||
records = list(table.lookup_records(**decoded_require))
|
|
||||||
|
|
||||||
if records and options.get("update", True):
|
|
||||||
if len(records) > 1:
|
|
||||||
on_many = options.get("on_many", "first")
|
on_many = options.get("on_many", "first")
|
||||||
if on_many == "first":
|
if on_many not in ("first", "none", "all"):
|
||||||
records = records[:1]
|
|
||||||
elif on_many == "none":
|
|
||||||
return
|
|
||||||
elif on_many != "all":
|
|
||||||
raise ValueError("on_many should be 'first', 'none', or 'all', not %r" % on_many)
|
raise ValueError("on_many should be 'first', 'none', or 'all', not %r" % on_many)
|
||||||
|
|
||||||
for record in records:
|
allow_empty_require = options.get("allow_empty_require", False)
|
||||||
self.UpdateRecord(table_id, record.id, col_values)
|
if not require and not allow_empty_require:
|
||||||
|
raise ValueError("require is empty but allow_empty_require isn't set")
|
||||||
|
|
||||||
if not records and options.get("add", True):
|
if not require and not col_values:
|
||||||
values = {
|
return # nothing to do
|
||||||
key: value
|
|
||||||
for key, value in six.iteritems(require)
|
lengths = {}
|
||||||
|
lengths.update({'require ' + k:
|
||||||
|
len(v) for k, v in six.iteritems(require)})
|
||||||
|
lengths.update({'col_values ' + k:
|
||||||
|
len(v) for k, v in six.iteritems(col_values)})
|
||||||
|
unique_lengths = set(lengths.values())
|
||||||
|
if len(unique_lengths) != 1:
|
||||||
|
raise ValueError("Value lists must all have the same length, got %s" %
|
||||||
|
json.dumps(lengths, sort_keys=True))
|
||||||
|
[length] = unique_lengths
|
||||||
|
|
||||||
|
decoded_require = actions.decode_bulk_values(require)
|
||||||
|
num_unique_keys = len(set(zip(*decoded_require.values())))
|
||||||
|
if require and num_unique_keys < length:
|
||||||
|
raise ValueError("require values must be unique")
|
||||||
|
|
||||||
|
# Column IDs in `require` that can be used to set values when creating new records,
|
||||||
|
# i.e. not formula columns that don't allow setting values.
|
||||||
|
# `col_values` is not checked for this because setting such a column there should raise an error
|
||||||
|
# This doesn't apply to `require` since it's also used to match existing records.
|
||||||
|
require_add_keys = {
|
||||||
|
key for key in require
|
||||||
if not (
|
if not (
|
||||||
table.get_column(key).is_formula() and
|
table.get_column(key).is_formula() and
|
||||||
# Check that there actually is a formula and this isn't just an empty column
|
# Check that there actually is a formula and this isn't just an empty column
|
||||||
self._engine.docmodel.get_column_rec(table_id, key).formula
|
self._engine.docmodel.get_column_rec(table_id, key).formula
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
values.update(col_values)
|
col_keys = set(col_values.keys())
|
||||||
self.AddRecord(table_id, values.pop("id", None), values)
|
|
||||||
|
# Arguments for `BulkAddRecord` and `BulkUpdateRecord` below
|
||||||
|
add_record_ids = []
|
||||||
|
add_record_values = {k: [] for k in col_keys | require_add_keys - {'id'}}
|
||||||
|
update_record_ids = []
|
||||||
|
update_record_values = {k: [] for k in col_keys - {'id'}}
|
||||||
|
|
||||||
|
for i in range(length):
|
||||||
|
current_require = {key: vals[i] for key, vals in six.iteritems(decoded_require)}
|
||||||
|
records = list(table.lookup_records(**current_require))
|
||||||
|
if not records and add:
|
||||||
|
values = {key: require[key][i] for key in require_add_keys}
|
||||||
|
values.update({key: vals[i] for key, vals in six.iteritems(col_values)})
|
||||||
|
add_record_ids.append(values.pop("id", None))
|
||||||
|
for key, value in six.iteritems(values):
|
||||||
|
add_record_values[key].append(value)
|
||||||
|
|
||||||
|
if records and update:
|
||||||
|
if len(records) > 1:
|
||||||
|
if on_many == "first":
|
||||||
|
records = records[:1]
|
||||||
|
elif on_many == "none":
|
||||||
|
continue
|
||||||
|
|
||||||
|
for record in records:
|
||||||
|
update_record_ids.append(record.id)
|
||||||
|
for key, vals in six.iteritems(col_values):
|
||||||
|
update_record_values[key].append(vals[i])
|
||||||
|
|
||||||
|
if add_record_ids:
|
||||||
|
self.BulkAddRecord(table_id, add_record_ids, add_record_values)
|
||||||
|
|
||||||
|
if update_record_ids:
|
||||||
|
self.BulkUpdateRecord(table_id, update_record_ids, update_record_values)
|
||||||
|
|
||||||
|
@useraction
|
||||||
|
def AddOrUpdateRecord(self, table_id, require, col_values, options):
|
||||||
|
"""
|
||||||
|
Add or Update ('upsert') a record depending on `options`
|
||||||
|
and on whether a record matching `require` already exists.
|
||||||
|
|
||||||
|
`require` and `col_values` are dictionaries mapping column IDs to cell values.
|
||||||
|
|
||||||
|
See `BulkAddOrUpdateRecord` for more details.
|
||||||
|
"""
|
||||||
|
require = {k: [v] for k, v in six.iteritems(require)}
|
||||||
|
col_values = {k: [v] for k, v in six.iteritems(col_values)}
|
||||||
|
self.BulkAddOrUpdateRecord(table_id, require, col_values, options)
|
||||||
|
|
||||||
#----------------------------------------
|
#----------------------------------------
|
||||||
# RemoveRecords & co.
|
# RemoveRecords & co.
|
||||||
|
@ -1029,6 +1029,24 @@ function testDocApi() {
|
|||||||
{id: [1, 2, 3, 4], A: [1, 33, 66, "$33"], B: [2, 1, 6, 0]},
|
{id: [1, 2, 3, 4], A: [1, 33, 66, "$33"], B: [2, 1, 6, 0]},
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Test bulk case with a mixture of record shapes
|
||||||
|
await check([
|
||||||
|
{
|
||||||
|
require: {A: 1},
|
||||||
|
fields: {A: 111},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
require: {A: 33},
|
||||||
|
fields: {A: 222, B: 444},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
require: {id: 3},
|
||||||
|
fields: {A: 555, B: 666},
|
||||||
|
},
|
||||||
|
],
|
||||||
|
{id: [1, 2, 3, 4], A: [111, 222, 555, "$33"], B: [2, 444, 666, 0]},
|
||||||
|
);
|
||||||
|
|
||||||
// allow_empty_require option with empty `require` updates all records
|
// allow_empty_require option with empty `require` updates all records
|
||||||
await check([
|
await check([
|
||||||
{
|
{
|
||||||
|
Loading…
Reference in New Issue
Block a user