You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
177 lines
6.8 KiB
177 lines
6.8 KiB
import six
|
|
|
|
import actions
|
|
import schema
|
|
import table_data_set
|
|
import testutil
|
|
|
|
import difflib
|
|
import json
|
|
import unittest
|
|
|
|
class TestTableDataSet(unittest.TestCase):
    """
    Tests functionality of TableDataSet by running through all the test cases in testscript.json.

    The test_* methods are not written out here: `init_test_cases()` creates one per case found
    in the parsed test script, and each of them delegates to `_run_test_body()`.
    """

    @classmethod
    def init_test_cases(cls):
        """
        Create a test_* method for each case in the test script.

        Stores the samples returned by `testutil.parse_testscript()` on the class as
        `cls.samples`, and registers one method per case which runs `self._run_test_body()`
        on that case's body.
        """
        cls.samples, test_cases = testutil.parse_testscript()
        for case in test_cases:
            cls._create_test_case(case["TEST_CASE"], case["BODY"])

    @classmethod
    def _create_test_case(cls, name, body):
        """Attach a method named "test_" + name to the class; it runs the given script body."""
        # Creating the lambda inside a separate call gives each test its own `body` binding.
        setattr(cls, "test_" + name, lambda self: self._run_test_body(body))

    def setUp(self):
        """Reset the table data set; it is created when a sample gets loaded."""
        self._table_data_set = None

    def load_sample(self, sample):
        """
        Load _table_data_set with given sample data. The sample is a dict with keys "SCHEMA" and
        "DATA", each a dictionary mapping table names to actions.TableData objects. "SCHEMA"
        contains "_grist_Tables" and "_grist_Tables_column" tables.
        """
        self._table_data_set = table_data_set.TableDataSet()

        # Apply the schema-creating actions for tables that are not already present.
        for a in schema.schema_create_actions():
            if a.table_id not in self._table_data_set.all_tables:
                self._table_data_set.apply_doc_action(a)

        # Populate the metadata tables with the records from the sample's schema.
        for a in six.itervalues(sample["SCHEMA"]):
            self._table_data_set.BulkAddRecord(*a)

        # Create AddTable actions for each table described in the metadata.
        meta_tables = self._table_data_set.all_tables['_grist_Tables']
        meta_columns = self._table_data_set.all_tables['_grist_Tables_column']

        add_tables = {}   # maps the row_id of the table to the schema object for the table.
        for rec in actions.transpose_bulk_action(meta_tables):
            add_tables[rec.id] = actions.AddTable(rec.tableId, [])

        # Go through all columns, adding them to the appropriate tables.
        for rec in actions.transpose_bulk_action(meta_columns):
            add_tables[rec.parentId].columns.append({
                "id": rec.colId,
                "type": rec.type,
                "widgetOptions": rec.widgetOptions,
                "isFormula": rec.isFormula,
                "formula": rec.formula,
                "label": rec.label,
                "parentPos": rec.parentPos,
            })

        # Sort the columns in the schema according to the parentPos field from the column
        # records, then create each table.
        for action in six.itervalues(add_tables):
            action.columns.sort(key=lambda r: r["parentPos"])
            self._table_data_set.AddTable(*action)

        # Finally fill the newly created tables with the sample's data.
        for a in six.itervalues(sample["DATA"]):
            self._table_data_set.ReplaceTableData(*a)

    def _run_test_body(self, body):
        """
        Runs the actual script defined in the JSON test-script file.

        `body` is an iterable of (line, step, data) triples. The recognized steps are:
          - "LOAD_SAMPLE": `data` is a key into `self.samples`. If a sample was already loaded,
            pending undo actions are first applied and verified against it.
          - "APPLY": `data['ACTIONS']` holds 'stored', optional 'calc', and 'undo' action lists;
            stored and calc actions are applied, undo actions are remembered for later.
          - "CHECK_OUTPUT": `data` maps table names to rows (first row is passed separately from
            the rest to `testutil.table_data_from_rows`), optionally with a "USE_SAMPLE" key
            naming a sample whose "DATA" is the baseline for the expected output.

        Any other step raises ValueError. An exception raised while running a step is re-raised
        with "LINE <line>" prefixed to its first argument. After the last step, all remaining
        undo actions are applied and the result is verified against the last loaded sample.
        """
        undo_actions = []
        loaded_sample = None
        for line, step, data in body:
            try:
                if step == "LOAD_SAMPLE":
                    if loaded_sample is not None:
                        # Pylint's type checking gives a false positive for loaded_sample.
                        # pylint: disable=unsubscriptable-object
                        self._verify_undo_all(undo_actions, loaded_sample["DATA"])
                    loaded_sample = self.samples[data]
                    self.load_sample(loaded_sample)
                elif step == "APPLY":
                    self._apply_stored_actions(data['ACTIONS']['stored'])
                    if 'calc' in data['ACTIONS']:
                        self._apply_stored_actions(data['ACTIONS']['calc'])
                    undo_actions.extend(data['ACTIONS']['undo'])
                elif step == "CHECK_OUTPUT":
                    expected_data = {}
                    if "USE_SAMPLE" in data:
                        expected_data = self.samples[data["USE_SAMPLE"]]["DATA"].copy()
                    # Skip the "USE_SAMPLE" key rather than popping it: `data` belongs to the
                    # shared parsed script and must stay intact if the test is run again.
                    expected_data.update({
                        t: testutil.table_data_from_rows(t, tdata[0], tdata[1:])
                        for (t, tdata) in six.iteritems(data)
                        if t != "USE_SAMPLE"
                    })
                    self._verify_data(expected_data)
                else:
                    raise ValueError("Unrecognized step %s in test script" % step)
            except Exception as e:
                # Prefix the script line to the message. Exceptions raised without arguments
                # get the prefix as their only argument, so the original error is never masked
                # by an IndexError here.
                if e.args:
                    new_args0 = "LINE %s: %s" % (line, e.args[0])
                    e.args = (new_args0,) + e.args[1:]
                else:
                    e.args = ("LINE %s" % (line,),)
                raise

        # Without a loaded sample (e.g. an empty body) there is nothing to undo or compare to.
        if loaded_sample is not None:
            self._verify_undo_all(undo_actions, loaded_sample["DATA"])

    def _apply_stored_actions(self, stored_actions):
        """Convert each action representation to an action and apply it to _table_data_set."""
        for action in stored_actions:
            self._table_data_set.apply_doc_action(actions.action_from_repr(action))

    def _verify_undo_all(self, undo_actions, expected_data):
        """
        At the end of each test, undo all and verify we get back to the originally loaded sample.

        Applies `undo_actions` in reverse order, empties the list in place, and compares the
        resulting data to `expected_data`, ignoring formula columns.
        """
        self._apply_stored_actions(reversed(undo_actions))
        del undo_actions[:]
        self._verify_data(expected_data, ignore_formulas=True)

    def _verify_data(self, expected_data, ignore_formulas=False):
        """
        Fail the test if the current tables differ from `expected_data`.

        Tables whose id starts with "_grist_" are excluded from the observed data. When
        `ignore_formulas` is true, formula columns are removed from both sides before comparing.
        On mismatch, the failure message lists unexpected and missing tables, and a unified diff
        (expected vs. observed) for each table that differs.
        """
        observed_data = {t: self._prep_data(*data)
                         for t, data in six.iteritems(self._table_data_set.all_tables)
                         if not t.startswith("_grist_")}
        if ignore_formulas:
            observed_data = self._strip_formulas(observed_data)
            expected_data = self._strip_formulas(expected_data)

        if observed_data != expected_data:
            lines = []
            for table in sorted(six.viewkeys(observed_data) | six.viewkeys(expected_data)):
                if table not in expected_data:
                    lines.append("*** Table %s observed but not expected\n" % table)
                elif table not in observed_data:
                    lines.append("*** Table %s not observed but was expected\n" % table)
                else:
                    obs, exp = observed_data[table], expected_data[table]
                    if obs != exp:
                        o_lines = self._get_text_lines(obs)
                        e_lines = self._get_text_lines(exp)
                        lines.append("*** Table %s differs\n" % table)
                        lines.extend(difflib.unified_diff(e_lines, o_lines,
                                                          fromfile="expected",
                                                          tofile="observed"))
            self.fail("\n" + "".join(lines))

    def _strip_formulas(self, all_data):
        """Return a copy of the {table_id: table data} mapping without formula columns."""
        return {t: self._strip_formulas_table(*data) for t, data in six.iteritems(all_data)}

    def _strip_formulas_table(self, table_id, row_ids, columns):
        """
        Return actions.TableData for the table, keeping only the columns whose column info
        (as reported by _table_data_set.get_col_info) has a false "isFormula" value.
        """
        return actions.TableData(table_id, row_ids, {
            col_id: col for col_id, col in six.iteritems(columns)
            if not self._table_data_set.get_col_info(table_id, col_id)["isFormula"]
        })

    @classmethod
    def _prep_data(cls, table_id, row_ids, columns):
        """
        Return the table's data with rows ordered by row id, for comparison purposes.

        Each column is reordered to follow the sorted row ids; the result is then passed
        through testutil.replace_nans and actions.encode_objects.
        """
        def sort_by_row_id(col):
            return [v for r, v in sorted(zip(row_ids, col))]

        sorted_data = actions.TableData(
            table_id, sorted(row_ids),
            {c: sort_by_row_id(col) for c, col in six.iteritems(columns)})
        return actions.encode_objects(testutil.replace_nans(sorted_data))

    @classmethod
    def _get_text_lines(cls, table_data):
        """
        Render table data as a list of newline-terminated JSON lines, suitable for diffing.

        The first line holds the column ids ("id" first, then the others in sorted order);
        each following line holds one row's values in the same column order.
        """
        col_items = sorted(table_data.columns.items())
        col_items.insert(0, ('id', table_data.row_ids))
        # Prepend each column's id to its values, then transpose columns into rows.
        table_rows = zip(*[[col_id] + values for (col_id, values) in col_items])
        return [json.dumps(row) + "\n" for row in table_rows]
|
|
|
|
|
|
# Generate the test_* methods as soon as the module is loaded, so that unittest's ability to
# run only selected test cases by name works for the script-driven cases as well.
TestTableDataSet.init_test_cases()

if __name__ == "__main__":
    unittest.main()
|