class CMCEnv(EnvParser):
    """Config for the CMC (Canadian Meteorological Centre) API.

    Parsed from environment variables by EnvParser; each attribute below can
    be overridden via an environment variable of the same name.
    """

    # Which CMC model to pull data for ("gdps" or "geps").
    CMC_MODEL: str = "gdps"
    # Maximum forecast step, in hours, to download files for.
    CMC_HOURS: int = 240
    # Parameter group to fetch ("default", "full" or "basic").
    # NOTE(review): defaults to "full" whereas ICON_PARAMETER_GROUP defaults
    # to "default" — confirm this asymmetry is intentional.
    CMC_PARAMETER_GROUP: str = "full"
import ( ceda, metoffice, ecmwf, - icon + icon, + cmc, ) diff --git a/src/nwp_consumer/internal/inputs/cmc/CMC_glb_CAPE_SFC_0_latlon.15x.15_2023080900_P027.grib2 b/src/nwp_consumer/internal/inputs/cmc/CMC_glb_CAPE_SFC_0_latlon.15x.15_2023080900_P027.grib2 new file mode 100644 index 00000000..6950eae6 Binary files /dev/null and b/src/nwp_consumer/internal/inputs/cmc/CMC_glb_CAPE_SFC_0_latlon.15x.15_2023080900_P027.grib2 differ diff --git a/src/nwp_consumer/internal/inputs/cmc/CMC_glb_TMP_TGL_2_latlon.15x.15_2023080900_P027.grib2 b/src/nwp_consumer/internal/inputs/cmc/CMC_glb_TMP_TGL_2_latlon.15x.15_2023080900_P027.grib2 new file mode 100644 index 00000000..6548a192 Binary files /dev/null and b/src/nwp_consumer/internal/inputs/cmc/CMC_glb_TMP_TGL_2_latlon.15x.15_2023080900_P027.grib2 differ diff --git a/src/nwp_consumer/internal/inputs/cmc/CMC_glb_VGRD_ISBL_200_latlon.15x.15_2023080900_P027.grib2 b/src/nwp_consumer/internal/inputs/cmc/CMC_glb_VGRD_ISBL_200_latlon.15x.15_2023080900_P027.grib2 new file mode 100644 index 00000000..b7e53b91 Binary files /dev/null and b/src/nwp_consumer/internal/inputs/cmc/CMC_glb_VGRD_ISBL_200_latlon.15x.15_2023080900_P027.grib2 differ diff --git a/src/nwp_consumer/internal/inputs/cmc/__init__.py b/src/nwp_consumer/internal/inputs/cmc/__init__.py new file mode 100644 index 00000000..5d97b9a1 --- /dev/null +++ b/src/nwp_consumer/internal/inputs/cmc/__init__.py @@ -0,0 +1,4 @@ +__all__ = ["Client"] + +from .client import Client + diff --git a/src/nwp_consumer/internal/inputs/cmc/_consts.py b/src/nwp_consumer/internal/inputs/cmc/_consts.py new file mode 100644 index 00000000..aba6595f --- /dev/null +++ b/src/nwp_consumer/internal/inputs/cmc/_consts.py @@ -0,0 +1,62 @@ +"""Defines all parameters available from GDPS.""" + + +GDPS_VARIABLES = [ + "ALBDO", + "ABSV", + "CWAT", + "TSOIL", + "SOILVIC", + "SOILM", + "SFCWRO", + "CAPE", + "CIN", + "ACPCP", + "DLWRF", + "DSWRF", + "HGT", + "FPRATE", + "IPRATE", + "PCPNTYPE", + "LHTFL", + "NLWRS", + 
class CMCFileInfo(internal.FileInfoModel):
    """Describes a single GDPS/GEPS grib2 file on the CMC datamart."""

    def __init__(
        self, it: dt.datetime, filename: str, currentURL: str, step: int,
    ) -> None:
        """Create a new CMCFileInfo.

        Args:
            it: The init time the file's data belongs to.
            filename: The name of the file.
            currentURL: The URL of the directory containing the file.
            step: The forecast step of the file, in hours.
        """
        # Fix: __init__ was annotated as returning "CMCFileInfo";
        # an initializer always returns None.
        self._it = it
        self._filename = filename
        self._url = currentURL
        self.step = step

    def filename(self) -> str:
        """Return the name of the file."""
        return self._filename

    def filepath(self) -> str:
        """Return the full URL of the file (directory URL + file name)."""
        return self._url + "/" + self._filename

    def it(self) -> dt.datetime:
        """Return the init time the file belongs to."""
        return self._it
Check this is okay + "dswrf": internal.OCFShortName.DownwardShortWaveRadiationFlux.value, + "dlwrf": internal.OCFShortName.DownwardLongWaveRadiationFlux.value, + "snod": internal.OCFShortName.SnowDepthWaterEquivalent.value, + "rh": internal.OCFShortName.RelativeHumidityAGL.value, + "u": internal.OCFShortName.WindUComponentAGL.value, + "v": internal.OCFShortName.WindVComponentAGL.value, +} + +COORDINATE_ALLOW_LIST: typing.Sequence[str] = ("time", "step", "latitude", "longitude") + + +class Client(internal.FetcherInterface): + """Implements a client to fetch GDPS/GEPS data from CMC.""" + + baseurl: str # The base URL for the GDPS/GEPS model + model: str # The model to fetch data for + parameters: list[str] # The parameters to fetch + conform: bool # Whether to rename parameters to OCF names and clear unwanted coordinates + + def __init__(self, model: str, hours: int = 48, param_group: str = "default") -> None: + """Create a new GDPS Client. + + Exposes a client for GDPS and GEPS data from Canada CMC that conforms to the FetcherInterface. + + Args: + model: The model to fetch data for. Valid models are "gdps" and "geps". + param_group: The set of parameters to fetch. + Valid groups are "default", "full", and "basic". + """ + self.baseurl = "https://dd.weather.gc.ca" + + match model: + case "gdps": + self.baseurl += "/model_gem_global/15km/grib2/lat_lon/" + case "geps": + self.baseurl += "/ensemble/geps/grib2/raw/" + case _: + raise ValueError( + f"unknown GDPS/GEPS model {model}. 
def listRawFilesForInitTime(self, *, it: dt.datetime) -> list[internal.FileInfoModel]:
    """List the raw grib2 files available from CMC for the given init time.

    Args:
        it: The init time to list files for.

    Returns:
        A list of FileInfo objects describing the matching remote files.
        Empty if the init time is not a model run hour.

    Raises:
        ValueError: If the init time is not today's date, as CMC only
            serves data on the current date.
    """
    # GDPS data is only available for today's and yesterday's date. If data
    # hasn't been uploaded for that init time yet, then yesterday's data
    # will still be present on the server.
    if it.date() != dt.datetime.now(dt.timezone.utc).date():
        raise ValueError("GDPS/GEPS data is only available on today's date")
        # Fix: removed an unreachable `return []` that followed the raise.

    # The GDPS/GEPS model only runs on the hours [00, 12]
    if it.hour not in [0, 12]:
        return []

    files: list[internal.FileInfoModel] = []

    # Files are split per parameter, level, and step, with a webpage per parameter
    # * The webpage contains a list of files for the parameter
    # * Find these files for each parameter and add them to the list
    # NOTE(review): the listing URL below does not vary with `param`, so every
    # iteration fetches the same page and may append the same files once per
    # parameter — confirm whether downstream deduplicates, or whether the URL
    # should include the parameter.
    for param in self.parameters:
        # The list of files for the parameter
        parameterFiles: list[internal.FileInfoModel] = []

        # Fetch CMC webpage detailing the available files for the timestep
        response = requests.get(f"{self.baseurl}/{it.strftime('%H')}/000/", timeout=3)

        if response.status_code != 200:
            log.warn(
                event="error fetching filelisting webpage for parameter",
                status=response.status_code,
                url=response.url,
                param=param,
                inittime=it.strftime("%Y-%m-%d %H:%M"),
            )
            continue

        # The webpage's HTML contains a list of <a> tags
        # * Each tag has a href, most of which point to a file
        for line in response.text.splitlines():
            # Check if the line contains a href, if not, skip it
            refmatch = re.search(pattern=r'href="(.+)">', string=line)
            if refmatch is None:
                continue

            # The href contains the name of a file - parse this into a FileInfo object
            # If not conforming, match all files
            # * Otherwise only match single level and time invariant
            # Fix: the original call passed `match_ml=`, which is not a
            # parameter of _parseCMCFilename (it takes match_sl, match_tgl,
            # match_pl) and would raise TypeError; `match_tgl` is the closest
            # analogue.
            # NOTE(review): confirm which level types conform mode should
            # match — the default parameter set includes TGL-level fields.
            fi: CMCFileInfo | None = _parseCMCFilename(
                name=refmatch.groups()[0],
                baseurl=self.baseurl,
                match_tgl=not self.conform,
                match_pl=not self.conform,
            )
            # Ignore the file if it is not for today's date or has a step
            # beyond the configured hours (when conforming)
            if fi is None or fi.it() != it or (fi.step > self.hours and self.conform):
                continue

            # Add the file to the list
            parameterFiles.append(fi)

        log.debug(
            event="listed files for parameter",
            param=param,
            inittime=it.strftime("%Y-%m-%d %H:%M"),
            url=response.url,
            numfiles=len(parameterFiles),
        )

        # Add the files for the parameter to the list of all files
        files.extend(parameterFiles)

    return files
