Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/activator/activator.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ def _process_visit_or_cancel(expected_visit: FannedOutVisit):
if not mwi.get_main_pipeline_files():
raise IgnorableVisit(f"No pipeline configured for {expected_visit}.")
# TODO: pipeline execution requires a clean run until DM-38041.
cleanups.callback(mwi.clean_local_repo, expid_set)
cleanups.callback(mwi.clean_local_repo)
# Copy calibrations for this detector/visit
mwi.prep_butler()

Expand Down
42 changes: 24 additions & 18 deletions python/activator/middleware_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -1834,30 +1834,36 @@ def _query_datasets_by_storage_class(self, butler, exposure_ids, collections, st
) for t in matching_types
)

def clean_local_repo(self, exposure_ids: set[int]) -> None:
def clean_local_repo(self) -> None:
"""Remove local repo content that is only needed for a single visit.

This includes raws and pipeline outputs.

Parameter
---------
exposure_ids : `set` [`int`]
Identifiers of the exposures to be removed.
"""
with lsst.utils.timer.time_this(_log, msg="clean_local_repo", level=logging.DEBUG):
self.butler.registry.refresh()
if exposure_ids:
raws = self.butler.query_datasets(
'raw',
collections=self.instrument.makeDefaultRawIngestRunName(),
where=f"exposure in ({', '.join(str(x) for x in exposure_ids)})",
find_first=False,
explain=False, # Raws might not have been ingested.
instrument=self.visit.instrument,
detector=self.visit.detector,
)
_log_trace.debug("Removing %d raws for exposures %s.", len(raws), exposure_ids)
self.butler.pruneDatasets(raws, disassociate=True, unstore=True, purge=True)

# Clean out raws
raws = self.butler.query_datasets(
"raw",
collections=self.instrument.makeDefaultRawIngestRunName(),
find_first=False,
explain=False, # Raws might not have been ingested.
instrument=self.visit.instrument,
detector=self.visit.detector,
Comment on lines +1851 to +1852
Copy link
Copy Markdown
Member

@kfindeisen kfindeisen Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, almost missed this -- please delete all raws unconditionally (maybe with an instrument constraint, but I think the query will resolve just fine without it). That's the point of this change: any raws that were accidentally left behind should get cleaned up too.

)
n_raws = len(raws)
if n_raws == 0:
_log_trace.debug("No raws to remove for detector %s.", self.visit.detector)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the "for detector" bit really makes sense here -- the point is that if we accidentally left some old raws behind because of a logic bug, they'll get cleaned up too.

else:
_log_trace.debug("Removing %d raw(s) for detector %s.", n_raws, self.visit.detector)
try:
self.butler.pruneDatasets(raws, disassociate=True, unstore=True, purge=True)
except Exception:
_log_trace.exception("Raw removal failed for detector %s.", self.visit.detector)
raise
Comment on lines +1861 to +1863
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do not use log-and-reraise -- exceptions should only be logged once, when they are handled.

If this were a reasonable place to log the exception, it would not make sense to log it at trace level (if something unexpectedly goes wrong, that's at least a warning).

_log_trace.debug(
"Successfully removed %d raw(s) for detector %s.", n_raws, self.visit.detector)

# Outputs are all in their own runs, so just drop them.
preload_run = runs.get_preload_run(self.instrument, self._deployment, self._day_obs)
_remove_run_completely(self.butler, preload_run)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_middleware_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -1098,7 +1098,7 @@ def test_clean_local_repo(self):
self._assert_in_collection(butler, "*", "bias", calib_data_id_2)
self._assert_in_collection(butler, "*", "bias", calib_data_id_3)

self.interface.clean_local_repo({raw_data_id["exposure"]})
self.interface.clean_local_repo()
self._assert_not_in_collection(butler, "*", "raw", raw_data_id)
self._assert_not_in_collection(butler, "*", "src", processed_data_id)
self._assert_not_in_collection(butler, "*", "calexp", processed_data_id)
Expand Down