
Commit 1665a49

Merge branch 'issue197-vectorcube-applydim-udf'
2 parents: 810220d + 53e9063

31 files changed: +1999 -1349 lines

openeo_driver/ProcessGraphDeserializer.py (+40 -69)
```diff
@@ -677,13 +677,13 @@ def apply_neighborhood(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
 
 @process
 def apply_dimension(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
-    data_cube = args.get_required("data", expected_type=DriverDataCube)
+    data_cube = args.get_required("data", expected_type=(DriverDataCube, DriverVectorCube))
     process = args.get_deep("process", "process_graph", expected_type=dict)
-    dimension = args.get_required("dimension", expected_type=str)
+    dimension = args.get_required(
+        "dimension", expected_type=str, validator=ProcessArgs.validator_one_of(data_cube.get_dimension_names())
+    )
     target_dimension = args.get_optional("target_dimension", default=None, expected_type=str)
     context = args.get_optional("context", default=None)
-    # do check_dimension here for error handling
-    dimension, band_dim, temporal_dim = _check_dimension(cube=data_cube, dim=dimension, process="apply_dimension")
 
     cube = data_cube.apply_dimension(
         process=process, dimension=dimension, target_dimension=target_dimension, context=context, env=env
@@ -747,10 +747,10 @@ def apply(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
 def reduce_dimension(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
     data_cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
     reduce_pg = args.get_deep("reducer", "process_graph", expected_type=dict)
-    dimension = args.get_required("dimension", expected_type=str)
+    dimension = args.get_required(
+        "dimension", expected_type=str, validator=ProcessArgs.validator_one_of(data_cube.get_dimension_names())
+    )
     context = args.get_optional("context", default=None)
-    # do check_dimension here for error handling
-    dimension, band_dim, temporal_dim = _check_dimension(cube=data_cube, dim=dimension, process="reduce_dimension")
     return data_cube.reduce_dimension(reducer=reduce_pg, dimension=dimension, context=context, env=env)
 
 
```
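Note: `ProcessArgs.validator_one_of(data_cube.get_dimension_names())` moves dimension validation to argument-extraction time, replacing the legacy `_check_dimension` helper removed below. A minimal sketch of the "one of" validator pattern (not the driver's actual implementation):

```python
from typing import Callable, List


def validator_one_of(options: List[str]) -> Callable[[str], bool]:
    """Build a validator that only accepts values from `options`."""
    def validate(value: str) -> bool:
        if value not in options:
            raise ValueError(f"Must be one of {options!r} but got {value!r}.")
        return True
    return validate


check = validator_one_of(["x", "y", "t", "bands"])
check("t")       # passes
# check("time")  # would raise: not an actual dimension name
```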
```diff
@@ -915,60 +915,35 @@ def rename_labels(args: dict, env: EvalEnv) -> DriverDataCube:
     )
 
 
-def _check_dimension(cube: DriverDataCube, dim: str, process: str):
-    """
-    Helper to check/validate the requested and available dimensions of a cube.
-
-    :return: tuple (requested dimension, name of band dimension, name of temporal dimension)
-    """
-    # Note: large part of this is support/adapting for old client
-    # (pre https://github.com/Open-EO/openeo-python-client/issues/93)
-    # TODO remove this legacy support when not necessary anymore
-    metadata = cube.metadata
-    try:
-        band_dim = metadata.band_dimension.name
-    except MetadataException:
-        band_dim = None
-    try:
-        temporal_dim = metadata.temporal_dimension.name
-    except MetadataException:
-        temporal_dim = None
-
-    if dim not in metadata.dimension_names():
-        if dim in ["spectral_bands", "bands"] and band_dim:
-            _log.warning("Probably old client requesting band dimension {d!r},"
-                         " but actual band dimension name is {n!r}".format(d=dim, n=band_dim))
-            dim = band_dim
-        elif dim == "temporal" and temporal_dim:
-            _log.warning("Probably old client requesting temporal dimension {d!r},"
-                         " but actual temporal dimension name is {n!r}".format(d=dim, n=temporal_dim))
-            dim = temporal_dim
-        else:
-            raise ProcessParameterInvalidException(
-                parameter="dimension", process=process,
-                reason="got {d!r}, but should be one of {n!r}".format(d=dim, n=metadata.dimension_names()))
-
-    return dim, band_dim, temporal_dim
-
-
 @process
 def aggregate_temporal(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
     data_cube = args.get_required("data", expected_type=DriverDataCube)
-    reduce_pg = args.get_deep("reducer", "process_graph", expected_type=dict)
-    context = args.get_optional("context", default=None)
     intervals = args.get_required("intervals")
+    reduce_pg = args.get_deep("reducer", "process_graph", expected_type=dict)
     labels = args.get_optional("labels", default=None)
-    dimension = _get_time_dim_or_default(args, data_cube)
-    return data_cube.aggregate_temporal(intervals=intervals,labels=labels,reducer=reduce_pg, dimension=dimension, context=context)
+    dimension = args.get_optional(
+        "dimension",
+        default=lambda: data_cube.metadata.temporal_dimension.name,
+        validator=ProcessArgs.validator_one_of(data_cube.get_dimension_names()),
+    )
+    context = args.get_optional("context", default=None)
+
+    return data_cube.aggregate_temporal(
+        intervals=intervals, labels=labels, reducer=reduce_pg, dimension=dimension, context=context
+    )
 
 
 @process_registry_100.add_function
 def aggregate_temporal_period(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
     data_cube = args.get_required("data", expected_type=DriverDataCube)
+    period = args.get_required("period")
     reduce_pg = args.get_deep("reducer", "process_graph", expected_type=dict)
+    dimension = args.get_optional(
+        "dimension",
+        default=lambda: data_cube.metadata.temporal_dimension.name,
+        validator=ProcessArgs.validator_one_of(data_cube.get_dimension_names()),
+    )
     context = args.get_optional("context", default=None)
-    period = args.get_required("period")
-    dimension = _get_time_dim_or_default(args, data_cube, "aggregate_temporal_period")
 
     dry_run_tracer: DryRunDataTracer = env.get(ENV_DRY_RUN_TRACER)
     if dry_run_tracer:
```
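Note: the `default=lambda: data_cube.metadata.temporal_dimension.name` pattern above assumes lazy evaluation: the temporal-dimension lookup only runs (and can only fail) when the client omitted `"dimension"`. A standalone sketch of that contract, with a hypothetical stand-in for `ProcessArgs.get_optional`:

```python
# Hypothetical helper illustrating the lazy-default behavior assumed above;
# not the actual ProcessArgs implementation.
def get_optional(args: dict, name: str, default=None):
    if name in args:
        return args[name]
    # The callable default is only invoked when the argument is absent.
    return default() if callable(default) else default


args = {"intervals": [["2021-01-01", "2021-02-01"]]}
print(get_optional(args, "dimension", default=lambda: "t"))  # -> "t"
```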
```diff
@@ -1045,24 +1020,6 @@ def _period_to_intervals(start, end, period) -> List[Tuple[pd.Timestamp, pd.Time
     return intervals
 
 
-def _get_time_dim_or_default(args: ProcessArgs, data_cube, process_id="aggregate_temporal"):
-    dimension = args.get_optional("dimension", None)
-    if dimension is not None:
-        dimension, _, _ = _check_dimension(cube=data_cube, dim=dimension, process=process_id)
-    else:
-        # default: there is a single temporal dimension
-        try:
-            dimension = data_cube.metadata.temporal_dimension.name
-        except MetadataException:
-            raise ProcessParameterInvalidException(
-                parameter="dimension", process=process_id,
-                reason="No dimension was set, and no temporal dimension could be found. Available dimensions: {n!r}".format(
-                    n=data_cube.metadata.dimension_names()))
-    # do check_dimension here for error handling
-    dimension, band_dim, temporal_dim = _check_dimension(cube=data_cube, dim=dimension, process=process_id)
-    return dimension
-
-
 @process_registry_100.add_function
 def aggregate_spatial(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
     cube = args.get_required("data", expected_type=DriverDataCube)
```
```diff
@@ -1624,14 +1581,28 @@ def load_uploaded_files(args: dict, env: EvalEnv) -> Union[DriverVectorCube,Driv
     .returns("vector-cube", schema={"type": "object", "subtype": "vector-cube"})
 )
 def to_vector_cube(args: Dict, env: EvalEnv):
-    # TODO: standardization of something like this? https://github.com/Open-EO/openeo-processes/issues/346
+    _log.warning("Experimental process `to_vector_cube` is deprecated, use `load_geojson` instead")
+    # TODO: remove this experimental/deprecated process
     data = extract_arg(args, "data", process_id="to_vector_cube")
     if isinstance(data, dict) and data.get("type") in {"Polygon", "MultiPolygon", "Feature", "FeatureCollection"}:
         return env.backend_implementation.vector_cube_cls.from_geojson(data)
-    # TODO: support more inputs: string with geojson, string with WKT, list of WKT, string with URL to GeoJSON, ...
     raise FeatureUnsupportedException(f"Converting {type(data)} to vector cube is not supported")
 
 
+@process_registry_100.add_function(spec=read_spec("openeo-processes/2.x/proposals/load_geojson.json"))
+def load_geojson(args: ProcessArgs, env: EvalEnv) -> DriverVectorCube:
+    data = args.get_required(
+        "data",
+        validator=ProcessArgs.validator_geojson_dict(
+            # TODO: also allow LineString and MultiLineString?
+            allowed_types=["Point", "MultiPoint", "Polygon", "MultiPolygon", "Feature", "FeatureCollection"]
+        ),
+    )
+    properties = args.get_optional("properties", default=[], expected_type=(list, tuple))
+    vector_cube = env.backend_implementation.vector_cube_cls.from_geojson(data, columns_for_cube=properties)
+    return vector_cube
+
+
 @non_standard_process(
     ProcessSpec("get_geometries", description="Reads vector data from a file or a URL or get geometries from a FeatureCollection")
     .param('filename', description="filename or http url of a vector file", schema={"type": "string"}, required=False)
```
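For illustration, a client-side process graph node invoking the new `load_geojson` process might look as follows (all values hypothetical; argument names follow the diff above):

```python
process_graph = {
    "loadgeojson1": {
        "process_id": "load_geojson",
        "arguments": {
            "data": {
                "type": "FeatureCollection",
                "features": [
                    {
                        "type": "Feature",
                        "geometry": {"type": "Point", "coordinates": [5.1, 51.2]},
                        "properties": {"pop": 1234},
                    }
                ],
            },
            # Feature properties to keep as cube values:
            "properties": ["pop"],
        },
        "result": True,
    }
}
```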

openeo_driver/_version.py (+1 -1)

```diff
@@ -1 +1 @@
-__version__ = "0.60.0a1"
+__version__ = "0.61.0a1"
```

openeo_driver/datacube.py (+57 -15)
```diff
@@ -1,25 +1,26 @@
 import abc
 import inspect
+import io
 import logging
 import zipfile
 from pathlib import Path
-from typing import List, Union, Optional, Dict, Any, Tuple, Sequence
-import io
+from typing import Any, Dict, List, Optional, Sequence, Tuple, Union
 
 import geopandas as gpd
 import numpy
+import openeo.udf
+import pandas
 import pyproj
+import requests
 import shapely.geometry
 import shapely.geometry.base
 import shapely.ops
 import xarray
-from pyproj import CRS
-import requests
-
 from openeo.metadata import CollectionMetadata
 from openeo.util import ensure_dir, str_truncate
-import openeo.udf
-from openeo_driver.datastructs import SarBackscatterArgs, ResolutionMergeArgs, StacAsset
+from pyproj import CRS
+
+from openeo_driver.datastructs import ResolutionMergeArgs, SarBackscatterArgs, StacAsset
 from openeo_driver.errors import FeatureUnsupportedException, InternalException
 from openeo_driver.util.geometry import GeometryBufferer, validate_geojson_coordinates
 from openeo_driver.util.ioformats import IOFORMATS
@@ -61,6 +62,9 @@ def __eq__(self, o: object) -> bool:
             return True
         return False
 
+    def get_dimension_names(self) -> List[str]:
+        return self.metadata.dimension_names()
+
     def _not_implemented(self):
         """Helper to raise a NotImplemented exception containing method name"""
         raise NotImplementedError("DataCube method not implemented: {m!r}".format(m=inspect.stack()[1].function))
```
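Note: giving `DriverDataCube` the same `get_dimension_names()` interface as `DriverVectorCube` (added further below) is what lets the `validator_one_of` calls in `ProcessGraphDeserializer.py` treat both cube types uniformly. A toy illustration of the duck typing (class names hypothetical):

```python
class FakeRasterCube:
    def get_dimension_names(self):
        return ["x", "y", "t", "bands"]


class FakeVectorCube:
    def get_dimension_names(self):
        return ["geometries"]


# The same validation code path works for either cube type:
for cube in (FakeRasterCube(), FakeVectorCube()):
    print("dimension must be one of", cube.get_dimension_names())
```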
```diff
@@ -221,6 +225,9 @@ class DriverVectorCube:
     COLUMN_SELECTION_ALL = "all"
     COLUMN_SELECTION_NUMERICAL = "numerical"
 
+    # Xarray cube attribute to indicate that it is a dummy cube
+    CUBE_ATTR_VECTOR_CUBE_DUMMY = "vector_cube_dummy"
+
     def __init__(
         self,
         geometries: gpd.GeoDataFrame,
@@ -281,14 +288,21 @@ def from_geodataframe(
         elif columns_for_cube == cls.COLUMN_SELECTION_ALL:
             columns_for_cube = available_columns
         elif isinstance(columns_for_cube, list):
-            # TODO #114 limit to subset with available columns (and automatically fill in missing columns with nodata)?
             columns_for_cube = columns_for_cube
         else:
             raise ValueError(columns_for_cube)
         assert isinstance(columns_for_cube, list)
 
         if columns_for_cube:
-            cube_df = data[columns_for_cube]
+            existing = [c for c in columns_for_cube if c in available_columns]
+            to_add = [c for c in columns_for_cube if c not in available_columns]
+            if existing:
+                cube_df = data[existing]
+                if to_add:
+                    cube_df.loc[:, to_add] = numpy.nan
+            else:
+                cube_df = pandas.DataFrame(index=data.index, columns=to_add)
+
             # TODO: remove `columns_for_cube` from geopandas data frame?
             # Enabling that triggers failure of som existing tests that use `aggregate_spatial`
             # to "enrich" a vector cube with pre-existing properties
```
```diff
@@ -308,7 +322,14 @@ def from_geodataframe(
             return cls(geometries=geometries_df, cube=cube)
 
         else:
-            return cls(geometries=data)
+            # Use 1D dummy cube of NaN values
+            cube: xarray.DataArray = xarray.DataArray(
+                data=numpy.full(shape=[data.shape[0]], fill_value=numpy.nan),
+                dims=[cls.DIM_GEOMETRIES],
+                coords={cls.DIM_GEOMETRIES: data.geometry.index.to_list()},
+                attrs={cls.CUBE_ATTR_VECTOR_CUBE_DUMMY: True},
+            )
+            return cls(geometries=data, cube=cube)
 
     @classmethod
     def from_fiona(
```
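The `else` branch now builds a placeholder cube instead of `cube=None`. In isolation, the dummy cube is just a 1D all-NaN `xarray.DataArray` along the geometry dimension, flagged via an attribute so later code can tell it apart from real data (sketch with hypothetical geometry ids):

```python
import numpy
import xarray

geometry_ids = [0, 1, 2]
cube = xarray.DataArray(
    data=numpy.full(shape=[len(geometry_ids)], fill_value=numpy.nan),
    dims=["geometries"],
    coords={"geometries": geometry_ids},
    attrs={"vector_cube_dummy": True},
)
assert cube.attrs.get("vector_cube_dummy")
assert list(cube.dims) == ["geometries"]
```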
```diff
@@ -400,7 +421,7 @@ def _as_geopandas_df(
         """Join geometries and cube as a geopandas dataframe"""
         # TODO: avoid copy?
         df = self._geometries.copy(deep=True)
-        if self._cube is not None:
+        if self._cube is not None and not self._cube.attrs.get(self.CUBE_ATTR_VECTOR_CUBE_DUMMY):
             assert self._cube.dims[0] == self.DIM_GEOMETRIES
             # TODO: better way to combine cube with geometries
             # Flatten multiple (non-geometry) dimensions from cube to new properties in geopandas dataframe
@@ -426,6 +447,16 @@ def to_wkt(self) -> List[str]:
         wkts = [str(g) for g in self._geometries.geometry]
         return wkts
 
+    def to_internal_json(self) -> dict:
+        """
+        Export to an internal JSON-style representation.
+        Subject to change any time: not intended for public consumption, just for (unit) test purposes.
+        """
+        return {
+            "geometries": shapely.geometry.mapping(self._geometries),
+            "cube": self._cube.to_dict(data="array") if self._cube is not None else None,
+        }
+
     def get_crs(self) -> pyproj.CRS:
         return self._geometries.crs or pyproj.CRS.from_epsg(4326)
 
```
```diff
@@ -485,7 +516,7 @@ def to_legacy_save_result(self) -> Union["AggregatePolygonResult", "JSONResult"]
         # TODO: eliminate these legacy, non-standard formats?
         from openeo_driver.save_result import AggregatePolygonResult, JSONResult
 
-        if self._cube is None:
+        if self._cube is None or self._cube.attrs.get(self.CUBE_ATTR_VECTOR_CUBE_DUMMY):
             # No cube: no real data to return (in legacy style), so let's just return a `null` per geometry.
             return JSONResult(data=[None] * self.geometry_count())
 
@@ -511,6 +542,12 @@ def to_legacy_save_result(self) -> Union["AggregatePolygonResult", "JSONResult"]
                 f"Unsupported cube configuration {cube.dims} for _write_legacy_aggregate_polygon_result_json"
             )
 
+    def get_dimension_names(self) -> List[str]:
+        if self._cube is None:
+            return [self.DIM_GEOMETRIES]
+        else:
+            return list(self._cube.dims)
+
     def get_bounding_box(self) -> Tuple[float, float, float, float]:
         # TODO: cache bounding box?
         # TODO #114 #141 Open-EO/openeo-geopyspark-driver#239: option to buffer point geometries (if any)
```
```diff
@@ -596,18 +633,23 @@ def apply_dimension(
         context: Optional[dict] = None,
         env: EvalEnv,
     ) -> "DriverVectorCube":
+        # Is callback a single run_udf node process?
         single_run_udf = SingleRunUDFProcessGraph.parse_or_none(process)
 
         if single_run_udf:
             # Process with single "run_udf" node
-            # TODO: check provided dimension with actual dimension of the cube
-            if dimension in (self.DIM_BANDS, self.DIM_PROPERTIES) and target_dimension is None:
+            if (
+                dimension == self.DIM_GEOMETRIES
+                or (dimension in {self.DIM_BANDS, self.DIM_PROPERTIES}.intersection(self.get_dimension_names()))
+                and target_dimension is None
+            ):
                 log.warning(
                     f"Using experimental feature: DriverVectorCube.apply_dimension along dim {dimension} and empty cube"
                 )
-                # TODO: this is non-standard special case: vector cube with only geometries, but no "cube" data
+                # TODO: data chunking (e.g. large feature collections)
                 gdf = self._as_geopandas_df()
                 feature_collection = openeo.udf.FeatureCollection(id="_", data=gdf)
+                # TODO: dedicated UDF signature to indicate to work on vector cube through a feature collection based API
                 udf_data = openeo.udf.UdfData(
                     proj={"EPSG": self._geometries.crs.to_epsg()},
                     feature_collection_list=[feature_collection],
```
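As the TODO notes, there is no dedicated vector-cube UDF signature yet, so a UDF receiving this `UdfData` payload would work on the feature collection directly. A hedged sketch of what such a callback could look like (function name and logic illustrative only, assuming `UdfData` exposes its feature collections via the `feature_collection_list` property):

```python
import openeo.udf


def process_vector_features(udf_data: openeo.udf.UdfData) -> openeo.udf.UdfData:
    """Derive a new property column for every feature (illustrative)."""
    fc = udf_data.feature_collection_list[0]
    gdf = fc.data  # geopandas GeoDataFrame: geometries + property columns
    gdf["area"] = gdf.geometry.area  # add a computed column
    udf_data.feature_collection_list = [openeo.udf.FeatureCollection(id=fc.id, data=gdf)]
    return udf_data
```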

openeo_driver/dummy/dummy_backend.py (+5 -3)
```diff
@@ -181,10 +181,12 @@ def __init__(self, metadata: CollectionMetadata = None):
         self.apply_tiles = Mock(name="apply_tiles", return_value=self)
         self.apply_tiles_spatiotemporal = Mock(name="apply_tiles_spatiotemporal", return_value=self)
 
-        # Create mock methods for remaining data cube methods that are not yet defined
-        already_defined = set(DummyDataCube.__dict__.keys()).union(self.__dict__.keys())
+        # Create mock methods for remaining DriverDataCube methods that are not yet defined directly by DummyDataCube
+        to_keep = set(DummyDataCube.__dict__.keys()).union(self.__dict__.keys())
+        to_keep.update(m for m in DriverDataCube.__dict__.keys() if m.startswith("_"))
+        to_keep.update(["get_dimension_names"])
        for name, method in DriverDataCube.__dict__.items():
-            if not name.startswith('_') and name not in already_defined and callable(method):
+            if not name in to_keep and callable(method):
                 setattr(self, name, Mock(name=name, return_value=self))
 
         for name in [n for n, m in DummyDataCube.__dict__.items() if getattr(m, '_mock_side_effect', False)]:
```
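The dummy cube's auto-mocking pattern in isolation (simplified, with hypothetical classes): any public base-class method the dummy does not explicitly keep gets replaced by a chainable `Mock`, so tests can assert calls without implementing every process — and `get_dimension_names` is now kept real so dimension validation works against actual metadata.

```python
from unittest.mock import Mock


class Base:
    def reduce_dimension(self, **kwargs):
        raise NotImplementedError

    def get_dimension_names(self):
        return ["x", "y"]


class Dummy(Base):
    def __init__(self):
        to_keep = {"get_dimension_names"}  # real implementations to preserve
        for name, method in Base.__dict__.items():
            if not name.startswith("_") and name not in to_keep and callable(method):
                setattr(self, name, Mock(name=name, return_value=self))


d = Dummy()
assert d.reduce_dimension(dimension="t") is d  # mocked, returns the cube itself
assert d.get_dimension_names() == ["x", "y"]   # real method preserved
```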
