Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added build/lib/bowser/__init__.py
Empty file.
169 changes: 169 additions & 0 deletions build/lib/bowser/_prepare_disp_s1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
from pathlib import Path

from .titiler import Algorithm

# HDF5 dataset names at the root of each DISP-S1 NetCDF product; each one
# becomes its own subdirectory of extracted VRT files.
CORE_DATASETS = [
    "displacement",
    "short_wavelength_displacement",
    "recommended_mask",
    "connected_component_labels",
    "temporal_coherence",
    "estimated_phase_quality",
    "persistent_scatterer_mask",
    "shp_counts",
    "water_mask",
    "phase_similarity",
    "timeseries_inversion_residuals",
]
# Dataset names nested under the product's "corrections/" HDF5 group.
CORRECTION_DATASETS = [
    "corrections/ionospheric_delay",
    "corrections/perpendicular_baseline",
    "corrections/solid_earth_tide",
]


def get_disp_s1_outputs(disp_s1_dir: Path | str):
    """Build the bowser layer specifications for a prepared DISP-S1 directory.

    Parameters
    ----------
    disp_s1_dir : Path | str
        Directory containing one subdirectory of VRT files per dataset
        (the layout produced by `process_netcdf_files`).

    Returns
    -------
    list[dict]
        One dict per layer with keys "name" and "file_list", plus
        "uses_spatial_ref", "algorithm", and "mask_file_list" where needed.

    """

    def _glob(pattern: str, subdir: str) -> list[str]:
        return [str(p) for p in sorted((Path(disp_s1_dir) / subdir).glob(pattern))]

    return [
        {
            "name": "Displacement",
            "file_list": _glob("*.vrt", subdir="displacement"),
            "uses_spatial_ref": True,
            "algorithm": Algorithm.SHIFT.value,
            "mask_file_list": _glob("*.vrt", subdir="connected_component_labels"),
        },
        {
            "name": "Short Wavelength Displacement",
            "file_list": _glob("*.vrt", subdir="short_wavelength_displacement"),
        },
        {
            "name": "Connected Component Labels",
            "file_list": _glob("*.vrt", subdir="connected_component_labels"),
        },
        {
            "name": "Re-wrapped phase",
            "file_list": _glob("*.vrt", subdir="displacement"),
            "algorithm": Algorithm.REWRAP.value,
        },
        {
            "name": "Persistent Scatterer Mask",
            "file_list": _glob("*.vrt", subdir="persistent_scatterer_mask"),
        },
        {
            "name": "Temporal Coherence",
            "file_list": _glob("*.vrt", subdir="temporal_coherence"),
        },
        {
            "name": "Phase Similarity",
            "file_list": _glob("*.vrt", subdir="phase_similarity"),
        },
        {
            "name": "Timeseries Inversion Residuals",
            "file_list": _glob("*.vrt", subdir="timeseries_inversion_residuals"),
        },
        {
            "name": "Estimated Phase quality",
            "file_list": _glob("*.vrt", subdir="estimated_phase_quality"),
        },
        {
            "name": "SHP counts",
            "file_list": _glob("*.vrt", subdir="shp_counts"),
        },
        {
            "name": "Water Mask",
            "file_list": _glob("*.vrt", subdir="water_mask"),
        },
        {
            "name": "Unwrapper Mask",
            "file_list": _glob("*.vrt", subdir="unwrapper_mask"),
        },
        {
            "name": "Ionospheric Delay",
            # Pattern fixed from "*vrt" to "*.vrt" for consistency with the
            # other entries (bare "*vrt" also matches names merely ending in "vrt").
            "file_list": _glob("*.vrt", subdir="corrections/ionospheric_delay"),
            "uses_spatial_ref": True,
            "algorithm": Algorithm.SHIFT.value,
        },
        {
            "name": "Perpendicular Baseline",
            "file_list": _glob("*.vrt", subdir="corrections/perpendicular_baseline"),
        },
        {
            "name": "Solid Earth Tide",
            "file_list": _glob("*.vrt", subdir="corrections/solid_earth_tide"),
            "uses_spatial_ref": True,
            "algorithm": Algorithm.SHIFT.value,
        },
    ]


