Skip to content
42 changes: 42 additions & 0 deletions fre/analysis/base_class.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import json


class AnalysisScript(object):
"""Abstract base class for analysis scripts. User-defined analysis scripts
should inhert from this class and override the requires and run_analysis methods.

Check warning on line 6 in fre/analysis/base_class.py

View workflow job for this annotation

GitHub Actions / spellcheck

Misspelled word (inhert) Suggestions: (inherit*)

Attributes:
description: Longer form description for the analysis.
title: Title that describes the analysis.
"""
def __init__(self):
"""Instantiates an object. The user should provide a description and title."""
raise NotImplementedError("you must override this function.")
self.description = None
self.title = None

def requires(self):
"""Provides metadata describing what is needed for this analysis to run.

Returns:
A json string describing the metadata.
"""
raise NotImplementedError("you must override this function.")
return json.dumps("{json of metadata MDTF format.}")

def run_analysis(self, yaml, name, date_range, scripts_dir, output_dir, output_yaml):
"""Runs the analysis and generates all plots and associated datasets.

Args:
yaml: Path to a model yaml
name: Name of the analysis as specified in the yaml
date_range: Time span to use for analysis (YYYY-MM-DD,YYYY-MM-DD)
scripts_dir: Path to a directory to save intermediate scripts
output_dir: Path to a directory to save figures
output_yaml: Path to use as an structured output yaml file

Returns:
A list of png figures.
"""
raise NotImplementedError("you must override this function.")
return ["figure1.png", "figure2.png",]
132 changes: 132 additions & 0 deletions fre/analysis/env_tool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
from pathlib import Path
from subprocess import CalledProcessError, PIPE, run, STDOUT
from tempfile import TemporaryDirectory
import venv


def _process_output(output):
"""Converts bytes string to list of String lines.

Args:
output: Bytes string.

Returns:
List of strings.
"""
return [x for x in output.decode("utf-8").split("\n") if x]


class VirtualEnvManager(object):
"""Helper class for creating/running simple command in a virtual environment."""
def __init__(self, path):
self.path = Path(path)
self.activate = f"source {self.path / 'bin' / 'activate'}"

@staticmethod
def _execute(commands):
"""Runs input commands through bash in a child process.

Args:
commands: List of string commands.

Returns:
List of string output.
"""
with TemporaryDirectory() as tmp:
script_path = Path(tmp) / "script"
with open(script_path, "w") as script:
script.write("\n".join(commands))
try:
process = run(["bash", str(script_path)], stdout=PIPE, stderr=STDOUT,
check=True)
except CalledProcessError as err:
for line in _process_output(err.output):
print(line)
raise
return _process_output(process.stdout)

def _execute_python_script(self, commands):
"""Runs input python code in bash in a child process.

Args:
commands: List of string python code lines.

Returns:
List of string output.
"""
with TemporaryDirectory() as tmp:
script_path = Path(tmp) / "python_script"
with open(script_path, "w") as script:
script.write("\n".join(commands))
commands = [self.activate, f"python3 {str(script_path)}"]
return self._execute(commands)

def create_env(self):
"""Creates the virtual environment."""
venv.create(self.path, with_pip=True)

def destroy_env(self):
"""Destroys the virtual environment."""
raise NotImplementedError("this feature is not implemented yet.")

def install_package(self, name):
"""Installs a package in the virtual environment.

Args:
name: String name of the package.

Returns:
List of string output.
"""
commands = [self.activate, "python3 -m pip --upgrade pip",
f"python3 -m pip install {name}"]
return self._execute(commands)

def list_plugins(self):
"""Returns a list of plugins that are available in the virtual environment.

Returns:
List of plugins.
"""
python_script = [
"from analysis_scripts import available_plugins",
"for plugin in available_plugins():",
" print(plugin)"
]
return self._execute_python_script(python_script)

def run_analysis_plugin(self, name, catalog, output_directory, config=None):
"""Returns a list of paths to figures created by the plugin from the virtual
environment.

Args:
name: String name of the analysis package.
catalog: Path to the data catalog.
output_directory: Path to the output directory.

Returns:
List of figure paths.
"""
if config:
python_script = [f"config = {str(config)}",]
else:
python_script = ["config = None",]
python_script += [
"from analysis_scripts import run_plugin",
f"paths = run_plugin('{name}', '{catalog}', '{output_directory}', config=config)",
"for path in paths:",
" print(path)"
]
return self._execute_python_script(python_script)

def uninstall_package(self, name):
"""Uninstalls a package from the virtual environment.

Args:
name: String name of the package.

Returns:
List of string output.
"""
commands = [self.activate, f"pip uninstall {name}"]
return self._execute(commands)
23 changes: 10 additions & 13 deletions fre/analysis/freanalysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,16 @@ def list(library_directory):


