Update dropdown conditions on column rename (#1038)

Automatically update dropdown condition formulas on Ref, RefList, Choice and ChoiceList columns when a column referred to has been renamed.
Also fixed column references in ACL formulas using the "$" notation not being properly renamed.
This commit is contained in:
Leslie H 2024-07-12 10:58:49 -04:00 committed by GitHub
parent a437dfa28c
commit 632620544c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 392 additions and 114 deletions

View File

@ -5,10 +5,9 @@
import json import json
import logging import logging
from acl_formula import parse_acl_grist_entities
from predicate_formula import parse_predicate_formula_json
import action_obj import action_obj
import textbuilder import predicate_formula
from predicate_formula import NamedEntity, parse_predicate_formula_json, TreeConverter
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -32,6 +31,40 @@ ALL = '#ALL'
ALL_SET = frozenset([ALL]) ALL_SET = frozenset([ALL])
def parse_acl_formulas(col_values):
"""
Populates `aclFormulaParsed` by parsing `aclFormula` for all `col_values`.
"""
if 'aclFormula' not in col_values:
return
col_values['aclFormulaParsed'] = [parse_predicate_formula_json(v)
for v
in col_values['aclFormula']]
class _ACLEntityCollector(TreeConverter):
def __init__(self):
self.entities = [] # NamedEntity list
def visit_Attribute(self, node):
parent = self.visit(node.value)
# We recognize a couple of specific patterns for entities that may be affected by renames.
if parent == ['Name', 'rec'] or parent == ['Name', 'newRec']:
# rec.COL refers to the column from the table that the rule is on.
self.entities.append(NamedEntity('recCol', node.last_token.startpos, node.attr, None))
elif parent == ['Name', 'user']:
# user.ATTR is a user attribute.
self.entities.append(NamedEntity('userAttr', node.last_token.startpos, node.attr, None))
elif parent[0] == 'Attr' and parent[1] == ['Name', 'user']:
# user.ATTR.COL is a column from the lookup table of the UserAttribute ATTR.
self.entities.append(
NamedEntity('userAttrCol', node.last_token.startpos, node.attr, parent[2]))
return ["Attr", parent, node.attr]
def acl_read_split(action_group): def acl_read_split(action_group):
""" """
Returns an ActionBundle containing actions from the given action_group, all in one envelope. Returns an ActionBundle containing actions from the given action_group, all in one envelope.
@ -48,20 +81,20 @@ def acl_read_split(action_group):
return bundle return bundle
def prepare_acl_table_renames(docmodel, useractions, table_renames_dict): def prepare_acl_table_renames(useractions, table_renames_dict):
""" """
Given a dict of table renames of the form {table_id: new_table_id}, returns a callback Given a dict of table renames of the form {table_id: new_table_id}, returns a callback
that will apply updates to the affected ACL rules and resources. that will apply updates to the affected ACL rules and resources.
""" """
# If there are ACLResources that refer to the renamed table, prepare updates for those. # If there are ACLResources that refer to the renamed table, prepare updates for those.
resource_updates = [] resource_updates = []
for resource_rec in docmodel.aclResources.all: for resource_rec in useractions.get_docmodel().aclResources.all:
if resource_rec.tableId in table_renames_dict: if resource_rec.tableId in table_renames_dict:
resource_updates.append((resource_rec, {'tableId': table_renames_dict[resource_rec.tableId]})) resource_updates.append((resource_rec, {'tableId': table_renames_dict[resource_rec.tableId]}))
# Collect updates for any ACLRules with UserAttributes that refer to the renamed table. # Collect updates for any ACLRules with UserAttributes that refer to the renamed table.
rule_updates = [] rule_updates = []
for rule_rec in docmodel.aclRules.all: for rule_rec in useractions.get_docmodel().aclRules.all:
if rule_rec.userAttributes: if rule_rec.userAttributes:
try: try:
rule_info = json.loads(rule_rec.userAttributes) rule_info = json.loads(rule_rec.userAttributes)
@ -77,14 +110,14 @@ def prepare_acl_table_renames(docmodel, useractions, table_renames_dict):
return do_renames return do_renames
def prepare_acl_col_renames(docmodel, useractions, col_renames_dict): def perform_acl_rule_renames(useractions, col_renames_dict):
""" """
Given a dict of column renames of the form {(table_id, col_id): new_col_id}, returns a callback Given a dict of column renames of the form {(table_id, col_id): new_col_id}, returns a callback
that will apply updates to the affected ACL rules and resources. that will apply updates to the affected ACL rules and resources.
""" """
# Collect updates for ACLResources that refer to the renamed columns. # Collect updates for ACLResources that refer to the renamed columns.
resource_updates = [] resource_updates = []
for resource_rec in docmodel.aclResources.all: for resource_rec in useractions.get_docmodel().aclResources.all:
t = resource_rec.tableId t = resource_rec.tableId
if resource_rec.colIds and resource_rec.colIds != '*': if resource_rec.colIds and resource_rec.colIds != '*':
new_col_ids = ','.join((col_renames_dict.get((t, c)) or c) new_col_ids = ','.join((col_renames_dict.get((t, c)) or c)
@ -95,7 +128,7 @@ def prepare_acl_col_renames(docmodel, useractions, col_renames_dict):
# Collect updates for any ACLRules with UserAttributes that refer to the renamed column. # Collect updates for any ACLRules with UserAttributes that refer to the renamed column.
rule_updates = [] rule_updates = []
user_attr_tables = {} # Maps name of user attribute to its lookup table user_attr_tables = {} # Maps name of user attribute to its lookup table
for rule_rec in docmodel.aclRules.all: for rule_rec in useractions.get_docmodel().aclRules.all:
if rule_rec.userAttributes: if rule_rec.userAttributes:
try: try:
rule_info = json.loads(rule_rec.userAttributes) rule_info = json.loads(rule_rec.userAttributes)
@ -107,33 +140,33 @@ def prepare_acl_col_renames(docmodel, useractions, col_renames_dict):
except Exception as e: except Exception as e:
log.warning("Error examining aclRule: %s", e) log.warning("Error examining aclRule: %s", e)
acl_resources_table = useractions.get_docmodel().aclResources.table
# Go through again checking if anything in ACL formulas is affected by the rename. # Go through again checking if anything in ACL formulas is affected by the rename.
for rule_rec in docmodel.aclRules.all: for rule_rec in useractions.get_docmodel().aclRules.all:
if rule_rec.aclFormula:
formula = rule_rec.aclFormula
patches = []
for entity in parse_acl_grist_entities(rule_rec.aclFormula): if not rule_rec.aclFormula:
if entity.type == 'recCol': continue
table_id = docmodel.aclResources.table.get_record(int(rule_rec.resource)).tableId acl_formula = rule_rec.aclFormula
elif entity.type == 'userAttrCol':
table_id = user_attr_tables.get(entity.extra)
else:
continue
col_id = entity.name
new_col_id = col_renames_dict.get((table_id, col_id))
if not new_col_id:
continue
patch = textbuilder.make_patch(
formula, entity.start_pos, entity.start_pos + len(entity.name), new_col_id)
patches.append(patch)
replacer = textbuilder.Replacer(textbuilder.Text(formula), patches) def renamer(subject):
txt = replacer.get_text() if subject.type == 'recCol':
rule_updates.append((rule_rec, {'aclFormula': txt, table_id = acl_resources_table.get_record(int(rule_rec.resource)).tableId
'aclFormulaParsed': parse_predicate_formula_json(txt)})) elif subject.type == 'userAttrCol':
table_id = user_attr_tables.get(subject.extra)
else:
return None
col_id = subject.name
return col_renames_dict.get((table_id, col_id))
def do_renames(): new_acl_formula = predicate_formula.process_renames(acl_formula, _ACLEntityCollector(), renamer)
useractions.doBulkUpdateFromPairs('_grist_ACLResources', resource_updates) # No need to check for syntax errors, but this "if" statement must be present.
useractions.doBulkUpdateFromPairs('_grist_ACLRules', rule_updates) # See perform_dropdown_condition_renames for more info.
return do_renames if new_acl_formula != acl_formula:
new_rule_record = {
"aclFormula": new_acl_formula,
"aclFormulaParsed": parse_predicate_formula_json(new_acl_formula)
}
rule_updates.append((rule_rec, new_rule_record))
useractions.doBulkUpdateFromPairs('_grist_ACLResources', resource_updates)
useractions.doBulkUpdateFromPairs('_grist_ACLRules', rule_updates)

View File

@ -1,50 +0,0 @@
import ast
import asttokens
from predicate_formula import NamedEntity, parse_predicate_formula_json, TreeConverter
def parse_acl_formulas(col_values):
"""
Populates `aclFormulaParsed` by parsing `aclFormula` for all `col_values`.
"""
if 'aclFormula' not in col_values:
return
col_values['aclFormulaParsed'] = [parse_predicate_formula_json(v)
for v
in col_values['aclFormula']]
def parse_acl_grist_entities(acl_formula):
"""
Parse the ACL formula collecting any entities that may be subject to renaming. Returns a
NamedEntity list.
"""
try:
atok = asttokens.ASTTokens(acl_formula, tree=ast.parse(acl_formula, mode='eval'))
converter = _EntityCollector()
converter.visit(atok.tree)
return converter.entities
except SyntaxError as err:
return []
class _EntityCollector(TreeConverter):
def __init__(self):
self.entities = [] # NamedEntity list
def visit_Attribute(self, node):
parent = self.visit(node.value)
# We recognize a couple of specific patterns for entities that may be affected by renames.
if parent == ['Name', 'rec'] or parent == ['Name', 'newRec']:
# rec.COL refers to the column from the table that the rule is on.
self.entities.append(NamedEntity('recCol', node.last_token.startpos, node.attr, None))
if parent == ['Name', 'user']:
# user.ATTR is a user attribute.
self.entities.append(NamedEntity('userAttr', node.last_token.startpos, node.attr, None))
elif parent[0] == 'Attr' and parent[1] == ['Name', 'user']:
# user.ATTR.COL is a column from the lookup table of the UserAttribute ATTR.
self.entities.append(
NamedEntity('userAttrCol', node.last_token.startpos, node.attr, parent[2]))
return ["Attr", parent, node.attr]

View File

@ -132,10 +132,11 @@ def make_formula_body(formula, default_value, assoc_value=None):
return final_formula return final_formula
def replace_dollar_attrs(formula): def get_dollar_replacer(formula):
""" """
Translates formula "$" expression into rec. expression. This is extracted from the Returns a textbuilder.Replacer that would replace all dollar signs ("$") in the given
make_formula_body function. formula with "rec.". The Replacer tracks extra info we can later use to restore the
dollar signs back. To get the processed text, call .get_text() on the Replacer.
""" """
formula_builder_text = textbuilder.Text(formula) formula_builder_text = textbuilder.Text(formula)
tmp_patches = textbuilder.make_regexp_patches(formula, DOLLAR_REGEX, 'DOLLAR') tmp_patches = textbuilder.make_regexp_patches(formula, DOLLAR_REGEX, 'DOLLAR')
@ -150,7 +151,7 @@ def replace_dollar_attrs(formula):
if m: if m:
patches.append(textbuilder.make_patch(formula, m.start(0), m.end(0), 'rec.')) patches.append(textbuilder.make_patch(formula, m.start(0), m.end(0), 'rec.'))
final_formula = textbuilder.Replacer(formula_builder_text, patches) final_formula = textbuilder.Replacer(formula_builder_text, patches)
return final_formula.get_text() return final_formula
def _create_syntax_error_code(builder, input_text, err): def _create_syntax_error_code(builder, input_text, err):

View File

@ -1,10 +1,77 @@
import json import json
import logging import logging
import usertypes
from predicate_formula import parse_predicate_formula_json from predicate_formula import NamedEntity, parse_predicate_formula_json, TreeConverter
import predicate_formula
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class _DCEntityCollector(TreeConverter):
def __init__(self):
self.entities = []
def visit_Attribute(self, node):
parent = self.visit(node.value)
if parent == ["Name", "choice"]:
self.entities.append(NamedEntity("choiceAttr", node.last_token.startpos, node.attr, None))
elif parent == ["Name", "rec"]:
self.entities.append(NamedEntity("recCol", node.last_token.startpos, node.attr, None))
return ["Attr", parent, node.attr]
def perform_dropdown_condition_renames(useractions, renames):
"""
Given a dict of column renames of the form {(table_id, col_id): new_col_id}, applies updates
to the affected dropdown condition formulas.
"""
updates = []
for col in useractions.get_docmodel().columns.all:
# Find all columns in the document that have dropdown conditions.
try:
widget_options = json.loads(col.widgetOptions)
dc_formula = widget_options["dropdownCondition"]["text"]
except (json.JSONDecodeError, KeyError):
continue
# Find out what table this column refers to and belongs to.
ref_table_id = usertypes.get_referenced_table_id(col.type)
self_table_id = col.parentId.tableId
def renamer(subject):
# subject.type is either choiceAttr or recCol, see _DCEntityCollector.
table_id = ref_table_id if subject.type == "choiceAttr" else self_table_id
# Dropdown conditions stay in widgetOptions, even when the current column type can't make
# use of them. Thus, attributes of "choice" do not make sense for columns other than Ref and
# RefList, but they may exist.
# We set ref_table_id to None in this case, so table_id will be None for stray choiceAttrs,
# therefore the subject will not be renamed.
# Columns of "rec" are still renamed accordingly.
return renames.get((table_id, subject.name))
new_dc_formula = predicate_formula.process_renames(dc_formula, _DCEntityCollector(), renamer)
# The data engine stops processing remaining formulas when it hits an internal exception during
# this renaming procedure. Parsing could potentially raise SyntaxErrors, so we must be careful
# not to parse a possibly syntactically wrong formula, or handle SyntaxErrors explicitly.
# Note that new_dc_formula was obtained from process_renames, where syntactically wrong formulas
# are left untouched. It is anticipated that rename-induced changes will not introduce new
# SyntaxErrors, so if the formula text is updated, the new version must be valid, hence safe
# to parse without error handling.
# This also serves as an optimization to avoid unnecessary parsing operations.
if new_dc_formula != dc_formula:
widget_options["dropdownCondition"]["text"] = new_dc_formula
widget_options["dropdownCondition"]["parsed"] = parse_predicate_formula_json(new_dc_formula)
updates.append((col, {"widgetOptions": json.dumps(widget_options)}))
# Update the dropdown condition in the database.
useractions.doBulkUpdateFromPairs('_grist_Tables_column', updates)
def parse_dropdown_conditions(col_values): def parse_dropdown_conditions(col_values):
""" """
Parses any unparsed dropdown conditions in `col_values`. Parses any unparsed dropdown conditions in `col_values`.

View File

@ -2,11 +2,12 @@ import ast
import io import io
import json import json
import tokenize import tokenize
import sys
from collections import namedtuple from collections import namedtuple
import asttokens
import textbuilder
import six import six
from codebuilder import get_dollar_replacer
from codebuilder import replace_dollar_attrs
# Entities encountered in predicate formulas, which may get renamed. # Entities encountered in predicate formulas, which may get renamed.
# type : 'recCol'|'userAttr'|'userAttrCol', # type : 'recCol'|'userAttr'|'userAttrCol',
@ -38,7 +39,7 @@ def parse_predicate_formula(formula):
if isinstance(formula, six.binary_type): if isinstance(formula, six.binary_type):
formula = formula.decode('utf8') formula = formula.decode('utf8')
try: try:
formula = replace_dollar_attrs(formula) formula = get_dollar_replacer(formula).get_text()
tree = ast.parse(formula, mode='eval') tree = ast.parse(formula, mode='eval')
result = TreeConverter().visit(tree) result = TreeConverter().visit(tree)
for part in tokenize.generate_tokens(io.StringIO(formula).readline): for part in tokenize.generate_tokens(io.StringIO(formula).readline):
@ -46,9 +47,12 @@ def parse_predicate_formula(formula):
result = ['Comment', result, part[1][1:].strip()] result = ['Comment', result, part[1][1:].strip()]
break break
return result return result
except SyntaxError as err: except SyntaxError as e:
# In case of an error, include line and offset. # In case of an error, include line and offset.
raise SyntaxError("%s on line %s col %s" % (err.args[0], err.lineno, err.offset)) _, _, exc_traceback = sys.exc_info()
six.reraise(SyntaxError,
SyntaxError("%s on line %s col %s" % (e.args[0], e.lineno, e.offset)),
exc_traceback)
def parse_predicate_formula_json(formula): def parse_predicate_formula_json(formula):
""" """
@ -63,6 +67,45 @@ named_constants = {
'None': None, 'None': None,
} }
def process_renames(formula, collector, renamer):
"""
Given a predicate formula, a collector and a renamer, rename all references in the formula
that the renamer wants to rename. This is used to automatically update references in an ACL
or dropdown condition formula when a column it refers to has been renamed.
The collector should be a subclass of TreeConverter that collects related NamedEntity's and
stores them in the field "entities". See acl._ACLEntityCollector for an example.
The renamer should be a function taking a NamedEntity as its only argument. It should return
a new name for this NamedEntity when it wants to rename this entity, or None otherwise.
"""
patches = []
# "$" can be used to refer to "rec." in Grist formulas, but it is not valid Python.
# We need to replace it with "rec." before parsing the formula, and restore it back after
# the surgery.
# Keep the dollar replacer object, so that later we know how to restore properly.
dollar_replacer = get_dollar_replacer(formula)
formula_nodollar = dollar_replacer.get_text()
try:
atok = asttokens.ASTTokens(formula_nodollar, tree=ast.parse(formula_nodollar, mode='eval'))
collector.visit(atok.tree)
except SyntaxError:
# Don't do anything to a syntactically wrong formula.
return formula
for subject in collector.entities:
new_name = renamer(subject)
if new_name is not None:
_, _, patch = dollar_replacer.map_back_patch(
textbuilder.make_patch(dollar_replacer.get_text(), subject.start_pos,
subject.start_pos + len(subject.name), new_name)
)
patches.append(patch)
return textbuilder.Replacer(textbuilder.Text(formula), patches).get_text()
class TreeConverter(ast.NodeVisitor): class TreeConverter(ast.NodeVisitor):
# AST nodes are documented here: https://docs.python.org/2/library/ast.html#abstract-grammar # AST nodes are documented here: https://docs.python.org/2/library/ast.html#abstract-grammar
# pylint:disable=no-self-use # pylint:disable=no-self-use
@ -86,7 +129,7 @@ class TreeConverter(ast.NodeVisitor):
def visit_Compare(self, node): def visit_Compare(self, node):
# We don't try to support chained comparisons like "1 < 2 < 3" (though it wouldn't be hard). # We don't try to support chained comparisons like "1 < 2 < 3" (though it wouldn't be hard).
if len(node.ops) != 1 or len(node.comparators) != 1: if len(node.ops) != 1 or len(node.comparators) != 1:
raise ValueError("Can't use chained comparisons") raise SyntaxError("Can't use chained comparisons")
return [node.ops[0].__class__.__name__, self.visit(node.left), self.visit(node.comparators[0])] return [node.ops[0].__class__.__name__, self.visit(node.left), self.visit(node.comparators[0])]
def visit_Name(self, node): def visit_Name(self, node):
@ -115,4 +158,4 @@ class TreeConverter(ast.NodeVisitor):
return self.visit_List(node) # We don't distinguish tuples and lists return self.visit_List(node) # We don't distinguish tuples and lists
def generic_visit(self, node): def generic_visit(self, node):
raise ValueError("Unsupported syntax at %s:%s" % (node.lineno, node.col_offset + 1)) raise SyntaxError("Unsupported syntax at %s:%s" % (node.lineno, node.col_offset + 1))

View File

@ -40,6 +40,12 @@ class TestACLRenames(test_engine.EngineTestCase):
'aclFormula': '( rec.schoolName != # ünîcødé comment\n user.School.name)', 'aclFormula': '( rec.schoolName != # ünîcødé comment\n user.School.name)',
'permissionsText': 'none', 'permissionsText': 'none',
}], }],
['AddRecord', '_grist_ACLRules', None, {
'resource': -2,
# Test whether both "$" and "rec." are preserved while renaming.
'aclFormula': '( $firstName not in rec.schoolName or $schoolName + $lastName == rec.firstName)',
'permissionsText': 'all',
}],
['AddRecord', '_grist_ACLRules', None, { ['AddRecord', '_grist_ACLRules', None, {
'resource': -3, 'resource': -3,
'permissionsText': 'all' 'permissionsText': 'all'
@ -57,7 +63,8 @@ class TestACLRenames(test_engine.EngineTestCase):
['id', 'resource', 'aclFormula', 'permissionsText', 'userAttributes'], ['id', 'resource', 'aclFormula', 'permissionsText', 'userAttributes'],
[1, 1, '', '', json.dumps(user_attr1)], [1, 1, '', '', json.dumps(user_attr1)],
[2, 2, '( rec.schoolName != # ünîcødé comment\n user.School.name)', 'none', ''], [2, 2, '( rec.schoolName != # ünîcødé comment\n user.School.name)', 'none', ''],
[3, 3, '', 'all', ''], [3, 2, '( $firstName not in rec.schoolName or $schoolName + $lastName == rec.firstName)', 'all', ''],
[4, 3, '', 'all', ''],
]) ])
def test_acl_table_renames(self): def test_acl_table_renames(self):
@ -78,7 +85,8 @@ class TestACLRenames(test_engine.EngineTestCase):
['id', 'resource', 'aclFormula', 'permissionsText', 'userAttributes'], ['id', 'resource', 'aclFormula', 'permissionsText', 'userAttributes'],
[1, 1, '', '', json.dumps(user_attr1_renamed)], [1, 1, '', '', json.dumps(user_attr1_renamed)],
[2, 2, '( rec.schoolName != # ünîcødé comment\n user.School.name)', 'none', ''], [2, 2, '( rec.schoolName != # ünîcødé comment\n user.School.name)', 'none', ''],
[3, 3, '', 'all', ''], [3, 2, '( $firstName not in rec.schoolName or $schoolName + $lastName == rec.firstName)', 'all', ''],
[4, 3, '', 'all', ''],
]) ])
def test_acl_column_renames(self): def test_acl_column_renames(self):
@ -101,7 +109,8 @@ class TestACLRenames(test_engine.EngineTestCase):
['id', 'resource', 'aclFormula', 'permissionsText', 'userAttributes'], ['id', 'resource', 'aclFormula', 'permissionsText', 'userAttributes'],
[1, 1, '', '', json.dumps(user_attr1_renamed)], [1, 1, '', '', json.dumps(user_attr1_renamed)],
[2, 2, '( rec.escuela != # ünîcødé comment\n user.School.schoolName)', 'none', ''], [2, 2, '( rec.escuela != # ünîcødé comment\n user.School.schoolName)', 'none', ''],
[3, 3, '', 'all', ''], [3, 2, '( $firstName not in rec.escuela or $escuela + $Family_Name == rec.firstName)', 'all', ''],
[4, 3, '', 'all', ''],
]) ])
def test_multiple_renames(self): def test_multiple_renames(self):
@ -123,5 +132,6 @@ class TestACLRenames(test_engine.EngineTestCase):
['id', 'resource', 'aclFormula', 'permissionsText', 'userAttributes'], ['id', 'resource', 'aclFormula', 'permissionsText', 'userAttributes'],
[1, 1, '', '', json.dumps(user_attr1)], [1, 1, '', '', json.dumps(user_attr1)],
[2, 2, '( rec.escuela != # ünîcødé comment\n user.School.schoolName)', 'none', ''], [2, 2, '( rec.escuela != # ünîcødé comment\n user.School.schoolName)', 'none', ''],
[3, 3, '', 'all', ''], [3, 2, '( $Given_Name not in rec.escuela or $escuela + $Family_Name == rec.Given_Name)', 'all', ''],
[4, 3, '', 'all', ''],
]) ])

