diff --git a/runscripts/fesom2/fesom2.6-albedo-hooks-test.yaml b/runscripts/fesom2/fesom2.6-albedo-hooks-test.yaml new file mode 100644 index 000000000..09849aebc --- /dev/null +++ b/runscripts/fesom2/fesom2.6-albedo-hooks-test.yaml @@ -0,0 +1,38 @@ +# FESOM2 Mini Test Configuration for Hook Event Testing on Albedo +# Purpose: Short 3-day run with 2 restarts to test event-driven hooks +# Usage: esm_runscripts fesom2.6-albedo-hooks-test.yaml -e hooks_test_001 + +general: + setup_name: fesom + compute_time: "00:30:00" + initial_date: '2020-01-01' + final_date: '2020-01-03' + nyear: 0 + nmonth: 0 + nday: 1 # 1-day runs to generate restarts + use_venv: False + +fesom: + version: 2.6 + mesh_dir: "/albedo/pool/FESOM2/core2" + restart_rate: 1 + restart_first: 1 + restart_unit: 'd' + resolution: "CORE2" + lresume: false + time_step: 1800 + nproc: 96 + +# Expected behavior: +# - Day 1: Initial run (lresume=false) +# - Day 2: Restart run 1 (lresume=true) +# - Day 3: Restart run 2 (lresume=true) +# +# Hook events to watch for each run: +# - esm_master:get-fesom-2.6:start/complete (if model needs fetching) +# - esm_master:comp-fesom-2.6:start/complete (if compilation needed) +# - esm_runscripts:recipe:prepexp:start/complete (experiment preparation) +# - esm_runscripts:recipe:prepexp::start/complete (individual prep steps) +# - esm_runscripts:recipe:compute:start/complete (model execution) +# - esm_runscripts:recipe:compute::start/complete (e.g., namelist_changes, copy_files_to_work) +# - esm_runscripts:recipe:tidy_and_resubmit:start/complete (cleanup and resubmission) diff --git a/src/esm_master/esm_master.py b/src/esm_master/esm_master.py index 9331adf34..7e3652dad 100644 --- a/src/esm_master/esm_master.py +++ b/src/esm_master/esm_master.py @@ -6,6 +6,14 @@ import os import yaml +# Non-invasive hook system integration +try: + from esm_tools.hooks import trigger_hook +except ImportError: + # Fallback if hooks not available + def trigger_hook(*args, **kwargs): + pass + from . import database_actions from .general_stuff import ( @@ -107,8 +115,19 @@ def main_flow(parsed_args, target): print("esm_master: check mode is activated. Not executing the actions above") return 0 + # Trigger hook before task starts (using full task name like "get-fesom-2.5") + task_name = user_task.raw_name + trigger_hook(f"esm_master:{task_name}:start", config=complete_config, target=target, task=task_name) + + try: + user_task.execute(ignore_errors) # env) - user_task.execute(ignore_errors) # env) + # Trigger hook after successful task completion + trigger_hook(f"esm_master:{task_name}:complete", config=complete_config, target=target, task=task_name) + except Exception as e: + # Trigger hook on task error + trigger_hook(f"esm_master:{task_name}:error", config=complete_config, target=target, task=task_name, error=str(e)) + raise # Re-raise the exception to maintain existing error handling database = database_actions.database_entry( complete_config, user_task.todo, user_task.package.raw_name, ESM_MASTER_DIR diff --git a/src/esm_plugin_manager/esm_plugin_manager.py b/src/esm_plugin_manager/esm_plugin_manager.py index 02d18ecaf..b0ae37019 100644 --- a/src/esm_plugin_manager/esm_plugin_manager.py +++ b/src/esm_plugin_manager/esm_plugin_manager.py @@ -10,6 +10,14 @@ import esm_profile from esm_parser import yaml_file_to_dict +# Non-invasive hook system integration +try: + from esm_tools.hooks import trigger_hook +except ImportError: + # Fallback if hooks not available + def trigger_hook(*args, **kwargs): + pass + def read_recipe(recipe, additional_dict, needs_parse=True): if needs_parse: @@ -133,6 +141,10 @@ def work_through_recipe(recipe, plugins, config): pdb.set_trace() recipes = recipe["recipe"] recipe_name = recipe["job_type"] + + # Trigger hook at start of recipe execution + trigger_hook(f"esm_runscripts:recipe:{recipe_name}:start", config=config, recipe=recipes) + for index, workitem in enumerate(recipes, start=1): if config["general"].get("verbose", False): # diagnostic message of which recipe step is being executed @@ -146,47 +158,67 @@ def work_through_recipe(recipe, plugins, config): logger.info("=" * len(message)) logger.info(message) logger.info("=" * len(message)) - if plugins[workitem]["type"] == "core": - thismodule = __import__(plugins[workitem]["module"]) - submodule = getattr(thismodule, plugins[workitem]["submodule"]) - if config["general"].get("profile", False): - workitem_callable = getattr(submodule, workitem) - timed_workitem_callable = esm_profile.timing( - workitem_callable, recipe_name - ) - config = timed_workitem_callable(config) - else: - config = getattr(submodule, workitem)(config) - elif plugins[workitem]["type"] == "installed": - if config["general"].get("profile", False): - workitem_callable = plugins[workitem]["callable"] - timed_workitem_callable = esm_profile.timing( - workitem_callable, recipe_name - ) - config = timed_workitem_callable(config) - else: - config = plugins[workitem]["callable"](config) - else: - if sys.version_info >= (3, 5): - import importlib.util - spec = importlib.util.spec_from_file_location( - plugins[workitem]["module"], - plugins[workitem]["location"] - + "/" - + plugins[workitem]["module"] - + ".py", - ) - thismodule = importlib.util.module_from_spec(spec) - spec.loader.exec_module(thismodule) + # Trigger hook before each recipe step + trigger_hook(f"esm_runscripts:recipe:{recipe_name}:{workitem}:start", + config=config, workitem=workitem, step=index, total=len(recipes)) + + try: + if plugins[workitem]["type"] == "core": + thismodule = __import__(plugins[workitem]["module"]) + submodule = getattr(thismodule, plugins[workitem]["submodule"]) if config["general"].get("profile", False): - workitem_callable = getattr(thismodule, workitem) + workitem_callable = getattr(submodule, workitem) timed_workitem_callable = esm_profile.timing( workitem_callable, recipe_name ) config = timed_workitem_callable(config) else: - config = getattr(thismodule, workitem)(config) + config = getattr(submodule, workitem)(config) + elif plugins[workitem]["type"] == "installed": + if config["general"].get("profile", False): + workitem_callable = plugins[workitem]["callable"] + timed_workitem_callable = esm_profile.timing( + workitem_callable, recipe_name + ) + config = timed_workitem_callable(config) + else: + config = plugins[workitem]["callable"](config) + else: + if sys.version_info >= (3, 5): + import importlib.util + + spec = importlib.util.spec_from_file_location( + plugins[workitem]["module"], + plugins[workitem]["location"] + + "/" + + plugins[workitem]["module"] + + ".py", + ) + thismodule = importlib.util.module_from_spec(spec) + spec.loader.exec_module(thismodule) + if config["general"].get("profile", False): + workitem_callable = getattr(thismodule, workitem) + timed_workitem_callable = esm_profile.timing( + workitem_callable, recipe_name + ) + config = timed_workitem_callable(config) + else: + config = getattr(thismodule, workitem)(config) + + # Trigger hook after successful step completion + trigger_hook(f"esm_runscripts:recipe:{recipe_name}:{workitem}:complete", + config=config, workitem=workitem, step=index, total=len(recipes)) + + except Exception as e: + # Trigger hook on step error + trigger_hook(f"esm_runscripts:recipe:{recipe_name}:{workitem}:error", + config=config, workitem=workitem, step=index, total=len(recipes), error=str(e)) + raise # Re-raise to maintain existing error handling + + # Trigger hook at end of recipe execution + trigger_hook(f"esm_runscripts:recipe:{recipe_name}:complete", config=config, recipe=recipes) + return config diff --git a/src/esm_tools/__init__.py b/src/esm_tools/__init__.py index 2f43251b0..6e7b4c8d2 100644 --- a/src/esm_tools/__init__.py +++ b/src/esm_tools/__init__.py @@ -40,6 +40,16 @@ from .error_handling import * +# Event-driven architecture support +try: + from .hooks import add_hook, trigger_hook +except ImportError: + # Hooks module not available, define no-op functions + def add_hook(*args, **kwargs): + pass + def trigger_hook(*args, **kwargs): + pass + # Setup Loguru for the following cases: # A) If user sets if os.environ.get("DEBUG_ESM_TOOLS"): diff --git a/src/esm_tools/hooks.py b/src/esm_tools/hooks.py new file mode 100644 index 000000000..d42e1930c --- /dev/null +++ b/src/esm_tools/hooks.py @@ -0,0 +1,58 @@ +""" +Minimal event hook system for esm-tools. + +This module provides a simple, thread-safe hook notification system with +zero dependencies beyond Python's standard library. It's designed to be +stable and maintainable with minimal API surface. +""" + +from typing import Callable, Dict, List +import threading +from datetime import datetime + + +class HookManager: + """ + A minimal hook manager with thread-safe hook dispatch. + + Hooks are identified by string names. Callbacks are called synchronously + in the order they were registered when a hook is triggered. + """ + + def __init__(self): + self._hooks: Dict[str, List[Callable]] = {} + self._lock = threading.Lock() + + def add_hook(self, hook: str, callback: Callable) -> None: + """Register a callback for a hook.""" + with self._lock: + if hook not in self._hooks: + self._hooks[hook] = [] + self._hooks[hook].append(callback) + + def trigger_hook(self, hook: str, *args, **kwargs) -> None: + """Trigger a hook, calling all registered callbacks.""" + with self._lock: + callbacks = self._hooks.get(hook, []).copy() + + for callback in callbacks: + try: + callback(*args, **kwargs) + except Exception: + # Silently ignore callback errors to prevent one bad + # callback from breaking the entire hook chain + pass + + +# Global singleton instance +_manager = HookManager() + + +def add_hook(hook: str, callback: Callable) -> None: + """Register a callback for a hook on the global manager.""" + _manager.add_hook(hook, callback) + + +def trigger_hook(hook: str, *args, **kwargs) -> None: + """Trigger a hook on the global manager.""" + _manager.trigger_hook(hook, *args, **kwargs) diff --git a/tests/test_hooks.py b/tests/test_hooks.py new file mode 100644 index 000000000..23d79b3f0 --- /dev/null +++ b/tests/test_hooks.py @@ -0,0 +1,97 @@ +""" +Unit tests for hook system +""" + +import pytest +from esm_tools.hooks import HookManager, add_hook, trigger_hook + + +class TestHookManager: + """Test HookManager class""" + + def test_add_hook(self): + """Test basic hook registration""" + manager = HookManager() + called = [] + + manager.add_hook("test:event", lambda: called.append(1)) + manager.trigger_hook("test:event") + + assert len(called) == 1 + + def test_hook_with_args(self): + """Test hooks receive arguments""" + manager = HookManager() + received = [] + + manager.add_hook("test:event", lambda x, y: received.append((x, y))) + manager.trigger_hook("test:event", 1, 2) + + assert received == [(1, 2)] + + def test_hook_with_kwargs(self): + """Test hooks receive keyword arguments""" + manager = HookManager() + received = [] + + manager.add_hook("test:event", lambda **kw: received.append(kw)) + manager.trigger_hook("test:event", a=1, b=2) + + assert received == [{"a": 1, "b": 2}] + + def test_multiple_hooks(self): + """Test multiple callbacks for same hook""" + manager = HookManager() + results = [] + + manager.add_hook("test:event", lambda: results.append("a")) + manager.add_hook("test:event", lambda: results.append("b")) + manager.trigger_hook("test:event") + + assert results == ["a", "b"] + + def test_error_isolation(self): + """Test one bad callback doesn't break others""" + manager = HookManager() + results = [] + + manager.add_hook("test:event", lambda: 1/0) # Will raise + manager.add_hook("test:event", lambda: results.append("ok")) + manager.trigger_hook("test:event") + + assert results == ["ok"] + + def test_nonexistent_hook(self): + """Test triggering nonexistent hook doesn't error""" + manager = HookManager() + manager.trigger_hook("nonexistent:hook") + # Should not raise + + +class TestGlobalHooks: + """Test global hook functions""" + + def test_global_add_hook(self): + """Test global add_hook function""" + called = [] + add_hook("global:test", lambda: called.append(1)) + trigger_hook("global:test") + + assert len(called) >= 1 # May have been called by other tests + + def test_hook_order(self): + """Test hooks called in registration order""" + results = [] + + add_hook("order:test", lambda: results.append(1)) + add_hook("order:test", lambda: results.append(2)) + add_hook("order:test", lambda: results.append(3)) + + trigger_hook("order:test") + + # Should be called in order [1, 2, 3] + assert results == [1, 2, 3] + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/utils/hooks_monitor.py b/utils/hooks_monitor.py new file mode 100644 index 000000000..d8495a129 --- /dev/null +++ b/utils/hooks_monitor.py @@ -0,0 +1,184 @@ +#!/usr/bin/env python +""" +Hook Event Monitor for ESM-Tools Testing + +This script registers hooks to monitor all events during an ESM-Tools run. +It logs events to both console and a JSON file for later analysis. + +Usage: + 1. Import this module at the start of your runscript or in esm_tools/__init__.py: + + import test_hooks_monitor + test_hooks_monitor.start_monitoring() + + 2. Or run it as a standalone script that monkey-patches the hooks system: + + python test_hooks_monitor.py + + 3. Then run your normal ESM-Tools command: + + esm_runscripts fesom2.6-albedo-hooks-test.yaml -e hooks_test_001 + +Events will be logged to: + - Console (STDOUT) + - hooks_events.json (JSON file for later analysis) +""" + +import json +import sys +from datetime import datetime +from pathlib import Path + +# Event storage +events_log = [] +log_file = Path("hooks_events.json") + +def log_event(event_name, **kwargs): + """Log hook events with timestamp and details.""" + event = { + "timestamp": datetime.now().isoformat(), + "event": event_name, + "details": {k: str(v)[:200] for k, v in kwargs.items() if k != 'config'} # Truncate long values + } + + events_log.append(event) + + # Pretty print to console + print(f"\n{'='*80}") + print(f"šŸŽÆ HOOK EVENT: {event_name}") + print(f" Time: {event['timestamp']}") + if event['details']: + print(f" Details:") + for key, value in event['details'].items(): + print(f" {key}: {value}") + print(f"{'='*80}\n") + sys.stdout.flush() + + # Save to JSON file + with open(log_file, 'w') as f: + json.dump(events_log, f, indent=2) + + +def start_monitoring(): + """Register hooks to monitor all ESM-Tools events.""" + try: + from esm_tools.hooks import add_hook + + # Register wildcard patterns for common event types + event_patterns = [ + "esm_master:*", + "esm_runscripts:recipe:*", + ] + + print("\n" + "="*80) + print("šŸ” HOOK MONITORING STARTED") + print(f" Log file: {log_file.absolute()}") + print(f" Monitoring patterns: {', '.join(event_patterns)}") + print("="*80 + "\n") + sys.stdout.flush() + + # Since we can't use wildcards in the current implementation, + # we'll create a general hook that captures everything + # This requires modifying the trigger_hook function or registering specific hooks + + # For now, let's register the most common hooks we expect: + common_hooks = [ + # esm_master hooks + "esm_master:get-fesom-2.6:start", + "esm_master:get-fesom-2.6:complete", + "esm_master:get-fesom-2.6:error", + "esm_master:comp-fesom-2.6:start", + "esm_master:comp-fesom-2.6:complete", + "esm_master:comp-fesom-2.6:error", + + # Recipe-level hooks + "esm_runscripts:recipe:prepexp:start", + "esm_runscripts:recipe:prepexp:complete", + "esm_runscripts:recipe:compute:start", + "esm_runscripts:recipe:compute:complete", + "esm_runscripts:recipe:tidy_and_resubmit:start", + "esm_runscripts:recipe:tidy_and_resubmit:complete", + + # Common recipe step hooks (you'll see more when you run it) + "esm_runscripts:recipe:prepexp:initialize_experiment_logfile:start", + "esm_runscripts:recipe:prepexp:initialize_experiment_logfile:complete", + "esm_runscripts:recipe:prepexp:add_batch_hostfile:start", + "esm_runscripts:recipe:prepexp:add_batch_hostfile:complete", + "esm_runscripts:recipe:compute:namelist_changes:start", + "esm_runscripts:recipe:compute:namelist_changes:complete", + "esm_runscripts:recipe:compute:copy_files_to_work:start", + "esm_runscripts:recipe:compute:copy_files_to_work:complete", + "esm_runscripts:recipe:compute:create_new_files:start", + "esm_runscripts:recipe:compute:create_new_files:complete", + "esm_runscripts:recipe:compute:prepare_coupler_files:start", + "esm_runscripts:recipe:compute:prepare_coupler_files:complete", + "esm_runscripts:recipe:compute:add_batch_hostfile:start", + "esm_runscripts:recipe:compute:add_batch_hostfile:complete", + "esm_runscripts:recipe:compute:write_simple_runscript:start", + "esm_runscripts:recipe:compute:write_simple_runscript:complete", + "esm_runscripts:recipe:compute:report_missing_files:start", + "esm_runscripts:recipe:compute:report_missing_files:complete", + "esm_runscripts:recipe:compute:wait_for_first_job:start", + "esm_runscripts:recipe:compute:wait_for_first_job:complete", + "esm_runscripts:recipe:compute:run_job:start", + "esm_runscripts:recipe:compute:run_job:complete", + "esm_runscripts:recipe:compute:log_finish_date:start", + "esm_runscripts:recipe:compute:log_finish_date:complete", + ] + + for hook in common_hooks: + add_hook(hook, log_event) + + print(f"āœ… Registered {len(common_hooks)} hook listeners") + print(" Run your ESM-Tools command now to see events!\n") + sys.stdout.flush() + + except ImportError as e: + print(f"āŒ ERROR: Could not import hooks system: {e}") + print(" Make sure you're running this from the esm_tools_edd environment") + sys.exit(1) + + +def print_summary(): + """Print summary of captured events.""" + if not events_log: + print("\nāš ļø No events captured") + return + + print("\n" + "="*80) + print(f"šŸ“Š EVENT SUMMARY") + print(f" Total events: {len(events_log)}") + print(f" Time range: {events_log[0]['timestamp']} to {events_log[-1]['timestamp']}") + print("="*80) + + # Count events by type + event_counts = {} + for event in events_log: + event_type = event['event'].split(':')[0] + ':' + event['event'].split(':')[1] + event_counts[event_type] = event_counts.get(event_type, 0) + 1 + + print("\nšŸ“ˆ Events by type:") + for event_type, count in sorted(event_counts.items()): + print(f" {event_type}: {count}") + + print(f"\nšŸ’¾ Full log saved to: {log_file.absolute()}\n") + + +if __name__ == "__main__": + # If run as a script, start monitoring and wait + start_monitoring() + + print("\nšŸ’” TIP: You can also import this module in your Python code:") + print(" >>> import test_hooks_monitor") + print(" >>> test_hooks_monitor.start_monitoring()") + print("\n Then run your ESM-Tools commands in the same Python session") + print("\n Or add it to esm_tools/__init__.py for automatic monitoring\n") + + # Keep running to maintain hooks + try: + import time + while True: + time.sleep(1) + except KeyboardInterrupt: + print_summary() + sys.exit(0)