@analysis_cli.command()
@click.option("--name", type=str, required=True, help="Name of the analysis script.")
@click.option("--catalog", type=str, required=True, help="Path to the data catalog.")
@click.option("--output-directory", type=str, required=True,
help="Path to the output directory.")
@click.option("--output-yaml", type=str, required=True, help="Path to the output yaml.")
@click.option("--experiment-yaml", type=str, required=True, help="Path to the experiment yaml.")
@click.option("--library-directory", type=str, required=False,
help="Path to a custom lib directory.")
def run(name, catalog, output_directory, output_yaml, experiment_yaml,
library_directory):
"""Runs the analysis script and writes the paths to the created figures to a yaml file."""
run_analysis(name, catalog, output_directory, output_yaml, experiment_yaml,
library_directory)
@click.option("--yaml", type=str, required=True, help="Path to the model yaml")
@click.option("--name", type=str, required=True, help="Name of the analysis script")
@click.option("--date_range", type=str, required=True, help="Time span to use for analysis (YYYY-MM-DD,YYYY-MM-DD)")
@click.option("--scripts_dir", type=str, required=True, help="Path to a directory to save intermediate scripts")
@click.option("--output_dir", type=str, required=True, help="Path to a directory to save figures")
@click.option("--output_yaml", type=str, required=True, help="Path to use as an structured output yaml file")
def run(yaml, name, date_range, scripts_dir, output_dir, output_yaml):
"""Runs the analysis and generates all plots and associated datasets.
"""
run_analysis(yaml, name, date_range, scripts_dir, output_dir, output_yaml)


@analysis_cli.command()
Expand Down
1 change: 1 addition & 0 deletions fre/analysis/plugins/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .esnb import freanalysis_esnb

Check warning on line 1 in fre/analysis/plugins/__init__.py

View workflow job for this annotation

GitHub Actions / spellcheck

Unknown word (esnb)

Check warning on line 1 in fre/analysis/plugins/__init__.py

View workflow job for this annotation

GitHub Actions / spellcheck

Unknown word (esnb)
80 changes: 80 additions & 0 deletions fre/analysis/plugins/esnb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import logging
from pathlib import Path, PurePosixPath
import requests
import subprocess
import sys
from ..base_class import AnalysisScript

fre_logger = logging.getLogger(__name__)

class freanalysis_esnb(AnalysisScript):

Check warning on line 10 in fre/analysis/plugins/esnb.py

View workflow job for this annotation

GitHub Actions / spellcheck

Unknown word (esnb)
"""Defines run and report-requirements methods for ESNB flavor usage

Check warning on line 11 in fre/analysis/plugins/esnb.py

View workflow job for this annotation

GitHub Actions / spellcheck

Unknown word (ESNB)
"""

def __init__(self):
self.description = "Wrapper to access analysis framework for ESNB scripts"
self.title = "ESNB"

def run_analysis(self, config, name, date_range, scripts_dir, output_dir, output_yaml, pp_dir):
"""Runs the ESNB analysis specified in the yaml and the runtime options

Args:
config: Dictionary of specific configuration for the script
name: Name of the analysis as specified in the yaml
date_range: Time span to use for analysis (YYYY-MM-DD,YYYY-MM-DD)
scripts_dir: Path to a directory to save intermediate scripts
output_dir: Path to a directory to save figures
output_yaml: Path to use as an structured output yaml file
pp_dir: Path to input postprocessed files
"""

# save notebook to scripts_dir
url = config["notebook_path"]
# convert to the "Raw" URL
# replace 'github.com' with 'raw.githubusercontent.com' and remove '/blob'
raw_url = url.replace("github.com", "raw.githubusercontent.com").replace("/blob/", "/")
local_filename = Path(scripts_dir) / PurePosixPath(url).name
with requests.get(raw_url) as r:
r.raise_for_status() # Check for HTTP errors (404, 500, etc.)
with open(local_filename, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
fre_logger.debug(f"ESNB notebook saved to '{local_filename}'")

# create run_settings dictionary
run_settings = {
'conda_env_root': config["conda_env_root"],
'notebook_path': local_filename,
'outdir': output_dir,
'scripts_dir': scripts_dir
}

# create case_settings dictionary
split_date = date_range.split(",")
case_settings = {
'PP_DIR': pp_dir,
'date_range': split_date
}

try:
import esnb.engine
except ImportError:
raise ImportError(
"The 'esnb' package is required for ESNB analysis but is not installed. "
"Please install it using: pip install esnb"
)

# write the python script that runs the notebook
python_script = esnb.engine.canopy_launcher(run_settings, case_settings, verbose=True)
fre_logger.debug(f"ESNB python wrapper saved to '{python_script}'")

# run the python script
fre_logger.info(f"Running ESNB analysis script: {python_script}")
result = subprocess.run([sys.executable, python_script], check=True, capture_output=True, text=True)
fre_logger.debug(f"ESNB script output: {result.stdout}")
if result.stderr:
fre_logger.warning(f"ESNB script stderr: {result.stderr}")

# Return empty list for now, as ESNB might not generate PNGs directly
return []

Loading
Loading