diff --git a/sds_data_manager/lambda_code/SDSCode/pipeline_lambdas/abstractions.py b/sds_data_manager/lambda_code/SDSCode/pipeline_lambdas/abstractions.py
new file mode 100644
index 000000000..29a034934
--- /dev/null
+++ b/sds_data_manager/lambda_code/SDSCode/pipeline_lambdas/abstractions.py
@@ -0,0 +1,179 @@
+"""Common functions for pipeline lambdas."""
+
+from dataclasses import asdict, dataclass
+from datetime import datetime
+from typing import Any, Optional
+
+
+@dataclass
+class DependencyNode:
+    """Node shared between the batch starter and dependency lambdas."""
+
+    source: str
+    data_type: str
+    product_name: str
+    reprocessing: bool = False
+    repoint: Optional[int] = None
+
+    def serialize(self) -> dict[str, Any]:
+        """Return the node as a JSON-serializable dictionary."""
+        return asdict(self)
+
+    @classmethod
+    def deserialize(cls, json_object: dict[str, Any]) -> "DependencyNode":
+        """Rebuild a node from a dictionary produced by serialize()."""
+        return cls(**json_object)
+
+
+@dataclass
+class UpstreamDependencyNode(DependencyNode):
+    """Dependency node plus the date range to search for upstream inputs."""
+
+    # Defaults are required because the parent dataclass already defines
+    # fields with default values.
+    start_date: Optional[datetime] = None
+    end_date: Optional[datetime] = None
+
+
+class TriggerEventType:
+    """Enum-like constants for the different trigger event types."""
+
+    SCIENCE_INGESTION = "science_ingestion"
+    ANCILLARY_INGESTION = "ancillary_ingestion"
+    SPICE_INGESTION = "spice_ingestion"
+    CADENCE = "cadence"
+    REPROCESSING = "reprocessing"
+
+
+class ProcessingJobType:
+    """Enum-like constants for the types of processing jobs passed to batch."""
+
+    DAILY = "daily"
+    POINTING = "pointing"
+    CADENCE = "cadence"
+    POINTING_ATTITUDE = "pointing_attitude"
+
+
+def calculate_date_range(
+    event_source: TriggerEventType, downstream_node: DependencyNode
+) -> list[tuple[datetime, datetime]]:
+    """Calculate the date range(s) for the job based on the trigger event type.
+
+    This function is triggered by different events.
+
+    1. A new science or ancillary file arrival from the indexer lambda.
+       Example event:
+       {
+           "Records": [
+               {
+                   "body": '{"detail": '
+                           '{"object": {"key": '
+                           '"imap_swe_l1b-in-flight-cal_20240101_v001.cdf"}}'
+                           '}'
+               }
+           ]
+       }
+    2. A new science reprocessing request.
+       {
+           "queryStringParameters": {
+               "reprocessing": True,
+               "start_date": <>,
+               "end_date": <>,
+               "instrument": None,  # optional
+               "data_level": None,  # optional
+               "data_descriptor": None,  # optional
+           }
+       }
+    3. A new SPICE file arrival from the SPICE indexer lambda.
+       {
+           "Source": "imap.lambda",
+           "DetailType": "Processed File",
+           "Detail": {
+               "object": {
+                   "key": "imap/spice/spin/imap_2025_122_2025_122_02.spin.csv",
+               }
+           }
+       }
+       or
+       {
+           "Source": "imap.lambda",
+           "DetailType": "Processed File",
+           "Detail": {
+               "object": {
+                   "key": "imap/spice/repoint/imap_2025_122_02.repoint",
+                   "instrument": "spacecraft",
+               }
+           }
+       }
+    4. A bulk reprocessing of science data.
+       Example event:
+       {
+           "queryStringParameters": {
+               "reprocessing": True,
+               "start_date": <>,
+               "end_date": <>,
+               "instrument": None,  # optional
+               "data_level": None,  # optional
+               "data_descriptor": None,  # optional
+           }
+       }
+    5. A cadence job.
+       Example event:
+       {
+           "cadence": "1mo", "3mo", "1yr", or "6mo",
+           "start_date": <>,
+       }
+    6. A reprocessing cadence job.
+       Example event:
+       {
+           "cadence": "1mo", "3mo", "1yr", or "6mo",
+           "start_date": <>,
+           "end_date": <>,
+           "reprocessing": True,
+       }
+
+    The result can contain multiple (start, end) date pairs depending on the
+    event type.
+    """
+    # Determine the event source type and extract (source, data_type, product_name).
+    # For the current downstream node, calculate the date range based on:
+    #   - the event_source type (science, ancillary, spice, cadence, reprocessing)
+    #   - the downstream node's processing job type (daily, pointing, cadence,
+    #     pointing_attitude, etc.)
+    #
+    # Examples:
+    #   - ancillary event + daily downstream job → list of (start_date, end_date)
+    #     for each day
+    #   - reprocessing event + daily downstream job → list of (start_date, end_date)
+    #     for each day in the range
+    #   - cadence event + cadence downstream job → single (start_date, end_date)
+    #     for the cadence range
+    #   - reprocessing event + cadence downstream job → one or more
+    #     (start_date, end_date) ranges
+    #   - science (HI DE) event + L1B goodtimes downstream job → list for the
+    #     7 nearest repoint files
+    #   - science (ENA/GLOWS) event + pointing downstream job → list of date
+    #     ranges derived from the repoint id of the input file
+    #   - reprocessing event + pointing downstream job → list of date ranges
+    #     derived for each pointing in the date range
+    #
+    # For each calculated date range, IMAPJobHandler in batch_starter_refactor.py
+    # performs these steps:
+    #   - Query dependencies
+    #   - Determine the job version
+    #   - Create the dependency file
+    #   - Submit the job
+    if event_source == TriggerEventType.SCIENCE_INGESTION:
+        if downstream_node.data_type in ["ENA", "GLOWS"]:
+            return calculate_repoint_date_range()
+        else:
+            return calculate_daily_date_range()
+    elif event_source == TriggerEventType.ANCILLARY_INGESTION:
+        return calculate_ancillary_date_range()
+    elif event_source == TriggerEventType.SPICE_INGESTION:
+        return calculate_spice_date_range()
+    elif event_source == TriggerEventType.CADENCE:
+        return calculate_cadence_date_range()
+    elif event_source == TriggerEventType.REPROCESSING:
+        return calculate_reprocessing_date_range(downstream_node.repoint)
+
+
+def calculate_daily_date_range():
+    pass
+
+
+def calculate_repoint_date_range():
+    pass
+
+
+def calculate_ancillary_date_range():
+    pass
+
+
+def calculate_cadence_date_range():
+    pass
+
+
+def calculate_spice_date_range():
+    # This includes deriving dates for all kernels, spin, repoint,
+    # thruster, etc.
+    pass
+
+
+def calculate_past_n_days_date_range(n: int):
+    pass
+
+
+def calculate_future_n_days_date_range(n: int):
+    pass
+
+
+def calculate_reprocessing_date_range(repointing: int):
+    pass
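+
+
+# NOTE: dependency_refactor.py imports ``DependencyConfig`` from this module,
+# but it is not defined anywhere in this sketch yet. The placeholder below is
+# an assumption added only so that the import resolves; its real contents (the
+# upstream/downstream dependency graph configuration) are still to be designed.
+class DependencyConfig:
+    """Placeholder for the dependency graph configuration."""
+
+
+# Illustrative sketch only, not part of the skeleton above: one way a daily
+# helper could enumerate per-day (start_date, end_date) pairs, assuming naive
+# datetimes and day boundaries at midnight. The name
+# ``_example_daily_date_range`` is hypothetical and is not referenced by
+# calculate_date_range.
+def _example_daily_date_range(
+    start_date: datetime, end_date: datetime
+) -> list[tuple[datetime, datetime]]:
+    from datetime import timedelta
+
+    ranges = []
+    day = start_date.replace(hour=0, minute=0, second=0, microsecond=0)
+    while day <= end_date:
+        # One (start, end) pair per day in the requested range.
+        ranges.append((day, day + timedelta(days=1)))
+        day += timedelta(days=1)
+    return ranges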
diff --git a/sds_data_manager/lambda_code/SDSCode/pipeline_lambdas/batch_starter_refactor.py b/sds_data_manager/lambda_code/SDSCode/pipeline_lambdas/batch_starter_refactor.py
new file mode 100644
index 000000000..ce6d4cac1
--- /dev/null
+++ b/sds_data_manager/lambda_code/SDSCode/pipeline_lambdas/batch_starter_refactor.py
@@ -0,0 +1,96 @@
+"""Batch starter lambda refactor sketch."""
+
+from sds_data_manager.lambda_code.SDSCode.pipeline_lambdas.abstractions import (
+    DependencyNode,
+    UpstreamDependencyNode,
+)
+from sds_data_manager.lambda_code.SDSCode.pipeline_lambdas.dependency_refactor import (
+    DependencyResolver,
+)
+
+
+class IMAPJobHandler:
+    """Base class for managing the dependency lookup and job kickoff."""
+
+    def __init__(self, dependency_node: UpstreamDependencyNode):
+        self.dependency_node = dependency_node
+        self.dependencies = self.get_dependencies(dependency_node)
+        self.dependency_s3_path = None
+        if self.dependencies is not None:
+            job_success = self.submit_job()
+            if job_success:
+                self.clean_up()
+
+    def get_dependencies(self, dependency_node: UpstreamDependencyNode):
+        """Get the dependencies for the job using the DependencyResolver."""
+        response = DependencyResolver().upstream_discovery(dependency_node)
+        if response["status"] == 200:
+            return response["data"]
+        else:
+            return None
+
+    def _determine_job_version(self):
+        """Determine the job version; this is the same for all trigger event types.
+
+        The jobs kicked off produce science or spacecraft products. The trigger
+        event types only affect how we determine the date range and the
+        dependencies; once we have those, the remaining steps are the same.
+        """
+        current_job_to_kickoff = self.dependency_node
+        # Determine the version of the job to run based on self.dependency_node
+        # and the information available in the database.
+        # Keep what we have in determine_job_version, refactored slightly but
+        # with the same logic.
+        pass
+
+    def _create_dependencies_file(self):
+        """Calculate the CRID and create the dependencies file for the job.
+
+        This is done the same way for all job types.
+        """
+        # Remove information not needed for the CLI from self.dependency_node.
+        cli_input = self.dependency_node
+        dependency_file_content = self.dependencies.serialize()
+        # Calculate the CRID for this job from the serialized output and write
+        # it to the dependency file.
+        # Then update self.dependency_s3_path with the S3 path of the
+        # dependency file.
+        pass
+
+    def submit_job(self):
+        """Submit the job with input parameters and dependency information.
+
+        This is done the same way for all job types.
+        """
+        self._determine_job_version()
+        self._create_dependencies_file()
+        # Finally, submit the batch job with self.dependency_s3_path as the
+        # CLI input.
+        pass
+
+    def clean_up(self):
+        # Clean up any resources or temporary files used during the job.
+        # E.g., right now we clean up the SQS queue if the job is submitted
+        # successfully.
+        pass
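+
+
+# Illustrative sketch only, not a method of the class above: one possible shape
+# for the _create_dependencies_file step. It serializes the resolved
+# dependencies to JSON, uploads them to S3, and returns the path that would be
+# stored in dependency_s3_path. The bucket name, key layout, and the omission
+# of the CRID calculation are assumptions made for the example.
+def _example_write_dependency_file(dependencies: dict, bucket: str, key: str) -> str:
+    import json
+
+    import boto3
+
+    s3_client = boto3.client("s3")
+    s3_client.put_object(Bucket=bucket, Key=key, Body=json.dumps(dependencies))
+    return f"s3://{bucket}/{key}"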
+
+
+# Batch Starter Lambda
+def lambda_handler(event, context):
+    # Determine the trigger event type.
+    # Get (source, data_type, product_name) to query for downstream nodes.
+    # Get the downstream nodes.
+    # For each downstream node:
+    #     Calculate the list of date ranges based on the combination of the
+    #     trigger event type and the downstream node's processing job type, e.g.:
+    #       - ancillary event + daily downstream job → list of
+    #         (start_date, end_date) for each day
+    #       - reprocessing event + daily downstream job → list of
+    #         (start_date, end_date) for each day in the range
+    #       - cadence event + cadence downstream job → single
+    #         (start_date, end_date) for the cadence range
+    #       - reprocessing event + cadence downstream job → one or more
+    #         (start_date, end_date) ranges
+    #       - science (HI DE) event + L1B goodtimes downstream job → list for
+    #         the 7 nearest repoint files
+    #       - science (ENA/GLOWS) event + pointing downstream job → list of
+    #         date ranges derived from the repoint id of the input file
+    #       - reprocessing event + pointing downstream job → list of date
+    #         ranges derived for each pointing in the date range
+    #       - etc.
+    #
+    # For each calculated date range, IMAPJobHandler performs these steps:
+    #   - Query dependencies
+    #   - Determine the job version
+    #   - Create the dependency file
+    #   - Submit the job
+    pass
diff --git a/sds_data_manager/lambda_code/SDSCode/pipeline_lambdas/dependency_refactor.py b/sds_data_manager/lambda_code/SDSCode/pipeline_lambdas/dependency_refactor.py
new file mode 100644
index 000000000..4c3f8192b
--- /dev/null
+++ b/sds_data_manager/lambda_code/SDSCode/pipeline_lambdas/dependency_refactor.py
@@ -0,0 +1,82 @@
+"""Dependency lambda details.
+
+Inputs:
+    Source
+    Data_type
+    Product_name
+    Start_time: yyyymmddhhmmss
+    End_time: yyyymmddhhmmss
+
+Responsibilities:
+    Look up upstream dependencies.
+    Look up downstream dependencies.
+    Find all relevant files for the upstream dependencies.
+    Determine whether the list is complete. Scenarios causing incompleteness:
+        1. Missing files in the database.
+        2. An anomaly event, e.g. LOI, TCM, or solar wind.
+        3. A repoint data delay or a downlink delay.
+        4. Required dependencies are missing or a job is IN PROGRESS.
+
+Functionality:
+Look for all dependencies for the given inputs and return all the available
+dependencies, whether or not the full set was found. Let the user or caller
+code decide what to do with a partial list.
+
+Return type: dict containing all available data for all identified upstream
+dependents within the input date range, e.g.:
+{
+    "status": 200,
+    "message": "success, or missing, but we still return all dependencies we found",
+    "data": {
+        (upstream details node): {
+            "missing_files": [(source, data_type, descriptor)],
+            "found_files": [list of files],
+        },
+        ...
+    },
+}
+"""
+
+from sds_data_manager.lambda_code.SDSCode.database import db
+from sds_data_manager.lambda_code.SDSCode.pipeline_lambdas.abstractions import (
+    DependencyConfig,
+    DependencyNode,
+    UpstreamDependencyNode,
+)
+
+
+class DependencyResolver:
+    """Resolve upstream and downstream dependencies for a dependency node."""
+
+    dependency_config = DependencyConfig()
+
+    def downstream_discovery(self, downstream_inputs: DependencyNode):
+        # Look up the downstream nodes for the given inputs; placeholder
+        # return for now.
+        return downstream_inputs.serialize()
+
+    def upstream_discovery(self, upstream_inputs: UpstreamDependencyNode):
+        # Look up upstream dependencies based on the input parameters and
+        # through db queries.
+        return {
+            "status": 200,  # if found, otherwise another status code
+            "message": "",  # if found, otherwise which upstream files are missing
+            "data": ["list of files"],  # or an empty or partial list of files
+        }
+
+    def get_science_files(self):
+        pass
+
+    def get_ancillary_files(self):
+        pass
+
+    def get_spin_files(self):
+        pass
+
+    def get_spice_files(self):
+        pass
+
+    def get_repoint_files(self):
+        pass
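+
+
+# Illustrative sketch only: one way upstream_discovery could report partial
+# results in the return shape documented in the module docstring. The helper
+# name and the use of descriptor strings as keys are assumptions.
+def _example_partition_files(
+    expected_descriptors: list[str], found_files: dict[str, str]
+) -> dict:
+    # found_files maps an expected descriptor to the file found for it, if any.
+    return {
+        "found_files": [
+            found_files[d] for d in expected_descriptors if d in found_files
+        ],
+        "missing_files": [d for d in expected_descriptors if d not in found_files],
+    }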
+
+
+def handler(dependency_node: UpstreamDependencyNode):
+    """Resolve and return the upstream dependencies for the given node."""
+    resolver = DependencyResolver()
+    return resolver.upstream_discovery(dependency_node)
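+
+
+# Illustrative usage only: how a caller might invoke the resolver and decide
+# what to do with a partial dependency list, as described in the module
+# docstring. The field values below are made up for the example.
+if __name__ == "__main__":
+    from datetime import datetime
+
+    node = UpstreamDependencyNode(
+        source="swe",
+        data_type="l1b",
+        product_name="in-flight-cal",
+        start_date=datetime(2024, 1, 1),
+        end_date=datetime(2024, 1, 2),
+    )
+    response = handler(node)
+    if response["status"] != 200:
+        # The caller, not the resolver, decides whether a partial dependency
+        # list is acceptable.
+        print(response["message"])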