View File

@ -0,0 +1,158 @@
# -*- coding: utf-8 -*-
import json
import test_engine
import testsamples
import useractions
# A sample dropdown condition formula for the column Schools.address and alike, of type Ref/RefList.
def build_dc1_text(school_name, address_city):
return "'New' in choice.{address_city} and ${school_name} == rec.{school_name} + rec.choice.city or choice.rec.city != $name2".format(**locals())
# Another sample formula for a new column of type ChoiceList (or actually, anything other than Ref/RefList).
def build_dc2_text(school_name, school_address):
# We currently don't support layered attribute access, e.g. rec.address.city, so this is not tested.
# choice.city really is nonsense, as choice will not be an object.
# Just for testing purposes, to make sure nothing is renamed here.
return "choice + ${school_name} == choice.city or rec.{school_address} > 2".format(**locals())
def build_dc1(school_name, address_city):
return json.dumps({
"dropdownCondition": {
"text": build_dc1_text(school_name, address_city),
# The ModifyColumn user action should trigger an auto parse.
# "parsed" is stored as dumped JSON, so we need to explicitly dump it here as well.
"parsed": json.dumps(["Or", ["And", ["In", ["Const", "New"], ["Attr", ["Name", "choice"], address_city]], ["Eq", ["Attr", ["Name", "rec"], school_name], ["Add", ["Attr", ["Name", "rec"], school_name], ["Attr", ["Attr", ["Name", "rec"], "choice"], "city"]]]], ["NotEq", ["Attr", ["Attr", ["Name", "choice"], "rec"], "city"], ["Attr", ["Name", "rec"], "name2"]]])
}
})
def build_dc2(school_name, school_address):
return json.dumps({
"dropdownCondition": {
"text": build_dc2_text(school_name, school_address),
"parsed": json.dumps(["Or", ["Eq", ["Add", ["Name", "choice"], ["Attr", ["Name", "rec"], school_name]], ["Attr", ["Name", "choice"], "city"]], ["Gt", ["Attr", ["Name", "rec"], school_address], ["Const", 2]]])
}
})
class TestDCRenames(test_engine.EngineTestCase):
def setUp(self):
super(TestDCRenames, self).setUp()
self.load_sample(testsamples.sample_students)
self.engine.apply_user_actions([useractions.from_repr(ua) for ua in (
# Add some irrelevant columns to the table Schools. These should never be renamed.
["AddColumn", "Schools", "name2", {
"type": "Text"
}],
["AddColumn", "Schools", "choice", {
"type": "Ref:Address"
}],
["AddColumn", "Address", "rec", {
"type": "Text"
}],
# Add a dropdown condition formula to Schools.address (column #12).
["ModifyColumn", "Schools", "address", {
"widgetOptions": json.dumps({
"dropdownCondition": {
"text": build_dc1_text("name", "city"),
}
}),
}],
# Create a similar column with an invalid dropdown condition formula.
# This formula should never be touched.
# This column will have the ID 25.
["AddColumn", "Schools", "address2", {
"type": "Ref:Address",
"widgetOptions": json.dumps({
"dropdownCondition": {
"text": "+ 'New' in choice.city and $name == rec.name",
}
}),
}],
# And another similar column, but of type RefList.
# This column will have the ID 26.
["AddColumn", "Schools", "addresses", {
"type": "RefList:Address",
}],
# AddColumn will not trigger parsing. We emulate a real user's action here by creating it first,
# then editing its widgetOptions.
["ModifyColumn", "Schools", "addresses", {
"widgetOptions": json.dumps({
"dropdownCondition": {
"text": build_dc1_text("name", "city"),
}
}),
}],
# And another similar column, but of type ChoiceList.
# widgetOptions stay when the column type changes. We do our best to rename stuff in stray widgetOptions.
# This column will have the ID 27.
["AddColumn", "Schools", "features", {
"type": "ChoiceList",
}],
["ModifyColumn", "Schools", "features", {
"widgetOptions": json.dumps({
"dropdownCondition": {
"text": build_dc2_text("name", "address"),
}
}),
}],
)])
# This is what we'll have at the beginning, for later tests to refer to.
# Table Schools is 2.
self.assertTableData("_grist_Tables_column", cols="subset", rows="subset", data=[
["id", "parentId", "colId", "widgetOptions"],
[12, 2, "address", build_dc1("name", "city")],
[26, 2, "addresses", build_dc1("name", "city")],
[27, 2, "features", build_dc2("name", "address")],
])
self.assert_invalid_formula_untouched()
def assert_invalid_formula_untouched(self):
self.assertTableData("_grist_Tables_column", cols="subset", rows="subset", data=[
["id", "parentId", "colId", "widgetOptions"],
[25, 2, "address2", json.dumps({
"dropdownCondition": {
"text": "+ 'New' in choice.city and $name == rec.name",
}
})]
])
def test_referred_column_renames(self):
self.apply_user_action(["RenameColumn", "Address", "city", "area"])
self.assertTableData("_grist_Tables_column", cols="subset", rows="subset", data=[
["id", "parentId", "colId", "widgetOptions"],
[12, 2, "address", build_dc1("name", "area")],
[26, 2, "addresses", build_dc1("name", "area")],
# Nothing should be renamed here, as only column renames in the table "Schools" are relevant.
[27, 2, "features", build_dc2("name", "address")],
])
self.assert_invalid_formula_untouched()
def test_record_column_renames(self):
self.apply_user_action(["RenameColumn", "Schools", "name", "identifier"])
self.apply_user_action(["RenameColumn", "Schools", "address", "location"])
self.assertTableData("_grist_Tables_column", cols="subset", rows="subset", data=[
["id", "parentId", "colId", "widgetOptions"],
# Side effect: "address" becomes "location".
[12, 2, "location", build_dc1("identifier", "city")],
[26, 2, "addresses", build_dc1("identifier", "city")],
# Now "$name" should become "$identifier", just like in Ref/RefList columns. Nothing else should change.
[27, 2, "features", build_dc2("identifier", "location")],
])
self.assert_invalid_formula_untouched()
def test_multiple_renames(self):
# Put all renames together.
self.apply_user_action(["RenameColumn", "Address", "city", "area"])
self.apply_user_action(["RenameColumn", "Schools", "name", "identifier"])
self.assertTableData("_grist_Tables_column", cols="subset", rows="subset", data=[
["id", "parentId", "colId", "widgetOptions"],
[12, 2, "address", build_dc1("identifier", "area")],
[26, 2, "addresses", build_dc1("identifier", "area")],
[27, 2, "features", build_dc2("identifier", "address")],
])
self.assert_invalid_formula_untouched()

