From fd67c6e3fe923c6ecda440c6a2334badbf0ea29a Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Thu, 19 Feb 2026 17:23:23 +0000 Subject: [PATCH 01/12] draft --- .../accumulate_utils/interval_generators.py | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/src/anemoi/datasets/create/sources/accumulate_utils/interval_generators.py b/src/anemoi/datasets/create/sources/accumulate_utils/interval_generators.py index 047b18f75..b94fcff4a 100644 --- a/src/anemoi/datasets/create/sources/accumulate_utils/interval_generators.py +++ b/src/anemoi/datasets/create/sources/accumulate_utils/interval_generators.py @@ -13,6 +13,7 @@ from abc import abstractmethod from typing import Iterable +from anemoi.utils.dates import as_datetime from anemoi.utils.dates import frequency_to_timedelta from anemoi.datasets.create.sources.accumulate_utils.covering_intervals import SignedInterval @@ -149,6 +150,73 @@ def __call__( return intervals +class CycleIntervalProvider(IntervalGenerator): + def __init__(self, **config: dict): + print(f"CycleIntervalProvider config: {config}") + self.reference = config.pop("start", datetime.datetime(1970, 1, 1, 0, 0)) + self.reference = as_datetime(self.reference) + + def split(s): + i, j = s.split("-") + return int(i), int(j) + + def normalise_steps(base_time, steps): + steps = steps.split("/") + return base_time, [split(s) for s in steps] + + self.config = {split(k): normalise_steps(*v) for k, v in config.items()} + + def covering_intervals(self, start: datetime.datetime, end: datetime.datetime) -> Iterable[SignedInterval]: + cycle_length_in_hours = max([k[1] for k in self.config.keys()]) + + i_start = (int((start - self.reference).total_seconds()) // 3600) % cycle_length_in_hours + i_end = (int((end - self.reference).total_seconds()) // 3600) % cycle_length_in_hours + # if i_start == 0: i_start = cycle_length_in_hours + if i_end == 0: + i_end = cycle_length_in_hours + print("💬", start) + print("💬", end) + print(f" -> 
CycleIntervalProvider covering_intervals for (i_start={i_start}, i_end={i_end})") + + if (i_start, i_end) not in self.config: + raise ValueError( + f"CycleIntervalProvider: no config to find ({i_start}, {i_end}) (start={start}, end={end}, {cycle_length_in_hours=})" + ) + + base_time, steps = self.config[(i_start, i_end)] + + base_datetime = datetime.datetime(end.year, end.month, end.day, base_time) + while base_datetime > end: + print( + f" ❌ -> CycleIntervalProvider: base_datetime {base_datetime} is after end {end}, going back one day" + ) + base_datetime -= datetime.timedelta(days=1) + + assert ( + base_datetime.hour == base_time + ), f"Base datetime hour {base_datetime.hour} does not match expected base time {base_time}" + + intervals = [] + for start_step, end_step in steps: + base_ = base_datetime + start_ = base_datetime + datetime.timedelta(hours=start_step) + end_ = base_datetime + datetime.timedelta(hours=end_step) + interval = SignedInterval(base=base_, start=start_, end=end_) + intervals.append(interval) + print("✅", interval) + + if not (any(i.start == start for i in intervals) or any((-i).start == start for i in intervals)): + raise ValueError( + f"CycleIntervalProvider: no interval starting at {start} (start={start}, end={end}, {cycle_length_in_hours=})" + ) + if not (any(i.end == end for i in intervals) or any((-i).end == end for i in intervals)): + raise ValueError( + f"CycleIntervalProvider: no interval ending at {end} (start={start}, end={end}, {cycle_length_in_hours=})" + ) + + return intervals + + def normalise_steps(steps_list: str | list[str]) -> list[list[int]]: """Convert the input step_list to a list of [start,end] pairs""" res = [] @@ -263,6 +331,11 @@ def _interval_generator_factory( case {"accumulated-from-start": params}: return AccumulatedFromStartIntervalGenerator(**params) + case {"type": "cycle", **params}: + return CycleIntervalProvider(**params) + case {"cycle": params}: + return CycleIntervalProvider(**params) + case 
{"accumulated-from-previous-step": params}: return AccumulatedFromPreviousStepIntervalGenerator(**params) case {"type": "accumulated-from-previous-step", **params}: From 95b53b55aeae784796387cec50977752dfc81ffe Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Thu, 19 Feb 2026 17:39:33 +0000 Subject: [PATCH 02/12] up --- .../accumulate_utils/interval_generators.py | 22 +++++++------------ src/anemoi/datasets/create/sources/mars.py | 13 +++++++++++ 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/src/anemoi/datasets/create/sources/accumulate_utils/interval_generators.py b/src/anemoi/datasets/create/sources/accumulate_utils/interval_generators.py index b94fcff4a..7b5efce11 100644 --- a/src/anemoi/datasets/create/sources/accumulate_utils/interval_generators.py +++ b/src/anemoi/datasets/create/sources/accumulate_utils/interval_generators.py @@ -150,7 +150,7 @@ def __call__( return intervals -class CycleIntervalProvider(IntervalGenerator): +class CycleIntervalProvider(SearchableIntervalGenerator): def __init__(self, **config: dict): print(f"CycleIntervalProvider config: {config}") self.reference = config.pop("start", datetime.datetime(1970, 1, 1, 0, 0)) @@ -171,12 +171,8 @@ def covering_intervals(self, start: datetime.datetime, end: datetime.datetime) - i_start = (int((start - self.reference).total_seconds()) // 3600) % cycle_length_in_hours i_end = (int((end - self.reference).total_seconds()) // 3600) % cycle_length_in_hours - # if i_start == 0: i_start = cycle_length_in_hours if i_end == 0: i_end = cycle_length_in_hours - print("💬", start) - print("💬", end) - print(f" -> CycleIntervalProvider covering_intervals for (i_start={i_start}, i_end={i_end})") if (i_start, i_end) not in self.config: raise ValueError( @@ -186,10 +182,8 @@ def covering_intervals(self, start: datetime.datetime, end: datetime.datetime) - base_time, steps = self.config[(i_start, i_end)] base_datetime = datetime.datetime(end.year, end.month, end.day, base_time) - while 
base_datetime > end: - print( - f" ❌ -> CycleIntervalProvider: base_datetime {base_datetime} is after end {end}, going back one day" - ) + # The base must be strictly before end so step 0-N lands on or before end. + while base_datetime >= end: base_datetime -= datetime.timedelta(days=1) assert ( @@ -198,12 +192,12 @@ def covering_intervals(self, start: datetime.datetime, end: datetime.datetime) - intervals = [] for start_step, end_step in steps: - base_ = base_datetime - start_ = base_datetime + datetime.timedelta(hours=start_step) - end_ = base_datetime + datetime.timedelta(hours=end_step) - interval = SignedInterval(base=base_, start=start_, end=end_) + interval = SignedInterval( + base=base_datetime, + start=base_datetime + datetime.timedelta(hours=start_step), + end=base_datetime + datetime.timedelta(hours=end_step), + ) intervals.append(interval) - print("✅", interval) if not (any(i.start == start for i in intervals) or any((-i).start == start for i in intervals)): raise ValueError( diff --git a/src/anemoi/datasets/create/sources/mars.py b/src/anemoi/datasets/create/sources/mars.py index 018d8f9c8..4caa7a754 100644 --- a/src/anemoi/datasets/create/sources/mars.py +++ b/src/anemoi/datasets/create/sources/mars.py @@ -213,6 +213,12 @@ def _expand_mars_request( if r["time"] not in user_time: continue + # SCDA stream auto-selection for ECMWF operational data: + # the 06 and 18 UTC runs use stream "scda" instead of "oper" + if r.get("class") == "od" and r.get("stream") == "oper": + if int(r.get("time", 0)) in (600, 1800): + r["stream"] = "scda" + requests.append(r) return requests @@ -435,6 +441,13 @@ def _execute( for d, interval in dates.intervals: context.trace("🌧️", "interval:", interval) _, r, _ = dates._adjust_request_to_interval(interval, request) + + # SCDA stream auto-selection for ECMWF operational data: + # the 06 and 18 UTC runs use stream "scda" instead of "oper" + if r.get("class") == "od" and r.get("stream") == "oper": + if int(r.get("time", 0)) in 
(600, 1800): + r["stream"] = "scda" + context.trace("🌧️", " adjusted request =", r) requests_.append(r) requests = requests_ From da0e60e2f918c4d0e2bf0349a8e2c9ec134a13c6 Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Thu, 19 Feb 2026 17:50:20 +0000 Subject: [PATCH 03/12] added checks. runs ok --- .../datasets/create/sources/accumulate.py | 7 ++++++ .../accumulate_utils/interval_generators.py | 24 ++++++++++++++++--- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/src/anemoi/datasets/create/sources/accumulate.py b/src/anemoi/datasets/create/sources/accumulate.py index ae9d13932..a16ae1f78 100644 --- a/src/anemoi/datasets/create/sources/accumulate.py +++ b/src/anemoi/datasets/create/sources/accumulate.py @@ -437,6 +437,13 @@ def _compute_accumulations( if not field_used: logs.raise_error("Field not used for any accumulation", field=field, field_interval=field_interval) + # some accumulators may be empty, remove them + # this can happen when the source provides fields that not exactly the one requested (scda/oper) + empty_accumulators = [k for k, acc in accumulators.items() if acc.values is None] + for k in empty_accumulators: + LOG.warning(f"Removing empty accumulator for date {k[0]} and key {k[1]}") + del accumulators[k] + # Final checks def check_missing_accumulators(): for date in dates: diff --git a/src/anemoi/datasets/create/sources/accumulate_utils/interval_generators.py b/src/anemoi/datasets/create/sources/accumulate_utils/interval_generators.py index 7b5efce11..86ab8efee 100644 --- a/src/anemoi/datasets/create/sources/accumulate_utils/interval_generators.py +++ b/src/anemoi/datasets/create/sources/accumulate_utils/interval_generators.py @@ -169,11 +169,24 @@ def normalise_steps(base_time, steps): def covering_intervals(self, start: datetime.datetime, end: datetime.datetime) -> Iterable[SignedInterval]: cycle_length_in_hours = max([k[1] for k in self.config.keys()]) + assert end > start, "CycleIntervalProvider only supports positive 
intervals (end must be after start)" + i_start = (int((start - self.reference).total_seconds()) // 3600) % cycle_length_in_hours i_end = (int((end - self.reference).total_seconds()) // 3600) % cycle_length_in_hours if i_end == 0: i_end = cycle_length_in_hours + if not (0 <= i_start < cycle_length_in_hours): + raise ValueError( + f"CycleIntervalProvider: i_start={i_start} out of range [0, {cycle_length_in_hours}) (start={start})" + ) + if not (0 < i_end <= cycle_length_in_hours): + raise ValueError( + f"CycleIntervalProvider: i_end={i_end} out of range (0, {cycle_length_in_hours}] (end={end})" + ) + if i_start >= i_end: + raise ValueError(f"CycleIntervalProvider: i_start={i_start} >= i_end={i_end} (start={start}, end={end})") + if (i_start, i_end) not in self.config: raise ValueError( f"CycleIntervalProvider: no config to find ({i_start}, {i_end}) (start={start}, end={end}, {cycle_length_in_hours=})" @@ -186,12 +199,17 @@ def covering_intervals(self, start: datetime.datetime, end: datetime.datetime) - while base_datetime >= end: base_datetime -= datetime.timedelta(days=1) - assert ( - base_datetime.hour == base_time - ), f"Base datetime hour {base_datetime.hour} does not match expected base time {base_time}" + if base_datetime.hour != base_time: + raise ValueError(f"base_datetime hour {base_datetime.hour} does not match expected base_time {base_time}") + if base_datetime >= end: + raise ValueError(f"base_datetime {base_datetime} must be strictly before end {end}") intervals = [] for start_step, end_step in steps: + if start_step < 0: + raise ValueError(f"start_step {start_step} must be non-negative") + if end_step <= start_step: + raise ValueError(f"end_step {end_step} must be greater than start_step {start_step}") interval = SignedInterval( base=base_datetime, start=base_datetime + datetime.timedelta(hours=start_step), From 6d5c90bdcb720651682bb1a545ece208ed3fbe92 Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Thu, 19 Feb 2026 17:53:11 +0000 Subject: [PATCH 
04/12] clean --- src/anemoi/datasets/create/sources/mars.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/anemoi/datasets/create/sources/mars.py b/src/anemoi/datasets/create/sources/mars.py index 4caa7a754..b2b12f66b 100644 --- a/src/anemoi/datasets/create/sources/mars.py +++ b/src/anemoi/datasets/create/sources/mars.py @@ -442,12 +442,6 @@ def _execute( context.trace("🌧️", "interval:", interval) _, r, _ = dates._adjust_request_to_interval(interval, request) - # SCDA stream auto-selection for ECMWF operational data: - # the 06 and 18 UTC runs use stream "scda" instead of "oper" - if r.get("class") == "od" and r.get("stream") == "oper": - if int(r.get("time", 0)) in (600, 1800): - r["stream"] = "scda" - context.trace("🌧️", " adjusted request =", r) requests_.append(r) requests = requests_ From feb1d195e67f259790fafdcdd4c76a0383cd460e Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Wed, 25 Feb 2026 13:21:17 +0000 Subject: [PATCH 05/12] migrate accumulations --- .../datasets/commands/recipe/migrate.py | 543 +----------------- 1 file changed, 23 insertions(+), 520 deletions(-) diff --git a/src/anemoi/datasets/commands/recipe/migrate.py b/src/anemoi/datasets/commands/recipe/migrate.py index 03da61fbc..4f7449e46 100644 --- a/src/anemoi/datasets/commands/recipe/migrate.py +++ b/src/anemoi/datasets/commands/recipe/migrate.py @@ -9,537 +9,41 @@ import logging -import sys -from collections.abc import Sequence from typing import Any -from glom import assign -from glom import delete -from glom import glom - -from anemoi.datasets.create import validate_config -from anemoi.datasets.dumper import yaml_dump - LOG = logging.getLogger(__name__) -def find_paths(data, target_key=None, target_value=None, *path): - - matches = [] - - if isinstance(data, dict): - for k, v in data.items(): - if (target_key is not None and k == target_key) or (target_value is not None and v == target_value): - matches.append(list(path) + [k]) - matches.extend(find_paths(v, target_key, 
target_value, *path, k)) - elif isinstance(data, Sequence) and not isinstance(data, (str, bytes)): - for i, item in enumerate(data): - matches.extend(find_paths(item, target_key, target_value, *path, str(i))) - return matches - - -def find_chevrons(data, *path): - - matches = [] - - if isinstance(data, dict): - for k, v in data.items(): - if k == "<<": - matches.append(list(path) + [k]) - matches.extend(find_chevrons(v, *path, k)) - elif isinstance(data, list): - for i, item in enumerate(data): - matches.extend(find_chevrons(item, *path, str(i))) - return matches - - -def find_paths_in_substrees(path, obj, cur_path=None): - if cur_path is None: - cur_path = [] - matches = [] - try: - glom(obj, path) # just to check existence - matches.append(cur_path + path.split(".")) - except Exception: - pass - - if isinstance(obj, dict): - for k, v in obj.items(): - matches.extend(find_paths_in_substrees(path, v, cur_path + [k])) - elif isinstance(obj, list): - for i, v in enumerate(obj): - matches.extend(find_paths_in_substrees(path, v, cur_path + [str(i)])) - return matches - - -MIGRATE = { - "output.statistics_end": "statistics.end", - "has_nans": "statistics.allow_nans", - "loop.dates.group_by": "build.group_by", - "loop.0.dates.group_by": "build.group_by", - "loop.dates": "dates", - "loop.0.dates": "dates", - "copyright": "attribution", - "dates.<<": "dates", - "options.group_by": "build.group_by", - "loops.0.loop_a.dates": "dates", - "loop.0.loop_a.dates": "dates", - "dates.stop": "dates.end", - "dates.group_by": "build.group_by", - "include.mars": "data_sources.mars.mars", - "ensemble_dimension": "build.ensemble_dimension", - "flatten_grid": "build.flatten_grid", -} - -DELETE = [ - "purpose", - # "input.join.0.label", - "status", - "common", - "config_format_version", - "aliases", - # "platform", - "loops.0.loop_a.applies_to", - "loop.0.loop_a.applies_to", - "dataset_status", - "alias", - "resources", - "input.dates.<<", - "input.dates.join.0.label.name", -] - - -SOURCES 
= { - "oper-accumulations": "accumulations", - "era5-accumulations": "accumulations", - "ensemble-perturbations": "recentre", - "ensemble_perturbations": "recentre", - "perturbations": "recentre", - "custom-regrid": "regrid", -} - -MARKER = object() - - -def _delete(config, path): - x = glom(config, path, default=MARKER) - if x is MARKER: - return - delete(config, path) - - -def _move(config, path, new_path, result): - x = glom(config, path, default=MARKER) - if x is MARKER: - return - delete(result, path) - assign(result, new_path, x, missing=dict) - - -def _fix_input_0(config): - if isinstance(config["input"], dict): - return - - input = config["input"] - new_input = [] - - blocks = {} - first = None - for block in input: - assert isinstance(block, dict), block - - assert len(block) == 1, block - - block_name, values = list(block.items())[0] - - if "kwargs" in values: - inherit = values.pop("inherit", None) - assert len(values) == 1, values - values = values["kwargs"] - values.pop("date", None) - source_name = values.pop("name", None) - - if inherit is not None: - if inherit.startswith("$"): - inherit = inherit[1:] - inherited = blocks[inherit].copy() - inherited.update(values) - values = inherited - - if first is None: - first = source_name - - blocks[block_name] = values.copy() - - new_input.append({SOURCES.get(source_name, source_name): values.copy()}) - else: - assert False, f"Block {block_name} does not have 'kwargs': {values}" - - blocks[block_name] = values.copy() - - config["input"] = dict(join=new_input) - - -def _fix_input_1(result, config): - if isinstance(config["input"], dict): - return - - input = config["input"] - join = [] - for k in input: - assert isinstance(k, dict) - assert len(k) == 1, f"Input key {k} is not a string: {input}" - name, values = list(k.items())[0] - join.append(values) - - result["input"] = {"join": join} - config["input"] = result["input"].copy() - - -def remove_empties(config: dict) -> None: - """Remove empty dictionaries and 
lists from the config.""" +def migrate_accumulations(config): + """Migrate source 'accumulations' to the new 'accumulate' structure recursively.""" if isinstance(config, dict): - keys_to_delete = [k for k, v in config.items() if v in (None, {}, [], [{}])] - - for k in keys_to_delete: - del config[k] - - for k, v in config.items(): - remove_empties(v) - - if isinstance(config, list): - for item in config: - remove_empties(item) - - -def _fix_loops(result: dict, config: dict) -> None: - if "loops" not in config: - return - - input = config["input"] - loops = config["loops"] - - assert isinstance(loops, list), loops - assert isinstance(input, list), input - - entries = {} - dates_block = None - for loop in loops: - assert isinstance(loop, dict), loop - assert len(loop) == 1, loop - loop = list(loop.values())[0] - applies_to = loop["applies_to"] - dates = loop["dates"] - assert isinstance(applies_to, list), (applies_to, loop) - for a in applies_to: - entries[a] = dates.copy() - - if "start" in dates: - start = dates["start"] - else: - start = max(dates["values"]) - - if "end" in dates or "stop" in dates: - end = dates.get("end", dates.get("stop")) - else: - end = min(dates["values"]) - - if dates_block is None: - dates_block = { - "start": start, - "end": end, + if "accumulations" in config: + values = dict(config["accumulations"]) + period = values.pop("accumulation_period", 6) + result = {k: migrate_accumulations(v) for k, v in config.items() if k != "accumulations"} + result["accumulate"] = { + "period": period, + "availability": "auto", + "source": { + "mars": values, + }, } - - if "frequency" in dates: - if "frequency" not in dates_block: - dates_block["frequency"] = dates["frequency"] - else: - assert dates_block["frequency"] == dates["frequency"], (dates_block["frequency"], dates["frequency"]) - - dates_block["start"] = min(dates_block["start"], start) - dates_block["end"] = max(dates_block["end"], end) - - concat = [] - result["input"] = {"concat": concat} - - 
print("Found loops:", entries) - - for block in input: - assert isinstance(block, dict), block - assert len(block) == 1, block - name, values = list(block.items())[0] - assert name in entries, f"Loop {name} not found in loops: {list(entries.keys())}" - dates = entries[name].copy() - - assert "kwargs" not in values - - concat.append(dict(dates=dates, **values)) - - d = concat[0]["dates"] - if all(c["dates"] == d for c in concat): - join = [] - for c in concat: - del c["dates"] - join.append(c) - result["input"] = {"join": join} - - del config["loops"] - config["input"] = result["input"].copy() - config["dates"] = dates_block.copy() - del result["loops"] - result["dates"] = dates_block - - -def _fix_other(result: dict, config: dict) -> None: - paths = find_paths(config, target_key="source_or_dataset", target_value="$previous_data") - for p in paths: - print(f"Fixing {'.'.join(p)}") - assign(result, ".".join(p[:-1] + ["template"]), "${input.join.0.mars}", missing=dict) - delete(result, ".".join(p)) - - paths = find_paths(config, target_key="date", target_value="$dates") - for p in paths: - delete(result, ".".join(p)) - - -def _fix_join(result: dict, config: dict) -> None: - print("Fixing join...") - input = config["input"] - if "dates" in input and "join" in input["dates"]: - result["input"]["join"] = input["dates"]["join"] - config["input"]["join"] = input["dates"]["join"].copy() - - if "join" not in input: - return - - join = input["join"] - new_join = [] - for j in join: - assert isinstance(j, dict) - assert len(j) == 1 - - key, values = list(j.items())[0] - - if key not in ("label", "source"): - return - - assert isinstance(values, dict), f"Join values for {key} should be a dict: {values}" - if key == "label": - j = values - j.pop("name") - key, values = list(j.items())[0] - - print(values) - source_name = values.pop("name", "mars") - new_join.append( - { - SOURCES.get(source_name, source_name): values, - } - ) - - result["input"] = {"join": new_join} - 
config["input"] = result["input"].copy() - - -def _fix_sources(config: dict, what) -> None: - - input = config["input"] - if what not in input: - return - - join = input[what] - new_join = [] - for j in join: - assert isinstance(j, dict) - assert len(j) == 1, j - - key, values = list(j.items())[0] - - key = SOURCES.get(key, key) - - new_join.append( - { - key: values, - } - ) - - config["input"][what] = new_join - config["input"][what] = new_join.copy() - - -def _assign(config, path, value): - print(f"Assign {path} {value}") - assign(config, path, value) - - -def _fix_chevrons(result: dict, config: dict) -> None: - print("Fixing chevrons...") - paths = find_chevrons(config) - for p in paths: - a = glom(config, ".".join(p)) - b = glom(config, ".".join(p[:-1])) - delete(result, ".".join(p)) - a.update(b) - assign(result, ".".join(p[:-1]), a) - - -def _fix_some(config: dict) -> None: - - paths = find_paths_in_substrees("label.function", config) - for p in paths: - parent = glom(config, ".".join(p[:-2])) - node = glom(config, ".".join(p[:-1])) - assert node - _assign(config, ".".join(p[:-2]), node) - - paths = find_paths_in_substrees("constants.source_or_dataset", config) - for p in paths: - node = glom(config, ".".join(p[:-1])) - node["template"] = node.pop("source_or_dataset") - if node["template"] == "$previous_data": - node["template"] = "${input.join.0.mars}" - paths = find_paths_in_substrees("constants.template", config) - for p in paths: - node = glom(config, ".".join(p[:-1])) - if node["template"] == "$pl_data": - node["template"] = "${input.join.0.mars}" - for d in ("date", "dates", "time"): - paths = find_paths_in_substrees(d, config) - for p in paths: - if len(p) > 1: - node = glom(config, ".".join(p[:-1])) - if isinstance(node, dict) and isinstance(node[d], str) and node[d].startswith("$"): - del node[d] - - paths = find_paths_in_substrees("source.<<", config) - for p in paths: - parent = glom(config, ".".join(p[:-2])) - node = glom(config, 
".".join(p[:-1])) - node.update(node.pop("<<")) - parent[node.pop("name")] = node - assert len(parent) == 2 - del parent["source"] - - paths = find_paths_in_substrees("label.mars", config) - for p in paths: - parent = glom(config, ".".join(p[:-2])) - node = glom(config, ".".join(p[:-1])) - assert node - assign(config, ".".join(p[:-2]), node) - - paths = find_paths_in_substrees("input.dates.join", config) - for p in paths: - node = glom(config, ".".join(p)) - config["input"]["join"] = node - del config["input"]["dates"] - - paths = find_paths_in_substrees("source.name", config) - for p in paths: - parent = glom(config, ".".join(p[:-2])) - node = glom(config, ".".join(p[:-1])) - name = node.pop("name") - assign(config, ".".join(p[:-2]), {name: node}) - - paths = find_paths_in_substrees("function.name", config) - for p in paths: - parent = glom(config, ".".join(p[:-2])) - node = glom(config, ".".join(p[:-1])) - name = node.pop("name") - assert node - assign(config, ".".join(p[:-2]), {name: node}) - - -def _migrate(config: dict, n) -> dict: - - result = config.copy() - - _fix_input_0(result) - # _fix_loops(result, config) - # _fix_input_1(result, config) - # _fix_join(result, config) - # _fix_chevrons(result, config) - # _fix_other(result, config) - - for k, v in MIGRATE.items(): - _move(config, k, v, result) - - _fix_some(result) - _fix_sources(result, "join") - - for k in DELETE: - _delete(result, k) - - remove_empties(result) - - return result - - -def migrate(old: dict) -> dict: - - for i in range(10): - new = _migrate(old, i) - if new == old: - return new - old = new - - return new - - -def has_key(config, key: str) -> bool: - if isinstance(config, dict): - if key in config: - return True - for k, v in config.items(): - if has_key(v, key): - return True + return result + return {k: migrate_accumulations(v) for k, v in config.items()} if isinstance(config, list): - for item in config: - if has_key(item, key): - return True - return False - - -def has_value(config, 
value: str) -> bool: - if isinstance(config, dict): - for k, v in config.items(): - if v == value: - return True - if has_value(v, value): - return True - - if isinstance(config, list): - for item in config: - if item == value: - return True - if has_value(item, value): - return True - return config == value - - -def check(config): + return [migrate_accumulations(item) for item in config] + return config - try: - validate_config(config) - assert config.get("input", {}) - assert config.get("dates", {}) - assert not has_key(config, "label") - assert not has_key(config, "kwargs") - assert not has_value(config, "$previous_data") - assert not has_value(config, "$pl_data") - assert not has_value(config, "$dates") - assert not has_key(config, "inherit") - assert not has_key(config, "source_or_dataset") - assert not has_key(config, "<<") +def remove_useless_common_block(config): + """Remove 'common' keys from the config.""" + return {k: v for k, v in config.items() if k != "common"} - for n in SOURCES.keys(): - assert not has_key(config, n), f"Source {n} found in config. Please update to {SOURCES[n]}." 
- except Exception as e: - print("Validation failed:") - print(e) - print(yaml_dump(config)) - sys.exit(1) +def migrate(config: dict) -> dict: + config = migrate_accumulations(config) + config = remove_useless_common_block(config) + return config def migrate_recipe(args: Any, config) -> None: @@ -548,7 +52,6 @@ def migrate_recipe(args: Any, config) -> None: migrated = migrate(config) - check(migrated) if migrated == config: return None From 30b521efed97844b671114331bd75c7b46932f08 Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Wed, 25 Feb 2026 16:08:42 +0000 Subject: [PATCH 06/12] migrate script for accumulation --- .../datasets/commands/recipe/migrate.py | 106 +++++++++++++++++- 1 file changed, 104 insertions(+), 2 deletions(-) diff --git a/src/anemoi/datasets/commands/recipe/migrate.py b/src/anemoi/datasets/commands/recipe/migrate.py index 4f7449e46..d1903a9a7 100644 --- a/src/anemoi/datasets/commands/recipe/migrate.py +++ b/src/anemoi/datasets/commands/recipe/migrate.py @@ -8,22 +8,103 @@ # nor does it submit to any jurisdiction. +import datetime import logging from typing import Any LOG = logging.getLogger(__name__) +def _parse_mars_times(raw_times) -> list[int]: + """Convert a list of MARS time values to integer hours. + + Parameters + ---------- + raw_times : list + Time values as integers or ``HH:MM`` strings. + + Returns + ------- + list of int + Hour values as integers (e.g. ``0``, ``6``, ``12``, ``18``). 
+ + Examples + -------- + >>> _parse_mars_times([0]) + [0] + >>> _parse_mars_times([0, 12]) + [0, 12] + >>> _parse_mars_times(["00:00", "12:00"]) + [0, 12] + >>> _parse_mars_times(["00:00", "12:00", "18:00"]) + [0, 12, 18] + """ + if not isinstance(raw_times, (list, tuple)): + raw_times = [raw_times] + return [int(str(t).replace(":", "")) // 100 if ":" in str(t) else int(t) for t in raw_times] + + def migrate_accumulations(config): """Migrate source 'accumulations' to the new 'accumulate' structure recursively.""" if isinstance(config, dict): if "accumulations" in config: values = dict(config["accumulations"]) - period = values.pop("accumulation_period", 6) + accumulation_period = values.pop("accumulation_period", 6) + if "step" in values: + LOG.warning( + "Stripping 'step: %s' from accumulations source — " + "step is computed internally and any user-supplied value is ignored.", + values.pop("step"), + ) + if "accumulations_reset_frequency" in values: + LOG.warning( + "Stripping 'accumulations_reset_frequency: %s' from accumulations source — " + "this parameter has no equivalent in the new accumulate source.", + values.pop("accumulations_reset_frequency"), + ) + if isinstance(accumulation_period, int): + period = accumulation_period + class_ = values.get("class", "od") + stream = values.get("stream", "oper") + if (class_, stream) == ("od", "enfo"): + # 'auto' raises NotImplementedError for od/enfo in the new code. + # Use explicit availability: accumulated-from-start, all four base times. + LOG.warning( + "'availability: auto' is not yet supported for class=od stream=enfo. " + "Using explicit availability with base times [0, 6, 12, 18]." 
+ ) + availability = [[bt, [f"0-{period}"]] for bt in [0, 6, 12, 18]] + else: + availability = "auto" + elif isinstance(accumulation_period, (list, tuple)): + step1, step2 = accumulation_period + if not isinstance(step1, int) or not isinstance(step2, int): + raise ValueError(f"Invalid accumulation_period: {accumulation_period}") + period = step2 - step1 + steps = [f"0-{step1}", f"0-{step2}"] + if "time" in values: + raw_times = values.pop("time") + base_times = _parse_mars_times(raw_times) + LOG.warning( + "Stripping 'time' from accumulations mars request — " + "using time values %s as availability base times.", + base_times, + ) + else: + base_times = [0, 6, 12, 18] + availability = [[bt, steps] for bt in base_times] + else: + raise ValueError(f"Invalid accumulation_period: {accumulation_period}") + if values.get("type") == "an": + LOG.warning( + "Changing 'type: an' to 'type: fc' in accumulations mars source — " + "accumulated fields come from forecasts, not analyses." + ) + values["type"] = "fc" result = {k: migrate_accumulations(v) for k, v in config.items() if k != "accumulations"} result["accumulate"] = { "period": period, - "availability": "auto", + "availability": availability, "source": { "mars": values, }, @@ -35,12 +116,33 @@ def migrate_accumulations(config): return config +def fix_datetimes(config): + """Convert datetime objects to plain strings without T/Z notation. + + PyYAML parses timestamp strings into Python datetime objects at load time. + When dumped back they become ISO 8601 (``2024-12-31T18:00:00Z``). + This converts them back to simple strings (``2024-12-31 18:00:00``). 
+ """ + if isinstance(config, dict): + return {k: fix_datetimes(v) for k, v in config.items()} + if isinstance(config, list): + return [fix_datetimes(item) for item in config] + if isinstance(config, datetime.datetime): + if config.hour == 0 and config.minute == 0 and config.second == 0: + return config.strftime("%Y-%m-%d") + return config.strftime("%Y-%m-%d %H:%M:%S") + if isinstance(config, datetime.date): + return config.strftime("%Y-%m-%d") + return config + + def remove_useless_common_block(config): """Remove 'common' keys from the config.""" return {k: v for k, v in config.items() if k != "common"} def migrate(config: dict) -> dict: + config = fix_datetimes(config) config = migrate_accumulations(config) config = remove_useless_common_block(config) return config From b5f01d9df41f18e21a9ddfa0ce80e29c6a51acd2 Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Wed, 25 Feb 2026 21:10:07 +0000 Subject: [PATCH 07/12] fix --- src/anemoi/datasets/commands/recipe/migrate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/anemoi/datasets/commands/recipe/migrate.py b/src/anemoi/datasets/commands/recipe/migrate.py index d1903a9a7..031212928 100644 --- a/src/anemoi/datasets/commands/recipe/migrate.py +++ b/src/anemoi/datasets/commands/recipe/migrate.py @@ -81,7 +81,7 @@ def migrate_accumulations(config): if not isinstance(step1, int) or not isinstance(step2, int): raise ValueError(f"Invalid accumulation_period: {accumulation_period}") period = step2 - step1 - steps = [f"0-{step1}", f"0-{step2}"] + steps = [f"0-{step2}"] if step1 == 0 else [f"0-{step1}", f"0-{step2}"] if "time" in values: raw_times = values.pop("time") base_times = _parse_mars_times(raw_times) From 701e1d82d92b01b7698d94f3f980b62538ca4c73 Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Wed, 25 Feb 2026 21:18:27 +0000 Subject: [PATCH 08/12] doc --- docs/building/sources/accumulate.rst | 49 +++++++++++++++++++ ...cycle-mars-ecmwf-operational-forecast.yaml | 21 ++++++++ 2 files 
changed, 70 insertions(+) create mode 100644 docs/building/sources/yaml/accumulations-cycle-mars-ecmwf-operational-forecast.yaml diff --git a/docs/building/sources/accumulate.rst b/docs/building/sources/accumulate.rst index ccbb2a87a..d074f3e37 100644 --- a/docs/building/sources/accumulate.rst +++ b/docs/building/sources/accumulate.rst @@ -71,6 +71,7 @@ There are multiple ways to specify the ``availability`` parameter: - `Option 2: Availability over fixed periods`_ - `Option 3: Automatic detection for well-known datasets`_ - `Option 4: Finer control using explicit list of interval`_ +- `Option 5: Cycle availability`_ Option 1: Type-based availability @@ -152,6 +153,54 @@ For full control, provide an explicit list of ``(basetime, steps)`` pairs. These two examples are equivalent to those shown in Option 1 above. +Option 5: Cycle availability +----------------------------- + +Use ``availability: cycle`` when the mapping from valid date to base time follows a +**repeating periodic pattern** that cannot be described by a single base time. +This is typical for ECMWF operational forecasts where: + +- Forecasts run only at 00Z and 12Z (``oper``) or 06Z and 18Z (``scda``). +- Depending on the valid date, the same base time may need to supply the accumulation + from a single step (low lead time) or from the difference of two steps (higher lead + time). +- A longer cycle (e.g. 72h) may be used when you want to use a consistent forecast + lead time across multiple days. + +The ``cycle`` key takes a dictionary with the following structure: + +- **start** (optional): reference epoch used to locate the valid date within the cycle. + Defaults to ``1970-01-01 00:00:00``. For a 24-hour cycle the default is always correct. + For longer cycles, set ``start`` so that the first element of the cycle aligns with + the intended start of the repeating pattern. +- **"i_start-i_end"** keys: hours within the cycle (0 ≤ i_start < i_end ≤ cycle_length). 
+ The cycle length is inferred as the maximum ``i_end`` value found in the keys. + Each key covers one interval of the requested accumulation period. +- Values: a two-element list ``[base_time_hour, "0-stepA/0-stepB/..."]``: + + - ``base_time_hour`` — the hour-of-day of the forecast base time (0–23). + If it is greater than or equal to ``valid_date``, the previous day is used automatically. + - The slash-separated step strings ``"0-N"`` denote cumulative-from-start intervals + (``startStep=0``, ``endStep=N``). Two steps produce a difference; one step is used directly. + +The example below shows a 24-hour cycle for ECMWF operational 6h accumulations +(base times 00Z and 12Z): + +.. literalinclude:: yaml/accumulations-cycle-mars-ecmwf-operational-forecast.yaml + :language: yaml + +For each 6-hour valid-date slot: + +- **Valid 06Z** — step 0→6h from 00Z base covers [00Z, 06Z] directly. +- **Valid 12Z** — difference of steps 0→12h and 0→6h from 00Z base gives [06Z, 12Z]. +- **Valid 18Z** — step 0→6h from 12Z base covers [12Z, 18Z] directly. +- **Valid 00Z** — difference of steps 0→12h and 0→6h from 12Z base gives [18Z, 00Z]. + +Compared to `Option 4: Finer control using explicit list of interval`_, the cycle approach +pins **which** base time is used for each valid-date slot, rather than letting the interval +solver choose. This is useful when the choice must be deterministic (e.g., to reproduce a +specific dataset exactly) or when the solver could ambiguously pick a longer-lead run. 
+ Controlling the fields regrouped within accumulation ==================================================== diff --git a/docs/building/sources/yaml/accumulations-cycle-mars-ecmwf-operational-forecast.yaml b/docs/building/sources/yaml/accumulations-cycle-mars-ecmwf-operational-forecast.yaml new file mode 100644 index 000000000..193899879 --- /dev/null +++ b/docs/building/sources/yaml/accumulations-cycle-mars-ecmwf-operational-forecast.yaml @@ -0,0 +1,21 @@ +accumulate: + period: 6h + availability: + cycle: + # 24-hour repeating cycle. Each entry maps one 6h slot to the + # base time and cumulative steps that cover it. + # Keys are "start_hour-end_hour" within the cycle (hours since midnight). + # Values are [base_time_hour, "0-stepA/0-stepB/..."]. + '0-6': [0, "0-6"] # valid 06Z: base 00Z, step 0→6h + '6-12': [0, "0-6/0-12"] # valid 12Z: base 00Z, steps 0→6h and 0→12h (diff) + '12-18': [12, "0-6"] # valid 18Z: base 12Z, step 0→6h + '18-24': [12, "0-6/0-12"] # valid 00Z: base 12Z, steps 0→6h and 0→12h (diff) + source: + mars: + expver: "0001" + class: od + stream: oper + type: fc + grid: 20./20. + levtype: sfc + param: [tp, cp] From f007fd977eec054dc7b115b5e6e8e23fc756a6c9 Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Thu, 26 Feb 2026 08:10:58 +0000 Subject: [PATCH 09/12] doc --- docs/building/sources/accumulate.rst | 14 +++++++------- ...ions-cycle-mars-ecmwf-operational-forecast.yaml | 8 ++++---- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/docs/building/sources/accumulate.rst b/docs/building/sources/accumulate.rst index d074f3e37..d20ffb092 100644 --- a/docs/building/sources/accumulate.rst +++ b/docs/building/sources/accumulate.rst @@ -176,12 +176,12 @@ The ``cycle`` key takes a dictionary with the following structure: - **"i_start-i_end"** keys: hours within the cycle (0 ≤ i_start < i_end ≤ cycle_length). The cycle length is inferred as the maximum ``i_end`` value found in the keys. 
Each key covers one interval of the requested accumulation period. -- Values: a two-element list ``[base_time_hour, "0-stepA/0-stepB/..."]``: +- Values: a two-element list ``[base_time_hour, "stepA-stepB/stepC-stepD/..."]`` where: - ``base_time_hour`` — the hour-of-day of the forecast base time (0–23). If it is greater than or equal to ``valid_date``, the previous day is used automatically. - - The slash-separated step strings ``"0-N"`` denote cumulative-from-start intervals - (``startStep=0``, ``endStep=N``). Two steps produce a difference; one step is used directly. + - The slash-separated step strings ``"N-P"`` denote cumulative-from-start intervals + (``startStep=N``, ``endStep=P``). Two steps produce a difference; one step is used directly. The example below shows a 24-hour cycle for ECMWF operational 6h accumulations (base times 00Z and 12Z): @@ -191,10 +191,10 @@ The example below shows a 24-hour cycle for ECMWF operational 6h accumulations For each 6-hour valid-date slot: -- **Valid 06Z** — step 0→6h from 00Z base covers [00Z, 06Z] directly. -- **Valid 12Z** — difference of steps 0→12h and 0→6h from 00Z base gives [06Z, 12Z]. -- **Valid 18Z** — step 0→6h from 12Z base covers [12Z, 18Z] directly. -- **Valid 00Z** — difference of steps 0→12h and 0→6h from 12Z base gives [18Z, 00Z]. +- ** Data for 06Z** — step 0→6h from 00Z base covers [00Z, 06Z] directly. +- ** Data for 12Z** — difference of steps 0→12h and 0→6h from 00Z base gives [06Z, 12Z]. +- ** Data for 18Z** — step 0→6h from 12Z base covers [12Z, 18Z] directly. +- ** Data for 00Z** — difference of steps 0→12h and 0→6h from 12Z base gives [18Z, 00Z]. 
Compared to `Option 4: Finer control using explicit list of interval`_, the cycle approach pins **which** base time is used for each valid-date slot, rather than letting the interval diff --git a/docs/building/sources/yaml/accumulations-cycle-mars-ecmwf-operational-forecast.yaml b/docs/building/sources/yaml/accumulations-cycle-mars-ecmwf-operational-forecast.yaml index 193899879..25c9e930c 100644 --- a/docs/building/sources/yaml/accumulations-cycle-mars-ecmwf-operational-forecast.yaml +++ b/docs/building/sources/yaml/accumulations-cycle-mars-ecmwf-operational-forecast.yaml @@ -6,10 +6,10 @@ accumulate: # base time and cumulative steps that cover it. # Keys are "start_hour-end_hour" within the cycle (hours since midnight). # Values are [base_time_hour, "0-stepA/0-stepB/..."]. - '0-6': [0, "0-6"] # valid 06Z: base 00Z, step 0→6h - '6-12': [0, "0-6/0-12"] # valid 12Z: base 00Z, steps 0→6h and 0→12h (diff) - '12-18': [12, "0-6"] # valid 18Z: base 12Z, step 0→6h - '18-24': [12, "0-6/0-12"] # valid 00Z: base 12Z, steps 0→6h and 0→12h (diff) + '0-6': [0, "0-6"] # data in dataset at YYYY-MM-DD 06:00 -> use basetime 00:00, step 0-6h + '6-12': [0, "0-6/0-12"] # data in dataset at YYYY-MM-DD 12:00 -> use basetime 00:00, steps 0-6h and 0-12h (diff) + '12-18': [12, "0-6"] # data in dataset at YYYY-MM-DD 18:00 -> use basetime 12:00, step 0-6h + '18-24': [12, "0-6/0-12"] # data in dataset at YYYY-MM-DD 00:00 -> use basetime 12:00, steps 0-6h and 0-12h (diff) source: mars: expver: "0001" From ca233d61cc292cd87edd6436f501237f9d613772 Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Thu, 26 Feb 2026 17:04:00 +0000 Subject: [PATCH 10/12] remove unused --- docs/building/sources/accumulate.rst | 49 ------------------- ...cycle-mars-ecmwf-operational-forecast.yaml | 21 -------- 2 files changed, 70 deletions(-) delete mode 100644 docs/building/sources/yaml/accumulations-cycle-mars-ecmwf-operational-forecast.yaml diff --git a/docs/building/sources/accumulate.rst 
b/docs/building/sources/accumulate.rst index d20ffb092..ccbb2a87a 100644 --- a/docs/building/sources/accumulate.rst +++ b/docs/building/sources/accumulate.rst @@ -71,7 +71,6 @@ There are multiple ways to specify the ``availability`` parameter: - `Option 2: Availability over fixed periods`_ - `Option 3: Automatic detection for well-known datasets`_ - `Option 4: Finer control using explicit list of interval`_ -- `Option 5: Cycle availability`_ Option 1: Type-based availability @@ -153,54 +152,6 @@ For full control, provide an explicit list of ``(basetime, steps)`` pairs. These two examples are equivalent to those shown in Option 1 above. -Option 5: Cycle availability ------------------------------ - -Use ``availability: cycle`` when the mapping from valid date to base time follows a -**repeating periodic pattern** that cannot be described by a single base time. -This is typical for ECMWF operational forecasts where: - -- Forecasts run only at 00Z and 12Z (``oper``) or 06Z and 18Z (``scda``). -- Depending on the valid date, the same base time may need to supply the accumulation - from a single step (low lead time) or from the difference of two steps (higher lead - time). -- A longer cycle (e.g. 72h) may be used when you want to use a consistent forecast - lead time across multiple days. - -The ``cycle`` key takes a dictionary with the following structure: - -- **start** (optional): reference epoch used to locate the valid date within the cycle. - Defaults to ``1970-01-01 00:00:00``. For a 24-hour cycle the default is always correct. - For longer cycles, set ``start`` so that the first element of the cycle aligns with - the intended start of the repeating pattern. -- **"i_start-i_end"** keys: hours within the cycle (0 ≤ i_start < i_end ≤ cycle_length). - The cycle length is inferred as the maximum ``i_end`` value found in the keys. - Each key covers one interval of the requested accumulation period. 
-- Values: a two-element list ``[base_time_hour, "stepA-stepB/stepC-stepD/..."]`` where: - - - ``base_time_hour`` — the hour-of-day of the forecast base time (0–23). - If it is greater than or equal to ``valid_date``, the previous day is used automatically. - - The slash-separated step strings ``"N-P"`` denote cumulative-from-start intervals - (``startStep=N``, ``endStep=P``). Two steps produce a difference; one step is used directly. - -The example below shows a 24-hour cycle for ECMWF operational 6h accumulations -(base times 00Z and 12Z): - -.. literalinclude:: yaml/accumulations-cycle-mars-ecmwf-operational-forecast.yaml - :language: yaml - -For each 6-hour valid-date slot: - -- ** Data for 06Z** — step 0→6h from 00Z base covers [00Z, 06Z] directly. -- ** Data for 12Z** — difference of steps 0→12h and 0→6h from 00Z base gives [06Z, 12Z]. -- ** Data for 18Z** — step 0→6h from 12Z base covers [12Z, 18Z] directly. -- ** Data for 00Z** — difference of steps 0→12h and 0→6h from 12Z base gives [18Z, 00Z]. - -Compared to `Option 4: Finer control using explicit list of interval`_, the cycle approach -pins **which** base time is used for each valid-date slot, rather than letting the interval -solver choose. This is useful when the choice must be deterministic (e.g., to reproduce a -specific dataset exactly) or when the solver could ambiguously pick a longer-lead run. - Controlling the fields regrouped within accumulation ==================================================== diff --git a/docs/building/sources/yaml/accumulations-cycle-mars-ecmwf-operational-forecast.yaml b/docs/building/sources/yaml/accumulations-cycle-mars-ecmwf-operational-forecast.yaml deleted file mode 100644 index 25c9e930c..000000000 --- a/docs/building/sources/yaml/accumulations-cycle-mars-ecmwf-operational-forecast.yaml +++ /dev/null @@ -1,21 +0,0 @@ -accumulate: - period: 6h - availability: - cycle: - # 24-hour repeating cycle. 
Each entry maps one 6h slot to the - # base time and cumulative steps that cover it. - # Keys are "start_hour-end_hour" within the cycle (hours since midnight). - # Values are [base_time_hour, "0-stepA/0-stepB/..."]. - '0-6': [0, "0-6"] # data in dataset at YYYY-MM-DD 06:00 -> use basetime 00:00, step 0-6h - '6-12': [0, "0-6/0-12"] # data in dataset at YYYY-MM-DD 12:00 -> use basetime 00:00, steps 0-6h and 0-12h (diff) - '12-18': [12, "0-6"] # data in dataset at YYYY-MM-DD 18:00 -> use basetime 12:00, step 0-6h - '18-24': [12, "0-6/0-12"] # data in dataset at YYYY-MM-DD 00:00 -> use basetime 12:00, steps 0-6h and 0-12h (diff) - source: - mars: - expver: "0001" - class: od - stream: oper - type: fc - grid: 20./20. - levtype: sfc - param: [tp, cp] From f6336ee3a8167bd19753a624f51a611792036662 Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Mon, 2 Mar 2026 15:44:13 +0000 Subject: [PATCH 11/12] fix corner case for forecast data using date: ????-??-01, and providing step/time in the request --- src/anemoi/datasets/create/sources/mars.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/src/anemoi/datasets/create/sources/mars.py b/src/anemoi/datasets/create/sources/mars.py index b2b12f66b..869ea692c 100644 --- a/src/anemoi/datasets/create/sources/mars.py +++ b/src/anemoi/datasets/create/sources/mars.py @@ -166,7 +166,7 @@ def _expand_mars_request( user_time = None user_date = None if not request_already_using_valid_datetime: - user_time = request.get("user_time") + user_time = request.get("user_time", request.get("time")) if user_time is not None: user_time = to_list(user_time) user_time = [_normalise_time(t) for t in user_time] @@ -177,7 +177,7 @@ def _expand_mars_request( user_date = str(user_date) elif isinstance(user_date, datetime.datetime): user_date = user_date.strftime("%Y%m%d") - else: + elif not isinstance(user_date, str): raise ValueError(f"Invalid type for {user_date}") user_date = 
re.compile("^{}$".format(user_date.replace("-", "").replace("?", "."))) @@ -205,6 +205,8 @@ def _expand_mars_request( r[pproc] = "/".join(str(x) for x in r[pproc]) if user_date is not None: + # If date is provided by the user, we only keep the requests that match the date + # user_date is a regex pattern, so we use it to match the date in the request if not user_date.match(r["date"]): continue @@ -219,6 +221,9 @@ def _expand_mars_request( if int(r.get("time", 0)) in (600, 1800): r["stream"] = "scda" + # Strip internal filter keys that must not be forwarded to MARS. + r.pop("user_date", None) + r.pop("user_time", None) requests.append(r) return requests @@ -257,6 +262,16 @@ def factorise_requests( updates = [] for d in sorted(dates): for req in requests: + # A wildcard date pattern (e.g. "????-??-01") is a user-defined filter, + # not a real date. Rename to user_date so _expand_mars_request treats it + # as a filter and the pre-filter below does not do a broken literal comparison. + # Also rename time -> user_time in this context, since the actual base time + # is computed from the date-step arithmetic and time was used as a filter. + if isinstance(req.get("date"), str) and "?" 
in req["date"]: + req = dict(req) + req["user_date"] = req.pop("date") + if "time" in req and "user_time" not in req: + req["user_time"] = req.pop("time") if not no_date_here and ( ("date" in req) and ("time" in req) From 94e5768e56bba5ab2c3bc702e24a7f3a608914a3 Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Tue, 3 Mar 2026 14:09:09 +0000 Subject: [PATCH 12/12] set_start_step_to_zero --- .../sources/accumulate_utils/field_to_interval.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/anemoi/datasets/create/sources/accumulate_utils/field_to_interval.py b/src/anemoi/datasets/create/sources/accumulate_utils/field_to_interval.py index 53127011f..9eb6f4305 100644 --- a/src/anemoi/datasets/create/sources/accumulate_utils/field_to_interval.py +++ b/src/anemoi/datasets/create/sources/accumulate_utils/field_to_interval.py @@ -43,7 +43,15 @@ def _set_start_step_from_end_step_ceiled_to_24_hours(startStep, endStep, field=N return endStep - (endStep % 24), endStep -patch_registry = {"reset_24h_accumulations": _set_start_step_from_end_step_ceiled_to_24_hours} +def _set_start_step_to_zero(startStep, endStep, field=None): + # The data encode startStep wrongly but endStep is correct, so reset startStep to 0 + return 0, endStep + + +patch_registry = { + "reset_24h_accumulations": _set_start_step_from_end_step_ceiled_to_24_hours, + "set_start_step_to_zero": _set_start_step_to_zero, +} class FieldToInterval: