Skip to content

Improve parent experiment support #214

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 49 additions & 9 deletions packages/ref-core/src/cmip_ref_core/constraints.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,8 @@ def apply(
values = tuple(group[facet].unique())
supplementary_facets[facet] += values

supplementary_group = data_catalog
for facet, values in supplementary_facets.items():
mask = supplementary_group[facet].isin(values)
supplementary_group = supplementary_group[mask]
mask = data_catalog[list(supplementary_facets)].isin(supplementary_facets).all(axis="columns")
supplementary_group = data_catalog[mask]

if not supplementary_group.empty and self.optional_matching_facets:
facets = list(self.matching_facets + self.optional_matching_facets)
Expand Down Expand Up @@ -345,15 +343,57 @@ def validate(self, group: pd.DataFrame) -> bool:


@frozen
class SelectParentExperiment:
class AddParentDataset:
"""
Include a dataset's parent experiment in the selection
Include a dataset's parent in the selection.
"""

def apply(self, group: pd.DataFrame, data_catalog: pd.DataFrame) -> pd.DataFrame:
"""
Include a dataset's parent experiment in the selection
Include a dataset's parent in the selection.

Not yet implemented
"""
raise NotImplementedError("This is not implemented yet") # pragma: no cover
# branch_time_in_child
# time units
# calendar
# +
# branch_time_in_parent
# parent_time_units
# and parent calendar
#
# needed to compute parent timerange offset:
# cftime.num2date(branch_time_in_parent, parent_time_units, parent_calendar)

parent_facet_options = [
{
"source_id": "parent_source_id",
"experiment_id": "parent_experiment_id",
"variant_label": "parent_variant_label",
"table_id": "table_id",
"variable_id": "variable_id",
"grid_label": "grid_label",
},
# TODO: update for CMIP7
]
for parent_facet_map in parent_facet_options:
# We do not have access to the SourceDatasetType so we need to
# figure out which parent_facets to use.
all_parent_facets = list(parent_facet_map) + list(parent_facet_map.values())
if set(all_parent_facets).issubset(data_catalog.keys()):
break
else:
# No matching parent_facets
return group

# TODO: select files based on start_time, end_time

datasets = group[all_parent_facets].drop_duplicates().dropna(axis="columns")
parent_datasets = []
for dataset in datasets.to_dict(orient="records"):
parent_facets = {k: (dataset[v],) for k, v in parent_facet_map.items()}
select = data_catalog[list(parent_facets)].isin(parent_facets).all(axis="columns")
parent_dataset = data_catalog[select]
parent_dataset = parent_dataset[parent_dataset["version"] == parent_dataset["version"].max()]
parent_datasets.append(parent_dataset)

return pd.concat([group, *parent_datasets]).drop_duplicates()
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@
import xarray

from cmip_ref_core.constraints import (
AddParentDataset,
AddSupplementaryDataset,
RequireContiguousTimerange,
RequireFacets,
RequireOverlappingTimerange,
)
from cmip_ref_core.datasets import FacetFilter, SourceDatasetType
from cmip_ref_core.metrics import DataRequirement
Expand All @@ -25,27 +24,23 @@ class TransientClimateResponse(ESMValToolMetric):
slug = "esmvaltool-transient-climate-response"
base_recipe = "recipe_tcr.yml"

experiments = (
"1pctCO2",
"piControl",
)
data_requirements = (
DataRequirement(
source_type=SourceDatasetType.CMIP6,
filters=(
FacetFilter(
facets={
"variable_id": ("tas",),
"experiment_id": experiments,
"variable_id": "tas",
"experiment_id": "1pctCO2",
},
),
),
group_by=("source_id", "member_id", "grid_label"),
constraints=(
RequireFacets("experiment_id", experiments),
RequireContiguousTimerange(group_by=("instance_id",)),
RequireOverlappingTimerange(group_by=("instance_id",)),
AddParentDataset(),
AddSupplementaryDataset.from_defaults("areacella", SourceDatasetType.CMIP6),
RequireContiguousTimerange(group_by=("instance_id",)),
# RequireParentDataset(), # TODO: implement
),
),
)
Expand Down Expand Up @@ -78,6 +73,7 @@ def update_recipe(recipe: Recipe, input_files: pandas.DataFrame) -> None:
recipe_variables = {k: v for k, v in recipe_variables.items() if k != "areacella"}

# Select a timerange covered by all datasets.
# TODO: replace this by a function that takes into account the offset of the parent experiment
start_times, end_times = [], []
for variable in recipe_variables.values():
for dataset in variable["additional_datasets"]:
Expand Down
90 changes: 77 additions & 13 deletions packages/ref/src/cmip_ref/datasets/cmip6.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
from __future__ import annotations

import traceback
import warnings
from datetime import datetime
from pathlib import Path
from typing import Any

import ecgtools.parsers
import ecgtools.builder # type: ignore
import ecgtools.parsers.utilities # type: ignore
import pandas as pd
from ecgtools import Builder
import xarray as xr
from loguru import logger
from sqlalchemy.orm import joinedload

Expand Down Expand Up @@ -82,34 +84,35 @@ class CMIP6DatasetAdapter(DatasetAdapter):
"branch_method",
"branch_time_in_child",
"branch_time_in_parent",
"calendar",
"experiment",
"experiment_id",
"frequency",
"grid",
"grid_label",
"init_year",
"institution_id",
"long_name",
"member_id",
"nominal_resolution",
"parent_activity_id",
"parent_experiment_id",
"parent_source_id",
"parent_time_units",
"parent_variant_label",
"product",
"realm",
"source_id",
"source_type",
"sub_experiment",
"sub_experiment_id",
"standard_name",
"table_id",
"time_units",
"units",
"variable_id",
"variant_label",
"member_id",
"standard_name",
"long_name",
"units",
"vertical_levels",
"init_year",
"version",
"vertical_levels",
slug_column,
)

