Skip to content

Plugin Support #952

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Jul 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 137 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ informal introduction to the features and their implementation.
- [Testing](#testing-1)
- [Interceptors](#interceptors)
- [Nexus](#nexus)
- [Plugins](#plugins)
- [Client Plugins](#client-plugins)
- [Worker Plugins](#worker-plugins)
- [Workflow Replay](#workflow-replay)
- [Observability](#observability)
- [Metrics](#metrics)
Expand Down Expand Up @@ -1482,6 +1485,140 @@ https://github.com/temporalio/samples-python/tree/nexus/hello_nexus).
```


### Plugins

Plugins provide a way to extend and customize the behavior of Temporal clients and workers through a chain of
responsibility pattern. They allow you to intercept and modify client creation, service connections, worker
configuration, and worker execution. Common customizations may include but are not limited to:

1. DataConverter
2. Activities
3. Workflows
4. Interceptors

A single plugin class can implement both client and worker plugin interfaces to share common logic between both
contexts. When used with a client, it will automatically be propagated to any workers created with that client.

#### Client Plugins

Client plugins can intercept and modify client configuration and service connections. They are useful for adding
authentication, modifying connection parameters, or adding custom behavior during client creation.

Here's an example of a client plugin that adds custom authentication:

```python
from temporalio.client import Plugin, ClientConfig
import temporalio.service

class AuthenticationPlugin(Plugin):
def __init__(self, api_key: str):
self.api_key = api_key

def configure_client(self, config: ClientConfig) -> ClientConfig:
# Modify client configuration
config["namespace"] = "my-secure-namespace"
return super().configure_client(config)

async def connect_service_client(
self, config: temporalio.service.ConnectConfig
) -> temporalio.service.ServiceClient:
# Add authentication to the connection
config.api_key = self.api_key
return await super().connect_service_client(config)

# Use the plugin when connecting
client = await Client.connect(
"my-server.com:7233",
plugins=[AuthenticationPlugin("my-api-key")]
)
```

#### Worker Plugins

Worker plugins can modify worker configuration and intercept worker execution. They are useful for adding monitoring,
custom lifecycle management, or modifying worker settings.

Here's an example of a worker plugin that adds custom monitoring:

```python
from temporalio.worker import Plugin, WorkerConfig, Worker
import logging

class MonitoringPlugin(Plugin):
def __init__(self):
self.logger = logging.getLogger(__name__)

def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
# Modify worker configuration
original_task_queue = config["task_queue"]
config["task_queue"] = f"monitored-{original_task_queue}"
self.logger.info(f"Worker created for task queue: {config['task_queue']}")
return super().configure_worker(config)

async def run_worker(self, worker: Worker) -> None:
self.logger.info("Starting worker execution")
try:
await super().run_worker(worker)
finally:
self.logger.info("Worker execution completed")

# Use the plugin when creating a worker
worker = Worker(
client,
task_queue="my-task-queue",
workflows=[MyWorkflow],
activities=[my_activity],
plugins=[MonitoringPlugin()]
)
```

For plugins that need to work with both clients and workers, you can implement both interfaces in a single class:

```python
from temporalio.client import Plugin as ClientPlugin, ClientConfig
from temporalio.worker import Plugin as WorkerPlugin, WorkerConfig


class UnifiedPlugin(ClientPlugin, WorkerPlugin):
def configure_client(self, config: ClientConfig) -> ClientConfig:
# Client-side customization
config["namespace"] = "unified-namespace"
return super().configure_client(config)

def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
# Worker-side customization
config["max_cached_workflows"] = 500
return super().configure_worker(config)

async def run_worker(self, worker: Worker) -> None:
print("Starting unified worker")
await super().run_worker(worker)


# Create client with the unified plugin
client = await Client.connect(
"localhost:7233",
plugins=[UnifiedPlugin()]
)

# Worker will automatically inherit the plugin from the client
worker = Worker(
client,
task_queue="my-task-queue",
workflows=[MyWorkflow],
activities=[my_activity]
)
```

**Important Notes:**

- Plugins are executed in reverse order (last plugin wraps the first), forming a chain of responsibility
- Client plugins that also implement worker plugin interfaces are automatically propagated to workers
- Avoid providing the same plugin to both client and worker to prevent double execution
- Plugin methods should call `super()` to maintain the plugin chain
- Each plugin's `name()` method returns a unique identifier for debugging purposes


### Workflow Replay

Given a workflow's history, it can be replayed locally to check for things like non-determinism errors. For example,
Expand Down
122 changes: 115 additions & 7 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import abc
import asyncio
import copy
import dataclasses
Expand Down Expand Up @@ -107,6 +108,7 @@ async def connect(
namespace: str = "default",
api_key: Optional[str] = None,
data_converter: temporalio.converter.DataConverter = temporalio.converter.DataConverter.default,
plugins: Sequence[Plugin] = [],
interceptors: Sequence[Interceptor] = [],
default_workflow_query_reject_condition: Optional[
temporalio.common.QueryRejectCondition
Expand All @@ -132,6 +134,14 @@ async def connect(
metadata doesn't already have an "authorization" key.
data_converter: Data converter to use for all data conversions
to/from payloads.
plugins: Set of plugins that are chained together to allow
intercepting and modifying client creation and service connection.
The earlier plugins wrap the later ones.

Any plugins that also implement
:py:class:`temporalio.worker.Plugin` will be used as worker
plugins too so they should not be given when creating a
worker.
interceptors: Set of interceptors that are chained together to allow
intercepting of client calls. The earlier interceptors wrap the
later ones.
Expand Down Expand Up @@ -178,13 +188,21 @@ async def connect(
runtime=runtime,
http_connect_proxy_config=http_connect_proxy_config,
)

root_plugin: Plugin = _RootPlugin()
for plugin in reversed(plugins):
root_plugin = plugin.init_client_plugin(root_plugin)

service_client = await root_plugin.connect_service_client(connect_config)

return Client(
await temporalio.service.ServiceClient.connect(connect_config),
service_client,
namespace=namespace,
data_converter=data_converter,
interceptors=interceptors,
default_workflow_query_reject_condition=default_workflow_query_reject_condition,
header_codec_behavior=header_codec_behavior,
plugins=plugins,
)

def __init__(
Expand All @@ -193,6 +211,7 @@ def __init__(
*,
namespace: str = "default",
data_converter: temporalio.converter.DataConverter = temporalio.converter.DataConverter.default,
plugins: Sequence[Plugin] = [],
interceptors: Sequence[Interceptor] = [],
default_workflow_query_reject_condition: Optional[
temporalio.common.QueryRejectCondition
Expand All @@ -203,21 +222,31 @@ def __init__(

See :py:meth:`connect` for details on the parameters.
"""
# Iterate over interceptors in reverse building the impl
self._impl: OutboundInterceptor = _ClientImpl(self)
for interceptor in reversed(list(interceptors)):
self._impl = interceptor.intercept_client(self._impl)

# Store the config for tracking
self._config = ClientConfig(
config = ClientConfig(
service_client=service_client,
namespace=namespace,
data_converter=data_converter,
interceptors=interceptors,
default_workflow_query_reject_condition=default_workflow_query_reject_condition,
header_codec_behavior=header_codec_behavior,
plugins=plugins,
)

root_plugin: Plugin = _RootPlugin()
for plugin in reversed(plugins):
root_plugin = plugin.init_client_plugin(root_plugin)

self._init_from_config(root_plugin.configure_client(config))

def _init_from_config(self, config: ClientConfig):
self._config = config

# Iterate over interceptors in reverse building the impl
self._impl: OutboundInterceptor = _ClientImpl(self)
for interceptor in reversed(list(self._config["interceptors"])):
self._impl = interceptor.intercept_client(self._impl)

def config(self) -> ClientConfig:
"""Config, as a dictionary, used to create this client.

Expand Down Expand Up @@ -1507,6 +1536,7 @@ class ClientConfig(TypedDict, total=False):
Optional[temporalio.common.QueryRejectCondition]
]
header_codec_behavior: Required[HeaderCodecBehavior]
plugins: Required[Sequence[Plugin]]


class WorkflowHistoryEventFilterType(IntEnum):
Expand Down Expand Up @@ -7356,3 +7386,81 @@ async def _decode_user_metadata(
if not metadata.HasField("details")
else (await converter.decode([metadata.details]))[0],
)


class Plugin(abc.ABC):
"""Base class for client plugins that can intercept and modify client behavior.

Plugins allow customization of client creation and service connection processes
through a chain of responsibility pattern. Each plugin can modify the client
configuration or intercept service client connections.

If the plugin is also a temporalio.worker.Plugin, it will additionally be propagated as a worker plugin.
You should likley not also provide it to the worker as that will result in the plugin being applied twice.
"""

def name(self) -> str:
"""Get the name of this plugin. Can be overridden if desired to provide a more appropriate name.

Returns:
The fully qualified name of the plugin class (module.classname).
"""
return type(self).__module__ + "." + type(self).__qualname__

def init_client_plugin(self, next: Plugin) -> Plugin:
"""Initialize this plugin in the plugin chain.

This method sets up the chain of responsibility pattern by storing a reference
to the next plugin in the chain. It is called during client creation to build
the plugin chain. Note, this may be called twice in the case of :py:meth:`connect`.

Args:
next: The next plugin in the chain to delegate to.

Returns:
This plugin instance for method chaining.
"""
self.next_client_plugin = next
return self

def configure_client(self, config: ClientConfig) -> ClientConfig:
"""Hook called when creating a client to allow modification of configuration.

This method is called during client creation and allows plugins to modify
the client configuration before the client is fully initialized. Plugins
can add interceptors, modify connection parameters, or change other settings.

Args:
config: The client configuration dictionary to potentially modify.

Returns:
The modified client configuration.
"""
return self.next_client_plugin.configure_client(config)

async def connect_service_client(
self, config: temporalio.service.ConnectConfig
) -> temporalio.service.ServiceClient:
"""Hook called when connecting to the Temporal service.

This method is called during service client connection and allows plugins
to intercept or modify the connection process. Plugins can modify connection
parameters, add authentication, or provide custom connection logic.

Args:
config: The service connection configuration.

Returns:
The connected service client.
"""
return await self.next_client_plugin.connect_service_client(config)


class _RootPlugin(Plugin):
def configure_client(self, config: ClientConfig) -> ClientConfig:
return config

async def connect_service_client(
self, config: temporalio.service.ConnectConfig
) -> temporalio.service.ServiceClient:
return await temporalio.service.ServiceClient.connect(config)
2 changes: 2 additions & 0 deletions temporalio/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
WorkflowSlotInfo,
)
from ._worker import (
Plugin,
PollerBehavior,
PollerBehaviorAutoscaling,
PollerBehaviorSimpleMaximum,
Expand Down Expand Up @@ -78,6 +79,7 @@
"ActivityOutboundInterceptor",
"WorkflowInboundInterceptor",
"WorkflowOutboundInterceptor",
"Plugin",
# Interceptor input
"ContinueAsNewInput",
"ExecuteActivityInput",
Expand Down
Loading
Loading