diff --git a/configurations/ANNA/Containerfile b/configurations/ANNA/Containerfile
new file mode 100644
index 0000000..5e113d7
--- /dev/null
+++ b/configurations/ANNA/Containerfile
@@ -0,0 +1,51 @@
+ARG BASE_IMAGE="pytorch/pytorch:2.10.0-cuda13.0-cudnn9-runtime"
+FROM ${BASE_IMAGE}
+
+WORKDIR /workspace
+
+COPY pyproject.toml .
+COPY *.yaml ./
+COPY entry.sh ./
+COPY src/ ./src
+
+# Download inference artifact from S3
+ARG DEFAULT_ARTIFACT="s3://mlwm-artifacts/inference-artifacts/gefion-1.zip"
+ENV MLWM_INFERENCE_ARTIFACT=${DEFAULT_ARTIFACT}
+
+RUN apt-get update && apt-get install -y --no-install-recommends curl git python3 python3-pip unzip && rm -rf /var/lib/apt/lists/*
+RUN pip3 install awscli
+
+ARG AWS_ACCESS_KEY_ID
+ARG AWS_SECRET_ACCESS_KEY
+ARG AWS_DEFAULT_REGION
+ENV AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID
+ENV AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY
+ENV AWS_DEFAULT_REGION=$AWS_DEFAULT_REGION
+
+# Get inference artifact from S3
+RUN aws s3 cp $MLWM_INFERENCE_ARTIFACT ./inference_artifact.zip
+RUN mkdir -p /workspace/inference_artifact
+RUN unzip inference_artifact.zip -d /workspace/inference_artifact
+RUN rm inference_artifact.zip
+# List files in the inference artifact directory for verification
+RUN ls -la /workspace/inference_artifact
+
+# Install uv
+RUN curl -LsSf https://astral.sh/uv/install.sh | sh
+
+ENV PATH="/root/.local/bin:$PATH"
+
+# export current torch version to constraints file
+RUN python -c 'import torch; print(f"torch=={torch.__version__}")' > constraints.txt
+# install with constraints
+RUN uv pip install --break-system-packages --system --constraint constraints.txt .
+# check that we can print out neural-lam version +RUN python -c 'import neural_lam; print(f"neural-lam=={neural_lam.__version__}")' + +# inside the container we have installed directly into system python, so we +# wont use uv here (otherwise uv tries to create a new virtual environment) +ENV USE_UV="false" +# Set the default command to run when the container starts +ENTRYPOINT ["./entry.sh"] +# No arguments passed by default +CMD [""] diff --git a/configurations/ANNA/README.md b/configurations/ANNA/README.md new file mode 100644 index 0000000..aa4b18a --- /dev/null +++ b/configurations/ANNA/README.md @@ -0,0 +1,301 @@ +# ANNA + +The ANNA artifact is "s3://mlwm-artifacts/inference-artifacts/gefion-1.zip", which contains a model trained on the DANRA dataset on Gefion. + +## Building image +`AWS_ACCESS_KEY_ID= AWS_SECRET_ACCESS_KEY= CONTAINER_APP=podman ./build_image.sh` + +## Running inference +`AWS_ACCESS_KEY_ID= AWS_SECRET_ACCESS_KEY= ./run_inference_container.sh 2026-02-04T00:00:00Z` + + +## Training cli args + +```yaml +- datastore: + - config_path: /dcai/projects/cu_0003/user_space/hinkas/git-repos/ablation-studies/configs/danra_model1/7deg_config.yaml +- num_workers: 6 +- precision: bf16-mixed +- batch_size: 1 +- hidden_dim: 300 +- hidden_dim_grid: 150 +- time_delta_enc_dim: 32 +- config_path: /dcai/projects/cu_0003/user_space/hinkas/git-repos/ablation-studies/configs/danra_model1/7deg_config.yaml +- model: hi_lam +- processor_layers: 2 +- graph_name: 7deg_rect_hi3 +- num_nodes: 2 +- epochs: 80 +- ar_steps_train: 1 +- lr: 0.001 +- min_lr: 0.001 +- val_interval: 5 +- ar_steps_eval: 4 +- val_steps_to_log: 1 2 4 +``` + +## mllam-data-prep config + +```yaml +schema_version: v0.5.0 +dataset_version: v0.1.0 + +output: + variables: + static: [grid_index, static_feature] + state: [time, grid_index, state_feature] + forcing: [time, grid_index, forcing_feature] + coord_ranges: + time: + start: 2000-01-01T00:00 + end: 2020-10-29T00:00 + step: PT3H + chunking: + time: 1 + 
state_feature: 20 + splitting: + dim: time + splits: + train: + start: 2000-01-01T00:00 + end: 2018-10-29T00:00 + compute_statistics: + ops: [mean, std, diff_mean, diff_std] + dims: [grid_index, time] + val: + start: 2018-11-05T00:00 + end: 2019-10-22T00:00 + test: + start: 2019-10-29T00:00 + end: 2020-10-29T00:00 + +inputs: + danra_sl_state: + path: /dcai/projects/cu_0003/data/sources/danra/v0.5.0/single_levels.zarr/ + dims: [time, x, y] + variables: + - pres_seasurface + - t2m + - u10m + - v10m + - pres0m + - lwavr0m + - swavr0m + dim_mapping: + time: + method: rename + dim: time + grid_index: + method: stack + dims: [x, y] + state_feature: + method: stack_variables_by_var_name + name_format: "{var_name}" + target_output_variable: state + + danra_pl_state: + path: /dcai/projects/cu_0003/data/sources/danra/v0.5.0/pressure_levels.zarr/ + dims: [time, x, y, pressure] + variables: + z: + pressure: + values: [100, 200, 400, 600, 700, 850, 925, 1000,] + units: hPa + t: + pressure: + values: [100, 200, 400, 600, 700, 850, 925, 1000,] + units: hPa + r: + pressure: + values: [100, 200, 400, 600, 700, 850, 925, 1000,] + units: hPa + u: + pressure: + values: [100, 200, 400, 600, 700, 850, 925, 1000,] + units: hPa + v: + pressure: + values: [100, 200, 400, 600, 700, 850, 925, 1000,] + units: hPa + tw: + pressure: + values: [100, 200, 400, 600, 700, 850, 925, 1000,] + units: hPa + dim_mapping: + time: + method: rename + dim: time + state_feature: + method: stack_variables_by_var_name + dims: [pressure] + name_format: "{var_name}{pressure}" + grid_index: + method: stack + dims: [x, y] + target_output_variable: state + + danra_static: + path: /dcai/projects/cu_0003/data/sources/danra/v0.5.0/single_levels.zarr/ + dims: [x, y] + variables: + - lsm + - orography + dim_mapping: + grid_index: + method: stack + dims: [x, y] + static_feature: + method: stack_variables_by_var_name + name_format: "{var_name}" + target_output_variable: static + + danra_forcing: + path: 
/dcai/projects/cu_0003/data/sources/danra/v0.5.0/single_levels.zarr/ + dims: [time, x, y] + derived_variables: + # derive variables to be used as forcings + toa_radiation: + kwargs: + time: ds_input.time + lat: ds_input.lat + lon: ds_input.lon + function: mllam_data_prep.ops.derive_variable.physical_field.calculate_toa_radiation + hour_of_day_sin: + kwargs: + time: ds_input.time + component: sin + function: mllam_data_prep.ops.derive_variable.time_components.calculate_hour_of_day + hour_of_day_cos: + kwargs: + time: ds_input.time + component: cos + function: mllam_data_prep.ops.derive_variable.time_components.calculate_hour_of_day + day_of_year_sin: + kwargs: + time: ds_input.time + component: sin + function: mllam_data_prep.ops.derive_variable.time_components.calculate_day_of_year + day_of_year_cos: + kwargs: + time: ds_input.time + component: cos + function: mllam_data_prep.ops.derive_variable.time_components.calculate_day_of_year + dim_mapping: + time: + method: rename + dim: time + grid_index: + method: stack + dims: [x, y] + forcing_feature: + method: stack_variables_by_var_name + name_format: "{var_name}" + target_output_variable: forcing + +extra: + projection: + class_name: LambertConformal + kwargs: + central_longitude: 25.0 + central_latitude: 56.7 + standard_parallels: [56.7, 56.7] + globe: + semimajor_axis: 6367470.0 + semiminor_axis: 6367470.0 +``` + +## Neural-lam config + +```yaml +datastore: + config_path: /dcai/projects/cu_0003/user_space/hinkas/git-repos/ablation-studies/configs/danra_model1/danra_model1_config.yaml + kind: mdp +datastore_boundary: + config_path: /dcai/projects/cu_0003/user_space/hinkas/git-repos/ablation-studies/configs/era_forcing/era_7deg_model1_config.yaml + kind: mdp +training: + excluded_intervals: + - - 2002-11-19T00 + - 2002-11-19T06 + - - 2007-08-26T00 + - 2007-08-26T21 + - - 2017-11-25T15 + - 2017-11-25T15 + output_clamping: + lower: + r100: 0 + r1000: 0 + r200: 0 + r400: 0 + r600: 0 + r700: 0 + r850: 0 + r925: 0 + 
upper: + r100: 1 + r1000: 1 + r200: 1 + r400: 1 + r600: 1 + r700: 1 + r850: 1 + r925: 1 + state_feature_weighting: + __config_class__: ManualStateFeatureWeighting + weights: + lwavr0m: 1.0 + pres0m: 1.0 + pres_seasurface: 1.0 + r100: 0.125 + r1000: 0.125 + r200: 0.125 + r400: 0.125 + r600: 0.125 + r700: 0.125 + r850: 0.125 + r925: 0.125 + swavr0m: 1.0 + t100: 0.125 + t1000: 0.125 + t200: 0.125 + t2m: 1.0 + t400: 0.125 + t600: 0.125 + t700: 0.125 + t850: 0.125 + t925: 0.125 + tw100: 0.125 + tw1000: 0.125 + tw200: 0.125 + tw400: 0.125 + tw600: 0.125 + tw700: 0.125 + tw850: 0.125 + tw925: 0.125 + u100: 0.125 + u1000: 0.125 + u10m: 1.0 + u200: 0.125 + u400: 0.125 + u600: 0.125 + u700: 0.125 + u850: 0.125 + u925: 0.125 + v100: 0.125 + v1000: 0.125 + v10m: 1.0 + v200: 0.125 + v400: 0.125 + v600: 0.125 + v700: 0.125 + v850: 0.125 + v925: 0.125 + z100: 0.125 + z1000: 0.125 + z200: 0.125 + z400: 0.125 + z600: 0.125 + z700: 0.125 + z850: 0.125 + z925: 0.125 +``` diff --git a/configurations/ANNA/build_image.sh b/configurations/ANNA/build_image.sh new file mode 100755 index 0000000..1217d96 --- /dev/null +++ b/configurations/ANNA/build_image.sh @@ -0,0 +1,63 @@ +#!/bin/bash + +# Container application (defuault to podman if not set) +CONTAINER_APP=${CONTAINER_APP:-podman} + +# Configuration +MLWM_LOG_LEVEL=DEBUG +MLWM_IMAGE_NAME="anna:latest" + +HTTP_PROXY="" +HTTPS_PROXY="" + +# Set MLWM_PULL_PROXY before running this script, e.g.: +# export MLWM_PULL_PROXY="your.proxy.server:port" +if [ -z "$MLWM_PULL_PROXY" ]; then + echo "Info: MLWM_PULL_PROXY is not set. Using public DockerHub." + MLWM_PULL_PROXY="" + CR_URL="docker.io" +else + echo "Info: Using proxy $MLWM_PULL_PROXY and internal DockerHub." + CR_URL="dockerhub.dmi.dk" +fi + +# if we're on ARM architecture, use the ARM base image +if [ "$(uname -m)" = "aarch64" ]; then + echo "Info: Detected ARM architecture. Using ARM base image." 
+    # dockerhub doesn't have an official pytorch image for ARM, so we use NVIDIA's NGC registry
+    if [ -z "$CR_URL" ] || [ "$CR_URL" = "docker.io" ]; then
+        CR_URL="nvcr.io"
+    fi
+    MLWM_BASE_IMAGE="${CR_URL}/nvidia/pytorch:26.01-py3"
+else
+    echo "Info: Using x86_64 base image."
+    MLWM_BASE_IMAGE="${CR_URL}/pytorch/pytorch:2.10.0-cuda13.0-cudnn9-runtime"
+fi
+
+# Check AWS credentials if S3 access is needed
+if [ -z "$AWS_ACCESS_KEY_ID" ]; then
+    echo "Error: AWS_ACCESS_KEY_ID is not set. Please set it before running this script."
+    exit 1
+fi
+if [ -z "$AWS_SECRET_ACCESS_KEY" ]; then
+    echo "Error: AWS_SECRET_ACCESS_KEY is not set. Please set it before running this script."
+    exit 1
+fi
+if [ -z "$AWS_DEFAULT_REGION" ]; then
+    echo "Warning: AWS_DEFAULT_REGION is not set. Setting it automatically to eu-central-1."
+    AWS_DEFAULT_REGION="eu-central-1"
+fi
+
+# Pull base image with proxy
+HTTP_PROXY="$MLWM_PULL_PROXY" HTTPS_PROXY="$MLWM_PULL_PROXY" ${CONTAINER_APP} --log-level="$MLWM_LOG_LEVEL" pull "$MLWM_BASE_IMAGE"
+
+# Build image with AWS credentials as build arguments
+echo "Running ${CONTAINER_APP} build to create image $MLWM_IMAGE_NAME ..."
+${CONTAINER_APP} build \
+    --build-arg BASE_IMAGE="$MLWM_BASE_IMAGE" \
+    --build-arg AWS_ACCESS_KEY_ID="$AWS_ACCESS_KEY_ID" \
+    --build-arg AWS_SECRET_ACCESS_KEY="$AWS_SECRET_ACCESS_KEY" \
+    --build-arg AWS_DEFAULT_REGION="$AWS_DEFAULT_REGION" \
+    -t "$MLWM_IMAGE_NAME" \
+    -f Containerfile \
+    .
diff --git a/configurations/ANNA/config.yaml b/configurations/ANNA/config.yaml new file mode 100644 index 0000000..dd84fcd --- /dev/null +++ b/configurations/ANNA/config.yaml @@ -0,0 +1,37 @@ +docker_image: ghcr.io/meteomatics/gefion1:latest + +inputs: + dini_pressure_levels: + uri_args: + bbox: + lon_min: -10.0 + lat_min: 50.0 + lon_max: 10.0 + lat_max: 60.0 + resolution: + lon_resolution: 2.5 + lat_resolution: 2.5 + unit: km + data_kind: pressure_levels + model_name: harmonie + model_config: dini + bucket_name: forecast-data-public + internal_path: /volume/inputs/dini/pressure_levels.zarr + +outputs: + pressure_levels: + internal_path: /volume/output/pressure_levels.zarr + uri_args: + bbox: + lon_min: -10.0 + lat_min: 50.0 + lon_max: 10.0 + lat_max: 60.0 + resolution: + lon_resolution: 2.5 + lat_resolution: 2.5 + unit: km + data_kind: pressure_levels + model_name: anna + model_config: FakeDINI+FakeIFS + bucket_name: forecast-data-public diff --git a/configurations/ANNA/entry.sh b/configurations/ANNA/entry.sh new file mode 100755 index 0000000..8e44cf3 --- /dev/null +++ b/configurations/ANNA/entry.sh @@ -0,0 +1,140 @@ +#!/usr/bin/env bash +# This script is used to run the inference for the ANNA model. +# +# This script is intended to be run in a container, and assumes that during the +# container image build that the inference artifact was unpacked to +# inference_artifact/. You can also run this script interactively if you have +# extracted the inference artifact yourself. 
+# +# The selection of datasets to use for input to the model, analysis time and +# forecast duration is controller by the following environment variables: +# DATASTORE_INPUT_PATHS, ANALYSIS_TIME, FORECAST_DURATION and NUM_EVAL_STEPS +# (the latter should be inferred from FORECAST_DURATION, but that is TODO) +# +# - DATASTORE_INPUT_PATHS is a comma-separated list of mappings of +# {datastore_name}.{input_name}={input_path} +# - ANALYSIS_TIME is the analysis time to start the forecast from is ISO8601 +# format +# - FORECAST_DURATION is the duration of the forecast in ISO8601 duration +# format and effects the length of the produced inference dataset +# - NUM_EVAL_STEPS is the number of autoregressive steps to run during +# inference. This should be consistent with FORECAST_DURATION and the model +# configuration (e.g. if the model was trained on 3-hourly data and +# FORECAST_DURATION is PT18H then NUM_EVAL_STEPS should be 6 + +# make this script fail on any error +set -e + +## Runtime configuration (variable expected to change on every execution) +# enable use of .env so that during development we can set environment (e.g. 
+# paths to replace in datastore config) +if [ -f .env ] ; then + echo "Sourcing local .env file" + set -a && source .env && set +a +fi + +USE_UV=${USE_UV:-true} +if [ "$USE_UV" = true ] ; then + echo "Using uv to run commands" + UV_CMD="uv run" +else + echo "Not using uv to run commands, using plain python" + UV_CMD="" +fi + +# print CUDA debug info +${UV_CMD} python - <<'PY' +import torch, subprocess, os +print("torch:", torch.__version__) +print("cuda available:", torch.cuda.is_available()) +if torch.cuda.is_available(): + cap = torch.cuda.get_device_capability(0) + print("device capability:", cap) + print("name:", torch.cuda.get_device_name(0)) + try: + torch.randn(2, device="cuda") + print("cuda op: OK") + except Exception as e: + print("cuda op failed:", e) +PY + +## Model specific inference configuration (same across all executions) +NUM_HIDDEN_DIMS=300 +NUM_HIDDEN_DIMS_GRID=150 # To merge from Joels fork. +GRAPH_NAME="hi_lam" +HIEARCHICAL_GRAPH=true +MODEL_TIMESTEP_HOURS=3 + + +# set default override of input paths in the datastore config used for creating the +# inference dataset if environment variable isn't set + +# DATASTORE_INPUT_PATHS=${DATASTORE_INPUT_PATHS:-"\ +# danra.danra_surface=https://object-store.os-api.cci1.ecmwf.int/danra/v0.6.0dev1/single_levels.zarr/,\ +# danra.danra_static=https://object-store.os-api.cci1.ecmwf.int/danra/v0.5.0/single_levels.zarr/"} +DATASTORE_INPUT_PATHS=${DATASTORE_INPUT_PATHS:-"\ +danra.danra_sl_state=https://object-store.os-api.cci1.ecmwf.int/danra/v0.6.0dev1/single_levels.zarr/,\ +danra.danra_static=https://object-store.os-api.cci1.ecmwf.int/danra/v0.5.0/single_levels.zarr/"} + +TIME_DIMENSIONS=${TIME_DIMENSIONS:-"analysis_time,elapsed_forecast_duration"} +ANALYSIS_TIME=${ANALYSIS_TIME:-"2019-02-04T12:00"} # assumed to be in UTC +# default forecast duration of 18 hours +FORECAST_DURATION=${FORECAST_DURATION:-"PT18H"} + +# compute number of eval steps from forecast duration +if [ -z "${NUM_EVAL_STEPS}" ] ; then + # 
check that FORECAST_DURATION is in expected format PT{N}H + if [[ "${FORECAST_DURATION}" =~ ^PT([0-9]+)H$ ]] ; then + HOURS="${BASH_REMATCH[1]}" + NUM_EVAL_STEPS=$((HOURS / MODEL_TIMESTEP_HOURS)) + echo "Inferred NUM_EVAL_STEPS=${NUM_EVAL_STEPS} from FORECAST_DURATION=${FORECAST_DURATION}" + else + echo "ERROR: Cannot infer NUM_EVAL_STEPS from FORECAST_DURATION='${FORECAST_DURATION}', please set NUM_EVAL_STEPS explicitly" + exit 1 + fi +fi + +# All working directories (for input data, output data, intermediate files) +# will be created under INFERENCE_WORKDIR +INFERENCE_WORKDIR=${INFERENCE_WORKDIR:-"./inference_workdir"} + +echo "Creating forecast using following runtime args:" +echo " DATASTORE_INPUT_PATHS=${DATASTORE_INPUT_PATHS}" +echo " TIME_DIMENSIONS=${TIME_DIMENSIONS}" +echo " ANALYSIS_TIME=${ANALYSIS_TIME}" +echo " FORECAST_DURATION=${FORECAST_DURATION}" +echo " NUM_EVAL_STEPS=${NUM_EVAL_STEPS}" +echo " INFERENCE_WORKDIR=${INFERENCE_WORKDIR}" + +# set cli argument for creating hierarchical graph if needed +if [ "$HIEARCHICAL_GRAPH" = true ] ; then + CREATE_GRAPH_ARG="--hierarchical" +else + CREATE_GRAPH_ARG="" +fi + +## Setup working directories +INFERENCE_ARTIFACT_PATH="./inference_artifact" +INPUT_DATASETS_ROOT_PATH="${INFERENCE_WORKDIR}/inputs" +OUTPUT_DATASETS_ROOT_PATH="${INFERENCE_WORKDIR}/outputs" +mkdir -p ${OUTPUT_DATASETS_ROOT_PATH} + +# disable weights and biases logging, without this --eval with neural-lam fails +# because it tries to set up the logging and there is no WANDB_API_KEY set +${UV_CMD} wandb disabled + +## 1. Create inference dataset +# This uses a cli stored within mlwm to called mllam-data-prep to create the +# inference dataset. 
The inference dataset is created by modifying the +# configuration used during training to +# a) change the paths to the input datasets, +# b) include the statistics from the training dataset and +# c) set the dimensions in the configuration to have `analysis_time` and +# `elapsed_forecast_duration` instead of just `time`. +echo "Creating inference dataset\n" +DATASTORE_INPUT_PATHS=${DATASTORE_INPUT_PATHS} \ +ANALYSIS_TIME=${ANALYSIS_TIME} \ +FORECAST_DURATION=${FORECAST_DURATION} \ +TIME_DIMENSIONS=${TIME_DIMENSIONS} \ +INFERENCE_WORKDIR=${INFERENCE_WORKDIR} \ +${UV_CMD} python src/create_inference_dataset.py diff --git a/configurations/ANNA/pyproject.toml b/configurations/ANNA/pyproject.toml new file mode 100644 index 0000000..f4dfa1c --- /dev/null +++ b/configurations/ANNA/pyproject.toml @@ -0,0 +1,39 @@ +[project] +name = "ANNA" +version = "0.1.0" +description = "Add your description here" +readme = "README.md" +requires-python = ">=3.11" +authors = [ + {name = "Leif Denby", email = "lcd@dmi.dk"}, + {name = "Kasper Hintz", email = "kah@dmi.dk"}, +] +dependencies = [ + "parse>=1.20.2", + "dask>=2025.4.1", + "dotenv>=0.9.9", + "ipdb>=0.13.13", + "s3fs>=2025.3.2", + "tqdm>=4.67.1", + "universal-pathlib>=0.2.6", + "zarr>=3.0", + "ipython>=8.37.0", + "mllam-data-prep", + "neural-lam", +] + +[dependency-groups] +dev = [ + "pre-commit>=4.2.0", + "pytest>=8.3.5", +] + +[tool.isort] +profile = "black" + +[tool.uv.sources] +mllam-data-prep = { git = "https://github.com/leifdenby/mllam-data-prep", rev = "feat/inference-cli-args" } +neural-lam = { git = "https://github.com/khintz/neural-lam", rev = "dev/first-inference-image" } +[build-system] +requires = ["setuptools>=61", "setuptools_scm"] +build-backend = "setuptools.build_meta" diff --git a/configurations/ANNA/run_inference_container.sh b/configurations/ANNA/run_inference_container.sh new file mode 100755 index 0000000..f0c9c4d --- /dev/null +++ b/configurations/ANNA/run_inference_container.sh @@ -0,0 +1,69 @@ 
+#!/bin/bash
+
+# This script runs the inference container using initial conditions from DINI
+# stored on AWS
+
+# The script takes only one argument: the analysis time to use for inference,
+# in ISO8601 format (e.g. 2025-11-05T090000Z). If "Z" is omitted, UTC is
+# assumed. An optional second argument can be provided to specify the forecast
+# duration in ISO8601 duration format (e.g. PT18H for 18 hours). If not
+# provided, the default is PT18H.
+
+if [ "$#" -lt 1 ] || [ "$#" -gt 2 ] ; then
+    echo "Usage: $0 <ANALYSIS_TIME> [<FORECAST_DURATION>]" >&2
+    echo "" >&2
+    echo "  ANALYSIS_TIME: the analysis time to start the forecast from in ISO8601 format" >&2
+    echo "  FORECAST_DURATION: the duration of the forecast in ISO8601 duration format (default PT18H)" >&2
+    exit 1
+fi
+ANALYSIS_TIME="$1"
+if [ "$#" -eq 2 ] ; then
+    FORECAST_DURATION="$2"
+else
+    FORECAST_DURATION="PT18H"
+fi
+
+# function to format analysis time to remove colons and ensure UTC 'Z' suffix
+format_analysis_time() {
+    local iso="$1"
+
+    if [[ -z "$iso" ]]; then
+        echo "format_analysis_time: missing ISO8601 datetime" >&2
+        return 1
+    fi
+
+    if date -u -d "1970-01-01T00:00:00Z" >/dev/null 2>&1; then
+        # GNU date (Linux)
+        date -u -d "$iso" +"%Y-%m-%dT%H%M%SZ" || return 1
+    else
+        # macOS / BSD fallback using Python stdlib
+        python3 - <<'EOF' "$iso"
+from datetime import datetime, timezone
+import sys
+
+dt = datetime.fromisoformat(sys.argv[1].replace("Z", "+00:00"))
+dt = dt.astimezone(timezone.utc)
+print(dt.strftime("%Y-%m-%dT%H%M%SZ"))
+EOF
+    fi
+}
+
+# Create the inference working directory if it doesn't exist
+mkdir -p ./inference_workdir/
+
+# prepare environment variables for container
+ANALYSIS_TIME=$(format_analysis_time "${ANALYSIS_TIME}")
+DINI_ZARR="s3://harmonie-zarr/dini/control/${ANALYSIS_TIME}/single_levels.zarr/"
+DATASTORE_INPUT_PATHS="danra_model1_config.danra_sl_state=${DINI_ZARR},danra_model1_config.danra_static=${DINI_ZARR}"
+TIME_DIMENSIONS="time"
+INFERENCE_WORKDIR="$(pwd)/inference_workdir/"
+
+podman run \
--rm \ + --device nvidia.com/gpu=all \ + --shm-size=32g \ + -v ${INFERENCE_WORKDIR}:/workspace/inference_workdir:Z \ + -e DATASTORE_INPUT_PATHS="${DATASTORE_INPUT_PATHS}" \ + -e TIME_DIMENSIONS="${TIME_DIMENSIONS}" \ + -e ANALYSIS_TIME="${ANALYSIS_TIME}" \ + -e FORECAST_DURATION="${FORECAST_DURATION}" \ + localhost/anna:latest diff --git a/configurations/ANNA/src/create_inference_dataset.py b/configurations/ANNA/src/create_inference_dataset.py new file mode 100644 index 0000000..8e3d2fa --- /dev/null +++ b/configurations/ANNA/src/create_inference_dataset.py @@ -0,0 +1,485 @@ +import copy +import datetime +import os +from pathlib import Path +from typing import Dict + +import isodate +import mllam_data_prep as mdp +import mllam_data_prep.config as mdp_config +import parse +import xarray as xr +from loguru import logger +from neural_lam.config import DatastoreSelection, NeuralLAMConfig + +FP_TRAINING_CONFIG = "inference_artifact/configs/config.yaml" +DATASTORE_INPUT_PATH_FORMAT = "{datastore_name}.{input_name}={input_path}" + + +def _parse_datastore_input_paths(s: str) -> Dict[str, Dict[str, str]]: + """ + Parse a comma-separated list of {datastore_name}.{input_name}={input_path} + into a dictionary of dictionaries. + + Parameters + ---------- + s : str + The string to parse. + + Returns + ------- + Dict[str, Dict[str, str]] + A dictionary of dictionaries. + """ + result = {} + for item in s.split(","): + parts = parse.parse(DATASTORE_INPUT_PATH_FORMAT, item) + if parts is None: + raise ValueError( + f"Invalid format for DATASTORE_INPUT_PATHS item: {item}. 
" + f"Expected format is {DATASTORE_INPUT_PATH_FORMAT}" + ) + datastore_name = parts["datastore_name"] + input_name = parts["input_name"] + input_path = parts["input_path"] + + if datastore_name not in result: + result[datastore_name] = {} + elif input_name in result[datastore_name]: + raise ValueError( + f"Duplicate input name {input_name} for datastore " + f"{datastore_name} in DATASTORE_INPUT_PATHS" + ) + result[datastore_name][input_name] = input_path + return result + + +REQUIRED_ENV_VARS = { + # comma-separated list of {datastore_name}:{input_name}={input_path} + "DATASTORE_INPUT_PATHS": _parse_datastore_input_paths, + # iso8601 datetime string, e.g. 2019-02-04T12:00+0000 + "ANALYSIS_TIME": isodate.parse_datetime, + # iso8160 duration string, e.g. PT6H for 6 hours + "FORECAST_DURATION": isodate.parse_duration, + # comma-separated list of time dimensions to replace, e.g. + # time,forecast_reference_time + "TIME_DIMENSIONS": lambda s: s.split(","), + # inference working directory, relative to where inference config and + # datasets are saved + "INFERENCE_WORKDIR": str, +} + + +def _parse_env_vars() -> Dict[str, any]: + """ + Parse and validate required environment variables. + + Returns + ------- + Dict[str, any] + A dictionary of parsed environment variables. 
+ """ + env_vars = {} + for var, parser in REQUIRED_ENV_VARS.items(): + value = os.getenv(var) + if value is None: + raise EnvironmentError(f"Environment variable {var} is not set.") + try: + env_vars[var] = parser(value) + except Exception as e: + raise ValueError(f"Error parsing environment variable {var}: {e}") + return env_vars + + +def _create_inference_datastore_config( + training_config: mdp.Config, + forecast_analysis_time: datetime.datetime, + forecast_duration: datetime.timedelta, + time_dimensions: list[str], + overwrite_input_paths: Dict[str, str] = {}, +) -> mdp.Config: + """ + From a training datastore config, create an inference datastore config that: + - samples along a new sampling dimension `sampling_dim` (default: + `analysis_time`) instead of `time` + - has a single split called "test" with a single time slice given by the + `forecast_analysis_time` argument + - optionally overwrites input paths with the `overwrite_input_paths` argument + - ensures that the output variables have the correct dimensions, for example + replacing `time` with [`analysis_time`, `elapsed_forecast_duration`] + - ensures that the input datasets have the correct dimensions and dim_mappings, + i.e. replacing `time` with [`analysis_time`, `elapsed_forecast_duration` + + Parameters + ---------- + training_config : mdp.Config + The training config to base the inference config on + forecast_analysis_time : datetime.datetime + The analysis time to use for the inference config + forecast_duration : datetime.timedelta + The forecast duration to use for the inference config + time_dimensions : list[str], optional + The list of time dimensions to replace `time` with, for example + replacing `time` with [`analysis_time`, `elapsed_forecast_duration`], + the first dimension is assumed to be the sampling dimension (e.g. 
the + analysis time) + overwrite_input_paths : Dict[str, str], optional + A dictionary of input names and paths to overwrite in the training config, + by default {} + + Returns + ------- + mdp.Config + The inference config + """ + # the new sampling dimension is `analysis_time` + old_sampling_dim = "time" + if not isinstance(time_dimensions, list) or len(time_dimensions) == 0: + raise ValueError( + "time_dimensions must be a non-empty list of strings, got " + f"{time_dimensions}" + ) + sampling_dim = time_dimensions[0] + # instead of only having `time` as dimension, the input forecast datasets + # have two dimensions that describe the time value [analysis_time, + # elapsed_forecast_duration] + dim_replacements = dict( + time=time_dimensions, + ) + # there will be a single split called "test" + # split_name = "test" + # which will have a single time slice, given by the analysis time argument + # to the script + sampling_coord_range = dict( + start=forecast_analysis_time, + end=forecast_analysis_time + forecast_duration, + ) + + inference_config = copy.deepcopy(training_config) + + if len(overwrite_input_paths) > 0: + for key, value in overwrite_input_paths.items(): + if key not in training_config.inputs: + raise ValueError( + f"Key {key} not found in config inputs. 
" + f"Available keys are: {list(training_config.inputs.keys())}" + ) + logger.info( + f"Overwriting input path for {key} with {value} previously " + f"{training_config.inputs[key].path}" + ) + inference_config.inputs[key].path = value + + # setup the split (test) for the dataset with a coordinate range along the + # sampling dimension (analysis_time) of length 1 + # XXX: this can't currently be used, as in we have to have train, val and + # test splits for now (see below) + # inference_config.output.splitting = mdp_config.Splitting( + # dim=sampling_dim, + # splits={split_name: mdp_config.Split(**sampling_coord_range)}, + # ) + + # XXX: currently (as of 0.4.0) neural-lam requires that `train`, `val` and + # `test` splits are always present, even if they are not used. So we + # create empty `train` and `val` splits here + inference_config.output.splitting = mdp_config.Splitting( + dim="time", + splits={ + "train": mdp_config.Split( + start=forecast_analysis_time, end=forecast_analysis_time + ), + "val": mdp_config.Split( + start=forecast_analysis_time, end=forecast_analysis_time + ), + "test": mdp_config.Split( + start=forecast_analysis_time, + end=forecast_analysis_time + forecast_duration, + ), + }, + ) + + # ensure the output data is sampled along the sampling dimension + # (analysis_time) too + inference_config.output.coord_ranges = { + sampling_dim: mdp_config.Range(**sampling_coord_range) + } + + inference_config.output.chunking = {sampling_dim: 1} + + # replace old sampling_dimension (time) dimension in outputs with + # [`analysis_time`, `elapsed_forecast_time`] + for variable, dims in training_config.output.variables.items(): + if old_sampling_dim in dims: + orig_sampling_dim_index = dims.index(old_sampling_dim) + dims.remove(old_sampling_dim) + for dim in dim_replacements[old_sampling_dim][::-1]: + dims.insert(orig_sampling_dim_index, dim) + inference_config.output.variables[variable] = dims + logger.info( + f"Replaced {old_sampling_dim} dimension with" + 
f" {dim_replacements[old_sampling_dim]} for {variable}" + ) + + # these dimensions should also be "renamed" from the input datasets + for input_name in training_config.inputs.keys(): + if "time" in training_config.inputs[input_name].dim_mapping: + dims = training_config.inputs[input_name].dims + orig_sampling_dim_index = dims.index(old_sampling_dim) + dims.remove(old_sampling_dim) + for dim in dim_replacements[old_sampling_dim][::-1]: + dims.insert(orig_sampling_dim_index, dim) + inference_config.inputs[input_name].dims = dims + + del inference_config.inputs[input_name].dim_mapping[ + old_sampling_dim + ] + + # add new "rename" dim-mappins for `analysis_time` and + # `elapsed_forecast_duration` + for dim in dim_replacements[old_sampling_dim]: + inference_config.inputs[input_name].dim_mapping[ + dim + ] = mdp_config.DimMapping(method="rename", dim=dim) + + return inference_config + + +def _prepare_inference_dataset_zarr( + datastore_name: str, + datastore_input_paths: Dict[str, str], + fp_inference_workdir: str, + analysis_time: datetime.datetime, + forecast_duration: datetime.timedelta, + time_dimensions: list[str], +) -> str: + """ + Prepare the inference dataset for a single datastore. + + Parameters + ---------- + datastore_name : str + The name of the datastore to prepare the inference dataset for, this + sets the expected path of the training datastore config and stats. + datastore_input_paths : Dict[str, str] + A dictionary of input names and paths to overwrite in the training + config. + fp_inference_workdir : str + The path to the inference working directory, where the inference + datastore config(s) and zarr dataset(s) will be saved. + analysis_time : datetime.datetime + The analysis time to use for the inference dataset. + forecast_duration : datetime.timedelta + The forecast duration to use for the inference dataset. 
+ time_dimensions : list[str] + The list of time dimensions to replace `time` with, for example + replacing `time` with [`analysis_time`, `elapsed_forecast_duration`] + + Returns + ------- + str + The path to the inference datastore config file. The inference dataset + is saved as a zarr store in the same directory as the config file, with + the same name but with a .zarr extension instead of .yaml. + """ + # fp_training_datastore_stats = ( + # f"inference_artifact/stats/{datastore_name}.datastore.stats.zarr" + # ) + fp_training_datastore_stats = ( + f"inference_artifact/stats/{datastore_name}.stats.zarr" + ) + ds_stats = xr.open_dataset(fp_training_datastore_stats) + logger.debug(f"Opened stats dataset: {ds_stats}") + + # fp_training_datastore_config = ( + # f"inference_artifact/configs/{datastore_name}.datastore.yaml" + # ) + fp_training_datastore_config = ( + f"inference_artifact/configs/{datastore_name}.yaml" + ) + + logger.debug( + f"Loading training datastore config from {fp_training_datastore_config}" + ) + datastore_training_config = mdp.Config.from_yaml_file( + fp_training_datastore_config + ) + + inference_config = _create_inference_datastore_config( + training_config=datastore_training_config, + forecast_analysis_time=analysis_time, + forecast_duration=forecast_duration, + overwrite_input_paths=datastore_input_paths, + time_dimensions=time_dimensions, + ) + + fp_inference_datastore_config = ( + f"{fp_inference_workdir}/{datastore_name}.datastore.yaml" + ) + + Path(fp_inference_datastore_config).parent.mkdir( + parents=True, exist_ok=True + ) + logger.info( + f"Saving inference datastore config to {fp_inference_datastore_config}" + ) + + # neural-lam's convention is to have the same name for the zarr store + # as the config file, but with .zarr extension + fp_dataset = fp_inference_datastore_config.replace(".yaml", ".zarr") + inference_config.to_yaml_file(fp_inference_datastore_config) + + ds = mdp.create_dataset(config=inference_config, 
                            ds_stats=ds_stats)
    logger.info(f"Writing inference dataset to {fp_dataset}")
    ds.to_zarr(fp_dataset)

    return fp_inference_datastore_config


def _prepare_all_inference_dataset_zarr(
    analysis_time: datetime.datetime,
    forecast_duration: datetime.timedelta,
    datastore_input_paths: Dict[str, Dict[str, str]],
    fp_inference_workdir: str,
    time_dimensions: list[str],
) -> Dict[str, str]:
    # NOTE(review): return annotation corrected from `str` — the function
    # returns a dict mapping datastore names to config paths, exactly as the
    # Returns section below states.
    """
    Prepare the inference dataset.

    Parameters
    ----------
    analysis_time : datetime.datetime
        The analysis time to use for the inference dataset(s).
    forecast_duration : datetime.timedelta
        The forecast duration to use for the inference dataset(s).
    datastore_input_paths : Dict[str, Dict[str, str]]
        A dictionary of datastore names and their corresponding input names
        and paths to overwrite in the training config.
    fp_inference_workdir : str
        The path to the inference working directory, where the inference
        datastore config(s) and zarr dataset(s) will be saved.
    time_dimensions : list[str]
        The list of time dimensions to replace `time` with, for example
        replacing `time` with [`analysis_time`, `elapsed_forecast_duration`]

    Returns
    -------
    Dict[str, str]
        A dictionary of datastore names and the path to their corresponding
        inference datastore config file. The inference dataset is saved as a
        zarr store in the same directory as the config file, with the same
        name but with a .zarr extension instead of .yaml.
+ """ + fps_datastore_configs = {} + for datastore_name, input_paths in datastore_input_paths.items(): + logger.info(f"Processing {datastore_name} datastore for inference") + fp_training_datastore_config = _prepare_inference_dataset_zarr( + datastore_name=datastore_name, + datastore_input_paths=input_paths, + fp_inference_workdir=fp_inference_workdir, + analysis_time=analysis_time, + forecast_duration=forecast_duration, + time_dimensions=time_dimensions, + ) + + fps_datastore_configs[datastore_name] = fp_training_datastore_config + + return fps_datastore_configs + + +def _create_inference_config( + fps_inference_datastore_config: Dict[str, str], fp_inference_workdir: str +) -> str: + """ + Create the inference config file for neural-lam, updating the datastore + config paths to point to the inference datastore config files. + + Parameters + ---------- + fps_inference_datastore_config : Dict[str, str] + A dictionary of datastore names and the path to their corresponding + inference datastore config file. + fp_inference_workdir : str + The path to the inference working directory, where the inference + config file will be saved. + + Returns + ------- + str + The path to the inference config file. 
+ """ + training_config = NeuralLAMConfig.from_yaml_file(FP_TRAINING_CONFIG) + inference_config = copy.deepcopy(training_config) + + fp_inference_config = f"{fp_inference_workdir}/config.yaml" + + def _set_datastore_config_path(node: DatastoreSelection, fp: str): + node.config_path = Path(fp).relative_to( + Path(fp_inference_config).parent + ) + # XXX: There is a bug in neural-lam here that means that the datastore kind + # doesn't correctly get serialised to a string in the config file when + # saved to yaml + node.kind = str(node.kind) + + # see if the neural-lam config was for single or multiple datastores + if hasattr(training_config, "datastores"): + # using multiple datastores + for ( + datastore_name, + fp_datastore_config, + ) in fps_inference_datastore_config.items(): + if datastore_name not in inference_config.datastores: + raise ValueError( + f"Datastore {datastore_name} not found in training config. " + f"Available datastores are: " + f"{list(inference_config.datastores.keys())}" + ) + _set_datastore_config_path( + node=inference_config.datastores[datastore_name], + fp=fp_datastore_config, + ) + else: + fp_datastore_config = list(fps_inference_datastore_config.values())[0] + # using a single datastore + _set_datastore_config_path( + node=inference_config.datastore, fp=fp_datastore_config + ) + + inference_config.to_yaml_file(fp_inference_config) + logger.info(f"Saved inference config to {fp_inference_config}") + + return fp_inference_config + + +@logger.catch(reraise=True) +def main(): + env_vars = _parse_env_vars() + # convert analysis time to UTC and strip timezone info + analysis_time = ( + env_vars["ANALYSIS_TIME"] + .astimezone(datetime.timezone.utc) + .replace(tzinfo=None) + ) + + fps_inference_datastore_config = _prepare_all_inference_dataset_zarr( + analysis_time=analysis_time, + forecast_duration=env_vars["FORECAST_DURATION"], + datastore_input_paths=env_vars["DATASTORE_INPUT_PATHS"], + fp_inference_workdir=env_vars["INFERENCE_WORKDIR"], + 
        time_dimensions=env_vars["TIME_DIMENSIONS"],
    )
    _create_inference_config(
        fps_inference_datastore_config=fps_inference_datastore_config,
        fp_inference_workdir=env_vars["INFERENCE_WORKDIR"],
    )


if __name__ == "__main__":
    # Opt-in post-mortem debugging: set MLWM_DEBUGGER=ipdb to drop into an
    # ipdb session on any uncaught exception from main().
    with_debugger = os.getenv("MLWM_DEBUGGER", "0")
    if with_debugger == "ipdb":
        import ipdb

        with ipdb.launch_ipdb_on_exception():
            main()
    else:
        main()
diff --git a/configurations/surface-dummy-model_DINI/run_inference_container.sh b/configurations/surface-dummy-model_DINI/run_inference_container.sh
index 681ed56..61d478e 100755
--- a/configurations/surface-dummy-model_DINI/run_inference_container.sh
+++ b/configurations/surface-dummy-model_DINI/run_inference_container.sh
@@ -58,19 +58,34 @@ DATASTORE_INPUT_PATHS="danra.danra_surface=${DINI_ZARR},danra.danra_static=${DINI_ZARR}"
 TIME_DIMENSIONS="time"
 INFERENCE_WORKDIR="$(pwd)/inference_workdir/"
 
-podman run --rm \
-	--device /dev/nvidia0 \
-	--device /dev/nvidiactl \
-	--device /dev/nvidia-uvm \
-	--device /dev/nvidia-uvm-tools \
-	--device /dev/nvidia-modeset \
-	-v /lib/x86_64-linux-gnu/libcuda.so.1:/lib/x86_64-linux-gnu/libcuda.so.1:ro \
-	-v /lib/x86_64-linux-gnu/libnvidia-ml.so.1:/lib/x86_64-linux-gnu/libnvidia-ml.so.1:ro \
-	-v /lib/x86_64-linux-gnu/libnvidia-ptxjitcompiler.so.1:/lib/x86_64-linux-gnu/libnvidia-ptxjitcompiler.so.1:ro \
-	--shm-size=32g \
-	-v ${INFERENCE_WORKDIR}:/workspace/inference_workdir:Z \
-	-e DATASTORE_INPUT_PATHS="${DATASTORE_INPUT_PATHS}" \
-	-e TIME_DIMENSIONS="${TIME_DIMENSIONS}" \
-	-e ANALYSIS_TIME="${ANALYSIS_TIME}" \
-	-e FORECAST_DURATION="${FORECAST_DURATION}" \
-	localhost/surface-dummy-model_dini:latest
+# Check if hostname starts with "spark" to determine GPU device mounting strategy
+if [[ $HOSTNAME == spark* ]]; then
+    # On spark devices: use --gpus all flag
+    podman run --rm \
+        --gpus all \
+        --shm-size=32g \
+        -v ${INFERENCE_WORKDIR}:/workspace/inference_workdir:Z \
+        -e DATASTORE_INPUT_PATHS="${DATASTORE_INPUT_PATHS}" \
+        -e
TIME_DIMENSIONS="${TIME_DIMENSIONS}" \ + -e ANALYSIS_TIME="${ANALYSIS_TIME}" \ + -e FORECAST_DURATION="${FORECAST_DURATION}" \ + localhost/surface-dummy-model_dini:latest +else + # On non-spark devices: use manual device mounting + podman run --rm \ + --device /dev/nvidia0 \ + --device /dev/nvidiactl \ + --device /dev/nvidia-uvm \ + --device /dev/nvidia-uvm-tools \ + --device /dev/nvidia-modeset \ + -v /lib/x86_64-linux-gnu/libcuda.so.1:/lib/x86_64-linux-gnu/libcuda.so.1:ro \ + -v /lib/x86_64-linux-gnu/libnvidia-ml.so.1:/lib/x86_64-linux-gnu/libnvidia-ml.so.1:ro \ + -v /lib/x86_64-linux-gnu/libnvidia-ptxjitcompiler.so.1:/lib/x86_64-linux-gnu/libnvidia-ptxjitcompiler.so.1:ro \ + --shm-size=32g \ + -v ${INFERENCE_WORKDIR}:/workspace/inference_workdir:Z \ + -e DATASTORE_INPUT_PATHS="${DATASTORE_INPUT_PATHS}" \ + -e TIME_DIMENSIONS="${TIME_DIMENSIONS}" \ + -e ANALYSIS_TIME="${ANALYSIS_TIME}" \ + -e FORECAST_DURATION="${FORECAST_DURATION}" \ + localhost/surface-dummy-model_dini:latest +fi \ No newline at end of file