Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 31 additions & 4 deletions imap_processing/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -879,11 +879,36 @@ def do_processing( # noqa: PLR0912
f"Expected exactly one DE science dependency. "
f"Got {l1b_de_paths}"
)
anc_paths = dependencies.get_file_paths(data_type="ancillary")
if len(anc_paths) != 1:

# Get ancillary dependencies
anc_dependencies = dependencies.get_processing_inputs(
data_type="ancillary"
)
if len(anc_dependencies) != 2:
raise ValueError(
f"Expected two ancillary dependencies (cal-prod and "
f"backgrounds). Got "
f"{[anc_dep.descriptor for anc_dep in anc_dependencies]}"
)

# Create mapping from descriptor to path
anc_path_dict = {
dep.descriptor.split("-", 1)[1]: dep.imap_file_paths[
0
].construct_path()
for dep in anc_dependencies
}

# Verify we have both required ancillary files
if (
"cal-prod" not in anc_path_dict
or "backgrounds" not in anc_path_dict
):
raise ValueError(
f"Expected exactly one ancillary dependency. Got {anc_paths}"
f"Missing required ancillary files. Expected 'cal-prod' and "
f"'backgrounds', got {list(anc_path_dict.keys())}"
)
Comment on lines +894 to 910
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

anc_path_dict derives keys via dep.descriptor.split("-", 1)[1], which will misbehave or raise IndexError if the descriptor format differs (e.g., missing sensor prefix or additional hyphens). To avoid fragile parsing, prefer a safer normalization (e.g., strip leading "45sensor-"/"90sensor-" when present, otherwise keep descriptor as-is) or map using the file’s parsed name fields, and raise a clear ValueError when required descriptors can’t be determined.

Copilot uses AI. Check for mistakes.

