Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 136 additions & 0 deletions fink_filters/ztf/filter_blazar_high_state/filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# Copyright 2025 AstroLab Software
# Author: Julian Hamo
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from line_profiler import profile

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import BooleanType

import pandas as pd

from fink_filters.tester import spark_unit_tests


@pandas_udf(BooleanType(), PandasUDFType.SCALAR)
@profile
def high_state_filter(instantness_high, robustness_high) -> pd.Series:
    """Flag alerts whose features mark them as blazar high states.

    Parameters
    ----------
    instantness_high: Spark DataFrame Column
        `instantness_high` feature computed in the blazar_extreme_state module.
    robustness_high: Spark DataFrame Column
        `robustness_high` feature computed in the blazar_extreme_state module.

    Returns
    -------
    check: pd.Series
        Boolean mask: True when the alert is a high state, False otherwise.

    Examples
    --------
    >>> import pyspark.sql.functions as F
    >>> import os
    >>> import numpy as np
    >>> import pandas as pd
    >>> from fink_utils.spark.utils import concat_col
    >>> from fink_science.ztf.standardized_flux.processor import standardized_flux
    >>> from fink_science.ztf.blazar_extreme_state.processor import extreme_state
    >>> from fink_utils.spark.utils import apply_user_defined_filter

    >>> parDF = spark.read.parquet(ztf_alert_sample)
    >>> parDF = parDF.drop("blazar_stats")

    # Required alert columns
    >>> what = [
    ...     "distnr",
    ...     "magpsf",
    ...     "sigmapsf",
    ...     "magnr",
    ...     "sigmagnr",
    ...     "isdiffpos",
    ...     "fid",
    ...     "jd",
    ...     "ra",
    ...     "dec",
    ... ]

    # Concatenation
    >>> prefix = "c"
    >>> for key in what:
    ...     parDF = concat_col(parDF, colname=key, prefix=prefix)

    # Preliminary module run
    >>> args = [
    ...     "candid",
    ...     "objectId",
    ...     "cdistnr",
    ...     "cmagpsf",
    ...     "csigmapsf",
    ...     "cmagnr",
    ...     "csigmagnr",
    ...     "cisdiffpos",
    ...     "cfid",
    ...     "cjd",
    ... ]
    >>> parDF = parDF.withColumn(
    ...     "container",
    ...     standardized_flux(*args)
    ... )
    >>> parDF = parDF.withColumn(
    ...     "cstd_flux",
    ...     parDF["container"].getItem("flux")
    ... )
    >>> parDF = parDF.withColumn(
    ...     "csigma_std_flux",
    ...     parDF["container"].getItem("sigma")
    ... )

    # Drop temporary columns
    >>> what_prefix = [prefix + key for key in what]
    >>> parDF = parDF.drop("container")

    # Test the module
    >>> args = ["candid", "objectId", "cstd_flux", "cjd", "cra", "cdec"]
    >>> parDF = parDF.withColumn("blazar_stats", extreme_state(*args))

    >>> parDF = parDF.withColumn(
    ...     "instantness_high",
    ...     F.col("blazar_stats").getItem("instantness_high").alias("instantness_high")
    ... )
    >>> parDF = parDF.withColumn(
    ...     "robustness_high",
    ...     F.col("blazar_stats").getItem("robustness_high").alias("robustness_high")
    ... )
    >>> f = "fink_filters.ztf.filter_blazar_high_state.filter.high_state_filter"
    >>> parDF = apply_user_defined_filter(parDF, f)
    >>> print(parDF.count())
    24
    """
    # Both features must strictly exceed 1 for the alert to count as a
    # high state; element-wise AND keeps only alerts passing both cuts.
    is_instant = instantness_high > 1
    is_robust = robustness_high > 1
    return pd.Series(is_instant & is_robust)


if __name__ == "__main__":
    """ Execute the test suite """
    globs = globals()

    # Alert sample consumed by the doctests above
    globs["ztf_alert_sample"] = (
        "datatest/CTAO_blazar/CTAO_blazar_datatest_v20-12-24.parquet"
    )

    # Run the test suite
    spark_unit_tests(globs)
