mirror of
https://github.com/gristlabs/grist-core.git
synced 2024-10-27 20:44:07 +00:00
391c8ee087
Summary: Replaces https://phab.getgrist.com/D3940, particularly to avoid doing potentially unwanted things automatically. Adds optional fields `evaluateCurrentFormula?: boolean; rowId?: number` to `FormulaAssistanceContext` (part of `AssistanceRequest`). When `evaluateCurrentFormula` is `true`, calls a new function `evaluate_formula` in the sandbox which computes the existing formula in the column (regardless of anything the AI may have suggested) and uses that to generate an additional system message which is added before the user's message. In theory this could be used in an interface where users ask why a formula doesn't work, including possibly a formula suggested by the AI. For now, it's only used in `runCompletion_impl.ts` for experimenting. Also cleaned up a bit, removing `_chatMode` which is always `true` now, and uses of `regenerate` which is always `false`. Test Plan: Updated `runCompletion_impl` to optionally use the new feature, in which case it now scores 51/68 instead of 49/68. Reviewers: paulfitz Reviewed By: paulfitz Differential Revision: https://phab.getgrist.com/D3970
283 lines
8.8 KiB
Python
283 lines
8.8 KiB
Python
import ast
|
|
import json
|
|
import re
|
|
import textwrap
|
|
|
|
import asttokens
|
|
import asttokens.util
|
|
import six
|
|
|
|
import attribute_recorder
|
|
import objtypes
|
|
from codebuilder import make_formula_body
|
|
from column import is_visible_column, BaseReferenceColumn
|
|
from objtypes import RaisedException
|
|
import records
|
|
|
|
|
|
def column_type(engine, table_id, col_id):
    """
    Return a Python-style type annotation string for the column `col_id` of `table_id`.

    The column's declared type is read from its metadata record. Reference types map to
    the name of the table named after the colon, choice types are delegated to
    `choices()`, and "Any" columns are inspected cell by cell via `values_type()`.
    Every other type is looked up by its part before the first ":"; an unrecognised
    type raises KeyError.
    """
    col_rec = engine.docmodel.get_column_rec(table_id, col_id)
    full_type = col_rec.type
    pieces = full_type.split(":")
    base = pieces[0]

    if base == "Ref":
        return pieces[1]
    if base == "RefList":
        return "List[{}]".format(pieces[1])
    if full_type == "Choice":
        return choices(col_rec)
    if full_type == "ChoiceList":
        return "Tuple[{}, ...]".format(choices(col_rec))
    if full_type == "Any":
        # No declared type to go on: infer one from the values currently stored.
        table = engine.tables[table_id]
        column = table.get_column(col_id)
        return values_type([column.raw_get(row_id) for row_id in table.row_ids])

    simple_types = {
        "Text": "str",
        "Numeric": "float",
        "Int": "int",
        "Bool": "bool",
        "Date": "datetime.date",
        "DateTime": "datetime.datetime",
        "Any": "Any",
        "Attachments": "Any",
    }
    return simple_types[base]
|
|
|
|
def choices(col_rec):
    """
    Return a `Literal[...]` annotation string listing the choices configured for a column.

    `col_rec.widgetOptions` is expected to hold a JSON object with a "choices" entry.
    Falls back to 'str' when the options are missing, are not valid JSON, are not a
    JSON object, or have no "choices" key.
    """
    try:
        widget_options = json.loads(col_rec.widgetOptions)
        return "Literal{}".format(widget_options["choices"])
    except (ValueError, KeyError, TypeError):
        # ValueError: malformed/empty JSON text. KeyError: no "choices" entry.
        # TypeError: widgetOptions is None, or the JSON decoded to a non-object
        # (null, list, number, string) that cannot be indexed by "choices".
        return 'str'