# Load goodtimes dependency
goodtimes_paths = dependencies.get_file_paths(
source="hi", data_type="l1b", descriptor="goodtimes"
Expand All @@ -893,10 +918,12 @@ def do_processing( # noqa: PLR0912
f"Expected exactly one goodtimes dependency. "
f"Got {goodtimes_paths}"
)

datasets = hi_l1c.hi_l1c(
load_cdf(l1b_de_paths[0]),
anc_paths[0],
anc_path_dict["cal-prod"],
load_cdf(goodtimes_paths[0]),
anc_path_dict["backgrounds"],
)
elif self.data_level == "l2":
science_paths = dependencies.get_file_paths(source="hi", data_type="l1c")
Expand Down
245 changes: 227 additions & 18 deletions imap_processing/hi/hi_l1c.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
from imap_processing.cdf.imap_cdf_manager import ImapCdfAttributes
from imap_processing.cdf.utils import parse_filename_like
from imap_processing.hi.utils import (
BackgroundConfig,
CalibrationProductConfig,
HiConstants,
create_dataset_variables,
full_dataarray,
iter_background_events_by_config,
iter_qualified_events_by_config,
parse_sensor_number,
)
Expand Down Expand Up @@ -44,6 +46,7 @@ def hi_l1c(
de_dataset: xr.Dataset,
calibration_prod_config_path: Path,
goodtimes_ds: xr.Dataset,
background_config_path: Path,
) -> list[xr.Dataset]:
"""
High level IMAP-Hi l1c processing function.
Expand All @@ -56,6 +59,8 @@ def hi_l1c(
Calibration product configuration file.
goodtimes_ds : xarray.Dataset
Goodtimes dataset with cull_flags.
background_config_path : pathlib.Path
Background configuration file.

Returns
-------
Expand All @@ -65,7 +70,7 @@ def hi_l1c(
logger.info("Running Hi l1c processing")

l1c_dataset = generate_pset_dataset(
de_dataset, calibration_prod_config_path, goodtimes_ds
de_dataset, calibration_prod_config_path, goodtimes_ds, background_config_path
)

return [l1c_dataset]
Expand All @@ -75,6 +80,7 @@ def generate_pset_dataset(
de_dataset: xr.Dataset,
calibration_prod_config_path: Path,
goodtimes_ds: xr.Dataset,
background_config_path: Path,
) -> xr.Dataset:
"""
Generate IMAP-Hi l1c pset xarray dataset from l1b product.
Expand All @@ -87,6 +93,8 @@ def generate_pset_dataset(
Calibration product configuration file.
goodtimes_ds : xarray.Dataset
Goodtimes dataset with cull_flags.
background_config_path : pathlib.Path
Background configuration file.

Returns
-------
Expand All @@ -100,6 +108,8 @@ def generate_pset_dataset(
logical_source_parts = parse_filename_like(de_dataset.attrs["Logical_source"])
# read calibration product configuration file
config_df = CalibrationProductConfig.from_csv(calibration_prod_config_path)
# read background configuration file
background_df = BackgroundConfig.from_csv(background_config_path)

pset_dataset = empty_pset_dataset(
de_dataset.ccsds_met.data.mean(),
Expand All @@ -119,8 +129,17 @@ def generate_pset_dataset(
)
# Calculate and add the exposure time to the pset_dataset
pset_dataset.update(pset_exposure(pset_dataset.coords, de_dataset, goodtimes_ds))
# Get the backgrounds
pset_dataset.update(pset_backgrounds(pset_dataset.coords))

# Compute backgrounds (background counts computed internally)
pset_dataset.update(
pset_backgrounds(
pset_dataset.coords,
background_df,
de_dataset,
goodtimes_ds,
pset_dataset["exposure_times"],
)
)

return pset_dataset

Expand Down Expand Up @@ -347,10 +366,9 @@ def pset_counts(
Returns
-------
dict[str, xarray.DataArray]
Dictionary containing new exposure_times DataArray to be added to the PSET
dataset.
Dictionary containing counts DataArray.
"""
# Generate exposure time variable filled with zeros
# Generate counts variable filled with zeros
counts_var = create_dataset_variables(
["counts"],
coords=pset_coords,
Expand Down Expand Up @@ -417,43 +435,234 @@ def pset_counts(
spin_bin_indices,
1,
)

return counts_var


def pset_backgrounds(pset_coords: dict[str, xr.DataArray]) -> dict[str, xr.DataArray]:
def _compute_background_counts(
    pset_coords: dict[str, xr.DataArray],
    background_config_df: pd.DataFrame,
    l1b_de_dataset: xr.Dataset,
    goodtimes_ds: xr.Dataset,
) -> xr.DataArray:
    """
    Accumulate background counts from the L1B direct events.

    Backgrounds are treated as isotropic, so events are tallied per
    (epoch, calibration_prod, background_index) only — no esa_energy_step
    or spin_angle_bin dimension is carried.

    Parameters
    ----------
    pset_coords : dict[str, xarray.DataArray]
        The PSET coordinates from the xarray.Dataset.
    background_config_df : pandas.DataFrame
        Background configuration DataFrame with MultiIndex
        (calibration_prod, background_index).
    l1b_de_dataset : xarray.Dataset
        The L1B dataset for the pointing being processed.
    goodtimes_ds : xarray.Dataset
        Goodtimes dataset with cull_flags.

    Returns
    -------
    xarray.DataArray
        Background counts with dims (epoch, calibration_prod, background_index).

    Raises
    ------
    ValueError
        If valid and FILLVAL trigger_ids are mixed, or if a calibration
        product is missing from the background configuration.
    """
    # The unique, sorted background indices define the third output dimension.
    bg_index_values = (
        background_config_df.index.get_level_values("background_index")
        .unique()
        .sort_values()
        .values
    )
    out_dims = ("epoch", "calibration_prod", "background_index")
    out_coords = {
        "epoch": pset_coords["epoch"],
        "calibration_prod": pset_coords["calibration_prod"],
        "background_index": bg_index_values,
    }
    bg_counts = xr.DataArray(
        np.zeros(tuple(len(out_coords[dim]) for dim in out_dims)),
        dims=list(out_dims),
        coords=out_coords,
    )

    # Work with the per-event variables only.
    event_ds = l1b_de_dataset.drop_dims("epoch")

    # A pointing with no events is encoded as a single FILLVAL fill event;
    # mixing fill and valid events is a data error.
    valid_trigger = (
        event_ds["trigger_id"].data != event_ds["trigger_id"].attrs["FILLVAL"]
    )
    if not valid_trigger.any():
        return bg_counts
    if not valid_trigger.all():
        raise ValueError(
            "An event with trigger_id=FILLVAL should only occur for a pointing "
            "with no events that gets a single fill event. Events with mixed "
            "valid and FILLVAL trigger_ids found."
        )

    # Drop direct events outside the goodtimes/angles.
    keep = good_time_and_phase_mask(
        event_ds.event_met.values,
        event_ds.nominal_bin.values,
        goodtimes_ds,
    )
    event_ds = event_ds.isel(event_met=keep)
    if len(event_ds["event_met"]) == 0:
        return bg_counts

    first_epoch = pset_coords["epoch"].values[0]
    cal_prod_level = background_config_df.index.get_level_values("calibration_prod")
    for cal_prod in pset_coords["calibration_prod"].values:
        # Every PSET calibration product must appear in the configuration.
        if cal_prod not in cal_prod_level:
            raise ValueError(
                f"Calibration product {cal_prod} not found in background "
                f"configuration. Available calibration products: "
                f"{sorted(cal_prod_level.unique().tolist())}"
            )

        # Configuration rows relevant to this calibration product.
        prod_config = background_config_df.xs(cal_prod, level="calibration_prod")

        # Filter events per background configuration row and tally them.
        for config_row, selected_ds in iter_background_events_by_config(
            event_ds, prod_config
        ):
            # Backgrounds are isotropic, so each surviving event counts once
            # with no spin-angle binning.
            n_selected = len(selected_ds["event_met"])
            if n_selected == 0:
                continue

            bg_counts.loc[
                dict(
                    epoch=first_epoch,
                    calibration_prod=cal_prod,
                    background_index=config_row.Index,
                )
            ] += n_selected

    return bg_counts


def pset_backgrounds(
    pset_coords: dict[str, xr.DataArray],
    background_config_df: pd.DataFrame,
    l1b_de_dataset: xr.Dataset,
    goodtimes_ds: xr.Dataset,
    exposure_times: xr.DataArray,
) -> dict[str, xr.DataArray]:
    """
    Calculate pointing set backgrounds from direct events.

    Computes background counts internally by filtering and binning events
    according to the background configuration, then calculates background
    rates and uncertainties.

    Parameters
    ----------
    pset_coords : dict[str, xarray.DataArray]
        The PSET coordinates from the xarray.Dataset.
    background_config_df : pandas.DataFrame
        Background configuration DataFrame with MultiIndex
        (calibration_prod, background_index).
    l1b_de_dataset : xarray.Dataset
        The L1B dataset for the pointing being processed.
    goodtimes_ds : xarray.Dataset
        Goodtimes dataset with cull_flags.
    exposure_times : xarray.DataArray
        Exposure times with dims (epoch, esa_energy_step, spin_angle_bin).

    Returns
    -------
    dict[str, xarray.DataArray]
        Dictionary containing background_rates and background_rates_uncertainty
        DataArrays to be added to the PSET dataset.
    """
    attr_mgr = ImapCdfAttributes()
    attr_mgr.add_instrument_global_attrs("hi")
    attr_mgr.add_instrument_variable_attrs(instrument="hi", level=None)

    # Create output arrays with full PSET coordinates.
    output_vars = {
        var_name: full_dataarray(
            var_name,
            attr_mgr.get_variable_attributes(f"hi_pset_{var_name}", check_schema=False),
            pset_coords,
        )
        for var_name in ["background_rates", "background_rates_uncertainty"]
    }

    # Total exposure over all ESA steps and spin bins normalizes the
    # (isotropic) background counts into rates.
    total_exposure_time = float(exposure_times.sum())

    if total_exposure_time <= 0:
        # No exposure: report zero rates and zero uncertainty.
        output_vars["background_rates"].values[:] = 0
        output_vars["background_rates_uncertainty"].values[:] = 0
        return output_vars

    # Background counts with dims (epoch, calibration_prod, background_index).
    # No spin_angle_bin dimension is present since backgrounds are isotropic.
    background_counts = _compute_background_counts(
        pset_coords, background_config_df, l1b_de_dataset, goodtimes_ds
    )

    # Count rates per background component:
    # shape (epoch, calibration_prod, background_index).
    count_rates = background_counts / total_exposure_time

    # Per-component scaling factors and uncertainties from the configuration,
    # aligned on the (calibration_prod, background_index) MultiIndex.
    config_ds = background_config_df.to_xarray()
    scaling_factors_da = config_ds["scaling_factor"]
    uncertainties_da = config_ds["uncertainty"]

    # Scale each component's rate by its configured factor.
    scaled_rates = count_rates * scaling_factors_da

    # Poisson counting error and scaling-factor error, combined in quadrature.
    poisson_unc = (
        np.sqrt(background_counts) / total_exposure_time
    ) * scaling_factors_da
    scaling_unc = count_rates * uncertainties_da
    combined_unc = np.sqrt(poisson_unc**2 + scaling_unc**2)

    # Collapse the background components into a single total rate/uncertainty.
    total_rates = scaled_rates.sum(dim="background_index", skipna=True)
    total_unc = np.sqrt((combined_unc**2).sum(dim="background_index", skipna=True))

    # Broadcast to (epoch, esa_energy_step, calibration_prod, spin_angle_bin).
    # Backgrounds are isotropic and independent of ESA step, so the same value
    # fills every esa_energy_step and spin_angle_bin.
    # NOTE(review): this indexing assumes the output variables' dims are
    # ordered (epoch, esa_energy_step, calibration_prod, spin_angle_bin) and
    # that total_rates dims are (epoch, calibration_prod) — confirm against
    # the CDF attribute definitions.
    output_vars["background_rates"].values[:] = total_rates.values[
        :, np.newaxis, :, np.newaxis
    ]
    output_vars["background_rates_uncertainty"].values[:] = total_unc.values[
        :, np.newaxis, :, np.newaxis
    ]

    return output_vars


def pset_exposure(
pset_coords: dict[str, xr.DataArray],
Expand Down
Loading
Loading