From 73e1b114ebfb4a5c66d411812dfab379caf655e8 Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Mon, 20 Jan 2025 13:25:17 +0100 Subject: [PATCH 1/9] feat(catalog): start of catalog work --- .../esm_runscripts/esm_plugins.yaml | 4 + .../esm_runscripts/esm_runscripts.yaml | 2 + setup.py | 1 + src/esm_runscripts/catalog.py | 163 ++++++++++++++++++ 4 files changed, 170 insertions(+) create mode 100644 src/esm_runscripts/catalog.py diff --git a/configs/esm_software/esm_runscripts/esm_plugins.yaml b/configs/esm_software/esm_runscripts/esm_plugins.yaml index 47158d88e..346382069 100644 --- a/configs/esm_software/esm_runscripts/esm_plugins.yaml +++ b/configs/esm_software/esm_runscripts/esm_plugins.yaml @@ -111,3 +111,7 @@ core: workflow: - "assemble_workflow" + catalog: + - "create_intake_esm_catalog" + - "write_intake_esm_catalog" + diff --git a/configs/esm_software/esm_runscripts/esm_runscripts.yaml b/configs/esm_software/esm_runscripts/esm_runscripts.yaml index 8e425a98e..b5610dc35 100644 --- a/configs/esm_software/esm_runscripts/esm_runscripts.yaml +++ b/configs/esm_software/esm_runscripts/esm_runscripts.yaml @@ -72,6 +72,8 @@ choose_job_type: - "copy_stuff_back_from_work" - "copy_all_results_to_exp" - "clean_run_dir" + - "create_intake_esm_catalog" + - "write_intake_esm_catalog" prepcompute: recipe: diff --git a/setup.py b/setup.py index 74323b429..16ea5520e 100644 --- a/setup.py +++ b/setup.py @@ -17,6 +17,7 @@ "PyGithub==1.55", "colorama==0.4.5", "coloredlogs==15.0.1", # NOTE(PG): Should be removed during cleanup for loguru instead + "dpath", "emoji==1.7.0", "f90nml==1.4.2", "gfw-creator==0.2.2", diff --git a/src/esm_runscripts/catalog.py b/src/esm_runscripts/catalog.py new file mode 100644 index 000000000..b3347fd74 --- /dev/null +++ b/src/esm_runscripts/catalog.py @@ -0,0 +1,163 @@ +import dpath +import xarray as xr +from loguru import logger + + +def create_intake_esm_catalog(config): + """ + Create an intake-esm catalog based on the configuration. + + This creates an intake-esm compatible catalog based on the simulation + configuration. The catalog is stored under ``config["intake"]``. Write + to disk as a separate step, this is not handled here! + + Creation of the catalog can be controlled via the configuration key:: + + config["general"]["create_catalog"] = True + + Default is ``True``. + + + Parameters + ---------- + config : dict + The simulation configuration + + Notes + ----- + The catalog attributes for each entry define what can be searched for and filtered. The exact + specifications of what should be included seems to be ambiguous and could be defined however + we want. Here, we use NextGEMS as a basis, and remove some of the fields which are not obvious + as a start. To see what is used in the DKRZ NextGems catalogue:: + + $ jq .attributes[].column_name /pool/data/Catalogs/dkrz_nextgems_disk.json + + See Also + -------- + * https://intake.readthedocs.io/en/latest/index.html + * https://github.com/NCAR/esm-collection-spec/blob/master/collection-spec/collection-spec.md + * https://tutorials.dkrz.de/tutorial_intake-5-create-esm-collection.html + """ + if not config["general"].get("create_catalog", True): + return config + catalog = config["intake"] = {} + catalog["esmcat_version"] = "0.1.0" + attributes = catalog["attributes"] = [] + catalog_attrs = [ + "variable_id", + "project", + "institution_id", + "source_id", + "experiment_id", + "realm", + "time_min", + "time_max", + ] + for attr in catalog_attrs: + attributes.append(dict(column_name=attr, vocabulary="")) + assets = catalog["assets"] = dict(column_name="uri", format_column_name="format") + aggregation_control = catalog["aggregation_control"] = { + "variable_column_name": "variable_id", + "groupby_attrs": [ + "project", + "institution_id", + "source_id", + "experiment_id", + "realm", + ], + "aggregations": [ + {"type": "union", "attribute_name": "variable_id", "options": {}}, + { + "type": "join_existing", + "attribute_name": "time_min", + "options": {"dim": "time", "coords": "minimal", "compat": "override"}, + }, + ], + } + catalog["id"] = "testcat" + catalog["description"] = f"Test Catalog for Experiment {config['general']['expid']}" + catalog["title"] = None + catalog["last_updated"] = str(datetime.datetime.now()) + catalog_dict = catalog["catalog_dict"] = [] + # Each entry in catalog_dict should correspond to the schema provided + # in catalog_attrs plus assets + for model in config["general"]["valid_model_names"]: + print(f"Cataloguing output of model {model}") + mconfig = config[model] + # FIXME(PG): This is not how we should determine which files are in the experiment outdata + # since this will list **all** files, not just the ones added during this run. + for output_file in pathlib.Path(mconfig["experiment_outdata_dir"]).iterdir(): + if mconfig["model"] in ["echam", "jsbach"]: + if "codes" in output_file.suffix or "idx" in output_file.suffix: + logger.debug( + "Skipping codes file or already processed grib outputfile" + ) + continue + # print(f"Cataloguing {output_file}...") + xarray_engine = "netcdf4" if "nc" in output_file.suffix else "cfgrib" + # NOTE(PG): Determine which variables are contained in the file, this could be better... + try: + var_list = list( + xr.open_dataset(output_file, engine=xarray_engine).variables.keys() + ) + except Exception as e: + logger.warning(f"Could not determine variables in {output_file}: {e}") + logger.warning("This file is not added to the catalog!") + continue + this_asset = dict( + project=config["general"].get("project", "esm_tools"), + institution_id="AWI", + source_id=f"{config['general']['model']}-{config['general']['version']}", + experiment_id=config["general"]["expid"], + realm=mconfig.get("type", "UNKNOWN"), + time_min=config["general"]["start_date"], + time_max=config["general"]["end_date"], + uri=f"file://{output_file}", + _data_format_=xarray_engine, # NOTE(PG): I don't like needing this... + variable_id=var_list, + ) + catalog_dict.append(this_asset) + + return config + + +def write_intake_esm_catalog(config): + """ + Save the intake catalog to disk. + + This function saves the intake catalog specified in the configuration to a YAML file + on disk. If a previous catalog exists, it merges the new catalog with the existing one. + + Saving of the catalog can be controlled via the configuration key:: + + config["general"]["write_catalog"] = True + + Default is ``True``. + + Parameters + ---------- + config : dict + Configuration dictionary containing the catalog and general settings. + Expected keys are "general" and "intake". + + Returns + ------- + dict + The updated configuration dictionary with the merged intake catalog. + """ + if not config["general"].get("write_catalog", True): + return config + save_cat_to_disk = config["general"].get("save_intake_esm_catalog_to_disk", True) + cat_fname = pathlib.Path( + f'{config["general"]["experiment_dir"]}/{config["general"]["expid"]}_intake_catalog.yaml' + ) + if cat_fname.exists(): + prev_cat = yaml.safe_load(cat_fname.read_text()) + else: + prev_cat = {} + # Full catalog: + dpath.merge(prev_cat, config.get("intake", {})) + config["intake"] = prev_cat + with open(cat_fname, "w") as f: + yaml.dump(config.get("intake", {}), f) + return config From ab37b8fff8af1d157345f8a1b5160249d16d7eb4 Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Mon, 20 Jan 2025 13:58:47 +0100 Subject: [PATCH 2/9] fix(catalog): forgot to add new module in __init__ --- src/esm_runscripts/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/esm_runscripts/__init__.py b/src/esm_runscripts/__init__.py index 0ce5ebf4e..9a6b02c2c 100644 --- a/src/esm_runscripts/__init__.py +++ b/src/esm_runscripts/__init__.py @@ -6,6 +6,7 @@ from .batch_system import * from .chunky_parts import * +from .catalog import * from .database import * from .database_actions import * from .dataprocess import * From 2c624420ea6f749780cb5453d3fd07a7a064ab75 Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Mon, 20 Jan 2025 14:09:06 +0100 Subject: [PATCH 3/9] fix(catalog): forgot some imports --- src/esm_runscripts/catalog.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/esm_runscripts/catalog.py b/src/esm_runscripts/catalog.py index b3347fd74..c7b7b261e 100644 --- a/src/esm_runscripts/catalog.py +++ b/src/esm_runscripts/catalog.py @@ -1,5 +1,9 @@ +import datetime +import pathlib + import dpath import xarray as xr +import yaml from loguru import logger From 098fb6bd147ef744bd4ac62b6567bca4367bec22 Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Mon, 20 Jan 2025 14:37:22 +0100 Subject: [PATCH 4/9] wip(catalog): convert all config elements to strings explicitly, better hashing for ID --- src/esm_runscripts/catalog.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/esm_runscripts/catalog.py b/src/esm_runscripts/catalog.py index c7b7b261e..45cd9ec73 100644 --- a/src/esm_runscripts/catalog.py +++ b/src/esm_runscripts/catalog.py @@ -1,4 +1,6 @@ import datetime +import getpass +import hashlib import pathlib import dpath @@ -78,8 +80,12 @@ def create_intake_esm_catalog(config): }, ], } - catalog["id"] = "testcat" - catalog["description"] = f"Test Catalog for Experiment {config['general']['expid']}" + catalog["id"] = hashlib.sha256( + f"{config['general']['expid']}_{datetime.datetime.now()}_{getpass.getuser()}".encode() + ).hexdigest() + catalog[ + "description" + ] = f"Intake-ESM Catalog for Experiment {config['general']['expid']}" catalog["title"] = None catalog["last_updated"] = str(datetime.datetime.now()) catalog_dict = catalog["catalog_dict"] = [] @@ -112,12 +118,13 @@ def create_intake_esm_catalog(config): project=config["general"].get("project", "esm_tools"), institution_id="AWI", source_id=f"{config['general']['model']}-{config['general']['version']}", - experiment_id=config["general"]["expid"], - realm=mconfig.get("type", "UNKNOWN"), - time_min=config["general"]["start_date"], - time_max=config["general"]["end_date"], + experiment_id=str(config["general"]["expid"]), + realm=str(mconfig.get("type", "UNKNOWN")), + time_min=str(config["general"]["start_date"]), + time_max=str(config["general"]["end_date"]), uri=f"file://{output_file}", - _data_format_=xarray_engine, # NOTE(PG): I don't like needing this... + # _data_format_=xarray_engine, # NOTE(PG): I don't like needing this... + format=xarray_engine, variable_id=var_list, ) catalog_dict.append(this_asset) From 69c40f9a34226e8ab6aa302a67ddf1debbd1d512 Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Mon, 20 Jan 2025 14:57:37 +0100 Subject: [PATCH 5/9] wip(catalog): better use of title and description keys --- src/esm_runscripts/catalog.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/esm_runscripts/catalog.py b/src/esm_runscripts/catalog.py index 45cd9ec73..b78749c79 100644 --- a/src/esm_runscripts/catalog.py +++ b/src/esm_runscripts/catalog.py @@ -83,10 +83,8 @@ def create_intake_esm_catalog(config): catalog["id"] = hashlib.sha256( f"{config['general']['expid']}_{datetime.datetime.now()}_{getpass.getuser()}".encode() ).hexdigest() - catalog[ - "description" - ] = f"Intake-ESM Catalog for Experiment {config['general']['expid']}" - catalog["title"] = None + catalog["description"] = config["general"].get("description") + catalog["title"] = f"Intake-ESM Catalog for Experiment {config['general']['expid']}" catalog["last_updated"] = str(datetime.datetime.now()) catalog_dict = catalog["catalog_dict"] = [] # Each entry in catalog_dict should correspond to the schema provided From 2fd4955d4aa7444df7e0671c970b8b9d67ab2419 Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Mon, 20 Jan 2025 17:41:00 +0100 Subject: [PATCH 6/9] wip(catalog): better saving by using serialize directly from catalog instance --- setup.py | 2 ++ src/esm_runscripts/catalog.py | 50 ++++++++++++++++++++++++++--------- 2 files changed, 40 insertions(+), 12 deletions(-) diff --git a/setup.py b/setup.py index 16ea5520e..92c9ba04c 100644 --- a/setup.py +++ b/setup.py @@ -22,6 +22,8 @@ "f90nml==1.4.2", "gfw-creator==0.2.2", "gitpython==3.1.41", # Maximum version for Python 3.6 support + "intake", + "intake-esm", "jinja2==3.1.4", "loguru==0.6.0", "numpy>=1.19.5", # Maximum version for Python 3.6 support diff --git a/src/esm_runscripts/catalog.py b/src/esm_runscripts/catalog.py index b78749c79..a5094033a 100644 --- a/src/esm_runscripts/catalog.py +++ b/src/esm_runscripts/catalog.py @@ -1,11 +1,14 @@ import datetime import getpass import hashlib +import json import pathlib import dpath +import intake +import intake_esm # noqa: F401, import only needed to register intake-esm driver +import pandas as pd import xarray as xr -import yaml from loguru import logger @@ -83,7 +86,7 @@ def create_intake_esm_catalog(config): catalog["id"] = hashlib.sha256( f"{config['general']['expid']}_{datetime.datetime.now()}_{getpass.getuser()}".encode() ).hexdigest() - catalog["description"] = config["general"].get("description") + catalog["description"] = str(config["general"].get("description")) catalog["title"] = f"Intake-ESM Catalog for Experiment {config['general']['expid']}" catalog["last_updated"] = str(datetime.datetime.now()) catalog_dict = catalog["catalog_dict"] = [] @@ -127,6 +130,11 @@ def create_intake_esm_catalog(config): ) catalog_dict.append(this_asset) + catalog_dict = catalog["catalog_dict"] + catalog_df = pd.DataFrame(catalog_dict) + # Try to construct the esm_datastore object: + validated_cat = intake.open_esm_datastore(obj=dict(esmcat=catalog, df=catalog_df)) + config["intake"] = validated_cat return config @@ -156,17 +164,35 @@ def write_intake_esm_catalog(config): """ if not config["general"].get("write_catalog", True): return config - save_cat_to_disk = config["general"].get("save_intake_esm_catalog_to_disk", True) - cat_fname = pathlib.Path( - f'{config["general"]["experiment_dir"]}/{config["general"]["expid"]}_intake_catalog.yaml' + + cat_file = pathlib.Path( + f'{config["general"]["experiment_dir"]}/{config["general"]["expid"]}_intake_catalog.json' ) - if cat_fname.exists(): - prev_cat = yaml.safe_load(cat_fname.read_text()) + catalog = config["intake"] + + if cat_file.exists(): + with open(cat_file, "r") as f: + prev_cat = json.load(f) else: prev_cat = {} - # Full catalog: - dpath.merge(prev_cat, config.get("intake", {})) - config["intake"] = prev_cat - with open(cat_fname, "w") as f: - yaml.dump(config.get("intake", {}), f) + + catalog_name = cat_file.stem + catalog.serialize(catalog_name, directory=cat_file.parent) + # catalog.serialize("paul", directory=cat_file.parent) + + backup_file = cat_file.with_suffix(".json_backup") + if cat_file.exists(): + with open(cat_file, "r") as f: + with open(backup_file, "w") as backup: + backup.write(f.read()) + + # Merge the new catalog into the previous one + with open(cat_file, "r") as f: + new_cat = json.load(f) + dpath.merge(prev_cat, new_cat) + + # Save the merged catalog back to disk + with open(cat_file, "w") as f: + json.dump(prev_cat, f, indent=4) + return config From 44bac3fdbf781c626a69b09dc36443da2d83fb28 Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Tue, 21 Jan 2025 09:07:17 +0100 Subject: [PATCH 7/9] wip(catalog): adds cfgrib dependency for indexing grib files correctly --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index 92c9ba04c..079008f9d 100644 --- a/setup.py +++ b/setup.py @@ -14,6 +14,7 @@ requirements = [ "Click>=8.0.4", # Maximum version for Python 3.6 support + "cfgrib", "PyGithub==1.55", "colorama==0.4.5", "coloredlogs==15.0.1", # NOTE(PG): Should be removed during cleanup for loguru instead From dd2bf6db7376df48c25e14b18209afd9b5cc2891 Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Tue, 21 Jan 2025 11:50:38 +0100 Subject: [PATCH 8/9] wip(catalog): finishing touches for first part --- src/esm_runscripts/catalog.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/esm_runscripts/catalog.py b/src/esm_runscripts/catalog.py index a5094033a..cbb24a5c3 100644 --- a/src/esm_runscripts/catalog.py +++ b/src/esm_runscripts/catalog.py @@ -47,9 +47,9 @@ def create_intake_esm_catalog(config): * https://github.com/NCAR/esm-collection-spec/blob/master/collection-spec/collection-spec.md * https://tutorials.dkrz.de/tutorial_intake-5-create-esm-collection.html """ - if not config["general"].get("create_catalog", True): + if not config.get("intake", {}).get("create_catalog", True): return config - catalog = config["intake"] = {} + catalog = config.get("intake", {}).get("catalog", {}) catalog["esmcat_version"] = "0.1.0" attributes = catalog["attributes"] = [] catalog_attrs = [ @@ -93,20 +93,23 @@ def create_intake_esm_catalog(config): # Each entry in catalog_dict should correspond to the schema provided # in catalog_attrs plus assets for model in config["general"]["valid_model_names"]: - print(f"Cataloguing output of model {model}") + logger.info(f"Cataloguing output of model {model}") mconfig = config[model] # FIXME(PG): This is not how we should determine which files are in the experiment outdata # since this will list **all** files, not just the ones added during this run. for output_file in pathlib.Path(mconfig["experiment_outdata_dir"]).iterdir(): + # TODO(PG): @JanStreffing, how does OIFS output look like? GRIB, NetCDF? + # Known GRIB output models: if mconfig["model"] in ["echam", "jsbach"]: if "codes" in output_file.suffix or "idx" in output_file.suffix: logger.debug( "Skipping codes file or already processed grib outputfile" ) continue - # print(f"Cataloguing {output_file}...") + # TODO(PG): Add zarr support later on xarray_engine = "netcdf4" if "nc" in output_file.suffix else "cfgrib" - # NOTE(PG): Determine which variables are contained in the file, this could be better... + # NOTE(PG): Determine which variables are contained in the file, I don't know + # but this could be better... try: var_list = list( xr.open_dataset(output_file, engine=xarray_engine).variables.keys() @@ -134,7 +137,8 @@ def create_intake_esm_catalog(config): catalog_df = pd.DataFrame(catalog_dict) # Try to construct the esm_datastore object: validated_cat = intake.open_esm_datastore(obj=dict(esmcat=catalog, df=catalog_df)) - config["intake"] = validated_cat + config["intake"] = config.get("intake", {}) + config["intake"]["catalog"] = validated_cat return config @@ -147,7 +151,7 @@ def write_intake_esm_catalog(config): Saving of the catalog can be controlled via the configuration key:: - config["general"]["write_catalog"] = True + config["intake"]["write_catalog"] = True Default is ``True``. @@ -162,13 +166,13 @@ def write_intake_esm_catalog(config): dict The updated configuration dictionary with the merged intake catalog. """ - if not config["general"].get("write_catalog", True): + if not config.get("intake", {}).get("write_catalog", True): return config cat_file = pathlib.Path( f'{config["general"]["experiment_dir"]}/{config["general"]["expid"]}_intake_catalog.json' ) - catalog = config["intake"] + catalog = config["intake"]["catalog"] if cat_file.exists(): with open(cat_file, "r") as f: @@ -194,5 +198,5 @@ def write_intake_esm_catalog(config): # Save the merged catalog back to disk with open(cat_file, "w") as f: json.dump(prev_cat, f, indent=4) - + config["intake"]["catalog_json"] = prev_cat return config From 1664787cf93abac7f9ebdba996ab4eb35b7aa02a Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Tue, 21 Jan 2025 11:56:05 +0100 Subject: [PATCH 9/9] wip: start of catalog server part --- setup.py | 1 + src/esm_runscripts/catalog.py | 20 ++++++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/setup.py b/setup.py index 079008f9d..c68356dfd 100644 --- a/setup.py +++ b/setup.py @@ -38,6 +38,7 @@ "semver==2.13.0", "sqlalchemy>=1.4.39", "tabulate==0.8.10", + "tiled[client]", "tqdm==4.66.3", "typing_extensions>=4.1.1", # Maximum number for Python 3.6 support "xdgenvpy==2.3.5", diff --git a/src/esm_runscripts/catalog.py b/src/esm_runscripts/catalog.py index cbb24a5c3..da8257b31 100644 --- a/src/esm_runscripts/catalog.py +++ b/src/esm_runscripts/catalog.py @@ -8,6 +8,7 @@ import intake import intake_esm # noqa: F401, import only needed to register intake-esm driver import pandas as pd +import tiled.client import xarray as xr from loguru import logger @@ -200,3 +201,22 @@ def write_intake_esm_catalog(config): json.dump(prev_cat, f, indent=4) config["intake"]["catalog_json"] = prev_cat return config + + +def upload_intake_esm_catalog(config): + if not config.get("intake", {}).get("upload_catalog", True): + return config + catalog_server_uri = config["general"].get( + "catalog_server_uri", "http://134.1.7.6:8000" + ) + # TODO(PG): Figure out a clean way to handle authentication + catalog_password = "secret" + try: + client = tiled.client.from_uri(catalog_server_uri, api_key=catalog_password) + except ConnectError as e: + logger.error(f"Unable to connect to {catalog_server_uri}: {e}") + return config + + catalog_json = config["intake"]["catalog_json"] + + return config