pressure level and surface + if "surface" in list(ds.coords): + ds = ds.rename({"surface": "heightAboveGround"}) + + if "heightAboveGround" in list(ds.coords) and list(ds.data_vars.keys())[0] in ["q","t","u","v"]: + # Rename data variable to add _surface to it so merging works later + ds = ds.rename({list(ds.data_vars.keys())[0]: f"{list(ds.data_vars.keys())[0]}_surface"}) + + if "isobaricInhPa" in list(ds.coords): + if "rh" in list(ds.data_vars.keys()): + ds = ds.rename({"isobaricInhPa": "isobaricInhPa_humidity"}) + if "absv" in list(ds.data_vars.keys()) or "vvel" in list(ds.data_vars.keys()): + ds = ds.rename({"isobaricInhPa": "isobaricInhPa_absv_vvel"}) + # Only conform the dataset if requested (defaults to True) + if self.conform: + # Rename the parameters to the OCF names + # * Only do so if they exist in the dataset + for oldParamName, newParamName in PARAMETER_RENAME_MAP.items(): + if oldParamName in ds: + ds = ds.rename({oldParamName: newParamName}) + + # Delete unwanted coordinates + ds = ds.drop_vars( + names=[c for c in ds.coords if c not in COORDINATE_ALLOW_LIST], + errors="ignore", + ) + + # Create chunked Dask dataset with a single "variable" dimension + # * Each chunk is a single time step + ds = ( + ds.rename({"time": "init_time"}) + .expand_dims("init_time") + .expand_dims("step") + .to_array(dim="variable", name=f"CMC_{self.model}".upper()) + .to_dataset() + .transpose("variable", "init_time", "step", ...) 
def downloadToTemp(
    self,
    *,
    fi: internal.FileInfoModel,
) -> tuple[internal.FileInfoModel, pathlib.Path]:
    """Download the given remote file to the temporary directory.

    Args:
        fi: Details of the remote file to download.

    Returns:
        A tuple of the given file info and the local path it was saved to.
        On any failure a warning is logged and an empty Path is returned.
    """
    log.debug(event="requesting download of file", file=fi.filename(), path=fi.filepath())
    try:
        response = urllib.request.urlopen(fi.filepath())
    except Exception as e:
        log.warn(
            event="error calling url for file",
            url=fi.filepath(),
            filename=fi.filename(),
            error=e,
        )
        return fi, pathlib.Path()

    # Fix: close the response in all paths — the original never closed it,
    # leaking the connection.
    with response:
        # urlopen raises HTTPError for most non-2xx statuses, so this is a
        # defensive check for the remaining cases.
        if response.status != 200:
            log.warn(
                event="error downloading file",
                status=response.status,
                url=fi.filepath(),
                filename=fi.filename(),
            )
            return fi, pathlib.Path()

        tfp: pathlib.Path = internal.TMP_DIR / fi.filename()
        # Fix: stream the body to disk in chunks instead of buffering the
        # whole grib2 file in memory with response.read().
        with open(tfp, "wb") as f:
            for chunk in iter(lambda: response.read(16 * 1024), b""):
                f.write(chunk)

    log.debug(
        event="fetched all data from file",
        filename=fi.filename(),
        url=fi.filepath(),
        filepath=tfp.as_posix(),
        nbytes=tfp.stat().st_size,
    )

    return fi, tfp