def get_aligned_disp_s1_outputs(aligned_dir: Path | str):
    """Build the bowser layer specifications for a directory of aligned GeoTIFFs.

    Parameters
    ----------
    aligned_dir : Path | str
        Flat directory containing per-dataset GeoTIFFs named
        ``<dataset><suffix>.tif`` (e.g. ``displacement_20200101_20200113.tif``).

    Returns
    -------
    list[dict]
        One dict per layer with keys "name" and "file_list", plus
        "uses_spatial_ref", "algorithm", and "mask_file_list" where needed.

    """

    # Use pathlib globbing (like `get_disp_s1_outputs`) instead of a
    # function-local `glob` import; results stay sorted lists of strings.
    def _glob(pattern: str) -> list[str]:
        return [str(p) for p in sorted(Path(aligned_dir).glob(pattern))]

    return [
        {
            "name": "Displacement",
            "file_list": _glob("displacement*.tif"),
            "uses_spatial_ref": True,
            "algorithm": Algorithm.SHIFT.value,
            "mask_file_list": _glob("recommended_mask*.tif"),
        },
        {
            "name": "Short Wavelength Displacement",
            "file_list": _glob("short_wavelength_displacement*.tif"),
        },
        {
            "name": "Connected Component Labels",
            "file_list": _glob("connected_component_labels*.tif"),
        },
        {
            "name": "Re-wrapped phase",
            "file_list": _glob("displacement*.tif"),
            "algorithm": Algorithm.REWRAP.value,
        },
        {
            "name": "Persistent Scatterer Mask",
            "file_list": _glob("persistent_scatterer_mask*.tif"),
        },
        {
            "name": "Temporal Coherence",
            "file_list": _glob("temporal_coherence*.tif"),
        },
        {
            "name": "Phase Similarity",
            "file_list": _glob("phase_similarity*.tif"),
        },
        {
            "name": "Timeseries Inversion Residuals",
            "file_list": _glob("timeseries_inversion_residuals*.tif"),
        },
        {
            "name": "Estimated Phase quality",
            "file_list": _glob("estimated_phase_quality*.tif"),
        },
        {
            "name": "SHP counts",
            "file_list": _glob("shp_counts*.tif"),
        },
        {
            "name": "Water Mask",
            # Pattern fixed from "water_mask.tif" to "water_mask*.tif" so
            # date-suffixed files are matched like every other dataset here.
            "file_list": _glob("water_mask*.tif"),
        },
    ]
81 changes: 81 additions & 0 deletions build/lib/bowser/_prepare_nisar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from pathlib import Path

from .titiler import Algorithm

# Define NISAR GUNW datasets to extract
# HDF5 group paths inside a NISAR GUNW product (frequency A, HH polarization).
NISAR_BASE_PATH = "/science/LSAR/GUNW/grids"
NISAR_FREQ_A_PATH = f"{NISAR_BASE_PATH}/frequencyA"
NISAR_GUNW_A_HH_PATH = f"{NISAR_FREQ_A_PATH}/unwrappedInterferogram/HH"
NISAR_IFGW_A_HH_PATH = f"{NISAR_FREQ_A_PATH}/wrappedInterferogram/HH"
NISAR_GOFF_A_HH_PATH = f"{NISAR_FREQ_A_PATH}/pixelOffsets/HH"
# Full HDF5 dataset paths to pull out of each product; each becomes a
# subdirectory of extracted VRT files.
NISAR_GUNW_DATASETS = [
    f"{NISAR_GUNW_A_HH_PATH}/unwrappedPhase",
    f"{NISAR_GUNW_A_HH_PATH}/coherenceMagnitude",
    f"{NISAR_GUNW_A_HH_PATH}/connectedComponents",
    f"{NISAR_GUNW_A_HH_PATH}/ionospherePhaseScreen",
    f"{NISAR_GUNW_A_HH_PATH}/ionospherePhaseScreenUncertainty",
    f"{NISAR_IFGW_A_HH_PATH}/wrappedInterferogram",
    f"{NISAR_IFGW_A_HH_PATH}/coherenceMagnitude",
    f"{NISAR_GOFF_A_HH_PATH}/alongTrackOffset",
    f"{NISAR_GOFF_A_HH_PATH}/slantRangeOffset",
    f"{NISAR_GOFF_A_HH_PATH}/correlationSurfacePeak",
]


