diff --git a/src/spreadsheet/tests/test_spreadsheet_parser.py b/src/spreadsheet/tests/test_spreadsheet_parser.py new file mode 100644 index 000000000..846735cd1 --- /dev/null +++ b/src/spreadsheet/tests/test_spreadsheet_parser.py @@ -0,0 +1,177 @@ +from odoo.addons.base.maintenance.migrations.testing import UnitTestCase +from odoo.addons.base.maintenance.migrations.util.spreadsheet.parser import ( + BinaryOperation, + FunctionCall, + Literal, + UnaryOperation, + ast_to_string, + parse, +) + + +class SpreadsheetParserTest(UnitTestCase): + def test_can_parse_a_function_call_with_no_argument(self): + self.assertEqual(parse("RAND()"), FunctionCall("RAND", [])) + + def test_can_parse_a_function_call_with_one_argument(self): + self.assertEqual( + parse("SUM(1)"), + FunctionCall("SUM", [Literal("NUMBER", "1")]), + ) + + def test_can_parse_a_function_call_with_function_argument(self): + self.assertEqual( + parse("SUM(UMINUS(1))"), + FunctionCall("SUM", [FunctionCall("UMINUS", [Literal("NUMBER", "1")])]), + ) + + def test_can_parse_a_function_call_with_sub_expressions_as_argument(self): + self.assertEqual( + parse("IF(A1 > 0, 1, 2)"), + FunctionCall( + "IF", + [ + BinaryOperation(">", Literal("UNKNOWN", "A1"), Literal("NUMBER", "0")), + Literal("NUMBER", "1"), + Literal("NUMBER", "2"), + ], + ), + ) + + def test_add_a_unknown_token_for_empty_arguments(self): + self.assertEqual( + parse("SUM(1,)"), + FunctionCall("SUM", [Literal("NUMBER", "1"), Literal("EMPTY", "")]), + ) + + self.assertEqual( + parse("SUM(,1)"), + FunctionCall("SUM", [Literal("EMPTY", ""), Literal("NUMBER", "1")]), + ) + + self.assertEqual( + parse("SUM(,)"), + FunctionCall("SUM", [Literal("EMPTY", ""), Literal("EMPTY", "")]), + ) + + self.assertEqual( + parse("SUM(,,)"), + FunctionCall("SUM", [Literal("EMPTY", ""), Literal("EMPTY", ""), Literal("EMPTY", "")]), + ) + + self.assertEqual( + parse("SUM(,,,1)"), + FunctionCall("SUM", [Literal("EMPTY", ""), Literal("EMPTY", ""), Literal("EMPTY", ""), Literal("NUMBER", "1")]), + ) + + def test_can_parse_unary_operations(self): + self.assertEqual( + parse("-1"), + UnaryOperation("-", Literal("NUMBER", "1")), + ) + self.assertEqual( + parse("+1"), + UnaryOperation("+", Literal("NUMBER", "1")), + ) + + def test_can_parse_numeric_values(self): + self.assertEqual(parse("1"), Literal("NUMBER", "1")) + self.assertEqual(parse("1.5"), Literal("NUMBER", "1.5")) + self.assertEqual(parse("1."), Literal("NUMBER", "1.")) + self.assertEqual(parse(".5"), Literal("NUMBER", ".5")) + + def test_can_parse_string_values(self): + self.assertEqual(parse('"Hello"'), Literal("STRING", "Hello")) + + def test_can_parse_number_expressed_as_percent(self): + self.assertEqual(parse("1%"), Literal("NUMBER", "1%")) + self.assertEqual(parse("100%"), Literal("NUMBER", "100%")) + self.assertEqual(parse("50.0%"), Literal("NUMBER", "50.0%")) + + def test_can_parse_binary_operations(self): + self.assertEqual( + parse("2-3"), + BinaryOperation("-", Literal("NUMBER", "2"), Literal("NUMBER", "3")), + ) + + def test_can_parse_concat_operator(self): + self.assertEqual( + parse("A1&A2"), + BinaryOperation("&", Literal("UNKNOWN", "A1"), Literal("UNKNOWN", "A2")), + ) + + def test_AND(self): + self.assertEqual( + parse("=AND(true, false)"), + FunctionCall("AND", [Literal("BOOLEAN", "true"), Literal("BOOLEAN", "false")]), + ) + self.assertEqual( + parse("=AND(0, tRuE)"), + FunctionCall("AND", [Literal("NUMBER", "0"), Literal("BOOLEAN", "tRuE")]), + ) + + def test_convert_string(self): + self.assertEqual(ast_to_string(parse('"hello"')), '"hello"') + + def test_convert_debugger(self): + self.assertEqual(ast_to_string(parse("?5+2")), "5+2") + + def test_convert_boolean(self): + self.assertEqual(ast_to_string(parse("TRUE")), "TRUE") + self.assertEqual(ast_to_string(parse("FALSE")), "FALSE") + + def test_convert_unary_operator(self): + self.assertEqual(ast_to_string(parse("-45")), "-45") + self.assertEqual(ast_to_string(parse("+45")), "+45") + self.assertEqual(ast_to_string(parse("-(4+5)")), "-(4+5)") + self.assertEqual(ast_to_string(parse("-4+5")), "-4+5") + self.assertEqual(ast_to_string(parse("-SUM(1)")), "-SUM(1)") + self.assertEqual(ast_to_string(parse("-(1+2)/5")), "-(1+2)/5") + self.assertEqual(ast_to_string(parse("1*-(1+2)")), "1*-(1+2)") + + def test_convert_binary_operator(self): + self.assertEqual(ast_to_string(parse("89-45")), "89-45") + self.assertEqual(ast_to_string(parse("1+2+5")), "1+2+5") + self.assertEqual(ast_to_string(parse("(1+2)/5")), "(1+2)/5") + self.assertEqual(ast_to_string(parse("5/(1+2)")), "5/(1+2)") + self.assertEqual(ast_to_string(parse("2/(1*2)")), "2/(1*2)") + self.assertEqual(ast_to_string(parse("1-2+3")), "1-2+3") + self.assertEqual(ast_to_string(parse("1-(2+3)")), "1-(2+3)") + self.assertEqual(ast_to_string(parse("(1+2)-3")), "1+2-3") + self.assertEqual(ast_to_string(parse("(1<5)+5")), "(1<5)+5") + self.assertEqual(ast_to_string(parse("1*(4*2+3)")), "1*(4*2+3)") + self.assertEqual(ast_to_string(parse("1*(4+2*3)")), "1*(4+2*3)") + self.assertEqual(ast_to_string(parse("1*(4*2+3*9)")), "1*(4*2+3*9)") + self.assertEqual(ast_to_string(parse("1*(4-(2+3))")), "1*(4-(2+3))") + self.assertEqual(ast_to_string(parse("1/(2*(2+3))")), "1/(2*(2+3))") + self.assertEqual(ast_to_string(parse("1/((2+3)*2)")), "1/((2+3)*2)") + self.assertEqual(ast_to_string(parse("2<(1<1)")), "2<(1<1)") + self.assertEqual(ast_to_string(parse("2<=(1<1)")), "2<=(1<1)") + self.assertEqual(ast_to_string(parse("2>(1<1)")), "2>(1<1)") + self.assertEqual(ast_to_string(parse("2>=(1<1)")), "2>=(1<1)") + self.assertEqual(ast_to_string(parse("TRUE=1=1")), "TRUE=1=1") + self.assertEqual(ast_to_string(parse("TRUE=(1=1)")), "TRUE=(1=1)") + + def test_convert_function(self): + self.assertEqual(ast_to_string(parse("SUM(5,9,8)")), "SUM(5,9,8)") + self.assertEqual(ast_to_string(parse("-SUM(5,9,SUM(5,9,8))")), "-SUM(5,9,SUM(5,9,8))") + + def test_convert_references(self): + self.assertEqual(ast_to_string(parse("A10")), "A10") + self.assertEqual(ast_to_string(parse("Sheet1!A10")), "Sheet1!A10") + self.assertEqual(ast_to_string(parse("'Sheet 1'!A10")), "'Sheet 1'!A10") + self.assertEqual(ast_to_string(parse("'Sheet 1'!A10:A11")), "'Sheet 1'!A10:A11") + self.assertEqual(ast_to_string(parse("SUM(A1,A2)")), "SUM(A1,A2)") + + def test_convert_strings(self): + self.assertEqual(ast_to_string(parse('"R"')), '"R"') + self.assertEqual(ast_to_string(parse('CONCAT("R", "EM")')), 'CONCAT("R","EM")') + + def test_convert_numbers(self): + self.assertEqual(ast_to_string(parse("5")), "5") + self.assertEqual(ast_to_string(parse("5+4")), "5+4") + self.assertEqual(ast_to_string(parse("+5")), "+5") + self.assertEqual(ast_to_string(parse("1%")), "1%") + self.assertEqual(ast_to_string(parse("1.5")), "1.5") + self.assertEqual(ast_to_string(parse("1.")), "1.") + self.assertEqual(ast_to_string(parse(".5")), ".5") diff --git a/src/util/context.py b/src/util/context.py new file mode 100644 index 000000000..f1cb5855f --- /dev/null +++ b/src/util/context.py @@ -0,0 +1,98 @@ +# -*- coding: utf-8 -*- + +# python3 shims +try: + basestring # noqa: B018 +except NameError: + basestring = str + + +_CONTEXT_KEYS_TO_CLEAN = ( + "group_by", + "pivot_measures", + "pivot_column_groupby", + "pivot_row_groupby", + "graph_groupbys", + "orderedBy", +) + +def clean_context(context, fieldname): + """Remove (in place) all references to the field in the context dictionary.""" + + def filter_value(key, value): + if key == "orderedBy" and isinstance(value, dict): + res = {k: (filter_value(None, v) if k == "name" else v) for k, v in value.items()} + # return if name didn't match fieldname + return res if "name" not in res or res["name"] is not None else None + if not isinstance(value, basestring): + # if not a string, ignore it + return value + if value.split(":")[0] != fieldname: + # only return if not matching fieldname + return value + return None # value filtered out + + if not isinstance(context, dict): + return False + + changed = False + for key in _CONTEXT_KEYS_TO_CLEAN: + if context.get(key): + context_part = [filter_value(key, e) for e in context[key]] + changed |= context_part != context[key] + context[key] = [e for e in context_part if e is not None] + + for vt in ["pivot", "graph", "cohort"]: + key = "{}_measure".format(vt) + if key in context: + new_value = filter_value(key, context[key]) + changed |= context[key] != new_value + context[key] = new_value if new_value is not None else "id" + + if vt in context: + changed |= clean_context(context[vt]) + + return changed + +def adapt_context(context, old, new): + """Replace (in place) all references to field `old` to `new` in the context dictionary.""" + + # adapt (in place) dictionary values + if not isinstance(context, dict): + return + + for key in _CONTEXT_KEYS_TO_CLEAN: + if context.get(key): + context[key] = [_adapt_context_value(key, e, old, new) for e in context[key]] + + for vt in ["pivot", "graph", "cohort"]: + key = "{}_measure".format(vt) + if key in context: + context[key] = _adapt_context_value(key, context[key], old, new) + + if vt in context: + adapt_context(context[vt], old, new) + + def_old = "default_{}".format(old) + def_new = "default_{}".format(new) + + if def_old in context: + context[def_new] = context.pop(def_old) + + +def _adapt_context_value(key, value, old, new): + if key == "orderedBy" and isinstance(value, dict): + # only adapt the "name" key + return {k: (_adapt_context_value(None, v, old, new) if k == "name" else v) for k, v in value.items()} + + if not isinstance(value, basestring): + # ignore if not a string + return value + + parts = value.split(":", 1) + if parts[0] != old: + # if not match old, leave it + return value + # change to new, and return it + parts[0] = new + return ":".join(parts) diff --git a/src/util/fields.py b/src/util/fields.py index 1a8de8de5..e5ff2c507 100644 --- a/src/util/fields.py +++ b/src/util/fields.py @@ -37,7 +37,9 @@ def make_index_name(table_name, column_name): return "%s_%s_index" % (table_name, column_name) +from . import spreadsheet from .const import ENVIRON +from .context import adapt_context, clean_context from .domains import _adapt_one_domain, _replace_path, _valid_path_to, adapt_domains from .exceptions import SleepyDeveloperError from .helpers import _dashboard_actions, _validate_model, resolve_model_fields_path, table_of_model @@ -62,24 +64,14 @@ def make_index_name(table_name, column_name): # python3 shims try: - basestring # noqa: B018 + unicode # noqa: B018 except NameError: - basestring = unicode = str + unicode = str _logger = logging.getLogger(__name__) IMD_FIELD_PATTERN = "field_%s__%s" if version_gte("saas~11.2") else "field_%s_%s" -_CONTEXT_KEYS_TO_CLEAN = ( - "group_by", - "pivot_measures", - "pivot_column_groupby", - "pivot_row_groupby", - "graph_groupbys", - "orderedBy", -) - - def ensure_m2o_func_field_data(cr, src_table, column, dst_table): """ Fix broken m2o relations. @@ -123,46 +115,10 @@ def remove_field(cr, model, fieldname, cascade=False, drop_column=True, skip_inh ENVIRON["__renamed_fields"][model][fieldname] = None - def filter_value(key, value): - if key == "orderedBy" and isinstance(value, dict): - res = {k: (filter_value(None, v) if k == "name" else v) for k, v in value.items()} - # return if name didn't match fieldname - return res if "name" not in res or res["name"] is not None else None - if not isinstance(value, basestring): - # if not a string, ignore it - return value - if value.split(":")[0] != fieldname: - # only return if not matching fieldname - return value - return None # value filtered out - - def clean_context(context): - if not isinstance(context, dict): - return False - - changed = False - for key in _CONTEXT_KEYS_TO_CLEAN: - if context.get(key): - context_part = [filter_value(key, e) for e in context[key]] - changed |= context_part != context[key] - context[key] = [e for e in context_part if e is not None] - - for vt in ["pivot", "graph", "cohort"]: - key = "{}_measure".format(vt) - if key in context: - new_value = filter_value(key, context[key]) - changed |= context[key] != new_value - context[key] = new_value if new_value is not None else "id" - - if vt in context: - changed |= clean_context(context[vt]) - - return changed - # clean dashboard's contexts for id_, action in _dashboard_actions(cr, r"\y{}\y".format(fieldname), model): context = safe_eval(action.get("context", "{}"), SelfPrintEvalContext(), nocopy=True) - changed = clean_context(context) + changed = clean_context(context, fieldname) action.set("context", unicode(context)) if changed: add_to_migration_reports( @@ -176,7 +132,7 @@ def clean_context(context): ) for id_, name, context_s in cr.fetchall(): context = safe_eval(context_s or "{}", SelfPrintEvalContext(), nocopy=True) - changed = clean_context(context) + changed = clean_context(context, fieldname) cr.execute("UPDATE ir_filters SET context = %s WHERE id = %s", [unicode(context), id_]) if changed: add_to_migration_reports(("ir.filters", id_, name), "Filters/Dashboards") @@ -365,6 +321,8 @@ def adapter(leaf, is_or, negated): for inh in for_each_inherit(cr, model, skip_inherit): remove_field(cr, inh.model, fieldname, cascade=cascade, drop_column=drop_column, skip_inherit=skip_inherit) + spreadsheet.remove_field_in_all_spreadsheets(cr, model, fieldname) + def remove_field_metadata(cr, model, fieldname, skip_inherit=()): """ @@ -579,6 +537,8 @@ def rename_field(cr, model, old, new, update_references=True, domain_adapter=Non for inh in for_each_inherit(cr, model, skip_inherit): rename_field(cr, inh.model, old, new, update_references=update_references, skip_inherit=skip_inherit) + spreadsheet.rename_field_in_all_spreadsheets(cr, model, old, new) + def convert_field_to_html(cr, model, field, skip_inherit=()): _validate_model(model) @@ -1146,50 +1106,11 @@ def _update_field_usage_multi(cr, models, old, new, domain_adapter=None, skip_in # ir.ui.view.custom # adapt the context. The domain will be done by `adapt_domain` eval_context = SelfPrintEvalContext() - def_old = "default_{}".format(old) - def_new = "default_{}".format(new) match = "{0[old]}|{0[def_old]}".format(p) - def adapt_value(key, value): - if key == "orderedBy" and isinstance(value, dict): - # only adapt the "name" key - return {k: (adapt_value(None, v) if k == "name" else v) for k, v in value.items()} - - if not isinstance(value, basestring): - # ignore if not a string - return value - - parts = value.split(":", 1) - if parts[0] != old: - # if not match old, leave it - return value - # change to new, and return it - parts[0] = new - return ":".join(parts) - - def adapt_dict(d): - # adapt (in place) dictionary values - if not isinstance(d, dict): - return - - for key in _CONTEXT_KEYS_TO_CLEAN: - if d.get(key): - d[key] = [adapt_value(key, e) for e in d[key]] - - for vt in ["pivot", "graph", "cohort"]: - key = "{}_measure".format(vt) - if key in d: - d[key] = adapt_value(key, d[key]) - - if vt in d: - adapt_dict(d[vt]) - for _, act in _dashboard_actions(cr, match, *only_models or ()): context = safe_eval(act.get("context", "{}"), eval_context, nocopy=True) - adapt_dict(context) - - if def_old in context: - context[def_new] = context.pop(def_old) + adapt_context(context, old, new) act.set("context", unicode(context)) # domains, related and inhited models diff --git a/src/util/models.py b/src/util/models.py index c94ef7bc2..12893ebcb 100644 --- a/src/util/models.py +++ b/src/util/models.py @@ -9,6 +9,7 @@ import logging import re +from . import spreadsheet from .const import ENVIRON from .fields import IMD_FIELD_PATTERN, remove_field from .helpers import _ir_values_value, _validate_model, model_of_table, table_of_model @@ -214,6 +215,8 @@ def remove_model(cr, model, drop_table=True, ignore_m2m=()): category="Removed Models", ) + spreadsheet.remove_model_in_all_spreadsheets(cr, model) + # compat layer... delete_model = remove_model @@ -344,6 +347,8 @@ def rename_model(cr, old, new, rename_table=True): """.format(col_prefix=col_prefix, old=old.replace(".", r"\."), new=new) ) + spreadsheet.rename_model_in_all_spreadsheets(cr, model, new) + def merge_model(cr, source, target, drop_table=True, fields_mapping=None, ignore_m2m=()): """ diff --git a/src/util/spreadsheet/__init__.py b/src/util/spreadsheet/__init__.py index 8d6dde23b..277e8572f 100644 --- a/src/util/spreadsheet/__init__.py +++ b/src/util/spreadsheet/__init__.py @@ -1 +1,14 @@ +import logging + + from .tokenizer import * +from .parser import * +from .o_spreadsheet import * + + +_logger = logging.getLogger(__name__) + + +from .fields import * +from .models import * +from .misc import * diff --git a/src/util/spreadsheet/data_wrappers.py b/src/util/spreadsheet/data_wrappers.py new file mode 100644 index 000000000..0f5575bea --- /dev/null +++ b/src/util/spreadsheet/data_wrappers.py @@ -0,0 +1,499 @@ +import json +from typing import Iterable, List, Union + +from odoo.upgrade.util.misc import version_gte + +from .o_spreadsheet import load + +"""This file provides (partial) wrappers for reading and writing spreadsheet data +and commands. +The goal is to abstract away the implementation details of the underlying data +structures and how they evolved between versions. +""" + +# TODO segregate the data wrappers by version (and use the right one). +# how should should it be done? +# Maybe they could be integrated in odoo source code? +# It could also be useful to have wrappers for validation in odoo (its currently coupled +# to the json file schema). + + +def create_data_source_from_cmd(cmd): + if cmd["type"] in ["CREATE_CHART", "UPDATE_CHART"]: + return OdooChartCmdV16(cmd) + elif cmd["type"] == "INSERT_PIVOT": + if version_gte("saas~17.1"): + return InsertPivotCmdV171(cmd) + return InsertPivotCmdV16(cmd) + elif cmd["type"] == "RE_INSERT_PIVOT": + return InsertPivotCmdV16(cmd) + elif cmd["type"] == "INSERT_ODOO_LIST": + return InsertListCmdV16(cmd) + elif cmd["type"] == "RE_INSERT_ODOO_LIST": + return ReInsertListCmdV16(cmd) + return cmd + + +class Spreadsheet: + def __init__(self, data: Union[str, dict]): + if isinstance(data, str): + data = json.loads(data) + self.data = load(data) + self.pivotContructor = version_gte("saas~17.2") and PivotV17_2 or PivotV16 + + def __str__(self) -> str: + return self.to_json() + + def __repr__(self) -> str: + return self.to_json() + + def to_json(self) -> str: + return json.dumps(self.data, indent=4) + + @property + def cells(self) -> Iterable[dict]: + for sheet in self.data["sheets"]: + for cell in sheet["cells"].values(): + if cell.get("content"): + yield cell + + @property + def odoo_charts(self): + for sheet in self.data["sheets"]: + for figure in sheet["figures"]: + if _is_odoo_chart(figure): + yield OdooChartV16(dict(figure["data"], id=figure["id"])) + + @property + def pivots(self) -> List[str]: + return [self.pivotContructor(d) for d in self.data.get("pivots", {}).values()] + + @property + def lists(self) -> List[str]: + return [SpreadsheetList(d) for d in self.data.get("lists", {}).values()] + + @property + def global_filters(self) -> List[str]: + return self.data.get("globalFilters", []) + + def delete_lists(self, *list_ids: Iterable[str]): + if "lists" not in self.data: + return + for list_id in list_ids: + del self.data["lists"][list_id] + + def delete_pivots(self, *pivot_ids: Iterable[str]): + if "pivots" not in self.data: + return + for pivot_id in pivot_ids: + del self.data["pivots"][pivot_id] + + def delete_global_filters(self, *filter_ids: Iterable[str]): + if "globalFilters" not in self.data: + return + filter_ids = set(filter_ids) + self.data["globalFilters"] = [filter for filter in self.global_filters if filter["id"] not in filter_ids] + + def delete_figures(self, *figure_ids: Iterable[str]): + figure_ids = set(figure_ids) + for sheet in self.data["sheets"]: + sheet["figures"] = [figure for figure in sheet["figures"] if figure["id"] not in figure_ids] + + def clean_empty_cells(self): + for sheet in self.data["sheets"]: + sheet["cells"] = {xc: cell for xc, cell in sheet["cells"].items() if cell and cell != {"content": ""}} + + +def _is_odoo_chart(figure): + return figure["tag"] == "chart" and figure["data"]["type"].startswith("odoo_") + + +class DataSource: + def __init__(self, definition): + self.definition = definition + + @property + def id(self): + return self.definition["id"] + + @property + def model(self): + return self.definition["model"] + + @model.setter + def model(self, model): + self.definition["model"] = model + + @property + def domain(self): + return self.definition["domain"] + + @domain.setter + def domain(self, domain): + self.definition["domain"] = domain + + @property + def context(self): + return self.definition.get("context") + + @property + def order_by(self): + return self.definition.get("orderBy") + + @order_by.setter + def order_by(self, order_by): + self.definition["orderBy"] = order_by + + @property + def fields_matching(self): + return self.definition.get("fieldMatching", {}) + + +class SpreadsheetList(DataSource): + @property + def fields(self) -> List[str]: + return self.definition["columns"] + + @fields.setter + def fields(self, fields: List[str]): + self.definition["columns"] = fields + + @property + def order_by(self): + if "searchParams" not in self.definition: + return [] + order_bys = self.definition["searchParams"].get("orderBy") + if order_bys: + return [{"field": order_by["name"], "asc": order_by["asc"]} for order_by in order_bys] + return [] + + @order_by.setter + def order_by(self, order_bys): + if not order_bys: + return + self.definition["searchParams"]["orderBy"] = [ + { + "name": order_by["field"], + "asc": order_by["asc"], + } + for order_by in order_bys + ] + + +class InsertListCmdV16: + def __init__(self, cmd): + self.cmd = cmd + self.definition = cmd["definition"] + + @property + def id(self): + return self.cmd["id"] + + @property + def model(self): + return self.definition["metaData"]["resModel"] + + @model.setter + def model(self, model): + self.definition["metaData"]["resModel"] = model + + @property + def domain(self): + return self.definition["searchParams"]["domain"] + + @domain.setter + def domain(self, domain): + self.definition["searchParams"]["domain"] = domain + + @property + def context(self): + return self.definition["searchParams"]["context"] + + @property + def order_by(self): + order_bys = self.definition["searchParams"].get("orderBy") + if order_bys: + return [{"field": order_by["name"], "asc": order_by["asc"]} for order_by in order_bys] + return [] + + @order_by.setter + def order_by(self, order_bys): + self.definition["searchParams"]["orderBy"] = [ + { + "name": order_by["field"], + "asc": order_by["asc"], + } + for order_by in order_bys + ] + + @property + def fields_matching(self): + return {} + + @property + def fields(self) -> List[str]: + return self.definition["metaData"]["columns"] + + @fields.setter + def fields(self, fields: List[str]): + self.definition["metaData"]["columns"] = fields + self.cmd["columns"] = [{"name": field} for field in fields] + + +class ReInsertListCmdV16: + def __init__(self, cmd): + self.cmd = cmd + + @property + def id(self): + return self.cmd["id"] + + @property + def fields(self) -> List[str]: + return [col["name"] for col in self.cmd["columns"]] + + @fields.setter + def fields(self, fields: List[str]): + self.cmd["columns"] = [{"name": field} for field in fields] + + +class PivotV16: + """Wrapper around a pivot data source, hiding + its internal structure for v16""" + + def __init__(self, definition): + self.definition = definition + + @property + def id(self): + return self.definition["id"] + + @property + def model(self): + return self.definition["model"] + + @model.setter + def model(self, model): + self.definition["model"] = model + + @property + def domain(self): + return self.definition["domain"] + + @domain.setter + def domain(self, domain): + self.definition["domain"] = domain + + @property + def context(self): + return self.definition.get("context") + + @property + def order_by(self): + sorted_column = self.definition.get("sortedColumn") + if not sorted_column: + return + return { + "field": sorted_column["measure"], + "asc": sorted_column["order"].lower() == "asc", + } + + @order_by.setter + def order_by(self, order_by): + sorted_column = { + "order": "asc" if order_by["asc"] else "desc", + "measure": order_by["field"], + } + self.definition["sortedColumn"].update(sorted_column) + + @property + def fields_matching(self): + return self.definition.get("fieldMatching", {}) + + @property + def measures(self): + return [m["field"] for m in self.definition.get("measures", [])] + + @measures.setter + def measures(self, measures): + self.definition["measures"] = [{"field": m} for m in measures] + + @property + def row_group_by(self): + return self.definition.get("rowGroupBys", []) + + @row_group_by.setter + def row_group_by(self, group_by): + self.definition["rowGroupBys"] = group_by + + @property + def col_group_by(self): + return self.definition.get("colGroupBys", []) + + @col_group_by.setter + def col_group_by(self, group_by): + self.definition["colGroupBys"] = group_by + +class PivotV17_2(PivotV16): + @property + def measures(self): + return self.definition.get("measures", []) + + @measures.setter + def measures(self, measures): + self.definition["measures"] = measures + +class OdooChartV16: + def __init__(self, definition): + self.definition = definition + + @property + def id(self): + return self.definition["id"] + + @property + def model(self): + return self.definition["metaData"]["resModel"] + + @model.setter + def model(self, model): + self.definition["metaData"]["resModel"] = model + + @property + def domain(self): + return self.definition["searchParams"]["domain"] + + @domain.setter + def domain(self, domain): + self.definition["searchParams"]["domain"] = domain + + @property + def context(self): + return self.definition["searchParams"].get("context", {}) + + @property + def order_by(self): + if self.definition["metaData"].get("order"): + return { + "field": self.measure, + "asc": self.definition["metaData"]["order"].lower() == "asc", + } + + @order_by.setter + def order_by(self, order_by): + self.definition["metaData"]["order"] = "ASC" if order_by["asc"] else "DESC" + + @property + def fields_matching(self): + return self.definition.get("fieldMatching", {}) + + @property + def group_by(self): + return self.definition["metaData"]["groupBy"] + + @group_by.setter + def group_by(self, group_by): + self.definition["metaData"]["groupBy"] = group_by + + @property + def measure(self): + return self.definition["metaData"].get("measure", []) + + @measure.setter + def measure(self, measure): + self.definition["metaData"]["measure"] = measure + + +class OdooChartCmdV16(OdooChartV16): + def __init__(self, cmd): + super().__init__(cmd["definition"]) + self.cmd = cmd + + @property + def id(self): + return self.cmd["id"] + + +class InsertPivotCmdV16: + """Wrapper around an INSERT_PIVOT command, hiding + the internal structure of the command.""" + + def __init__(self, cmd): + self.cmd = cmd + self.definition = cmd["definition"] + + @property + def id(self): + return self.cmd["id"] + + @property + def model(self): + return self.definition["metaData"]["resModel"] + + @model.setter + def model(self, model): + self.definition["metaData"]["resModel"] = model + + @property + def domain(self): + return self.definition["searchParams"]["domain"] + + @domain.setter + def domain(self, domain): + self.definition["searchParams"]["domain"] = domain + + @property + def context(self): + return self.definition["searchParams"]["context"] + + @property + def order_by(self): + sorted_column = self.definition["metaData"].get("sortedColumn") + if not sorted_column: + return + return { + "field": sorted_column["measure"], + "asc": sorted_column["order"].lower() == "asc", + } + + @order_by.setter + def order_by(self, order_by): + sorted_column = { + "order": "asc" if order_by["asc"] else "desc", + "measure": order_by["field"], + } + self.definition["metaData"]["sortedColumn"].update(sorted_column) + + @property + def fields_matching(self): + return {} + + @property + def measures(self): + return self.definition["metaData"]["activeMeasures"] + + @measures.setter + def measures(self, measures): + self.definition["metaData"]["activeMeasures"] = measures + + @property + def row_group_by(self): + return self.definition["metaData"]["rowGroupBys"] + + @row_group_by.setter + def row_group_by(self, group_by): + self.definition["metaData"]["rowGroupBys"] = group_by + + @property + def col_group_by(self): + return self.definition["metaData"]["colGroupBys"] + + @col_group_by.setter + def col_group_by(self, group_by): + self.definition["metaData"]["colGroupBys"] = group_by + + +class InsertPivotCmdV171(InsertPivotCmdV16): + @property + def id(self): + return self.cmd["pivotId"] diff --git a/src/util/spreadsheet/fields.py b/src/util/spreadsheet/fields.py new file mode 100644 index 000000000..2c814f47f --- /dev/null +++ b/src/util/spreadsheet/fields.py @@ -0,0 +1,460 @@ +import re +import json + +from typing import Iterable + +from itertools import chain + + +from .data_wrappers import Spreadsheet, create_data_source_from_cmd +from .misc import ( + apply_in_all_spreadsheets, + adapt_view_link_cells, + remove_data_source_function, + remove_lists, + remove_pivots, + remove_odoo_charts, + transform_data_source_functions, +) + +from .revisions import CommandAdapter, transform_revisions_data + +from odoo.osv import expression + +from odoo.upgrade.util.context import adapt_context, clean_context +from odoo.upgrade.util.domains import _adapt_one_domain + + +# stolen from util.fields:def remove_fields +def remove_adapter(leaf, is_or, negated): + # replace by TRUE_LEAF, unless negated or in a OR operation but not negated + if is_or ^ negated: + return [expression.FALSE_LEAF] + return [expression.TRUE_LEAF] + + +def rename_field_in_all_spreadsheets(cr, model, old_value, new_value): + apply_in_all_spreadsheets( + cr, + old_value, + (lambda data, revisions_data: rename_field(cr, model, old_value, new_value, data, revisions_data)), + ) + + +def rename_field(cr, model, old, new, data, revisions=()): + spreadsheet = Spreadsheet(data) + adapters = _rename_field_in_list(cr, spreadsheet, model, old, new) + # adapters += _rename_field_in_pivot(cr, spreadsheet, model, old, new) + # adapters += _rename_field_in_chart(cr, spreadsheet, model, old, new) + # adapters += _rename_field_in_filters(cr, spreadsheet, model, old, new) + # adapters += _rename_field_in_view_link(cr, spreadsheet, model, old, new) + return spreadsheet.data, transform_revisions_data(revisions, *adapters) + + +def remove_field_in_all_spreadsheets(cr, model, field): + apply_in_all_spreadsheets( + cr, field, (lambda data, revisions_data: remove_field(cr, model, field, data, revisions_data)) + ) + + +def remove_field(cr, model, field, data, revisions=()): + spreadsheet = Spreadsheet(data) + _remove_field_from_filter_matching(cr, spreadsheet, model, field) + adapters = _remove_field_from_list(cr, spreadsheet, model, field) + adapters += _remove_field_from_pivot(cr, spreadsheet, model, field) + adapters += _remove_field_from_graph(cr, spreadsheet, model, field) + adapters += _remove_field_from_view_link(cr, spreadsheet, model, field) + spreadsheet.clean_empty_cells() + return spreadsheet.data, transform_revisions_data(revisions, *adapters) + + +def _rename_function_fields(content, data_source_ids, functions, old, new): + def adapter(fun_call): + for arg in fun_call.args[1:]: + if arg.value == old: + arg.value = new + return fun_call + + return transform_data_source_functions(content, data_source_ids, functions, adapter) + + +def _rename_field_in_chain(cr, model, field_model, field_chain, old, new) -> str: + """Model on which the field chain refers to.""" + domain = [(field_chain, "=", 1)] + domain = _adapt_one_domain(cr, field_model, old, new, model, domain) + if domain is None: + return field_chain + return domain[0][0] + + +def _rename_field_in_list(cr, spreadsheet: Spreadsheet, model, old, new): + list_ids = set() + + def rename(olist): + _rename_data_source_field(cr, olist, model, old, new) + if olist.model == model: + list_ids.add(olist.id) + olist.fields = _rename_fields(old, new, olist.fields) + + for olist in spreadsheet.lists: + rename(olist) + + for cell in spreadsheet.cells: + cell["content"] = _rename_function_fields( + cell["content"], list_ids, {"ODOO.LIST", "ODOO.LIST.HEADER"}, old, new + ) + + list_models = {olist.id: olist.model for olist in spreadsheet.lists} + + def collect_list(cmd): + olist = create_data_source_from_cmd(cmd) + list_models[olist.id] = olist.model + + def rename_re_insert(cmd): + olist = create_data_source_from_cmd(cmd) + if list_models[olist.id] == model: + olist.fields = _rename_fields(old, new, olist.fields) + + return ( + CommandAdapter("INSERT_ODOO_LIST", collect_list), + CommandAdapter("INSERT_ODOO_LIST", lambda cmd: rename(create_data_source_from_cmd(cmd))), + CommandAdapter("RE_INSERT_ODOO_LIST", rename_re_insert), + CommandAdapter( + "UPDATE_CELL", + lambda cmd: dict( + cmd, + content=_rename_function_fields( + cmd.get("content"), list_ids, {"ODOO.LIST", "ODOO.LIST.HEADER"}, old, new + ), + ), + ), + ) + + +def _rename_fields(old: str, new: str, fields: Iterable[str]) -> Iterable[str]: + renamed = [] + for field in fields: + if ":" in field: + field, aggregate_operator = field.split(":") + if field == old: + renamed.append(new + ":" + aggregate_operator) + else: + renamed.append(field) + elif field == old: + renamed.append(new) + else: + renamed.append(field) + return renamed + + +def _rename_field_in_pivot(cr, spreadsheet: Spreadsheet, model, old, new): + pivot_ids = set() + + def rename(pivot): + _rename_data_source_field(cr, pivot, model, old, new) + if pivot.model == model: + pivot_ids.add(pivot.id) + pivot.col_group_by = _rename_fields(old, new, pivot.col_group_by) + pivot.row_group_by = _rename_fields(old, new, pivot.row_group_by) + pivot.measures = _rename_fields(old, new, pivot.measures) + + for pivot in spreadsheet.pivots: + rename(pivot) + + for cell in spreadsheet.cells: + cell["content"] = _rename_function_fields( + cell["content"], pivot_ids, {"ODOO.PIVOT", "ODOO.PIVOT.HEADER"}, old, new + ) + + def adapt_insert(cmd): + pivot = create_data_source_from_cmd(cmd) + rename(pivot) + adapt_pivot_table(cmd) + + def adapt_pivot_table(cmd): + table = cmd["table"] + for row in table["cols"]: + for cell in row: + cell["fields"] = _rename_fields(old, new, cell["fields"]) + # value can be the name of the measure (a field name) + cell["values"] = _rename_fields(old, new, cell["values"]) + for row in table["rows"]: + row["fields"] = _rename_fields(old, new, row["fields"]) + row["values"] = _rename_fields(old, new, row["values"]) + cmd["table"]["measures"] = _rename_fields(old, new, table["measures"]) + + return ( + CommandAdapter("INSERT_PIVOT", adapt_insert), + CommandAdapter("RE_INSERT_PIVOT", adapt_pivot_table), + CommandAdapter( + "UPDATE_CELL", + lambda cmd: dict( + cmd, + content=_rename_function_fields( + cmd.get("content"), pivot_ids, {"ODOO.PIVOT", "ODOO.PIVOT.HEADER"}, old, new + ), + ), + ), + ) + + +def _rename_data_source_field(cr, data_source, model, old, new): + data_source.domain = ( + _adapt_one_domain(cr, model, old, new, data_source.model, data_source.domain) or data_source.domain + ) + for measure in data_source.fields_matching.values(): + measure["chain"] = _rename_field_in_chain(cr, data_source.model, model, measure["chain"], old, new) + if data_source.model == model: + adapt_context(data_source.context, old, new) + if data_source.order_by: + data_source.order_by = _rename_order_by(data_source.order_by, old, new) + + +def _remove_data_source_field(cr, data_source, model, field): + if data_source.model == model: + data_source.domain = ( + _adapt_one_domain( + cr, model, field, "ignored", data_source.model, data_source.domain, remove_adapter, force_adapt=True + ) + or data_source.domain + ) + + adapt_context(data_source.context, field, "ignored") + if data_source.order_by: + data_source.order_by = _remove_order_by(data_source.order_by, field) + + +def _remove_order_by(order_by, field): + if isinstance(order_by, list): + return [order for order in order_by if order["field"] != field] + if order_by and order_by["field"] == field: + return None + return order_by + +def _rename_order_by(order_by, old, new): + if isinstance(order_by, list): + return [_rename_order_by(order, old, new) for order in order_by] + if order_by and order_by["field"] == old: + order_by["field"] = new + return order_by + + +def _rename_field_in_chart(cr, spreadsheet: Spreadsheet, model, old, new): + def rename(chart): + _rename_data_source_field(cr, chart, model, old, new) + if chart.model == model: + if chart.measure == old: + chart.measure = new + chart.group_by = _rename_fields(old, new, chart.group_by) + return chart + + for chart in spreadsheet.odoo_charts: + rename(chart) + + def adapt_create_chart(cmd): + if cmd["definition"]["type"].startswith("odoo_"): + chart = create_data_source_from_cmd(cmd) + rename(chart) + + return (CommandAdapter("CREATE_CHART", adapt_create_chart),) + + +def _rename_field_in_filters(cr, spreadsheet: Spreadsheet, model, old, new): + pivot_models = {pivot.id: pivot.model for pivot in spreadsheet.pivots} + list_models = {olist.id: olist.model for olist in spreadsheet.lists} + chart_models = {chart.id: chart.model for chart in spreadsheet.odoo_charts} + + def adapt_filter(cmd): + for pivot_id, field in cmd["pivot"].items(): + pivot_model = pivot_models[pivot_id] + field["chain"] = _rename_field_in_chain(cr, pivot_model, model, field["chain"], old, new) + for list_id, field in cmd["list"].items(): + list_model = list_models[list_id] + field["chain"] = _rename_field_in_chain(cr, list_model, model, field["chain"], old, new) + for chart_id, field in cmd["chart"].items(): + chart_model = chart_models[chart_id] + field["chain"] = _rename_field_in_chain(cr, chart_model, model, field["chain"], old, new) + + def collect_pivot(cmd): + pivot = create_data_source_from_cmd(cmd) + pivot_models[pivot.id] = pivot.model + + def collect_list(cmd): + olist = create_data_source_from_cmd(cmd) + list_models[olist.id] = olist.model + + def collect_charts(cmd): + if cmd["definition"]["type"].startswith("odoo_"): + chart = create_data_source_from_cmd(cmd) + chart_models[chart.id] = chart.model + + return ( + CommandAdapter("INSERT_PIVOT", collect_pivot), + CommandAdapter("INSERT_ODOO_LIST", collect_list), + CommandAdapter("CREATE_CHART", collect_charts), + CommandAdapter("ADD_GLOBAL_FILTER", adapt_filter), + CommandAdapter("EDIT_GLOBAL_FILTER", adapt_filter), + ) + + +def match_markdown_link(content): + return re.match(r"\[.*\]\(odoo://view/(.*)\)", content) + + +def _rename_field_in_view_link(cr, spreadsheet: Spreadsheet, model, old, new): + def adapt_view_link(action): + if action["modelName"] != model: + return + domain = _adapt_one_domain(cr, model, old, new, action["modelName"], action["domain"]) + if domain: + if isinstance(action["domain"], str): + domain = str(domain) + action["domain"] = domain + adapt_context(action["context"], old, new) + + return adapt_view_link_cells(spreadsheet, adapt_view_link) + + +## Removal + + +def _remove_list_functions(content, list_ids, field): + """Remove functions such as ODOO.LIST(1, 'field') or ODOO.LIST.HEADER(1, 'field')""" + + def filter_func(func_call_ast): + return any(arg.value == field for arg in func_call_ast.args[1:]) + + return remove_data_source_function(content, list_ids, {"ODOO.LIST", "ODOO.LIST.HEADER"}, filter_func) + + +def _remove_field_from_list(cr, spreadsheet: Spreadsheet, model, field): + def _remove_field(olist): + _remove_data_source_field(cr, olist, model, field) + if olist.model == model: + olist.fields = [column for column in olist.fields if column != field] + + for olist in spreadsheet.lists: + _remove_field(olist) + + def adapt_insert(cmd): + olist = create_data_source_from_cmd(cmd) + _remove_field(olist) + + # collect all list models inserted by INSERT_ODOO_LIST + # because we need the models to adapt RE_INSERT_ODOO_LIST + list_models = {olist.id: olist.model for olist in spreadsheet.lists} + + def collect_list(cmd): + olist = create_data_source_from_cmd(cmd) + list_models[olist.id] = olist.model + + def adapt_re_insert(cmd): + olist = create_data_source_from_cmd(cmd) + if list_models[olist.id] == model: + _remove_field(olist) + + return ( + CommandAdapter("INSERT_ODOO_LIST", collect_list), + CommandAdapter("INSERT_ODOO_LIST", adapt_insert), + CommandAdapter("RE_INSERT_ODOO_LIST", adapt_re_insert), + ) + + +def _remove_field_from_pivot(cr, spreadsheet: Spreadsheet, model, field): + def _remove_field(pivot): + _remove_data_source_field(cr, pivot, model, field) + if pivot.model == model: + pivot.col_group_by = [f for f in pivot.col_group_by if f != field] + pivot.row_group_by = [f for f in pivot.row_group_by if f != field] + pivot.measures = [f for f in pivot.measures if f != field] + + for pivot in spreadsheet.pivots: + _remove_field(pivot) + + def adapt_insert(cmd): + pivot = create_data_source_from_cmd(cmd) + _remove_field(pivot) + + return (CommandAdapter("INSERT_PIVOT", adapt_insert),) + + +def _remove_field_from_graph(cr, spreadsheet: Spreadsheet, model, field): + def _remove_field(chart): + _remove_data_source_field(cr, chart, model, field) + if chart.model == model: + chart.measure = chart.measure if chart.measure != field else None + + for chart in spreadsheet.odoo_charts: + _remove_field(chart) + + def adapt_create_chart(cmd): + if cmd["definition"]["type"].startswith("odoo_"): + chart = create_data_source_from_cmd(cmd) + _remove_field(chart) + + return (CommandAdapter("CREATE_CHART", adapt_create_chart),) + + +def _remove_field_from_view_link(cr, spreadsheet: Spreadsheet, model, field): + def adapt_view_link(action): + if action["modelName"] == model: + clean_context(action["context"], field) + action["domain"] = _adapt_one_domain( + cr, model, field, "ignored", model, action["domain"], remove_adapter, force_adapt=True + ) + + return adapt_view_link_cells(spreadsheet, adapt_view_link) + + +def _remove_field_from_filter_matching(cr, spreadsheet: Spreadsheet, model, field): + data_sources = chain(spreadsheet.lists, spreadsheet.pivots, spreadsheet.odoo_charts) + for data_source in data_sources: + matching_to_delete = [] + for filter_id, measure in data_source.fields_matching.items(): + if _is_field_in_chain(cr, model, field, data_source.model, measure["chain"]): + matching_to_delete.append(filter_id) + for filter_id in matching_to_delete: + del data_source.fields_matching[filter_id] + + +def _is_field_in_chain(cr, field_model, field, data_source_model, field_chain): + def adapter(*args, **kwargs): + return expression.FALSE_DOMAIN + + domain = [(field_chain, "=", 1)] + domain = _adapt_one_domain(cr, field_model, field, "ignored", data_source_model, domain, adapter=adapter) + return domain == expression.FALSE_DOMAIN + + +def domain_fields(domain): + """return all field names used in the domain + >>> domain_fields([['field1', '=', 1], ['field2', '=', 2]]) + ['field1', 'field2'] + """ + return [leaf[0] for leaf in domain if len(leaf) == 3] + + +def pivot_measure_fields(pivot): + return [measure for measure in pivot.measures if measure != "__count"] + + +def pivot_fields(pivot): + """return all field names used in a pivot definition""" + fields = set(pivot.col_group_by + pivot.row_group_by + pivot_measure_fields(pivot) + domain_fields(pivot.domain)) + measure = pivot.order_by and pivot.order_by["field"] + if measure and measure != "__count": + fields.add(measure) + return fields + + +def chart_fields(chart): + """return all field names used in a chart definitions""" + fields = set(chart.group_by + domain_fields(chart.domain)) + measure = chart.measure + if measure != "__count": + fields.add(measure) + return fields + + +def list_order_fields(list_definition): + return [order["field"] for order in list_definition.order_by] diff --git a/src/util/spreadsheet/misc.py b/src/util/spreadsheet/misc.py new file mode 100644 index 000000000..22211db7c --- /dev/null +++ b/src/util/spreadsheet/misc.py @@ -0,0 +1,374 @@ + +import re +import json +import logging + +from typing import Union, Callable, Dict, List + +from itertools import chain + + +from .data_wrappers import Spreadsheet, create_data_source_from_cmd +from .parser import ast_to_string, transform_ast_nodes, parse, Literal + +from .o_spreadsheet import load +from .revisions import CommandAdapter, Drop +from odoo.addons.base.maintenance.migrations import util + +_logger = logging.getLogger(__name__) + + +def read_spreadsheet_attachments(cr, like_pattern=""): + yield from read_spreadsheet_initial_data(cr, like_pattern) + yield from read_spreadsheet_snapshots(cr, like_pattern) + + +def read_spreadsheet_snapshots(cr, like_pattern=""): + cr.execute( + """ + SELECT id, res_model, res_id, db_datas + FROM ir_attachment + WHERE res_model IN ('spreadsheet.dashboard', 'documents.document') + AND res_field = 'spreadsheet_snapshot' + AND position(%s::bytea in db_datas) > 0 + """, + [like_pattern], + ) + # TODO rename 'like_pattern', it's not LIKE because LIKE doesn't work because the field is of type bytea + for attachment_id, res_model, res_id, db_datas in cr.fetchall(): + if db_datas: + yield attachment_id, res_model, res_id, json.loads(db_datas.tobytes()) + + +def read_spreadsheet_initial_data(cr, like_pattern=""): + if util.table_exists(cr, "documents_document"): + cr.execute( + """ + SELECT doc.id AS document_id, a.id AS attachment_id, a.db_datas + FROM documents_document doc + LEFT JOIN ir_attachment a ON a.id = doc.attachment_id + WHERE doc.handler='spreadsheet' + AND position(%s::bytea in db_datas) > 0 + """, + [like_pattern], + ) + # TODO there are excel files in there! + for document_id, attachment_id, db_datas in cr.fetchall(): + if db_datas: + yield attachment_id, "documents.document", document_id, json.loads(db_datas.tobytes()) + + if util.table_exists(cr, "spreadsheet_dashboard"): + data_field = _magic_spreadsheet_field(cr) #"spreadsheet_binary_data" if version_gte("saas~16.3") else "data" + cr.execute( + """ + SELECT id, res_model, res_id, db_datas + FROM ir_attachment + WHERE res_model = 'spreadsheet.dashboard' + AND res_field = %s + AND position(%s::bytea in db_datas) > 0 + """, + [data_field, like_pattern], + ) + for attachment_id, res_model, res_id, db_datas in cr.fetchall(): + if db_datas: + yield attachment_id, res_model, res_id, json.loads(db_datas.tobytes()) + +def _magic_spreadsheet_field(cr): + cr.execute( + """ + SELECT count(1) + FROM ir_model_fields + WHERE model='spreadsheet.mixin' + AND name='spreadsheet_binary_data'; + """ + ) + return cr.fetchone()[0] and 'spreadsheet_binary_data' or 'data' + +def apply_in_all_spreadsheets(cr, like_pattern, callback): + # print("like pattern : ", like_pattern) + b = False + # upgrade the initial data and all revisions based on it + for attachment_id, res_model, res_id, db_datas in read_spreadsheet_initial_data(cr, like_pattern): + print("attachment data id: ", attachment_id) + print("datas: ", len(db_datas)) + b = True + + data, _ = callback(db_datas, []) + write_attachment(cr, attachment_id, data) + + ## FIXME TODORAR batch the calls + ## FIXME we have to pass in the revisions regardless of the base data stuff + # upgrade revisions + # regardless of res_model res_id + revisions_data = [] + revisions_ids = [] + + for revision_id, commands in get_revisions(cr, "res_model", "res_id", like_pattern): + revisions_data.append(json.loads(commands)) + revisions_ids.append(revision_id) + _, revisions = callback({}, revisions_data) + for rev_id, revision in zip(revisions_ids, revisions): + cr.execute( + """ + UPDATE spreadsheet_revision + SET commands=%s + WHERE id=%s + """, + [json.dumps(revision), rev_id], + ) + if b: + _logger.info("upgrading initial data and revisions") + + b = False + # upgrade snapshots + for attachment_id, _res_model, _res_id, db_datas in read_spreadsheet_snapshots(cr, like_pattern): + print("attachment snapshot id: ", attachment_id) + + b=True + data, revisions = callback(db_datas, []) + write_attachment(cr, attachment_id, data) + + if b: + _logger.info("upgrading snapshots") + + + +def write_attachment(cr, attachment_id, data): + _logger.info("replacing attachment %s", attachment_id) + cr.execute( + """ + UPDATE ir_attachment + SET db_datas=%s + WHERE id=%s + """, + [json.dumps(data).encode(), attachment_id] + ) + + +def get_revisions(cr, res_model, res_id, like_pattern): + if util.version_gte("16.0"): + cr.execute( + """ + SELECT id, commands + FROM spreadsheet_revision + WHERE commands LIKE %s + """, + ['%' + like_pattern + '%'], + ) + else: + cr.execute( + """ + SELECT id, commands + FROM spreadsheet_revision + WHERE commands LIKE %s + """, + [like_pattern], + ) + return cr.fetchall() + +def upgrade_data(cr, upgrade_callback): + for attachment_id, _res_model, _res_id, data in read_spreadsheet_attachments(cr): + upgraded_data = upgrade_callback(load(data)) + cr.execute( + """ + UPDATE ir_attachment + SET db_datas=%s + WHERE id=%s + """, + [json.dumps(upgraded_data).encode(), attachment_id], + ) + _logger.info("spreadsheet json data upgraded") + + +def transform_data_source_functions(content, data_source_ids, functions, adapter): + """Transforms data source function calls within content. + The 'adapter' function is called with each function call AST node matching + the function name and any data source. + """ + if not content or not content.startswith("=") or not data_source_ids: + return content + if not any(fn.upper() in content.upper() for fn in functions): + return content + try: + ast = parse(content) + except ValueError: + return content + + data_source_ids = [str(did) for did in data_source_ids] + def _adapter(fun_call): + # call the provided adapter only if the function name + # and data source matches + if fun_call.value.upper() in functions and len(fun_call.args) > 0: + data_source_id = fun_call.args[0].value + if str(data_source_id) in data_source_ids: + return adapter(fun_call) + return fun_call + + ast = transform_ast_nodes(ast, "FUNCALL", _adapter) + return f"={ast_to_string(ast)}" + + + +def adapt_view_link_cells(spreadsheet: Spreadsheet, adapter: Callable[[str], Union[str, None]]): + def adapt_view_link(content): + """A view link is formatted as a markdown link + [text](odoo://view/) + """ + match = re.match(r"^\[([^\[]+)\]\(odoo://view/(.+)\)$", content) + if not match: + return content + label = match.group(1) + view_description = json.loads(match.group(2)) + result = adapter(view_description["action"]) + if result == Drop: + return "" + return f"[{label}](odoo://view/{json.dumps(view_description)})" + + for cell in spreadsheet.cells: + cell["content"] = adapt_view_link(cell["content"]) + return (CommandAdapter("UPDATE_CELL", lambda cmd: dict(cmd, content=adapt_view_link(cmd.get("content")))),) + + + +def remove_pivots(spreadsheet: Spreadsheet, pivot_ids: List[str], insert_cmd_predicate: Callable[[Dict], bool]): + spreadsheet.delete_pivots(*pivot_ids) + + for cell in spreadsheet.cells: + cell["content"] = remove_data_source_function(cell["content"], pivot_ids, ["ODOO.PIVOT", "ODOO.PIVOT.HEADER"]) + + def adapt_insert(cmd): + pivot = create_data_source_from_cmd(cmd) + if insert_cmd_predicate(pivot): + pivot_ids.append(pivot.id) + return Drop + return cmd + + def adapt_cmd_with_pivotId(cmd): + pivot_id = cmd["pivotId"] + if str(pivot_id) in pivot_ids: + return Drop + return cmd + + def adapt_re_insert(cmd): + pivot_id = cmd["id"] + if str(pivot_id) in pivot_ids: + return Drop + return cmd + + def adapt_global_filters(cmd): + if cmd.get("pivot"): + for pivot_id in pivot_ids: + cmd["pivot"].pop(str(pivot_id), None) + return cmd + + def adapt_update_cell(cmd): + content = remove_data_source_function(cmd.get("content"), pivot_ids, ["ODOO.PIVOT", "ODOO.PIVOT.HEADER"]) + if not content: + return Drop + cmd["content"] = content + + return ( + CommandAdapter("INSERT_PIVOT", adapt_insert), + CommandAdapter("RE_INSERT_PIVOT", adapt_re_insert), + CommandAdapter("UPDATE_ODOO_PIVOT_DOMAIN", adapt_cmd_with_pivotId), + CommandAdapter("RENAME_ODOO_PIVOT", adapt_cmd_with_pivotId), + CommandAdapter("REMOVE_PIVOT", adapt_cmd_with_pivotId), + CommandAdapter("ADD_GLOBAL_FILTER", adapt_global_filters), + CommandAdapter("EDIT_GLOBAL_FILTER", adapt_global_filters), + CommandAdapter("UPDATE_CELL", adapt_update_cell), + ) + + + +def remove_lists(spreadsheet: Spreadsheet, list_ids: List[str], insert_cmd_predicate: Callable[[Dict], bool]): + spreadsheet.delete_lists(*list_ids) + for cell in spreadsheet.cells: + cell["content"] = remove_data_source_function(cell["content"], list_ids, ["ODOO.LIST", "ODOO.LIST.HEADER"]) + + def adapt_insert(cmd): + list = create_data_source_from_cmd(cmd) + if insert_cmd_predicate(list): + list_ids.append(list.id) + return Drop + return cmd + + def adapt_re_insert(cmd): + list_id = cmd["id"] + if list_id in list_ids: + return Drop + return cmd + + def adapt_cmd_with_listId(cmd): + list_id = cmd["listId"] + if list_id in list_ids: + return Drop + return cmd + + def adapt_global_filters(cmd): + if cmd.get("list"): + for list_id in list_ids: + cmd["list"].pop(list_id, None) + if not cmd["list"]: + del cmd["list"] + return cmd + + def adapt_update_cell(cmd): + content = remove_data_source_function(cmd.get("content"), list_ids, ["ODOO.LIST", "ODOO.LIST.HEADER"]) + if not content: + return Drop + cmd["content"] = content + + return ( + CommandAdapter("INSERT_ODOO_LIST", adapt_insert), + CommandAdapter("RE_INSERT_ODOO_LIST", adapt_re_insert), + CommandAdapter("RENAME_ODOO_LIST", adapt_cmd_with_listId), + CommandAdapter("UPDATE_ODOO_LIST_DOMAIN", adapt_cmd_with_listId), + CommandAdapter("REMOVE_ODOO_LIST", adapt_cmd_with_listId), + CommandAdapter("ADD_GLOBAL_FILTER", adapt_global_filters), + CommandAdapter("EDIT_GLOBAL_FILTER", adapt_global_filters), + CommandAdapter("UPDATE_CELL", adapt_update_cell), + ) + +def remove_odoo_charts(spreadsheet: Spreadsheet, chart_ids: List[str], insert_cmd_predicate: Callable[[Dict], bool]): + spreadsheet.delete_figures(*chart_ids) + + def adapt_create_chart(cmd): + chart = create_data_source_from_cmd(cmd) + if cmd["definition"]["type"].startswith("odoo_") and insert_cmd_predicate(chart): + chart_ids.append(cmd["id"]) + return Drop + + def adapt_chart_cmd_with_id(cmd): + + if cmd["id"] in chart_ids: + return Drop + + def adapt_global_filters(cmd): + if cmd.get("chart"): + for chart_id in chart_ids: + cmd["chart"].pop(chart_id, None) + return ( + CommandAdapter("CREATE_CHART", adapt_create_chart), + CommandAdapter("UPDATE_CHART", adapt_chart_cmd_with_id), + CommandAdapter("DELETE_FIGURE", adapt_chart_cmd_with_id), + CommandAdapter("ADD_GLOBAL_FILTER", adapt_global_filters), + CommandAdapter("EDIT_GLOBAL_FILTER", adapt_global_filters), + ) + + +def remove_data_source_function(content, data_source_ids, functions, filter_ast=lambda ast: True): + """check if the cell content contains a function that references a data source + >>> remove_data_source_function('=ODOO.PIVOT(1, "revenue")', [1], {"ODOO.PIVOT"}) + '' + >>> remove_data_source_function('=ODOO.PIVOT(2, "revenue")', [1], {"ODOO.PIVOT"}) + '=ODOO.PIVOT(2, "revenue")'. + """ + + def adapter(fun_call): + if filter_ast(fun_call): + # remove the func call and set something else instead + return Literal("BOOLEAN", False) + return fun_call + + new_content = transform_data_source_functions(content, data_source_ids, functions, adapter) + return content if new_content == content else "" diff --git a/src/util/spreadsheet/models.py b/src/util/spreadsheet/models.py new file mode 100644 index 000000000..a2b69fd5f --- /dev/null +++ b/src/util/spreadsheet/models.py @@ -0,0 +1,160 @@ + +from .data_wrappers import Spreadsheet, create_data_source_from_cmd +from .misc import apply_in_all_spreadsheets, adapt_view_link_cells, remove_lists, remove_pivots, remove_odoo_charts +from .revisions import CommandAdapter, Drop, transform_revisions_data, transform_commands + + +def rename_model_in_all_spreadsheets(cr, old_value, new_value): + apply_in_all_spreadsheets(cr, old_value, (lambda data, revisions_data: rename_model(old_value, new_value, data, revisions_data))) + +# TODO remove cr argument +def rename_model(old, new, data, revisions = ()): + spreadsheet = Spreadsheet(data) + adapters = _rename_model_in_list(spreadsheet, old, new) + adapters += _rename_model_in_pivot(spreadsheet, old, new) + adapters += _rename_model_in_filters(spreadsheet, old, new) + adapters += _rename_model_in_charts(spreadsheet, old, new) + adapters += _rename_model_in_view_link(spreadsheet, old, new) + return spreadsheet.data, transform_revisions_data(revisions, *adapters) + +def remove_model_in_all_spreadsheets(cr, model): + apply_in_all_spreadsheets(cr, model, (lambda data, revisions_data: remove_model(model, data, revisions_data))) + +def remove_model(model: str, data, revisions = ()) -> str: + spreadsheet = Spreadsheet(data) + adapters = _remove_model_from_lists(model, spreadsheet) + adapters += _remove_model_from_pivots(model, spreadsheet) + adapters += _remove_model_from_charts(model, spreadsheet) + adapters += _remove_model_from_filters(model, spreadsheet) + adapters += _remove_model_from_view_link(model, spreadsheet) + spreadsheet.clean_empty_cells() + return spreadsheet.data, transform_revisions_data(revisions, *adapters) + +def _rename_model_in_charts(spreadsheet: Spreadsheet, old, new): + for chart in spreadsheet.odoo_charts: + if chart.model == old: + chart.model = new + + def adapt_insert(cmd): + if cmd["definition"]["type"].startswith("odoo_"): + chart = create_data_source_from_cmd(cmd) + if chart.model == old: + chart.model = new + + return (CommandAdapter("CREATE_CHART", adapt_insert),) + + +def _rename_model_in_list(spreadsheet: Spreadsheet, old, new): + for olist in spreadsheet.lists: + if olist.model == old: + olist.model = new + + def adapt_insert(cmd): + olist = create_data_source_from_cmd(cmd) + if olist.model == old: + olist.model = new + + return (CommandAdapter("INSERT_ODOO_LIST", adapt_insert),) + + +def _rename_model_in_pivot(spreadsheet: Spreadsheet, old, new): + for pivot in spreadsheet.pivots: + if pivot.model == old: + pivot.model = new + + def adapt_insert(cmd): + pivot = create_data_source_from_cmd(cmd) + if pivot.model == old: + pivot.model = new + + return (CommandAdapter("INSERT_PIVOT", adapt_insert),) + + +def _rename_model_in_filters(spreadsheet: Spreadsheet, old, new): + def rename_relational_filter(gfilter): + if gfilter["type"] == "relation" and gfilter["modelName"] == old: + gfilter["modelName"] = new + + for gfilter in spreadsheet.global_filters: + rename_relational_filter(gfilter) + + def adapt_insert(cmd): + rename_relational_filter(cmd["filter"]) + + return ( + CommandAdapter("ADD_GLOBAL_FILTER", adapt_insert), + CommandAdapter("EDIT_GLOBAL_FILTER", adapt_insert), + ) + + +def _rename_model_in_view_link(spreadsheet: Spreadsheet, old, new): + def adapt_view_link(action): + if action["modelName"] == old: + action["modelName"] = new + + return adapt_view_link_cells(spreadsheet, adapt_view_link) + + +def _remove_model_from_lists(model, spreadsheet: Spreadsheet): + lists_to_delete = [list.id for list in spreadsheet.lists if list.model == model] + return remove_lists( + spreadsheet, + lists_to_delete, + lambda list: list.model == model, + ) + + +def _remove_model_from_pivots(model, spreadsheet: Spreadsheet): + pivots_to_delete = [pivot.id for pivot in spreadsheet.pivots if pivot.model == model] + return remove_pivots( + spreadsheet, + pivots_to_delete, + lambda pivot: pivot.model == model, + ) + + +def _remove_model_from_charts(model, spreadsheet: Spreadsheet): + chart_to_delete = [chart.id for chart in spreadsheet.odoo_charts if chart.model == model] + return remove_odoo_charts( + spreadsheet, + chart_to_delete, + lambda chart: chart.model == model, + ) + + +def _remove_model_from_filters(model, spreadsheet: Spreadsheet): + global_filters = spreadsheet.global_filters + to_delete = [ + gFilter["id"] for gFilter in global_filters if gFilter["type"] == "relation" and gFilter["modelName"] == model + ] + spreadsheet.delete_global_filters(*to_delete) + + def adapt_edit_filter(cmd): + if cmd["filter"]["id"] in to_delete: + return Drop + return cmd + + def adapt_add_filter(cmd): + if cmd["filter"]["type"] == "relation" and cmd["filter"]["modelName"] == model: + to_delete.append(cmd["filter"]["id"]) + return Drop + return cmd + + def adapt_remove_filter(cmd): + if cmd["id"] in to_delete: + return Drop + return cmd + + return ( + CommandAdapter("ADD_GLOBAL_FILTER", adapt_add_filter), + CommandAdapter("EDIT_GLOBAL_FILTER", adapt_edit_filter), + CommandAdapter("REMOVE_GLOBAL_FILTER", adapt_remove_filter), + ) + + +def _remove_model_from_view_link(model, spreadsheet: Spreadsheet): + def adapt_view_link(action): + if action["modelName"] == model: + return Drop + + return adapt_view_link_cells(spreadsheet, adapt_view_link) diff --git a/src/util/spreadsheet/o_spreadsheet.py b/src/util/spreadsheet/o_spreadsheet.py new file mode 100644 index 000000000..342efc936 --- /dev/null +++ b/src/util/spreadsheet/o_spreadsheet.py @@ -0,0 +1,245 @@ +import math +import re + + +INITIAL_SHEET_ID = "Sheet1" +DEFAULT_REVISION_ID = "START_REVISION" + + +def load(data=None) -> dict: + """ + Load spreadsheet data and try to fix missing fields/corrupted state by providing + sensible default values + """ + + if not data: + return _create_empty_workbook() + data = {**_create_empty_workbook(), **data} + data["sheets"] = [ + {**_create_empty_sheet("Sheet{}".format(i + 1), "Sheet{}".format(i + 1)), **sheet} + for i, sheet in enumerate(data["sheets"]) + ] + if not data["sheets"]: + data["sheets"].append(_create_empty_sheet(INITIAL_SHEET_ID, "Sheet1")) + return data + + +def _create_empty_sheet(sheet_id: str, name: str) -> dict: + return { + "id": sheet_id, + "name": name, + "cells": {}, + "figures": [], + "isVisible": True, + } + + +def _create_empty_workbook(sheet_name: str = "Sheet1") -> dict: + return { + "sheets": [_create_empty_sheet(INITIAL_SHEET_ID, sheet_name)], + "styles": {}, + "formats": {}, + "borders": {}, + "revisionId": DEFAULT_REVISION_ID, + } + + +# def upgrade_data(data, upgrade_functions, to_version, version_field="version"): +# data = load(data) +# upgrade_functions.sort(key=lambda x: x[0]) +# for upgrade_version, upgrade in upgrade_functions: +# if data.get(version_field, 0) < upgrade_version: +# upgrade(data) +# data[version_field] = upgrade_version +# if upgrade_version == to_version: +# return data +# return data + + +# Reference of a column header (eg. A, AB) +col_header = re.compile(r"^([A-Z]{1,3})+$") +cell_reference = r"\$?([A-Z]{1,3})\$?([0-9]{1,7})" +# Reference of a normal range or a full row range (eg. A1:B1, 1:$5, $A2:5) +full_row_xc = r"(\$?[A-Z]{1,3})?\$?[0-9]{1,7}\s*:\s*(\$?[A-Z]{1,3})?\$?[0-9]{1,7}\s*" +# Reference of a normal range or a column row range (eg. A1:B1, A:$B, $A1:C) +full_col_xc = r"\$?[A-Z]{1,3}(\$?[0-9]{1,7})?\s*:\s*\$?[A-Z]{1,3}(\$?[0-9]{1,7})?\s*" +# Reference of a cell or a range, it can be a bounded range, a full row or a full column +range_reference = re.compile(r"^\s*('.+'!|[^']+!)?" + "({}|{}|{})$".format(cell_reference, full_row_xc, full_col_xc)) +# Reference of a column (eg. A, $CA, Sheet1!B) +col_reference = re.compile(r"^\s*('.+'!|[^']+!)?\$?([A-Z]{1,3})$") +# Reference of a row (eg. 1, 59, Sheet1!9) +row_reference = re.compile(r"^\s*('.+'!|[^']+!)?\$?([0-9]{1,7})$") + + +cell_reference = re.compile(r"\$?([A-Z]{1,3})\$?([0-9]{1,7})") + + +def zone_to_xc(zone) -> str: + top = zone["top"] + bottom = zone["bottom"] + left = zone["left"] + right = zone["right"] + + has_header = zone.get("hasHeader", False) + is_one_cell = top == bottom and left == right + + if bottom is None and right is not None: + return f"{number_to_letters(left)}:{number_to_letters(right)}" if top == 0 and not has_header else f"{to_xc(left, top)}:{number_to_letters(right)}" + elif right is None and bottom is not None: + return f"{top + 1}:{bottom + 1}" if left == 0 and not has_header else f"{to_xc(left, top)}:{bottom + 1}" + elif bottom is not None and right is not None: + return to_xc(left, top) if is_one_cell else f"{to_xc(left, top)}:{to_xc(right, bottom)}" + + raise ValueError("Bad zone format") + +def letters_to_number(letters): + """ + Convert a string (describing a column) to its number value. + >>> letters_to_number("A") + 0 + >>> letters_to_number("Z") + 25 + >>> letters_to_number("AA") + 26 + """ + result = 0 + length = len(letters) + for i in range(length): + n = ord(letters[i]) - 65 + (i < length - 1) + result += n * 26 ** (length - i - 1) + return result + + +def number_to_letters(n): + """ + >>> number_to_letters(0) + 'A' + >>> number_to_letters(1) + 'B' + """ + if n < 0: + raise ValueError(f"number must be positive. Got {n}") + if n < 26: + return chr(65 + n) + else: + return number_to_letters(math.floor(n / 26) - 1) + number_to_letters(n % 26) + + +def to_cartesian(xc): + """Convert a cell reference to a cartesian coordinate + >>> to_cartesian("A1") + {'col': 0, 'row': 0} + >>> to_cartesian("B2") + {'col': 1, 'row': 1} + """ + xc = xc.upper().strip() + match = cell_reference.match(xc) + if match: + (letters, numbers) = match.groups() + col = letters_to_number(letters) + row = int(numbers) - 1 + return {"col": col, "row": row} + raise ValueError(f"Invalid cell description: {xc}") + + +def is_col_reference(xc): + return col_reference.match(xc) + + +def is_row_reference(xc): + return row_reference.match(xc) + + +def is_col_header(str): + return col_header.match(str) + + +def to_zone_without_boundary_changes(xc): + xc = xc.split("!").pop() + ranges = [x.strip() for x in xc.replace("$", "").split(":")] + full_col = full_row = has_header = False + first_range_part = ranges[0] + second_range_part = len(ranges) == 2 and ranges[1] + if is_col_reference(first_range_part): + left = right = letters_to_number(first_range_part) + top = bottom = 0 + full_col = True + elif is_row_reference(first_range_part): + top = bottom = int(first_range_part) - 1 + left = right = 0 + full_row = True + else: + c = to_cartesian(first_range_part) + left = right = c["col"] + top = bottom = c["row"] + has_header = True + if len(ranges) == 2: + if is_col_reference(second_range_part): + right = letters_to_number(second_range_part) + full_col = True + elif is_row_reference(second_range_part): + bottom = int(second_range_part) - 1 + full_row = True + else: + c = to_cartesian(second_range_part) + right = c["col"] + bottom = c["row"] + if full_col: + top = bottom + if full_row: + left = right + has_header = True + if full_col and full_row: + raise ValueError("Wrong zone xc. The zone cannot be at the same time a full column and a full row") + zone = { + "top": top, + "left": left, + "bottom": None if full_col else bottom, + "right": None if full_row else right, + } + has_header = has_header and (full_row or full_col) + if has_header: + zone["has_header"] = has_header + return zone + + +def to_unbounded_zone(xc): + zone = to_zone_without_boundary_changes(xc) + if zone["right"] is not None and zone["right"] < zone["left"]: + zone["left"], zone["right"] = zone["right"], zone["left"] + if zone["bottom"] is not None and zone["bottom"] < zone["top"]: + zone["top"], zone["bottom"] = zone["bottom"], zone["top"] + return zone + + +def to_zone(xc): + """Convert from a cartesian reference to a zone. + >>> to_zone("A1") + {'top': 0, 'left': 0, 'bottom': 0, 'right': 0} + >>> to_zone("B1:B3") + {'top': 0, 'left': 1, 'bottom': 2, 'right': 1} + >>> to_zone("Sheet1!A1") + {'top': 0, 'left': 0, 'bottom': 0, 'right': 0} + """ + zone = to_unbounded_zone(xc) + if zone["bottom"] is None or zone["right"] is None: + raise ValueError("This does not support unbounded ranges") + return zone + + +def to_xc(col, row): + """ + >>> to_xc(0, 0) + 'A1' + >>> to_xc(1, 1) + 'B2' + """ + return number_to_letters(col) + str(row + 1) + + +def overlap(z1, z2) -> bool: + if z1["bottom"] < z2["top"] or z2["bottom"] < z1["top"]: + return False + if z1["right"] < z2["left"] or z2["right"] < z1["left"]: + return False + return True \ No newline at end of file diff --git a/src/util/spreadsheet/parser.py b/src/util/spreadsheet/parser.py new file mode 100644 index 000000000..92f6d8a08 --- /dev/null +++ b/src/util/spreadsheet/parser.py @@ -0,0 +1,201 @@ +from dataclasses import dataclass +from typing import Callable, ClassVar, Iterable, List, Union + +from .tokenizer import ( + POSTFIX_UNARY_OPERATORS, + tokenize, +) + +OP_PRIORITY = { + "^": 30, + "%": 30, + "*": 20, + "/": 20, + "+": 15, + "-": 15, + "&": 13, + ">": 10, + "<>": 10, + ">=": 10, + "<": 10, + "<=": 10, + "=": 10, +} + +UNARY_OPERATORS_PREFIX = ["-", "+"] + +AST = Union["Literal", "BinaryOperation", "UnaryOperation", "FunctionCall"] + + +@dataclass +class Literal: + type: str + value: str + + +@dataclass +class BinaryOperation: + type: ClassVar[str] = "BIN_OPERATION" + value: str + left: AST + right: AST + + +@dataclass +class UnaryOperation: + type: ClassVar[str] = "UNARY_OPERATION" + value: str + operand: AST + postfix: bool = False + + +@dataclass +class FunctionCall(Literal): + type: ClassVar[str] = "FUNCALL" + value: str + args: List[AST] + + +def parse(formula): + """Parse a spreadsheet formula and return an AST""" + tokens = tokenize(formula) + tokens = [token for token in tokens if token[0] != "DEBUGGER"] + if tokens[0][1] == "=": + tokens = tokens[1:] + return parse_expression(tokens) + + +def parse_expression(tokens, binding_power=0) -> AST: + if not tokens: + raise ValueError("Unexpected end of formula") + left = parse_operand(tokens) + # as long as we have operators with higher priority than the parent one, + # continue parsing the expression since we are in a child sub-expression + while tokens and tokens[0][0] == "OPERATOR" and OP_PRIORITY[tokens[0][1]] > binding_power: + operator = tokens.pop(0)[1] + if operator in POSTFIX_UNARY_OPERATORS: + left = UnaryOperation(operator, left, postfix=True) + else: + right = parse_expression(tokens, OP_PRIORITY[operator]) + left = BinaryOperation(operator, left, right) + return left + + +def parse_function_args(tokens) -> Iterable[AST]: + consume_or_raise(tokens, "LEFT_PAREN", error_msg="Missing opening parenthesis") + if not tokens: + raise ValueError("Unexpected end of formula") + next_token = tokens[0] + if next_token[0] == "RIGHT_PAREN": + consume_or_raise(tokens, "RIGHT_PAREN") + yield from [] + return + yield parse_single_arg(tokens) + while tokens and tokens[0][0] != "RIGHT_PAREN": + consume_or_raise(tokens, "ARG_SEPARATOR", error_msg="Wrong function call") + yield parse_single_arg(tokens) + consume_or_raise(tokens, "RIGHT_PAREN", error_msg="Missing closing parenthesis") + + +def parse_single_arg(tokens) -> AST: + next_token = tokens[0] + if next_token[0] in {"ARG_SEPARATOR", "RIGHT_PAREN"}: + # arg is empty: "sum(1,,2)" "sum(,1)" "sum(1,)" + return Literal("EMPTY", "") + return parse_expression(tokens) + + +def parse_operand(tokens: list) -> AST: + token = tokens.pop(0) + if token[0] == "LEFT_PAREN": + expr = parse_expression(tokens) + consume_or_raise(tokens, "RIGHT_PAREN", error_msg="Missing closing parenthesis") + return expr + elif token[0] == "STRING": + return Literal(token[0], token[1].strip('"')) + elif token[0] in ["NUMBER", "BOOLEAN", "UNKNOWN"]: + return Literal(token[0], token[1]) + elif token[0] == "OPERATOR" and token[1] in UNARY_OPERATORS_PREFIX: + operator = token[1] + return UnaryOperation(operator, parse_expression(tokens, OP_PRIORITY[operator])) + elif token[0] == "SYMBOL": + # breakpoint() + args = list(parse_function_args(tokens)) + return FunctionCall(token[1], list(args)) + raise ValueError(f"Unexpected token: {token}") + + +def consume_or_raise(tokens, token_type, error_msg="Unexpected token"): + if not tokens or tokens.pop(0)[0] != token_type: + raise ValueError(error_msg) + + +def ast_to_string(ast: AST) -> str: + """Convert an AST to the corresponding string.""" + if ast.type == "BIN_OPERATION": + operator = ast.value + left = left_to_string(ast.left, operator) + right = right_to_string(ast.right, operator) + return f"{left}{operator}{right}" + elif ast.type == "UNARY_OPERATION": + operator = ast.value + if ast.postfix: + return f"{left_to_string(ast.operand, operator)}{operator}" + else: + return f"{operator}{right_to_string(ast.operand, operator)}" + elif ast.type == "FUNCALL": + args = (ast_to_string(arg) for arg in ast.args) + return f"{ast.value}({','.join(args)})" + elif ast.type == "STRING": + return f'"{ast.value}"' + elif ast.type in {"NUMBER", "BOOLEAN", "EMPTY", "UNKNOWN"}: + return ast.value + raise ValueError("Unexpected node type: " + ast.type) + + +def left_to_string(left_expr: AST, parent_operator: str) -> str: + """Convert the left operand of a binary operation to the corresponding string + and enclose the result inside parenthesis if necessary.""" + if left_expr.type == "BIN_OPERATION" and OP_PRIORITY[left_expr.value] < OP_PRIORITY[parent_operator]: + return f"({ast_to_string(left_expr)})" + return ast_to_string(left_expr) + + +ASSOCIATIVE_OPERATORS = {"*", "+", "&"} + + +def right_to_string(right_expr: AST, parent_operator: str) -> str: + """Convert the right operand of a binary operation to the corresponding string + and enclose the result inside parenthesis if necessary.""" + if right_expr.type != "BIN_OPERATION": + return ast_to_string(right_expr) + elif (OP_PRIORITY[right_expr.value] < OP_PRIORITY[parent_operator]) or ( + OP_PRIORITY[right_expr.value] == OP_PRIORITY[parent_operator] and parent_operator not in ASSOCIATIVE_OPERATORS + ): + return f"({ast_to_string(right_expr)})" + return ast_to_string(right_expr) + + +def transform_ast_nodes(ast: AST, node_type: str, node_transformer: Callable[[AST], AST]) -> AST: + """Transform the nodes of an AST using the provided node_transformer function.""" + if ast.type == node_type: + ast = node_transformer(ast) + if ast.type == "BIN_OPERATION": + return BinaryOperation( + ast.value, + transform_ast_nodes(ast.left, node_type, node_transformer), + transform_ast_nodes(ast.right, node_type, node_transformer), + ) + elif ast.type == "UNARY_OPERATION": + return UnaryOperation( + ast.value, + transform_ast_nodes(ast.operand, node_type, node_transformer), + ast.postfix, + ) + elif ast.type == "FUNCALL": + return FunctionCall( + ast.value, + list(transform_ast_nodes(arg, node_type, node_transformer) for arg in ast.args), + ) + else: + return ast diff --git a/src/util/spreadsheet/revisions.py b/src/util/spreadsheet/revisions.py new file mode 100644 index 000000000..b037918af --- /dev/null +++ b/src/util/spreadsheet/revisions.py @@ -0,0 +1,76 @@ +import json +from collections import namedtuple +from copy import deepcopy +from typing import Iterable + +from odoo.upgrade import util + +CommandAdapter = namedtuple("CommandAdapter", ["command_type", "adapt"]) + +Drop = object() + +# not used +def transform_revisions(cr, revisions, *adapters: Iterable[CommandAdapter]): + if not len(adapters): + return + if not util.table_exists(cr, "spreadsheet_revision"): + return + cmd_types = [x[0] for x in adapters] + cr.execute( + """ + SELECT id, commands + FROM spreadsheet_revision + WHERE commands SIMILAR TO %s + """, + (f"%{'%|%'.join(cmd_types)}%",), + ) + for revision_id, data in cr.fetchall(): + data = json.loads(data) + commands = data.get("commands", []) + if not commands: + continue + new_commands = transform_commands(commands, *adapters) + if new_commands is None: + continue + data["commands"] = new_commands + cr.execute( + """ + UPDATE spreadsheet_revision + SET commands=%s + WHERE id=%s + """, + [json.dumps(data), revision_id], + ) + +def transform_revisions_data(revisions, *adapters: Iterable[CommandAdapter]): + for data in revisions: + commands = data.get("commands", []) + if not commands: + continue + new_commands = transform_commands(commands, *adapters) + if new_commands is None: + continue + # ensures it's jsonyfiable (converts tuples to lists) (probably should be removed) + data["commands"] = json.loads(json.dumps(new_commands)) + return revisions + + +def transform_commands(commands, *adapters: Iterable[CommandAdapter]): + to_update = False + new_commands = [] + for command in commands: + adapted_command = command + for adapter_type, adapter in adapters: + if command["type"] == adapter_type and adapted_command is not Drop: + to_adapt = deepcopy(adapted_command) + result = adapter(to_adapt) or to_adapt + if result is Drop: + to_update = True + adapted_command = Drop + continue + if result != adapted_command: + to_update = True + adapted_command = result + if adapted_command is not Drop: + new_commands.append(adapted_command) + return new_commands if to_update else None