View File

@ -139,14 +139,14 @@ class TestPredicateFormula(unittest.TestCase):
self.assertRaises(SyntaxError, parse_predicate_formula, "def foo(): pass") self.assertRaises(SyntaxError, parse_predicate_formula, "def foo(): pass")
# Unsupported node type # Unsupported node type
self.assertRaisesRegex(ValueError, r'Unsupported syntax', parse_predicate_formula, "max(rec)") self.assertRaisesRegex(SyntaxError, r'Unsupported syntax', parse_predicate_formula, "max(rec)")
self.assertRaisesRegex(ValueError, r'Unsupported syntax', parse_predicate_formula, "user.id in {1, 2, 3}") self.assertRaisesRegex(SyntaxError, r'Unsupported syntax', parse_predicate_formula, "user.id in {1, 2, 3}")
self.assertRaisesRegex(ValueError, r'Unsupported syntax', parse_predicate_formula, "1 if user.IsAnon else 2") self.assertRaisesRegex(SyntaxError, r'Unsupported syntax', parse_predicate_formula, "1 if user.IsAnon else 2")
# Unsupported operation # Unsupported operation
self.assertRaisesRegex(ValueError, r'Unsupported syntax', parse_predicate_formula, "1 | 2") self.assertRaisesRegex(SyntaxError, r'Unsupported syntax', parse_predicate_formula, "1 | 2")
self.assertRaisesRegex(ValueError, r'Unsupported syntax', parse_predicate_formula, "1 << 2") self.assertRaisesRegex(SyntaxError, r'Unsupported syntax', parse_predicate_formula, "1 << 2")
self.assertRaisesRegex(ValueError, r'Unsupported syntax', parse_predicate_formula, "~test") self.assertRaisesRegex(SyntaxError, r'Unsupported syntax', parse_predicate_formula, "~test")
# Syntax error # Syntax error
self.assertRaises(SyntaxError, parse_predicate_formula, "[(]") self.assertRaises(SyntaxError, parse_predicate_formula, "[(]")

