29 changes: 28 additions & 1 deletion bin/fink_ssoft
@@ -113,6 +113,10 @@ while [ "$#" -gt 0 ]; do
TRANSFER_SSOFT=true
shift 1
;;
--construct-bulk)
CONSTRUCT_BULK=true
shift 1
;;
-*)
echo "unknown option: $1" >&2
exit 1
@@ -150,15 +154,17 @@ Options:
--link-ssoft Create a version-agnostic SSOFT folder with the latest version data
--list-ssoft List all available SSOFT on HDFS
--transfer-ssoft Transfer SSOFT to HDFS and back it up. Reads from -ssoft_outfolder
--construct-bulk Join the aggregated data and SSOFT to build the bulk file

Examples:
fink_ssoft -s ztf --reconstruct-data # Reconstruct the whole database
fink_ssoft -s ztf --update-data # Add last month's data
fink_ssoft -s ztf --list-data # List all aggregated files
fink_ssoft -s ztf --run-ssoft -model SHG1G2 # Compute SHG1G2 SSOFT
fink_ssoft -s ztf --run-ssoft -model SHG1G2 -version 202504 # Compute SHG1G2 SSOFT until 2025/04
fink_ssoft -s ztf --run-ssoft -model SHG1G2 -limit 10 # Compute SHG1G2 SSOFT with 10 objects only
fink_ssoft -s ztf --run-ssoft -model SHG1G2 -min 100 # Compute SHG1G2 SSOFT for objects with at least 100 observations
fink_ssoft -s ztf --construct-bulk # Join the aggregated data and SSOFT to build bulk file

Notes: Typically, the schedule is to invoke the following commands once a month:
fink_ssoft -s ztf --update-data
Expand All @@ -168,6 +174,7 @@ fink_ssoft -s ztf --run-ssoft -model HG1G2
fink_ssoft -s ztf --run-ssoft -model SHG1G2
fink_ssoft -s ztf --run-ssoft -model SSHG1G2
fink_ssoft -s ztf --transfer-ssoft
fink_ssoft -s ztf --construct-bulk

Notes: If you lose all your ephemerides data, just reconstruct everything with
fink_ssoft -s ztf --reconstruct-data
@@ -375,3 +382,23 @@ if [[ ${TRANSFER_SSOFT} == true ]]; then

fi

if [[ ${CONSTRUCT_BULK} == true ]]; then
# check that the last aggregation exists
if hdfs dfs -test -d sso_ztf_lc_aggregated_${YEAR}${MONTH}.parquet; then
echo -e "Computing the SSO BULK..."
# Run the script
spark-submit \
--master ${SPARK_MASTER} \
--packages ${FINK_PACKAGES} --jars ${FINK_JARS} \
${EXTRA_SPARK_CONFIG} ${RESOURCES} \
--conf spark.sql.execution.arrow.pyspark.enabled=true \
--conf spark.kryoserializer.buffer.max=512m \
${PYTHON_EXTRA_FILE} \
${FINK_HOME}/bin/${SURVEY}/extract_ssobulk.py ${LIMIT_ARGS}
else
echo -e >&2 "${SERROR} sso_ztf_lc_aggregated_${YEAR}${MONTH}.parquet does not exist."
echo -e >&2 "${SSTOP} Merging aborted..."
exit 1
fi
fi
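The `--construct-bulk` step joins the month's aggregated lightcurves with the SSOFT table on the object designation (`ssnamenr`). A toy pandas sketch of that join, with made-up designations and columns standing in for the real parquet tables on HDFS:

```python
import pandas as pd

# Hypothetical stand-ins for the aggregated lightcurve table and the SSOFT
# (the real tables live on HDFS as parquet; all values here are made up).
lc = pd.DataFrame({"ssnamenr": ["33803", "8467", "1234"], "nobs": [120, 85, 60]})
ssoft = pd.DataFrame({"ssnamenr": ["33803", "8467"], "H_1": [15.2, 16.1], "fit": [0, 0]})

# Inner join on the designation, mirroring the Spark join in extract_ssobulk.py.
# Only objects present in both tables survive into the bulk file.
bulk = lc.merge(ssoft, on="ssnamenr", how="inner")
```

Objects with lightcurves but no SSOFT entry (here `"1234"`) drop out of the bulk.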
182 changes: 182 additions & 0 deletions bin/ztf/extract_ssobulk.py
@@ -0,0 +1,182 @@
# Copyright 2019-2026 AstroLab Software
# Author: Julien Peloton
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Extract all SSO lightcurves with residuals from the sHG1G2 model."""

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, FloatType

import pandas as pd
import numpy as np
import datetime
import argparse
from line_profiler import profile

from fink_utils.sso.spins import func_shg1g2

import logging


_LOG = logging.getLogger(__name__)
SSO_FILE = "sso_ztf_lc_aggregated_w_ssoft_{}{}.parquet"


@pandas_udf(ArrayType(FloatType()), PandasUDFType.SCALAR)
@profile
def extract_residuals(
fid,
magpsf,
raobs,
decobs,
phase,
dobs,
dhelio,
H_1,
H_2,
G1_1,
G1_2,
G2_1,
G2_2,
R,
alpha0,
delta0,
fit,
):
"""Apply the sHG1G2 model to the data

Returns
-------
out: pd.Series of list
Magnitude residuals
"""
pdf = pd.DataFrame({
"residuals": [[] for _ in range(len(raobs))],
"fid": fid.to_numpy(),
"ra": raobs.to_numpy(),
"dec": decobs.to_numpy(),
"phase": phase.to_numpy(),
"magpsf": magpsf.to_numpy(),
"dobs": dobs.to_numpy(),
"dhelio": dhelio.to_numpy(),
"R": R.to_numpy(),
"alpha0": alpha0.to_numpy(),
"delta0": delta0.to_numpy(),
"H_1": H_1.to_numpy(),
"H_2": H_2.to_numpy(),
"G1_1": G1_1.to_numpy(),
"G1_2": G1_2.to_numpy(),
"G2_1": G2_1.to_numpy(),
"G2_2": G2_2.to_numpy(),
"fit": fit.to_numpy(),
})
for index, row in pdf.iterrows():
if row["fit"] != 0:
# Fit failed: leave an empty residual list
pdf.at[index, "residuals"] = []
continue

magpsf_red = row["magpsf"] - 5 * np.log10(row["dobs"] * row["dhelio"])
container = np.zeros(len(magpsf_red))
for filt in np.unique(row["fid"]):
cond = row["fid"] == filt
model = func_shg1g2(
[
np.deg2rad(row["phase"][cond]),
np.deg2rad(row["ra"][cond]),
np.deg2rad(row["dec"][cond]),
],
row["H_{}".format(filt)],
row["G1_{}".format(filt)],
row["G2_{}".format(filt)],
row["R"],
np.deg2rad(row["alpha0"]),
np.deg2rad(row["delta0"]),
)
container[cond] = magpsf_red[cond] - model
pdf.at[index, "residuals"] = container
return pdf["residuals"]


def main():
parser = argparse.ArgumentParser(description=__doc__)

parser.add_argument(
"-limit",
type=int,
default=None,
help="""
Use only `limit` number of SSO for test purposes.
Default is None, meaning all available data is considered.
""",
)
args = parser.parse_args(None)

# Get current year and month
year = datetime.datetime.now().year
month = "{:02}".format(datetime.datetime.now().month)

spark = SparkSession.builder.appName(
"sso_bulk_{}{}".format(year, month)
).getOrCreate()
spark.sparkContext.setLogLevel("WARN")

# Load SSO LC data
df_lc = spark.read.format("parquet").load(
"/user/julien.peloton/sso_ztf_lc_aggregated_{}{}.parquet".format(year, month)
)

# Load SSOFT data
df_ssoft = spark.read.format("parquet").load(
"/user/livy/SSOFT/ssoft_SHG1G2_{}.{}.parquet".format(year, month)
)

# Join
df = df_lc.join(df_ssoft, on="ssnamenr", how="inner").cache()

nobjects = df.count()
_LOG.info("{} objects in the SSOBULK".format(nobjects))

if args.limit is not None:
assert isinstance(args.limit, int), (args.limit, type(args.limit))
_LOG.info("Limiting the new number of objects to {}".format(args.limit))
df = df.limit(args.limit)

df = df.withColumn(
"residuals_shg1g2",
extract_residuals(
df["cfid"],
df["cmagpsf"],
df["cra"],
df["cdec"],
df["Phase"],
df["Dobs"],
df["Dhelio"],
df["H_1"],
df["H_2"],
df["G1_1"],
df["G1_2"],
df["G2_1"],
df["G2_2"],
df["R"],
df["alpha0"],
df["delta0"],
df["fit"],
),
)

df.write.mode("append").parquet(SSO_FILE.format(year, month))


if __name__ == "__main__":
main()
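The heart of `extract_residuals` is the per-filter subtraction of a model prediction from the reduced magnitude `magpsf - 5 * log10(dobs * dhelio)`. A self-contained sketch of that masking pattern, with made-up observations and a hypothetical constant-H model standing in for `func_shg1g2`:

```python
import numpy as np

# Made-up observations in two ZTF filters (fid 1 and fid 2)
fid = np.array([1, 2, 1, 2])
magpsf = np.array([18.0, 17.5, 18.2, 17.7])
dobs = np.array([1.2, 1.2, 1.3, 1.3])     # observer distance (au)
dhelio = np.array([2.1, 2.1, 2.2, 2.2])   # heliocentric distance (au)

# Reduced magnitude, as computed inside the UDF
magpsf_red = magpsf - 5 * np.log10(dobs * dhelio)

# Hypothetical per-filter model: a constant absolute magnitude per band,
# standing in for the full sHG1G2 phase-curve prediction.
model_h = {1: 15.0, 2: 14.6}

# Fill one residual array by masking on filter id, as in the UDF loop
residuals = np.zeros(len(magpsf_red))
for filt in np.unique(fid):
    cond = fid == filt
    residuals[cond] = magpsf_red[cond] - model_h[filt]
```

The per-filter mask guarantees every observation is corrected with the parameters (`H_1`/`G1_1`/... vs `H_2`/`G1_2`/...) fitted for its own band.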
2 changes: 1 addition & 1 deletion scheduler/ztf/launch_ssoft.sh
@@ -27,4 +27,4 @@ fink_ssoft -s ztf --link-data > ${FINK_HOME}/broker_logs/ssoft_link_ephems_$CURR
fink_ssoft -s ztf --run-ssoft -model HG -version ${CURRDATE} > ${FINK_HOME}/broker_logs/ssoft_HG_$CURRDATE.log 2>&1
fink_ssoft -s ztf --run-ssoft -model HG1G2 -version ${CURRDATE} > ${FINK_HOME}/broker_logs/ssoft_HG1G2_$CURRDATE.log 2>&1
fink_ssoft -s ztf --run-ssoft -model SHG1G2 -version ${CURRDATE} > ${FINK_HOME}/broker_logs/ssoft_SHG1G2_$CURRDATE.log 2>&1
#fink_ssoft -s ztf --run-ssoft -model SSHG1G2
fink_ssoft -s ztf --construct-bulk > ${FINK_HOME}/broker_logs/ssobulk_$CURRDATE.log 2>&1
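`extract_residuals` is declared as a scalar pandas UDF: Spark hands it pandas Series holding column batches and expects a Series of the same length back (here, a Series of float lists). That contract can be exercised without Spark, with a hypothetical toy function in its place:

```python
import pandas as pd

def toy_udf(cmagpsf: pd.Series) -> pd.Series:
    # Scalar pandas-UDF contract: Series in, equal-length Series out.
    # Each input element is itself a list of magnitudes; an empty input
    # yields an empty list, mimicking the "failed fit" convention above.
    return cmagpsf.apply(lambda mags: [float(m) for m in mags] if mags else [])

# One batch of three rows, as Spark would deliver it
batch = pd.Series([[18.0, 18.2], [17.5], []])
out = toy_udf(batch)
```

Returning one element per input row is mandatory; Spark aligns the output Series positionally with the batch it sent.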