You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
gristlabs_grist-core/sandbox/grist/test_table_data_set.py

177 lines
6.8 KiB

import six
import actions
import schema
import table_data_set
import testutil
import difflib
import json
import unittest
class TestTableDataSet(unittest.TestCase):
"""
Tests functionality of TableDataSet by running through all the test cases in testscript.json.
"""
@classmethod
def init_test_cases(cls):
# Create a test_* method for each case in testscript, which runs `self._run_test_body()`.
cls.samples, test_cases = testutil.parse_testscript()
for case in test_cases:
cls._create_test_case(case["TEST_CASE"], case["BODY"])
@classmethod
def _create_test_case(cls, name, body):
setattr(cls, "test_" + name, lambda self: self._run_test_body(body))
def setUp(self):
self._table_data_set = None
def load_sample(self, sample):
"""
Load _table_data_set with given sample data. The sample is a dict with keys "SCHEMA" and
"DATA", each a dictionary mapping table names to actions.TableData objects. "SCHEMA" contains
"_grist_Tables" and "_grist_Tables_column" tables.
"""
self._table_data_set = table_data_set.TableDataSet()
for a in schema.schema_create_actions():
if a.table_id not in self._table_data_set.all_tables:
self._table_data_set.apply_doc_action(a)
for a in six.itervalues(sample["SCHEMA"]):
self._table_data_set.BulkAddRecord(*a)
# Create AddTable actions for each table described in the metadata.
meta_tables = self._table_data_set.all_tables['_grist_Tables']
meta_columns = self._table_data_set.all_tables['_grist_Tables_column']
add_tables = {} # maps the row_id of the table to the schema object for the table.
for rec in actions.transpose_bulk_action(meta_tables):
add_tables[rec.id] = actions.AddTable(rec.tableId, [])
# Go through all columns, adding them to the appropriate tables.
for rec in actions.transpose_bulk_action(meta_columns):
add_tables[rec.parentId].columns.append({
"id": rec.colId,
"type": rec.type,
"widgetOptions": rec.widgetOptions,
"isFormula": rec.isFormula,
"formula": rec.formula,
"label" : rec.label,
"parentPos": rec.parentPos,
})
# Sort the columns in the schema according to the parentPos field from the column records.
for action in six.itervalues(add_tables):
action.columns.sort(key=lambda r: r["parentPos"])
self._table_data_set.AddTable(*action)
for a in six.itervalues(sample["DATA"]):
self._table_data_set.ReplaceTableData(*a)
def _run_test_body(self, body):
"""Runs the actual script defined in the JSON test-script file."""
undo_actions = []
loaded_sample = None
for line, step, data in body:
try:
if step == "LOAD_SAMPLE":
if loaded_sample:
# Pylint's type checking gives a false positive for loaded_sample.
# pylint: disable=unsubscriptable-object
self._verify_undo_all(undo_actions, loaded_sample["DATA"])
loaded_sample = self.samples[data]
self.load_sample(loaded_sample)
elif step == "APPLY":
self._apply_stored_actions(data['ACTIONS']['stored'])
if 'calc' in data['ACTIONS']:
self._apply_stored_actions(data['ACTIONS']['calc'])
undo_actions.extend(data['ACTIONS']['undo'])
elif step == "CHECK_OUTPUT":
expected_data = {}
if "USE_SAMPLE" in data:
expected_data = self.samples[data.pop("USE_SAMPLE")]["DATA"].copy()
expected_data.update({t: testutil.table_data_from_rows(t, tdata[0], tdata[1:])
for (t, tdata) in six.iteritems(data)})
self._verify_data(expected_data)
else:
raise ValueError("Unrecognized step %s in test script" % step)
except Exception as e:
new_args0 = "LINE %s: %s" % (line, e.args[0])
e.args = (new_args0,) + e.args[1:]
raise
self._verify_undo_all(undo_actions, loaded_sample["DATA"])
def _apply_stored_actions(self, stored_actions):
for action in stored_actions:
self._table_data_set.apply_doc_action(actions.action_from_repr(action))
def _verify_undo_all(self, undo_actions, expected_data):
"""
At the end of each test, undo all and verify we get back to the originally loaded sample.
"""
self._apply_stored_actions(reversed(undo_actions))
del undo_actions[:]
self._verify_data(expected_data, ignore_formulas=True)
def _verify_data(self, expected_data, ignore_formulas=False):
observed_data = {t: self._prep_data(*data)
for t, data in six.iteritems(self._table_data_set.all_tables)
if not t.startswith("_grist_")}
if ignore_formulas:
observed_data = self._strip_formulas(observed_data)
expected_data = self._strip_formulas(expected_data)
if observed_data != expected_data:
lines = []
for table in sorted(six.viewkeys(observed_data) | six.viewkeys(expected_data)):
if table not in expected_data:
lines.append("*** Table %s observed but not expected\n" % table)
elif table not in observed_data:
lines.append("*** Table %s not observed but was expected\n" % table)
else:
obs, exp = observed_data[table], expected_data[table]
if obs != exp:
o_lines = self._get_text_lines(obs)
e_lines = self._get_text_lines(exp)
lines.append("*** Table %s differs\n" % table)
lines.extend(difflib.unified_diff(e_lines, o_lines,
fromfile="expected", tofile="observed"))
self.fail("\n" + "".join(lines))
def _strip_formulas(self, all_data):
return {t: self._strip_formulas_table(*data) for t, data in six.iteritems(all_data)}
def _strip_formulas_table(self, table_id, row_ids, columns):
return actions.TableData(table_id, row_ids, {
col_id: col for col_id, col in six.iteritems(columns)
if not self._table_data_set.get_col_info(table_id, col_id)["isFormula"]
})
@classmethod
def _prep_data(cls, table_id, row_ids, columns):
def sort(col):
return [v for r, v in sorted(zip(row_ids, col))]
sorted_data = actions.TableData(table_id, sorted(row_ids),
{c: sort(col) for c, col in six.iteritems(columns)})
return actions.encode_objects(testutil.replace_nans(sorted_data))
@classmethod
def _get_text_lines(cls, table_data):
col_items = sorted(table_data.columns.items())
col_items.insert(0, ('id', table_data.row_ids))
table_rows = zip(*[[col_id] + values for (col_id, values) in col_items])
return [json.dumps(row) + "\n" for row in table_rows]
# Parse and create test cases on module load. This way the python unittest feature to run only
# particular test cases can apply to these cases too.
TestTableDataSet.init_test_cases()
if __name__ == "__main__":
unittest.main()