|
5 | 5 | import datetime |
6 | 6 | import json |
7 | 7 | import logging |
8 | | -import re |
9 | 8 | import time |
10 | 9 | import warnings |
11 | 10 | from pathlib import Path |
|
23 | 22 | Union, |
24 | 23 | ) |
25 | 24 |
|
26 | | -import numpy |
27 | 25 | import pandas as pd |
28 | 26 | import requests |
29 | 27 | import shapely.errors |
|
37 | 35 | _JobManagerWorkerThreadPool, |
38 | 36 | _JobStartTask, |
39 | 37 | ) |
40 | | -from openeo.internal.processes.parse import ( |
41 | | - Parameter, |
42 | | - Process, |
43 | | - parse_remote_process_definition, |
44 | | -) |
| 38 | +from openeo.extra.job_management.process_based import ProcessBasedJobCreator |
45 | 39 | from openeo.rest import OpenEoApiError |
46 | 40 | from openeo.rest.auth.auth import BearerAuth |
47 | | -from openeo.util import LazyLoadCache, deep_get, repr_truncate, rfc3339 |
| 41 | +from openeo.util import deep_get, rfc3339 |
| 42 | + |
| 43 | +__all__ = [ |
| 44 | + "JobDatabaseInterface", |
| 45 | + "FullDataFrameJobDatabase", |
| 46 | + "ParquetJobDatabase", |
| 47 | + "CsvJobDatabase", |
| 48 | + "ProcessBasedJobCreator", |
| 49 | + "create_job_db", |
| 50 | + "get_job_db", |
| 51 | +] |
48 | 52 |
|
49 | 53 | _log = logging.getLogger(__name__) |
50 | 54 |
|
@@ -1157,180 +1161,3 @@ def create_job_db(path: Union[str, Path], df: pd.DataFrame, *, on_exists: str = |
1157 | 1161 | else: |
1158 | 1162 | raise NotImplementedError(f"Initialization of {type(job_db)} is not supported.") |
1159 | 1163 | return job_db |
1160 | | - |
1161 | | - |
1162 | | -class ProcessBasedJobCreator: |
1163 | | - """ |
1164 | | - Batch job creator |
1165 | | - (to be used together with :py:class:`MultiBackendJobManager`) |
1166 | | - that takes a parameterized openEO process definition |
1172 | | - (e.g. a user-defined process (UDP) or a remote openEO process definition), |
1168 | | - and creates a batch job |
1169 | | - for each row of the dataframe managed by the :py:class:`MultiBackendJobManager` |
1170 | | - by filling in the process parameters with corresponding row values. |
1171 | | -
|
1172 | | - .. seealso:: |
1173 | | - See :ref:`job-management-with-process-based-job-creator` |
1174 | | - for more information and examples. |
1175 | | -
|
1176 | | - Process parameters are linked to dataframe columns by name. |
1177 | | - While this intuitive name-based matching should cover most use cases, |
1178 | | - there are additional options for overrides or fallbacks: |
1179 | | -
|
1180 | | - - When provided, ``parameter_column_map`` will be consulted |
1181 | | - for resolving a process parameter name (key in the dictionary) |
1182 | | - to a desired dataframe column name (corresponding value). |
1183 | | - - One common case is handled automatically as convenience functionality. |
1184 | | -
|
1185 | | - When: |
1186 | | -
|
1187 | | - - ``parameter_column_map`` is not provided (or set to ``None``), |
1188 | | - - and there is a *single parameter* that accepts inline GeoJSON geometries, |
1189 | | - - and the dataframe is a GeoPandas dataframe with a *single geometry* column, |
1190 | | -
|
1191 | | - then this parameter and this geometries column will be linked automatically. |
1192 | | -
|
1193 | | - - If a parameter cannot be matched with a column by name as described above, |
1194 | | - a default value will be picked, |
1195 | | - first by looking in ``parameter_defaults`` (if provided), |
1196 | | - and then by looking up the default value from the parameter schema in the process definition. |
1197 | | - - Finally if no (default) value can be determined and the parameter |
1198 | | - is not flagged as optional, an error will be raised. |
1199 | | -
|
1200 | | -
|
1201 | | - :param process_id: (optional) openEO process identifier. |
1202 | | - Can be omitted when working with a remote process definition |
1203 | | - that is fully defined with a URL in the ``namespace`` parameter. |
1204 | | - :param namespace: (optional) openEO process namespace. |
1205 | | - Typically used to provide a URL to a remote process definition. |
1206 | | - :param parameter_defaults: (optional) default values for process parameters, |
1207 | | - to be used when not available in the dataframe managed by |
1208 | | - :py:class:`MultiBackendJobManager`. |
1209 | | - :param parameter_column_map: Optional overrides |
1210 | | - for linking process parameters to dataframe columns: |
1211 | | - mapping of process parameter names as key |
1212 | | - to dataframe column names as value. |
1213 | | -
|
1214 | | - .. versionadded:: 0.33.0 |
1215 | | -
|
1216 | | - .. warning:: |
1217 | | - This is an experimental API subject to change, |
1218 | | - and we greatly welcome |
1219 | | - `feedback and suggestions for improvement <https://github.com/Open-EO/openeo-python-client/issues>`_. |
1220 | | -
|
1221 | | - """ |
1222 | | - |
1223 | | - def __init__( |
1224 | | - self, |
1225 | | - *, |
1226 | | - process_id: Optional[str] = None, |
1227 | | - namespace: Union[str, None] = None, |
1228 | | - parameter_defaults: Optional[dict] = None, |
1229 | | - parameter_column_map: Optional[dict] = None, |
1230 | | - ): |
1231 | | - if process_id is None and namespace is None: |
1232 | | - raise ValueError("At least one of `process_id` and `namespace` should be provided.") |
1233 | | - self._process_id = process_id |
1234 | | - self._namespace = namespace |
1235 | | - self._parameter_defaults = parameter_defaults or {} |
1236 | | - self._parameter_column_map = parameter_column_map |
1237 | | - self._cache = LazyLoadCache() |
1238 | | - |
1239 | | - def _get_process_definition(self, connection: Connection) -> Process: |
1240 | | - if isinstance(self._namespace, str) and re.match("https?://", self._namespace): |
1241 | | - # Remote process definition handling |
1242 | | - return self._cache.get( |
1243 | | - key=("remote_process_definition", self._namespace, self._process_id), |
1244 | | - load=lambda: parse_remote_process_definition(namespace=self._namespace, process_id=self._process_id), |
1245 | | - ) |
1246 | | - elif self._namespace is None: |
1247 | | - # Handling of a user-specific UDP |
1248 | | - udp_raw = connection.user_defined_process(self._process_id).describe() |
1249 | | - return Process.from_dict(udp_raw) |
1250 | | - else: |
1251 | | - raise NotImplementedError( |
1252 | | - f"Unsupported process definition source udp_id={self._process_id!r} namespace={self._namespace!r}" |
1253 | | - ) |
1254 | | - |
1255 | | - def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob: |
1256 | | - """ |
1257 | | - Implementation of the ``start_job`` callable interface |
1258 | | - of :py:meth:`MultiBackendJobManager.run_jobs` |
1259 | | - to create a job based on given dataframe row |
1260 | | -
|
1261 | | - :param row: The row in the pandas dataframe that stores the jobs state and other tracked data. |
1262 | | - :param connection: The connection to the backend. |
1263 | | - """ |
1264 | | - # TODO: refactor out some methods, for better reuse and decoupling: |
1265 | | - # `get_arguments()` (to build the arguments dictionary), `get_cube()` (to create the cube), |
1266 | | - |
1267 | | - process_definition = self._get_process_definition(connection=connection) |
1268 | | - process_id = process_definition.id |
1269 | | - parameters = process_definition.parameters or [] |
1270 | | - |
1271 | | - if self._parameter_column_map is None: |
1272 | | - self._parameter_column_map = self._guess_parameter_column_map(parameters=parameters, row=row) |
1273 | | - |
1274 | | - arguments = {} |
1275 | | - for parameter in parameters: |
1276 | | - param_name = parameter.name |
1277 | | - column_name = self._parameter_column_map.get(param_name, param_name) |
1278 | | - if column_name in row.index: |
1279 | | - # Get value from dataframe row |
1280 | | - value = row.loc[column_name] |
1281 | | - elif param_name in self._parameter_defaults: |
1282 | | - # Fallback on default values from constructor |
1283 | | - value = self._parameter_defaults[param_name] |
1284 | | - elif parameter.has_default(): |
1285 | | - # Explicitly use default value from parameter schema |
1286 | | - value = parameter.default |
1287 | | - elif parameter.optional: |
1288 | | - # Skip optional parameters without any fallback default value |
1289 | | - continue |
1290 | | - else: |
1291 | | - raise ValueError(f"Missing required parameter {param_name!r} for process {process_id!r}") |
1292 | | - |
1293 | | - # Prepare some values/dtypes for JSON encoding |
1294 | | - if isinstance(value, numpy.integer): |
1295 | | - value = int(value) |
1296 | | - elif isinstance(value, numpy.number): |
1297 | | - value = float(value) |
1298 | | - elif isinstance(value, shapely.geometry.base.BaseGeometry): |
1299 | | - value = shapely.geometry.mapping(value) |
1300 | | - |
1301 | | - arguments[param_name] = value |
1302 | | - |
1303 | | - cube = connection.datacube_from_process(process_id=process_id, namespace=self._namespace, **arguments) |
1304 | | - |
1305 | | - title = row.get("title", f"Process {process_id!r} with {repr_truncate(arguments)}") |
1306 | | - description = row.get("description", f"Process {process_id!r} (namespace {self._namespace}) with {arguments}") |
1307 | | - job = connection.create_job(cube, title=title, description=description) |
1308 | | - |
1309 | | - return job |
1310 | | - |
1311 | | - def __call__(self, *arg, **kwargs) -> BatchJob: |
1312 | | - """Syntactic sugar for calling :py:meth:`start_job`.""" |
1313 | | - return self.start_job(*arg, **kwargs) |
1314 | | - |
1315 | | - @staticmethod |
1316 | | - def _guess_parameter_column_map(parameters: List[Parameter], row: pd.Series) -> dict: |
1317 | | - """ |
1318 | | - Guess parameter-column mapping from given parameter list and dataframe row |
1319 | | - """ |
1320 | | - parameter_column_map = {} |
1321 | | - # Geometry based mapping: try to automatically map geometry columns to geojson parameters |
1322 | | - geojson_parameters = [p.name for p in parameters if p.schema.accepts_geojson()] |
1323 | | - geometry_columns = [i for (i, v) in row.items() if isinstance(v, shapely.geometry.base.BaseGeometry)] |
1324 | | - if geojson_parameters and geometry_columns: |
1325 | | - if len(geojson_parameters) == 1 and len(geometry_columns) == 1: |
1326 | | - # Most common case: one geometry parameter and one geometry column: can be mapped naively |
1327 | | - parameter_column_map[geojson_parameters[0]] = geometry_columns[0] |
1328 | | - elif all(p in geometry_columns for p in geojson_parameters): |
1329 | | - # Each geometry param has geometry column with same name: easy to map |
1330 | | - parameter_column_map.update((p, p) for p in geojson_parameters) |
1331 | | - else: |
1332 | | - raise RuntimeError( |
1333 | | - f"Problem with mapping geometry columns ({geometry_columns}) to process parameters ({geojson_parameters})" |
1334 | | - ) |
1335 | | - _log.debug(f"Guessed parameter-column map: {parameter_column_map}") |
1336 | | - return parameter_column_map |
0 commit comments