|
|
|
|
|
|
def values_type(values):
    """
    Infer a type annotation string describing the items of `values`.

    `RaisedException` items are ignored, and `None` items make the result `Optional[...]`.
    A mix of int and float counts as float. If the remaining items do not all share
    exactly one type, the result is "Any". For containers, the element type is inferred
    recursively from the first item of the shared type.
    """
    none_type = type(None)
    seen = {type(v) for v in values}
    seen.discard(RaisedException)
    optional = none_type in seen
    seen.discard(none_type)

    if seen == {int, float}:
        seen = {float}

    if len(seen) != 1:
        return "Any"

    (typ,) = seen
    # A representative value, used to look inside records and containers.
    sample = next(v for v in values if isinstance(v, typ))

    sequence_templates = (
        (list, "List[{}]"),
        (set, "Set[{}]"),
        (tuple, "Tuple[{}, ...]"),
    )

    if isinstance(sample, records.Record):
        type_name = sample._table.table_id
    elif isinstance(sample, records.RecordSet):
        type_name = "List[{}]".format(sample._table.table_id)
    else:
        for container_cls, template in sequence_templates:
            if isinstance(sample, container_cls):
                type_name = template.format(values_type(sample))
                break
        else:
            if isinstance(sample, dict):
                key_type = values_type(sample.keys())
                value_type = values_type(sample.values())
                type_name = "Dict[{}, {}]".format(key_type, value_type)
            else:
                type_name = typ.__name__

    if optional:
        return "Optional[{}]".format(type_name)
    return type_name
|
|
|
|
|
|
def referenced_tables(engine, table_id):
    """
    Return the set of table ids reachable from `table_id` by following reference columns.

    Only visible columns are followed, and target tables whose id starts with "_" are
    skipped. The starting table itself is not included in the result.
    """
    seen = set()
    pending = [table_id]
    while pending:
        current = pending.pop()
        if current in seen:
            continue
        seen.add(current)
        for _col_id, col in visible_columns(engine, current):
            if not isinstance(col, BaseReferenceColumn):
                continue
            target = col._target_table.table_id
            if target.startswith("_"):
                continue
            pending.append(target)
    seen.discard(table_id)
    return seen
|
|
|
|
def all_other_tables(engine, table_id):
    """
    Return the ids of all tables in `engine.tables` other than `table_id`, leaving out
    tables whose id starts with '_grist' and the 'GristDocTour' table.
    """
    excluded = {table_id, 'GristDocTour'}
    return {
        name for name in engine.tables.keys()
        if not name.startswith('_grist') and name not in excluded
    }
|
|
|
|
def visible_columns(engine, table_id):
    """
    Return a list of `(col_id, col)` pairs for the columns of `table_id` whose id
    passes `is_visible_column`.
    """
    pairs = []
    for col_id, col in engine.tables[table_id].all_columns.items():
        if is_visible_column(col_id):
            pairs.append((col_id, col))
    return pairs
|
|
|
|
|
|
def class_schema(engine, table_id, exclude_col_id=None, lookups=False):
    """
    Describe the table `table_id` as the source text of a Python dataclass.

    Each visible column other than `exclude_col_id` becomes an annotated field whose
    type comes from `column_type()`. When `lookups` is true, stubs for `__len__`,
    `lookupRecords` and `lookupOne` are emitted first, with one keyword argument per
    included column plus `sort_by`, as hints on how records can be looked up.

    Class members are emitted indented by 4 spaces and method bodies by 8, so that the
    generated text is properly nested under the `class` line.
    """
    col_ids = [
        col_id for col_id, _col in visible_columns(engine, table_id)
        if col_id != exclude_col_id
    ]

    result = "@dataclass\nclass {}:\n".format(table_id)

    if lookups:
        # Build a lookupRecords and lookupOne method for each table, providing some
        # argument hints for the columns that are visible.
        lookup_one_args = [col_id + '=None' for col_id in col_ids]
        lookup_one_args.append('sort_by=None')
        lookup_records_args = ['%s=%s' % (col_id, col_id) for col_id in col_ids]
        lookup_records_args.append('sort_by=sort_by')
        lookup_one_args_line = ', '.join(lookup_one_args)
        lookup_records_args_line = ', '.join(lookup_records_args)

        result += "    def __len__(self):\n"
        result += "        return len(%s.lookupRecords())\n" % table_id
        result += "    @staticmethod\n"
        result += "    def lookupRecords(%s) -> List[%s]:\n" % (lookup_one_args_line, table_id)
        result += "        # ...\n"
        result += "    @staticmethod\n"
        result += "    def lookupOne(%s) -> %s:\n" % (lookup_one_args_line, table_id)
        result += "        '''\n"
        result += "        Filter for one result matching the keys provided.\n"
        result += "        To control order, use e.g. `sort_by='Key' or `sort_by='-Key'`.\n"
        result += "        '''\n"
        result += "        return %s.lookupRecords(%s)[0]\n" % (table_id, lookup_records_args_line)
        result += "\n"

    for col_id in col_ids:
        result += "    {}: {}\n".format(col_id, column_type(engine, table_id, col_id))
    result += "\n"
    return result
