|
21 | 21 |
|
22 | 22 | import numpy as np |
23 | 23 | import requests |
24 | | -import json |
25 | 24 | import os |
26 | | -from astropy.time import Time |
27 | 25 |
|
28 | 26 | import lsst.afw.detection as afwDetection |
29 | 27 | import lsst.afw.image as afwImage |
|
32 | 30 | import lsst.daf.base as dafBase |
33 | 31 | import lsst.geom |
34 | 32 | from lsst.ip.diffim.utils import evaluateMaskFraction, computeDifferenceImageMetrics |
| 33 | +from lsst.ip.diffim.utils import populate_sattle_visit_cache |
35 | 34 | from lsst.meas.algorithms import SkyObjectsTask, SourceDetectionTask, SetPrimaryFlagsTask, MaskStreaksTask |
36 | 35 | from lsst.meas.base import ForcedMeasurementTask, ApplyApCorrTask, DetectorVisitIdGeneratorConfig |
37 | 36 | import lsst.meas.deblender |
@@ -313,18 +312,6 @@ class DetectAndMeasureConfig(pipeBase.PipelineTaskConfig, |
313 | 312 | doc="If true, dia source bounding boxes will be sent for verification" |
314 | 313 | "to the sattle service." |
315 | 314 | ) |
316 | | - sattle_host = pexConfig.Field( |
317 | | - dtype=str, |
318 | | - default=os.getenv("SATTLE_HOST"), |
319 | | - doc="Host address for sattle service.", |
320 | | - optional=True |
321 | | - ) |
322 | | - sattle_port = pexConfig.Field( |
323 | | - dtype=str, |
324 | | - default=os.getenv("SATTLE_PORT"), |
325 | | - doc="Port to connect to sattle.", |
326 | | - optional=True |
327 | | - ) |
328 | 315 | sattle_historical = pexConfig.Field( |
329 | 316 | dtype=bool, |
330 | 317 | default=False, |
@@ -403,6 +390,14 @@ def setDefaults(self): |
403 | 390 | "STREAK", "INJECTED", "INJECTED_TEMPLATE"] |
404 | 391 | self.skySources.avoidMask = ["DETECTED", "DETECTED_NEGATIVE", "BAD", "NO_DATA", "EDGE"] |
405 | 392 |
|
| 393 | + def validate(self): |
| 394 | + super().validate() |
| 395 | + |
| 396 | + if self.run_sattle: |
| 397 | + if not os.getenv("SATTLE_URI_BASE"): |
| 398 | + raise pexConfig.FieldValidationError(DetectAndMeasureConfig.run_sattle, self, |
| 399 | + "Sattle requested but URI environment variable not set.") |
| 400 | + |
406 | 401 |
|
407 | 402 | class DetectAndMeasureTask(lsst.pipe.base.PipelineTask): |
408 | 403 | """Detect and measure sources on a difference image. |
@@ -721,8 +716,6 @@ def processResults(self, science, matchedTemplate, difference, sources, idFactor |
721 | 716 | diaSources = self._removeBadSources(initialDiaSources) |
722 | 717 |
|
723 | 718 | if self.config.run_sattle: |
724 | | - if not self.config.sattle_host or not self.config.sattle_port: |
725 | | - raise RuntimeError("Sattle filtering is on but service endpoints not set.") |
726 | 719 | diaSources = self.filterSatellites(diaSources, science) |
727 | 720 |
|
728 | 721 | if self.config.doForcedMeasurement: |
@@ -982,99 +975,80 @@ def calculateMetrics(self, science, difference, diaSources, kernelSources): |
982 | 975 | raise BadSubtractionError(ratio=metrics.differenceFootprintRatioStdev, |
983 | 976 | threshold=self.config.badSubtractionVariationThreshold) |
984 | 977 |
|
985 | | - def filterSatellites(self, diaSources, science): |
986 | | - |
987 | | - wcs = science.getWcs() |
988 | | - nbbox = [] |
| 978 | + def getSattleDiaSourceAllowlist(self, diaSources, science): |
| 979 | + """Query the sattle service and determine which diaSources are allowed. |
989 | 980 |
|
990 | | - for source in diaSources: |
991 | | - fp = source.getFootprint() |
992 | | - source_bbox = fp.getBBox() |
| 981 | + Parameters |
| 982 | + ---------- |
| 983 | + diaSources : `lsst.afw.table.SourceCatalog` |
| 984 | + The catalog of detected sources. |
| 985 | + science : `lsst.afw.image.ExposureF` |
| 986 | + Science exposure that was subtracted. |
993 | 987 |
|
994 | | - corners = [wcs.pixelToSky(source_bbox.beginX, source_bbox.beginY), |
995 | | - wcs.pixelToSky(source_bbox.beginX, source_bbox.endY), |
996 | | - wcs.pixelToSky(source_bbox.endX, source_bbox.endY), |
997 | | - wcs.pixelToSky(source_bbox.endX, source_bbox.beginY)] |
| 988 | + Returns |
| 989 | + ---------- |
| 990 | + allow_list : `list` of `int` |
| 991 | + diaSourceIds of diaSources that can be made public. |
998 | 992 |
|
999 | | - tmp = [] |
1000 | | - for c, corner in enumerate(corners): |
1001 | | - tmp.append([corner.getRa().asDegrees(), corner.getDec().asDegrees()]) |
1002 | | - nbbox.append(tmp) |
| 993 | + Raises |
| 994 | + ------ |
| 995 | + requests.HTTPError |
| 996 | + Raised if sattle call does not return success. |
| 997 | + """ |
1003 | 998 |
|
1004 | | - detector_id = science.getDetector().getId() |
1005 | | - visit_id = science.getInfo().getVisitInfo().getId() |
| 999 | + wcs = science.getWcs() |
| 1000 | + visit_info = science.getInfo().getVisitInfo() |
| 1001 | + visit_id = visit_info.getId() |
| 1002 | + sattle_uri_base = os.getenv('SATTLE_URI_BASE') |
1006 | 1003 |
|
1007 | 1004 | dia_sources_json = [] |
1008 | | - for i, source in enumerate(diaSources): |
1009 | | - dia_sources_json.append( |
1010 | | - {"diasource_id": source['id'], "bbox": nbbox[i]}) |
1011 | | - |
1012 | | - sattle_output = requests.put( |
1013 | | - f'{self.config.sattle_host}:{self.config.sattle_port}/diasource_allow_list', |
1014 | | - json={"visit_id": visit_id, "detector_id": detector_id, "diasources": dia_sources_json}) |
1015 | | - |
1016 | | - if sattle_output.status_code != 200: |
1017 | | - # Check if the cache is missing the visit and retry once |
1018 | | - if sattle_output.status_code == 404: |
1019 | | - try: |
1020 | | - self.log.info('Visit not found in sattle cache, re-sending') |
1021 | | - |
1022 | | - visit_id = science.getInfo().getVisitInfo().id |
1023 | | - |
1024 | | - visit_mjd = Time( |
1025 | | - science.getInfo().getVisitInfo().getDate().toPython()).mjd |
1026 | | - |
1027 | | - exposure_time_days = science.getInfo().getVisitInfo().getExposureTime() / 86400.0 |
1028 | | - exposure_end_mjd = visit_mjd + exposure_time_days / 2.0 |
1029 | | - exposure_start_mjd = visit_mjd - exposure_time_days / 2.0 |
1030 | | - |
1031 | | - boresight_ra = science.getInfo().getVisitInfo().boresightRaDec[0].asDegrees() |
1032 | | - boresight_dec = science.getInfo().getVisitInfo().boresightRaDec[1].asDegrees() |
| 1005 | + for source in diaSources: |
| 1006 | + source_bbox = source.getFootprint().getBBox() |
| 1007 | + corners = [wcs.pixelToSky(lsst.geom.Point2D(c)) for c in source_bbox.getCorners()] |
| 1008 | + bbox_radec = [[pt.getRa().asDegrees(), pt.getDec().asDegrees()] for pt in corners] |
| 1009 | + dia_sources_json.append({"diasource_id": source["id"], "bbox": bbox_radec}) |
1033 | 1010 |
|
1034 | | - r = requests.put( |
1035 | | - f'{self.config.sattle_host}:{self.config.sattle_port}/visit_cache', |
1036 | | - json={"visit_id": visit_id, |
1037 | | - "exposure_start_mjd": exposure_start_mjd, |
1038 | | - "exposure_end_mjd": exposure_end_mjd, |
1039 | | - "boresight_ra": boresight_ra, |
1040 | | - "boresight_dec": boresight_dec, |
1041 | | - "historical": self.config.sattle_historical}) |
| 1011 | + payload = {"visit_id": visit_id, "detector_id": science.getDetector(), "diasources": dia_sources_json, |
| 1012 | + "historical": self.config.sattle_historical} |
1042 | 1013 |
|
1043 | | - if r.status_code != 200: |
1044 | | - raise RuntimeError(r.text) |
| 1014 | + sattle_output = requests.put(f'{sattle_uri_base}/diasource_allow_list', |
| 1015 | + json=payload) |
1045 | 1016 |
|
1046 | | - sattle_output = requests.put( |
1047 | | - f'{self.config.sattle_host}:{self.config.sattle_port}/' |
1048 | | - 'diasource_allow_list', |
1049 | | - json={"visit_id": visit_id, "detector_id": detector_id, |
1050 | | - "diasources": dia_sources_json}) |
| 1017 | + # retry once if visit cache is not populated |
| 1018 | + if sattle_output.status_code == 404: |
| 1019 | + self.log.warning(f'Visit {visit_id} not found in sattle cache, re-sending') |
| 1020 | + populate_sattle_visit_cache(visit_info, historical=self.config.sattle_historical) |
| 1021 | + sattle_output = requests.put(f'{sattle_uri_base}/diasource_allow_list', json=payload) |
1051 | 1022 |
|
1052 | | - # fail on any non-success |
1053 | | - if sattle_output.status_code != 200: |
1054 | | - raise RuntimeError(sattle_output.text) |
| 1023 | + sattle_output.raise_for_status() |
1055 | 1024 |
|
1056 | | - except (requests.RequestException, ConnectionError) as e: |
1057 | | - raise RuntimeError(sattle_output.text) from e |
| 1025 | + return sattle_output.json()['allow_list'] |
1058 | 1026 |
|
1059 | | - else: |
1060 | | - raise RuntimeError(sattle_output.text) |
| 1027 | + def filterSatellites(self, diaSources, science): |
| 1028 | + """Remove diaSources overlapping predicted satellite positions. |
1061 | 1029 |
|
1062 | | - sattle_output_array = json.loads(sattle_output.content) |
| 1030 | + Parameters |
| 1031 | + ---------- |
| 1032 | + diaSources : `lsst.afw.table.SourceCatalog` |
| 1033 | + The catalog of detected sources. |
| 1034 | + science : `lsst.afw.image.ExposureF` |
| 1035 | + Science exposure that was subtracted. |
1063 | 1036 |
|
1064 | | - if sattle_output_array['allow_list'] == []: |
1065 | | - self.log.warning('Sattle output array is empty, all sources removed') |
1066 | | - diaSources = diaSources[0:0].copy(deep=True) |
1067 | | - else: |
| 1037 | + Returns |
| 1038 | + ---------- |
| 1039 | + filterdDiaSources : `lsst.afw.table.SourceCatalog` |
| 1040 | + Filtered catalog of diaSources |
| 1041 | + """ |
1068 | 1042 |
|
1069 | | - allowed_ids = [] |
1070 | | - for source in diaSources: |
1071 | | - if source['id'] in sattle_output_array['allow_list']: |
1072 | | - allowed_ids.append(True) |
1073 | | - else: |
1074 | | - allowed_ids.append(False) |
| 1043 | + allow_list = self.getSattleDiaSourceAllowlist(diaSources, science) |
1075 | 1044 |
|
| 1045 | + if len(allow_list): |
| 1046 | + allow_set = set(allow_list) |
| 1047 | + allowed_ids = [source['id'] in allow_set for source in diaSources] |
1076 | 1048 | diaSources = diaSources[np.array(allowed_ids)].copy(deep=True) |
1077 | | - |
| 1049 | + else: |
| 1050 | + self.log.warning('Sattle allowlist is empty, all diaSources removed') |
| 1051 | + diaSources = diaSources[0:0].copy(deep=True) |
1078 | 1052 | return diaSources |
1079 | 1053 |
|
1080 | 1054 | def _runStreakMasking(self, difference): |
|
0 commit comments