Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start adding S3 support for VRTStack #245

Merged
merged 12 commits into from
May 29, 2024
43 changes: 36 additions & 7 deletions src/dolphin/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@
import sys
from enum import Enum
from os import PathLike
from typing import TYPE_CHECKING, NamedTuple, TypeVar, Union
from typing import (
TYPE_CHECKING,
NamedTuple,
Protocol,
TypeVar,
Union,
runtime_checkable,
)

if sys.version_info >= (3, 10):
from typing import ParamSpec
Expand All @@ -22,12 +29,6 @@
PathLikeStr = PathLike


PathOrStr = Union[str, PathLikeStr]
Filename = PathOrStr # May add a deprecation notice for `Filename`
# TypeVar added for generic functions which should return the same type as the input
PathLikeT = TypeVar("PathLikeT", str, PathLikeStr)


class Bbox(NamedTuple):
"""Bounding box named tuple, defining extent in cartesian coordinates.

Expand Down Expand Up @@ -104,3 +105,31 @@ class TropoType(str, Enum):
"""Hydrostatic (same as dry, named differently in raider)"""
COMB = "comb"
"""Combined wet + dry delay."""


@runtime_checkable
class GeneralPath(Protocol):
"""A protocol to handle paths that can be either local or S3 paths."""

def parent(self): ...

def suffix(self): ...

def resolve(self): ...

def exists(self): ...

def read_text(self): ...

def __truediv__(self, other): ...

def __str__(self) -> str: ...

def __fspath__(self) -> str:
return str(self)


PathOrStr = Union[str, PathLikeStr, GeneralPath]
Filename = PathOrStr # May add a deprecation notice for `Filename`
# TypeVar added for generic functions which should return the same type as the input
PathLikeT = TypeVar("PathLikeT", str, PathLikeStr, GeneralPath)
1 change: 1 addition & 0 deletions src/dolphin/io/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from ._background import *
from ._blocks import *
from ._core import *
from ._paths import *
from ._process import *
from ._readers import *
from ._writers import *
197 changes: 197 additions & 0 deletions src/dolphin/io/_paths.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
from __future__ import annotations

import copy
import logging
import re
from pathlib import Path
from typing import Union
from urllib.parse import ParseResult, urlparse

from dolphin._types import GeneralPath

__all__ = ["S3Path"]


logger = logging.getLogger(__name__)


class S3Path(GeneralPath):
"""A convenience class to handle paths on S3.

This class relies on `pathlib.Path` for operations using `urllib` to parse the url.

If passing a url with a trailing slash, the slash will be preserved
when converting back to string.

Note that pure path manipulation functions do *not* require `boto3`,
but functions which interact with S3 (e.g. `exists()`, `.read_text()`) do.

Attributes
----------
bucket : str
Name of bucket in the url
path : pathlib.Path
The URL path after s3://<bucket>/
key : str
Alias of `path` converted to a string

Examples
--------
>>> from dolphin.io import S3Path
>>> s3_path = S3Path("s3://bucket/path/to/file.txt")
>>> str(s3_path)
's3://bucket/path/to/file.txt'
>>> s3_path.parent
S3Path("s3://bucket/path/to/")
>>> str(s3_path.parent)
's3://bucket/path/to/'

"""

def __init__(self, s3_url: Union[str, "S3Path"], unsigned: bool = False):
"""Create an S3Path.

Parameters
----------
s3_url : str or S3Path
The S3 url to parse.
unsigned : bool, optional
If True, disable signing requests to S3.

"""
# Names come from the urllib.parse.ParseResult
if isinstance(s3_url, S3Path):
self._scheme: str = s3_url._scheme
self._netloc: str = s3_url._netloc
self.bucket: str = s3_url.bucket
self.path: Path = s3_url.path
self._trailing_slash: str = s3_url._trailing_slash
else:
parsed: ParseResult = urlparse(s3_url)
self._scheme = parsed.scheme
self._netloc = self.bucket = parsed.netloc
self._parsed = parsed
self.path = Path(parsed.path)
self._trailing_slash = "/" if s3_url.endswith("/") else ""

if self._scheme != "s3":
raise ValueError(f"{s3_url} is not an S3 url")

self._unsigned = unsigned

@classmethod
def from_bucket_key(cls, bucket: str, key: str):
"""Create a `S3Path` from the bucket name and key/prefix.

Matches API of some Boto3 functions which use this format.

Parameters
----------
bucket : str
Name of S3 bucket.
key : str
S3 url of path after the bucket.