def get_nisar_outputs(nisar_dir: Path | str):
    """Return the bowser layer specifications for a prepared NISAR GUNW directory.

    Each entry names one raster layer and lists the sorted VRT files
    backing it; some entries also carry a titiler algorithm and/or a
    list of mask files.
    """
    root = Path(nisar_dir)

    def _vrts(subdir: str) -> list[str]:
        return [str(path) for path in sorted((root / subdir).glob("*.vrt"))]

    def _layer(name, subdir, *, spatial_ref=False, algorithm=None, mask_subdir=None):
        # Assemble one layer spec, adding the optional keys in the same
        # order the layer dicts have always used.
        spec = {"name": name, "file_list": _vrts(subdir)}
        if spatial_ref:
            spec["uses_spatial_ref"] = True
        if algorithm is not None:
            spec["algorithm"] = algorithm.value
        if mask_subdir is not None:
            spec["mask_file_list"] = _vrts(mask_subdir)
        return spec

    return [
        _layer(
            "Unwrapped Phase",
            "unwrappedPhase",
            spatial_ref=True,
            algorithm=Algorithm.SHIFT,
            mask_subdir="connectedComponents",
        ),
        _layer("Coherence Magnitude", "coherenceMagnitude"),
        _layer("Connected Components", "connectedComponents"),
        _layer(
            "Ionosphere Phase Screen",
            "ionospherePhaseScreen",
            spatial_ref=True,
            algorithm=Algorithm.SHIFT,
        ),
        _layer("Ionosphere Phase Uncertainty", "ionospherePhaseScreenUncertainty"),
        _layer(
            "Wrapped Interferogram",
            "wrappedInterferogram",
            algorithm=Algorithm.PHASE,
        ),
        _layer("Wrapped Coherence", "coherenceMagnitude"),
        _layer("Re-wrapped Phase", "unwrappedPhase", algorithm=Algorithm.REWRAP),
        _layer("Along-Track Offset", "alongTrackOffset"),
        _layer("Slant Range Offset", "slantRangeOffset"),
        _layer("Correlation Surface Peak", "correlationSurfacePeak"),
    ]
144 changes: 144 additions & 0 deletions build/lib/bowser/_prepare_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import logging
from functools import partial
from pathlib import Path
from typing import Sequence

import h5py
from opera_utils import get_dates
from opera_utils.credentials import ASFCredentialEndpoints, AWSCredentials
from opera_utils.disp import open_h5
from osgeo import gdal
from tqdm.contrib.concurrent import process_map

from bowser.add_overviews import add_overviews

# Package-wide logger; named "bowser" rather than __name__ so all bowser
# modules share one logger.
logger = logging.getLogger("bowser")


