diff --git a/README.md b/README.md index 6f1d6d8..23362a1 100644 --- a/README.md +++ b/README.md @@ -33,16 +33,10 @@ The data necessary to reproduce these results are available through [zenodo](htt Create a virtual environment following [these instructions](https://uoa-eresearch.github.io/eresearch-cookbook/recipe/2014/11/26/python-virtual-env/). Source it and install the [actsnclass](https://github.com/COINtoolbox/ActSNClass) package. -Then you can install the other dependencies using pip: - -``` -python3 -m pip install -r requirements.txt -``` - Then you can install the functionalities of this package. ``` -python setup.py install +(my_env) $ pip install . ``` ## Rainbow diff --git a/actsnfink/early_sn_classifier.py b/actsnfink/early_sn_classifier.py index 8e881b2..d825b6b 100644 --- a/actsnfink/early_sn_classifier.py +++ b/actsnfink/early_sn_classifier.py @@ -14,6 +14,9 @@ # limitations under the License. import actsnclass +import json +import mlflow +from mlflow.models.signature import infer_signature import pandas as pd import numpy as np import os @@ -302,7 +305,9 @@ def build_samples(features: pd.DataFrame, initial_training: int, def learn_loop(data: actsnclass.DataBase, nloops: int, strategy: str, output_metrics_file: str, output_queried_file: str, classifier='RandomForest', batch=1, screen=True, - output_prob_root=None, seed=42, nest=1000): + output_prob_root=None, seed=42, nest=1000, mlflow_uri=None, + mlflow_exp=None, features_names=None, pre_code_path=None, + pre_data_path=None): """Perform the active learning loop. All results are saved to file. Parameters ---------- @@ -331,15 +336,29 @@ def learn_loop(data: actsnclass.DataBase, nloops: int, strategy: str, If True, print on screen number of light curves processed. seed: int (optional) Random seed. + mlflow_uri: str (optional) + MLFlow address to log info on each loop. Default is None. + mlflow_exp: str (optional) + Name of MLFlow experiment. Default is None. 
+ pre_code_path: str (optional) + Path to code enabling feature extraction. Default is None. + pre_data_path: str (optional) + Path to data enabling feature extraction. Default is None. """ + if bool(mlflow_uri): + + mlflow.set_tracking_uri(mlflow_uri) # set mlflow remote uri + mlflow.set_experiment(mlflow_exp) # determine experiment name + + for loop in range(nloops): if screen: print('Processing... ', loop) # classify - data.classify(method=classifier, seed=seed, n_est=nest) + data.classify(method=classifier, seed=seed, n_est=nest, return_model=bool(mlflow_uri)) if isinstance(output_prob_root, str): data_temp = data.test_metadata.copy(deep=True) @@ -363,7 +382,57 @@ def learn_loop(data: actsnclass.DataBase, nloops: int, strategy: str, # save query sample to file data.save_queried_sample(output_queried_file, loop=loop, batch=batch, full_sample=False) - + + if bool(mlflow_uri): + with mlflow.start_run(run_name=strategy + "_loop_" + str(loop)) as run: + + mlflow.log_artifact(pre_code_path, artifact_path='code') + mlflow.log_artifact(pre_data_path, artifact_path='code') + + # Log metadata + meta_info = { + "n_train": data.train_labels.shape[0], + "n_test": data.test_labels.shape[0], + "n_queried": len(data.queried_sample), + "n_queryable": data.queryable_ids.shape[0] + } + + mlflow.log_dict(meta_info, "meta.json") + + # log parameters of learn_loop + mlflow.log_param('loop', loop) + mlflow.log_param('strategy', strategy) + mlflow.log_param('classifier', classifier) + mlflow.log_param('batch', batch) + mlflow.log_param('seed', seed) + mlflow.log_param('nest', nest) + + # log metrics + for i in range(len(data.metrics_list_names)): + mlflow.log_metric(data.metrics_list_names[i], data.metrics_list_values[i]) + + # log signature + signature = infer_signature(data.train_features, data.trained_model.predict(data.train_features)) + current_model = mlflow.sklearn.log_model( + name ='actsnfink_' + str(loop), + signature = signature, + sk_model=data.trained_model, + model_type 
= 'classifier' + ) + + # Saving output file + mlflow.log_artifact(output_metrics_file) + + # Saving datasets + train = pd.DataFrame(data.train_features, columns=features_names) + mlflow.log_table(train, artifact_file='training_features.parquet') + + result = mlflow.models.evaluate( + model=current_model.model_uri, + data=data.test_features, + targets=data.test_labels, + model_type="classifier", + ) def build_matrix(fname_output: str, dirname_input: str, n: int, diff --git a/actsnfink/scripts/feature_extraction_ztf.py b/actsnfink/scripts/feature_extraction_ztf.py new file mode 100644 index 0000000..b1c1218 --- /dev/null +++ b/actsnfink/scripts/feature_extraction_ztf.py @@ -0,0 +1,233 @@ +# Copyright 2025 +# Author: Emille Ishida +# +# Licensed under the MIT License; +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://opensource.org/licenses/mit-license.php +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pandas as pd +import numpy as np +from copy import deepcopy + +from actsnfink.classifier_sigmoid import get_sigmoid_features_dev +from fink_utils.data.utils import format_data_as_snana +from actsnfink.classifier_sigmoid import RF_FEATURE_NAMES + +__all__ = [ + 'concat_val', + 'apply_selection_cuts_ztf', + 'extract_features_rf_snia' +] + + +def concat_val(df, colname: str): + """Concatenate historical and current measurements for 1 alert. + + Parameters + ---------- + df: pd.DataFrame + DataFrame containing data for 1 alert + colname: str + Name of the column to concatenate. + prefix: str + Additional prefix to add to the column name. Default is 'c'. 
+ + Returns + ------- + hist_vals: list + list containing the concatenation of historical and current measurements. + """ + + current_val = [df["candidate"].get(colname)] + + prv = df.get("prv_candidates", None) + + if prv is not None: + hist_vals = [p.get(colname) for p in prv] + else: + hist_vals = [] + + return hist_vals + +def apply_selection_cuts_ztf( + magpsf: pd.Series, + ndethist: pd.Series, + #cdsxmatch: pd.Series, + minpoints: int = 4, + maxndethist: int = 20, +) -> pd.Series: + """Apply selection cuts to keep only alerts of interest for early SN Ia analysis + + Parameters + ---------- + magpsf: pd.Series + Series containing data measurement (array of double). Each row contains + all measurement values for one alert. + ndethist: pd.Series + Series containing length of the alert history (int). + Each row contains the (single) length of the alert. + #cdsxmatch: pd.Series + # Series containing crossmatch label with SIMBAD (str). + # Each row contains one label. + + Returns + ------- + mask: pd.Series + Series containing `True` if the alert is valid, `False` otherwise. + Each row contains one boolean. + """ + # Flag alerts with less than 3 points in total + mask = magpsf.apply(lambda x: np.sum(np.array(x) == np.array(x))) >= minpoints + + # only alerts with less or equal than 20 measurements + mask *= ndethist.astype(int) <= maxndethist + + # reject galactic objects + #list_of_sn_host = return_list_of_eg_host() + #mask *= cdsxmatch.apply(lambda x: x in list_of_sn_host) + + return mask + +def extract_features_rf_snia( + jd, + fid, + magpsf, + sigmapsf, + #cdsxmatch, + ndethist, + min_rising_points=None, + min_data_points=None, + rising_criteria=None, +) -> pd.Series: + """Return the features used by the RF classifier. + + There are 12 features. 
Order is: + a_g,b_g,c_g,snratio_g,chisq_g,nrise_g, + a_r,b_r,c_r,snratio_r,chisq_r,nrise_r + + Parameters + ---------- + jd: Spark DataFrame Column + JD times (float) + fid: Spark DataFrame Column + Filter IDs (int) + magpsf, sigmapsf: Spark DataFrame Columns + Magnitude from PSF-fit photometry, and 1-sigma error + #cdsxmatch: Spark DataFrame Column + # Type of object found in Simbad (string) + ndethist: Spark DataFrame Column + Column containing the number of detection by ZTF at 3 sigma (int) + min_rising_points, min_data_points: int + Parameters from fink_sn_activelearning.git + rising_criteria: str + How to compute derivatives: ewma (default), or diff. + + Returns + ------- + features: list of str + List of string. + + Examples + -------- + >>> df = pd.read_parquet(ztf_alert_sample) + + # Required alert columns + >>> what = ['jd', 'fid', 'magpsf', 'sigmapsf'] + + # Use for creating temp name + >>> prefix = 'c' + >>> what_prefix = [prefix + i for i in what] + + # Append temp columns with historical + current measurements + for colname in what: + df[prefix + colname] = df.apply(concat_val, args=[colname], axis=1) + + # Expose extra parameter + ndethist = pd.Series([df['candidate'][i]['ndethist'] for i in range(df.shape[0])]) + + # Perform the fit + classification (default model) + >>> features = df.apply(extract_features_rf_snia, axis=1, + ... args=[df['cjd'], df['cfid'], df['cmagpsf'], df['csigmapsf', 'ndethist']]) + + >>> for name in RF_FEATURE_NAMES: + ... index = RF_FEATURE_NAMES.index(name) + ... 
df[name] = features[:,index] + + # Trigger something + >>> sum(df[RF_FEATURES_NAMES[0]] != 0) == 5 + True + """ + if min_rising_points is None: + min_rising_points = pd.Series([2]) + if min_data_points is None: + min_data_points = pd.Series([4]) + if rising_criteria is None: + rising_criteria = pd.Series(["ewma"]) + + mask = apply_selection_cuts_ztf(magpsf, ndethist) #, cdsxmatch) + + if len(jd[mask]) == 0: + return pd.Series(np.zeros(len(jd), dtype=float)) + + candid = pd.Series(range(len(jd))) + pdf = format_data_as_snana(jd, magpsf, sigmapsf, fid, candid, mask) + + test_features = [] + for id in np.unique(pdf["SNID"]): + pdf_sub = pdf[pdf["SNID"] == id] + features = get_sigmoid_features_dev( + pdf_sub, + min_rising_points=min_rising_points.to_numpy()[0], + min_data_points=min_data_points.to_numpy()[0], + rising_criteria=rising_criteria.to_numpy()[0], + ) + test_features.append(features) + + to_return_features = np.zeros((len(jd), len(RF_FEATURE_NAMES)), dtype=float) + to_return_features[mask] = test_features + + return np.array(to_return_features) + +def main(): + + pre_data_test = '../../data/test_alerts.parquet' + + # read data + data = pd.read_parquet(pre_data_test) + + # Required alert columns + what = ['jd', 'fid', 'magpsf', 'sigmapsf'] + + # Use for creating temp name + prefix = 'c' + what_prefix = [prefix + i for i in what] + + # Append temp columns with historical + current measurements + for colname in what: + data[prefix + colname] = data.apply(concat_val, args=[colname], axis=1) + + # expose feature from outside candidates + ndethist = pd.Series([data['candidate'][i]['ndethist'] for i in range(data.shape[0])]) + + # extract features + features = extract_features_rf_snia(data['cjd'], data['cfid'], + data['cmagpsf'], data['csigmapsf'], ndethist) + + print('Found ', sum(features[:,0] != 0), 'valid features.') + print('Correct answer is 5.') + + + + return None + +if __name__ == '__main__': + main() + diff --git a/actsnfink/scripts/run_loop_mlflow.py 
b/actsnfink/scripts/run_loop_mlflow.py new file mode 100644 index 0000000..6540318 --- /dev/null +++ b/actsnfink/scripts/run_loop_mlflow.py @@ -0,0 +1,144 @@ +# Copyright 2022 +# Author: Emille Ishida +# +# Licensed under the MIT License; +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://opensource.org/licenses/mit-license.php +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from actsnfink import * + +import os +import pandas as pd + +def main(): + + ################################################################ + + ######### User choices: general ######################### + + create_matrix = False # create raw data file by combining all TNS + a few simbad files + n_files_simbad = 5 # number of simbad files randomly chosen to compose the raw data + + + fname_features_matrix = '../../data/features.csv' # output features file + fname_raw_output = '../../../test_mlflow_2/data/raw.csv.gz' # output raw data file + dirname_input = '../../../data/AL_data/' # input directory with labelled alerts + dirname_output = '../../../test_mlflow_2/' # root products output directory + append_name = '' # append to all metric, prob and queries names + + nloops = 3 # number of learning loops + strategy = 'UncSampling' # query strategy + initial_training = 10 # total number of objs in initial training + frac_Ia_tot = 0.5 # fraction of Ia in initial training + n_realizations = 1 # total number of realizations + n_realizations_ini = 0 # start from this realization number + new_raw_file = False # save raw data in one file + input_raw_file = fname_raw_output # name of raw data file + n = 15000 # number of random simbad objects per file + # 
to be used as part of the raw data + + mlflow_uri = "https://mlflow-dev.fink-broker.org" # address of mlflow server + mlflow_exp = 'finksnclass_ztf_evaluate' # root name for this experiment run + pre_code_path = 'feature_extraction_ztf.py' + pre_data_path = '../../data/test_alerts.parquet' + + drop_zeros = True # ignore objects with observations in only 1 filter + screen = True # print debug comments to screen + + ##### User choices: For Figure 7 ########################## + + initial_state_from_file = False # read initial state from a fixed file + initial_state_version = 68 # version from which initial state is chosen + + ################################################################ + ################################################################ + + features_names = ['a_g', 'b_g', 'c_g', 'snratio_g', 'mse_g', 'nrise_g', + 'a_r', 'b_r', 'c_r', 'snratio_r', 'mse_r', 'nrise_r'] + + for name in [dirname_output + '/', + dirname_output + '/data/', + dirname_output + '/' + strategy + '/', + dirname_output + '/' + strategy + '/class_prob/', + dirname_output + '/' + strategy + '/metrics/', + dirname_output + '/' + strategy + '/queries/', + dirname_output + '/' + strategy + '/training_samples/', + dirname_output + '/' + strategy + '/test_samples/']: + if not os.path.isdir(name): + os.makedirs(name) + + if create_matrix: + matrix_clean = build_matrix(fname_output=fname_features_matrix, dirname_input=dirname_input, dirname_output=dirname_output + 'data/', + fname_raw_output=fname_raw_output, new_raw_file=new_raw_file, + input_raw_file=input_raw_file,n=n, + n_files_simbad=n_files_simbad, drop_zeros=drop_zeros, screen=screen) + print(np.unique(matrix_clean['type'].values)) + + else: + matrix_clean = pd.read_csv(fname_features_matrix, comment='#') + + if initial_state_from_file: + fname_ini_train = dirname_output + '/UncSampling/training_samples/initialtrain_v' + str(initial_state_version) + '.csv' + fname_ini_test = dirname_output + 
'/UncSampling/test_samples/initial_test_v' + str(initial_state_version) + '.csv' + + output_metrics_file = dirname_output + '/' + strategy + '/metrics/metrics_' + strategy + '_v' + str(initial_state_version) + append_name + '.dat' + output_queried_file = dirname_output + '/' + strategy + '/queries/queried_' + strategy + '_v'+ str(initial_state_version) + append_name + '.dat' + output_prob_root = dirname_output + '/' + strategy + '/class_prob/v' + str(initial_state_version) + '/class_prob_' + strategy + append_name + + name = dirname_output + '/' + strategy + '/class_prob/v' + str(initial_state_version) + '/' + if not os.path.isdir(name): + os.makedirs(name) + data = read_initial_samples(fname_ini_train, fname_ini_test) + + # perform learnin loop + learn_loop(data, nloops=nloops, strategy=strategy, + output_metrics_file=output_metrics_file, + output_queried_file=output_queried_file, + classifier='RandomForest', seed=None, + batch=1, screen=True, output_prob_root=output_prob_root, + mlflow_uri=mlflow_uri, mlflow_exp=mlflow_exp) + + else: + for v in range(n_realizations_ini, n_realizations): + output_metrics_file = dirname_output + '/' + strategy + '/metrics/metrics_' + strategy + '_v' + str(v) + append_name + '.dat' + output_queried_file = dirname_output + '/' + strategy + '/queries/queried_' + strategy + '_v'+ str(v) + append_name + '.dat' + output_prob_root = dirname_output + '/' + strategy + '/class_prob/v' + str(v) + '/class_prob_' + strategy + append_name + + name = dirname_output + '/' + strategy + '/class_prob/v' + str(v) + '/' + if not os.path.isdir(name): + os.makedirs(name) + #build samples + data = build_samples(matrix_clean, initial_training=initial_training, screen=True) + + # save initial data + train = pd.DataFrame(data.train_features, columns=features_names) + train['objectId'] = data.train_metadata['id'].values + train['type'] = data.train_metadata['type'].values + train.to_csv(dirname_output + '/' + strategy + '/training_samples/initialtrain_v' + 
str(v) + '.csv', index=False) + + test = pd.DataFrame(data.test_features, columns=features_names) + test['objectId'] = data.test_metadata['id'].values + test['type'] = data.test_metadata['type'].values + test.to_csv(dirname_output + '/' + strategy + '/test_samples/initial_test_v' + str(v) + '.csv', index=False) + + # perform learnin loop + learn_loop(data, nloops=nloops, strategy=strategy, + output_metrics_file=output_metrics_file, + output_queried_file=output_queried_file, + classifier='RandomForest', seed=None, + batch=1, screen=True, output_prob_root=output_prob_root, mlflow_uri=mlflow_uri, + mlflow_exp=mlflow_exp, features_names=features_names, pre_code_path=pre_code_path, + pre_data_path=pre_data_path) + +if __name__ == '__main__': + main() + diff --git a/data/test_alerts.parquet b/data/test_alerts.parquet new file mode 100644 index 0000000..4623c38 Binary files /dev/null and b/data/test_alerts.parquet differ diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..1659841 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,37 @@ +[build-system] +requires = ["setuptools>=77", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "actsnfink" +version = "0.1" +description = "Active Learning for Early Supernova Classification in Fink" +readme = "README.md" +requires-python = ">=3.12" +authors = [{ name = "Emille E. O. 
Ishida" }] +license = "MIT" +license-files = ["LICENSE*"] +dependencies = [ +    "pandas>=1.3.5", +    "numpy>=1.21.6", +    "actsnclass>=1.3.1", +    "scipy>=1.7.3", +    "scikit-learn>=1.0.2", +    "fink_utils>=0.43.0", +    "mlflow>=3.4.0", +    "docutils", +    "shap>=0.48.0" +] + +[project.scripts] +extract-features-ztf = "actsnfink.scripts.feature_extraction_ztf:main" +run-loop = "actsnfink.scripts.run_loop:main" +run-loop-mlflow = "actsnfink.scripts.run_loop_mlflow:main" + +[tool.setuptools.packages.find] +where = ["."] +include = ["actsnfink*"] +exclude = ["build*"] + +[tool.setuptools.package-data] +actsnfink = ["scripts/*.py"] diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 21d3b2a..0000000 --- a/requirements.txt +++ /dev/null @@ -1,5 +0,0 @@ -pandas>=1.3.5 -numpy>=1.21.6 -actsnclass>=1.2 -scipy>=1.7.3 -scikit-learn>=1.0.2 diff --git a/setup.py b/setup.py deleted file mode 100644 index efe7ba7..0000000 --- a/setup.py +++ /dev/null @@ -1,33 +0,0 @@ -# Copyright 2022 -# Author: Emille Ishida -# -# Licensed under the MIT License; -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://opensource.org/licenses/mit-license.php -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import setuptools - -setuptools.setup( -    name='actsnfink', -    version='0.1', -    packages=setuptools.find_packages(), -    py_modules=['classifier_sigmoid', -                'early_sn_classifier', -                'rainbow', -                'sigmoid', -                'train_utils'], -    scripts=['actsnfink/scripts/run_loop.py'], -    url='https://github.com/emilleishida/fink_sn_activelearning', -    license='MIT', -    author='Emille E. 
O.Ishida', - author_email='emille.ishida@clermont.in2p3.fr', - description='Fink Early SN classifier using Active Learning' -)