102 changes: 84 additions & 18 deletions fink_filters/ztf/filter_blazar_low_state/filter.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2025 AstroLab Software
# Copyright 2026 AstroLab Software
# Author: Julian Hamo
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -24,38 +24,104 @@

@pandas_udf(BooleanType(), PandasUDFType.SCALAR)
@profile
def low_state_filter(instantness_low, robustness_low) -> pd.Series:
    """Returns True if the alert is considered a low state, False otherwise.

    Parameters
    ----------
    instantness_low: Spark DataFrame Column
        `instantness_low` feature computed in the blazar_extreme_state module.
    robustness_low: Spark DataFrame Column
        `robustness_low` feature computed in the blazar_extreme_state module.

    Returns
    -------
    check: pd.Series
        Mask that returns True if the alert is a low state,
        False else.

    Examples
    --------
    >>> import pyspark.sql.functions as F
    >>> import os
    >>> import numpy as np
    >>> import pandas as pd
    >>> from fink_utils.spark.utils import concat_col
    >>> from fink_science.ztf.standardized_flux.processor import standardized_flux
    >>> from fink_science.ztf.blazar_extreme_state.processor import extreme_state
    >>> from fink_utils.spark.utils import apply_user_defined_filter

    >>> parDF = spark.read.parquet(ztf_alert_sample)
    >>> parDF = parDF.drop("blazar_stats")

    # Required alert columns
    >>> what = [
    ...     "distnr",
    ...     "magpsf",
    ...     "sigmapsf",
    ...     "magnr",
    ...     "sigmagnr",
    ...     "isdiffpos",
    ...     "fid",
    ...     "jd",
    ...     "ra",
    ...     "dec",
    ... ]

    # Concatenation
    >>> prefix = "c"
    >>> for key in what:
    ...     parDF = concat_col(parDF, colname=key, prefix=prefix)

    # Preliminary module run
    >>> args = [
    ...     "candid",
    ...     "objectId",
    ...     "cdistnr",
    ...     "cmagpsf",
    ...     "csigmapsf",
    ...     "cmagnr",
    ...     "csigmagnr",
    ...     "cisdiffpos",
    ...     "cfid",
    ...     "cjd",
    ... ]
    >>> parDF = parDF.withColumn(
    ...     "container",
    ...     standardized_flux(*args)
    ... )
    >>> parDF = parDF.withColumn(
    ...     "cstd_flux",
    ...     parDF["container"].getItem("flux")
    ... )
    >>> parDF = parDF.withColumn(
    ...     "csigma_std_flux",
    ...     parDF["container"].getItem("sigma")
    ... )

    # Drop temporary columns
    >>> what_prefix = [prefix + key for key in what]
    >>> parDF = parDF.drop("container")

    # Test the module
    >>> args = ["candid", "objectId", "cstd_flux", "cjd", "cra", "cdec"]
    >>> parDF = parDF.withColumn("blazar_stats", extreme_state(*args))

    >>> parDF = parDF.withColumn(
    ...     "instantness_low",
    ...     F.col("blazar_stats").getItem("instantness_low").alias("instantness_low")
    ... )
    >>> parDF = parDF.withColumn(
    ...     "robustness_low",
    ...     F.col("blazar_stats").getItem("robustness_low").alias("robustness_low")
    ... )
    >>> f = "fink_filters.ztf.filter_blazar_low_state.filter.low_state_filter"
    >>> parDF = apply_user_defined_filter(parDF, f)
    >>> print(parDF.count())
    8
    """
    # A low state requires both features to lie in the half-open
    # interval [0, 1); negative values and values >= 1 are rejected.
    f1 = (instantness_low < 1) & (instantness_low >= 0)
    f2 = (robustness_low < 1) & (robustness_low >= 0)
    return pd.Series(f1 & f2)


