diff --git a/bin/ztf/archive_hostless.py b/bin/ztf/archive_hostless.py index dbe11bd6..417512cd 100644 --- a/bin/ztf/archive_hostless.py +++ b/bin/ztf/archive_hostless.py @@ -112,6 +112,7 @@ def main(): "kstest_static", "finkclass", "tnsclass", + "slsn_score", F.col("cutoutScience.stampData").alias("cutoutScience"), F.col("cutoutTemplate.stampData").alias("cutoutTemplate"), ] @@ -151,15 +152,17 @@ def main(): cutout_template = get_cutout(cutout=alert["cutoutTemplate"]) text = """ -*Object ID*: [{}](https://fink-portal.org/{}) +*Object ID*: [{}](https://ztf.fink-portal.org/{}) *Scores:*\n- Science: {:.2f}\n- Template: {:.2f} *Fink class*: {} +*SLSN score*: {} """.format( alert["objectId"], alert["objectId"], alert["kstest_static"][0], alert["kstest_static"][1], alert["finkclass"], + alert["slsn_score"], ) payloads.append((text, curve_png, [cutout_science, cutout_template])) diff --git a/fink_broker/rubin/hbase_utils.py b/fink_broker/rubin/hbase_utils.py index 46e4443e..92ef1c5f 100644 --- a/fink_broker/rubin/hbase_utils.py +++ b/fink_broker/rubin/hbase_utils.py @@ -114,7 +114,9 @@ def load_fink_cols(): "pred.main_label_classifier": {"type": "int", "default": None}, "misc.firstDiaSourceMjdTaiFink": {"type": "double", "default": None}, } - + fink_source_cols.update({ + "elephant_kstest": {"type": "map", "default": {}} + }) return fink_source_cols, fink_object_cols diff --git a/fink_broker/rubin/science.py b/fink_broker/rubin/science.py index b90a3448..c5fde754 100644 --- a/fink_broker/rubin/science.py +++ b/fink_broker/rubin/science.py @@ -23,6 +23,8 @@ from fink_broker.common.tester import spark_unit_tests +from fink_science.rubin.hostless_detection.processor import run_potential_hostless + # Import of science modules from fink_science.rubin.snn.processor import snn_ia_elasticc from fink_science.rubin.cats.processor import predict_nn @@ -374,6 +376,13 @@ def apply_science_modules(df: DataFrame, tns_raw_output: str = "") -> DataFrame: df = df.withColumn("misc", 
F.struct(*misc_struct)) df = df.drop(*misc_struct) + _LOG.info("New processor: ELEPHANT Hostless module") + df = df.withColumn( + "elephant_kstest", + run_potential_hostless( + df["cutoutScience"], df["cutoutTemplate"], df["ssSource.ssObjectId"] + ), + ) # Drop temp columns df = df.drop(*expanded) diff --git a/fink_broker/ztf/hbase_utils.py b/fink_broker/ztf/hbase_utils.py index 16640321..049a99c2 100644 --- a/fink_broker/ztf/hbase_utils.py +++ b/fink_broker/ztf/hbase_utils.py @@ -82,6 +82,8 @@ def load_fink_cols(): "gaiaClass": {"type": "string", "default": "Unknown"}, "is_transient": {"type": "boolean", "default": False}, "slsn_score": {"type": "float", "default": -1}, + "finkclass": {"type": "string", "default": "Unknown"}, + "is_hostless": {"type": "boolean", "default": False}, } fink_nested_cols = {} diff --git a/fink_broker/ztf/science.py b/fink_broker/ztf/science.py index 67340552..8ba754e5 100644 --- a/fink_broker/ztf/science.py +++ b/fink_broker/ztf/science.py @@ -19,10 +19,14 @@ import os import logging +import numpy as np from fink_utils.spark.utils import concat_col from fink_broker.common.tester import spark_unit_tests +from fink_filters.ztf.classification import extract_fink_classification +from fink_science.ztf.hostless_detection.processor import run_potential_hostless + # Import of science modules from fink_science.ztf.random_forest_snia.processor import rfscore_sigmoid_full @@ -347,7 +351,6 @@ def apply_science_modules(df: DataFrame, tns_raw_output: str = "") -> DataFrame: .withColumn("lc_features_r", df["lc_features"].getItem("2")) .drop("lc_features") ) - # Apply level one processor: fast transient _LOG.info("New processor: magnitude rate for fast transient") mag_rate_args = [ @@ -421,6 +424,55 @@ def apply_science_modules(df: DataFrame, tns_raw_output: str = "") -> DataFrame: df = df.withColumn("slsn_score", superluminous_score(*args)) + _LOG.info("New processor: ELEPHANT Hostless module") + fink_classifier_cols = [ + "cdsxmatch", + "roid", + 
"mulens", + "snn_snia_vs_nonia", + "snn_sn_vs_all", + "rf_snia_vs_nonia", + "candidate.ndethist", + "candidate.drb", + "candidate.classtar", + "candidate.jd", + "candidate.jdstarthist", + "rf_kn_vs_nonkn", + F.lit(""), + ] + df = df.withColumn("finkclass", extract_fink_classification(*fink_classifier_cols)) + df = df.withColumn( + "kstest_static", + run_potential_hostless( + df["cmagpsf"], + df["cutoutScience.stampData"], + df["cutoutTemplate.stampData"], + df["snn_snia_vs_nonia"], + df["snn_sn_vs_all"], + df["rf_snia_vs_nonia"], + df["rf_kn_vs_nonkn"], + df["finkclass"], + df["tns"], + df["candidate.jd"] - df["candidate.jdstarthist"], + df["roid"], + ), + ) + cond_science_low = df["kstest_static"][0] >= 0.0 + cond_science_high = df["kstest_static"][0] <= 0.5 + cond_template_low = df["kstest_static"][1] >= 0.0 + cond_template_high = df["kstest_static"][1] <= 0.85 + cond_max_detections = F.size(F.array_remove("cmagpsf", np.nan)) <= 20 + + df = df.withColumn( + "is_hostless", + cond_science_low + & cond_science_high + & cond_template_low + & cond_template_high + & cond_max_detections, + ) + + expanded.extend(["kstest_static"]) # Drop temp columns df = df.drop(*expanded)