Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to run displacement workflow with CSLCs on S3 #297

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/dolphin/io/_paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,13 @@ def __init__(self, s3_url: Union[str, "S3Path"], unsigned: bool = False):
self.path: Path = s3_url.path
self._trailing_slash: str = s3_url._trailing_slash
else:
parsed: ParseResult = urlparse(s3_url)
s = str(s3_url).strip()
parsed: ParseResult = urlparse(str(s))
self._scheme = parsed.scheme
self._netloc = self.bucket = parsed.netloc
self._parsed = parsed
self.path = Path(parsed.path)
self._trailing_slash = "/" if s3_url.endswith("/") else ""
self._trailing_slash = "/" if s.endswith("/") else ""

if self._scheme != "s3":
raise ValueError(f"{s3_url} is not an S3 url")
Expand Down Expand Up @@ -119,7 +120,7 @@ def parent(self):
def suffix(self):
return self.path.suffix

def resolve(self) -> S3Path:
def resolve(self, strict: bool = False) -> S3Path:
"""Resolve the path to an absolute path- S3 paths are always absolute."""
return self

Expand Down
10 changes: 8 additions & 2 deletions src/dolphin/workflows/config/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from dolphin import __version__ as _dolphin_version
from dolphin._log import get_log
from dolphin._types import Bbox
from dolphin.io import DEFAULT_HDF5_OPTIONS, DEFAULT_TIFF_OPTIONS
from dolphin.io import DEFAULT_HDF5_OPTIONS, DEFAULT_TIFF_OPTIONS, S3Path

from ._enums import ShpMethod, UnwrapMethod
from ._yaml_model import YamlModel
Expand Down Expand Up @@ -518,4 +518,10 @@ def _read_file_list_or_glob(cls, value): # noqa: ARG001:
msg = f"Input file list {v_path} does not exist or is not a file."
raise ValueError(msg)

return list(value)
out = []
for v in list(value):
try:
out.append(S3Path(v))
except ValueError:
out.append(Path(v))
return out
35 changes: 28 additions & 7 deletions src/dolphin/workflows/config/_displacement.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@
BaseModel,
ConfigDict,
Field,
PlainSerializer,
StringConstraints,
WithJsonSchema,
field_validator,
model_validator,
)

from dolphin._log import get_log
from dolphin._types import TropoModel, TropoType
from dolphin._types import GeneralPath, TropoModel, TropoType
from dolphin.io import S3Path

from ._common import (
InputOptions,
Expand Down Expand Up @@ -98,18 +101,30 @@ def _to_empty_list(cls, v):
return v if v is not None else []


CslcFileList = Annotated[
    # Element type: a list whose entries may be any Path-like object.
    list[GeneralPath],
    # When the model is serialized, every entry is written out as a string.
    PlainSerializer(lambda paths: list(map(str, paths))),
    # The JSON Schema for this custom protocol is declared by hand for Pydantic:
    # https://docs.pydantic.dev/latest/concepts/json_schema/#withjsonschema-annotation
    WithJsonSchema({"type": "array", "items": {"type": "string", "format": "uri"}}),
]


class DisplacementWorkflow(WorkflowBase):
"""Configuration for the workflow."""

# Paths to input/output files
input_options: InputOptions = Field(default_factory=InputOptions)
cslc_file_list: list[Path] = Field(
cslc_file_list: CslcFileList = Field(
default_factory=list,
description=(
"list of CSLC files, or newline-delimited file "
"containing list of CSLC files."
),
)

output_options: OutputOptions = Field(default_factory=OutputOptions)

# Options for each step in the workflow
Expand Down Expand Up @@ -140,7 +155,9 @@ class DisplacementWorkflow(WorkflowBase):
# internal helpers
# Stores the list of directories to be created by the workflow
model_config = ConfigDict(
extra="allow", json_schema_extra={"required": ["cslc_file_list"]}
extra="allow",
json_schema_extra={"required": ["cslc_file_list"]},
arbitrary_types_allowed=True,
)

# validators
Expand All @@ -159,7 +176,7 @@ def _check_input_files_exist(self) -> DisplacementWorkflow:
input_options = self.input_options
date_fmt = input_options.cslc_date_fmt
# Filter out files that don't have dates in the filename
files_matching_date = [Path(f) for f in file_list if get_dates(f, fmt=date_fmt)]
files_matching_date = [f for f in file_list if get_dates(f, fmt=date_fmt)]
if len(files_matching_date) < len(file_list):
msg = (
f"Found {len(files_matching_date)} files with dates like {date_fmt} in"
Expand All @@ -176,9 +193,13 @@ def _check_input_files_exist(self) -> DisplacementWorkflow:
raise ValueError(msg)

# Coerce the file_list to a date-sorted list of S3Path (for s3:// URLs) or Path objects
self.cslc_file_list = [
Path(f) for f in sort_files_by_date(file_list, file_date_fmt=date_fmt)[0]
]
out: list[S3Path | Path] = []
for f in sort_files_by_date(file_list, file_date_fmt=date_fmt)[0]:
try:
out.append(S3Path(f))
except ValueError:
out.append(Path(f))
self.cslc_file_list = out

return self

Expand Down