From 7d2fc8295eb4e5a11889ee86b86c9aaf642a34c2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lucas=20Lef=C3=A8vre=20=28lul=29?=
Date: Fri, 17 May 2024 09:49:17 +0200
Subject: [PATCH 1/3] [IMP] util/fields: move context utils to their own file

Currently, the context-related functions are only used by specific field
utility functions in `util/fields`, where they are inlined. In the next
commit, adapting and cleaning the context is also required to upgrade
spreadsheet files.

This commit moves the utility functions to their own file so they can be
used elsewhere.
---
 src/util/context.py | 98 +++++++++++++++++++++++++++++++++++++++++++++
 src/util/fields.py  | 96 +++-----------------------------------------
 2 files changed, 104 insertions(+), 90 deletions(-)
 create mode 100644 src/util/context.py

diff --git a/src/util/context.py b/src/util/context.py
new file mode 100644
index 000000000..f1cb5855f
--- /dev/null
+++ b/src/util/context.py
@@ -0,0 +1,98 @@
+# -*- coding: utf-8 -*-
+
+# python3 shims
+try:
+    basestring  # noqa: B018
+except NameError:
+    basestring = str
+
+
+_CONTEXT_KEYS_TO_CLEAN = (
+    "group_by",
+    "pivot_measures",
+    "pivot_column_groupby",
+    "pivot_row_groupby",
+    "graph_groupbys",
+    "orderedBy",
+)
+
+def clean_context(context, fieldname):
+    """Remove (in place) all references to the field in the context dictionary."""
+
+    def filter_value(key, value):
+        if key == "orderedBy" and isinstance(value, dict):
+            res = {k: (filter_value(None, v) if k == "name" else v) for k, v in value.items()}
+            # only keep the entry if its "name" did not match fieldname
+            return res if "name" not in res or res["name"] is not None else None
+        if not isinstance(value, basestring):
+            # if not a string, ignore it
+            return value
+        if value.split(":")[0] != fieldname:
+            # only return the value if it does not match fieldname
+            return value
+        return None  # value filtered out
+
+    if not isinstance(context, dict):
+        return False
+
+    changed = False
+    for key in _CONTEXT_KEYS_TO_CLEAN:
+        if context.get(key):
+            context_part = [filter_value(key, e) for e in context[key]]
+            changed |= context_part != context[key]
+            context[key] = [e for e in context_part if e is not None]
+
+    for vt in ["pivot", "graph", "cohort"]:
+        key = "{}_measure".format(vt)
+        if key in context:
+            new_value = filter_value(key, context[key])
+            changed |= context[key] != new_value
+            context[key] = new_value if new_value is not None else "id"
+
+        if vt in context:
+            changed |= clean_context(context[vt], fieldname)
+
+    return changed
+
+def adapt_context(context, old, new):
+    """Replace (in place) all references to field `old` with `new` in the context dictionary."""
+
+    # adapt (in place) dictionary values
+    if not isinstance(context, dict):
+        return
+
+    for key in _CONTEXT_KEYS_TO_CLEAN:
+        if context.get(key):
+            context[key] = [_adapt_context_value(key, e, old, new) for e in context[key]]
+
+    for vt in ["pivot", "graph", "cohort"]:
+        key = "{}_measure".format(vt)
+        if key in context:
+            context[key] = _adapt_context_value(key, context[key], old, new)
+
+        if vt in context:
+            adapt_context(context[vt], old, new)
+
+    def_old = "default_{}".format(old)
+    def_new = "default_{}".format(new)
+
+    if def_old in context:
+        context[def_new] = context.pop(def_old)
+
+
+def _adapt_context_value(key, value, old, new):
+    if key == "orderedBy" and isinstance(value, dict):
+        # only adapt the "name" key
+        return {k: (_adapt_context_value(None, v, old, new) if k == "name" else v) for k, v in value.items()}
+
+    if not isinstance(value, basestring):
+        # ignore if not a string
+        return value
+
+    parts = value.split(":", 1)
+    if parts[0] !=
old: + # if not match old, leave it + return value + # change to new, and return it + parts[0] = new + return ":".join(parts) diff --git a/src/util/fields.py b/src/util/fields.py index 1a8de8de5..1c7b26823 100644 --- a/src/util/fields.py +++ b/src/util/fields.py @@ -38,6 +38,7 @@ def make_index_name(table_name, column_name): from .const import ENVIRON +from .context import adapt_context, clean_context from .domains import _adapt_one_domain, _replace_path, _valid_path_to, adapt_domains from .exceptions import SleepyDeveloperError from .helpers import _dashboard_actions, _validate_model, resolve_model_fields_path, table_of_model @@ -62,24 +63,14 @@ def make_index_name(table_name, column_name): # python3 shims try: - basestring # noqa: B018 + unicode # noqa: B018 except NameError: - basestring = unicode = str + unicode = str _logger = logging.getLogger(__name__) IMD_FIELD_PATTERN = "field_%s__%s" if version_gte("saas~11.2") else "field_%s_%s" -_CONTEXT_KEYS_TO_CLEAN = ( - "group_by", - "pivot_measures", - "pivot_column_groupby", - "pivot_row_groupby", - "graph_groupbys", - "orderedBy", -) - - def ensure_m2o_func_field_data(cr, src_table, column, dst_table): """ Fix broken m2o relations. @@ -123,46 +114,10 @@ def remove_field(cr, model, fieldname, cascade=False, drop_column=True, skip_inh ENVIRON["__renamed_fields"][model][fieldname] = None - def filter_value(key, value): - if key == "orderedBy" and isinstance(value, dict): - res = {k: (filter_value(None, v) if k == "name" else v) for k, v in value.items()} - # return if name didn't match fieldname - return res if "name" not in res or res["name"] is not None else None - if not isinstance(value, basestring): - # if not a string, ignore it - return value - if value.split(":")[0] != fieldname: - # only return if not matching fieldname - return value - return None # value filtered out - - def clean_context(context): - if not isinstance(context, dict): - return False - - changed = False - for key in _CONTEXT_KEYS_TO_CLEAN: - if context.get(key): - context_part = [filter_value(key, e) for e in context[key]] - changed |= context_part != context[key] - context[key] = [e for e in context_part if e is not None] - - for vt in ["pivot", "graph", "cohort"]: - key = "{}_measure".format(vt) - if key in context: - new_value = filter_value(key, context[key]) - changed |= context[key] != new_value - context[key] = new_value if new_value is not None else "id" - - if vt in context: - changed |= clean_context(context[vt]) - - return changed - # clean dashboard's contexts for id_, action in _dashboard_actions(cr, r"\y{}\y".format(fieldname), model): context = safe_eval(action.get("context", "{}"), SelfPrintEvalContext(), nocopy=True) - changed = clean_context(context) + changed = clean_context(context, fieldname) action.set("context", unicode(context)) if changed: add_to_migration_reports( @@ -176,7 +131,7 @@ def clean_context(context): ) for id_, name, context_s in cr.fetchall(): context = safe_eval(context_s or "{}", SelfPrintEvalContext(), nocopy=True) - changed = clean_context(context) + changed = clean_context(context, fieldname) cr.execute("UPDATE ir_filters SET context = %s WHERE id = %s", [unicode(context), id_]) if changed: add_to_migration_reports(("ir.filters", id_, name), "Filters/Dashboards") @@ -1146,50 +1101,11 @@ def _update_field_usage_multi(cr, models, old, new, domain_adapter=None, skip_in # ir.ui.view.custom # adapt the context. 
The domain will be done by `adapt_domain`
     eval_context = SelfPrintEvalContext()
-    def_old = "default_{}".format(old)
-    def_new = "default_{}".format(new)
     match = "{0[old]}|{0[def_old]}".format(p)
 
-    def adapt_value(key, value):
-        if key == "orderedBy" and isinstance(value, dict):
-            # only adapt the "name" key
-            return {k: (adapt_value(None, v) if k == "name" else v) for k, v in value.items()}
-
-        if not isinstance(value, basestring):
-            # ignore if not a string
-            return value
-
-        parts = value.split(":", 1)
-        if parts[0] != old:
-            # if not match old, leave it
-            return value
-        # change to new, and return it
-        parts[0] = new
-        return ":".join(parts)
-
-    def adapt_dict(d):
-        # adapt (in place) dictionary values
-        if not isinstance(d, dict):
-            return
-
-        for key in _CONTEXT_KEYS_TO_CLEAN:
-            if d.get(key):
-                d[key] = [adapt_value(key, e) for e in d[key]]
-
-        for vt in ["pivot", "graph", "cohort"]:
-            key = "{}_measure".format(vt)
-            if key in d:
-                d[key] = adapt_value(key, d[key])
-
-            if vt in d:
-                adapt_dict(d[vt])
-
     for _, act in _dashboard_actions(cr, match, *only_models or ()):
         context = safe_eval(act.get("context", "{}"), eval_context, nocopy=True)
 
-        adapt_dict(context)
-
-        if def_old in context:
-            context[def_new] = context.pop(def_old)
+        adapt_context(context, old, new)
 
         act.set("context", unicode(context))
 
     # domains, related and inhited models

From bb68733a867216d9d999a2c29ea43ca7cf089d7d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lucas=20Lef=C3=A8vre=20=28lul=29?=
Date: Fri, 17 May 2024 09:24:46 +0200
Subject: [PATCH 2/3] [IMP] util/spreadsheet: adapt spreadsheets to renamed or
 removed fields and models

---
 .../tests/test_spreadsheet_parser.py   | 177 +++++
 src/util/fields.py                     |   5 +
 src/util/models.py                     |   5 +
 src/util/spreadsheet/__init__.py       |  13 +
 src/util/spreadsheet/data_wrappers.py  | 499 ++++++++++++++++++
 src/util/spreadsheet/fields.py         | 460 ++++++++++++++++
 src/util/spreadsheet/misc.py           | 374 +++++++++++++
 src/util/spreadsheet/models.py         | 160 ++++++
 src/util/spreadsheet/o_spreadsheet.py  | 245 +++++++++
 src/util/spreadsheet/parser.py         | 201 +++++++
 src/util/spreadsheet/revisions.py      |  76 +++
 11 files changed, 2215 insertions(+)
 create mode 100644 src/spreadsheet/tests/test_spreadsheet_parser.py
 create mode 100644 src/util/spreadsheet/data_wrappers.py
 create mode 100644 src/util/spreadsheet/fields.py
 create mode 100644 src/util/spreadsheet/misc.py
 create mode 100644 src/util/spreadsheet/models.py
 create mode 100644 src/util/spreadsheet/o_spreadsheet.py
 create mode 100644 src/util/spreadsheet/parser.py
 create mode 100644 src/util/spreadsheet/revisions.py

diff --git a/src/spreadsheet/tests/test_spreadsheet_parser.py b/src/spreadsheet/tests/test_spreadsheet_parser.py
new file mode 100644
index 000000000..846735cd1
--- /dev/null
+++ b/src/spreadsheet/tests/test_spreadsheet_parser.py
@@ -0,0 +1,177 @@
+from odoo.addons.base.maintenance.migrations.testing import UnitTestCase
+from odoo.addons.base.maintenance.migrations.util.spreadsheet.parser import (
+    BinaryOperation,
+    FunctionCall,
+    Literal,
+    UnaryOperation,
+    ast_to_string,
+    parse,
+)
+
+
+class SpreadsheetParserTest(UnitTestCase):
+    def test_can_parse_a_function_call_with_no_argument(self):
+        self.assertEqual(parse("RAND()"), FunctionCall("RAND", []))
+
+    def test_can_parse_a_function_call_with_one_argument(self):
+        self.assertEqual(
+            parse("SUM(1)"),
+            FunctionCall("SUM", [Literal("NUMBER", "1")]),
+        )
+
+    def test_can_parse_a_function_call_with_function_argument(self):
+        self.assertEqual(
+            parse("SUM(UMINUS(1))"),
+            FunctionCall("SUM", [FunctionCall("UMINUS", [Literal("NUMBER", "1")])]),
+        )
+
+    def test_can_parse_a_function_call_with_sub_expressions_as_argument(self):
+        self.assertEqual(
+            parse("IF(A1 > 0, 1, 2)"),
+            FunctionCall(
+                "IF",
+                [
+                    BinaryOperation(">", Literal("UNKNOWN", "A1"), Literal("NUMBER", "0")),
+                    Literal("NUMBER", "1"),
+                    Literal("NUMBER", "2"),
+                ],
+            ),
+        )
+
+    def test_add_an_empty_token_for_empty_arguments(self):
+        self.assertEqual(
+            parse("SUM(1,)"),
+            FunctionCall("SUM", [Literal("NUMBER", "1"), Literal("EMPTY", "")]),
+        )
+
+        self.assertEqual(
+            parse("SUM(,1)"),
+            FunctionCall("SUM", [Literal("EMPTY", ""), Literal("NUMBER", "1")]),
+        )
+
+        self.assertEqual(
+            parse("SUM(,)"),
+            FunctionCall("SUM", [Literal("EMPTY", ""), Literal("EMPTY", "")]),
+        )
+
+        self.assertEqual(
+            parse("SUM(,,)"),
+            FunctionCall("SUM", [Literal("EMPTY", ""), Literal("EMPTY", ""), Literal("EMPTY", "")]),
+        )
+
+        self.assertEqual(
+            parse("SUM(,,,1)"),
+            FunctionCall("SUM", [Literal("EMPTY", ""), Literal("EMPTY", ""), Literal("EMPTY", ""), Literal("NUMBER", "1")]),
+        )
+
+    def test_can_parse_unary_operations(self):
+        self.assertEqual(
+            parse("-1"),
+            UnaryOperation("-", Literal("NUMBER", "1")),
+        )
+        self.assertEqual(
+            parse("+1"),
+            UnaryOperation("+", Literal("NUMBER", "1")),
+        )
+
+    def test_can_parse_numeric_values(self):
+        self.assertEqual(parse("1"), Literal("NUMBER", "1"))
+        self.assertEqual(parse("1.5"), Literal("NUMBER", "1.5"))
+        self.assertEqual(parse("1."), Literal("NUMBER", "1."))
+        self.assertEqual(parse(".5"), Literal("NUMBER", ".5"))
+
+    def test_can_parse_string_values(self):
+        self.assertEqual(parse('"Hello"'), Literal("STRING", "Hello"))
+
+    def test_can_parse_number_expressed_as_percent(self):
+        self.assertEqual(parse("1%"), Literal("NUMBER", "1%"))
+        self.assertEqual(parse("100%"), Literal("NUMBER", "100%"))
+        self.assertEqual(parse("50.0%"), Literal("NUMBER", "50.0%"))
+
+    def test_can_parse_binary_operations(self):
+        self.assertEqual(
+            parse("2-3"),
+            BinaryOperation("-", Literal("NUMBER", "2"), Literal("NUMBER", "3")),
+        )
+
+    def test_can_parse_concat_operator(self):
+        self.assertEqual(
+            parse("A1&A2"),
+            BinaryOperation("&", Literal("UNKNOWN", "A1"), Literal("UNKNOWN", "A2")),
+        )
+
+    def test_AND(self):
+        self.assertEqual(
+            parse("=AND(true, false)"),
+            FunctionCall("AND", [Literal("BOOLEAN", "true"), Literal("BOOLEAN", "false")]),
+        )
+        self.assertEqual(
+            parse("=AND(0, tRuE)"),
+            FunctionCall("AND", [Literal("NUMBER", "0"), Literal("BOOLEAN", "tRuE")]),
+        )
+
+    def test_convert_string(self):
+        self.assertEqual(ast_to_string(parse('"hello"')), '"hello"')
+
+    def test_convert_debugger(self):
+        self.assertEqual(ast_to_string(parse("?5+2")), "5+2")
+
+    def test_convert_boolean(self):
+        self.assertEqual(ast_to_string(parse("TRUE")), "TRUE")
+        self.assertEqual(ast_to_string(parse("FALSE")), "FALSE")
+
+    def test_convert_unary_operator(self):
+        self.assertEqual(ast_to_string(parse("-45")), "-45")
+        self.assertEqual(ast_to_string(parse("+45")), "+45")
+        self.assertEqual(ast_to_string(parse("-(4+5)")), "-(4+5)")
+        self.assertEqual(ast_to_string(parse("-4+5")), "-4+5")
+        self.assertEqual(ast_to_string(parse("-SUM(1)")), "-SUM(1)")
+        self.assertEqual(ast_to_string(parse("-(1+2)/5")), "-(1+2)/5")
+        self.assertEqual(ast_to_string(parse("1*-(1+2)")), "1*-(1+2)")
+
+    def test_convert_binary_operator(self):
+        self.assertEqual(ast_to_string(parse("89-45")), "89-45")
+        self.assertEqual(ast_to_string(parse("1+2+5")), "1+2+5")
+        self.assertEqual(ast_to_string(parse("(1+2)/5")), "(1+2)/5")
+        self.assertEqual(ast_to_string(parse("5/(1+2)")), "5/(1+2)")
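+        # assuming the precedence rules exercised above, parentheses around a
+        # higher-precedence left operand are redundant and dropped
+        self.assertEqual(ast_to_string(parse("(5/1)+2")), "5/1+2")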
self.assertEqual(ast_to_string(parse("2/(1*2)")), "2/(1*2)") + self.assertEqual(ast_to_string(parse("1-2+3")), "1-2+3") + self.assertEqual(ast_to_string(parse("1-(2+3)")), "1-(2+3)") + self.assertEqual(ast_to_string(parse("(1+2)-3")), "1+2-3") + self.assertEqual(ast_to_string(parse("(1<5)+5")), "(1<5)+5") + self.assertEqual(ast_to_string(parse("1*(4*2+3)")), "1*(4*2+3)") + self.assertEqual(ast_to_string(parse("1*(4+2*3)")), "1*(4+2*3)") + self.assertEqual(ast_to_string(parse("1*(4*2+3*9)")), "1*(4*2+3*9)") + self.assertEqual(ast_to_string(parse("1*(4-(2+3))")), "1*(4-(2+3))") + self.assertEqual(ast_to_string(parse("1/(2*(2+3))")), "1/(2*(2+3))") + self.assertEqual(ast_to_string(parse("1/((2+3)*2)")), "1/((2+3)*2)") + self.assertEqual(ast_to_string(parse("2<(1<1)")), "2<(1<1)") + self.assertEqual(ast_to_string(parse("2<=(1<1)")), "2<=(1<1)") + self.assertEqual(ast_to_string(parse("2>(1<1)")), "2>(1<1)") + self.assertEqual(ast_to_string(parse("2>=(1<1)")), "2>=(1<1)") + self.assertEqual(ast_to_string(parse("TRUE=1=1")), "TRUE=1=1") + self.assertEqual(ast_to_string(parse("TRUE=(1=1)")), "TRUE=(1=1)") + + def test_convert_function(self): + self.assertEqual(ast_to_string(parse("SUM(5,9,8)")), "SUM(5,9,8)") + self.assertEqual(ast_to_string(parse("-SUM(5,9,SUM(5,9,8))")), "-SUM(5,9,SUM(5,9,8))") + + def test_convert_references(self): + self.assertEqual(ast_to_string(parse("A10")), "A10") + self.assertEqual(ast_to_string(parse("Sheet1!A10")), "Sheet1!A10") + self.assertEqual(ast_to_string(parse("'Sheet 1'!A10")), "'Sheet 1'!A10") + self.assertEqual(ast_to_string(parse("'Sheet 1'!A10:A11")), "'Sheet 1'!A10:A11") + self.assertEqual(ast_to_string(parse("SUM(A1,A2)")), "SUM(A1,A2)") + + def test_convert_strings(self): + self.assertEqual(ast_to_string(parse('"R"')), '"R"') + self.assertEqual(ast_to_string(parse('CONCAT("R", "EM")')), 'CONCAT("R","EM")') + + def test_convert_numbers(self): + self.assertEqual(ast_to_string(parse("5")), "5") + self.assertEqual(ast_to_string(parse("5+4")), "5+4") + self.assertEqual(ast_to_string(parse("+5")), "+5") + self.assertEqual(ast_to_string(parse("1%")), "1%") + self.assertEqual(ast_to_string(parse("1.5")), "1.5") + self.assertEqual(ast_to_string(parse("1.")), "1.") + self.assertEqual(ast_to_string(parse(".5")), ".5") diff --git a/src/util/fields.py b/src/util/fields.py index 1c7b26823..e5ff2c507 100644 --- a/src/util/fields.py +++ b/src/util/fields.py @@ -37,6 +37,7 @@ def make_index_name(table_name, column_name): return "%s_%s_index" % (table_name, column_name) +from . 
import spreadsheet
 from .const import ENVIRON
 from .context import adapt_context, clean_context
 from .domains import _adapt_one_domain, _replace_path, _valid_path_to, adapt_domains
 from .exceptions import SleepyDeveloperError
 from .helpers import _dashboard_actions, _validate_model, resolve_model_fields_path, table_of_model
@@ -320,6 +321,8 @@ def adapter(leaf, is_or, negated):
     for inh in for_each_inherit(cr, model, skip_inherit):
         remove_field(cr, inh.model, fieldname, cascade=cascade, drop_column=drop_column, skip_inherit=skip_inherit)
 
+    spreadsheet.remove_field_in_all_spreadsheets(cr, model, fieldname)
+
 
 def remove_field_metadata(cr, model, fieldname, skip_inherit=()):
     """
@@ -534,6 +537,8 @@ def rename_field(cr, model, old, new, update_references=True, domain_adapter=Non
     for inh in for_each_inherit(cr, model, skip_inherit):
         rename_field(cr, inh.model, old, new, update_references=update_references, skip_inherit=skip_inherit)
 
+    spreadsheet.rename_field_in_all_spreadsheets(cr, model, old, new)
+
 
 def convert_field_to_html(cr, model, field, skip_inherit=()):
     _validate_model(model)
diff --git a/src/util/models.py b/src/util/models.py
index c94ef7bc2..12893ebcb 100644
--- a/src/util/models.py
+++ b/src/util/models.py
@@ -9,6 +9,7 @@
 import logging
 import re
 
+from . import spreadsheet
 from .const import ENVIRON
 from .fields import IMD_FIELD_PATTERN, remove_field
 from .helpers import _ir_values_value, _validate_model, model_of_table, table_of_model
@@ -214,6 +215,8 @@ def remove_model(cr, model, drop_table=True, ignore_m2m=()):
             category="Removed Models",
         )
 
+    spreadsheet.remove_model_in_all_spreadsheets(cr, model)
+
 
 # compat layer...
 delete_model = remove_model
@@ -344,6 +347,8 @@ def rename_model(cr, old, new, rename_table=True):
         """.format(col_prefix=col_prefix, old=old.replace(".", r"\."), new=new)
     )
 
+    spreadsheet.rename_model_in_all_spreadsheets(cr, old, new)
+
 
 def merge_model(cr, source, target, drop_table=True, fields_mapping=None, ignore_m2m=()):
     """
diff --git a/src/util/spreadsheet/__init__.py b/src/util/spreadsheet/__init__.py
index 8d6dde23b..277e8572f 100644
--- a/src/util/spreadsheet/__init__.py
+++ b/src/util/spreadsheet/__init__.py
@@ -1 +1,14 @@
+import logging
+
+
 from .tokenizer import *
+from .parser import *
+from .o_spreadsheet import *
+
+
+_logger = logging.getLogger(__name__)
+
+
+from .fields import *
+from .models import *
+from .misc import *
diff --git a/src/util/spreadsheet/data_wrappers.py b/src/util/spreadsheet/data_wrappers.py
new file mode 100644
index 000000000..0f5575bea
--- /dev/null
+++ b/src/util/spreadsheet/data_wrappers.py
@@ -0,0 +1,499 @@
+import json
+from typing import Iterable, List, Union
+
+from odoo.upgrade.util.misc import version_gte
+
+from .o_spreadsheet import load
+
+"""This file provides (partial) wrappers for reading and writing spreadsheet data
+and commands.
+The goal is to abstract away the implementation details of the underlying data
+structures and how they evolved between versions.
+"""
+
+# TODO segregate the data wrappers by version (and use the right one).
+# How should it be done?
+# Maybe they could be integrated in odoo source code?
+# It could also be useful to have wrappers for validation in odoo (it's currently
+# coupled to the json file schema).
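+#
+# A minimal, hypothetical usage sketch (the payload is trimmed for illustration;
+# real spreadsheet attachments carry many more keys):
+#
+#     sheet = Spreadsheet('{"sheets": [], "pivots": {}, "lists": {}}')
+#     for pivot in sheet.pivots:  # wrapped in the version-appropriate class
+#         pivot.model = "res.partner"
+#     new_json = sheet.to_json()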
+
+
+def create_data_source_from_cmd(cmd):
+    if cmd["type"] in ["CREATE_CHART", "UPDATE_CHART"]:
+        return OdooChartCmdV16(cmd)
+    elif cmd["type"] == "INSERT_PIVOT":
+        if version_gte("saas~17.1"):
+            return InsertPivotCmdV171(cmd)
+        return InsertPivotCmdV16(cmd)
+    elif cmd["type"] == "RE_INSERT_PIVOT":
+        return InsertPivotCmdV16(cmd)
+    elif cmd["type"] == "INSERT_ODOO_LIST":
+        return InsertListCmdV16(cmd)
+    elif cmd["type"] == "RE_INSERT_ODOO_LIST":
+        return ReInsertListCmdV16(cmd)
+    return cmd
+
+
+class Spreadsheet:
+    def __init__(self, data: Union[str, dict]):
+        if isinstance(data, str):
+            data = json.loads(data)
+        self.data = load(data)
+        self.pivot_constructor = PivotV17_2 if version_gte("saas~17.2") else PivotV16
+
+    def __str__(self) -> str:
+        return self.to_json()
+
+    def __repr__(self) -> str:
+        return self.to_json()
+
+    def to_json(self) -> str:
+        return json.dumps(self.data, indent=4)
+
+    @property
+    def cells(self) -> Iterable[dict]:
+        for sheet in self.data["sheets"]:
+            for cell in sheet["cells"].values():
+                if cell.get("content"):
+                    yield cell
+
+    @property
+    def odoo_charts(self) -> Iterable["OdooChartV16"]:
+        for sheet in self.data["sheets"]:
+            for figure in sheet["figures"]:
+                if _is_odoo_chart(figure):
+                    yield OdooChartV16(dict(figure["data"], id=figure["id"]))
+
+    @property
+    def pivots(self) -> List["PivotV16"]:
+        return [self.pivot_constructor(d) for d in self.data.get("pivots", {}).values()]
+
+    @property
+    def lists(self) -> List["SpreadsheetList"]:
+        return [SpreadsheetList(d) for d in self.data.get("lists", {}).values()]
+
+    @property
+    def global_filters(self) -> List[dict]:
+        return self.data.get("globalFilters", [])
+
+    def delete_lists(self, *list_ids: str):
+        if "lists" not in self.data:
+            return
+        for list_id in list_ids:
+            del self.data["lists"][list_id]
+
+    def delete_pivots(self, *pivot_ids: str):
+        if "pivots" not in self.data:
+            return
+        for pivot_id in pivot_ids:
+            del self.data["pivots"][pivot_id]
+
+    def delete_global_filters(self, *filter_ids: str):
+        if "globalFilters" not in self.data:
+            return
+        filter_ids = set(filter_ids)
+        self.data["globalFilters"] = [gfilter for gfilter in self.global_filters if gfilter["id"] not in filter_ids]
+
+    def delete_figures(self, *figure_ids: str):
+        figure_ids = set(figure_ids)
+        for sheet in self.data["sheets"]:
+            sheet["figures"] = [figure for figure in sheet["figures"] if figure["id"] not in figure_ids]
+
+    def clean_empty_cells(self):
+        for sheet in self.data["sheets"]:
+            sheet["cells"] = {xc: cell for xc, cell in sheet["cells"].items() if cell and cell != {"content": ""}}
+
+
+def _is_odoo_chart(figure):
+    return figure["tag"] == "chart" and figure["data"]["type"].startswith("odoo_")
+
+
+class DataSource:
+    def __init__(self, definition):
+        self.definition = definition
+
+    @property
+    def id(self):
+        return self.definition["id"]
+
+    @property
+    def model(self):
+        return self.definition["model"]
+
+    @model.setter
+    def model(self, model):
+        self.definition["model"] = model
+
+    @property
+    def domain(self):
+        return self.definition["domain"]
+
+    @domain.setter
+    def domain(self, domain):
+        self.definition["domain"] = domain
+
+    @property
+    def context(self):
+        return self.definition.get("context")
+
+    @property
+    def order_by(self):
+        return self.definition.get("orderBy")
+
+    @order_by.setter
+    def order_by(self, order_by):
+        self.definition["orderBy"] = order_by
+
+    @property
+    def fields_matching(self):
+        return self.definition.get("fieldMatching", {})
+
+
+class SpreadsheetList(DataSource):
+    @property
+    def fields(self) ->
List[str]: + return self.definition["columns"] + + @fields.setter + def fields(self, fields: List[str]): + self.definition["columns"] = fields + + @property + def order_by(self): + if "searchParams" not in self.definition: + return [] + order_bys = self.definition["searchParams"].get("orderBy") + if order_bys: + return [{"field": order_by["name"], "asc": order_by["asc"]} for order_by in order_bys] + return [] + + @order_by.setter + def order_by(self, order_bys): + if not order_bys: + return + self.definition["searchParams"]["orderBy"] = [ + { + "name": order_by["field"], + "asc": order_by["asc"], + } + for order_by in order_bys + ] + + +class InsertListCmdV16: + def __init__(self, cmd): + self.cmd = cmd + self.definition = cmd["definition"] + + @property + def id(self): + return self.cmd["id"] + + @property + def model(self): + return self.definition["metaData"]["resModel"] + + @model.setter + def model(self, model): + self.definition["metaData"]["resModel"] = model + + @property + def domain(self): + return self.definition["searchParams"]["domain"] + + @domain.setter + def domain(self, domain): + self.definition["searchParams"]["domain"] = domain + + @property + def context(self): + return self.definition["searchParams"]["context"] + + @property + def order_by(self): + order_bys = self.definition["searchParams"].get("orderBy") + if order_bys: + return [{"field": order_by["name"], "asc": order_by["asc"]} for order_by in order_bys] + return [] + + @order_by.setter + def order_by(self, order_bys): + self.definition["searchParams"]["orderBy"] = [ + { + "name": order_by["field"], + "asc": order_by["asc"], + } + for order_by in order_bys + ] + + @property + def fields_matching(self): + return {} + + @property + def fields(self) -> List[str]: + return self.definition["metaData"]["columns"] + + @fields.setter + def fields(self, fields: List[str]): + self.definition["metaData"]["columns"] = fields + self.cmd["columns"] = [{"name": field} for field in fields] + + +class ReInsertListCmdV16: + def __init__(self, cmd): + self.cmd = cmd + + @property + def id(self): + return self.cmd["id"] + + @property + def fields(self) -> List[str]: + return [col["name"] for col in self.cmd["columns"]] + + @fields.setter + def fields(self, fields: List[str]): + self.cmd["columns"] = [{"name": field} for field in fields] + + +class PivotV16: + """Wrapper around a pivot data source, hiding + its internal structure for v16""" + + def __init__(self, definition): + self.definition = definition + + @property + def id(self): + return self.definition["id"] + + @property + def model(self): + return self.definition["model"] + + @model.setter + def model(self, model): + self.definition["model"] = model + + @property + def domain(self): + return self.definition["domain"] + + @domain.setter + def domain(self, domain): + self.definition["domain"] = domain + + @property + def context(self): + return self.definition.get("context") + + @property + def order_by(self): + sorted_column = self.definition.get("sortedColumn") + if not sorted_column: + return + return { + "field": sorted_column["measure"], + "asc": sorted_column["order"].lower() == "asc", + } + + @order_by.setter + def order_by(self, order_by): + sorted_column = { + "order": "asc" if order_by["asc"] else "desc", + "measure": order_by["field"], + } + self.definition["sortedColumn"].update(sorted_column) + + @property + def fields_matching(self): + return self.definition.get("fieldMatching", {}) + + @property + def measures(self): + return [m["field"] for m in 
self.definition.get("measures", [])] + + @measures.setter + def measures(self, measures): + self.definition["measures"] = [{"field": m} for m in measures] + + @property + def row_group_by(self): + return self.definition.get("rowGroupBys", []) + + @row_group_by.setter + def row_group_by(self, group_by): + self.definition["rowGroupBys"] = group_by + + @property + def col_group_by(self): + return self.definition.get("colGroupBys", []) + + @col_group_by.setter + def col_group_by(self, group_by): + self.definition["colGroupBys"] = group_by + +class PivotV17_2(PivotV16): + @property + def measures(self): + return self.definition.get("measures", []) + + @measures.setter + def measures(self, measures): + self.definition["measures"] = measures + +class OdooChartV16: + def __init__(self, definition): + self.definition = definition + + @property + def id(self): + return self.definition["id"] + + @property + def model(self): + return self.definition["metaData"]["resModel"] + + @model.setter + def model(self, model): + self.definition["metaData"]["resModel"] = model + + @property + def domain(self): + return self.definition["searchParams"]["domain"] + + @domain.setter + def domain(self, domain): + self.definition["searchParams"]["domain"] = domain + + @property + def context(self): + return self.definition["searchParams"].get("context", {}) + + @property + def order_by(self): + if self.definition["metaData"].get("order"): + return { + "field": self.measure, + "asc": self.definition["metaData"]["order"].lower() == "asc", + } + + @order_by.setter + def order_by(self, order_by): + self.definition["metaData"]["order"] = "ASC" if order_by["asc"] else "DESC" + + @property + def fields_matching(self): + return self.definition.get("fieldMatching", {}) + + @property + def group_by(self): + return self.definition["metaData"]["groupBy"] + + @group_by.setter + def group_by(self, group_by): + self.definition["metaData"]["groupBy"] = group_by + + @property + def measure(self): + return self.definition["metaData"].get("measure", []) + + @measure.setter + def measure(self, measure): + self.definition["metaData"]["measure"] = measure + + +class OdooChartCmdV16(OdooChartV16): + def __init__(self, cmd): + super().__init__(cmd["definition"]) + self.cmd = cmd + + @property + def id(self): + return self.cmd["id"] + + +class InsertPivotCmdV16: + """Wrapper around an INSERT_PIVOT command, hiding + the internal structure of the command.""" + + def __init__(self, cmd): + self.cmd = cmd + self.definition = cmd["definition"] + + @property + def id(self): + return self.cmd["id"] + + @property + def model(self): + return self.definition["metaData"]["resModel"] + + @model.setter + def model(self, model): + self.definition["metaData"]["resModel"] = model + + @property + def domain(self): + return self.definition["searchParams"]["domain"] + + @domain.setter + def domain(self, domain): + self.definition["searchParams"]["domain"] = domain + + @property + def context(self): + return self.definition["searchParams"]["context"] + + @property + def order_by(self): + sorted_column = self.definition["metaData"].get("sortedColumn") + if not sorted_column: + return + return { + "field": sorted_column["measure"], + "asc": sorted_column["order"].lower() == "asc", + } + + @order_by.setter + def order_by(self, order_by): + sorted_column = { + "order": "asc" if order_by["asc"] else "desc", + "measure": order_by["field"], + } + self.definition["metaData"]["sortedColumn"].update(sorted_column) + + @property + def fields_matching(self): + return {} + + 
@property + def measures(self): + return self.definition["metaData"]["activeMeasures"] + + @measures.setter + def measures(self, measures): + self.definition["metaData"]["activeMeasures"] = measures + + @property + def row_group_by(self): + return self.definition["metaData"]["rowGroupBys"] + + @row_group_by.setter + def row_group_by(self, group_by): + self.definition["metaData"]["rowGroupBys"] = group_by + + @property + def col_group_by(self): + return self.definition["metaData"]["colGroupBys"] + + @col_group_by.setter + def col_group_by(self, group_by): + self.definition["metaData"]["colGroupBys"] = group_by + + +class InsertPivotCmdV171(InsertPivotCmdV16): + @property + def id(self): + return self.cmd["pivotId"] diff --git a/src/util/spreadsheet/fields.py b/src/util/spreadsheet/fields.py new file mode 100644 index 000000000..2c814f47f --- /dev/null +++ b/src/util/spreadsheet/fields.py @@ -0,0 +1,460 @@ +import re +import json + +from typing import Iterable + +from itertools import chain + + +from .data_wrappers import Spreadsheet, create_data_source_from_cmd +from .misc import ( + apply_in_all_spreadsheets, + adapt_view_link_cells, + remove_data_source_function, + remove_lists, + remove_pivots, + remove_odoo_charts, + transform_data_source_functions, +) + +from .revisions import CommandAdapter, transform_revisions_data + +from odoo.osv import expression + +from odoo.upgrade.util.context import adapt_context, clean_context +from odoo.upgrade.util.domains import _adapt_one_domain + + +# stolen from util.fields:def remove_fields +def remove_adapter(leaf, is_or, negated): + # replace by TRUE_LEAF, unless negated or in a OR operation but not negated + if is_or ^ negated: + return [expression.FALSE_LEAF] + return [expression.TRUE_LEAF] + + +def rename_field_in_all_spreadsheets(cr, model, old_value, new_value): + apply_in_all_spreadsheets( + cr, + old_value, + (lambda data, revisions_data: rename_field(cr, model, old_value, new_value, data, revisions_data)), + ) + + +def rename_field(cr, model, old, new, data, revisions=()): + spreadsheet = Spreadsheet(data) + adapters = _rename_field_in_list(cr, spreadsheet, model, old, new) + # adapters += _rename_field_in_pivot(cr, spreadsheet, model, old, new) + # adapters += _rename_field_in_chart(cr, spreadsheet, model, old, new) + # adapters += _rename_field_in_filters(cr, spreadsheet, model, old, new) + # adapters += _rename_field_in_view_link(cr, spreadsheet, model, old, new) + return spreadsheet.data, transform_revisions_data(revisions, *adapters) + + +def remove_field_in_all_spreadsheets(cr, model, field): + apply_in_all_spreadsheets( + cr, field, (lambda data, revisions_data: remove_field(cr, model, field, data, revisions_data)) + ) + + +def remove_field(cr, model, field, data, revisions=()): + spreadsheet = Spreadsheet(data) + _remove_field_from_filter_matching(cr, spreadsheet, model, field) + adapters = _remove_field_from_list(cr, spreadsheet, model, field) + adapters += _remove_field_from_pivot(cr, spreadsheet, model, field) + adapters += _remove_field_from_graph(cr, spreadsheet, model, field) + adapters += _remove_field_from_view_link(cr, spreadsheet, model, field) + spreadsheet.clean_empty_cells() + return spreadsheet.data, transform_revisions_data(revisions, *adapters) + + +def _rename_function_fields(content, data_source_ids, functions, old, new): + def adapter(fun_call): + for arg in fun_call.args[1:]: + if arg.value == old: + arg.value = new + return fun_call + + return transform_data_source_functions(content, data_source_ids, functions, 
adapter) + + +def _rename_field_in_chain(cr, model, field_model, field_chain, old, new) -> str: + """Model on which the field chain refers to.""" + domain = [(field_chain, "=", 1)] + domain = _adapt_one_domain(cr, field_model, old, new, model, domain) + if domain is None: + return field_chain + return domain[0][0] + + +def _rename_field_in_list(cr, spreadsheet: Spreadsheet, model, old, new): + list_ids = set() + + def rename(olist): + _rename_data_source_field(cr, olist, model, old, new) + if olist.model == model: + list_ids.add(olist.id) + olist.fields = _rename_fields(old, new, olist.fields) + + for olist in spreadsheet.lists: + rename(olist) + + for cell in spreadsheet.cells: + cell["content"] = _rename_function_fields( + cell["content"], list_ids, {"ODOO.LIST", "ODOO.LIST.HEADER"}, old, new + ) + + list_models = {olist.id: olist.model for olist in spreadsheet.lists} + + def collect_list(cmd): + olist = create_data_source_from_cmd(cmd) + list_models[olist.id] = olist.model + + def rename_re_insert(cmd): + olist = create_data_source_from_cmd(cmd) + if list_models[olist.id] == model: + olist.fields = _rename_fields(old, new, olist.fields) + + return ( + CommandAdapter("INSERT_ODOO_LIST", collect_list), + CommandAdapter("INSERT_ODOO_LIST", lambda cmd: rename(create_data_source_from_cmd(cmd))), + CommandAdapter("RE_INSERT_ODOO_LIST", rename_re_insert), + CommandAdapter( + "UPDATE_CELL", + lambda cmd: dict( + cmd, + content=_rename_function_fields( + cmd.get("content"), list_ids, {"ODOO.LIST", "ODOO.LIST.HEADER"}, old, new + ), + ), + ), + ) + + +def _rename_fields(old: str, new: str, fields: Iterable[str]) -> Iterable[str]: + renamed = [] + for field in fields: + if ":" in field: + field, aggregate_operator = field.split(":") + if field == old: + renamed.append(new + ":" + aggregate_operator) + else: + renamed.append(field) + elif field == old: + renamed.append(new) + else: + renamed.append(field) + return renamed + + +def _rename_field_in_pivot(cr, spreadsheet: Spreadsheet, model, old, new): + pivot_ids = set() + + def rename(pivot): + _rename_data_source_field(cr, pivot, model, old, new) + if pivot.model == model: + pivot_ids.add(pivot.id) + pivot.col_group_by = _rename_fields(old, new, pivot.col_group_by) + pivot.row_group_by = _rename_fields(old, new, pivot.row_group_by) + pivot.measures = _rename_fields(old, new, pivot.measures) + + for pivot in spreadsheet.pivots: + rename(pivot) + + for cell in spreadsheet.cells: + cell["content"] = _rename_function_fields( + cell["content"], pivot_ids, {"ODOO.PIVOT", "ODOO.PIVOT.HEADER"}, old, new + ) + + def adapt_insert(cmd): + pivot = create_data_source_from_cmd(cmd) + rename(pivot) + adapt_pivot_table(cmd) + + def adapt_pivot_table(cmd): + table = cmd["table"] + for row in table["cols"]: + for cell in row: + cell["fields"] = _rename_fields(old, new, cell["fields"]) + # value can be the name of the measure (a field name) + cell["values"] = _rename_fields(old, new, cell["values"]) + for row in table["rows"]: + row["fields"] = _rename_fields(old, new, row["fields"]) + row["values"] = _rename_fields(old, new, row["values"]) + cmd["table"]["measures"] = _rename_fields(old, new, table["measures"]) + + return ( + CommandAdapter("INSERT_PIVOT", adapt_insert), + CommandAdapter("RE_INSERT_PIVOT", adapt_pivot_table), + CommandAdapter( + "UPDATE_CELL", + lambda cmd: dict( + cmd, + content=_rename_function_fields( + cmd.get("content"), pivot_ids, {"ODOO.PIVOT", "ODOO.PIVOT.HEADER"}, old, new + ), + ), + ), + ) + + +def _rename_data_source_field(cr, 
data_source, model, old, new): + data_source.domain = ( + _adapt_one_domain(cr, model, old, new, data_source.model, data_source.domain) or data_source.domain + ) + for measure in data_source.fields_matching.values(): + measure["chain"] = _rename_field_in_chain(cr, data_source.model, model, measure["chain"], old, new) + if data_source.model == model: + adapt_context(data_source.context, old, new) + if data_source.order_by: + data_source.order_by = _rename_order_by(data_source.order_by, old, new) + + +def _remove_data_source_field(cr, data_source, model, field): + if data_source.model == model: + data_source.domain = ( + _adapt_one_domain( + cr, model, field, "ignored", data_source.model, data_source.domain, remove_adapter, force_adapt=True + ) + or data_source.domain + ) + + adapt_context(data_source.context, field, "ignored") + if data_source.order_by: + data_source.order_by = _remove_order_by(data_source.order_by, field) + + +def _remove_order_by(order_by, field): + if isinstance(order_by, list): + return [order for order in order_by if order["field"] != field] + if order_by and order_by["field"] == field: + return None + return order_by + +def _rename_order_by(order_by, old, new): + if isinstance(order_by, list): + return [_rename_order_by(order, old, new) for order in order_by] + if order_by and order_by["field"] == old: + order_by["field"] = new + return order_by + + +def _rename_field_in_chart(cr, spreadsheet: Spreadsheet, model, old, new): + def rename(chart): + _rename_data_source_field(cr, chart, model, old, new) + if chart.model == model: + if chart.measure == old: + chart.measure = new + chart.group_by = _rename_fields(old, new, chart.group_by) + return chart + + for chart in spreadsheet.odoo_charts: + rename(chart) + + def adapt_create_chart(cmd): + if cmd["definition"]["type"].startswith("odoo_"): + chart = create_data_source_from_cmd(cmd) + rename(chart) + + return (CommandAdapter("CREATE_CHART", adapt_create_chart),) + + +def _rename_field_in_filters(cr, spreadsheet: Spreadsheet, model, old, new): + pivot_models = {pivot.id: pivot.model for pivot in spreadsheet.pivots} + list_models = {olist.id: olist.model for olist in spreadsheet.lists} + chart_models = {chart.id: chart.model for chart in spreadsheet.odoo_charts} + + def adapt_filter(cmd): + for pivot_id, field in cmd["pivot"].items(): + pivot_model = pivot_models[pivot_id] + field["chain"] = _rename_field_in_chain(cr, pivot_model, model, field["chain"], old, new) + for list_id, field in cmd["list"].items(): + list_model = list_models[list_id] + field["chain"] = _rename_field_in_chain(cr, list_model, model, field["chain"], old, new) + for chart_id, field in cmd["chart"].items(): + chart_model = chart_models[chart_id] + field["chain"] = _rename_field_in_chain(cr, chart_model, model, field["chain"], old, new) + + def collect_pivot(cmd): + pivot = create_data_source_from_cmd(cmd) + pivot_models[pivot.id] = pivot.model + + def collect_list(cmd): + olist = create_data_source_from_cmd(cmd) + list_models[olist.id] = olist.model + + def collect_charts(cmd): + if cmd["definition"]["type"].startswith("odoo_"): + chart = create_data_source_from_cmd(cmd) + chart_models[chart.id] = chart.model + + return ( + CommandAdapter("INSERT_PIVOT", collect_pivot), + CommandAdapter("INSERT_ODOO_LIST", collect_list), + CommandAdapter("CREATE_CHART", collect_charts), + CommandAdapter("ADD_GLOBAL_FILTER", adapt_filter), + CommandAdapter("EDIT_GLOBAL_FILTER", adapt_filter), + ) + + +def match_markdown_link(content): + return 
re.match(r"\[.*\]\(odoo://view/(.*)\)", content) + + +def _rename_field_in_view_link(cr, spreadsheet: Spreadsheet, model, old, new): + def adapt_view_link(action): + if action["modelName"] != model: + return + domain = _adapt_one_domain(cr, model, old, new, action["modelName"], action["domain"]) + if domain: + if isinstance(action["domain"], str): + domain = str(domain) + action["domain"] = domain + adapt_context(action["context"], old, new) + + return adapt_view_link_cells(spreadsheet, adapt_view_link) + + +## Removal + + +def _remove_list_functions(content, list_ids, field): + """Remove functions such as ODOO.LIST(1, 'field') or ODOO.LIST.HEADER(1, 'field')""" + + def filter_func(func_call_ast): + return any(arg.value == field for arg in func_call_ast.args[1:]) + + return remove_data_source_function(content, list_ids, {"ODOO.LIST", "ODOO.LIST.HEADER"}, filter_func) + + +def _remove_field_from_list(cr, spreadsheet: Spreadsheet, model, field): + def _remove_field(olist): + _remove_data_source_field(cr, olist, model, field) + if olist.model == model: + olist.fields = [column for column in olist.fields if column != field] + + for olist in spreadsheet.lists: + _remove_field(olist) + + def adapt_insert(cmd): + olist = create_data_source_from_cmd(cmd) + _remove_field(olist) + + # collect all list models inserted by INSERT_ODOO_LIST + # because we need the models to adapt RE_INSERT_ODOO_LIST + list_models = {olist.id: olist.model for olist in spreadsheet.lists} + + def collect_list(cmd): + olist = create_data_source_from_cmd(cmd) + list_models[olist.id] = olist.model + + def adapt_re_insert(cmd): + olist = create_data_source_from_cmd(cmd) + if list_models[olist.id] == model: + _remove_field(olist) + + return ( + CommandAdapter("INSERT_ODOO_LIST", collect_list), + CommandAdapter("INSERT_ODOO_LIST", adapt_insert), + CommandAdapter("RE_INSERT_ODOO_LIST", adapt_re_insert), + ) + + +def _remove_field_from_pivot(cr, spreadsheet: Spreadsheet, model, field): + def _remove_field(pivot): + _remove_data_source_field(cr, pivot, model, field) + if pivot.model == model: + pivot.col_group_by = [f for f in pivot.col_group_by if f != field] + pivot.row_group_by = [f for f in pivot.row_group_by if f != field] + pivot.measures = [f for f in pivot.measures if f != field] + + for pivot in spreadsheet.pivots: + _remove_field(pivot) + + def adapt_insert(cmd): + pivot = create_data_source_from_cmd(cmd) + _remove_field(pivot) + + return (CommandAdapter("INSERT_PIVOT", adapt_insert),) + + +def _remove_field_from_graph(cr, spreadsheet: Spreadsheet, model, field): + def _remove_field(chart): + _remove_data_source_field(cr, chart, model, field) + if chart.model == model: + chart.measure = chart.measure if chart.measure != field else None + + for chart in spreadsheet.odoo_charts: + _remove_field(chart) + + def adapt_create_chart(cmd): + if cmd["definition"]["type"].startswith("odoo_"): + chart = create_data_source_from_cmd(cmd) + _remove_field(chart) + + return (CommandAdapter("CREATE_CHART", adapt_create_chart),) + + +def _remove_field_from_view_link(cr, spreadsheet: Spreadsheet, model, field): + def adapt_view_link(action): + if action["modelName"] == model: + clean_context(action["context"], field) + action["domain"] = _adapt_one_domain( + cr, model, field, "ignored", model, action["domain"], remove_adapter, force_adapt=True + ) + + return adapt_view_link_cells(spreadsheet, adapt_view_link) + + +def _remove_field_from_filter_matching(cr, spreadsheet: Spreadsheet, model, field): + data_sources = chain(spreadsheet.lists, 
spreadsheet.pivots, spreadsheet.odoo_charts) + for data_source in data_sources: + matching_to_delete = [] + for filter_id, measure in data_source.fields_matching.items(): + if _is_field_in_chain(cr, model, field, data_source.model, measure["chain"]): + matching_to_delete.append(filter_id) + for filter_id in matching_to_delete: + del data_source.fields_matching[filter_id] + + +def _is_field_in_chain(cr, field_model, field, data_source_model, field_chain): + def adapter(*args, **kwargs): + return expression.FALSE_DOMAIN + + domain = [(field_chain, "=", 1)] + domain = _adapt_one_domain(cr, field_model, field, "ignored", data_source_model, domain, adapter=adapter) + return domain == expression.FALSE_DOMAIN + + +def domain_fields(domain): + """return all field names used in the domain + >>> domain_fields([['field1', '=', 1], ['field2', '=', 2]]) + ['field1', 'field2'] + """ + return [leaf[0] for leaf in domain if len(leaf) == 3] + + +def pivot_measure_fields(pivot): + return [measure for measure in pivot.measures if measure != "__count"] + + +def pivot_fields(pivot): + """return all field names used in a pivot definition""" + fields = set(pivot.col_group_by + pivot.row_group_by + pivot_measure_fields(pivot) + domain_fields(pivot.domain)) + measure = pivot.order_by and pivot.order_by["field"] + if measure and measure != "__count": + fields.add(measure) + return fields + + +def chart_fields(chart): + """return all field names used in a chart definitions""" + fields = set(chart.group_by + domain_fields(chart.domain)) + measure = chart.measure + if measure != "__count": + fields.add(measure) + return fields + + +def list_order_fields(list_definition): + return [order["field"] for order in list_definition.order_by] diff --git a/src/util/spreadsheet/misc.py b/src/util/spreadsheet/misc.py new file mode 100644 index 000000000..31dc12a95 --- /dev/null +++ b/src/util/spreadsheet/misc.py @@ -0,0 +1,374 @@ + +import re +import json +import logging + +from typing import Union, Callable, Dict, List + +from itertools import chain + + +from .data_wrappers import Spreadsheet, create_data_source_from_cmd +from .parser import ast_to_string, transform_ast_nodes, parse, Literal + +from .o_spreadsheet import load +from .revisions import CommandAdapter, Drop +from odoo.addons.base.maintenance.migrations import util + +_logger = logging.getLogger(__name__) + + +def read_spreadsheet_attachments(cr, like_pattern=""): + yield from read_spreadsheet_initial_data(cr, like_pattern) + yield from read_spreadsheet_snapshots(cr, like_pattern) + + +def read_spreadsheet_snapshots(cr, like_pattern=""): + cr.execute( + """ + SELECT id, res_model, res_id, db_datas + FROM ir_attachment + WHERE res_model IN ('spreadsheet.dashboard', 'documents.document') + AND res_field = 'spreadsheet_snapshot' + AND position(%s::bytea in db_datas) > 0 + """, + [like_pattern], + ) + # TODO rename 'like_pattern', it's not LIKE because LIKE doesn't work because the field is of type bytea + for attachment_id, res_model, res_id, db_datas in cr.fetchall(): + if db_datas: + yield attachment_id, res_model, res_id, json.loads(db_datas.tobytes()) + + +def read_spreadsheet_initial_data(cr, like_pattern=""): + if util.table_exists(cr, "documents_document"): + cr.execute( + """ + SELECT doc.id AS document_id, a.id AS attachment_id, a.db_datas + FROM documents_document doc + LEFT JOIN ir_attachment a ON a.id = doc.attachment_id + WHERE doc.handler='spreadsheet' + AND position(%s::bytea in db_datas) > 0 + """, + [like_pattern], + ) + # TODO there are excel files in 
there!
+        for document_id, attachment_id, db_datas in cr.fetchall():
+            if db_datas:
+                yield attachment_id, "documents.document", document_id, json.loads(db_datas.tobytes())
+
+    if util.table_exists(cr, "spreadsheet_dashboard"):
+        data_field = _magic_spreadsheet_field(cr)
+        cr.execute(
+            """
+            SELECT id, res_model, res_id, db_datas
+            FROM ir_attachment
+            WHERE res_model = 'spreadsheet.dashboard'
+            AND res_field = %s
+            AND position(%s::bytea in db_datas) > 0
+            """,
+            [data_field, like_pattern],
+        )
+        for attachment_id, res_model, res_id, db_datas in cr.fetchall():
+            if db_datas:
+                yield attachment_id, res_model, res_id, json.loads(db_datas.tobytes())
+
+
+def _magic_spreadsheet_field(cr):
+    # "spreadsheet_binary_data" replaced "data" on spreadsheet.mixin; fall back
+    # to the legacy field name when the new one is absent
+    cr.execute(
+        """
+        SELECT count(1)
+        FROM ir_model_fields
+        WHERE model='spreadsheet.mixin'
+        AND name='spreadsheet_binary_data';
+        """
+    )
+    return "spreadsheet_binary_data" if cr.fetchone()[0] else "data"
+
+
+def apply_in_all_spreadsheets(cr, like_pattern, callback):
+    _logger.debug("spreadsheet like pattern: %s", like_pattern)
+    found = False
+    # upgrade the initial data and all revisions based on it
+    for attachment_id, res_model, res_id, db_datas in read_spreadsheet_initial_data(cr, like_pattern):
+        found = True
+        data, _ = callback(db_datas, [])
+        write_attachment(cr, attachment_id, data)
+
+    ## FIXME TODORAR batch the calls
+    ## FIXME we have to pass in the revisions regardless of the base data stuff
+    # upgrade revisions
+    # regardless of res_model res_id
+    revisions_data = []
+    revisions_ids = []
+
+    for revision_id, commands in get_revisions(cr, "res_model", "res_id", like_pattern):
+        revisions_data.append(json.loads(commands))
+        revisions_ids.append(revision_id)
+    _, revisions = callback({}, revisions_data)
+    for rev_id, revision in zip(revisions_ids, revisions):
+        cr.execute(
+            """
+            UPDATE spreadsheet_revision
+            SET commands=%s
+            WHERE id=%s
+            """,
+            [json.dumps(revision), rev_id],
+        )
+    if found:
+        _logger.info("upgrading initial data and revisions")
+
+    found = False
+    # upgrade snapshots
+    for attachment_id, _res_model, _res_id, db_datas in read_spreadsheet_snapshots(cr, like_pattern):
+        found = True
+        data, _ = callback(db_datas, [])
+        write_attachment(cr, attachment_id, data)
+
+    if found:
+        _logger.info("upgrading snapshots")
+
+
+def write_attachment(cr, attachment_id, data):
+    _logger.info("replacing attachment %s", attachment_id)
+    cr.execute(
+        """
+        UPDATE ir_attachment
+        SET db_datas=%s
+        WHERE id=%s
+        """,
+        [json.dumps(data).encode(), attachment_id],
+    )
+
+
+def get_revisions(cr, res_model, res_id, like_pattern):
+    if util.version_gte("16.0"):
+        cr.execute(
+            """
+            SELECT id, commands
+            FROM spreadsheet_revision
+            WHERE commands LIKE %s
+            """,
+            ["%" + like_pattern + "%"],
+        )
+    else:
+        cr.execute(
+            """
+            SELECT id, commands
+            FROM spreadsheet_revision
+            WHERE commands LIKE %s
+            """,
+            [like_pattern],
+        )
+    return cr.fetchall()
+
+
+def upgrade_data(cr, upgrade_callback):
+    for attachment_id, _res_model, _res_id, data in read_spreadsheet_attachments(cr):
+        upgraded_data = upgrade_callback(load(data))
+        cr.execute(
+            """
+            UPDATE ir_attachment
+            SET db_datas=%s
+            WHERE id=%s
+            """,
+            [json.dumps(upgraded_data).encode(), attachment_id],
+        )
+    _logger.info("spreadsheet json data upgraded")
+
+
+def transform_data_source_functions(content, data_source_ids, functions, adapter):
+    """Transforms data source function calls within content.
+ The 'adapter' function is called with each function call AST node matching + the function name and any data source. + """ + if not content or not content.startswith("=") or not data_source_ids: + return content + if not any(fn.upper() in content.upper() for fn in functions): + return content + try: + ast = parse(content) + except ValueError: + return content + + data_source_ids = [str(did) for did in data_source_ids] + def _adapter(fun_call): + # call the provided adapter only if the function name + # and data source matches + if fun_call.value.upper() in functions and len(fun_call.args) > 0: + data_source_id = fun_call.args[0].value + if str(data_source_id) in data_source_ids: + return adapter(fun_call) + return fun_call + + ast = transform_ast_nodes(ast, "FUNCALL", _adapter) + return f"={ast_to_string(ast)}" + + + +def adapt_view_link_cells(spreadsheet: Spreadsheet, adapter: Callable[[str], Union[str, None]]): + def adapt_view_link(content): + """A view link is formatted as a markdown link + [text](odoo://view/) + """ + match = re.match(r"^\[([^\[]+)\]\(odoo://view/(.+)\)$", content) + if not match: + return content + label = match.group(1) + view_description = json.loads(match.group(2)) + result = adapter(view_description["action"]) + if result == Drop: + return "" + return f"[{label}](odoo://view/{json.dumps(view_description)})" + + for cell in spreadsheet.cells: + cell["content"] = adapt_view_link(cell["content"]) + return (CommandAdapter("UPDATE_CELL", lambda cmd: dict(cmd, content=adapt_view_link(cmd.get("content")))),) + + + +def remove_pivots(spreadsheet: Spreadsheet, pivot_ids: List[str], insert_cmd_predicate: Callable[[Dict], bool]): + spreadsheet.delete_pivots(*pivot_ids) + + for cell in spreadsheet.cells: + cell["content"] = remove_data_source_function(cell["content"], pivot_ids, ["ODOO.PIVOT", "ODOO.PIVOT.HEADER"]) + + def adapt_insert(cmd): + pivot = create_data_source_from_cmd(cmd) + if insert_cmd_predicate(pivot): + pivot_ids.append(pivot.id) + return Drop + return cmd + + def adapt_cmd_with_pivotId(cmd): + pivot_id = cmd["pivotId"] + if str(pivot_id) in pivot_ids: + return Drop + return cmd + + def adapt_re_insert(cmd): + pivot_id = cmd["id"] + if str(pivot_id) in pivot_ids: + return Drop + return cmd + + def adapt_global_filters(cmd): + if cmd.get("pivot"): + for pivot_id in pivot_ids: + cmd["pivot"].pop(str(pivot_id), None) + return cmd + + def adapt_update_cell(cmd): + content = remove_data_source_function(cmd.get("content"), pivot_ids, ["ODOO.PIVOT", "ODOO.PIVOT.HEADER"]) + if not content: + return Drop + cmd["content"] = content + + return ( + CommandAdapter("INSERT_PIVOT", adapt_insert), + CommandAdapter("RE_INSERT_PIVOT", adapt_re_insert), + CommandAdapter("UPDATE_ODOO_PIVOT_DOMAIN", adapt_cmd_with_pivotId), + CommandAdapter("RENAME_ODOO_PIVOT", adapt_cmd_with_pivotId), + CommandAdapter("REMOVE_PIVOT", adapt_cmd_with_pivotId), + CommandAdapter("ADD_GLOBAL_FILTER", adapt_global_filters), + CommandAdapter("EDIT_GLOBAL_FILTER", adapt_global_filters), + CommandAdapter("UPDATE_CELL", adapt_update_cell), + ) + + + +def remove_lists(spreadsheet: Spreadsheet, list_ids: List[str], insert_cmd_predicate: Callable[[Dict], bool]): + spreadsheet.delete_lists(*list_ids) + for cell in spreadsheet.cells: + cell["content"] = remove_data_source_function(cell["content"], list_ids, ["ODOO.LIST", "ODOO.LIST.HEADER"]) + + def adapt_insert(cmd): + list = create_data_source_from_cmd(cmd) + if insert_cmd_predicate(list): + list_ids.append(list.id) + return Drop + return cmd + + def 
adapt_re_insert(cmd):
+        list_id = cmd["id"]
+        if list_id in list_ids:
+            return Drop
+        return cmd
+
+    def adapt_cmd_with_listId(cmd):
+        list_id = cmd["listId"]
+        if list_id in list_ids:
+            return Drop
+        return cmd
+
+    def adapt_global_filters(cmd):
+        if cmd.get("list"):
+            for list_id in list_ids:
+                cmd["list"].pop(list_id, None)
+            if not cmd["list"]:
+                del cmd["list"]
+        return cmd
+
+    def adapt_update_cell(cmd):
+        content = remove_data_source_function(cmd.get("content"), list_ids, ["ODOO.LIST", "ODOO.LIST.HEADER"])
+        if not content:
+            return Drop
+        cmd["content"] = content
+
+    return (
+        CommandAdapter("INSERT_ODOO_LIST", adapt_insert),
+        CommandAdapter("RE_INSERT_ODOO_LIST", adapt_re_insert),
+        CommandAdapter("RENAME_ODOO_LIST", adapt_cmd_with_listId),
+        CommandAdapter("UPDATE_ODOO_LIST_DOMAIN", adapt_cmd_with_listId),
+        CommandAdapter("REMOVE_ODOO_LIST", adapt_cmd_with_listId),
+        CommandAdapter("ADD_GLOBAL_FILTER", adapt_global_filters),
+        CommandAdapter("EDIT_GLOBAL_FILTER", adapt_global_filters),
+        CommandAdapter("UPDATE_CELL", adapt_update_cell),
+    )
+
+
+def remove_odoo_charts(spreadsheet: Spreadsheet, chart_ids: List[str], insert_cmd_predicate: Callable[[Dict], bool]):
+    spreadsheet.delete_figures(*chart_ids)
+
+    def adapt_create_chart(cmd):
+        chart = create_data_source_from_cmd(cmd)
+        if cmd["definition"]["type"].startswith("odoo_") and insert_cmd_predicate(chart):
+            chart_ids.append(cmd["id"])
+            return Drop
+
+    def adapt_chart_cmd_with_id(cmd):
+        if cmd["id"] in chart_ids:
+            return Drop
+
+    def adapt_global_filters(cmd):
+        if cmd.get("chart"):
+            for chart_id in chart_ids:
+                cmd["chart"].pop(chart_id, None)
+
+    return (
+        CommandAdapter("CREATE_CHART", adapt_create_chart),
+        CommandAdapter("UPDATE_CHART", adapt_chart_cmd_with_id),
+        CommandAdapter("DELETE_FIGURE", adapt_chart_cmd_with_id),
+        CommandAdapter("ADD_GLOBAL_FILTER", adapt_global_filters),
+        CommandAdapter("EDIT_GLOBAL_FILTER", adapt_global_filters),
+    )
+
+
+def remove_data_source_function(content, data_source_ids, functions, filter_ast=lambda ast: True):
+    """Empty the cell content if it contains a function that references one of the
+    given data sources; otherwise return it unchanged.
+    >>> remove_data_source_function('=ODOO.PIVOT(1, "revenue")', [1], {"ODOO.PIVOT"})
+    ''
+    >>> remove_data_source_function('=ODOO.PIVOT(2, "revenue")', [1], {"ODOO.PIVOT"})
+    '=ODOO.PIVOT(2, "revenue")'
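+    Functions outside the given set leave the content untouched as well (illustrative):
+    >>> remove_data_source_function('=ODOO.PIVOT(1, "revenue")', [1], {"ODOO.LIST"})
+    '=ODOO.PIVOT(1, "revenue")'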
+ """ + + def adapter(fun_call): + if filter_ast(fun_call): + # remove the func call and set something else instead + return Literal("BOOLEAN", False) + return fun_call + + new_content = transform_data_source_functions(content, data_source_ids, functions, adapter) + return content if new_content == content else "" diff --git a/src/util/spreadsheet/models.py b/src/util/spreadsheet/models.py new file mode 100644 index 000000000..a2b69fd5f --- /dev/null +++ b/src/util/spreadsheet/models.py @@ -0,0 +1,160 @@ + +from .data_wrappers import Spreadsheet, create_data_source_from_cmd +from .misc import apply_in_all_spreadsheets, adapt_view_link_cells, remove_lists, remove_pivots, remove_odoo_charts +from .revisions import CommandAdapter, Drop, transform_revisions_data, transform_commands + + +def rename_model_in_all_spreadsheets(cr, old_value, new_value): + apply_in_all_spreadsheets(cr, old_value, (lambda data, revisions_data: rename_model(old_value, new_value, data, revisions_data))) + +# TODO remove cr argument +def rename_model(old, new, data, revisions = ()): + spreadsheet = Spreadsheet(data) + adapters = _rename_model_in_list(spreadsheet, old, new) + adapters += _rename_model_in_pivot(spreadsheet, old, new) + adapters += _rename_model_in_filters(spreadsheet, old, new) + adapters += _rename_model_in_charts(spreadsheet, old, new) + adapters += _rename_model_in_view_link(spreadsheet, old, new) + return spreadsheet.data, transform_revisions_data(revisions, *adapters) + +def remove_model_in_all_spreadsheets(cr, model): + apply_in_all_spreadsheets(cr, model, (lambda data, revisions_data: remove_model(model, data, revisions_data))) + +def remove_model(model: str, data, revisions = ()) -> str: + spreadsheet = Spreadsheet(data) + adapters = _remove_model_from_lists(model, spreadsheet) + adapters += _remove_model_from_pivots(model, spreadsheet) + adapters += _remove_model_from_charts(model, spreadsheet) + adapters += _remove_model_from_filters(model, spreadsheet) + adapters += _remove_model_from_view_link(model, spreadsheet) + spreadsheet.clean_empty_cells() + return spreadsheet.data, transform_revisions_data(revisions, *adapters) + +def _rename_model_in_charts(spreadsheet: Spreadsheet, old, new): + for chart in spreadsheet.odoo_charts: + if chart.model == old: + chart.model = new + + def adapt_insert(cmd): + if cmd["definition"]["type"].startswith("odoo_"): + chart = create_data_source_from_cmd(cmd) + if chart.model == old: + chart.model = new + + return (CommandAdapter("CREATE_CHART", adapt_insert),) + + +def _rename_model_in_list(spreadsheet: Spreadsheet, old, new): + for olist in spreadsheet.lists: + if olist.model == old: + olist.model = new + + def adapt_insert(cmd): + olist = create_data_source_from_cmd(cmd) + if olist.model == old: + olist.model = new + + return (CommandAdapter("INSERT_ODOO_LIST", adapt_insert),) + + +def _rename_model_in_pivot(spreadsheet: Spreadsheet, old, new): + for pivot in spreadsheet.pivots: + if pivot.model == old: + pivot.model = new + + def adapt_insert(cmd): + pivot = create_data_source_from_cmd(cmd) + if pivot.model == old: + pivot.model = new + + return (CommandAdapter("INSERT_PIVOT", adapt_insert),) + + +def _rename_model_in_filters(spreadsheet: Spreadsheet, old, new): + def rename_relational_filter(gfilter): + if gfilter["type"] == "relation" and gfilter["modelName"] == old: + gfilter["modelName"] = new + + for gfilter in spreadsheet.global_filters: + rename_relational_filter(gfilter) + + def adapt_insert(cmd): + rename_relational_filter(cmd["filter"]) + + return ( 
+ CommandAdapter("ADD_GLOBAL_FILTER", adapt_insert), + CommandAdapter("EDIT_GLOBAL_FILTER", adapt_insert), + ) + + +def _rename_model_in_view_link(spreadsheet: Spreadsheet, old, new): + def adapt_view_link(action): + if action["modelName"] == old: + action["modelName"] = new + + return adapt_view_link_cells(spreadsheet, adapt_view_link) + + +def _remove_model_from_lists(model, spreadsheet: Spreadsheet): + lists_to_delete = [list.id for list in spreadsheet.lists if list.model == model] + return remove_lists( + spreadsheet, + lists_to_delete, + lambda list: list.model == model, + ) + + +def _remove_model_from_pivots(model, spreadsheet: Spreadsheet): + pivots_to_delete = [pivot.id for pivot in spreadsheet.pivots if pivot.model == model] + return remove_pivots( + spreadsheet, + pivots_to_delete, + lambda pivot: pivot.model == model, + ) + + +def _remove_model_from_charts(model, spreadsheet: Spreadsheet): + chart_to_delete = [chart.id for chart in spreadsheet.odoo_charts if chart.model == model] + return remove_odoo_charts( + spreadsheet, + chart_to_delete, + lambda chart: chart.model == model, + ) + + +def _remove_model_from_filters(model, spreadsheet: Spreadsheet): + global_filters = spreadsheet.global_filters + to_delete = [ + gFilter["id"] for gFilter in global_filters if gFilter["type"] == "relation" and gFilter["modelName"] == model + ] + spreadsheet.delete_global_filters(*to_delete) + + def adapt_edit_filter(cmd): + if cmd["filter"]["id"] in to_delete: + return Drop + return cmd + + def adapt_add_filter(cmd): + if cmd["filter"]["type"] == "relation" and cmd["filter"]["modelName"] == model: + to_delete.append(cmd["filter"]["id"]) + return Drop + return cmd + + def adapt_remove_filter(cmd): + if cmd["id"] in to_delete: + return Drop + return cmd + + return ( + CommandAdapter("ADD_GLOBAL_FILTER", adapt_add_filter), + CommandAdapter("EDIT_GLOBAL_FILTER", adapt_edit_filter), + CommandAdapter("REMOVE_GLOBAL_FILTER", adapt_remove_filter), + ) + + +def _remove_model_from_view_link(model, spreadsheet: Spreadsheet): + def adapt_view_link(action): + if action["modelName"] == model: + return Drop + + return adapt_view_link_cells(spreadsheet, adapt_view_link) diff --git a/src/util/spreadsheet/o_spreadsheet.py b/src/util/spreadsheet/o_spreadsheet.py new file mode 100644 index 000000000..342efc936 --- /dev/null +++ b/src/util/spreadsheet/o_spreadsheet.py @@ -0,0 +1,245 @@ +import math +import re + + +INITIAL_SHEET_ID = "Sheet1" +DEFAULT_REVISION_ID = "START_REVISION" + + +def load(data=None) -> dict: + """ + Load spreadsheet data and try to fix missing fields/corrupted state by providing + sensible default values + """ + + if not data: + return _create_empty_workbook() + data = {**_create_empty_workbook(), **data} + data["sheets"] = [ + {**_create_empty_sheet("Sheet{}".format(i + 1), "Sheet{}".format(i + 1)), **sheet} + for i, sheet in enumerate(data["sheets"]) + ] + if not data["sheets"]: + data["sheets"].append(_create_empty_sheet(INITIAL_SHEET_ID, "Sheet1")) + return data + + +def _create_empty_sheet(sheet_id: str, name: str) -> dict: + return { + "id": sheet_id, + "name": name, + "cells": {}, + "figures": [], + "isVisible": True, + } + + +def _create_empty_workbook(sheet_name: str = "Sheet1") -> dict: + return { + "sheets": [_create_empty_sheet(INITIAL_SHEET_ID, sheet_name)], + "styles": {}, + "formats": {}, + "borders": {}, + "revisionId": DEFAULT_REVISION_ID, + } + + +# def upgrade_data(data, upgrade_functions, to_version, version_field="version"): +# data = load(data) +# 
upgrade_functions.sort(key=lambda x: x[0])
+#     for upgrade_version, upgrade in upgrade_functions:
+#         if data.get(version_field, 0) < upgrade_version:
+#             upgrade(data)
+#             data[version_field] = upgrade_version
+#             if upgrade_version == to_version:
+#                 return data
+#     return data
+
+
+# Reference of a column header (eg. A, AB)
+col_header = re.compile(r"^([A-Z]{1,3})+$")
+cell_reference = r"\$?([A-Z]{1,3})\$?([0-9]{1,7})"
+# Reference of a normal range or a full row range (eg. A1:B1, 1:$5, $A2:5)
+full_row_xc = r"(\$?[A-Z]{1,3})?\$?[0-9]{1,7}\s*:\s*(\$?[A-Z]{1,3})?\$?[0-9]{1,7}\s*"
+# Reference of a normal range or a full column range (eg. A1:B1, A:$B, $A1:C)
+full_col_xc = r"\$?[A-Z]{1,3}(\$?[0-9]{1,7})?\s*:\s*\$?[A-Z]{1,3}(\$?[0-9]{1,7})?\s*"
+# Reference of a cell or a range; it can be a bounded range, a full row or a full column
+range_reference = re.compile(r"^\s*('.+'!|[^']+!)?" + "({}|{}|{})$".format(cell_reference, full_row_xc, full_col_xc))
+# Reference of a column (eg. A, $CA, Sheet1!B)
+col_reference = re.compile(r"^\s*('.+'!|[^']+!)?\$?([A-Z]{1,3})$")
+# Reference of a row (eg. 1, 59, Sheet1!9)
+row_reference = re.compile(r"^\s*('.+'!|[^']+!)?\$?([0-9]{1,7})$")
+
+# rebind cell_reference to a compiled regex: the raw string above was only
+# needed to compose range_reference
+cell_reference = re.compile(r"\$?([A-Z]{1,3})\$?([0-9]{1,7})")
+
+
+def zone_to_xc(zone) -> str:
+    top = zone["top"]
+    bottom = zone["bottom"]
+    left = zone["left"]
+    right = zone["right"]
+
+    has_header = zone.get("hasHeader", False)
+    is_one_cell = top == bottom and left == right
+
+    if bottom is None and right is not None:
+        return f"{number_to_letters(left)}:{number_to_letters(right)}" if top == 0 and not has_header else f"{to_xc(left, top)}:{number_to_letters(right)}"
+    elif right is None and bottom is not None:
+        return f"{top + 1}:{bottom + 1}" if left == 0 and not has_header else f"{to_xc(left, top)}:{bottom + 1}"
+    elif bottom is not None and right is not None:
+        return to_xc(left, top) if is_one_cell else f"{to_xc(left, top)}:{to_xc(right, bottom)}"
+
+    raise ValueError("Bad zone format")
+
+
+def letters_to_number(letters):
+    """
+    Convert a string (describing a column) to its number value.
+
+    >>> letters_to_number("A")
+    0
+    >>> letters_to_number("Z")
+    25
+    >>> letters_to_number("AA")
+    26
+    """
+    result = 0
+    length = len(letters)
+    for i in range(length):
+        n = ord(letters[i]) - 65 + (i < length - 1)
+        result += n * 26 ** (length - i - 1)
+    return result
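+
+
+# Illustrative round-trip (added comment, not part of the original module):
+# to_zone and zone_to_xc, defined in this file, are inverses for bounded ranges:
+#   >>> zone_to_xc(to_zone("B2:C3"))
+#   'B2:C3'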
Got {n}") + if n < 26: + return chr(65 + n) + else: + return number_to_letters(math.floor(n / 26) - 1) + number_to_letters(n % 26) + + +def to_cartesian(xc): + """Convert a cell reference to a cartesian coordinate + >>> to_cartesian("A1") + {'col': 0, 'row': 0} + >>> to_cartesian("B2") + {'col': 1, 'row': 1} + """ + xc = xc.upper().strip() + match = cell_reference.match(xc) + if match: + (letters, numbers) = match.groups() + col = letters_to_number(letters) + row = int(numbers) - 1 + return {"col": col, "row": row} + raise ValueError(f"Invalid cell description: {xc}") + + +def is_col_reference(xc): + return col_reference.match(xc) + + +def is_row_reference(xc): + return row_reference.match(xc) + + +def is_col_header(str): + return col_header.match(str) + + +def to_zone_without_boundary_changes(xc): + xc = xc.split("!").pop() + ranges = [x.strip() for x in xc.replace("$", "").split(":")] + full_col = full_row = has_header = False + first_range_part = ranges[0] + second_range_part = len(ranges) == 2 and ranges[1] + if is_col_reference(first_range_part): + left = right = letters_to_number(first_range_part) + top = bottom = 0 + full_col = True + elif is_row_reference(first_range_part): + top = bottom = int(first_range_part) - 1 + left = right = 0 + full_row = True + else: + c = to_cartesian(first_range_part) + left = right = c["col"] + top = bottom = c["row"] + has_header = True + if len(ranges) == 2: + if is_col_reference(second_range_part): + right = letters_to_number(second_range_part) + full_col = True + elif is_row_reference(second_range_part): + bottom = int(second_range_part) - 1 + full_row = True + else: + c = to_cartesian(second_range_part) + right = c["col"] + bottom = c["row"] + if full_col: + top = bottom + if full_row: + left = right + has_header = True + if full_col and full_row: + raise ValueError("Wrong zone xc. The zone cannot be at the same time a full column and a full row") + zone = { + "top": top, + "left": left, + "bottom": None if full_col else bottom, + "right": None if full_row else right, + } + has_header = has_header and (full_row or full_col) + if has_header: + zone["has_header"] = has_header + return zone + + +def to_unbounded_zone(xc): + zone = to_zone_without_boundary_changes(xc) + if zone["right"] is not None and zone["right"] < zone["left"]: + zone["left"], zone["right"] = zone["right"], zone["left"] + if zone["bottom"] is not None and zone["bottom"] < zone["top"]: + zone["top"], zone["bottom"] = zone["bottom"], zone["top"] + return zone + + +def to_zone(xc): + """Convert from a cartesian reference to a zone. 
+ >>> to_zone("A1") + {'top': 0, 'left': 0, 'bottom': 0, 'right': 0} + >>> to_zone("B1:B3") + {'top': 0, 'left': 1, 'bottom': 2, 'right': 1} + >>> to_zone("Sheet1!A1") + {'top': 0, 'left': 0, 'bottom': 0, 'right': 0} + """ + zone = to_unbounded_zone(xc) + if zone["bottom"] is None or zone["right"] is None: + raise ValueError("This does not support unbounded ranges") + return zone + + +def to_xc(col, row): + """ + >>> to_xc(0, 0) + 'A1' + >>> to_xc(1, 1) + 'B2' + """ + return number_to_letters(col) + str(row + 1) + + +def overlap(z1, z2) -> bool: + if z1["bottom"] < z2["top"] or z2["bottom"] < z1["top"]: + return False + if z1["right"] < z2["left"] or z2["right"] < z1["left"]: + return False + return True \ No newline at end of file diff --git a/src/util/spreadsheet/parser.py b/src/util/spreadsheet/parser.py new file mode 100644 index 000000000..92f6d8a08 --- /dev/null +++ b/src/util/spreadsheet/parser.py @@ -0,0 +1,201 @@ +from dataclasses import dataclass +from typing import Callable, ClassVar, Iterable, List, Union + +from .tokenizer import ( + POSTFIX_UNARY_OPERATORS, + tokenize, +) + +OP_PRIORITY = { + "^": 30, + "%": 30, + "*": 20, + "/": 20, + "+": 15, + "-": 15, + "&": 13, + ">": 10, + "<>": 10, + ">=": 10, + "<": 10, + "<=": 10, + "=": 10, +} + +UNARY_OPERATORS_PREFIX = ["-", "+"] + +AST = Union["Literal", "BinaryOperation", "UnaryOperation", "FunctionCall"] + + +@dataclass +class Literal: + type: str + value: str + + +@dataclass +class BinaryOperation: + type: ClassVar[str] = "BIN_OPERATION" + value: str + left: AST + right: AST + + +@dataclass +class UnaryOperation: + type: ClassVar[str] = "UNARY_OPERATION" + value: str + operand: AST + postfix: bool = False + + +@dataclass +class FunctionCall(Literal): + type: ClassVar[str] = "FUNCALL" + value: str + args: List[AST] + + +def parse(formula): + """Parse a spreadsheet formula and return an AST""" + tokens = tokenize(formula) + tokens = [token for token in tokens if token[0] != "DEBUGGER"] + if tokens[0][1] == "=": + tokens = tokens[1:] + return parse_expression(tokens) + + +def parse_expression(tokens, binding_power=0) -> AST: + if not tokens: + raise ValueError("Unexpected end of formula") + left = parse_operand(tokens) + # as long as we have operators with higher priority than the parent one, + # continue parsing the expression since we are in a child sub-expression + while tokens and tokens[0][0] == "OPERATOR" and OP_PRIORITY[tokens[0][1]] > binding_power: + operator = tokens.pop(0)[1] + if operator in POSTFIX_UNARY_OPERATORS: + left = UnaryOperation(operator, left, postfix=True) + else: + right = parse_expression(tokens, OP_PRIORITY[operator]) + left = BinaryOperation(operator, left, right) + return left + + +def parse_function_args(tokens) -> Iterable[AST]: + consume_or_raise(tokens, "LEFT_PAREN", error_msg="Missing opening parenthesis") + if not tokens: + raise ValueError("Unexpected end of formula") + next_token = tokens[0] + if next_token[0] == "RIGHT_PAREN": + consume_or_raise(tokens, "RIGHT_PAREN") + yield from [] + return + yield parse_single_arg(tokens) + while tokens and tokens[0][0] != "RIGHT_PAREN": + consume_or_raise(tokens, "ARG_SEPARATOR", error_msg="Wrong function call") + yield parse_single_arg(tokens) + consume_or_raise(tokens, "RIGHT_PAREN", error_msg="Missing closing parenthesis") + + +def parse_single_arg(tokens) -> AST: + next_token = tokens[0] + if next_token[0] in {"ARG_SEPARATOR", "RIGHT_PAREN"}: + # arg is empty: "sum(1,,2)" "sum(,1)" "sum(1,)" + return Literal("EMPTY", "") + return 
parse_expression(tokens)
+
+
+def parse_operand(tokens: list) -> AST:
+    token = tokens.pop(0)
+    if token[0] == "LEFT_PAREN":
+        expr = parse_expression(tokens)
+        consume_or_raise(tokens, "RIGHT_PAREN", error_msg="Missing closing parenthesis")
+        return expr
+    elif token[0] == "STRING":
+        return Literal(token[0], token[1].strip('"'))
+    elif token[0] in ["NUMBER", "BOOLEAN", "UNKNOWN"]:
+        return Literal(token[0], token[1])
+    elif token[0] == "OPERATOR" and token[1] in UNARY_OPERATORS_PREFIX:
+        operator = token[1]
+        return UnaryOperation(operator, parse_expression(tokens, OP_PRIORITY[operator]))
+    elif token[0] == "SYMBOL":
+        args = list(parse_function_args(tokens))
+        return FunctionCall(token[1], args)
+    raise ValueError(f"Unexpected token: {token}")
+
+
+def consume_or_raise(tokens, token_type, error_msg="Unexpected token"):
+    if not tokens or tokens.pop(0)[0] != token_type:
+        raise ValueError(error_msg)
+
+
+def ast_to_string(ast: AST) -> str:
+    """Convert an AST to the corresponding string."""
+    if ast.type == "BIN_OPERATION":
+        operator = ast.value
+        left = left_to_string(ast.left, operator)
+        right = right_to_string(ast.right, operator)
+        return f"{left}{operator}{right}"
+    elif ast.type == "UNARY_OPERATION":
+        operator = ast.value
+        if ast.postfix:
+            return f"{left_to_string(ast.operand, operator)}{operator}"
+        else:
+            return f"{operator}{right_to_string(ast.operand, operator)}"
+    elif ast.type == "FUNCALL":
+        args = (ast_to_string(arg) for arg in ast.args)
+        return f"{ast.value}({','.join(args)})"
+    elif ast.type == "STRING":
+        return f'"{ast.value}"'
+    elif ast.type in {"NUMBER", "BOOLEAN", "EMPTY", "UNKNOWN"}:
+        return ast.value
+    raise ValueError("Unexpected node type: " + ast.type)
+
+
+def left_to_string(left_expr: AST, parent_operator: str) -> str:
+    """Convert the left operand of a binary operation to the corresponding string
+    and enclose the result inside parentheses if necessary."""
+    if left_expr.type == "BIN_OPERATION" and OP_PRIORITY[left_expr.value] < OP_PRIORITY[parent_operator]:
+        return f"({ast_to_string(left_expr)})"
+    return ast_to_string(left_expr)
+
+
+ASSOCIATIVE_OPERATORS = {"*", "+", "&"}
+
+
+def right_to_string(right_expr: AST, parent_operator: str) -> str:
+    """Convert the right operand of a binary operation to the corresponding string
+    and enclose the result inside parentheses if necessary."""
+    if right_expr.type != "BIN_OPERATION":
+        return ast_to_string(right_expr)
+    elif (OP_PRIORITY[right_expr.value] < OP_PRIORITY[parent_operator]) or (
+        OP_PRIORITY[right_expr.value] == OP_PRIORITY[parent_operator] and parent_operator not in ASSOCIATIVE_OPERATORS
+    ):
+        return f"({ast_to_string(right_expr)})"
+    return ast_to_string(right_expr)
+
+
+def transform_ast_nodes(ast: AST, node_type: str, node_transformer: Callable[[AST], AST]) -> AST:
+    """Transform all nodes of the given type in an AST using the provided node_transformer function."""
+    if ast.type == node_type:
+        ast = node_transformer(ast)
+    if ast.type == "BIN_OPERATION":
+        return BinaryOperation(
+            ast.value,
+            transform_ast_nodes(ast.left, node_type, node_transformer),
+            transform_ast_nodes(ast.right, node_type, node_transformer),
+        )
+    elif ast.type == "UNARY_OPERATION":
+        return UnaryOperation(
+            ast.value,
+            transform_ast_nodes(ast.operand, node_type, node_transformer),
+            ast.postfix,
+        )
+    elif ast.type == "FUNCALL":
+        return FunctionCall(
+            ast.value,
+            [transform_ast_nodes(arg, node_type, node_transformer) for arg in ast.args],
+        )
+    else:
+        return ast
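+
+
+# Illustrative sketch (added comment, not part of the original module): renaming
+# a function call by combining parse, transform_ast_nodes and ast_to_string.
+# The new function name is hypothetical.
+#
+#   ast = parse('=ODOO.PIVOT(1, "probability")')
+#   renamed = transform_ast_nodes(
+#       ast,
+#       "FUNCALL",
+#       lambda f: FunctionCall("ODOO.PIVOT.VALUE", f.args) if f.value == "ODOO.PIVOT" else f,
+#   )
+#   ast_to_string(renamed)  # 'ODOO.PIVOT.VALUE(1,"probability")'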
diff --git a/src/util/spreadsheet/revisions.py b/src/util/spreadsheet/revisions.py
new file mode 100644
index 000000000..b037918af
--- /dev/null
+++ b/src/util/spreadsheet/revisions.py
@@ -0,0 +1,76 @@
+import json
+from collections import namedtuple
+from copy import deepcopy
+from typing import Iterable
+
+from odoo.upgrade import util
+
+CommandAdapter = namedtuple("CommandAdapter", ["command_type", "adapt"])
+
+Drop = object()
+
+
+# NOTE: currently unused; transform_revisions_data below is used instead
+def transform_revisions(cr, revisions, *adapters: Iterable[CommandAdapter]):
+    if not adapters:
+        return
+    if not util.table_exists(cr, "spreadsheet_revision"):
+        return
+    cmd_types = [x[0] for x in adapters]
+    cr.execute(
+        """
+        SELECT id, commands
+          FROM spreadsheet_revision
+         WHERE commands SIMILAR TO %s
+        """,
+        (f"%{'%|%'.join(cmd_types)}%",),
+    )
+    for revision_id, data in cr.fetchall():
+        data = json.loads(data)
+        commands = data.get("commands", [])
+        if not commands:
+            continue
+        new_commands = transform_commands(commands, *adapters)
+        if new_commands is None:
+            continue
+        data["commands"] = new_commands
+        cr.execute(
+            """
+            UPDATE spreadsheet_revision
+               SET commands=%s
+             WHERE id=%s
+            """,
+            [json.dumps(data), revision_id],
+        )
+
+
+def transform_revisions_data(revisions, *adapters: Iterable[CommandAdapter]):
+    for data in revisions:
+        commands = data.get("commands", [])
+        if not commands:
+            continue
+        new_commands = transform_commands(commands, *adapters)
+        if new_commands is None:
+            continue
+        # ensure the result is JSON-serializable (converts tuples to lists); probably removable
+        data["commands"] = json.loads(json.dumps(new_commands))
+    return revisions
+
+
+def transform_commands(commands, *adapters: Iterable[CommandAdapter]):
+    to_update = False
+    new_commands = []
+    for command in commands:
+        adapted_command = command
+        for adapter_type, adapter in adapters:
+            if command["type"] == adapter_type and adapted_command is not Drop:
+                to_adapt = deepcopy(adapted_command)
+                result = adapter(to_adapt) or to_adapt
+                if result is Drop:
+                    to_update = True
+                    adapted_command = Drop
+                    continue
+                if result != adapted_command:
+                    to_update = True
+                adapted_command = result
+        if adapted_command is not Drop:
+            new_commands.append(adapted_command)
+    return new_commands if to_update else None
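+
+
+# Adapter contract (added comment, not part of the original module): an adapter
+# may mutate the command in place and return None (the mutated command is kept),
+# return a replacement command, or return Drop to delete it. A minimal sketch,
+# using a hypothetical RENAME_SHEET command:
+#
+#   def upper_name(cmd):
+#       cmd["name"] = cmd["name"].upper()  # mutated in place, implicit None return
+#
+#   transform_commands(
+#       [{"type": "RENAME_SHEET", "sheetId": "sh1", "name": "po"}],
+#       CommandAdapter("RENAME_SHEET", upper_name),
+#   )
+#   # -> [{'type': 'RENAME_SHEET', 'sheetId': 'sh1', 'name': 'PO'}]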

From 6e508b4f01772c0b82e6c8d6f143960d9f8762bb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?R=C3=A9mi=20Rahir=20=28rar=29?=
Date: Wed, 31 Jul 2024 14:44:31 +0200
Subject: [PATCH 3/3] wip

---
 src/util/spreadsheet/misc.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/util/spreadsheet/misc.py b/src/util/spreadsheet/misc.py
index 31dc12a95..22211db7c 100644
--- a/src/util/spreadsheet/misc.py
+++ b/src/util/spreadsheet/misc.py
@@ -85,11 +85,11 @@ def _magic_spreadsheet_field(cr):
     return cr.fetchone()[0] and 'spreadsheet_binary_data' or 'data'
 
 def apply_in_all_spreadsheets(cr, like_pattern, callback):
-    print("like pattern : ", like_pattern)
+    # print("like pattern : ", like_pattern)
     b = False
     # upgrade the initial data and all revisions based on it
     for attachment_id, res_model, res_id, db_datas in read_spreadsheet_initial_data(cr, like_pattern):
-        print("attachment : ", attachment_id)
+        print("attachment data id: ", attachment_id)
         print("datas: ", len(db_datas))
         b = True
 
@@ -122,7 +122,7 @@ def apply_in_all_spreadsheets(cr, like_pattern, callback):
     b = False
     # upgrade snapshots
     for attachment_id, _res_model, _res_id, db_datas in read_spreadsheet_snapshots(cr, like_pattern):
-        print("attachment : ", attachment_id)
+        print("attachment snapshot id: ", attachment_id)
         b=True
 
         data, revisions = callback(db_datas, [])