Skip to content

Commit ccfd0a9

Browse files
committed
Issue #741: Move ProcessBasedJobCreator to own submodule
1 parent 742999e commit ccfd0a9

File tree

5 files changed

+928
-884
lines changed

5 files changed

+928
-884
lines changed

docs/cookbook/job_manager.rst

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ API
2121
.. autoclass:: openeo.extra.job_management.ParquetJobDatabase
2222

2323

24-
.. autoclass:: openeo.extra.job_management.ProcessBasedJobCreator
24+
.. autoclass:: openeo.extra.job_management.process_based.ProcessBasedJobCreator
2525
:members:
2626
:special-members: __call__
2727

@@ -41,29 +41,29 @@ define a "template" job as a parameterized process
4141
and let the job manager fill in the parameters
4242
from a given data frame.
4343

44-
The :py:class:`~openeo.extra.job_management.ProcessBasedJobCreator` helper class
44+
The :py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` helper class
4545
makes it possible to do exactly that.
4646
Given a reference to a parameterized process,
4747
such as a user-defined process or remote process definition,
4848
it can be used directly as ``start_job`` callable to
4949
:py:meth:`~openeo.extra.job_management.MultiBackendJobManager.run_jobs`
5050
which will fill in the process parameters from the dataframe.
5151

52-
Basic :py:class:`~openeo.extra.job_management.ProcessBasedJobCreator` example
53-
-----------------------------------------------------------------------------
52+
Basic :py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` example
53+
--------------------------------------------------------------------------------------------
5454

5555
Basic usage example with a remote process definition:
5656

5757
.. code-block:: python
5858
:linenos:
59-
:caption: Basic :py:class:`~openeo.extra.job_management.ProcessBasedJobCreator` example snippet
59+
:caption: Basic :py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` example snippet
6060
:emphasize-lines: 10-15, 28
6161
6262
from openeo.extra.job_management import (
6363
MultiBackendJobManager,
6464
create_job_db,
65-
ProcessBasedJobCreator,
6665
)
66+
from openeo.extra.job_management.process_based import ProcessBasedJobCreator
6767
6868
# Job creator, based on a parameterized openEO process
6969
# (specified by the remote process definition at given URL)
@@ -90,7 +90,7 @@ Basic usage example with a remote process definition:
9090
job_manager = MultiBackendJobManager(...)
9191
job_manager.run_jobs(job_db=job_db, start_job=job_starter)
9292
93-
In this example, a :py:class:`ProcessBasedJobCreator` is instantiated
93+
In this example, a :py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` is instantiated
9494
based on a remote process definition,
9595
which has parameters ``start_date`` and ``bands``.
9696
When passed to :py:meth:`~openeo.extra.job_management.MultiBackendJobManager.run_jobs`,
@@ -103,11 +103,11 @@ with parameter values based on matching columns in the dataframe:
103103
and will get its value from the default specified in the ``parameter_defaults`` argument.
104104

105105

106-
:py:class:`~openeo.extra.job_management.ProcessBasedJobCreator` with geometry handling
107-
---------------------------------------------------------------------------------------------
106+
:py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` with geometry handling
107+
-----------------------------------------------------------------------------------------------------
108108

109109
Apart from the intuitive name-based parameter-column linking,
110-
:py:class:`~openeo.extra.job_management.ProcessBasedJobCreator`
110+
:py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator`
111111
also automatically links:
112112

113113
- a process parameter that accepts inline GeoJSON geometries/features

openeo/extra/job_management/__init__.py

Lines changed: 12 additions & 185 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import datetime
66
import json
77
import logging
8-
import re
98
import time
109
import warnings
1110
from pathlib import Path
@@ -23,7 +22,6 @@
2322
Union,
2423
)
2524

26-
import numpy
2725
import pandas as pd
2826
import requests
2927
import shapely.errors
@@ -37,14 +35,20 @@
3735
_JobManagerWorkerThreadPool,
3836
_JobStartTask,
3937
)
40-
from openeo.internal.processes.parse import (
41-
Parameter,
42-
Process,
43-
parse_remote_process_definition,
44-
)
38+
from openeo.extra.job_management.process_based import ProcessBasedJobCreator
4539
from openeo.rest import OpenEoApiError
4640
from openeo.rest.auth.auth import BearerAuth
47-
from openeo.util import LazyLoadCache, deep_get, repr_truncate, rfc3339
41+
from openeo.util import deep_get, rfc3339
42+
43+
__all__ = [
44+
"JobDatabaseInterface",
45+
"FullDataFrameJobDatabase",
46+
"ParquetJobDatabase",
47+
"CsvJobDatabase",
48+
"ProcessBasedJobCreator",
49+
"create_job_db",
50+
"get_job_db",
51+
]
4852

