(core) Implement new representation of ACL rules.

Summary:
- Added fields to _grist_ACLRules for the new Granular ACL representation
- Include a corresponding migration.

- Added ACLPermissions module with merging PermissionSets and converting to/from string.
- Implemented parsing of ACL formulas and compiling them into JS functions.
- Add automatic parsing of ACL formulas when ACLRules are added or updated.
- Convert GranularAccess to load and interpret new-style rules.
- Convert ACL UI to load and save new-style rules.

For now, no attempt to do anything better on the server or UI side, only to
reproduce previous behavior.

Test Plan: Added unittests for new files; fixed those for existing files.

Reviewers: paulfitz

Reviewed By: paulfitz

Differential Revision: https://phab.getgrist.com/D2664
This commit is contained in:
Dmitry S
2020-11-17 16:49:32 -05:00
parent c042935c58
commit bc3a472324
12 changed files with 1131 additions and 482 deletions

View File

@@ -0,0 +1,91 @@
import ast
import json
def parse_acl_formula(acl_formula):
"""
Parse an ACL formula expression into a parse tree that we can interpret in JS, e.g.
"rec.office == 'Seattle' and user.email in ['sally@', 'xie@']".
The idea is to support enough to express ACL rules flexibly, but we don't need to support too
much, since rules should be reasonably simple.
The returned tree has the form [NODE_TYPE, arguments...], with these NODE_TYPEs supported:
And|Or ...values
Add|Sub|Mult|Div|Mod left, right
Not operand
Eq|NotEq|Lt|LtE|Gt|GtE left, right
Is|IsNot|In|NotIn left, right
List ...elements
Const value (number, string, bool)
Name name (string)
Attr node, attr_name
"""
try:
tree = ast.parse(acl_formula, mode='eval')
return _TreeConverter().visit(tree)
except SyntaxError as err:
# In case of an error, include line and offset.
raise SyntaxError("%s on line %s col %s" % (err.args[0], err.lineno, err.offset))
def parse_acl_formula_json(acl_formula):
"""
As parse_acl_formula(), but stringifies the result, and converts empty string to empty string.
"""
return json.dumps(parse_acl_formula(acl_formula)) if acl_formula else ""
named_constants = {
'True': True,
'False': False,
'None': None,
}
class _TreeConverter(ast.NodeVisitor):
# AST nodes are documented here: https://docs.python.org/2/library/ast.html#abstract-grammar
# pylint:disable=no-self-use
def visit_Expression(self, node):
return self.visit(node.body)
def visit_BoolOp(self, node):
return [node.op.__class__.__name__] + [self.visit(v) for v in node.values]
def visit_BinOp(self, node):
if not isinstance(node.op, (ast.Add, ast.Sub, ast.Mult, ast.Div, ast.Mod)):
return self.generic_visit(node)
return [node.op.__class__.__name__, self.visit(node.left), self.visit(node.right)]
def visit_UnaryOp(self, node):
if not isinstance(node.op, (ast.Not)):
return self.generic_visit(node)
return [node.op.__class__.__name__, self.visit(node.operand)]
def visit_Compare(self, node):
# We don't try to support chained comparisons like "1 < 2 < 3" (though it wouldn't be hard).
if len(node.ops) != 1 or len(node.comparators) != 1:
raise ValueError("Can't use chained comparisons")
return [node.ops[0].__class__.__name__, self.visit(node.left), self.visit(node.comparators[0])]
def visit_Name(self, node):
if node.id in named_constants:
return ["Const", named_constants[node.id]]
return ["Name", node.id]
def visit_Attribute(self, node):
return ["Attr", self.visit(node.value), node.attr]
def visit_Num(self, node):
return ["Const", node.n]
def visit_Str(self, node):
return ["Const", node.s]
def visit_List(self, node):
return ["List"] + [self.visit(e) for e in node.elts]
def visit_Tuple(self, node):
return self.visit_List(node) # We don't distinguish tuples and lists
def generic_visit(self, node):
raise ValueError("Unsupported syntax at %s:%s" % (node.lineno, node.col_offset + 1))

View File

