From 1864b7ba5d9f5fb0643600c82553033c3b11853f Mon Sep 17 00:00:00 2001 From: Alex Hall Date: Wed, 28 Sep 2022 15:13:07 +0200 Subject: [PATCH] (core) Add BulkAddOrUpdateRecord action for efficiency Summary: This diff adds a new `BulkAddOrUpdateRecord` user action which is what is sounds like: - A bulk version of the existing `AddOrUpdateRecord` action. - Much more efficient for operating on many records than applying many individual actions. - Column values are specified as maps from `colId` to arrays of values as usual. - Produces bulk versions of `AddRecord` and `UpdateRecord` actions instead of many individual actions. Examples of users wanting to use something like `AddOrUpdateRecord` with large numbers of records: - https://grist.slack.com/archives/C0234CPPXPA/p1651789710290879 - https://grist.slack.com/archives/C0234CPPXPA/p1660743493480119 - https://grist.slack.com/archives/C0234CPPXPA/p1660333148491559 - https://grist.slack.com/archives/C0234CPPXPA/p1663069291726159 I tested what made many `AddOrUpdateRecord` actions slow in the first place. It was almost entirely due to producing many individual `AddRecord` user actions. About half of that time was for processing the resulting `AddRecord` doc actions. Lookups and updates were not a problem. With these changes, the slowness is gone. The Python user action implementation is more complex but there are no surprises. The JS API now groups `records` based on the keys of `require` and `fields` so that `BulkAddOrUpdateRecord` can be applied to each group. Test Plan: Update and extend Python and DocApi tests. Reviewers: jarek, paulfitz Reviewed By: jarek, paulfitz Subscribers: jarek Differential Revision: https://phab.getgrist.com/D3642 --- app/common/ValueParser.ts | 4 + app/plugin/TableOperationsImpl.ts | 18 ++++- app/server/lib/GranularAccess.ts | 14 +++- sandbox/grist/test_useractions.py | 128 +++++++++++++++++++++++++++++- sandbox/grist/useractions.py | 119 ++++++++++++++++++++------- test/server/lib/DocApi.ts | 18 +++++ 6 files changed, 261 insertions(+), 40 deletions(-) diff --git a/app/common/ValueParser.ts b/app/common/ValueParser.ts index f0781202..dbfc66ae 100644 --- a/app/common/ValueParser.ts +++ b/app/common/ValueParser.ts @@ -355,6 +355,10 @@ export function parseUserAction(ua: UserAction, docData: DocData): UserAction { ua = _parseUserActionColValues(ua, docData, false, 2); ua = _parseUserActionColValues(ua, docData, false, 3); return ua; + case 'BulkAddOrUpdateRecord': + ua = _parseUserActionColValues(ua, docData, true, 2); + ua = _parseUserActionColValues(ua, docData, true, 3); + return ua; default: return ua; } diff --git a/app/plugin/TableOperationsImpl.ts b/app/plugin/TableOperationsImpl.ts index 214a5a89..183e0f64 100644 --- a/app/plugin/TableOperationsImpl.ts +++ b/app/plugin/TableOperationsImpl.ts @@ -5,6 +5,7 @@ import { arrayRepeat } from './gutil'; import flatMap = require('lodash/flatMap'); import isEqual = require('lodash/isEqual'); import pick = require('lodash/pick'); +import groupBy = require('lodash/groupBy'); /** * An implementation of the TableOperations interface, given a platform @@ -59,8 +60,21 @@ export class TableOperationsImpl implements TableOperations { allow_empty_require: upsertOptions?.allowEmptyRequire }; const recordOptions: OpOptions = pick(upsertOptions, 'parseStrings'); - const actions = records.map(rec => - ["AddOrUpdateRecord", tableId, rec.require, rec.fields || {}, options]); + + // Group records based on having the same keys in `require` and `fields`. + // A single bulk action will be applied to each group. + // We don't want one bulk action for all records that might have different shapes, + // because that would require filling arrays with null values. + const recGroups = groupBy(records, rec => { + const requireKeys = Object.keys(rec.require).sort().join(','); + const fieldsKeys = Object.keys(rec.fields || {}).sort().join(','); + return `${requireKeys}:${fieldsKeys}`; + }); + const actions = Object.values(recGroups).map(group => { + const require = convertToBulkColValues(group.map(r => ({fields: r.require}))); + const fields = convertToBulkColValues(group.map(r => ({fields: r.fields || {}}))); + return ["BulkAddOrUpdateRecord", tableId, require, fields, options]; + }); await this._applyUserActions(tableId, [...fieldNames(records)], actions, recordOptions); return []; diff --git a/app/server/lib/GranularAccess.ts b/app/server/lib/GranularAccess.ts index a3138318..365bbd1e 100644 --- a/app/server/lib/GranularAccess.ts +++ b/app/server/lib/GranularAccess.ts @@ -77,6 +77,10 @@ function isAclTable(tableId: string): boolean { return ['_grist_ACLRules', '_grist_ACLResources'].includes(tableId); } +function isAddOrUpdateRecordAction(a: UserAction): boolean { + return ['AddOrUpdateRecord', 'BulkAddOrUpdateRecord'].includes(String(a[0])); +} + // A list of key metadata tables that need special handling. Other metadata tables may // refer to material in some of these tables but don't need special handling. // TODO: there are other metadata tables that would need access control, or redesign - @@ -128,8 +132,9 @@ const OTHER_RECOGNIZED_ACTIONS = new Set([ 'BulkRemoveRecord', 'ReplaceTableData', - // A data action handled specially because of read needs. + // Data actions handled specially because of read needs. 'AddOrUpdateRecord', + 'BulkAddOrUpdateRecord', // Groups of actions. 'ApplyDocActions', @@ -979,21 +984,22 @@ export class GranularAccess implements GranularAccessForBundle { // way to do that within the data engine as currently // formulated. Could perhaps be done for on-demand tables though. private async _checkAddOrUpdateAccess(docSession: OptDocSession, actions: UserAction[]) { - if (!scanActionsRecursively(actions, (a) => a[0] === 'AddOrUpdateRecord')) { + if (!scanActionsRecursively(actions, isAddOrUpdateRecordAction)) { // Don't need to apply this particular check. return; } // Fail if being combined with anything fancy. if (scanActionsRecursively(actions, (a) => { const name = a[0]; - return !['ApplyUndoActions', 'ApplyDocActions', 'AddOrUpdateRecord'].includes(String(name)) && + return !['ApplyUndoActions', 'ApplyDocActions'].includes(String(name)) && + !isAddOrUpdateRecordAction(a) && !(isDataAction(a) && !getTableId(a).startsWith('_grist_')); })) { throw new Error('Can only combine AddOrUpdate with simple data changes'); } // Check for read access, and that we're not touching metadata. await applyToActionsRecursively(actions, async (a) => { - if (a[0] !== 'AddOrUpdateRecord') { return; } + if (!isAddOrUpdateRecordAction(a)) { return; } const tableId = validTableIdString(a[1]); if (tableId.startsWith('_grist_')) { throw new Error(`AddOrUpdate cannot yet be used on metadata tables`); diff --git a/sandbox/grist/test_useractions.py b/sandbox/grist/test_useractions.py index f7511c38..6ff61090 100644 --- a/sandbox/grist/test_useractions.py +++ b/sandbox/grist/test_useractions.py @@ -1033,8 +1033,7 @@ class TestUserActions(test_engine.EngineTestCase): {"color": "green"}, {"on_many": "all"}, [ - ["UpdateRecord", "Table1", 1, {"color": "green"}], - ["UpdateRecord", "Table1", 2, {"color": "green"}], + ["BulkUpdateRecord", "Table1", [1, 2], {"color": ["green", "green"]}], ], ) @@ -1044,8 +1043,7 @@ class TestUserActions(test_engine.EngineTestCase): {"color": "greener"}, {"on_many": "all", "allow_empty_require": True}, [ - ["UpdateRecord", "Table1", 1, {"color": "greener"}], - ["UpdateRecord", "Table1", 2, {"color": "greener"}], + ["BulkUpdateRecord", "Table1", [1, 2], {"color": ["greener", "greener"]}], ], ) @@ -1159,6 +1157,128 @@ class TestUserActions(test_engine.EngineTestCase): [["UpdateRecord", "Table1", 102, {"date": 1900800}]], ) + # Empty both does nothing + check( + {}, + {}, + {"allow_empty_require": True}, + [], + ) + + def test_bulk_add_or_update(self): + sample = testutil.parse_test_sample({ + "SCHEMA": [ + [1, "Table1", [ + [1, "first_name", "Text", False, "", "first_name", ""], + [2, "last_name", "Text", False, "", "last_name", ""], + [4, "color", "Text", False, "", "color", ""], + ]], + ], + "DATA": { + "Table1": [ + ["id", "first_name", "last_name"], + [1, "John", "Doe"], + [2, "John", "Smith"], + ], + } + }) + self.load_sample(sample) + + def check(require, values, options, stored): + self.assertPartialOutActions( + self.apply_user_action(["BulkAddOrUpdateRecord", "Table1", require, values, options]), + {"stored": stored}, + ) + + check( + { + "first_name": [ + "John", + "John", + "John", + "Bob", + ], + "last_name": [ + "Doe", + "Smith", + "Johnson", + "Johnson", + ], + }, + { + "color": [ + "red", + "blue", + "green", + "yellow", + ], + }, + {}, + [ + ["BulkAddRecord", "Table1", [3, 4], { + "color": ["green", "yellow"], + "first_name": ["John", "Bob"], + "last_name": ["Johnson", "Johnson"], + }], + ["BulkUpdateRecord", "Table1", [1, 2], {"color": ["red", "blue"]}], + ], + ) + + with self.assertRaises(ValueError) as cm: + check( + {"color": ["yellow"]}, + {"color": ["red", "blue", "green"]}, + {}, + [], + ) + self.assertEqual( + str(cm.exception), + 'Value lists must all have the same length, ' + 'got {"col_values color": 3, "require color": 1}', + ) + + with self.assertRaises(ValueError) as cm: + check( + { + "first_name": [ + "John", + "John", + ], + "last_name": [ + "Doe", + ], + }, + {}, + {}, + [], + ) + self.assertEqual( + str(cm.exception), + 'Value lists must all have the same length, ' + 'got {"require first_name": 2, "require last_name": 1}', + ) + + with self.assertRaises(ValueError) as cm: + check( + { + "first_name": [ + "John", + "John", + ], + "last_name": [ + "Doe", + "Doe", + ], + }, + {}, + {}, + [], + ) + self.assertEqual( + str(cm.exception), + "require values must be unique", + ) + def test_reference_lookup(self): sample = testutil.parse_test_sample({ "SCHEMA": [ diff --git a/sandbox/grist/useractions.py b/sandbox/grist/useractions.py index 63ea38d6..5a1302c8 100644 --- a/sandbox/grist/useractions.py +++ b/sandbox/grist/useractions.py @@ -918,14 +918,15 @@ class UserActions(object): return values @useraction - def AddOrUpdateRecord(self, table_id, require, col_values, options): + def BulkAddOrUpdateRecord(self, table_id, require, col_values, options): """ - Add or Update ('upsert') a single record depending on `options` - and on whether a record matching `require` already exists. + Add or Update ('upsert') records depending on `options` + and on whether records matching `require` already exist. - `require` and `col_values` are dictionaries mapping column IDs to single cell values. + `require` and `col_values` are dictionaries mapping column IDs to lists of cell values. + All lists across both dictionaries must have the same length. - By default, if `table.lookupRecords(**require)` returns any records, + By default, for a single record, if `table.lookupRecords(**require)` returns any records, update the first one with the values in `col_values`. Otherwise create a new record with values `{**require, **col_values}`. @@ -940,39 +941,97 @@ class UserActions(object): """ table = self._engine.tables[table_id] - if not require and not options.get("allow_empty_require", False): + update = options.get("update", True) + add = options.get("add", True) + + on_many = options.get("on_many", "first") + if on_many not in ("first", "none", "all"): + raise ValueError("on_many should be 'first', 'none', or 'all', not %r" % on_many) + + allow_empty_require = options.get("allow_empty_require", False) + if not require and not allow_empty_require: raise ValueError("require is empty but allow_empty_require isn't set") - # Decode `require` before looking up, but let AddRecord/UpdateRecord decode the final - # values when adding/updating - decoded_require = {k: decode_object(v) for k, v in six.iteritems(require)} - records = list(table.lookup_records(**decoded_require)) + if not require and not col_values: + return # nothing to do - if records and options.get("update", True): - if len(records) > 1: - on_many = options.get("on_many", "first") - if on_many == "first": - records = records[:1] - elif on_many == "none": - return - elif on_many != "all": - raise ValueError("on_many should be 'first', 'none', or 'all', not %r" % on_many) + lengths = {} + lengths.update({'require ' + k: + len(v) for k, v in six.iteritems(require)}) + lengths.update({'col_values ' + k: + len(v) for k, v in six.iteritems(col_values)}) + unique_lengths = set(lengths.values()) + if len(unique_lengths) != 1: + raise ValueError("Value lists must all have the same length, got %s" % + json.dumps(lengths, sort_keys=True)) + [length] = unique_lengths - for record in records: - self.UpdateRecord(table_id, record.id, col_values) + decoded_require = actions.decode_bulk_values(require) + num_unique_keys = len(set(zip(*decoded_require.values()))) + if require and num_unique_keys < length: + raise ValueError("require values must be unique") - if not records and options.get("add", True): - values = { - key: value - for key, value in six.iteritems(require) - if not ( + # Column IDs in `require` that can be used to set values when creating new records, + # i.e. not formula columns that don't allow setting values. + # `col_values` is not checked for this because setting such a column there should raise an error + # This doesn't apply to `require` since it's also used to match existing records. + require_add_keys = { + key for key in require + if not ( table.get_column(key).is_formula() and # Check that there actually is a formula and this isn't just an empty column self._engine.docmodel.get_column_rec(table_id, key).formula - ) - } - values.update(col_values) - self.AddRecord(table_id, values.pop("id", None), values) + ) + } + col_keys = set(col_values.keys()) + + # Arguments for `BulkAddRecord` and `BulkUpdateRecord` below + add_record_ids = [] + add_record_values = {k: [] for k in col_keys | require_add_keys - {'id'}} + update_record_ids = [] + update_record_values = {k: [] for k in col_keys - {'id'}} + + for i in range(length): + current_require = {key: vals[i] for key, vals in six.iteritems(decoded_require)} + records = list(table.lookup_records(**current_require)) + if not records and add: + values = {key: require[key][i] for key in require_add_keys} + values.update({key: vals[i] for key, vals in six.iteritems(col_values)}) + add_record_ids.append(values.pop("id", None)) + for key, value in six.iteritems(values): + add_record_values[key].append(value) + + if records and update: + if len(records) > 1: + if on_many == "first": + records = records[:1] + elif on_many == "none": + continue + + for record in records: + update_record_ids.append(record.id) + for key, vals in six.iteritems(col_values): + update_record_values[key].append(vals[i]) + + if add_record_ids: + self.BulkAddRecord(table_id, add_record_ids, add_record_values) + + if update_record_ids: + self.BulkUpdateRecord(table_id, update_record_ids, update_record_values) + + @useraction + def AddOrUpdateRecord(self, table_id, require, col_values, options): + """ + Add or Update ('upsert') a record depending on `options` + and on whether a record matching `require` already exists. + + `require` and `col_values` are dictionaries mapping column IDs to cell values. + + See `BulkAddOrUpdateRecord` for more details. + """ + require = {k: [v] for k, v in six.iteritems(require)} + col_values = {k: [v] for k, v in six.iteritems(col_values)} + self.BulkAddOrUpdateRecord(table_id, require, col_values, options) #---------------------------------------- # RemoveRecords & co. diff --git a/test/server/lib/DocApi.ts b/test/server/lib/DocApi.ts index 5ad05631..29d443f8 100644 --- a/test/server/lib/DocApi.ts +++ b/test/server/lib/DocApi.ts @@ -1029,6 +1029,24 @@ function testDocApi() { {id: [1, 2, 3, 4], A: [1, 33, 66, "$33"], B: [2, 1, 6, 0]}, ); + // Test bulk case with a mixture of record shapes + await check([ + { + require: {A: 1}, + fields: {A: 111}, + }, + { + require: {A: 33}, + fields: {A: 222, B: 444}, + }, + { + require: {id: 3}, + fields: {A: 555, B: 666}, + }, + ], + {id: [1, 2, 3, 4], A: [111, 222, 555, "$33"], B: [2, 444, 666, 0]}, + ); + // allow_empty_require option with empty `require` updates all records await check([ {