Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 179 additions & 0 deletions sds_data_manager/lambda_code/SDSCode/pipeline_lambdas/abstractions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
"""Common functions for pipeline lambdas."""

from dataclasses import dataclass, asdict
from datetime import datetime
from enum import Enum
from typing import Optional, Dict, Any


@dataclass
class DependencyNode:
    """Shared between batch starter and dependency lambda.

    Identifies one node of the processing dependency graph by its
    (source, data_type, product_name) triple plus reprocessing context.
    """

    source: str
    data_type: str
    product_name: str
    # True when this node is part of a reprocessing run.
    reprocessing: bool = False
    # Repoint id, when the job is tied to a specific repointing; None otherwise.
    repoint: Optional[int] = None

    def serialize(self) -> Dict[str, Any]:
        """Return the node as a plain field-name -> value dict."""
        return asdict(self)

    @classmethod
    def deserialize(cls, json_object: Dict[str, Any]) -> "DependencyNode":
        """Rebuild a node from the dict produced by ``serialize``."""
        return cls(**json_object)


@dataclass
class UpstreamDependencyNode(DependencyNode):
    """DependencyNode plus the time window used for upstream lookups.

    The window fields must carry defaults: the parent dataclass already has
    defaulted fields (``reprocessing``, ``repoint``), and a subclass field
    without a default after them raises ``TypeError`` when the class is built.
    """

    start_date: Optional[datetime] = None
    end_date: Optional[datetime] = None


class TriggerEventType(str, Enum):
    """Enum for different trigger event types.

    The ``str`` mixin keeps members equal to their raw string values, so
    comparisons against plain strings (and JSON serialization) keep working.
    """

    SCIENCE_INGESTION = "science_ingestion"
    ANCILLARY_INGESTION = "ancillary_ingestion"
    SPICE_INGESTION = "spice_ingestion"
    CADENCE = "cadence"
    REPROCESSING = "reprocessing"


class ProcessingJobType(str, Enum):
    """Enum for different types of processing jobs passed to the batch job.

    The ``str`` mixin keeps members equal to their raw string values, so
    comparisons against plain strings (and JSON serialization) keep working.
    """

    DAILY = "daily"
    POINTING = "pointing"
    CADENCE = "cadence"
    POINTING_ATTITUDE = "pointing_attitude"


def calculate_date_range(
    event_source: TriggerEventType, downstream_node: DependencyNode
) -> list[tuple[datetime, datetime]]:
    """Calculate the date range(s) for the job based on the trigger event type.

    This function is triggered by different events:

    1. New science or ancillary file arrival from the indexer lambda::

        {"Records": [{"body": '{"detail": {"object": {"key":
            "imap_swe_l1b-in-flight-cal_20240101_v001.cdf"}}}'}]}

    2. / 4. Science (bulk) reprocessing::

        {"queryStringParameters": {
            "reprocessing": True,
            "start_date": <>, "end_date": <>,
            "instrument": None,        # optional
            "data_level": None,        # optional
            "data_descriptor": None,   # optional
        }}

    3. New SPICE file arrival from the spice indexer lambda::

        {"Source": "imap.lambda", "DetailType": "Processed File",
         "Detail": {"object": {
             "key": "imap/spice/spin/imap_2025_122_2025_122_02.spin.csv"}}}

       or, for repoint files::

        {"Source": "imap.lambda", "DetailType": "Processed File",
         "Detail": {"object": {
             "key": "imap/spice/repoint/imap_2025_122_02.repoint",
             "instrument": "spacecraft"}}}

    5. Cadence job::

        {"cadence": "1mo" | "3mo" | "6mo" | "1yr", "start_date": <>}

    6. Reprocessing cadence job::

        {"cadence": "1mo" | "3mo" | "6mo" | "1yr",
         "start_date": <>, "end_date": <>, "reprocessing": True}

    Returns
    -------
    list[tuple[datetime, datetime]]
        One or more (start_date, end_date) pairs; how many depends on the
        combination of event type and downstream processing job type.
    """
    # The range depends on the combination of the trigger event type and the
    # downstream node's processing job type, e.g.:
    # - ancillary event + daily job      -> one (start, end) per day
    # - reprocessing event + daily job   -> one (start, end) per day in range
    # - cadence event + cadence job      -> single (start, end) for the cadence
    # - reprocessing event + cadence job -> one or more (start, end) ranges
    # - science (HI DE) event + L1B goodtimes job -> 7 nearest repoint files
    # - science (ENA/GLOWS) event + pointing job  -> ranges derived from the
    #   repoint id of the input file
    # - reprocessing event + pointing job -> one range per pointing in range
    #
    # For each calculated range, IMAPJobHandler (batch_starter_refactor.py)
    # queries dependencies, determines the job version, creates the
    # dependency file, and submits the job.
    if event_source == TriggerEventType.SCIENCE_INGESTION:
        # ENA/GLOWS downstream products are pointing-based rather than daily.
        if downstream_node.data_type in ["ENA", "GLOWS"]:
            return calculate_repoint_date_range()
        return calculate_daily_date_range()
    if event_source == TriggerEventType.ANCILLARY_INGESTION:
        return calculate_ancillary_date_range()
    if event_source == TriggerEventType.SPICE_INGESTION:
        return calculate_spice_date_range()
    if event_source == TriggerEventType.CADENCE:
        return calculate_cadence_date_range()
    if event_source == TriggerEventType.REPROCESSING:
        return calculate_reprocessing_date_range(downstream_node.repoint)
    # Fail loudly: the original fell through to ``pass`` and silently
    # returned None for unknown trigger types.
    raise ValueError(f"Unknown trigger event type: {event_source!r}")

