From 01bb9b0fa916330458fbeb8c8c06a6d9bae76f15 Mon Sep 17 00:00:00 2001
From: Jan Van den bosch
Date: Tue, 10 Sep 2024 13:49:32 +0200
Subject: [PATCH] WIP: improve dry run

https://github.com/EU-GRASSLAND-WATCH/EUGW/issues/7
---
 openeo_driver/ProcessGraphDeserializer.py | 33 +++++---------
 openeo_driver/datacube.py                 |  6 +++
 openeo_driver/dry_run.py                  | 55 +++++++++++++++++++----
 3 files changed, 65 insertions(+), 29 deletions(-)

diff --git a/openeo_driver/ProcessGraphDeserializer.py b/openeo_driver/ProcessGraphDeserializer.py
index d7885c87..d922e581 100644
--- a/openeo_driver/ProcessGraphDeserializer.py
+++ b/openeo_driver/ProcessGraphDeserializer.py
@@ -1222,9 +1222,6 @@ def aggregate_spatial(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
     # TODO #114: convert all cases to DriverVectorCube first and just work with that
     if isinstance(geoms, DriverVectorCube):
         pass
-    elif isinstance(geoms, DryRunDataCube):
-        # TODO: properly support DriverVectorCube in dry run
-        geoms = DriverVectorCube(geometries=gpd.GeoDataFrame(geometry=[]), cube=None)
     elif isinstance(geoms, dict):
         try:
             # Automatically convert inline GeoJSON to a vector cube #114/#141
@@ -1397,7 +1394,6 @@ def filter_spatial(args: Dict, env: EvalEnv) -> DriverDataCube:
     elif isinstance(geometries, DriverVectorCube):
         pass
     else:
-        # TODO #114: support DriverVectorCube
         raise NotImplementedError(
             "filter_spatial does not support {g!r}".format(g=geometries)
         )
@@ -1510,7 +1506,6 @@ def run_udf(args: dict, env: EvalEnv):
         _log.info(f"run_udf: data of type {type(data)} has direct run_udf support")
         return data.run_udf(udf=udf, runtime=runtime, context=context, env=env)

-    # TODO #114 add support for DriverVectorCube
     if isinstance(data, AggregatePolygonResult):
         pass
     if isinstance(data, DriverVectorCube):
@@ -1860,11 +1855,13 @@ def get_geometries(args: Dict, env: EvalEnv) -> Union[DelayedVector, dict]:
 )
 def raster_to_vector(args: Dict, env: EvalEnv):
     image_collection = extract_arg(args, 'data')
+
     if not isinstance(image_collection, DriverDataCube):
         raise ProcessParameterInvalidException(
             parameter="data", process="raster_to_vector",
             reason=f"Invalid data type {type(image_collection)!r} expected raster-cube."
         )
+
     return image_collection.raster_to_vector()


@@ -1876,34 +1873,28 @@ def raster_to_vector(args: Dict, env: EvalEnv):
 )
 def vector_to_raster(args: dict, env: EvalEnv) -> DriverDataCube:
     input_vector_cube = extract_arg(args, "data")
-    dry_run_tracer: DryRunDataTracer = env.get(ENV_DRY_RUN_TRACER)
-    if dry_run_tracer:
-        if not isinstance(input_vector_cube, DryRunDataCube):
-            raise ProcessParameterInvalidException(
-                parameter="data",
-                process="vector_to_raster",
-                reason=f"Invalid data type {type(input_vector_cube)!r} expected vector-cube.",
-            )
-        return input_vector_cube
     if "target_data_cube" in args:
         target = extract_arg(args, "target_data_cube")  # TODO: remove after full migration to use of 'target'
     else:
         target = extract_arg(args, "target")

     # TODO: to_driver_vector_cube is temporary. Remove it when vector cube is fully supported.
-    if not isinstance(input_vector_cube, DriverVectorCube) and not hasattr(input_vector_cube, "to_driver_vector_cube"):
-        raise ProcessParameterInvalidException(
-            parameter="data",
-            process="vector_to_raster",
-            reason=f"Invalid data type {type(input_vector_cube)!r} expected vector-cube.",
-        )
+    if not isinstance(input_vector_cube, DriverVectorCube):
+        if not hasattr(input_vector_cube, "to_driver_vector_cube"):
+            raise ProcessParameterInvalidException(
+                parameter="data",
+                process="vector_to_raster",
+                reason=f"Invalid data type {type(input_vector_cube)!r} expected vector-cube.",
+            )
+        input_vector_cube = input_vector_cube.to_driver_vector_cube()
     if not isinstance(target, DriverDataCube):
         raise ProcessParameterInvalidException(
             parameter="target",
             process="vector_to_raster",
             reason=f"Invalid data type {type(target)!r} expected raster-cube.",
         )
-    return env.backend_implementation.vector_to_raster(input_vector_cube, target)
+
+    return input_vector_cube.to_raster(target, env)


 def _get_udf(args, env: EvalEnv) -> Tuple[str, str]:
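Note on the ProcessGraphDeserializer.py changes: `vector_to_raster` no longer
special-cases the dry run. Any input that is not a DriverVectorCube yet must at
least expose `to_driver_vector_cube()`, and rasterization is then dispatched
through the cube itself. A condensed sketch of the resulting control flow
(`vector_like` and `dispatch_vector_to_raster` are hypothetical names for
illustration, not part of this patch):

    from openeo_driver.datacube import DriverDataCube, DriverVectorCube

    def dispatch_vector_to_raster(vector_like, target: DriverDataCube, env):
        # 1. Normalize: temporary escape hatch until vector cubes are fully
        #    supported (#114).
        if not isinstance(vector_like, DriverVectorCube):
            vector_like = vector_like.to_driver_vector_cube()
        # 2. Dispatch: the cube decides what "to raster" means. A plain
        #    DriverVectorCube calls the backend; a DryRunVectorCube (see the
        #    dry_run.py changes below) only records a trace.
        return vector_like.to_raster(target, env)
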
diff --git a/openeo_driver/datacube.py b/openeo_driver/datacube.py
index b705ec9e..e15bdef7 100644
--- a/openeo_driver/datacube.py
+++ b/openeo_driver/datacube.py
@@ -236,6 +236,9 @@ def to_scl_dilation_mask(
         # Note: this is a non-standard process
         self._not_implemented()

+    def raster_to_vector(self) -> "DriverVectorCube":
+        return self._not_implemented()
+

 class VectorCubeError(InternalException):
     code = "VectorCubeError"
@@ -799,6 +802,9 @@ def apply_dimension(
             message=f"DriverVectorCube.apply_dimension with {dimension=} and {bool(single_run_udf)=}"
         )

+    def to_raster(self, target: DriverDataCube, env: EvalEnv):
+        return env.backend_implementation.vector_to_raster(self, target)
+
     def __repr__(self):
         bbox = repr(self.get_bounding_box()) if self.geometry_count() > 0 else "(none)"
         return (
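Note on the datacube.py changes: these two hooks carry the double dispatch.
DriverDataCube grows a default (unimplemented) `raster_to_vector()`, and
DriverVectorCube learns to turn itself back into a raster via the backend. For
a plain DriverVectorCube the two calls below are equivalent (illustrative
sketch; the `vector_cube`, `target` and `env` objects are assumed to exist):

    # Assuming vector_cube: DriverVectorCube, target: DriverDataCube, env: EvalEnv
    raster = vector_cube.to_raster(target, env)
    raster = env.backend_implementation.vector_to_raster(vector_cube, target)

Subclasses such as the DryRunVectorCube introduced below override to_raster()
to skip the backend entirely.
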
diff --git a/openeo_driver/dry_run.py b/openeo_driver/dry_run.py
index f9ca7780..726ccf06 100644
--- a/openeo_driver/dry_run.py
+++ b/openeo_driver/dry_run.py
@@ -37,6 +37,7 @@
 from enum import Enum
 from typing import List, Union, Tuple, Any, Optional

+import geopandas as gpd
 import numpy
 import shapely.geometry.base
 from shapely.geometry import Point, Polygon, MultiPolygon, GeometryCollection
@@ -563,14 +564,18 @@ def aggregate_spatial(
         geometries: Union[BaseGeometry, str, DriverVectorCube],
         reducer: dict,
         target_dimension: str = "result",
-    ) -> "DryRunDataCube":
+    ) -> "DryRunVectorCube":
         # TODO #71 #114 EP-3981 normalize to vector cube instead of GeometryCollection
         geoms_is_empty = isinstance(geometries, DriverVectorCube) and len(geometries.get_geometries()) == 0
         cube = self
         if not geoms_is_empty:
             geometries, bbox = self._normalize_geometry(geometries)
             cube = self.filter_bbox(**bbox, operation="weak_spatial_extent")
-        return cube._process(operation="aggregate_spatial", arguments={"geometries": geometries})
+
+        traces = cube._data_tracer.process_traces(
+            traces=cube._traces, operation="aggregate_spatial", arguments={"geometries": geometries}
+        )
+        return DryRunVectorCube(traces=traces, data_tracer=cube._data_tracer, metadata=cube.metadata)

     def _normalize_geometry(self, geometries) -> Tuple[Union[DriverVectorCube, DelayedVector, BaseGeometry], dict]:
         """
@@ -618,15 +623,19 @@ def _normalize_geometry(self, geometries) -> Tuple[Union[DriverVectorCube, Delay
         bbox = dict(west=bbox[0], south=bbox[1], east=bbox[2], north=bbox[3], crs=crs)
         return geometries, bbox

-    def raster_to_vector(self):
-        dimensions = [SpatialDimension(name=DriverVectorCube.DIM_GEOMETRY,extent=self.metadata.extent)]
-        if(self.metadata.has_temporal_dimension()):
+    def raster_to_vector(self) -> DriverVectorCube:
+        dimensions = [SpatialDimension(name=DriverVectorCube.DIM_GEOMETRY, extent=self.metadata.extent)]
+        if self.metadata.has_temporal_dimension():
             dimensions.append(self.metadata.temporal_dimension)
-        if(self.metadata.has_band_dimension()):
+        if self.metadata.has_band_dimension():
             dimensions.append(self.metadata.band_dimension)
-        return self._process(operation="raster_to_vector", arguments={},metadata=CollectionMetadata(metadata={}, dimensions=dimensions))
-
+        traces = self._data_tracer.process_traces(traces=self._traces, operation="raster_to_vector", arguments={})
+        return DryRunVectorCube(
+            traces=traces,
+            data_tracer=self._data_tracer,
+            metadata=CollectionMetadata(metadata={}, dimensions=dimensions),
+        )

     def resample_cube_spatial(self, target: 'DryRunDataCube', method: str = 'near') -> 'DryRunDataCube':
         cube = self._process("process_type", [ProcessType.FOCAL_SPACE])
@@ -809,3 +818,33 @@ def _nop(self, *args, **kwargs) -> 'DryRunDataCube':
     water_vapor = _nop
     linear_scale_range = _nop
     dimension_labels = _nop
+
+
+class DryRunVectorCube(DriverVectorCube):
+    def __init__(
+        self,
+        traces: List[DataTraceBase],
+        data_tracer: DryRunDataTracer,
+        metadata: CollectionMetadata,
+    ):
+        super().__init__(geometries=gpd.GeoDataFrame(geometry=[]), cube=None)
+        self._traces = traces or []
+        self._data_tracer = data_tracer
+        self.metadata = metadata
+
+    def to_raster(self, target: DriverDataCube, env: EvalEnv):
+        dimensions = [SpatialDimension(name="x", extent=[]), SpatialDimension(name="y", extent=[])]
+
+        if self.metadata.has_temporal_dimension():
+            dimensions.append(self.metadata.temporal_dimension)
+        if self.metadata.has_band_dimension():
+            dimensions.append(self.metadata.band_dimension)
+
+        traces = self._data_tracer.process_traces(traces=self._traces, operation="vector_to_raster", arguments={})
+        return DryRunDataCube(
+            traces=traces,
+            data_tracer=self._data_tracer,
+            metadata=CollectionMetadata(metadata={}, dimensions=dimensions),
+        )
+
+    # TODO: support apply_dimension, filter_bands
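Note on the dry_run.py changes: vector-producing operations now return a
DryRunVectorCube that keeps carrying the tracer and traces, so a dry run can
thread through a vector stage and back into a raster stage. A rough end-to-end
sketch, assuming the usual DryRunDataTracer entry point used in the dry-run
tests (the collection id and geometry are made up):

    from shapely.geometry import box
    from openeo_driver.dry_run import DryRunDataTracer
    from openeo_driver.utils import EvalEnv

    tracer = DryRunDataTracer()
    cube = tracer.load_collection("S2_FOOBAR", arguments={})  # DryRunDataCube
    # Aggregating returns a DryRunVectorCube: usable wherever a
    # DriverVectorCube is expected, but the trace chain stays alive.
    vec = cube.aggregate_spatial(geometries=box(3, 51, 4, 52), reducer={})
    # Rasterizing in dry-run mode records a vector_to_raster trace instead of
    # calling the backend, and yields a DryRunDataCube again.
    raster = vec.to_raster(target=cube, env=EvalEnv())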