def process_netcdf_files(
    netcdf_files: Sequence[Path | str],
    output_dir: Path | str,
    datasets: list[str],
    max_workers: int = 5,
    strip_group_path: bool = False,
) -> None:
    """Process NetCDF files in the input directory, create VRT files, build overviews.

    Parameters
    ----------
    netcdf_files : Sequence[Path | str]
        Paths (local or ``s3://``) to input NetCDF files.
    output_dir : Path | str
        Path to the directory where output VRT files will be saved.
    datasets : list[str]
        list of dataset names to process from each NetCDF file.
    max_workers : int
        Number of parallel files to process.
        Default is 5.
    strip_group_path : bool
        If True, the output directory for the VRTs is only one level deep.
        Otherwise, uses the full HDF5 path as VRT path.
        Default is False.

    Returns
    -------
    None

    Notes
    -----
    This function processes all NetCDF files in the input directory, creates VRT files
    for each specified dataset, and builds overviews for the created VRT files.

    """
    # Ensure output directory exists
    out_path = Path(output_dir)
    out_path.mkdir(exist_ok=True, parents=True)

    # Remote (S3) inputs need temporary ASF credentials; local files do not.
    if any(str(f).startswith("s3://") for f in netcdf_files):
        aws_credentials = AWSCredentials.from_asf(endpoint=ASFCredentialEndpoints.OPERA)
    else:
        aws_credentials = None

    func = partial(
        process_single_file,
        # Pass the normalized Path (not the raw argument) so the worker can
        # use the `/` operator even when the caller supplied a plain string.
        output_dir=out_path,
        datasets=datasets,
        aws_credentials=aws_credentials,
        strip_group_path=strip_group_path,
    )

    # Fan the files out across a process pool with a tqdm progress bar.
    process_map(
        func,
        netcdf_files,
        max_workers=max_workers,
    )


def process_single_file(
    netcdf_file: Path | str,
    output_dir: Path,
    datasets: list[str],
    aws_credentials: AWSCredentials | None,
    strip_group_path: bool = False,
) -> None:
    """Create VRT files from subdatasets and build overviews.

    Parameters
    ----------
    netcdf_file : Path | str
        Path (local or ``s3://``) to the input NetCDF file.
    output_dir : Path
        Path to the directory where output VRT files will be saved.
    datasets : list[str]
        list of dataset names to process from the NetCDF file.
    aws_credentials : AWSCredentials, optional
        Object containing temporary S3 credentials for remote datasets.
        Only usable on in-region EC2 instances.
    strip_group_path : bool
        If True, the output directory for the VRTs is only one level deep.
        Otherwise, uses the full HDF5 path as VRT path.
        Default is False.

    Returns
    -------
    None

    """
    # Normalize to str once: `Path.replace` is a filesystem rename, so the
    # string surgery below must never run on a Path object.
    input_path = str(netcdf_file)

    # Extract date information from the filename
    try:
        fmt = "%Y%m%d"
        # TODO: NISAR ifgs may have 4 dates in them
        # Rethink how to make this filename here for multiple products
        dates = get_dates(input_path, fmt=fmt)[:2]
        vrt_filename = f"{dates[0].strftime(fmt)}_{dates[1].strftime(fmt)}.vrt"
    except IndexError:
        # Date parsing failed: just use stem
        # TODO: NISAR holds ref/secondary as
        # /science/LSAR/identification/secondaryZeroDopplerEndTime
        vrt_filename = f"{input_path.replace('/', '_')}.vrt"

    if input_path.startswith("s3://"):
        hf = open_h5(input_path, aws_credentials=aws_credentials)
        logger.debug(f"Read remote {input_path}")
    else:
        # Open read-only explicitly rather than relying on h5py's default.
        hf = h5py.File(input_path, "r")

    try:
        for in_dataset in datasets:
            # Skip datasets this particular product does not contain.
            if in_dataset not in hf:
                continue
            out_dataset = (
                in_dataset.split("/")[-1] if strip_group_path else in_dataset
            )
            cur_output_dir = output_dir / out_dataset
            cur_output_dir.mkdir(exist_ok=True, parents=True)

            vrt_path = cur_output_dir / vrt_filename

            # Create VRT file; GDAL reads s3:// objects via the /vsis3/ handler.
            gdal.Translate(
                str(vrt_path),
                f"netcdf:{input_path.replace('s3://', '/vsis3/')}:{in_dataset}",
                callback=gdal.TermProgress_nocb,
            )

            # Build overviews using GDAL function with compression
            add_overviews(vrt_path, external=True)
    finally:
        # h5py handles are not closed implicitly; avoid leaking file
        # descriptors in the long-lived worker-pool processes.
        hf.close()
Loading
Loading