4953
_log = logging.getLogger(__name__)
5054

@@ -1157,180 +1161,3 @@ def create_job_db(path: Union[str, Path], df: pd.DataFrame, *, on_exists: str =
11571161
else:
11581162
raise NotImplementedError(f"Initialization of {type(job_db)} is not supported.")
11591163
return job_db
1160-
1161-
1162-
class ProcessBasedJobCreator:
1163-
"""
1164-
Batch job creator
1165-
(to be used together with :py:class:`MultiBackendJobManager`)
1166-
that takes a parameterized openEO process definition
1167-
(e.g a user-defined process (UDP) or a remote openEO process definition),
1168-
and creates a batch job
1169-
for each row of the dataframe managed by the :py:class:`MultiBackendJobManager`
1170-
by filling in the process parameters with corresponding row values.
1171-
1172-
.. seealso::
1173-
See :ref:`job-management-with-process-based-job-creator`
1174-
for more information and examples.
1175-
1176-
Process parameters are linked to dataframe columns by name.
1177-
While this intuitive name-based matching should cover most use cases,
1178-
there are additional options for overrides or fallbacks:
1179-
1180-
- When provided, ``parameter_column_map`` will be consulted
1181-
for resolving a process parameter name (key in the dictionary)
1182-
to a desired dataframe column name (corresponding value).
1183-
- One common case is handled automatically as convenience functionality.
1184-
1185-
When:
1186-
1187-
- ``parameter_column_map`` is not provided (or set to ``None``),
1188-
- and there is a *single parameter* that accepts inline GeoJSON geometries,
1189-
- and the dataframe is a GeoPandas dataframe with a *single geometry* column,
1190-
1191-
then this parameter and this geometries column will be linked automatically.
1192-
1193-
- If a parameter cannot be matched with a column by name as described above,
1194-
a default value will be picked,
1195-
first by looking in ``parameter_defaults`` (if provided),
1196-
and then by looking up the default value from the parameter schema in the process definition.
1197-
- Finally if no (default) value can be determined and the parameter
1198-
is not flagged as optional, an error will be raised.
1199-
1200-
1201-
:param process_id: (optional) openEO process identifier.
1202-
Can be omitted when working with a remote process definition
1203-
that is fully defined with a URL in the ``namespace`` parameter.
1204-
:param namespace: (optional) openEO process namespace.
1205-
Typically used to provide a URL to a remote process definition.
1206-
:param parameter_defaults: (optional) default values for process parameters,
1207-
to be used when not available in the dataframe managed by
1208-
:py:class:`MultiBackendJobManager`.
1209-
:param parameter_column_map: Optional overrides
1210-
for linking process parameters to dataframe columns:
1211-
mapping of process parameter names as key
1212-
to dataframe column names as value.
1213-
1214-
.. versionadded:: 0.33.0
1215-
1216-
.. warning::
1217-
This is an experimental API subject to change,
1218-
and we greatly welcome
1219-
`feedback and suggestions for improvement <https://github.com/Open-EO/openeo-python-client/issues>`_.
1220-
1221-
"""
1222-
1223-
def __init__(
1224-
self,
1225-
*,
1226-
process_id: Optional[str] = None,
1227-
namespace: Union[str, None] = None,
1228-
parameter_defaults: Optional[dict] = None,
1229-
parameter_column_map: Optional[dict] = None,
1230-
):
1231-
if process_id is None and namespace is None:
1232-
raise ValueError("At least one of `process_id` and `namespace` should be provided.")
1233-
self._process_id = process_id
1234-
self._namespace = namespace
1235-
self._parameter_defaults = parameter_defaults or {}
1236-
self._parameter_column_map = parameter_column_map
1237-
self._cache = LazyLoadCache()
1238-
1239-
def _get_process_definition(self, connection: Connection) -> Process:
1240-
if isinstance(self._namespace, str) and re.match("https?://", self._namespace):
1241-
# Remote process definition handling
1242-
return self._cache.get(
1243-
key=("remote_process_definition", self._namespace, self._process_id),
1244-
load=lambda: parse_remote_process_definition(namespace=self._namespace, process_id=self._process_id),
1245-
)
1246-
elif self._namespace is None:
1247-
# Handling of a user-specific UDP
1248-
udp_raw = connection.user_defined_process(self._process_id).describe()
1249-
return Process.from_dict(udp_raw)
1250-
else:
1251-
raise NotImplementedError(
1252-
f"Unsupported process definition source udp_id={self._process_id!r} namespace={self._namespace!r}"
1253-
)
1254-
1255-
def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob:
1256-
"""
1257-
Implementation of the ``start_job`` callable interface
1258-
of :py:meth:`MultiBackendJobManager.run_jobs`
1259-
to create a job based on given dataframe row
1260-
1261-
:param row: The row in the pandas dataframe that stores the jobs state and other tracked data.
1262-
:param connection: The connection to the backend.
1263-
"""
1264-
# TODO: refactor out some methods, for better reuse and decoupling:
1265-
# `get_arguments()` (to build the arguments dictionary), `get_cube()` (to create the cube),
1266-
1267-
process_definition = self._get_process_definition(connection=connection)
1268-
process_id = process_definition.id
1269-
parameters = process_definition.parameters or []
1270-
1271-
if self._parameter_column_map is None:
1272-
self._parameter_column_map = self._guess_parameter_column_map(parameters=parameters, row=row)
1273-
1274-
arguments = {}
1275-
for parameter in parameters:
1276-
param_name = parameter.name
1277-
column_name = self._parameter_column_map.get(param_name, param_name)
1278-
if column_name in row.index:
1279-
# Get value from dataframe row
1280-
value = row.loc[column_name]
1281-
elif param_name in self._parameter_defaults:
1282-
# Fallback on default values from constructor
1283-
value = self._parameter_defaults[param_name]
1284-
elif parameter.has_default():
1285-
# Explicitly use default value from parameter schema
1286-
value = parameter.default
1287-
elif parameter.optional:
1288-
# Skip optional parameters without any fallback default value
1289-
continue
1290-
else:
1291-
raise ValueError(f"Missing required parameter {param_name!r} for process {process_id!r}")
1292-
1293-
# Prepare some values/dtypes for JSON encoding
1294-
if isinstance(value, numpy.integer):
1295-
value = int(value)
1296-
elif isinstance(value, numpy.number):
1297-
value = float(value)
1298-
elif isinstance(value, shapely.geometry.base.BaseGeometry):
1299-
value = shapely.geometry.mapping(value)
1300-
1301-
arguments[param_name] = value
1302-
1303-
cube = connection.datacube_from_process(process_id=process_id, namespace=self._namespace, **arguments)
1304-
1305-
title = row.get("title", f"Process {process_id!r} with {repr_truncate(arguments)}")
1306-
description = row.get("description", f"Process {process_id!r} (namespace {self._namespace}) with {arguments}")
1307-
job = connection.create_job(cube, title=title, description=description)
1308-
1309-
return job
1310-
1311-
def __call__(self, *arg, **kwargs) -> BatchJob:
1312-
"""Syntactic sugar for calling :py:meth:`start_job`."""
1313-
return self.start_job(*arg, **kwargs)
1314-
1315-
@staticmethod
1316-
def _guess_parameter_column_map(parameters: List[Parameter], row: pd.Series) -> dict:
1317-
"""
1318-
Guess parameter-column mapping from given parameter list and dataframe row
1319-
"""
1320-
parameter_column_map = {}
1321-
# Geometry based mapping: try to automatically map geometry columns to geojson parameters
1322-
geojson_parameters = [p.name for p in parameters if p.schema.accepts_geojson()]
1323-
geometry_columns = [i for (i, v) in row.items() if isinstance(v, shapely.geometry.base.BaseGeometry)]
1324-
if geojson_parameters and geometry_columns:
1325-
if len(geojson_parameters) == 1 and len(geometry_columns) == 1:
1326-
# Most common case: one geometry parameter and one geometry column: can be mapped naively
1327-
parameter_column_map[geojson_parameters[0]] = geometry_columns[0]
1328-
elif all(p in geometry_columns for p in geojson_parameters):
1329-
# Each geometry param has geometry column with same name: easy to map
1330-
parameter_column_map.update((p, p) for p in geojson_parameters)
1331-
else:
1332-
raise RuntimeError(
1333-
f"Problem with mapping geometry columns ({geometry_columns}) to process parameters ({geojson_parameters})"
1334-
)
1335-
_log.debug(f"Guessed parameter-column map: {parameter_column_map}")
1336-
return parameter_column_map

0 commit comments

Comments
 (0)