View File

@ -12,8 +12,9 @@ from six.moves import xrange
import acl import acl
import depend import depend
import gencode import gencode
from acl_formula import parse_acl_formulas from acl import parse_acl_formulas
from dropdown_condition import parse_dropdown_conditions from dropdown_condition import parse_dropdown_conditions
import dropdown_condition
import actions import actions
import column import column
import sort_specs import sort_specs
@ -228,6 +229,12 @@ class UserActions(object):
self._overrides = {key: method.__get__(self, UserActions) self._overrides = {key: method.__get__(self, UserActions)
for key, method in six.iteritems(_action_method_overrides)} for key, method in six.iteritems(_action_method_overrides)}
def get_docmodel(self):
"""
Getter for the docmodel.
"""
return self._docmodel
@contextmanager @contextmanager
def indirect_actions(self): def indirect_actions(self):
""" """
@ -635,7 +642,7 @@ class UserActions(object):
if 'type' in values: if 'type' in values:
self.doModifyColumn(col.tableId, col.colId, {'type': 'Int'}) self.doModifyColumn(col.tableId, col.colId, {'type': 'Int'})
make_acl_updates = acl.prepare_acl_table_renames(self._docmodel, self, table_renames) make_acl_updates = acl.prepare_acl_table_renames(self, table_renames)
# Collect all the table renames, and do the actual schema actions to apply them. # Collect all the table renames, and do the actual schema actions to apply them.
for tbl, values in update_pairs: for tbl, values in update_pairs:
@ -691,6 +698,9 @@ class UserActions(object):
if has_diff_value(values, 'colId', c.colId)} if has_diff_value(values, 'colId', c.colId)}
if renames: if renames:
# When a column rename has occurred, we need to update the corresponding references in
# formula, ACL rules and dropdown conditions.
# Build up a dictionary mapping col_ref of each affected formula to the new formula text. # Build up a dictionary mapping col_ref of each affected formula to the new formula text.
formula_updates = self._prepare_formula_renames(renames) formula_updates = self._prepare_formula_renames(renames)
@ -698,6 +708,9 @@ class UserActions(object):
for col_rec, new_formula in sorted(six.iteritems(formula_updates)): for col_rec, new_formula in sorted(six.iteritems(formula_updates)):
col_updates.setdefault(col_rec, {}).setdefault('formula', new_formula) col_updates.setdefault(col_rec, {}).setdefault('formula', new_formula)
acl.perform_acl_rule_renames(self, renames)
dropdown_condition.perform_dropdown_condition_renames(self, renames)
update_pairs = col_updates.items() update_pairs = col_updates.items()
# Disallow most changes to summary group-by columns, except to match the underlying column. # Disallow most changes to summary group-by columns, except to match the underlying column.
@ -721,8 +734,6 @@ class UserActions(object):
if not allowed_summary_change(key, value, expected): if not allowed_summary_change(key, value, expected):
raise ValueError("Cannot modify summary group-by column '%s'" % col.colId) raise ValueError("Cannot modify summary group-by column '%s'" % col.colId)
make_acl_updates = acl.prepare_acl_col_renames(self._docmodel, self, renames)
rename_summary_tables = set() rename_summary_tables = set()
for c, values in update_pairs: for c, values in update_pairs:
# Trigger ModifyColumn and RenameColumn as necessary # Trigger ModifyColumn and RenameColumn as necessary
@ -745,8 +756,6 @@ class UserActions(object):
table = self._engine.tables[table_id] table = self._engine.tables[table_id]
self._engine._update_table_model(table, table.user_table) self._engine._update_table_model(table, table.user_table)
make_acl_updates()
for table in rename_summary_tables: for table in rename_summary_tables:
groupby_col_ids = [c.colId for c in table.columns if c.summarySourceCol] groupby_col_ids = [c.colId for c in table.columns if c.summarySourceCol]
new_table_id = summary.encode_summary_table_name(table.summarySourceTable.tableId, new_table_id = summary.encode_summary_table_name(table.summarySourceTable.tableId,

View File

@ -63,6 +63,13 @@ def formulaType(grist_type):
return method return method
return wrapper return wrapper
def get_referenced_table_id(col_type):
if col_type.startswith("Ref:"):
return col_type[4:]
if col_type.startswith("RefList:"):
return col_type[8:]
return None
def ifError(value, value_if_error): def ifError(value, value_if_error):
""" """