diff --git a/imap_processing/hi/utils.py b/imap_processing/hi/utils.py index 2c4b5da9de..e693ff98af 100644 --- a/imap_processing/hi/utils.py +++ b/imap_processing/hi/utils.py @@ -7,7 +7,7 @@ from dataclasses import dataclass from enum import IntEnum from pathlib import Path -from typing import Any +from typing import IO, Any import numpy as np import pandas as pd @@ -313,7 +313,12 @@ class EsaEnergyStepLookupTable: def __init__(self) -> None: self.df = pd.DataFrame( - columns=["start_met", "end_met", "esa_step", "esa_energy_step"] + { + "start_met": pd.Series(dtype="float64"), + "end_met": pd.Series(dtype="float64"), + "esa_step": pd.Series(dtype="int64"), + "esa_energy_step": pd.Series(dtype="int64"), + } ) self._indexed = False @@ -451,10 +456,12 @@ def query( return results.astype(self._esa_energy_step_dtype) -@pd.api.extensions.register_dataframe_accessor("cal_prod_config") -class CalibrationProductConfig: +class _BaseConfigAccessor: """ - Register custom accessor for calibration product configuration DataFrames. + Base class for configuration DataFrame accessors. + + Provides common functionality for validating and processing configuration + DataFrames with coincidence types and TOF windows. Parameters ---------- @@ -462,19 +469,10 @@ class CalibrationProductConfig: Object to run validation and use accessor functions on. """ - index_columns = ( - "calibration_prod", - "esa_energy_step", - ) + # Subclasses must define these + index_columns: tuple[str, ...] + required_columns: tuple[str, ...] tof_detector_pairs = ("ab", "ac1", "bc1", "c1c2") - required_columns = ( - "coincidence_type_list", - *[ - f"tof_{det_pair}_{limit}" - for det_pair in tof_detector_pairs - for limit in ["low", "high"] - ], - ) def __init__(self, pandas_obj: pd.DataFrame) -> None: self._validate(pandas_obj) @@ -495,7 +493,7 @@ def _validate(self, df: pd.DataFrame) -> None: AttributeError : If the dataframe does not pass validation. 
""" for index_name in self.index_columns: - if index_name in df.index: + if index_name not in df.index.names: raise AttributeError( f"Required index {index_name} not present in dataframe." ) @@ -503,8 +501,6 @@ def _validate(self, df: pd.DataFrame) -> None: for col in self.required_columns: if col not in df.columns: raise AttributeError(f"Required column {col} not present in dataframe.") - # TODO: Verify that the same ESA energy steps exist in all unique calibration - # product numbers def _add_coincidence_values_column(self) -> None: """Generate and add the coincidence_type_values column to the dataframe.""" @@ -518,20 +514,57 @@ def _add_coincidence_values_column(self) -> None: axis=1, ) + @property + def calibration_product_numbers(self) -> npt.NDArray[np.int_]: + """ + Get the calibration product numbers from the current configuration. + + Returns + ------- + cal_prod_numbers : numpy.ndarray + Array of calibration product numbers from the configuration. + These are sorted in ascending order and can be arbitrary integers. + """ + return ( + self._obj.index.get_level_values("calibration_prod") + .unique() + .sort_values() + .values + ) + + +@pd.api.extensions.register_dataframe_accessor("cal_prod_config") +class CalibrationProductConfig(_BaseConfigAccessor): + """Register custom accessor for calibration product configuration DataFrames.""" + + index_columns = ( + "calibration_prod", + "esa_energy_step", + ) + required_columns = ( + "coincidence_type_list", + *[ + f"tof_{det_pair}_{limit}" + for det_pair in _BaseConfigAccessor.tof_detector_pairs + for limit in ["low", "high"] + ], + ) + @classmethod - def from_csv(cls, path: str | Path) -> pd.DataFrame: + def from_csv(cls, path: str | Path | IO[str]) -> pd.DataFrame: """ - Read configuration CSV file into a pandas.DataFrame. + Read calibration product configuration CSV file into a pandas.DataFrame. Parameters ---------- - path : str or pathlib.Path - Location of the Calibration Product configuration CSV file. 
+ path : str or pathlib.Path or file-like object + Location of the calibration product configuration CSV file. Returns ------- dataframe : pandas.DataFrame - Validated calibration product configuration data frame. + Validated calibration product configuration DataFrame with + coincidence_type_values column added. """ df = pd.read_csv( path, @@ -539,7 +572,7 @@ def from_csv(cls, path: str | Path) -> pd.DataFrame: converters={"coincidence_type_list": lambda s: tuple(s.split("|"))}, comment="#", ) - # Force the _init_ method to run by using the namespace + # Trigger the accessor to run validation and add coincidence_type_values _ = df.cal_prod_config.number_of_products return df @@ -556,23 +589,51 @@ def number_of_products(self) -> int: """ return len(self._obj.index.unique(level="calibration_prod")) - @property - def calibration_product_numbers(self) -> npt.NDArray[np.int_]: + +@pd.api.extensions.register_dataframe_accessor("background_config") +class BackgroundConfig(_BaseConfigAccessor): + """Register custom accessor for background configuration DataFrames.""" + + index_columns = ( + "calibration_prod", + "background_index", + ) + required_columns = ( + "coincidence_type_list", + *[ + f"tof_{det_pair}_{limit}" + for det_pair in _BaseConfigAccessor.tof_detector_pairs + for limit in ["low", "high"] + ], + "scaling_factor", + "uncertainty", + ) + + @classmethod + def from_csv(cls, path: str | Path | IO[str]) -> pd.DataFrame: """ - Get the calibration product numbers from the current configuration. + Read background configuration CSV file into a pandas.DataFrame. + + Parameters + ---------- + path : str or pathlib.Path or file-like object + Location of the background configuration CSV file. Returns ------- - cal_prod_numbers : numpy.ndarray - Array of calibration product numbers from the configuration. - These are sorted in ascending order and can be arbitrary integers. 
+ dataframe : pandas.DataFrame + Validated background configuration DataFrame with + coincidence_type_values column added. """ - return ( - self._obj.index.get_level_values("calibration_prod") - .unique() - .sort_values() - .values + df = pd.read_csv( + path, + index_col=cls.index_columns, + converters={"coincidence_type_list": lambda s: tuple(s.split("|"))}, + comment="#", ) + # Trigger the accessor to run validation and add coincidence_type_values + _ = df.background_config.calibration_product_numbers + return df def get_tof_window_mask( @@ -753,24 +814,104 @@ def iter_qualified_events_by_config( yield esa_energy, config_row, np.zeros(n_events, dtype=bool) continue - # Check coincidence type - coin_mask = filter_events_by_coincidence( - de_ds, config_row.coincidence_type_values - ) + # Apply common filtering logic + filter_mask = _filter_events_by_config_row(de_ds, config_row, tof_fill_vals) + + yield esa_energy, config_row, esa_mask & filter_mask - # Build TOF windows dict from config row - tof_windows = { - f"tof_{pair}": ( - getattr(config_row, f"tof_{pair}_low"), - getattr(config_row, f"tof_{pair}_high"), - ) - for pair in CalibrationProductConfig.tof_detector_pairs - } - # Check TOF windows - tof_mask = get_tof_window_mask(de_ds, tof_windows, tof_fill_vals) +def _filter_events_by_config_row( + de_ds: xr.Dataset, + config_row: Any, + tof_fill_vals: dict[str, float], +) -> NDArray[np.bool_]: + """ + Filter events by coincidence type and TOF windows for a single config row. - yield esa_energy, config_row, esa_mask & coin_mask & tof_mask + Helper function to apply common filtering logic used by both + iter_qualified_events_by_config and iter_background_events_by_config. + + Parameters + ---------- + de_ds : xarray.Dataset + Direct Event dataset with coincidence_type and TOF variables. 
+ config_row : namedtuple + Config row from DataFrame.itertuples() containing: + - coincidence_type_values: tuple of int coincidence types + - tof_{pair}_low, tof_{pair}_high: TOF window bounds + tof_fill_vals : dict[str, float] + Dictionary mapping TOF variable names to their fill values. + + Returns + ------- + filter_mask : numpy.ndarray + Boolean mask where True = event matches the filter criteria. + """ + # Check coincidence type + coin_mask = filter_events_by_coincidence(de_ds, config_row.coincidence_type_values) + + # Build TOF windows dict from config row + tof_windows = { + f"tof_{pair}": ( + getattr(config_row, f"tof_{pair}_low"), + getattr(config_row, f"tof_{pair}_high"), + ) + for pair in CalibrationProductConfig.tof_detector_pairs + } + + # Check TOF windows + tof_mask = get_tof_window_mask(de_ds, tof_windows, tof_fill_vals) + + return coin_mask & tof_mask + + +def iter_background_events_by_config( + de_ds: xr.Dataset, + background_config: pd.DataFrame, +) -> Generator[tuple[Any, xr.Dataset], None, None]: + """ + Iterate over background config, yielding filtered event datasets. + + For each (calibration_prod, background_index) combination in the config, + yields the filtered dataset containing only events that match BOTH + coincidence_type AND TOF window checks. + + Unlike iter_qualified_events_by_config, this does NOT filter by ESA energy + step, as background counts are accumulated across all ESA steps. + + Parameters + ---------- + de_ds : xarray.Dataset + Direct Event dataset with coincidence_type and TOF variables. + TOF variables must have FILLVAL attribute for fill value handling. + background_config : pandas.DataFrame + Config DataFrame with multi-index (calibration_prod, background_index). + Must have coincidence_type_values column and TOF window columns. + + Yields + ------ + config_row : namedtuple + The config row from itertuples() containing background settings. 
+ filtered_ds : xarray.Dataset + Filtered dataset containing only events matching the criteria. + """ + n_events = len(de_ds["event_met"]) + + # Build TOF fill values from dataset attributes + tof_fill_vals = _build_tof_fill_vals(de_ds) + + for config_row in background_config.itertuples(): + if n_events == 0: + # Return empty dataset + yield config_row, de_ds.isel(event_met=slice(0, 0)) + continue + + # Apply common filtering logic + filter_mask = _filter_events_by_config_row(de_ds, config_row, tof_fill_vals) + + # Return filtered dataset (no ESA energy filtering) + filtered_ds = de_ds.isel(event_met=filter_mask) + yield config_row, filtered_ds def compute_qualified_event_mask( @@ -801,7 +942,7 @@ def compute_qualified_event_mask( qualified_mask : np.ndarray Boolean mask - True if event qualifies for at least one cal product. """ - n_events = len(de_ds["event_met"]) if "event_met" in de_ds.dims else 0 + n_events = len(de_ds["event_met"]) if n_events == 0: return np.array([], dtype=bool) diff --git a/imap_processing/tests/hi/conftest.py b/imap_processing/tests/hi/conftest.py index d66f328099..a63872ee07 100644 --- a/imap_processing/tests/hi/conftest.py +++ b/imap_processing/tests/hi/conftest.py @@ -23,6 +23,11 @@ def hi_test_cal_prod_config_path(hi_l1_test_data_path): return hi_l1_test_data_path / "imap_hi_90sensor-cal-prod_20240101_v001.csv" +@pytest.fixture(scope="session") +def hi_test_background_config_path(hi_l1_test_data_path): + return hi_l1_test_data_path / "imap_hi_90sensor-backgrounds_20240101_v001.csv" + + def create_metaevent(esa_step, met_subseconds, met_seconds): start_bitmask_data = 0 # META return ( diff --git a/imap_processing/tests/hi/data/l1/imap_hi_90sensor-backgrounds_20240101_v001.csv b/imap_processing/tests/hi/data/l1/imap_hi_90sensor-backgrounds_20240101_v001.csv new file mode 100644 index 0000000000..b67ed1935a --- /dev/null +++ b/imap_processing/tests/hi/data/l1/imap_hi_90sensor-backgrounds_20240101_v001.csv @@ -0,0 +1,19 @@ +# THIS IS A 
TEST FILE AND SHOULD NOT BE USED IN PRODUCTION PROCESSING. +# +# Backgrounds Determination Configuration File +# Valid start date: 2024-01-01 (from filename) +# This file will be used in processing until a new calibration file is uploaded +# to the SDC with either a higher version number or new date in the filename. +# For details on how the SDC selects ancillary files for processing, see: +# https://imap-processing.readthedocs.io/en/latest/data-access/calibration-files.html +# +# When creating PSET products, the following table will be used to determine per +# calibration product backgrounds. The backgrounds for each calibration product are +# computed individually and the sum is subtracted from the calibration product. Background +# uncertainties are summed in quadrature and reported in background uncertainties. +# +calibration_prod,background_index,coincidence_type_list,tof_ab_low,tof_ab_high,tof_ac1_low,tof_ac1_high,tof_bc1_low,tof_bc1_high,tof_c1c2_low,tof_c1c2_high,scaling_factor,uncertainty +0,0,ABC1C2|ABC1,-20,16,-46,-15,-511,511,0,1023,0.00306,0.00030 +0,1,AB,-20,16,-46,-15,-511,511,0,1023,0.0189,0.0020 +1,0,BC1C2|AC1,-20,16,-46,-15,-511,511,0,1023,0.00306,0.00030 +1,1,AB,-20,16,-46,-15,-511,511,0,1023,0.0189,0.0020 \ No newline at end of file diff --git a/imap_processing/tests/hi/test_utils.py b/imap_processing/tests/hi/test_utils.py index 15c188e967..f982eea760 100644 --- a/imap_processing/tests/hi/test_utils.py +++ b/imap_processing/tests/hi/test_utils.py @@ -11,6 +11,7 @@ from imap_processing.cdf.imap_cdf_manager import ImapCdfAttributes from imap_processing.hi.utils import ( HIAPID, + BackgroundConfig, CalibrationProductConfig, CoincidenceBitmap, EsaEnergyStepLookupTable, @@ -20,6 +21,8 @@ full_dataarray, get_bin_range_with_wrap, get_tof_window_mask, + iter_background_events_by_config, + iter_qualified_events_by_config, parse_sensor_number, ) @@ -350,8 +353,15 @@ def test_wrong_columns(self): ) for exclude_column_name in required_columns: 
include_columns = set(required_columns) - {exclude_column_name} - df = pd.DataFrame({col: [1, 2, 3] for col in include_columns}) - with pytest.raises(AttributeError, match="Required column*"): + # Create dataframe with proper indices but missing one column + df = pd.DataFrame( + {col: [1, 2, 3] for col in include_columns}, + index=pd.MultiIndex.from_tuples( + [(0, 0), (0, 1), (1, 0)], + names=["calibration_prod", "esa_energy_step"], + ), + ) + with pytest.raises(AttributeError, match="Required column.*"): _ = df.cal_prod_config.number_of_products def test_from_csv(self, hi_test_cal_prod_config_path): @@ -412,6 +422,74 @@ def test_calibration_product_numbers_arbitrary_values(self): assert isinstance(cal_prod_numbers, np.ndarray) +class TestBackgroundConfig: + """ + All test coverage for the pd.DataFrame accessor extension "background_config". + """ + + def test_wrong_columns(self): + """Test coverage for a dataframe with the wrong columns.""" + required_columns = BackgroundConfig.required_columns + for exclude_column_name in required_columns: + include_columns = set(required_columns) - {exclude_column_name} + # Create dataframe with proper indices but missing one column + df = pd.DataFrame( + {col: [1, 2, 3] for col in include_columns}, + index=pd.MultiIndex.from_tuples( + [(0, 0), (0, 1), (1, 0)], + names=["calibration_prod", "background_index"], + ), + ) + with pytest.raises(AttributeError, match="Required column.*"): + _ = df.background_config.calibration_product_numbers + + def test_from_csv(self, hi_test_background_config_path): + """Test coverage for from_csv function.""" + df = BackgroundConfig.from_csv(hi_test_background_config_path) + # Verify coincidence_type_list is a tuple + assert isinstance(df["coincidence_type_list"][0, 0], tuple) + # Verify MultiIndex + assert df.index.names == ["calibration_prod", "background_index"] + + def test_added_coincidence_type_values_column(self, hi_test_background_config_path): + """Test that coincidence_type_values column 
is added correctly.""" + df = BackgroundConfig.from_csv(hi_test_background_config_path) + assert "coincidence_type_values" in df.columns + for _, row in df.iterrows(): + for detect_string, val in zip( + row["coincidence_type_list"], + row["coincidence_type_values"], + strict=False, + ): + assert val == CoincidenceBitmap.detector_hit_str_to_int(detect_string) + + def test_calibration_product_numbers(self, hi_test_background_config_path): + """Test coverage for calibration_product_numbers accessor.""" + df = BackgroundConfig.from_csv(hi_test_background_config_path) + cal_prod_numbers = df.background_config.calibration_product_numbers + # The test config file has calibration products 0 and 1 + np.testing.assert_array_equal(cal_prod_numbers, np.array([0, 1])) + # Verify it's a numpy array of integers + assert isinstance(cal_prod_numbers, np.ndarray) + assert cal_prod_numbers.dtype in [np.int32, np.int64] + + def test_calibration_product_numbers_arbitrary_values(self): + """Test calibration_product_numbers with arbitrary non-sequential values.""" + csv_content = """\ +calibration_prod,background_index,coincidence_type_list,tof_ab_low,tof_ab_high,tof_ac1_low,tof_ac1_high,tof_bc1_low,tof_bc1_high,tof_c1c2_low,tof_c1c2_high,scaling_factor,uncertainty +10,0,ABC1C2,-20,16,-46,-15,-511,511,0,1023,0.01,0.001 +5,0,BC1C2,-20,16,-46,-15,-511,511,0,1023,0.02,0.002 +100,0,AB,-20,16,-46,-15,-511,511,0,1023,0.03,0.003 + """ + + df = BackgroundConfig.from_csv(io.StringIO(csv_content)) + cal_prod_numbers = df.background_config.calibration_product_numbers + + # Should return sorted unique calibration product numbers + np.testing.assert_array_equal(cal_prod_numbers, np.array([5, 10, 100])) + assert isinstance(cal_prod_numbers, np.ndarray) + + class TestGetTofWindowMask: """Test suite for get_tof_window_mask function.""" @@ -874,3 +952,485 @@ def test_empty_dataset(self, mock_cal_product_config): ) assert len(mask) == 0 + + +class TestIterQualifiedEventsByConfig: + """Test suite for 
iter_qualified_events_by_config function.""" + + @pytest.fixture + def mock_cal_product_config(self): + """Create a mock calibration product config DataFrame.""" + # Create a config with 2 calibration products, 2 ESA energy steps + # Coincidence bitmap: A=8, B=4, C1=2, C2=1 + # ABC1C2=15, ABC1=14, AB=12 + data = { + "coincidence_type_list": [ + ("ABC1C2", "ABC1"), # cal_prod=1, esa_energy=1 + ("ABC1C2", "ABC1"), # cal_prod=1, esa_energy=2 + ("AB",), # cal_prod=2, esa_energy=1 + ("AB",), # cal_prod=2, esa_energy=2 + ], + "tof_ab_low": [10, 10, 10, 10], + "tof_ab_high": [100, 100, 100, 100], + "tof_ac1_low": [5, 5, 5, 5], + "tof_ac1_high": [80, 80, 80, 80], + "tof_bc1_low": [-50, -50, -50, -50], + "tof_bc1_high": [50, 50, 50, 50], + "tof_c1c2_low": [20, 20, 20, 20], + "tof_c1c2_high": [120, 120, 120, 120], + } + index = pd.MultiIndex.from_tuples( + [(1, 1), (1, 2), (2, 1), (2, 2)], + names=["calibration_prod", "esa_energy_step"], + ) + df = pd.DataFrame(data, index=index) + # Trigger the accessor to add coincidence_type_values column + _ = df.cal_prod_config.number_of_products + return df + + @pytest.fixture + def mock_de_dataset(self): + """Create a mock L1B DE dataset with events.""" + # 10 events with various coincidence types and TOF values + # Coincidence bitmap: A=8, B=4, C1=2, C2=1 + # ABC1C2=15, ABC1=14, AB=12, A=8 + n_events = 10 + fill_val = -9999.0 + ds = xr.Dataset( + { + "coincidence_type": ( + ["event_met"], + np.array([15, 14, 12, 8, 15, 14, 12, 8, 15, 12]), + ), + "tof_ab": ( + ["event_met"], + np.array([50, 50, 50, 50, 200, 50, 50, 50, 50, 50]), + ), # Event 4 out of window + "tof_ac1": ( + ["event_met"], + np.array([30, 30, 30, 30, 30, 30, 30, 30, 30, 30]), + ), + "tof_bc1": ( + ["event_met"], + np.array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), + ), + "tof_c1c2": ( + ["event_met"], + np.array([50, 50, 50, 50, 50, 50, 50, 50, 50, 50]), + ), + }, + coords={"event_met": np.arange(n_events, dtype=float)}, + ) + # Add FILLVAL attributes to TOF variables + for 
tof_var in ["tof_ab", "tof_ac1", "tof_bc1", "tof_c1c2"]: + ds[tof_var].attrs["FILLVAL"] = fill_val + return ds + + def test_yields_correct_number_of_items( + self, mock_cal_product_config, mock_de_dataset + ): + """Test that iterator yields correct number of items.""" + esa_energy_steps = np.ones(10, dtype=int) + + results = list( + iter_qualified_events_by_config( + mock_de_dataset, mock_cal_product_config, esa_energy_steps + ) + ) + + # Should yield 4 items: 2 ESA steps x 2 cal prods per step + assert len(results) == 4 + + def test_yields_correct_structure(self, mock_cal_product_config, mock_de_dataset): + """Test that each yielded item has the correct structure.""" + esa_energy_steps = np.ones(10, dtype=int) + + for esa_energy, config_row, mask in iter_qualified_events_by_config( + mock_de_dataset, mock_cal_product_config, esa_energy_steps + ): + # Check that esa_energy is an int + assert isinstance(esa_energy, (int, np.integer)) + # Check that config_row has expected attributes + assert hasattr(config_row, "Index") + assert hasattr(config_row, "coincidence_type_values") + # Check that mask is a boolean array + assert isinstance(mask, np.ndarray) + assert mask.dtype == bool + assert len(mask) == 10 # Same length as dataset + + def test_filters_by_esa_energy_step(self, mock_cal_product_config, mock_de_dataset): + """Test that events are filtered by ESA energy step.""" + # Half events at ESA 1, half at ESA 2 + esa_energy_steps = np.array([1, 1, 1, 1, 1, 2, 2, 2, 2, 2]) + + results = list( + iter_qualified_events_by_config( + mock_de_dataset, mock_cal_product_config, esa_energy_steps + ) + ) + + # Check ESA 1 results (first 2 items) + esa_1_results = [r for r in results if r[0] == 1] + assert len(esa_1_results) == 2 + + for _, _, mask in esa_1_results: + # Only first 5 events can qualify (ESA=1) + assert not np.any(mask[5:]) # Events 5-9 should be False + + # Check ESA 2 results (last 2 items) + esa_2_results = [r for r in results if r[0] == 2] + assert 
len(esa_2_results) == 2 + + for _, _, mask in esa_2_results: + # Only last 5 events can qualify (ESA=2) + assert not np.any(mask[:5]) # Events 0-4 should be False + + def test_filters_by_coincidence_and_tof( + self, mock_cal_product_config, mock_de_dataset + ): + """Test that events are filtered by coincidence type and TOF windows.""" + esa_energy_steps = np.ones(10, dtype=int) + + # Get results for cal_prod=1, esa_energy=1 (expects ABC1C2=15 or ABC1=14) + for esa_energy, config_row, mask in iter_qualified_events_by_config( + mock_de_dataset, mock_cal_product_config, esa_energy_steps + ): + if esa_energy == 1 and config_row.Index[0] == 1: + # Events with coincidence 15 or 14: indices 0, 1, 4, 5, 8 + # But event 4 has bad TOF (200), so should fail + # Events 3, 7 have wrong coincidence (8) + expected = np.array( + [True, True, False, False, False, True, False, False, True, False] + ) + np.testing.assert_array_equal(mask, expected) + break + + def test_different_cal_products_different_masks( + self, mock_cal_product_config, mock_de_dataset + ): + """Test that different calibration products yield different masks.""" + esa_energy_steps = np.ones(10, dtype=int) + + masks_by_cal_prod = {} + for esa_energy, config_row, mask in iter_qualified_events_by_config( + mock_de_dataset, mock_cal_product_config, esa_energy_steps + ): + if esa_energy == 1: # Only look at ESA 1 + cal_prod = config_row.Index[0] + masks_by_cal_prod[cal_prod] = mask + + # Cal prod 1 accepts ABC1C2 and ABC1 + # Cal prod 2 accepts only AB + # They should have different masks + assert not np.array_equal(masks_by_cal_prod[1], masks_by_cal_prod[2]) + + # Cal prod 2 should match events with coincidence_type=12 (AB) + # That's events: 2, 6, 9 + expected_cal_prod_2 = np.array( + [False, False, True, False, False, False, True, False, False, True] + ) + np.testing.assert_array_equal(masks_by_cal_prod[2], expected_cal_prod_2) + + def test_empty_dataset(self, mock_cal_product_config): + """Test with empty 
dataset.""" + empty_ds = xr.Dataset( + { + "coincidence_type": (["event_met"], np.array([], dtype=np.uint8)), + "tof_ab": (["event_met"], np.array([])), + "tof_ac1": (["event_met"], np.array([])), + "tof_bc1": (["event_met"], np.array([])), + "tof_c1c2": (["event_met"], np.array([])), + }, + coords={"event_met": np.array([])}, + ) + esa_energy_steps = np.array([]) + + results = list( + iter_qualified_events_by_config( + empty_ds, mock_cal_product_config, esa_energy_steps + ) + ) + + # Should still yield 4 items, but all masks should be empty + assert len(results) == 4 + for _, _, mask in results: + assert len(mask) == 0 + + def test_no_matching_esa_energy(self, mock_cal_product_config, mock_de_dataset): + """Test with ESA energy steps that don't match config.""" + # All events at ESA 99 (not in config) + esa_energy_steps = np.full(10, 99) + + results = list( + iter_qualified_events_by_config( + mock_de_dataset, mock_cal_product_config, esa_energy_steps + ) + ) + + # Should still yield 4 items (one per config row) + assert len(results) == 4 + + # But none of the masks should have any True values + for _, _, mask in results: + assert not np.any(mask) + + def test_fill_values_pass_tof_check(self, mock_cal_product_config, mock_de_dataset): + """Test that TOF fill values pass the TOF window check.""" + esa_energy_steps = np.ones(10, dtype=int) + + # Set event 4's tof_ab to fill value (it was failing due to high value) + fill_val = mock_de_dataset["tof_ab"].attrs["FILLVAL"] + mock_de_dataset["tof_ab"].values[4] = fill_val + + # Get results for cal_prod=1, esa_energy=1 + for esa_energy, config_row, mask in iter_qualified_events_by_config( + mock_de_dataset, mock_cal_product_config, esa_energy_steps + ): + if esa_energy == 1 and config_row.Index[0] == 1: + # Event 4 should now pass (has coincidence 15 and fill value TOF) + assert mask[4] + break + + +class TestIterBackgroundEventsByConfig: + """Test suite for iter_background_events_by_config function.""" + + 
@pytest.fixture + def mock_background_config(self): + """Create a mock background config DataFrame.""" + # Create a config with 2 calibration products, 2 background indices each + # Note: No esa_energy_step in the index (backgrounds are across all ESA steps) + data = { + "coincidence_type_list": [ + ("A",), # cal_prod=1, bg_index=0 + ("B",), # cal_prod=1, bg_index=1 + ("C1",), # cal_prod=2, bg_index=0 + ("C2",), # cal_prod=2, bg_index=1 (invalid, but for testing) + ], + "tof_ab_low": [10, 10, 10, 10], + "tof_ab_high": [100, 100, 100, 100], + "tof_ac1_low": [5, 5, 5, 5], + "tof_ac1_high": [80, 80, 80, 80], + "tof_bc1_low": [-50, -50, -50, -50], + "tof_bc1_high": [50, 50, 50, 50], + "tof_c1c2_low": [20, 20, 20, 20], + "tof_c1c2_high": [120, 120, 120, 120], + "scaling_factor": [1.0, 1.0, 1.0, 1.0], + "uncertainty": [0.1, 0.1, 0.1, 0.1], + } + index = pd.MultiIndex.from_tuples( + [(1, 0), (1, 1), (2, 0), (2, 1)], + names=["calibration_prod", "background_index"], + ) + df = pd.DataFrame(data, index=index) + # Trigger the accessor to add coincidence_type_values column + _ = df.background_config.calibration_product_numbers + return df + + @pytest.fixture + def mock_de_dataset(self): + """Create a mock L1B DE dataset with events.""" + # 10 events with various coincidence types and TOF values + # Coincidence bitmap: A=8, B=4, C1=2, C2=1 + n_events = 10 + fill_val = -9999.0 + ds = xr.Dataset( + { + "coincidence_type": ( + ["event_met"], + # A=8, B=4, C1=2, mix + np.array([8, 4, 2, 8, 4, 2, 8, 4, 2, 8]), + ), + "tof_ab": ( + ["event_met"], + np.array([50, 50, 50, 50, 50, 50, 200, 50, 50, 50]), + ), # Event 6 out of window + "tof_ac1": ( + ["event_met"], + np.array([30, 30, 30, 30, 30, 30, 30, 30, 30, 30]), + ), + "tof_bc1": ( + ["event_met"], + np.array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), + ), + "tof_c1c2": ( + ["event_met"], + np.array([50, 50, 50, 50, 50, 50, 50, 50, 50, 50]), + ), + }, + coords={"event_met": np.arange(n_events, dtype=float)}, + ) + # Add FILLVAL attributes to 
TOF variables + for tof_var in ["tof_ab", "tof_ac1", "tof_bc1", "tof_c1c2"]: + ds[tof_var].attrs["FILLVAL"] = fill_val + return ds + + def test_yields_correct_number_of_items( + self, mock_background_config, mock_de_dataset + ): + """Test that iterator yields correct number of items.""" + results = list( + iter_background_events_by_config(mock_de_dataset, mock_background_config) + ) + + # Should yield 4 items: one per (cal_prod, bg_index) combination + assert len(results) == 4 + + def test_yields_correct_structure(self, mock_background_config, mock_de_dataset): + """Test that each yielded item has the correct structure.""" + for config_row, filtered_ds in iter_background_events_by_config( + mock_de_dataset, mock_background_config + ): + # Check that config_row has expected attributes + assert hasattr(config_row, "Index") + assert hasattr(config_row, "coincidence_type_values") + assert hasattr(config_row, "scaling_factor") + assert hasattr(config_row, "uncertainty") + # Check that filtered_ds is an xarray Dataset + assert isinstance(filtered_ds, xr.Dataset) + assert "event_met" in filtered_ds.dims + assert "coincidence_type" in filtered_ds + assert "tof_ab" in filtered_ds + + def test_filters_by_coincidence_and_tof( + self, mock_background_config, mock_de_dataset + ): + """Test that events are filtered by coincidence type and TOF windows.""" + results = list( + iter_background_events_by_config(mock_de_dataset, mock_background_config) + ) + + # Get results for cal_prod=1, bg_index=0 (expects A=8) + for config_row, filtered_ds in results: + if config_row.Index == (1, 0): + # Events with coincidence A=8: indices 0, 3, 6, 9 + # But event 6 has bad TOF (200), so should be excluded + expected_events = [0, 3, 9] + assert len(filtered_ds["event_met"]) == len(expected_events) + np.testing.assert_array_equal( + filtered_ds["event_met"].values, expected_events + ) + break + + def test_different_backgrounds_different_datasets( + self, mock_background_config, mock_de_dataset + 
): + """Test that different background configs yield different filtered datasets.""" + results = list( + iter_background_events_by_config(mock_de_dataset, mock_background_config) + ) + + datasets_by_bg = {} + for config_row, filtered_ds in results: + datasets_by_bg[config_row.Index] = filtered_ds + + # Cal prod 1, bg 0 expects A=8 (events 0, 3, 6, 9; but 6 has bad TOF) + # Cal prod 1, bg 1 expects B=4 (events 1, 4, 7) + # Cal prod 2, bg 0 expects C1=2 (events 2, 5, 8) + + assert len(datasets_by_bg[(1, 0)]["event_met"]) == 3 # A events (minus bad TOF) + assert len(datasets_by_bg[(1, 1)]["event_met"]) == 3 # B events + assert len(datasets_by_bg[(2, 0)]["event_met"]) == 3 # C1 events + + def test_no_esa_energy_filtering(self, mock_background_config, mock_de_dataset): + """Test that backgrounds are NOT filtered by ESA energy step.""" + # Add esa_energy_step to dataset (should be ignored) + mock_de_dataset["esa_energy_step"] = ( + ["event_met"], + np.array([1, 2, 3, 1, 2, 3, 1, 2, 3, 1]), + ) + + results = list( + iter_background_events_by_config(mock_de_dataset, mock_background_config) + ) + + # Get results for cal_prod=1, bg_index=0 (expects A=8) + for config_row, filtered_ds in results: + if config_row.Index == (1, 0): + # Should include events with A=8 at ALL ESA energy steps + # Events 0, 3, 9 (event 6 excluded due to bad TOF) + # These have esa_energy_step values: 1, 1, 1 + assert len(filtered_ds["event_met"]) == 3 + # NOTE: events 0, 3, 9 all have esa_energy_step == 1, so this only + # shows the added esa_energy_step variable does not affect filtering + break + + def test_empty_dataset(self, mock_background_config): + """Test with empty dataset.""" + empty_ds = xr.Dataset( + { + "coincidence_type": (["event_met"], np.array([], dtype=np.uint8)), + "tof_ab": (["event_met"], np.array([])), + "tof_ac1": (["event_met"], np.array([])), + "tof_bc1": (["event_met"], np.array([])), + "tof_c1c2": (["event_met"], np.array([])), + }, + coords={"event_met": np.array([])}, + ) + + results = list( 
+ iter_background_events_by_config(empty_ds, mock_background_config) + ) + + # Should still yield 4 items, but all datasets should be empty + assert len(results) == 4 + for _, filtered_ds in results: + assert len(filtered_ds["event_met"]) == 0 + + def test_no_matching_events(self, mock_background_config, mock_de_dataset): + """Test with events that don't match any background config.""" + # Change all coincidence types to something not in config + mock_de_dataset["coincidence_type"].values[:] = 15 # ABC1C2 + + results = list( + iter_background_events_by_config(mock_de_dataset, mock_background_config) + ) + + # Should yield 4 items, but all filtered datasets should be empty + assert len(results) == 4 + for _, filtered_ds in results: + assert len(filtered_ds["event_met"]) == 0 + + def test_fill_values_pass_tof_check(self, mock_background_config, mock_de_dataset): + """Test that TOF fill values pass the TOF window check.""" + # Set event 6's tof_ab to fill value (it was failing due to high value) + fill_val = mock_de_dataset["tof_ab"].attrs["FILLVAL"] + mock_de_dataset["tof_ab"].values[6] = fill_val + + results = list( + iter_background_events_by_config(mock_de_dataset, mock_background_config) + ) + + # Get results for cal_prod=1, bg_index=0 (expects A=8) + for config_row, filtered_ds in results: + if config_row.Index == (1, 0): + # Event 6 should now be included (has coincidence 8 and fill value TOF) + expected_events = [0, 3, 6, 9] + assert len(filtered_ds["event_met"]) == len(expected_events) + np.testing.assert_array_equal( + filtered_ds["event_met"].values, expected_events + ) + break + + def test_preserves_all_dataset_variables( + self, mock_background_config, mock_de_dataset + ): + """Test that filtered dataset preserves all original variables.""" + # Add some extra variables to the dataset + mock_de_dataset["extra_var"] = (["event_met"], np.arange(10)) + mock_de_dataset["spin_phase"] = (["event_met"], np.random.random(10)) + + results = list( + 
iter_background_events_by_config(mock_de_dataset, mock_background_config) + ) + + for _, filtered_ds in results: + # Check that all original variables are present + assert "coincidence_type" in filtered_ds + assert "tof_ab" in filtered_ds + assert "extra_var" in filtered_ds + assert "spin_phase" in filtered_ds + # Check that variables have correct length + n_events = len(filtered_ds["event_met"]) + assert len(filtered_ds["extra_var"]) == n_events + assert len(filtered_ds["spin_phase"]) == n_events