
Commit 151308a

Creation of DTS example and passing of completionToken (#40)
* Creation of DTS example and passing of completionToken
* Addressing review feedback
* Reverting dapr readme
* Adding accessTokenManager class for refreshing credential token
* Adding comments to the example
* Adding in requirement for azure-identity
* Moving dts logic into its own module
* Fixing whitespace
* Updating dts client to refresh token
* Cleaning up construction of dts objects and improving examples
* Migrating shared access token logic to new grpc class
* Adding log statements to access_token_manager
* Breaking for loop when setting interceptors
* Removing changes to client.py and adding additional steps to readme.md
* Refactoring client and worker to pass around interceptors
* Fixing import for DefaultClientInterceptorImpl
* Addressing round 1 of feedback
* Fixing interceptor issue
* Moving some files around to remove dependencies
* Addressing more feedback
* More review feedback
* Passing token credential as an argument rather than 2 strings
* More review feedback for token passing
* Addressing None comment and using correct metadata
* Updating unit tests
* Fixing the type for the unit test
* Fixing grpc calls
* Fix linter errors and update documentation
* Specifying version requirement for pyproject.toml
* Updating README
* Adding comment for credential type

---------

Signed-off-by: Ryan Lettieri <[email protected]>
Co-authored-by: Chris Gillum <[email protected]>
1 parent c0ce226 commit 151308a

20 files changed: +514 additions, −54 deletions

CHANGELOG.md

+1

@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

    - Added `set_custom_status` orchestrator API ([#31](https://github.com/microsoft/durabletask-python/pull/31)) - contributed by [@famarting](https://github.com/famarting)
    - Added `purge_orchestration` client API ([#34](https://github.com/microsoft/durabletask-python/pull/34)) - contributed by [@famarting](https://github.com/famarting)
+   - Added new `durabletask-azuremanaged` package for use with the [Durable Task Scheduler](https://techcommunity.microsoft.com/blog/appsonazureblog/announcing-limited-early-access-of-the-durable-task-scheduler-for-azure-durable-/4286526) - by [@RyanLettieri](https://github.com/RyanLettieri)

  ### Changes

README.md

+3 −4

@@ -1,15 +1,14 @@
- # Durable Task Client SDK for Python
+ # Durable Task SDK for Python

  [![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
  [![Build Validation](https://github.com/microsoft/durabletask-python/actions/workflows/pr-validation.yml/badge.svg)](https://github.com/microsoft/durabletask-python/actions/workflows/pr-validation.yml)
  [![PyPI version](https://badge.fury.io/py/durabletask.svg)](https://badge.fury.io/py/durabletask)

- This repo contains a Python client SDK for use with the [Durable Task Framework for Go](https://github.com/microsoft/durabletask-go) and [Dapr Workflow](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-overview/). With this SDK, you can define, schedule, and manage durable orchestrations using ordinary Python code.
+ This repo contains a Python SDK for use with the [Azure Durable Task Scheduler](https://techcommunity.microsoft.com/blog/appsonazureblog/announcing-limited-early-access-of-the-durable-task-scheduler-for-azure-durable-/4286526) and the [Durable Task Framework for Go](https://github.com/microsoft/durabletask-go). With this SDK, you can define, schedule, and manage durable orchestrations using ordinary Python code.

  ⚠️ **This SDK is currently under active development and is not yet ready for production use.** ⚠️

- > Note that this project is **not** currently affiliated with the [Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-overview) project for Azure Functions. If you are looking for a Python SDK for Durable Functions, please see [this repo](https://github.com/Azure/azure-functions-durable-python).
+ > Note that this SDK is **not** currently compatible with [Azure Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-overview). If you are looking for a Python SDK for Azure Durable Functions, please see [this repo](https://github.com/Azure/azure-functions-durable-python).

  ## Supported patterns
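To illustrate the README's claim that orchestrations are written in ordinary Python code, here is a minimal sketch in the style of the SDK's sequence samples. The function names and inputs are illustrative and are not part of this commit.

from durabletask import task


def hello(ctx: task.ActivityContext, name: str) -> str:
    # A plain activity function; it runs outside the orchestrator's replay logic
    return f"Hello, {name}!"


def sequence(ctx: task.OrchestrationContext, _) -> list[str]:
    # Orchestrator: calls the activity three times in order and collects the results
    city1 = yield ctx.call_activity(hello, input="Tokyo")
    city2 = yield ctx.call_activity(hello, input="Seattle")
    city3 = yield ctx.call_activity(hello, input="London")
    return [city1, city2, city3]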

durabletask-azuremanaged/__init__.py

Whitespace-only changes.

durabletask-azuremanaged/durabletask/azuremanaged/__init__.py

Whitespace-only changes.
durabletask-azuremanaged/durabletask/azuremanaged/client.py

@@ -0,0 +1,30 @@

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

from azure.core.credentials import TokenCredential

from durabletask.azuremanaged.internal.durabletask_grpc_interceptor import \
    DTSDefaultClientInterceptorImpl
from durabletask.client import TaskHubGrpcClient


# Client class used for Durable Task Scheduler (DTS)
class DurableTaskSchedulerClient(TaskHubGrpcClient):
    def __init__(self, *,
                 host_address: str,
                 taskhub: str,
                 token_credential: TokenCredential,
                 secure_channel: bool = True):

        if not taskhub:
            raise ValueError("Taskhub value cannot be empty. Please provide a value for your taskhub")

        interceptors = [DTSDefaultClientInterceptorImpl(token_credential, taskhub)]

        # We pass None for the metadata so we don't construct an additional interceptor in the parent class.
        # Since the parent class doesn't use the metadata for anything else, it is safe to set it to None.
        super().__init__(
            host_address=host_address,
            secure_channel=secure_channel,
            metadata=None,
            interceptors=interceptors)
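A minimal usage sketch for the new client, assuming it is exported from durabletask.azuremanaged.client and that an azure-identity credential such as DefaultAzureCredential is available; the endpoint and task hub names are placeholders, not values from this commit.

from azure.identity import DefaultAzureCredential

from durabletask.azuremanaged.client import DurableTaskSchedulerClient

# Placeholders: substitute your scheduler endpoint and task hub name
client = DurableTaskSchedulerClient(
    host_address="<scheduler-endpoint>:443",
    taskhub="<your-taskhub>",
    token_credential=DefaultAzureCredential())

# The DTS client inherits the normal TaskHubGrpcClient surface, e.g.:
# instance_id = client.schedule_new_orchestration(my_orchestrator)
# state = client.wait_for_orchestration_completion(instance_id, timeout=60)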
durabletask-azuremanaged/durabletask/azuremanaged/internal/access_token_manager.py

@@ -0,0 +1,49 @@

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from datetime import datetime, timedelta, timezone
from typing import Optional

from azure.core.credentials import AccessToken, TokenCredential

import durabletask.internal.shared as shared


# By default, refresh the token when there are 10 minutes left before it expires
class AccessTokenManager:

    _token: Optional[AccessToken]

    def __init__(self, token_credential: Optional[TokenCredential], refresh_interval_seconds: int = 600):
        self._scope = "https://durabletask.io/.default"
        self._refresh_interval_seconds = refresh_interval_seconds
        self._logger = shared.get_logger("token_manager")

        self._credential = token_credential

        if self._credential is not None:
            self._token = self._credential.get_token(self._scope)
            self.expiry_time = datetime.fromtimestamp(self._token.expires_on, tz=timezone.utc)
        else:
            self._token = None
            self.expiry_time = None

    def get_access_token(self) -> Optional[AccessToken]:
        if self._token is None or self.is_token_expired():
            self.refresh_token()
        return self._token

    # Checks whether the token is expired, or will expire within the next "refresh_interval_seconds" seconds.
    # For example, if the token is created with a lifespan of 2 hours and the refresh buffer is set to 30 minutes,
    # a new token is fetched once 30 minutes remain on the token's lifespan.
    def is_token_expired(self) -> bool:
        if self.expiry_time is None:
            return True
        return datetime.now(timezone.utc) >= (self.expiry_time - timedelta(seconds=self._refresh_interval_seconds))

    def refresh_token(self):
        if self._credential is not None:
            self._token = self._credential.get_token(self._scope)

            # Convert the UNIX timestamp to a timezone-aware datetime
            self.expiry_time = datetime.fromtimestamp(self._token.expires_on, tz=timezone.utc)
            self._logger.debug(f"Token refreshed. Expires at: {self.expiry_time}")
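A short sketch of how the manager's refresh buffer behaves, assuming an azure-identity credential is available; this is illustrative and not part of the commit.

from azure.identity import DefaultAzureCredential

from durabletask.azuremanaged.internal.access_token_manager import AccessTokenManager

# Acquires a token for the https://durabletask.io/.default scope up front
manager = AccessTokenManager(token_credential=DefaultAzureCredential())

token = manager.get_access_token()        # cached AccessToken
token_again = manager.get_access_token()  # same token is reused...

# ...until is_token_expired() reports that fewer than refresh_interval_seconds
# (600 by default) remain before expiry, at which point get_access_token()
# calls refresh_token() and returns a fresh token.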
durabletask-azuremanaged/durabletask/azuremanaged/internal/durabletask_grpc_interceptor.py

@@ -0,0 +1,41 @@

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

import grpc
from azure.core.credentials import TokenCredential

from durabletask.azuremanaged.internal.access_token_manager import \
    AccessTokenManager
from durabletask.internal.grpc_interceptor import (
    DefaultClientInterceptorImpl, _ClientCallDetails)


class DTSDefaultClientInterceptorImpl(DefaultClientInterceptorImpl):
    """Implements UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor,
    StreamUnaryClientInterceptor and StreamStreamClientInterceptor from grpc in order
    to add additional headers to all calls as needed."""

    def __init__(self, token_credential: TokenCredential, taskhub_name: str):
        self._metadata = [("taskhub", taskhub_name)]
        super().__init__(self._metadata)

        if token_credential is not None:
            self._token_credential = token_credential
            self._token_manager = AccessTokenManager(token_credential=self._token_credential)
            access_token = self._token_manager.get_access_token()
            if access_token is not None:
                self._metadata.append(("authorization", f"Bearer {access_token.token}"))

    def _intercept_call(
            self, client_call_details: _ClientCallDetails) -> grpc.ClientCallDetails:
        """Internal intercept_call implementation which adds metadata to the gRPC metadata in the
        RPC call details."""
        # Refresh the auth token if it is present and needed
        if self._metadata is not None:
            for i, (key, _) in enumerate(self._metadata):
                if key.lower() == "authorization":  # Ensure case-insensitive comparison
                    new_token = self._token_manager.get_access_token()  # Get the new token
                    if new_token is not None:
                        self._metadata[i] = ("authorization", f"Bearer {new_token.token}")  # Update the token

        return super()._intercept_call(client_call_details)
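For context, this is roughly how such an interceptor attaches headers to a channel. The endpoint is a placeholder and the wiring below is an illustrative sketch rather than code from this commit; in the SDK itself, the interceptor is applied through shared.get_grpc_channel.

import grpc
from azure.identity import DefaultAzureCredential

from durabletask.azuremanaged.internal.durabletask_grpc_interceptor import \
    DTSDefaultClientInterceptorImpl

interceptor = DTSDefaultClientInterceptorImpl(DefaultAzureCredential(), "<your-taskhub>")

# Every RPC on the intercepted channel carries the "taskhub" and "authorization"
# metadata; _intercept_call refreshes the bearer token lazily when it nears expiry.
channel = grpc.secure_channel("<scheduler-endpoint>:443", grpc.ssl_channel_credentials())
channel = grpc.intercept_channel(channel, interceptor)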
durabletask-azuremanaged/durabletask/azuremanaged/worker.py

@@ -0,0 +1,30 @@

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

from azure.core.credentials import TokenCredential

from durabletask.azuremanaged.internal.durabletask_grpc_interceptor import \
    DTSDefaultClientInterceptorImpl
from durabletask.worker import TaskHubGrpcWorker


# Worker class used for Durable Task Scheduler (DTS)
class DurableTaskSchedulerWorker(TaskHubGrpcWorker):
    def __init__(self, *,
                 host_address: str,
                 taskhub: str,
                 token_credential: TokenCredential,
                 secure_channel: bool = True):

        if not taskhub:
            raise ValueError("The taskhub value cannot be empty.")

        interceptors = [DTSDefaultClientInterceptorImpl(token_credential, taskhub)]

        # We pass None for the metadata so we don't construct an additional interceptor in the parent class.
        # Since the parent class doesn't use the metadata for anything else, it is safe to set it to None.
        super().__init__(
            host_address=host_address,
            secure_channel=secure_channel,
            metadata=None,
            interceptors=interceptors)
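The commit also adds a DTS example, but the sample code itself is not shown in this diff. The following is a hedged end-to-end sketch of how the new worker and client are intended to be used together; the endpoint, task hub, orchestrator, and activity names are all illustrative.

from azure.identity import DefaultAzureCredential

from durabletask import task
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker


def hello(ctx: task.ActivityContext, name: str) -> str:
    return f"Hello, {name}!"


def greeting(ctx: task.OrchestrationContext, _):
    result = yield ctx.call_activity(hello, input="Durable Task Scheduler")
    return result


credential = DefaultAzureCredential()
endpoint = "<scheduler-endpoint>:443"   # placeholder
taskhub = "<your-taskhub>"              # placeholder

with DurableTaskSchedulerWorker(host_address=endpoint, taskhub=taskhub,
                                token_credential=credential) as worker:
    worker.add_orchestrator(greeting)
    worker.add_activity(hello)
    worker.start()

    client = DurableTaskSchedulerClient(host_address=endpoint, taskhub=taskhub,
                                        token_credential=credential)
    instance_id = client.schedule_new_orchestration(greeting)
    state = client.wait_for_orchestration_completion(instance_id, timeout=60)
    print(state.serialized_output if state else "no result")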
durabletask-azuremanaged/pyproject.toml

+41

@@ -0,0 +1,41 @@

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

# For more information on pyproject.toml, see https://peps.python.org/pep-0621/

[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "durabletask.azuremanaged"
version = "0.1b1"
description = "Extensions for the Durable Task Python SDK for integrating with the Durable Task Scheduler in Azure"
keywords = [
    "durable",
    "task",
    "workflow",
    "azure"
]
classifiers = [
    "Development Status :: 3 - Alpha",
    "Programming Language :: Python :: 3",
    "License :: OSI Approved :: MIT License",
]
requires-python = ">=3.9"
license = {file = "LICENSE"}
readme = "README.md"
dependencies = [
    "durabletask>=0.2.0",
    "azure-identity>=1.19.0"
]

[project.urls]
repository = "https://github.com/microsoft/durabletask-python"
changelog = "https://github.com/microsoft/durabletask-python/blob/main/CHANGELOG.md"

[tool.setuptools.packages.find]
include = ["durabletask.azuremanaged", "durabletask.azuremanaged.*"]

[tool.pytest.ini_options]
minversion = "6.0"

durabletask/client.py

+22 −4

@@ -6,7 +6,7 @@
  from dataclasses import dataclass
  from datetime import datetime
  from enum import Enum
- from typing import Any, Optional, TypeVar, Union
+ from typing import Any, Optional, Sequence, TypeVar, Union

  import grpc
  from google.protobuf import wrappers_pb2

@@ -16,6 +16,7 @@
  import durabletask.internal.orchestrator_service_pb2_grpc as stubs
  import durabletask.internal.shared as shared
  from durabletask import task
+ from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl

  TInput = TypeVar('TInput')
  TOutput = TypeVar('TOutput')

@@ -96,8 +97,25 @@ def __init__(self, *,
                   metadata: Optional[list[tuple[str, str]]] = None,
                   log_handler: Optional[logging.Handler] = None,
                   log_formatter: Optional[logging.Formatter] = None,
-                  secure_channel: bool = False):
-         channel = shared.get_grpc_channel(host_address, metadata, secure_channel=secure_channel)
+                  secure_channel: bool = False,
+                  interceptors: Optional[Sequence[shared.ClientInterceptor]] = None):
+
+         # If the caller provided metadata, we need to create a new interceptor for it and
+         # add it to the list of interceptors.
+         if interceptors is not None:
+             interceptors = list(interceptors)
+             if metadata is not None:
+                 interceptors.append(DefaultClientInterceptorImpl(metadata))
+         elif metadata is not None:
+             interceptors = [DefaultClientInterceptorImpl(metadata)]
+         else:
+             interceptors = None
+
+         channel = shared.get_grpc_channel(
+             host_address=host_address,
+             secure_channel=secure_channel,
+             interceptors=interceptors
+         )
          self._stub = stubs.TaskHubSidecarServiceStub(channel)
          self._logger = shared.get_logger("client", log_handler, log_formatter)

@@ -116,7 +134,7 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu
              scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None,
              version=wrappers_pb2.StringValue(value=""),
              orchestrationIdReusePolicy=reuse_id_policy,
-     )
+         )

          self._logger.info(f"Starting new '{name}' instance with ID = '{req.instanceId}'.")
          res: pb.CreateInstanceResponse = self._stub.StartInstance(req)
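With this change, callers can pass gRPC client interceptors directly to TaskHubGrpcClient, while caller-supplied metadata continues to work and is wrapped in its own DefaultClientInterceptorImpl. A small sketch; the header name is illustrative.

from durabletask.client import TaskHubGrpcClient
from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl

# Caller-provided interceptor carrying an extra header (illustrative header name)
extra_headers = DefaultClientInterceptorImpl([("x-example-header", "value")])

client = TaskHubGrpcClient(
    host_address="localhost:4001",
    metadata=[("taskhub", "default")],   # still supported; wrapped in an interceptor internally
    interceptors=[extra_headers])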

durabletask/internal/grpc_interceptor.py

+6 −6 (the removed and added lines differ only in indentation and trailing whitespace; the resulting code is shown below)

@@ -19,28 +19,28 @@ class _ClientCallDetails(

class DefaultClientInterceptorImpl (
        grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor,
        grpc.StreamUnaryClientInterceptor, grpc.StreamStreamClientInterceptor):
    """The class implements a UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor,
    StreamUnaryClientInterceptor and StreamStreamClientInterceptor from grpc to add an
    interceptor to add additional headers to all calls as needed."""

    def __init__(self, metadata: list[tuple[str, str]]):
        super().__init__()
        self._metadata = metadata

    def _intercept_call(
            self, client_call_details: _ClientCallDetails) -> grpc.ClientCallDetails:
        """Internal intercept_call implementation which adds metadata to grpc metadata in the RPC
        call details."""
        if self._metadata is None:
            return client_call_details

        if client_call_details.metadata is not None:
            metadata = list(client_call_details.metadata)
        else:
            metadata = []

        metadata.extend(self._metadata)
        client_call_details = _ClientCallDetails(
            client_call_details.method, client_call_details.timeout, metadata,

durabletask/internal/shared.py

+15 −7

@@ -5,11 +5,16 @@
  import json
  import logging
  from types import SimpleNamespace
- from typing import Any, Optional
+ from typing import Any, Optional, Sequence, Union

  import grpc

- from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl
+ ClientInterceptor = Union[
+     grpc.UnaryUnaryClientInterceptor,
+     grpc.UnaryStreamClientInterceptor,
+     grpc.StreamUnaryClientInterceptor,
+     grpc.StreamStreamClientInterceptor
+ ]

  # Field name used to indicate that an object was automatically serialized
  # and should be deserialized as a SimpleNamespace

@@ -25,8 +30,9 @@ def get_default_host_address() -> str:

  def get_grpc_channel(
          host_address: Optional[str],
-         metadata: Optional[list[tuple[str, str]]],
-         secure_channel: bool = False) -> grpc.Channel:
+         secure_channel: bool = False,
+         interceptors: Optional[Sequence[ClientInterceptor]] = None) -> grpc.Channel:
+
      if host_address is None:
          host_address = get_default_host_address()

@@ -44,16 +50,18 @@ def get_grpc_channel(
              host_address = host_address[len(protocol):]
              break

+     # Create the base channel
      if secure_channel:
          channel = grpc.secure_channel(host_address, grpc.ssl_channel_credentials())
      else:
          channel = grpc.insecure_channel(host_address)

-     if metadata is not None and len(metadata) > 0:
-         interceptors = [DefaultClientInterceptorImpl(metadata)]
+     # Apply interceptors ONLY if they exist
+     if interceptors:
          channel = grpc.intercept_channel(channel, *interceptors)
      return channel


  def get_logger(
          name_suffix: str,
          log_handler: Optional[logging.Handler] = None,

@@ -98,7 +106,7 @@ def default(self, obj):
          if dataclasses.is_dataclass(obj):
              # Dataclasses are not serializable by default, so we convert them to a dict and mark them for
              # automatic deserialization by the receiver
-             d = dataclasses.asdict(obj) # type: ignore
+             d = dataclasses.asdict(obj)  # type: ignore
              d[AUTO_SERIALIZED] = True
              return d
          elif isinstance(obj, SimpleNamespace):
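After this change, get_grpc_channel takes a list of interceptors instead of raw metadata. A minimal sketch of calling it directly; the address is a placeholder.

import durabletask.internal.shared as shared
from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl

interceptor = DefaultClientInterceptorImpl([("taskhub", "default")])

# The channel is only wrapped with grpc.intercept_channel when interceptors are provided
channel = shared.get_grpc_channel(
    host_address="localhost:4001",
    secure_channel=False,
    interceptors=[interceptor])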