@@ -767,3 +767,12 @@ def migration20(tdset):
'indentation': [1 if v.id in table_views_map else 0 for v in views]
})
])
@migration(schema_version=21)
def migration21(tdset):
return tdset.apply_doc_actions([
add_column('_grist_ACLRules', 'aclFormulaParsed', 'Text'),
add_column('_grist_ACLRules', 'permissionsText', 'Text'),
add_column('_grist_ACLRules', 'rulePos', 'PositionNumber'),
add_column('_grist_ACLRules', 'userAttributes', 'Text'),
])

View File

@@ -12,7 +12,7 @@ import itertools
from collections import OrderedDict, namedtuple
import actions
SCHEMA_VERSION = 20
SCHEMA_VERSION = 21
def make_column(col_id, col_type, formula='', isFormula=False):
return {
@@ -225,17 +225,40 @@ def schema_create_actions():
# All of the ACL rules.
actions.AddTable('_grist_ACLRules', [
make_column('resource', 'Ref:_grist_ACLResources'),
make_column('permissions', 'Int'), # Bit-map of permission types. See acl.py.
make_column('principals', 'Text'), # JSON array of _grist_ACLPrincipals refs.
make_column('permissions', 'Int'), # DEPRECATED: permissionsText is used instead.
make_column('principals', 'Text'), # DEPRECATED
make_column('aclFormula', 'Text'), # Formula to apply to tableId, which should return
# additional principals for each row.
make_column('aclColumn', 'Ref:_grist_Tables_column')
# Text of match formula, in restricted Python syntax; "" for default rule.
make_column('aclFormula', 'Text'),
make_column('aclColumn', 'Ref:_grist_Tables_column'), # DEPRECATED
# JSON representation of the parse tree of matchFunc; "" for default rule.
make_column('aclFormulaParsed', 'Text'),
# Permissions in the form '[+<bits>][-<bits>]' where <bits> is a string of
# C,R,U,D,S characters, each appearing at most once. Or the special values
# 'all' or 'none'. The empty string does not affect permissions.
make_column('permissionsText', 'Text'),
# Rules for one resource are ordered by increasing rulePos. The default rule
# should be at the end (later rules would have no effect).
make_column('rulePos', 'PositionNumber'),
# If non-empty, this rule adds extra user attributes. It should contain JSON
# of the form {name, tableId, lookupColId, charId}, and should be tied to the
# resource *:*. It acts by looking up user[charId] in the given tableId on the
# given lookupColId, and adds the full looked-up record as user[name], which
# becomes available to matchFunc. These rules are processed in order of rulePos,
# which should list them before regular rules.
make_column('userAttributes', 'Text'),
]),
# Note that the special resource with tableId of '' and colIds of '' should be ignored. It is
# present to satisfy older versions of Grist (before Nov 2020).
actions.AddTable('_grist_ACLResources', [
make_column('tableId', 'Text'), # Name of the table this rule applies to, or ''
make_column('colIds', 'Text'), # Comma-separated list of colIds, or ''
make_column('tableId', 'Text'), # Name of the table this rule applies to, or '*'
make_column('colIds', 'Text'), # Comma-separated list of colIds, or '*'
]),
# DEPRECATED: All of the principals used by ACL rules, including users, groups, and instances.

View File

@@ -0,0 +1,159 @@
# -*- coding: utf-8 -*-
# pylint:disable=line-too-long
import unittest
from acl_formula import parse_acl_formula
import test_engine
class TestACLFormula(unittest.TestCase):
def test_basic(self):
# Test a few basic formulas and structures, hitting everything we expect to support
self.assertEqual(parse_acl_formula(
"user.Email == 'X@'"),
["Eq", ["Attr", ["Name", "user"], "Email"],
["Const", "X@"]])
self.assertEqual(parse_acl_formula(
"user.Role in ('editors', 'owners')"),
["In", ["Attr", ["Name", "user"], "Role"],
["List", ["Const", "editors"], ["Const", "owners"]]])
self.assertEqual(parse_acl_formula(
"user.Role not in ('editors', 'owners')"),
["NotIn", ["Attr", ["Name", "user"], "Role"],
["List", ["Const", "editors"], ["Const", "owners"]]])
self.assertEqual(parse_acl_formula(
"rec.office == 'Seattle' and user.email in ['sally@', 'xie@']"),
['And',
['Eq', ['Attr', ['Name', 'rec'], 'office'], ['Const', 'Seattle']],
['In',
['Attr', ['Name', 'user'], 'email'],
['List', ['Const', 'sally@'], ['Const', 'xie@']]
]])
self.assertEqual(parse_acl_formula(
"user.IsAdmin or rec.assigned is None or (not newRec.HasDuplicates and rec.StatusIndex <= newRec.StatusIndex)"),
['Or',
['Attr', ['Name', 'user'], 'IsAdmin'],
['Is', ['Attr', ['Name', 'rec'], 'assigned'], ['Const', None]],
['And',
['Not', ['Attr', ['Name', 'newRec'], 'HasDuplicates']],
['LtE', ['Attr', ['Name', 'rec'], 'StatusIndex'], ['Attr', ['Name', 'newRec'], 'StatusIndex']]
]
])
self.assertEqual(parse_acl_formula(
"r.A <= n.A + 1 or r.A >= n.A - 1 or r.B < n.B * 2.5 or r.B > n.B / 2.5 or r.C % 2 != 0"),
['Or',
['LtE',
['Attr', ['Name', 'r'], 'A'],
['Add', ['Attr', ['Name', 'n'], 'A'], ['Const', 1]]],
['GtE',
['Attr', ['Name', 'r'], 'A'],
['Sub', ['Attr', ['Name', 'n'], 'A'], ['Const', 1]]],
['Lt',
['Attr', ['Name', 'r'], 'B'],
['Mult', ['Attr', ['Name', 'n'], 'B'], ['Const', 2.5]]],
['Gt',
['Attr', ['Name', 'r'], 'B'],
['Div', ['Attr', ['Name', 'n'], 'B'], ['Const', 2.5]]],
['NotEq',
['Mod', ['Attr', ['Name', 'r'], 'C'], ['Const', 2]],
['Const', 0]]
])
self.assertEqual(parse_acl_formula(
"rec.A is True or rec.A is not False"),
['Or',
['Is', ['Attr', ['Name', 'rec'], 'A'], ['Const', True]],
['IsNot', ['Attr', ['Name', 'rec'], 'A'], ['Const', False]]
])
self.assertEqual(parse_acl_formula(
"user.Office.City == 'Seattle' and user.Status.IsActive"),
['And',
['Eq',
['Attr', ['Attr', ['Name', 'user'], 'Office'], 'City'],
['Const', 'Seattle']],
['Attr', ['Attr', ['Name', 'user'], 'Status'], 'IsActive']
])
def test_unsupported(self):
# Test a few constructs we expect to fail
# Not an expression
self.assertRaises(SyntaxError, parse_acl_formula, "return 1")
self.assertRaises(SyntaxError, parse_acl_formula, "def foo(): pass")
# Unsupported node type
self.assertRaisesRegexp(ValueError, r'Unsupported syntax', parse_acl_formula, "max(rec)")
self.assertRaisesRegexp(ValueError, r'Unsupported syntax', parse_acl_formula, "user.id in {1, 2, 3}")
self.assertRaisesRegexp(ValueError, r'Unsupported syntax', parse_acl_formula, "1 if user.IsAnon else 2")
# Unsupported operation
self.assertRaisesRegexp(ValueError, r'Unsupported syntax', parse_acl_formula, "1 | 2")
self.assertRaisesRegexp(ValueError, r'Unsupported syntax', parse_acl_formula, "1 << 2")
self.assertRaisesRegexp(ValueError, r'Unsupported syntax', parse_acl_formula, "~test")
# Syntax error
self.assertRaises(SyntaxError, parse_acl_formula, "[(]")
self.assertRaises(SyntaxError, parse_acl_formula, "user.id in (1,2))")
self.assertRaisesRegexp(SyntaxError, r'invalid syntax on line 1 col 9', parse_acl_formula, "foo and !bar")
class TestACLFormulaUserActions(test_engine.EngineTestCase):
def test_acl_actions(self):
# Adding or updating ACLRules automatically includes aclFormula compilation.
# Single Add
out_actions = self.apply_user_action(
['AddRecord', '_grist_ACLRules', None, {"resource": 1, "aclFormula": "user.UserID == 7"}],
)
self.assertPartialOutActions(out_actions, { "stored": [
["AddRecord", "_grist_ACLRules", 1, {"resource": 1, "aclFormula": "user.UserID == 7",
"aclFormulaParsed": '["Eq", ["Attr", ["Name", "user"], "UserID"], ["Const", 7]]',
"rulePos": 1.0
}],
]})
# Single Update
out_actions = self.apply_user_action(
['UpdateRecord', '_grist_ACLRules', 1, {
"aclFormula": "user.UserID == 8",
"aclFormulaParsed": "hello"
}],
)
self.assertPartialOutActions(out_actions, { "stored": [
["UpdateRecord", "_grist_ACLRules", 1, {
"aclFormula": "user.UserID == 8",
"aclFormulaParsed": '["Eq", ["Attr", ["Name", "user"], "UserID"], ["Const", 8]]',
}],
]})
# BulkAddRecord
out_actions = self.apply_user_action(['BulkAddRecord', '_grist_ACLRules', [None, None], {
"resource": [1, 1],
"aclFormula": ["user.IsGood", "user.IsBad"],
"aclFormulaParsed": ["[1]", '["ignored"]'], # Should get overwritten
}])
self.assertPartialOutActions(out_actions, { "stored": [
[ 'BulkAddRecord', '_grist_ACLRules', [2, 3], {
"resource": [1, 1],
"aclFormula": ["user.IsGood", "user.IsBad"],
"aclFormulaParsed": [ # Gets overwritten
'["Attr", ["Name", "user"], "IsGood"]',
'["Attr", ["Name", "user"], "IsBad"]',
],
"rulePos": [2.0, 3.0], # Gets filled in.
}],
]})
# BulkUpdateRecord
out_actions = self.apply_user_action(['BulkUpdateRecord', '_grist_ACLRules', [2, 3], {
"aclFormula": ["not user.IsGood", ""],
}])
self.assertPartialOutActions(out_actions, { "stored": [
[ 'BulkUpdateRecord', '_grist_ACLRules', [2, 3], {
"aclFormula": ["not user.IsGood", ""],
"aclFormulaParsed": ['["Not", ["Attr", ["Name", "user"], "IsGood"]]', ''],
}],
]})

View File

@@ -5,6 +5,7 @@ import json
import sys
import acl
from acl_formula import parse_acl_formula_json
import actions
import column
import identifiers
@@ -277,7 +278,8 @@ class UserActions(object):
column_values = actions.decode_bulk_values(column_values)
for col_id, values in column_values.iteritems():
self._ensure_column_accepts_data(table_id, col_id, values)
return self.doBulkAddOrReplace(table_id, row_ids, column_values, replace=False)
method = self._overrides.get(('BulkAddRecord', table_id), self.doBulkAddOrReplace)
return method(table_id, row_ids, column_values)
@useraction
def ReplaceTableData(self, table_id, row_ids, column_values):
@@ -325,6 +327,13 @@ class UserActions(object):
return filled_row_ids
@override_action('BulkAddRecord', '_grist_ACLRules')
def _addACLRules(self, table_id, row_ids, col_values):
# Automatically populate aclFormulaParsed value by parsing aclFormula.
if 'aclFormula' in col_values:
col_values['aclFormulaParsed'] = map(parse_acl_formula_json, col_values['aclFormula'])
return self.doBulkAddOrReplace(table_id, row_ids, col_values)
#----------------------------------------
# UpdateRecords & co.
#----------------------------------------
@@ -376,7 +385,6 @@ class UserActions(object):
method = self._overrides.get(('BulkUpdateRecord', table_id), self.doBulkUpdateRecord)
method(table_id, row_ids, columns)
@override_action('BulkUpdateRecord', '_grist_Validations')
def _updateValidationRecords(self, table_id, row_ids, col_values):
for i, rec, values in self._bulk_action_iter(table_id, row_ids, col_values):
@@ -550,6 +558,13 @@ class UserActions(object):
self.doBulkUpdateRecord(table_id, row_ids, col_values)
@override_action('BulkUpdateRecord', '_grist_ACLRules')
def _updateACLRules(self, table_id, row_ids, col_values):
# Automatically populate aclFormulaParsed value by parsing aclFormula.
if 'aclFormula' in col_values:
col_values['aclFormulaParsed'] = map(parse_acl_formula_json, col_values['aclFormula'])
return self.doBulkUpdateRecord(table_id, row_ids, col_values)
def _prepare_formula_renames(self, renames):
"""
Helper that accepts a dict of {(table_id, col_id): new_name} (where col_id is None when table