|
|
|
|
|
|
def get_formula_prompt(engine, table_id, col_id, _description,
                       include_all_tables=True,
                       lookups=True):
    """
    Build the code prompt asking for a formula for column `col_id` of `table_id`.

    The prompt consists of a dataclass schema for each other table (all user tables when
    `include_all_tables` is true, otherwise only those reachable through references),
    sorted by table id, followed by the schema of `table_id` itself with `col_id` left
    out, and finally a `@property` stub named `col_id` whose body is to be filled in.
    `lookups` controls whether lookup method stubs are included in every schema.
    `_description` is not used.
    """
    if include_all_tables:
        other_tables = all_other_tables(engine, table_id)
    else:
        other_tables = referenced_tables(engine, table_id)

    result = ""
    for other_table_id in sorted(other_tables):
        # `lookups` must be passed by keyword: the third positional parameter of
        # class_schema() is `exclude_col_id`, not `lookups`.
        result += class_schema(engine, other_table_id, lookups=lookups)

    result += class_schema(engine, table_id, exclude_col_id=col_id, lookups=lookups)

    return_type = column_type(engine, table_id, col_id)
    # The stub is a member of the class emitted just above, so it is indented by 4
    # spaces, and the hint inside its body by 8.
    result += "    @property\n"
    result += "    # rec is alias for self\n"
    result += "    def {}(rec) -> {}:\n".format(col_id, return_type)
    result += "        # Please fill in code only after this line, not the `def`\n"
    return result
|
|
|
|
def indent(text, prefix, predicate=None):
    """
    Add `prefix` to the start of selected lines of `text`.

    On Python 3 this defers to `textwrap.indent`. Otherwise it uses a fallback copied from
    https://github.com/python/cpython/blob/main/Lib/textwrap.py for python2 compatibility:
    a line is prefixed when `predicate(line)` is true, and by default that means every
    line containing something other than whitespace.
    """
    if six.PY3:
        return textwrap.indent(text, prefix, predicate)  # pylint: disable = no-member

    if predicate is not None:
        wants_prefix = predicate
    else:
        wants_prefix = lambda line: line.strip()

    # splitlines(True) keeps line endings, so joining restores the original layout.
    return ''.join(
        (prefix + line) if wants_prefix(line) else line
        for line in text.splitlines(True)
    )
|
|
|
|
|
|
def _is_docstring_stmt(stmt):
    """
    Return True if `stmt` is an expression statement consisting of a string literal.
    """
    if not isinstance(stmt, ast.Expr):
        return False
    value = stmt.value
    # Match on the node class name rather than referencing `ast.Str` directly: string
    # literals parse to `Str` on older Pythons and to `Constant` on newer ones, and
    # `ast.Str` is deprecated and not available on every Python version.
    node_kind = type(value).__name__
    if node_kind == "Str":
        return True
    return node_kind == "Constant" and isinstance(value.value, six.string_types)


