Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions runscripts/fesom2/fesom2.6-albedo-hooks-test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# FESOM2 Mini Test Configuration for Hook Event Testing on Albedo
# Purpose: Short 3-day run with 2 restarts to test event-driven hooks
# Usage: esm_runscripts fesom2.6-albedo-hooks-test.yaml -e hooks_test_001

general:
setup_name: fesom
compute_time: "00:30:00"
initial_date: '2020-01-01'
final_date: '2020-01-03'
nyear: 0
nmonth: 0
nday: 1 # 1-day runs to generate restarts
use_venv: False

fesom:
version: 2.6
mesh_dir: "/albedo/pool/FESOM2/core2"
restart_rate: 1
restart_first: 1
restart_unit: 'd'
resolution: "CORE2"
lresume: false
time_step: 1800
nproc: 96

# Expected behavior:
# - Day 1: Initial run (lresume=false)
# - Day 2: Restart run 1 (lresume=true)
# - Day 3: Restart run 2 (lresume=true)
#
# Hook events to watch for each run:
# - esm_master:get-fesom-2.6:start/complete (if model needs fetching)
# - esm_master:comp-fesom-2.6:start/complete (if compilation needed)
# - esm_runscripts:recipe:prepexp:start/complete (experiment preparation)
# - esm_runscripts:recipe:prepexp:<step>:start/complete (individual prep steps)
# - esm_runscripts:recipe:compute:start/complete (model execution)
# - esm_runscripts:recipe:compute:<step>:start/complete (e.g., namelist_changes, copy_files_to_work)
# - esm_runscripts:recipe:tidy_and_resubmit:start/complete (cleanup and resubmission)
21 changes: 20 additions & 1 deletion src/esm_master/esm_master.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@
import os
import yaml

# Non-invasive hook system integration
try:
from esm_tools.hooks import trigger_hook
except ImportError:
# Fallback if hooks not available
def trigger_hook(*args, **kwargs):
pass

from . import database_actions

