From 8fe3a3c7ac9391a2b3073c40e11d72568380fee6 Mon Sep 17 00:00:00 2001 From: Francesco Zanetta Date: Mon, 2 Feb 2026 10:46:32 +0100 Subject: [PATCH 1/2] support 'key:type' syntax --- .../datasets/create/sources/grib_index.py | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/src/anemoi/datasets/create/sources/grib_index.py b/src/anemoi/datasets/create/sources/grib_index.py index c8435d87a..5f3718882 100644 --- a/src/anemoi/datasets/create/sources/grib_index.py +++ b/src/anemoi/datasets/create/sources/grib_index.py @@ -200,9 +200,8 @@ def _add_grib(self, **kwargs: Any) -> None: Key-value pairs representing the GRIB record fields. """ assert self.update - + kwargs = _replace_colons(kwargs) try: - self.cursor.execute( f""" INSERT INTO grib_index ({', '.join(kwargs.keys())}) @@ -253,8 +252,13 @@ def _ensure_columns(self, columns: list[str]) -> None: """ assert self.update + columns = _replace_colons(columns) + existing_columns = self._all_columns() new_columns = [column for column in columns if column not in existing_columns] + + existing_columns = _replace_colons(existing_columns) + new_columns = _replace_colons(new_columns) if not new_columns: return @@ -265,7 +269,7 @@ def _ensure_columns(self, columns: list[str]) -> None: self.cursor.execute(f"ALTER TABLE grib_index ADD COLUMN {column} TEXT not null default ''") self.cursor.execute("""DROP INDEX IF EXISTS idx_grib_index_all_keys""") - all_columns = self._all_columns() + all_columns = _replace_colons(self._all_columns()) self.cursor.execute( f""" @@ -642,6 +646,16 @@ def _execute( return FieldArray(result) + +def _replace_colons(d: dict | list) -> dict | list: + """Replace colons in column names with underscores.""" + if isinstance(d, dict): + return {k.replace(":", "_"): v for k, v in d.items()} + elif isinstance(d, list): + return [el.replace(":", "_") for el in d] + else: + raise TypeError("Input must be a dictionary or a list.") + def factorise(lst): """Factorise a list of (dates, request) tuples by merging dates with identical requests.""" content = dict() From 61f3e8457ce9e94970e68f8a730c41cc13efb4f3 Mon Sep 17 00:00:00 2001 From: Francesco Zanetta Date: Thu, 5 Feb 2026 19:50:10 +0100 Subject: [PATCH 2/2] quote instead of replace --- .../datasets/create/sources/grib_index.py | 75 +++++++++++-------- 1 file changed, 45 insertions(+), 30 deletions(-) diff --git a/src/anemoi/datasets/create/sources/grib_index.py b/src/anemoi/datasets/create/sources/grib_index.py index 5f3718882..9de2ad624 100644 --- a/src/anemoi/datasets/create/sources/grib_index.py +++ b/src/anemoi/datasets/create/sources/grib_index.py @@ -18,7 +18,9 @@ import earthkit.data as ekd import tqdm +from anemoi.transform.fields import new_field_from_grid from anemoi.transform.flavour import RuleBasedFlavour +from anemoi.transform.grids import grid_registry from cachetools import LRUCache from earthkit.data.indexing.fieldlist import FieldArray @@ -102,6 +104,21 @@ def __init__( self.warnings = {} self.cache = {} + def _quote_column(self, column: str) -> str: + """Quote a column name for use in SQL queries. + + Parameters + ---------- + column : str + The column name to quote. + + Returns + ------- + str + The quoted column name. + """ + return f'"{column}"' + def _create_tables(self) -> None: """Create the necessary tables in the database.""" assert self.update @@ -126,7 +143,7 @@ def _create_tables(self) -> None: _path_id INTEGER not null, _offset INTEGER not null, _length INTEGER not null, - {', '.join(f"{key} TEXT not null default ''" for key in columns)}, + {', '.join(f"{self._quote_column(key)} TEXT not null default ''" for key in columns)}, FOREIGN KEY(_path_id) REFERENCES paths(id)) """ ) # , @@ -141,15 +158,15 @@ def _create_tables(self) -> None: self.cursor.execute( f""" CREATE UNIQUE INDEX IF NOT EXISTS idx_grib_index_all_keys - ON grib_index ({', '.join(columns)}) + ON grib_index ({', '.join(self._quote_column(col) for col in columns)}) """ ) for key in columns: self.cursor.execute( f""" - CREATE INDEX IF NOT EXISTS idx_grib_index_{key} - ON grib_index ({key}) + CREATE INDEX IF NOT EXISTS idx_grib_index_{key.replace(':', '_')} + ON grib_index ({self._quote_column(key)}) """ ) @@ -200,11 +217,12 @@ def _add_grib(self, **kwargs: Any) -> None: Key-value pairs representing the GRIB record fields. """ assert self.update - kwargs = _replace_colons(kwargs) + try: + self.cursor.execute( f""" - INSERT INTO grib_index ({', '.join(kwargs.keys())}) + INSERT INTO grib_index ({', '.join(self._quote_column(k) for k in kwargs.keys())}) VALUES ({', '.join('?' for _ in kwargs)}) """, tuple(kwargs.values()), @@ -217,7 +235,8 @@ def _add_grib(self, **kwargs: Any) -> None: for n in ("_path_id", "_offset", "_length"): kwargs.pop(n) self.cursor.execute( - "SELECT * FROM grib_index WHERE " + " AND ".join(f"{key} = ?" for key in kwargs.keys()), + "SELECT * FROM grib_index WHERE " + + " AND ".join(f"{self._quote_column(key)} = ?" for key in kwargs.keys()), tuple(kwargs.values()), ) existing_record = self.cursor.fetchone() @@ -252,13 +271,8 @@ def _ensure_columns(self, columns: list[str]) -> None: """ assert self.update - columns = _replace_colons(columns) - existing_columns = self._all_columns() new_columns = [column for column in columns if column not in existing_columns] - - existing_columns = _replace_colons(existing_columns) - new_columns = _replace_colons(new_columns) if not new_columns: return @@ -266,23 +280,25 @@ def _ensure_columns(self, columns: list[str]) -> None: self._columns = None for column in new_columns: - self.cursor.execute(f"ALTER TABLE grib_index ADD COLUMN {column} TEXT not null default ''") + self.cursor.execute( + f"ALTER TABLE grib_index ADD COLUMN {self._quote_column(column)} TEXT not null default ''" + ) self.cursor.execute("""DROP INDEX IF EXISTS idx_grib_index_all_keys""") - all_columns = _replace_colons(self._all_columns()) + all_columns = self._all_columns() self.cursor.execute( f""" CREATE UNIQUE INDEX IF NOT EXISTS idx_grib_index_all_keys - ON grib_index ({', '.join(all_columns)}) + ON grib_index ({', '.join(self._quote_column(col) for col in all_columns)}) """ ) for key in all_columns: self.cursor.execute( f""" - CREATE INDEX IF NOT EXISTS idx_grib_index_{key} - ON grib_index ({key}) + CREATE INDEX IF NOT EXISTS idx_grib_index_{key.replace(':', '_')} + ON grib_index ({self._quote_column(key)}) """ ) @@ -319,6 +335,8 @@ def add_grib_file(self, path: str) -> None: self._unknown(path, field, i, param) self.warnings[param] = True + continue + self._ensure_columns(list(keys.keys())) self._add_grib( @@ -556,15 +574,14 @@ def retrieve(self, dates: list[Any], **kwargs: Any) -> Iterator[Any]: LOG.warning(f"Warning : {k} not in database columns, key discarded") continue if isinstance(v, list): - query += f" AND {k} IN ({', '.join('?' for _ in v)})" + query += f" AND {self._quote_column(k)} IN ({', '.join('?' for _ in v)})" params.extend([str(_) for _ in v]) else: - query += f" AND {k} = ?" + query += f" AND {self._quote_column(k)} = ?" params.append(str(v)) print("SELECT (query)", query) print("SELECT (params)", params) - self.cursor.execute(query, params) fetch = self.cursor.fetchall() @@ -613,6 +630,11 @@ def _execute( FieldArray An array of retrieved GRIB fields. """ + + grid_definition = kwargs.pop("grid_definition", None) + if grid_definition: + grid_definition = grid_registry.from_config(grid_definition) + index = GribIndex(indexdb) if flavour is not None: @@ -643,19 +665,12 @@ def _execute( field = flavour.apply(field) result.append(field) - return FieldArray(result) + if grid_definition is not None: + result = [new_field_from_grid(field, grid_definition) for field in result] + return FieldArray(result) -def _replace_colons(d: dict | list) -> dict | list: - """Replace colons in column names with underscores.""" - if isinstance(d, dict): - return {k.replace(":", "_"): v for k, v in d.items()} - elif isinstance(d, list): - return [el.replace(":", "_") for el in d] - else: - raise TypeError("Input must be a dictionary or a list.") - def factorise(lst): """Factorise a list of (dates, request) tuples by merging dates with identical requests.""" content = dict()