Skip to content

Commit 6f7ffc6

Browse files
committed
Merge remote-tracking branch 'origin/master' into issue801-py314
2 parents 111046a + 7d11a4e commit 6f7ffc6

File tree

16 files changed

+572
-193
lines changed

16 files changed

+572
-193
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1313

1414
### Removed
1515

16+
- Remove unused/outdated `XarrayDataCube.plot()` and its related matplotlib dependency ([#472](https://github.com/Open-EO/openeo-python-client/issues/472))
17+
1618
### Fixed
1719

20+
- `DataCube.sar_backscatter()`: add corresponding band names to metadata when enabling "mask", "contributing_area", "local_incidence_angle" or "ellipsoid_incidence_angle" ([#804](https://github.com/Open-EO/openeo-python-client/issues/804))
21+
- Proactively refresh access/bearer token in `MultiBackendJobManager` before launching a job start thread ([#817](https://github.com/Open-EO/openeo-python-client/issues/817))
22+
- `Connection.list_services()`: Fix list access error for federation extension
23+
1824

1925
## [0.45.0] - 2025-09-17
2026

openeo/_version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "0.46.0a1"
1+
__version__ = "0.46.0a2"

openeo/extra/job_management/__init__.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,8 @@ def __init__(
244244
)
245245
self._thread = None
246246
self._worker_pool = None
247+
# Generic cache
248+
self._cache = {}
247249

248250
def add_backend(
249251
self,
@@ -650,6 +652,8 @@ def _launch_job(self, start_job, df, i, backend_name, stats: Optional[dict] = No
650652
# start job if not yet done by callback
651653
try:
652654
job_con = job.connection
655+
# Proactively refresh bearer token (because task in thread will not be able to do that)
656+
self._refresh_bearer_token(connection=job_con)
653657
task = _JobStartTask(
654658
root_url=job_con.root_url,
655659
bearer_token=job_con.auth.bearer if isinstance(job_con.auth, BearerAuth) else None,
@@ -670,6 +674,21 @@ def _launch_job(self, start_job, df, i, backend_name, stats: Optional[dict] = No
670674
df.loc[i, "status"] = "skipped"
671675
stats["start_job skipped"] += 1
672676

677+
def _refresh_bearer_token(self, connection: Connection, *, max_age: float = 60) -> None:
678+
"""
679+
Helper to proactively refresh the bearer (access) token of the connection
680+
(but not too often, based on `max_age`).
681+
"""
682+
# TODO: be smarter about timing, e.g. by inspecting expiry of current token?
683+
now = time.time()
684+
key = f"connection:{id(connection)}:refresh-time"
685+
if self._cache.get(key, 0) + max_age < now:
686+
refreshed = connection.try_access_token_refresh()
687+
if refreshed:
688+
self._cache[key] = now
689+
else:
690+
_log.warning("Failed to proactively refresh bearer token")
691+
673692
def _process_threadworker_updates(
674693
self,
675694
worker_pool: _JobManagerWorkerThreadPool,

openeo/metadata.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -193,8 +193,10 @@ def filter_bands(self, bands: List[Union[int, str]]) -> BandDimension:
193193
bands=[self.bands[self.band_index(b)] for b in bands]
194194
)
195195

196-
def append_band(self, band: Band) -> BandDimension:
196+
def append_band(self, band: Union[Band, str]) -> BandDimension:
197197
"""Create new BandDimension with appended band."""
198+
if isinstance(band, str):
199+
band = Band(name=band)
198200
if band.name in self.band_names:
199201
raise ValueError("Duplicate band {b!r}".format(b=band))
200202

@@ -260,7 +262,10 @@ class CubeMetadata:
260262

261263
def __init__(self, dimensions: Optional[List[Dimension]] = None):
262264
# Original collection metadata (actual cube metadata might be altered through processes)
263-
self._dimensions = dimensions
265+
# TODO: for `self._dimensions` we use `None` here to indicate an unknown/unspecified dimension set,
266+
# but most usage actually assumes it is a list that can be iterated over.
267+
# Can we handle this more consistently and less error-prone?
268+
self._dimensions: Union[List[Dimension], None] = dimensions
264269
self._band_dimension = None
265270
self._temporal_dimension = None
266271

@@ -283,8 +288,12 @@ def __eq__(self, o: Any) -> bool:
283288
return isinstance(o, type(self)) and self._dimensions == o._dimensions
284289

285290
def __repr__(self) -> str:
286-
bands = self.band_names if self.has_band_dimension() else "no bands dimension"
287-
return f"{self.__class__.__name__}({bands} - {self.dimension_names()})"
291+
if self.has_band_dimension():
292+
return f"{self.__class__.__name__}(dimension_names={self.dimension_names()}, band_names={self.band_names})"
293+
elif self._dimensions is not None:
294+
return f"{self.__class__.__name__}(dimension_names={self.dimension_names()})"
295+
else:
296+
return f"{self.__class__.__name__}(dimensions=None)"
288297

289298
def __str__(self) -> str:
290299
bands = self.band_names if self.has_band_dimension() else "no bands dimension"
@@ -297,7 +306,10 @@ def _clone_and_update(self, dimensions: Optional[List[Dimension]] = None, **kwar
297306
dimensions = self._dimensions
298307
return cls(dimensions=dimensions, **kwargs)
299308

300-
def dimension_names(self) -> List[str]:
309+
def dimension_names(self) -> Union[List[str], None]:
310+
if self._dimensions is None:
311+
# TODO: better solution for unknown dimensions?
312+
return None
301313
return list(d.name for d in self._dimensions)
302314

303315
def assert_valid_dimension(self, dimension: str) -> str:
@@ -369,7 +381,7 @@ def filter_bands(self, band_names: List[Union[int, str]]) -> CubeMetadata:
369381
dimensions=[d.filter_bands(band_names) if isinstance(d, BandDimension) else d for d in self._dimensions]
370382
)
371383

372-
def append_band(self, band: Band) -> CubeMetadata:
384+
def append_band(self, band: Union[Band, str]) -> CubeMetadata:
373385
"""
374386
Create new `CubeMetadata` with given band added to band dimension.
375387
"""

openeo/rest/_testing.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,22 @@ def at_url(cls, root_url: str, *, requests_mock, capabilities: Optional[dict] =
132132
connection = Connection(root_url)
133133
return cls(requests_mock=requests_mock, connection=connection)
134134

135+
def setup_credentials_oidc(self, *, issuer: str = "https://oidc.test", id: str = "oi"):
136+
self._requests_mock.get(
137+
self.connection.build_url("/credentials/oidc"),
138+
json={
139+
"providers": [
140+
{
141+
"id": id,
142+
"issuer": issuer,
143+
"title": id,
144+
"scopes": ["openid"],
145+
}
146+
]
147+
},
148+
)
149+
return self
150+
135151
def setup_collection(
136152
self,
137153
collection_id: str,

openeo/rest/auth/testing.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,11 @@ def token_callback_resource_owner_password_credentials(self, params: dict, conte
143143
assert params["scope"] == self.expected_fields["scope"]
144144
return self._build_token_response()
145145

146+
def token_callback_block_400(self, params: dict, context):
147+
"""Failing callback with 400 Bad Request"""
148+
context.status_code = 400
149+
return "block_400"
150+
146151
def device_code_callback(self, request: requests_mock.request._RequestObjectProxy, context):
147152
params = self._get_query_params(query=request.text)
148153
assert params["client_id"] == self.expected_client_id

openeo/rest/connection.py

Lines changed: 59 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -342,28 +342,32 @@ def _authenticate_oidc(
342342
*,
343343
provider_id: str,
344344
store_refresh_token: bool = False,
345-
fallback_refresh_token_to_store: Optional[str] = None,
345+
auto_renew_from_refresh_token: bool = False,
346+
fallback_refresh_token: Optional[str] = None,
346347
oidc_auth_renewer: Optional[OidcAuthenticator] = None,
347348
) -> Connection:
348349
"""
349350
Authenticate through OIDC and set up bearer token (based on OIDC access_token) for further requests.
350351
"""
351-
tokens = authenticator.get_tokens(request_refresh_token=store_refresh_token)
352+
request_refresh_token = store_refresh_token or (not oidc_auth_renewer and auto_renew_from_refresh_token)
353+
tokens = authenticator.get_tokens(request_refresh_token=request_refresh_token)
352354
_log.info("Obtained tokens: {t}".format(t=[k for k, v in tokens._asdict().items() if v]))
355+
356+
refresh_token = tokens.refresh_token or fallback_refresh_token
353357
if store_refresh_token:
354-
refresh_token = tokens.refresh_token or fallback_refresh_token_to_store
355358
if refresh_token:
356359
self._get_refresh_token_store().set_refresh_token(
357360
issuer=authenticator.provider_info.issuer,
358361
client_id=authenticator.client_id,
359362
refresh_token=refresh_token
360363
)
361-
if not oidc_auth_renewer:
362-
oidc_auth_renewer = OidcRefreshTokenAuthenticator(
363-
client_info=authenticator.client_info, refresh_token=refresh_token
364-
)
365364
else:
366365
_log.warning("No OIDC refresh token to store.")
366+
if not oidc_auth_renewer and auto_renew_from_refresh_token and refresh_token:
367+
oidc_auth_renewer = OidcRefreshTokenAuthenticator(
368+
client_info=authenticator.client_info, refresh_token=refresh_token
369+
)
370+
367371
token = tokens.access_token
368372
self.auth = OidcBearerAuth(provider_id=provider_id, access_token=token)
369373
self._oidc_auth_renewer = oidc_auth_renewer
@@ -452,7 +456,12 @@ def authenticate_oidc_resource_owner_password_credentials(
452456
authenticator = OidcResourceOwnerPasswordAuthenticator(
453457
client_info=client_info, username=username, password=password
454458
)
455-
return self._authenticate_oidc(authenticator, provider_id=provider_id, store_refresh_token=store_refresh_token)
459+
return self._authenticate_oidc(
460+
authenticator,
461+
provider_id=provider_id,
462+
store_refresh_token=store_refresh_token,
463+
oidc_auth_renewer=authenticator,
464+
)
456465

457466
def authenticate_oidc_refresh_token(
458467
self,
@@ -493,7 +502,7 @@ def authenticate_oidc_refresh_token(
493502
authenticator,
494503
provider_id=provider_id,
495504
store_refresh_token=store_refresh_token,
496-
fallback_refresh_token_to_store=refresh_token,
505+
fallback_refresh_token=refresh_token,
497506
oidc_auth_renewer=authenticator,
498507
)
499508

@@ -534,7 +543,13 @@ def authenticate_oidc_device(
534543
authenticator = OidcDeviceAuthenticator(
535544
client_info=client_info, use_pkce=use_pkce, max_poll_time=max_poll_time, **kwargs
536545
)
537-
return self._authenticate_oidc(authenticator, provider_id=provider_id, store_refresh_token=store_refresh_token)
546+
return self._authenticate_oidc(
547+
authenticator,
548+
provider_id=provider_id,
549+
store_refresh_token=store_refresh_token,
550+
# TODO: expose `auto_renew_from_refresh_token` directly as option instead of reusing `store_refresh_token` arg?
551+
auto_renew_from_refresh_token=store_refresh_token,
552+
)
538553

539554
def authenticate_oidc(
540555
self,
@@ -604,7 +619,8 @@ def authenticate_oidc(
604619
authenticator,
605620
provider_id=provider_id,
606621
store_refresh_token=store_refresh_token,
607-
fallback_refresh_token_to_store=refresh_token,
622+
fallback_refresh_token=refresh_token,
623+
oidc_auth_renewer=authenticator,
608624
)
609625
# TODO: pluggable/jupyter-aware display function?
610626
print("Authenticated using refresh token.")
@@ -622,6 +638,8 @@ def authenticate_oidc(
622638
authenticator,
623639
provider_id=provider_id,
624640
store_refresh_token=store_refresh_token,
641+
# TODO: expose `auto_renew_from_refresh_token` directly as option instead of reusing `store_refresh_token` arg?
642+
auto_renew_from_refresh_token=store_refresh_token,
625643
)
626644
print("Authenticated using device code flow.")
627645
return con
@@ -665,6 +683,28 @@ def authenticate_bearer_token(self, bearer_token: str) -> Connection:
665683
self._oidc_auth_renewer = None
666684
return self
667685

686+
def try_access_token_refresh(self, *, reason: Optional[str] = None) -> bool:
687+
"""
688+
Try to get a fresh access token if possible.
689+
Returns whether a new access token was obtained.
690+
"""
691+
reason = f" Reason: {reason}" if reason else ""
692+
if isinstance(self.auth, OidcBearerAuth) and self._oidc_auth_renewer:
693+
try:
694+
self._authenticate_oidc(
695+
authenticator=self._oidc_auth_renewer,
696+
provider_id=self._oidc_auth_renewer.provider_info.id,
697+
store_refresh_token=False,
698+
oidc_auth_renewer=self._oidc_auth_renewer,
699+
)
700+
_log.info(f"Obtained new access token (grant {self._oidc_auth_renewer.grant_type!r}).{reason}")
701+
return True
702+
except OpenEoClientException as auth_exc:
703+
_log.error(
704+
f"Failed to obtain new access token (grant {self._oidc_auth_renewer.grant_type!r}): {auth_exc!r}.{reason}"
705+
)
706+
return False
707+
668708
def request(
669709
self,
670710
method: str,
@@ -690,24 +730,11 @@ def _request():
690730
api_exc.http_status_code in {HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN}
691731
and api_exc.code == "TokenInvalid"
692732
):
693-
# Auth token expired: can we refresh?
694-
if isinstance(self.auth, OidcBearerAuth) and self._oidc_auth_renewer:
695-
msg = f"OIDC access token expired ({api_exc.http_status_code} {api_exc.code})."
696-
try:
697-
self._authenticate_oidc(
698-
authenticator=self._oidc_auth_renewer,
699-
provider_id=self._oidc_auth_renewer.provider_info.id,
700-
store_refresh_token=False,
701-
oidc_auth_renewer=self._oidc_auth_renewer,
702-
)
703-
_log.info(f"{msg} Obtained new access token (grant {self._oidc_auth_renewer.grant_type!r}).")
704-
except OpenEoClientException as auth_exc:
705-
_log.error(
706-
f"{msg} Failed to obtain new access token (grant {self._oidc_auth_renewer.grant_type!r}): {auth_exc!r}."
707-
)
708-
else:
709-
# Retry request.
710-
return _request()
733+
# Retry if we can refresh the access token
734+
if self.try_access_token_refresh(
735+
reason=f"OIDC access token expired ({api_exc.http_status_code} {api_exc.code})."
736+
):
737+
return _request()
711738
raise
712739

713740
def describe_account(self) -> dict:
@@ -821,12 +848,12 @@ def list_services(self) -> list:
821848
:return: data_dict: Dict All available services
822849
"""
823850
# TODO return parsed service objects
824-
services = self.get('/services', expected_status=200).json()["services"]
825-
federation_missing = federation_extension.get_federation_missing(data=services, resource_name="services")
851+
response = self.get("/services", expected_status=200).json()
852+
federation_missing = federation_extension.get_federation_missing(data=response, resource_name="services")
826853
federation = self.capabilities().ext_federation_backend_details()
827854
return VisualList(
828855
"data-table",
829-
data=services,
856+
data=response["services"],
830857
parameters={"columns": "services", "missing": federation_missing, "federation": federation},
831858
)
832859

openeo/rest/datacube.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2943,7 +2943,16 @@ def sar_backscatter(
29432943
}
29442944
if options:
29452945
arguments["options"] = options
2946-
return self.process(process_id="sar_backscatter", arguments=arguments)
2946+
metadata = self.metadata
2947+
if mask:
2948+
metadata = metadata.append_band(band="mask")
2949+
if contributing_area:
2950+
metadata = metadata.append_band(band="contributing_area")
2951+
if local_incidence_angle:
2952+
metadata = metadata.append_band(band="local_incidence_angle")
2953+
if ellipsoid_incidence_angle:
2954+
metadata = metadata.append_band(band="ellipsoid_incidence_angle")
2955+
return self.process(process_id="sar_backscatter", arguments=arguments, metadata=metadata)
29472956

29482957
@openeo_process
29492958
def fit_curve(self, parameters: list, function: Union[str, PGNode, typing.Callable], dimension: str):

0 commit comments

Comments
 (0)