def convert_completion(completion):
    """
    Convert the text of a model completion into a formula body.

    Steps:
      - If the completion contains a markdown code block, keep only its content.
      - Strip leading imports; if what remains is a single function definition, keep only
        its body (without a leading docstring), with the imports placed before it.
      - Replace `rec.` attribute access with `$`, and drop the `return ` keyword from a
        trailing top-level return statement.

    Returns the resulting code stripped of surrounding whitespace, or "" if the code is
    not valid Python or contains no statements at all.
    """
    # Extract code from a markdown code block if needed.
    match = re.search(r"```\w*\n(.*)```", completion, re.DOTALL)
    if match:
        completion = match.group(1)

    result = textwrap.dedent(completion)

    try:
        atok = asttokens.ASTTokens(result, parse=True)
    except SyntaxError:
        # If we don't have valid Python code, don't suggest a formula at all
        return ""

    stmts = atok.tree.body

    # If the code starts with imports, save them for later.
    # In particular, the model may return something like:
    #   from datetime import date
    #   def my_column():
    #     ...
    # We want to return just the function body, but we need to keep the import,
    # i.e. move it 'inside the function'.
    imports = ""
    while stmts and isinstance(stmts[0], (ast.Import, ast.ImportFrom)):
        imports += atok.get_text(stmts.pop(0)) + "\n"

    # If the non-import code consists only of a function definition, extract the body.
    if len(stmts) == 1 and isinstance(stmts[0], ast.FunctionDef):
        func_body_stmts = stmts[0].body
        if len(func_body_stmts) > 1 and _is_docstring_stmt(func_body_stmts[0]):
            # Skip the docstring.
            first_stmt = func_body_stmts[1]
        else:
            first_stmt = func_body_stmts[0]
        result_lines = result.splitlines()[first_stmt.lineno - 1:]
        result = "\n".join(result_lines)
        result = textwrap.dedent(result)

        # The imports were cut off along with the `def` line, so put them back. (When no
        # function was extracted, `result` still contains them.)
        if imports:
            result = imports + "\n" + result

    # Now convert `rec.` to `$` and remove redundant `return ` at the end.
    try:
        atok = asttokens.ASTTokens(result, parse=True)
    except SyntaxError:
        # In case the above extraction somehow messed things up
        return ""

    body = atok.tree.body
    if not body:
        # Only whitespace or comments: there is no formula to suggest, and there is no
        # last statement to inspect below.
        return ""

    replacements = []
    for node in ast.walk(atok.tree):
        if isinstance(node, ast.Attribute):
            start, end = atok.get_text_range(node.value)
            # Extend the range by one character to cover the "." after the object.
            end += 1
            if result[start:end] == "rec.":
                replacements.append((start, end, "$"))

    last_stmt = body[-1]
    if isinstance(last_stmt, ast.Return):
        start, _ = atok.get_text_range(last_stmt)
        expected = "return "
        end = start + len(expected)
        if result[start:end] == expected:
            replacements.append((start, end, ""))

    result = asttokens.util.replace(result, replacements)

    return result.strip()
|
|
|
|
|
|
def evaluate_formula(engine, table_id, col_id, row_id):
    """
    Evaluate the existing formula of column `col_id` in `table_id` for the row `row_id`.

    Returns a dict with:
      - error: True if the evaluation produced a `RaisedException`.
      - formula: the text of the formula body produced by `make_formula_body`.
      - result: "<name>: <message>" taken from the exception on error, otherwise the
        `safe_repr` of the computed value.
      - attributes: the dict passed to the engine as `record_attributes`
        (presumably filled in by the engine during evaluation).

    The column must have a non-empty formula.
    """
    col_rec = engine.docmodel.get_column_rec(table_id, col_id)
    grist_formula = col_rec.formula
    assert grist_formula
    formula_body = make_formula_body(grist_formula, default_value=None)
    plain_formula = formula_body.get_text()

    attributes = {}
    value = engine.get_formula_value(table_id, col_id, row_id, record_attributes=attributes)

    error = isinstance(value, objtypes.RaisedException)
    if error:
        name, message = value.encode_args()[:2]
        result = "%s: %s" % (name, message)
    else:
        result = attribute_recorder.safe_repr(value)

    return {
        "error": error,
        "formula": plain_formula,
        "result": result,
        "attributes": attributes,
    }
|