You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
gristlabs_grist-core/sandbox/grist/objtypes.py

421 lines
15 KiB

"""
This module implements handling of non-primitive objects as values in Grist data cells. It is
currently only used to handle errors thrown from formulas.
Non-primitive values are represented in actions as [type_name, args...].
objtypes.register_converter() - registers a new supported object type.
objtypes.encode_object(obj) - returns a marshallable list representation.
objtypes.decode_object(val) - returns an object represented by the [name, args...] argument.
If an object cannot be encoded or decoded, an "UnmarshallableValue" is returned instead
of the form ['U', repr(obj)].
"""
# pylint: disable=too-many-return-statements
import traceback
from datetime import date, datetime
from math import isnan
import six
import friendly_errors
import moment
import records
import depend
class UnmarshallableError(ValueError):
"""
Error raised when an object cannot be represented in an action by Grist. It happens if the
object is of a type for which there is no registered converter, or if encoding it involves
values that cannot be marshalled.
"""
pass
class ConversionError(ValueError):
"""
Indicates a failure to convert a value between Grist types. We don't usually expose it to the
user, since such a failure normally results in silent alttext.
"""
pass
class InvalidTypedValue(ValueError):
"""
Indicates that AltText was in place of a typed value and produced an error. The value of AltText
is included into the exception, both to be more informative, and to sort displayCols properly.
"""
def __init__(self, typename, value):
super(InvalidTypedValue, self).__init__(typename)
self.typename = typename
self.value = value
def __str__(self):
return "Invalid %s: %s" % (self.typename, self.value)
class AltText(object):
"""
Represents a text value in a non-text column. The separate class allows formulas to access
wrong-type values. We use a wrapper rather than expose text directly to formulas, because with
text there is a risk that e.g. a formula that's supposed to add numbers would add two strings
with unexpected result.
"""
def __init__(self, text, typename=None):
self._text = text
self._typename = typename
def __str__(self):
return self._text
def __int__(self):
# This ensures that AltText values that look like ints may be cast back to int.
# Convert to float first, since python does not allow casting strings with decimals to int.
return int(float(self._text))
def __float__(self):
# This ensures that AltText values that look like floats may be cast back to float.
return float(self._text)
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, self._text)
# Allow comparing to AltText("something")
def __eq__(self, other):
return isinstance(other, self.__class__) and self._text == other._text
def __ne__(self, other):
return not self.__eq__(other)
def __hash__(self):
return hash((self.__class__, self._text))
def __getattr__(self, name):
# On attempt to do $foo.Bar on an AltText value such as "hello", raise an exception that will
# show up as e.g. "Invalid Ref: hello" or "Invalid Date: hello".
raise InvalidTypedValue(self._typename, self._text)
class UnmarshallableValue(object):
"""
Represents an UnmarshallableValue. There is nothing we can do with it except encode it back.
"""
def __init__(self, value_repr):
self.value_repr = value_repr
# Unique sentinel value representing a pending value. It's encoded as ['P'], and shown to the user
# as "Loading..." text. With the switch to stored formulas, it's currently only used when a
# document was just migrated.
_pending_sentinel = object()
# A placeholder for a value hidden by access control rules.
# Depending on the types of the columns involved, copying
# a censored value and pasting elsewhere will either use
# CensoredValue.__repr__ (python) or CensoredValue.toString (typescript)
# so they should match
class CensoredValue(object):
def __repr__(self):
return 'CENSORED'
_censored_sentinel = CensoredValue()
def is_int_short(value):
return -(1<<31) <= value < (1<<31)
def safe_shift(arg, default=None):
value = arg.pop(0) if arg else None
return default if value is None else value
def safe_repr(obj):
"""
Like repr(obj) but falls back to a simpler "<type-name>" string when repr() itself fails.
"""
try:
return repr(obj)
except Exception:
return '<' + type(obj).__name__ + '>'
def strict_equal(a, b):
"""Checks the equality of the types of the values as well as the values, and handle errors."""
# pylint: disable=unidiomatic-typecheck
# Try/catch needed because some comparisons may fail (e.g. datetimes with different tzinfo)
try:
return type(a) == type(b) and a == b
except Exception:
return False
def equal_encoding(a, b):
# Compare NaNs as equal.
if isinstance(a, float) and isinstance(b, float):
return a == b or (isnan(a) and isnan(b))
# Compare bools as equal only to bools (these are distinguishable from numbers in JSON, and we
# take care to distinguish them in DB too).
if isinstance(a, bool) or isinstance(b, bool):
# pylint: disable=unidiomatic-typecheck
return type(a) == type(b) and a == b
# Note for simple types, encode_object is trivial, and will result in a non-type-specific
# comparison (e.g. 1 and 1.0 will compare equal, as would "a" and u"a"). This is to capture
# equivalence of values in their JSON representations.
return encode_object(a) == encode_object(b)
def encode_object(value):
"""
Produces a Grist-encoded version of the value, e.g. turning a Date into ['d', timestamp].
Returns ['U', repr(value)] if it fails to encode otherwise.
"""
try:
if isinstance(value, (six.text_type, float, bool)) or value is None:
return value
elif isinstance(value, six.binary_type):
return value.decode('utf8')
elif isinstance(value, six.integer_types):
if not is_int_short(value):
raise UnmarshallableError("Integer too large")
return value
elif isinstance(value, AltText):
return six.text_type(value)
elif isinstance(value, records.Record):
return ['R', value._table.table_id, value._row_id]
elif isinstance(value, RecordStub):
return ['R', value.table_id, value.row_id]
elif isinstance(value, datetime):
return ['D', moment.dt_to_ts(value), value.tzinfo.zone.name if value.tzinfo else 'UTC']
elif isinstance(value, date):
return ['d', moment.date_to_ts(value)]
elif isinstance(value, RaisedException):
return ['E'] + value.encode_args()
elif isinstance(value, (list, tuple)):
return ['L'] + [encode_object(item) for item in value]
elif isinstance(value, records.RecordSet):
return ['r', value._table.table_id, value._get_encodable_row_ids()]
elif isinstance(value, RecordSetStub):
return ['r', value.table_id, value.row_ids]
elif isinstance(value, dict):
if not all(isinstance(key, six.string_types) for key in value):
raise UnmarshallableError("Dict with non-string keys")
return ['O', {key: encode_object(val) for key, val in six.iteritems(value)}]
elif value == _pending_sentinel:
return ['P']
elif value == _censored_sentinel:
return ['C']
elif isinstance(value, UnmarshallableValue):
return ['U', value.value_repr]
except Exception as e:
pass
# We either don't know how to convert the value, or failed during the conversion. Instead we
# return an "UnmarshallableValue" object, with repr() of the value to show to the user.
return ['U', safe_repr(value)]
def decode_object(value):
"""
Given a Grist-encoded value, returns an object represented by it.
If typename is unknown, or construction fails for any reason, returns (not raises!)
RaisedException with the original exception in its .error property.
"""
try:
if not isinstance(value, (list, tuple)):
return value
code = value[0]
args = value[1:]
if code == 'R':
return RecordStub(args[0], args[1])
elif code == 'r':
return RecordSetStub(args[0], args[1])
elif code == 'D':
return moment.ts_to_dt(args[0], moment.Zone(args[1]))
elif code == 'd':
return moment.ts_to_date(args[0])
elif code == 'E':
return RaisedException.decode_args(*args)
elif code == 'L':
return [decode_object(item) for item in args]
elif code == 'l':
return ReferenceLookup(*args)
elif code == 'O':
return {decode_object(key): decode_object(val) for key, val in six.iteritems(args[0])}
elif code == 'P':
return _pending_sentinel
elif code == 'C':
return _censored_sentinel
elif code == 'U':
return UnmarshallableValue(args[0])
raise KeyError("Unknown object type code %r" % code)
except Exception as e:
return RaisedException(e)
#----------------------------------------------------------------------
class RaisedException(object):
"""
RaisedException is a special type of object which indicates that a value in a cell isn't a plain
value but an exception to be raised. All caught exceptions are wrapped in RaisedException. The
original exception is saved in the .error attribute. The traceback is saved in .details
attribute only when needed (flag include_details is set).
RaisedException is registered under a special short name ("E") to save bytes since it's such a
widely-used wrapper. To encode_args, it simply returns the entire encoded stored error, e.g.
RaisedException(ValueError("foo")) is encoded as ["E", "ValueError", "foo"].
When user_input is passed, RaisedException(ValueError("foo"), user_input=2) is encoded as:
["E", "ValueError", "foo", {u: 2}].
"""
# Marker object that indicates that there was no user input.
NO_INPUT = object()
def __init__(self, error, include_details=False, user_input=NO_INPUT):
self.user_input = user_input
self.error = error
self.details = None
self._encoded_error = None
self._name = None
self._message = None
if error is not None:
self._fill_from_error(self.has_user_input(), include_details)
error.__traceback__ = None
def encode_args(self):
if self._encoded_error is not None:
return self._encoded_error
if self.has_user_input():
user_input = {"u": encode_object(self.user_input)}
else:
user_input = None
result = [self._name, self._message, self.details, user_input]
# Trim last values that are None
while len(result) > 1 and result[-1] is None:
result.pop()
self._encoded_error = result
return result
def _fill_from_error(self, include_message=False, include_details=False):
# TODO: We should probably return all args, to communicate the error details to the browser
# and to DB (for when we store formula results). There are two concerns: one is that it's
# potentially quite verbose; the other is that it's makes the tests more annoying (again b/c
# verbose).
error = self.error
location = ""
while isinstance(error, CellError):
if not location:
location = "\n(in referenced cell {error.location})".format(error=error)
error = error.error
self._name = type(error).__name__
if include_details:
self.details = traceback.format_exc()
self._message = str(error) + location
if not (isinstance(error, (SyntaxError, depend.CircularRefError)) or error != self.error):
# For SyntaxError, the friendly message was already added earlier.
# CircularRefError and CellError are Grist-specific and have no friendly message.
self._message += friendly_errors.friendly_message(error)
elif isinstance(error, InvalidTypedValue):
self._message = error.typename
self.details = error.value
elif include_message:
self._message = str(error) + location
def has_user_input(self):
return self.user_input is not RaisedException.NO_INPUT
def no_traceback(self):
exc = RaisedException(None)
exc._name = self._name
exc.error = self.error
exc.user_input = self.user_input
exc.details = "This error is left over from before, and " + \
"the formula hasn't been triggered since then."
exc._message = self._message
return exc
@classmethod
def decode_args(cls, *args):
exc = cls(None)
args = list(args)
assert args
exc._name = safe_shift(args)
exc._message = safe_shift(args)
exc.details = safe_shift(args)
exc.user_input = safe_shift(args, {})
exc.user_input = decode_object(exc.user_input.get("u", RaisedException.NO_INPUT))
return exc
class CellError(Exception):
def __init__(self, table_id, col_id, row_id, error):
super(CellError, self).__init__(table_id, col_id, row_id, error)
self.table_id = table_id
self.col_id = col_id
self.row_id = row_id
self.error = error
def __str__(self):
return (
"{self.error.__class__.__name__} in referenced cell {self.location}"
).format(self=self)
@property
def location(self):
return "{self.table_id}[{self.row_id}].{self.col_id}".format(self=self)
class RecordList(list):
"""
A static method to recreate a RecordList from the output of __repr__.
It only restores the row_ids. The group_by and sort_by attributes are not restored.
"""
@staticmethod
def from_repr(repr_str):
if not repr_str.startswith('RecordList(['):
raise ValueError("Invalid RecordList representation")
# This is a string representation of a RecordList, which we can parse.
# > RecordList([1,2,3], group_by=%r, sort_by=%r)
# Match only rows, as group_by and sort_by are not used and can be stale.
numbers = repr_str.split('[')[1].split(']')[0].split(',')
return RecordList([int(v) for v in numbers])
"""
Just like list but allows setting custom attributes, which we use for remembering _group_by and
_sort_by attributes when storing RecordSet as usertypes.ReferenceList type.
"""
def __init__(self, row_ids, group_by=None, sort_by=None):
list.__init__(self, row_ids)
self._group_by = group_by
self._sort_by = sort_by
def __repr__(self):
return "RecordList(%s, group_by=%r, sort_by=%r)" % (
list.__repr__(self), self._group_by, self._sort_by)
# We don't currently have a good way to convert an incoming marshalled record to a proper Record
# object for an appropriate table. We don't expect incoming marshalled records at all, but if such
# a thing happens, we'll construct this RecordStub.
class RecordStub(object):
def __init__(self, table_id, row_id):
self.table_id = table_id
self.row_id = row_id
class RecordSetStub(object):
def __init__(self, table_id, row_ids):
self.table_id = table_id
self.row_ids = row_ids
class ReferenceLookup(object):
def __init__(self, value, options=None):
self.value = value
self.options = options or {}
@property
def alt_text(self):
result = self.options.get("raw")
if result is None:
values = self.value
if not isinstance(values, list):
values = [values]
result = ", ".join(map(six.text_type, values))
return result