Expand Down Expand Up @@ -168,9 +171,7 @@ def find_local_datasets(self, file_or_directory: Path) -> pd.DataFrame:
"""
with warnings.catch_warnings():
# Ignore the DeprecationWarning from xarray
warnings.simplefilter("ignore", DeprecationWarning)

builder = Builder(
builder = ecgtools.builder.Builder(
paths=[str(file_or_directory)],
depth=10,
include_patterns=["*.nc"],
Expand Down Expand Up @@ -203,7 +204,8 @@ def find_local_datasets(self, file_or_directory: Path) -> pd.DataFrame:
# TODO: Replace with a standalone package that contains metadata fixes for CMIP6 datasets
datasets = _apply_fixes(datasets)

return datasets
# Ignore the error resulting the missing type hints in ecgtools.
return datasets # type: ignore[no-any-return]

def register_dataset(
self, config: Config, db: Database, data_catalog_dataset: pd.DataFrame
Expand Down Expand Up @@ -308,3 +310,65 @@ def load_catalog(
],
index=[file.id for file in result_datasets],
)


def parse_cmip6(file: Path) -> dict[str, str]:
    """
    Parse metadata for a single CMIP6 netCDF file.

    Extracts dataset- and file-specific metadata (global attributes, variable
    attributes, calendar/time information, vertical levels, and the time range)
    for use in an ecgtools catalog build.

    Parameters
    ----------
    file
        Path to a CMIP6 ``.nc`` file.

    Returns
    -------
    dict[str, str]
        A mapping of catalog column names to values on success. On any failure
        the returned dict instead contains the ecgtools sentinel keys
        ``INVALID_ASSET`` (the file path) and ``TRACEBACK`` (the formatted
        exception), which is how ecgtools' Builder marks unparsable assets.
    """
    # Adapted from https://github.com/ncar-xdev/ecgtools/blob/8945f332e0b2dd5cc74e77ae7acbd2b818ec401e/ecgtools/parsers/cmip.py#L12C1-L89C72
    # See also https://wcrp-cmip.github.io/CMIP6_CVs/
    try:
        info = {
            "path": str(file),
            # Dataset version is encoded in the directory name (e.g. "v20190308");
            # fall back to "v0" when no version component is present in the path.
            "version": ecgtools.parsers.utilities.extract_attr_with_regex(
                str(file), regex=r"v\d{4}\d{2}\d{2}|v\d{1}"
            )
            or "v0",
        }
        # First pass: decode_times=False so raw time metadata (units/calendar)
        # can be read without triggering time decoding errors.
        with xr.open_dataset(file, chunks={}, decode_times=False) as ds:
            # Copy every catalog column from the file's global attributes;
            # missing attributes become None.
            for key in (
                CMIP6DatasetAdapter.dataset_specific_metadata + CMIP6DatasetAdapter.file_specific_metadata
            ):
                info[key] = ds.attrs.get(key)

            # Variable-level attributes come from the data variable itself,
            # not the global attributes.
            variable_id = info["variable_id"]
            if variable_id:
                attrs = ds[variable_id].attrs
                for attr in ["standard_name", "long_name", "units"]:
                    info[attr] = attrs.get(attr)

            # Calendar and time units are read via the cf-xarray accessor
            # (`ds.cf["T"]`); best-effort — files without a recognizable time
            # coordinate are simply left without these columns.
            try:
                info["calendar"] = ds.cf["T"].attrs["calendar"]
                info["time_units"] = ds.cf["T"].attrs["units"]
            except (KeyError, AttributeError, ValueError):
                pass

            # Set the default of # of vertical levels to 1
            vertical_levels = 1
            try:
                vertical_levels = ds[ds.cf["vertical"].name].size
            except (KeyError, AttributeError, ValueError):
                pass
            info["vertical_levels"] = vertical_levels

            # member_id is variant_label, optionally prefixed with the
            # sub-experiment (e.g. "s1960-r1i1p1f1" for hindcast runs).
            if info["sub_experiment_id"] is None:
                info["member_id"] = info["variant_label"]
            else:
                info["member_id"] = "{sub_experiment_id}-{variant_label}".format(**info)
                # A 4-digit year embedded in the sub-experiment id (DCPP-style
                # "sYYYY") becomes the initialisation year.
                init_year = ecgtools.parsers.utilities.extract_attr_with_regex(
                    info["sub_experiment_id"], r"\d{4}"
                )
                if init_year:
                    info["init_year"] = int(init_year)

        # Second pass: re-open with cftime decoding enabled to extract the
        # first/last time values as strings. NOTE(review): this re-reads the
        # file; presumably acceptable because only coordinates are touched.
        time_coder = xr.coders.CFDatetimeCoder(use_cftime=True)
        with xr.open_dataset(file, chunks={}, decode_times=time_coder) as ds:
            try:
                info["start_time"] = str(ds.cf["T"][0].data)
                info["end_time"] = str(ds.cf["T"][-1].data)
            except (KeyError, AttributeError, ValueError):
                pass

        return info

    except Exception:
        # Broad catch is deliberate: ecgtools expects parsers to flag bad
        # assets via these sentinel keys rather than raise.
        return {ecgtools.builder.INVALID_ASSET: str(file), ecgtools.builder.TRACEBACK: traceback.format_exc()}
Loading