def calculate_daily_date_range():
    """Placeholder: one (start_date, end_date) pair per day. Not implemented."""
    pass
def calculate_repoint_date_range():
    """Placeholder: date ranges derived from repoint files. Not implemented."""
    pass
def calculate_ancillary_date_range():
    """Placeholder: date ranges for ancillary-triggered jobs. Not implemented."""
    pass
def calculate_cadence_date_range():
    """Placeholder: date range(s) for a cadence job. Not implemented."""
    pass
def calculate_spice_date_range():
    """Placeholder: date ranges for SPICE-triggered jobs. Not implemented."""
    # This includes deriving dates for all kernels, spin,
    # repoint, thruster, etc.
    pass
def calculate_past_n_days_date_range(n: int):
    """Placeholder: date ranges covering the past ``n`` days. Not implemented.

    NOTE(review): likely to be merged with calculate_future_n_days_date_range,
    since the two are expected to share most of their logic and some jobs need
    both past and future ranges.
    """
    pass
def calculate_future_n_days_date_range(n: int):
    """Placeholder: date ranges covering the next ``n`` days. Not implemented."""
    pass
def calculate_reprocessing_date_range(repointing: int):
    """Placeholder: date ranges for a reprocessing run. Not implemented.

    Parameters
    ----------
    repointing : int
        Repoint id from the downstream node (defaults to None on the node —
        TODO confirm whether None must be handled here).
    """
    pass

Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
from sds_data_manager.lambda_code.SDSCode.pipeline_lambdas.dependency_refactor import (
DependencyResolver,
)
from sds_data_manager.lambda_code.SDSCode.pipeline_lambdas.abstractions import (
DependencyNode,
UpstreamDependencyNode,
)


class IMAPJobHandler:
    """Handle dependency resolution and job kickoff for one dependency node."""

    def __init__(self, dependency_node: UpstreamDependencyNode):
        """Resolve dependencies and, if resolution succeeded, submit the job."""
        self.dependency_node = dependency_node
        self.dependencies = self.get_dependencies(dependency_node)
        # S3 path of the written dependency file; set by
        # _create_dependencies_file once the file exists.
        self.dependency_s3_path = None
        if self.dependencies is not None:
            job_success = self.submit_job()
            if job_success:
                self.clean_up()

    def get_dependencies(self, dependency_node: UpstreamDependencyNode):
        """Get the dependencies for the job using the DependencyResolver.

        Returns the resolver's ``data`` payload when the lookup reports
        status 200, otherwise ``None``.
        """
        response = DependencyResolver().upstream_discovery(dependency_node)
        if response["status"] == 200:
            return response["data"]
        return None

    def _determine_job_version(self):
        """Determine the job version; the same logic for all trigger types.

        Types of jobs kicked off are science or spacecraft products.
        The trigger event types only affect how the date range and
        dependencies are determined; once those are known, the remaining
        steps are the same.
        """
        current_job_to_kickoff = self.dependency_node
        # Determine the version of the job to run based on
        # self.dependency_node and information available in the database.
        # Keep what we have in determine_job_version, refactored slightly,
        # but with the same logic.
        pass

    def _create_dependencies_file(self):
        """Calculate the CRID and create the dependencies file to submit.

        Shared across all job types.
        TODO(review): factor CRID handling into its own component — the CRID
        is distinct from the dependency hash and will change independently.
        """
        # Remove information not needed for the CLI from self.dependency_node.
        cli_input = self.dependency_node
        # NOTE(review): self.dependencies is the resolver's "data" payload;
        # confirm it exposes serialize() (a serialized ProcessingInputCollection
        # was proposed in review).
        dependency_file_content = self.dependencies.serialize()
        # Calculate the CRID for this job from the serialized output, write it
        # to the dependency file, then update self.dependency_s3_path with the
        # S3 path of that file.
        pass

    def submit_job(self):
        """Submit the job with input parameters and dependency information.

        Shared across all job types.
        """
        self._determine_job_version()
        self._create_dependencies_file()
        # TODO(review): add a CRID check step (e.g. self._check_crid) here.
        # Finally, submit the batch job with a CLI input of
        # self.dependency_s3_path.
        pass

    def clean_up(self):
        """Clean up resources or temporary files used during the job.

        E.g. currently the SQS queue is cleaned up after a successful
        submission; safely closing database connections could live here too.
        """
        pass