Expand Down
Empty file.
128 changes: 128 additions & 0 deletions fink_filters/ztf/filter_blazar_low_state_old/filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Copyright 2025 AstroLab Software
# Author: Julian Hamo
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from line_profiler import profile

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import BooleanType

import pandas as pd

from fink_filters.tester import spark_unit_tests


@pandas_udf(BooleanType(), PandasUDFType.SCALAR)
@profile
def low_state_filter(m1, m2) -> pd.Series:
    """Flag alerts whose features mark them as blazar quiescent states.

    Parameters
    ----------
    m1: Spark DataFrame Column
        `m1` feature computed in the blazar_low_state module
    m2: Spark DataFrame Column
        `m2` feature computed in the blazar_low_state module

    Returns
    -------
    check: pd.Series
        Boolean mask: True when the alert is a low state, False otherwise.

    Examples
    --------
    >>> import pyspark.sql.functions as F
    >>> import os
    >>> import numpy as np
    >>> import pandas as pd
    >>> from fink_utils.spark.utils import concat_col
    >>> from fink_science.ztf.standardized_flux.processor import standardized_flux
    >>> from fink_science.ztf.blazar_low_state.processor import quiescent_state
    >>> from fink_utils.spark.utils import apply_user_defined_filter

    >>> parDF = spark.read.parquet(ztf_alert_sample)
    >>> parDF = parDF.drop("blazar_stats")

    # Required alert columns
    >>> what = [
    ...     'distnr',
    ...     'magpsf',
    ...     'sigmapsf',
    ...     'magnr',
    ...     'sigmagnr',
    ...     'isdiffpos',
    ...     'fid',
    ...     'jd'
    ... ]

    # Concatenation
    >>> prefix = 'c'
    >>> for key in what:
    ...     parDF = concat_col(parDF, colname=key, prefix=prefix)

    # Preliminary module run
    >>> args = [
    ...     'candid',
    ...     'objectId',
    ...     'cdistnr',
    ...     'cmagpsf',
    ...     'csigmapsf',
    ...     'cmagnr',
    ...     'csigmagnr',
    ...     'cisdiffpos',
    ...     'cfid',
    ...     'cjd'
    ... ]
    >>> parDF = parDF.withColumn(
    ...     'container',
    ...     standardized_flux(*args)
    ... )
    >>> parDF = parDF.withColumn(
    ...     'cstd_flux',
    ...     parDF['container'].getItem('flux')
    ... )
    >>> parDF = parDF.withColumn(
    ...     'csigma_std_flux',
    ...     parDF['container'].getItem('sigma')
    ... )

    # Drop temporary columns
    >>> what_prefix = [prefix + key for key in what]
    >>> parDF = parDF.drop('container')

    # Test the module
    >>> args = ['candid', 'objectId', 'cstd_flux', 'cjd']
    >>> parDF = parDF.withColumn('blazar_stats', quiescent_state(*args))

    >>> parDF = parDF.withColumn("m1", F.col('blazar_stats').getItem('m1').alias("m1"))
    >>> parDF = parDF.withColumn("m2", F.col('blazar_stats').getItem('m2').alias("m2"))
    >>> f = 'fink_filters.ztf.filter_blazar_low_state_old.filter.low_state_filter'
    >>> parDF = apply_user_defined_filter(parDF, f)
    >>> print(parDF.count())
    12
    """
    # Quiescence requires both features to fall inside [0, 1);
    # the element-wise AND keeps only alerts passing both cuts.
    in_range_m1 = (m1 < 1) & (m1 >= 0)
    in_range_m2 = (m2 < 1) & (m2 >= 0)
    return pd.Series(in_range_m1 & in_range_m2)


if __name__ == "__main__":
    """ Execute the test suite """
    globs = globals()

    # Alert sample consumed by the doctests above
    globs["ztf_alert_sample"] = (
        "datatest/CTAO_blazar/CTAO_blazar_datatest_v20-12-24.parquet"
    )

    # Run the test suite
    spark_unit_tests(globs)
Loading
Loading