from .general_stuff import (
Expand Down Expand Up @@ -107,8 +115,19 @@ def main_flow(parsed_args, target):
print("esm_master: check mode is activated. Not executing the actions above")
return 0

# Trigger hook before task starts (using full task name like "get-fesom-2.5")
task_name = user_task.raw_name
trigger_hook(f"esm_master:{task_name}:start", config=complete_config, target=target, task=task_name)

try:
user_task.execute(ignore_errors) # env)

user_task.execute(ignore_errors) # env)
# Trigger hook after successful task completion
trigger_hook(f"esm_master:{task_name}:complete", config=complete_config, target=target, task=task_name)
except Exception as e:
# Trigger hook on task error
trigger_hook(f"esm_master:{task_name}:error", config=complete_config, target=target, task=task_name, error=str(e))
raise # Re-raise the exception to maintain existing error handling

database = database_actions.database_entry(
complete_config, user_task.todo, user_task.package.raw_name, ESM_MASTER_DIR
Expand Down
100 changes: 66 additions & 34 deletions src/esm_plugin_manager/esm_plugin_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@
import esm_profile
from esm_parser import yaml_file_to_dict

# Non-invasive hook system integration
try:
from esm_tools.hooks import trigger_hook
except ImportError:
# Fallback if hooks not available
def trigger_hook(*args, **kwargs):
pass


def read_recipe(recipe, additional_dict, needs_parse=True):
if needs_parse:
Expand Down Expand Up @@ -133,6 +141,10 @@ def work_through_recipe(recipe, plugins, config):
pdb.set_trace()
recipes = recipe["recipe"]
recipe_name = recipe["job_type"]

# Trigger hook at start of recipe execution
trigger_hook(f"esm_runscripts:recipe:{recipe_name}:start", config=config, recipe=recipes)

for index, workitem in enumerate(recipes, start=1):
if config["general"].get("verbose", False):
# diagnostic message of which recipe step is being executed
Expand All @@ -146,47 +158,67 @@ def work_through_recipe(recipe, plugins, config):
logger.info("=" * len(message))
logger.info(message)
logger.info("=" * len(message))
if plugins[workitem]["type"] == "core":
thismodule = __import__(plugins[workitem]["module"])
submodule = getattr(thismodule, plugins[workitem]["submodule"])
if config["general"].get("profile", False):
workitem_callable = getattr(submodule, workitem)
timed_workitem_callable = esm_profile.timing(
workitem_callable, recipe_name
)
config = timed_workitem_callable(config)
else:
config = getattr(submodule, workitem)(config)
elif plugins[workitem]["type"] == "installed":
if config["general"].get("profile", False):
workitem_callable = plugins[workitem]["callable"]
timed_workitem_callable = esm_profile.timing(
workitem_callable, recipe_name
)
config = timed_workitem_callable(config)
else:
config = plugins[workitem]["callable"](config)
else:
if sys.version_info >= (3, 5):
import importlib.util

spec = importlib.util.spec_from_file_location(
plugins[workitem]["module"],
plugins[workitem]["location"]
+ "/"
+ plugins[workitem]["module"]
+ ".py",
)
thismodule = importlib.util.module_from_spec(spec)
spec.loader.exec_module(thismodule)
# Trigger hook before each recipe step
trigger_hook(f"esm_runscripts:recipe:{recipe_name}:{workitem}:start",
config=config, workitem=workitem, step=index, total=len(recipes))

try:
if plugins[workitem]["type"] == "core":
thismodule = __import__(plugins[workitem]["module"])
submodule = getattr(thismodule, plugins[workitem]["submodule"])
if config["general"].get("profile", False):
workitem_callable = getattr(thismodule, workitem)
workitem_callable = getattr(submodule, workitem)
timed_workitem_callable = esm_profile.timing(
workitem_callable, recipe_name
)
config = timed_workitem_callable(config)
else:
config = getattr(thismodule, workitem)(config)
config = getattr(submodule, workitem)(config)
elif plugins[workitem]["type"] == "installed":
if config["general"].get("profile", False):
workitem_callable = plugins[workitem]["callable"]
timed_workitem_callable = esm_profile.timing(
workitem_callable, recipe_name
)
config = timed_workitem_callable(config)
else:
config = plugins[workitem]["callable"](config)
else:
if sys.version_info >= (3, 5):
import importlib.util

spec = importlib.util.spec_from_file_location(
plugins[workitem]["module"],
plugins[workitem]["location"]
+ "/"
+ plugins[workitem]["module"]
+ ".py",
)
thismodule = importlib.util.module_from_spec(spec)
spec.loader.exec_module(thismodule)
if config["general"].get("profile", False):
workitem_callable = getattr(thismodule, workitem)
timed_workitem_callable = esm_profile.timing(
workitem_callable, recipe_name
)
config = timed_workitem_callable(config)
else:
config = getattr(thismodule, workitem)(config)

# Trigger hook after successful step completion
trigger_hook(f"esm_runscripts:recipe:{recipe_name}:{workitem}:complete",
config=config, workitem=workitem, step=index, total=len(recipes))

except Exception as e:
# Trigger hook on step error
trigger_hook(f"esm_runscripts:recipe:{recipe_name}:{workitem}:error",
config=config, workitem=workitem, step=index, total=len(recipes), error=str(e))
raise # Re-raise to maintain existing error handling

# Trigger hook at end of recipe execution
trigger_hook(f"esm_runscripts:recipe:{recipe_name}:complete", config=config, recipe=recipes)

return config


Expand Down
10 changes: 10 additions & 0 deletions src/esm_tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@

from .error_handling import *

# Event-driven architecture support
try:
from .hooks import add_hook, trigger_hook
except ImportError:
# Hooks module not available, define no-op functions
def add_hook(*args, **kwargs):
pass
def trigger_hook(*args, **kwargs):
pass

# Setup Loguru for the following cases:
# A) If user sets
if os.environ.get("DEBUG_ESM_TOOLS"):
Expand Down
58 changes: 58 additions & 0 deletions src/esm_tools/hooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""
Minimal event hook system for esm-tools.

This module provides a simple, thread-safe hook notification system with
zero dependencies beyond Python's standard library. It's designed to be
stable and maintainable with minimal API surface.
"""

from typing import Callable, Dict, List
import threading
from datetime import datetime


class HookManager:
"""
A minimal hook manager with thread-safe hook dispatch.

Hooks are identified by string names. Callbacks are called synchronously
in the order they were registered when a hook is triggered.
"""

def __init__(self):
self._hooks: Dict[str, List[Callable]] = {}
self._lock = threading.Lock()

def add_hook(self, hook: str, callback: Callable) -> None:
"""Register a callback for a hook."""
with self._lock:
if hook not in self._hooks:
self._hooks[hook] = []
self._hooks[hook].append(callback)

def trigger_hook(self, hook: str, *args, **kwargs) -> None:
"""Trigger a hook, calling all registered callbacks."""
with self._lock:
callbacks = self._hooks.get(hook, []).copy()

for callback in callbacks:
try:
callback(*args, **kwargs)
except Exception:
# Silently ignore callback errors to prevent one bad
# callback from breaking the entire hook chain
pass


# Global singleton instance
_manager = HookManager()


def add_hook(hook: str, callback: Callable) -> None:
"""Register a callback for a hook on the global manager."""
_manager.add_hook(hook, callback)


def trigger_hook(hook: str, *args, **kwargs) -> None:
"""Trigger a hook on the global manager."""
_manager.trigger_hook(hook, *args, **kwargs)
97 changes: 97 additions & 0 deletions tests/test_hooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"""
Unit tests for hook system
"""

import pytest
from esm_tools.hooks import HookManager, add_hook, trigger_hook


class TestHookManager:
"""Test HookManager class"""

def test_add_hook(self):
"""Test basic hook registration"""
manager = HookManager()
called = []

manager.add_hook("test:event", lambda: called.append(1))
manager.trigger_hook("test:event")

assert len(called) == 1

def test_hook_with_args(self):
"""Test hooks receive arguments"""
manager = HookManager()
received = []

manager.add_hook("test:event", lambda x, y: received.append((x, y)))
manager.trigger_hook("test:event", 1, 2)

assert received == [(1, 2)]

def test_hook_with_kwargs(self):
"""Test hooks receive keyword arguments"""
manager = HookManager()
received = []

manager.add_hook("test:event", lambda **kw: received.append(kw))
manager.trigger_hook("test:event", a=1, b=2)

assert received == [{"a": 1, "b": 2}]

def test_multiple_hooks(self):
"""Test multiple callbacks for same hook"""
manager = HookManager()
results = []

manager.add_hook("test:event", lambda: results.append("a"))
manager.add_hook("test:event", lambda: results.append("b"))
manager.trigger_hook("test:event")

assert results == ["a", "b"]

def test_error_isolation(self):
"""Test one bad callback doesn't break others"""
manager = HookManager()
results = []

manager.add_hook("test:event", lambda: 1/0) # Will raise
manager.add_hook("test:event", lambda: results.append("ok"))
manager.trigger_hook("test:event")

assert results == ["ok"]

def test_nonexistent_hook(self):
"""Test triggering nonexistent hook doesn't error"""
manager = HookManager()
manager.trigger_hook("nonexistent:hook")
# Should not raise


class TestGlobalHooks:
"""Test global hook functions"""

def test_global_add_hook(self):
"""Test global add_hook function"""
called = []
add_hook("global:test", lambda: called.append(1))
trigger_hook("global:test")

assert len(called) >= 1 # May have been called by other tests

def test_hook_order(self):
"""Test hooks called in registration order"""
results = []

add_hook("order:test", lambda: results.append(1))
add_hook("order:test", lambda: results.append(2))
add_hook("order:test", lambda: results.append(3))

trigger_hook("order:test")

# Should be called in order [1, 2, 3]
assert results == [1, 2, 3]


if __name__ == "__main__":
pytest.main([__file__, "-v"])
Loading
Loading