Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 24 additions & 15 deletions sotrplib/handlers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from sotrplib.maps.postprocessor import MapPostprocessor
from sotrplib.maps.preprocessor import MapPreprocessor
from sotrplib.outputs.core import SourceOutput
from sotrplib.sifter.core import EmptySifter, SiftingProvider
from sotrplib.sifter.core import EmptySifter, SifterResult, SiftingProvider
from sotrplib.sims.sim_source_generators import (
SimulatedSource,
SimulatedSourceGenerator,
Expand All @@ -29,6 +29,7 @@
ForcedPhotometryProvider,
)
from sotrplib.sources.force import EmptyForcedPhotometry
from sotrplib.sources.sources import MeasuredSource
from sotrplib.sources.subtractor import EmptySourceSubtractor, SourceSubtractor

__all__ = ["BaseRunner"]
Expand Down Expand Up @@ -116,9 +117,6 @@ def build_map(self, input_map: ProcessableMap) -> ProcessableMap:

return output_map

def coadd_maps(self, input_maps: list[ProcessableMap]) -> list[ProcessableMap]:
return self.map_coadder.coadd(input_maps)

@property
def bbox(self):
if not self.maps:
Expand Down Expand Up @@ -168,9 +166,22 @@ def simulate_sources(self) -> list[SimulatedSource]:
self.source_catalogs.append(catalog)
return all_simulated_sources

def coadd_and_analyze_maps(
self, maps: list[ProcessableMap], simulated_sources: list[SimulatedSource]
) -> tuple[list[MeasuredSource], SifterResult]:
"""
Coadd and analyze maps in a single task to avoid passing maps between processes.
"""
coadded_map = self.profilable_task(self.map_coadder.coadd_maps)(maps)
return self.profilable_task(self.analyze_map)(
input_map=coadded_map, simulated_sources=simulated_sources
)

def analyze_map(
self, input_map: ProcessableMap, simulated_sources: list[SimulatedSource]
) -> tuple[list, object, ProcessableMap]:
) -> tuple[list[MeasuredSource], SifterResult]:
input_map = self.profilable_task(self.build_map)(input_map)

self.profilable_task(input_map.finalize)()

injected_sources, input_map = self.profilable_task(self.source_injector.inject)(
Expand Down Expand Up @@ -232,28 +243,26 @@ def analyze_map(
self.profilable_task(output.output)(
forced_photometry_candidates=forced_photometry_candidates,
sifter_result=sifter_result,
input_map=input_map,
map_id=input_map.map_id,
pointing_sources=pointing_sources,
injected_sources=injected_sources,
)
if input_map._parent_database is not None:
set_processing_end(input_map.map_id)
return forced_photometry_candidates, sifter_result, input_map
self.profilable_task(set_processing_end)(input_map.map_id)
return forced_photometry_candidates, sifter_result

def run(self) -> tuple[list[list], list[object], list[ProcessableMap]]:
def run(self) -> tuple[list[list[MeasuredSource]], list[SifterResult]]:
return self.flow(self._run)()

def _run(self) -> tuple[list[list], list[object], list[ProcessableMap]]:
def _run(self) -> tuple[list[list[MeasuredSource]], list[SifterResult]]:
"""
The actual pipeline run logic has to be in a separate method so that it can be
decorated with the flow as prefect needs these to be defined in advance.
"""
all_simulated_sources = self.basic_task(self.simulate_sources)()
self.maps = self.basic_task(self.build_map).map(self.maps).result()
self.maps = [m for m in self.maps if m is not None]
self.maps = self.coadd_maps(self.maps)
self.map_sets = self.basic_task(self.map_coadder.group_maps)(self.maps)
return (
self.basic_task(self.analyze_map)
.map(self.maps, self.unmapped(all_simulated_sources))
self.basic_task(self.coadd_and_analyze_maps)
.map(self.map_sets, self.unmapped(all_simulated_sources))
.result()
)
Loading