# Batch Starter Lambda
def lambda_handler(event, context):
    """Batch starter entry point (skeleton — not yet implemented).

    Planned flow:
    - Determine the trigger event type.
    - Get (source, data_type, product_name) to query for downstream nodes.
    - Get downstream nodes.
    - For each downstream node, calculate the date range list based on the
      combination of trigger event type and the node's processing job type:
        - ancillary event + daily job      -> one (start, end) per day
        - reprocessing event + daily job   -> one (start, end) per day in range
        - cadence event + cadence job      -> single (start, end) for the
          cadence range
          NOTE(review): may instead be a list of ranges unless instrument and
          descriptor are added to the cadence trigger — confirm.
        - reprocessing event + cadence job -> one or more (start, end) ranges
        - science (HI DE) event + L1B goodtimes job -> 7 nearest repoint files
        - science (ENA/GLOWS) event + pointing job  -> ranges derived from the
          repoint id of the input file
        - reprocessing event + pointing job -> one range per pointing in range
        - etc.
    - For each calculated date range, let IMAPJobHandler
      (batch_starter_refactor.py):
        - Query dependencies
        - Determine job version (and check the CRID — see review)
        - Create dependency file
        - Submit job
    """
    pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
"""Dependency lambda details

Inputs:
Source
Data_type
Product_name
Start_time: yyyymmddhhmmss
End_time: yyyymmddhhmmss

Responsibilities:
Lookup upstream dependencies
Lookup downstream dependencies
Find all relevant files for upstream dependencies
Determining if it's a complete list.
Scenarios causing incompleteness:
1. Missing files in the database.
2. Due to an anomaly event, e.g. LOI, TCM, or solar wind.
3. Due to repoint data delay or downlink delay.
4. If required dependencies missing or job IN PROGRESS.

Functionality:
Look for all dependencies for given inputs and return all the available
dependencies irrespective of if we have all the dependencies or not.
Let user or caller code decide about it.

Return type: dict
Containing all available data for all identified upstream dependents within input date range. Eg.
{
status: 200,
message: "success or missing but we still return all dependencies we found",
data: {
(Upstream details node): {
missing_files: [(source, data_type, descriptor)],
found_files: [list of files],
}
...
}

"""

from sds_data_manager.lambda_code.SDSCode.database import db
from sds_data_manager.lambda_code.SDSCode.pipeline_lambdas.abstractions import (
DependencyConfig,
DependencyNode,
UpstreamDependencyNode,
)

class DependencyResolver:
    """Look up upstream and downstream dependencies for a dependency node."""

    dependency_config = DependencyConfig()

    def downstream_discovery(self, downstream_inputs: DependencyNode):
        """Return the serialized downstream node for the given inputs.

        NOTE(review): get_downstream_dependencies was preferred as a name;
        consider renaming (callers must be updated together).
        """
        # Bug fix: serialize the node we were given. ``serialize`` is an
        # instance method, so the original ``DependencyNode.serialize()``
        # (called on the class) would raise TypeError.
        return downstream_inputs.serialize()

    def upstream_discovery(self, upstream_inputs: UpstreamDependencyNode):
        """Return all available upstream dependencies for the given inputs.

        All available dependencies are returned whether or not the set is
        complete; the caller decides what to do with an incomplete set.
        """
        # Look up upstream dependencies based on input parameters and
        # through db queries.
        return {
            "status": 200,  # if found, otherwise another status code
            "message": "",  # if found, otherwise which upstreams are missing
            # NOTE(review): should become a serialized
            # ProcessingInputCollection so users can fetch dependencies for
            # local processing.
            "data": ["list of files"],  # or an empty/partial list of files
        }

    def get_science_files(self):
        """Placeholder: query science files. Not implemented."""
        pass

    def get_ancillary_files(self):
        """Placeholder: query ancillary files. Not implemented."""
        pass

    def get_spin_files(self):
        """Placeholder: query spin files. Not implemented."""
        pass

    def get_spice_files(self):
        """Placeholder: query SPICE kernel files. Not implemented."""
        pass

    def get_repoint_files(self):
        """Placeholder: query repoint files. Not implemented."""
        pass


def handler(dependency_node: DependencyNode):
    """Dependency lambda entry point: resolve and return upstream dependencies."""
    resolver = DependencyResolver()
    return resolver.upstream_discovery(dependency_node)