From a45a45050338aab8d758af444cc7473254c75f12 Mon Sep 17 00:00:00 2001
From: Scott Staniewicz
Date: Fri, 19 Jul 2024 06:55:19 -0400
Subject: [PATCH 1/7] Rename `create_temporal_average` to just
 `create_average`. Use in `sequential` instead of `gdal_calc`

---
 src/dolphin/timeseries.py           | 20 ++++++++++++++++----
 src/dolphin/workflows/sequential.py | 28 +++-------------------------
 2 files changed, 19 insertions(+), 29 deletions(-)

diff --git a/src/dolphin/timeseries.py b/src/dolphin/timeseries.py
index 1453f971..8f84c082 100644
--- a/src/dolphin/timeseries.py
+++ b/src/dolphin/timeseries.py
@@ -587,7 +587,7 @@ class AverageFunc(Protocol):
     def __call__(self, ArrayLike, axis: int) -> ArrayLike: ...
 
 
-def create_temporal_average(
+def create_average(
     file_list: Sequence[PathOrStr],
     output_file: PathOrStr,
     block_shape: tuple[int, int] = (512, 512),
@@ -595,7 +595,7 @@ def create_temporal_average(
     average_func: Callable[[ArrayLike, int], np.ndarray] = np.nanmean,
     read_masked: bool = False,
 ) -> None:
-    """Average all images in `reader` to create a 2D image in `output_file`.
+    """Average all images in `file_list` to create a 2D image in `output_file`.
 
     Parameters
     ----------
@@ -622,7 +622,8 @@ def read_and_average(
         readers: Sequence[io.StackReader], rows: slice, cols: slice
     ) -> tuple[slice, slice, np.ndarray]:
         chunk = readers[0][:, rows, cols]
-        return average_func(chunk, 0), rows, cols
+        out_chunk = average_func(chunk, 0)
+        return out_chunk, rows, cols
 
     writer = io.BackgroundRasterWriter(output_file, like_filename=file_list[0])
     with NamedTemporaryFile(mode="w", suffix=".vrt") as f:
@@ -644,6 +645,17 @@ def read_and_average(
     writer.notify_finished()
 
 
+def create_temporal_average(*args, **kwargs):
+    import warnings
+
+    warnings.warn(
+        "'create_temporal_average' is deprecated. Use 'create_average' instead.",
+        DeprecationWarning,
+        stacklevel=2,
+    )
+    return create_average(*args, **kwargs)
+
+
 def invert_unw_network(
     unw_file_list: Sequence[PathOrStr],
     reference: ReferencePoint,
@@ -894,7 +906,7 @@ def intersect_conncomp(arr: np.ma.MaskedArray, axis: int) -> np.ndarray:
     conncomp_intersection_file = Path(output_dir) / "conncomp_intersection.tif"
     if ccl_file_list and not conncomp_intersection_file.exists():
         logger.info("Creating intersection of connected components")
-        create_temporal_average(
+        create_average(
             file_list=ccl_file_list,
             output_file=conncomp_intersection_file,
             block_shape=block_shape,
diff --git a/src/dolphin/workflows/sequential.py b/src/dolphin/workflows/sequential.py
index b428e2d5..d4ab7f6d 100644
--- a/src/dolphin/workflows/sequential.py
+++ b/src/dolphin/workflows/sequential.py
@@ -7,16 +7,13 @@
 import logging
 from itertools import chain
-from os import fspath
 from pathlib import Path
 from typing import Optional
 
-from osgeo_utils import gdal_calc
-
-from dolphin import io
 from dolphin._types import Filename
 from dolphin.io import VRTStack
 from dolphin.stack import MiniStackPlanner
+from dolphin.timeseries import create_average
 
 from .config import ShpMethod
 from .single import run_wrapped_phase_single
 
@@ -132,8 +129,8 @@ def already_processed(d: Path, search_ext: str = ".tif") -> bool:
 
     # we can pass the list of files to gdal_calc, which interprets it
     # as a multi-band file
-    _average_rasters(temp_coh_files, output_temp_coh_file, "Float32")
-    _average_rasters(shp_count_files, output_shp_count_file, "Int16")
+    create_average(temp_coh_files, output_file=output_temp_coh_file)
+    create_average(shp_count_files, output_file=output_shp_count_file)
 
     # Combine the separate SLC output lists into a single list
     all_slc_files = list(chain.from_iterable(output_slc_files))
@@ -162,22 +159,3 @@ def _get_outputs_from_folder(
     # Currently ignoring to not stitch:
     # eigenvalues, estimator, avg_coh
     return cur_output_files, cur_comp_slc_file, temp_coh_file, shp_count_file
-
-
-def _average_rasters(file_list: list[Path], outfile: Path, output_type: str):
-    if len(file_list) == 1:
-        file_list[0].rename(outfile)
-        return
-
-    logger.info(f"Averaging {len(file_list)} files into {outfile}")
-    gdal_calc.Calc(
-        NoDataValue=0,
-        format="GTiff",
-        outfile=fspath(outfile),
-        type=output_type,
-        quiet=True,
-        overwrite=True,
-        creation_options=io.DEFAULT_TIFF_OPTIONS,
-        A=file_list,
-        calc="numpy.nanmean(A, axis=0)",
-    )
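
A minimal usage sketch of the rename above. The raster file names and output paths here are hypothetical; `create_average` and the `create_temporal_average` shim are the functions from this patch:

    import warnings
    from pathlib import Path

    from dolphin import timeseries

    # Hypothetical single-band rasters to average into one 2D GeoTIFF
    files = sorted(Path("scratch").glob("temporal_coherence_*.tif"))
    timeseries.create_average(file_list=files, output_file="average.tif")

    # The old name still works, but now emits a DeprecationWarning
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        timeseries.create_temporal_average(file_list=files, output_file="average2.tif")
    assert issubclass(caught[-1].category, DeprecationWarning)
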
From 1842cf16bc10ec19eff91146c1af16ed7c5a8624 Mon Sep 17 00:00:00 2001
From: Scott Staniewicz
Date: Fri, 19 Jul 2024 06:58:49 -0400
Subject: [PATCH 2/7] Allow `create_average` to work with masked arrays using
 a `mask_average_func`

---
 src/dolphin/timeseries.py | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/src/dolphin/timeseries.py b/src/dolphin/timeseries.py
index 8f84c082..61476879 100644
--- a/src/dolphin/timeseries.py
+++ b/src/dolphin/timeseries.py
@@ -593,6 +593,7 @@ def create_average(
     block_shape: tuple[int, int] = (512, 512),
     num_threads: int = 5,
     average_func: Callable[[ArrayLike, int], np.ndarray] = np.nanmean,
+    mask_average_func: Callable[[ArrayLike, int], np.ndarray] = np.any,
     read_masked: bool = False,
 ) -> None:
     """Average all images in `file_list` to create a 2D image in `output_file`.
@@ -612,6 +613,11 @@
     average_func : Callable[[ArrayLike, int], np.ndarray], optional
         The function to use to average the images.
         Default is `np.nanmean`, which calls `np.nanmean(arr, axis=0)` on each block.
+    mask_average_func : Callable[[ArrayLike, int], np.ndarray], optional
+        If `read_masked` is True, the function used to combine the nodata masks.
+        Default is `np.any`, which calls `np.any(masks, axis=0)` on each block
+        and masks *any* pixel that is masked in one of the images.
+        Use `np.all` to keep more valid pixels (a smaller masked region).
     read_masked : bool, optional
         If True, reads the data as a masked array based on the rasters'
         nodata values. Default is False.
@@ -623,6 +629,9 @@ def read_and_average(
     ) -> tuple[slice, slice, np.ndarray]:
         chunk = readers[0][:, rows, cols]
         out_chunk = average_func(chunk, 0)
+        if isinstance(chunk, np.ma.MaskedArray):
+            mask = mask_average_func(chunk.mask, 0)
+            out_chunk = np.ma.MaskedArray(data=out_chunk, mask=mask)
         return out_chunk, rows, cols
 
     writer = io.BackgroundRasterWriter(output_file, like_filename=file_list[0])
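
The `mask_average_func` added above only controls which output pixels end up masked when `read_masked=True`. A self-contained NumPy illustration of the `np.any` vs. `np.all` semantics (toy arrays, independent of dolphin):

    import numpy as np

    # Masks for three 1x2 "images"; True means the pixel is invalid (nodata).
    masks = np.array(
        [
            [[True, False]],
            [[False, False]],
            [[True, False]],
        ]
    )

    # Default np.any: a pixel is masked if it is masked in ANY input image.
    print(np.any(masks, axis=0))  # [[ True False]] -> one valid pixel left
    # np.all: masked only where ALL inputs are masked -> larger valid region.
    print(np.all(masks, axis=0))  # [[False False]] -> both pixels stay valid
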
From 3d7e6f783fcc82d4f28e513c7ff548c801f8d10f Mon Sep 17 00:00:00 2001
From: Scott Staniewicz
Date: Fri, 19 Jul 2024 07:03:08 -0400
Subject: [PATCH 3/7] delete unused protocol

---
 src/dolphin/timeseries.py | 8 +-------
 1 file changed, 1 insertion(+), 7 deletions(-)

diff --git a/src/dolphin/timeseries.py b/src/dolphin/timeseries.py
index 61476879..f236e558 100644
--- a/src/dolphin/timeseries.py
+++ b/src/dolphin/timeseries.py
@@ -4,7 +4,7 @@
 import shutil
 from pathlib import Path
 from tempfile import NamedTemporaryFile
-from typing import Callable, Optional, Protocol, Sequence, TypeVar
+from typing import Callable, Optional, Sequence, TypeVar
 
 import jax.numpy as jnp
 import numpy as np
@@ -581,12 +581,6 @@ def read_and_fit(
     create_overviews([output_file])
 
 
-class AverageFunc(Protocol):
-    """Protocol for temporally averaging a block of data."""
-
-    def __call__(self, ArrayLike, axis: int) -> ArrayLike: ...
-
-
 def create_average(
     file_list: Sequence[PathOrStr],
     output_file: PathOrStr,

From e96760b21bf3e45c5d3da11184589c2b19236b39 Mon Sep 17 00:00:00 2001
From: Scott Staniewicz
Date: Fri, 19 Jul 2024 08:14:24 -0400
Subject: [PATCH 4/7] add weighted average

---
 src/dolphin/timeseries.py | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)

diff --git a/src/dolphin/timeseries.py b/src/dolphin/timeseries.py
index f236e558..fe67dc03 100644
--- a/src/dolphin/timeseries.py
+++ b/src/dolphin/timeseries.py
@@ -586,8 +586,9 @@ def create_average(
     output_file: PathOrStr,
     block_shape: tuple[int, int] = (512, 512),
     num_threads: int = 5,
-    average_func: Callable[[ArrayLike, int], np.ndarray] = np.nanmean,
+    average_func: Callable[[ArrayLike, int], np.ndarray] = np.average,
     mask_average_func: Callable[[ArrayLike, int], np.ndarray] = np.any,
+    weights: ArrayLike | None = None,
     read_masked: bool = False,
 ) -> None:
     """Average all images in `file_list` to create a 2D image in `output_file`.
@@ -606,23 +607,33 @@
         Default is 5.
     average_func : Callable[[ArrayLike, int], np.ndarray], optional
         The function to use to average the images.
-        Default is `np.nanmean`, which calls `np.nanmean(arr, axis=0)` on each block.
+        Default calls `np.average(arr, axis=0, weights=weights)` on each block.
     mask_average_func : Callable[[ArrayLike, int], np.ndarray], optional
         If `read_masked` is True, the function used to combine the nodata masks.
         Default is `np.any`, which calls `np.any(masks, axis=0)` on each block
         and masks *any* pixel that is masked in one of the images.
         Use `np.all` to keep more valid pixels (a smaller masked region).
+    weights : ArrayLike, optional
+        If provided, assigns a floating-point weight to each file in `file_list`;
+        the output is then a weighted average.
+        Default is `None`, equivalent to `np.ones(len(file_list))`.
     read_masked : bool, optional
         If True, reads the data as a masked array based on the rasters'
         nodata values. Default is False.
     """
+    if weights is None:
+        weights = np.ones(len(file_list), dtype="float32")
+    if weights.shape != (len(file_list),):
+        msg = f"weights must have shape (len(file_list),), got {weights.shape}"
+        raise ValueError(msg)
 
     def read_and_average(
         readers: Sequence[io.StackReader], rows: slice, cols: slice
     ) -> tuple[slice, slice, np.ndarray]:
         chunk = readers[0][:, rows, cols]
-        out_chunk = average_func(chunk, 0)
+
+        out_chunk = average_func(chunk, axis=0, weights=weights)
         if isinstance(chunk, np.ma.MaskedArray):
             mask = mask_average_func(chunk.mask, 0)
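
A self-contained check of the `np.average` weighting semantics this patch relies on (and that the tests in the next two patches exercise): a zero weight drops an image from the average entirely.

    import numpy as np

    stack = np.array([[[1.0]], [[3.0]], [[100.0]]])  # three 1x1 "images"
    weights = np.array([1.0, 1.0, 0.0])  # ignore the last image

    weighted = np.average(stack, axis=0, weights=weights)
    print(weighted)  # [[2.]] -- the mean of the first two images only

    # Equivalent by hand: sum_i(w_i * x_i) / sum_i(w_i)
    manual = (stack * weights[:, None, None]).sum(axis=0) / weights.sum()
    np.testing.assert_allclose(weighted, manual)
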
From 5c550c6e2af70fc5b469d13a3d7152e2f651f128 Mon Sep 17 00:00:00 2001
From: Scott Staniewicz
Date: Fri, 19 Jul 2024 15:59:41 -0400
Subject: [PATCH 5/7] add test for `create_average`

---
 tests/test_timeseries.py | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/tests/test_timeseries.py b/tests/test_timeseries.py
index 61b7b2d1..085d14d7 100644
--- a/tests/test_timeseries.py
+++ b/tests/test_timeseries.py
@@ -222,6 +222,17 @@ def test_stack_unweighted(self, data, x_arr, expected_velo):
         npt.assert_allclose(velocities, expected_velo, atol=1e-5)
 
 
+class TestCreateAverage:
+    def test_basic(self, tmp_path, slc_file_list, slc_stack):
+        output_file = tmp_path / "average.tif"
+        timeseries.create_average(
+            file_list=slc_file_list, output_file=output_file, num_threads=1
+        )
+        computed = io.load_gdal(output_file)
+        expected = np.average(slc_stack, axis=0)
+        npt.assert_allclose(computed, expected)
+
+
 if __name__ == "__main__":
     sar_dates = make_sar_dates()
     sar_phases = make_sar_phases(sar_dates)

From e281990e2ae4f3cbb10af7ad3c07626f153c216c Mon Sep 17 00:00:00 2001
From: Scott Staniewicz
Date: Fri, 19 Jul 2024 16:09:19 -0400
Subject: [PATCH 6/7] add weighted tests

---
 tests/test_timeseries.py | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/tests/test_timeseries.py b/tests/test_timeseries.py
index 085d14d7..bb1f047b 100644
--- a/tests/test_timeseries.py
+++ b/tests/test_timeseries.py
@@ -232,6 +232,21 @@ def test_basic(self, tmp_path, slc_file_list, slc_stack):
         expected = np.average(slc_stack, axis=0)
         npt.assert_allclose(computed, expected)
 
+    def test_weighted(self, tmp_path, slc_file_list, slc_stack):
+        output_file = tmp_path / "average.tif"
+        weights = np.ones(len(slc_file_list))
+        weights[-1] = 0
+        timeseries.create_average(
+            file_list=slc_file_list,
+            output_file=output_file,
+            num_threads=1,
+            weights=weights,
+        )
+
+        computed = io.load_gdal(output_file)
+        expected = np.average(slc_stack[:-1], axis=0)
+        npt.assert_allclose(computed, expected, atol=1e-6)
+
 
 if __name__ == "__main__":
     sar_dates = make_sar_dates()
Callable, Optional, Sequence, TypeVar +from typing import Callable, Optional, Protocol, Sequence, TypeVar import jax.numpy as jnp import numpy as np from jax import Array, jit, vmap -from numpy.typing import ArrayLike +from numpy.typing import ArrayLike, NDArray from opera_utils import get_dates from scipy import ndimage @@ -581,12 +581,20 @@ def read_and_fit( create_overviews([output_file]) +class WeightedAverager(Protocol): + """Protocol for temporally averaging a block of data.""" + + def __call__( + self, arr: ArrayLike, axis: int, weights: ArrayLike | None + ) -> NDArray: ... + + def create_average( file_list: Sequence[PathOrStr], output_file: PathOrStr, block_shape: tuple[int, int] = (512, 512), num_threads: int = 5, - average_func: Callable[[ArrayLike, int], np.ndarray] = np.average, + average_func: WeightedAverager = np.average, mask_average_func: Callable[[ArrayLike, int], np.ndarray] = np.any, weights: ArrayLike | None = None, read_masked: bool = False, @@ -906,8 +914,12 @@ def _get_largest_conncomp_mask( block_shape: tuple[int, int] = (512, 512), num_threads: int = 5, ) -> np.ndarray: - def intersect_conncomp(arr: np.ma.MaskedArray, axis: int) -> np.ndarray: + def intersect_conncomp( + arr: ArrayLike, axis: int, _weights: ArrayLike | None + ) -> np.ndarray: # Track where input is nodata + if not isinstance(arr, np.ma.MaskedArray): + arr = np.ma.MaskedArray(data=arr, mask=np.ma.nomask) any_masked = np.any(arr.mask, axis=axis) # Get the logical AND of all nonzero conncomp labels fillval = arr.fill_value @@ -918,6 +930,7 @@ def intersect_conncomp(arr: np.ma.MaskedArray, axis: int) -> np.ndarray: return all_are_valid conncomp_intersection_file = Path(output_dir) / "conncomp_intersection.tif" + average_func: WeightedAverager = intersect_conncomp # type: ignore[assignment] if ccl_file_list and not conncomp_intersection_file.exists(): logger.info("Creating intersection of connected components") create_average( @@ -925,7 +938,7 @@ def intersect_conncomp(arr: np.ma.MaskedArray, axis: int) -> np.ndarray: output_file=conncomp_intersection_file, block_shape=block_shape, num_threads=num_threads, - average_func=intersect_conncomp, + average_func=average_func, read_masked=True, ) diff --git a/src/dolphin/workflows/sequential.py b/src/dolphin/workflows/sequential.py index d4ab7f6d..6d367039 100644 --- a/src/dolphin/workflows/sequential.py +++ b/src/dolphin/workflows/sequential.py @@ -125,11 +125,9 @@ def already_processed(d: Path, search_ext: str = ".tif") -> bool: # Average the temporal coherence files in each ministack full_span = ministack_planner.real_slc_date_range_str output_temp_coh_file = output_folder / f"temporal_coherence_average_{full_span}.tif" - output_shp_count_file = output_folder / f"shp_counts_average_{full_span}.tif" - - # we can pass the list of files to gdal_calc, which interprets it - # as a multi-band file create_average(temp_coh_files, output_file=output_temp_coh_file) + + output_shp_count_file = output_folder / f"shp_counts_average_{full_span}.tif" create_average(shp_count_files, output_file=output_shp_count_file) # Combine the separate SLC output lists into a single list
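
With `WeightedAverager` restored, any callable accepting `(arr, axis, weights)` can serve as `average_func`; that is what lets `intersect_conncomp` above match the signature, with the `# type: ignore[assignment]` working around the differing parameter names of `np.average`. A hypothetical custom averager (a sketch, not part of the series) that structurally satisfies the protocol:

    import numpy as np
    from numpy.typing import ArrayLike, NDArray

    def nanmean_average(
        arr: ArrayLike, axis: int, weights: ArrayLike | None = None
    ) -> NDArray:
        # Matches WeightedAverager structurally; restores the old np.nanmean
        # behavior by simply ignoring the weights argument.
        return np.nanmean(np.asarray(arr), axis=axis)

    # e.g. create_average(file_list, output_file="out.tif",
    #                     average_func=nanmean_average)
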