"""
return cls(f"s3://{bucket}/{key}")

def get_path(self):
# For S3 paths, we need to add the double slash and netloc back to the front
return f"{self._scheme}://{self._netloc}{self.path.as_posix()}{self._trailing_slash}"

@property
def key(self) -> str:
"""Name of key/prefix within the bucket with leading slash removed."""
return f"{str(self.path.as_posix()).lstrip('/')}{self._trailing_slash}"

@property
def parent(self):
parent_path = self.path.parent
# Since this is a parent, it will will always end in a slash
if self._scheme == "s3":
# For S3 paths, we need to add the scheme and netloc back to the front
return S3Path(f"{self._scheme}://{self._netloc}{parent_path.as_posix()}/")
else:
# For local paths, we can just convert the path to a string
return S3Path(str(parent_path) + "/")

@property
def suffix(self):
return self.path.suffix

def resolve(self) -> S3Path:
"""Resolve the path to an absolute path- S3 paths are always absolute."""
return self

def _get_client(self):
import boto3
from botocore import UNSIGNED
from botocore.config import Config

if self._unsigned:
return boto3.client("s3", config=Config(signature_version=UNSIGNED))
else:
return boto3.client("s3")

def exists(self) -> bool:
"""Whether this path exists on S3."""
client = self._get_client()
resp = client.list_objects_v2(
Bucket=self.bucket,
Prefix=self.key,
MaxKeys=1,
)
return resp.get("KeyCount") == 1

def read_text(self) -> str:
"""Download/read the S3 file as text."""
return self._download_as_bytes().decode()

def read_bytes(self) -> bytes:
"""Download/read the S3 file as bytes."""
return self._download_as_bytes()

def _download_as_bytes(self) -> bytes:
"""Download file to a `BytesIO` buffer to read as bytes."""
from io import BytesIO

client = self._get_client()

bio = BytesIO()
client.download_fileobj(self.bucket, self.key, bio)
bio.seek(0)
out = bio.read()
bio.close()
return out

def __truediv__(self, other):
new = copy.deepcopy(self)
new.path = self.path / other
new._trailing_slash = "/" if str(other).endswith("/") else ""
return new

def __eq__(self, other):
if isinstance(other, S3Path):
return self.get_path() == other.get_path()
elif isinstance(other, str):
return self.get_path() == other
else:
return False

def __repr__(self):
return f'S3Path("{self.get_path()}")'

def __str__(self):
return self.get_path()

def to_gdal(self):
"""Convert this S3Path to a GDAL URL."""
return f"/vsis3/{self.bucket}/{self.key}"


def fix_s3_url(url):
"""Fix an S3 URL that has been altered by pathlib.

Will replace s3:/my-bucket/... with s3://my-bucket/...
"""
return re.sub(r"s3:/((?!/).*)", r"s3://\1", str(url))
16 changes: 13 additions & 3 deletions src/dolphin/io/_readers.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from dolphin.io._blocks import iter_blocks

from ._background import _DEFAULT_TIMEOUT, BackgroundReader
from ._paths import S3Path
from ._utils import _ensure_slices, _unpack_3d_slices

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -715,7 +716,10 @@ def __init__(

# files: list[Filename] = [Path(f) for f in file_list]
self._use_abs_path = use_abs_path
if use_abs_path:
files: list[Filename | S3Path]
if any(str(f).startswith("s3://") for f in file_list):
files = [S3Path(str(f)) for f in file_list]
elif use_abs_path:
files = [utils._resolve_gdal_path(p) for p in file_list]
else:
files = list(file_list)
Expand Down Expand Up @@ -808,12 +812,18 @@ def _write(self):
ds = None

@property
def _gdal_file_strings(self):
def _gdal_file_strings(self) -> list[str]:
"""Get the GDAL-compatible paths to write to the VRT.

If we're not using .h5 or .nc, this will just be the file_list as is.
"""
return [io.format_nc_filename(f, self.subdataset) for f in self.file_list]
out = []
for f in self.file_list:
if isinstance(f, S3Path):
out.append(f.to_gdal())
else:
out.append(io.format_nc_filename(f, self.subdataset))
return out

def __fspath__(self):
# Allows os.fspath() to work on the object, enabling rasterio.open()
Expand Down
12 changes: 12 additions & 0 deletions src/dolphin/stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ class BaseStack(BaseModel):
description="Index of the SLC to use as reference during phase linking",
)

model_config = {
# For the `Filename, so it can handle the `GeneralPath` protocol`
# https://github.com/pydantic/pydantic/discussions/5767
"arbitrary_types_allowed": True
}

@field_validator("dates", mode="before")
@classmethod
def _check_if_not_tuples(cls, v):
Expand Down Expand Up @@ -190,6 +196,12 @@ class CompressedSlcInfo(BaseModel):
description="Folder/location where ministack will write outputs to.",
)

model_config = {
# For the `Filename, so it can handle the `GeneralPath` protocol`
# https://github.com/pydantic/pydantic/discussions/5767
"arbitrary_types_allowed": True
}

@field_validator("real_slc_dates", mode="before")
@classmethod
def _untuple_dates(cls, v):
Expand Down
Loading