@@ -1489,21 +1489,21 @@ https://github.com/temporalio/samples-python/tree/nexus/hello_nexus).
1489
1489
1490
1490
### Plugins
1491
1491
1492
- Plugins provide a way to extend and customize the behavior of Temporal clients and workers through a chain of
1493
- responsibility pattern. They allow you to intercept and modify client creation, service connections, worker
1492
+ Plugins provide a way to extend and customize the behavior of Temporal clients and workers through a chain of
1493
+ responsibility pattern. They allow you to intercept and modify client creation, service connections, worker
1494
1494
configuration, and worker execution. Common customizations may include but are not limited to:
1495
1495
1496
1496
1. DataConverter
1497
1497
2. Activities
1498
1498
3. Workflows
1499
1499
4. Interceptors
1500
1500
1501
- A single plugin class can implement both client and worker plugin interfaces to share common logic between both
1501
+ A single plugin class can implement both client and worker plugin interfaces to share common logic between both
1502
1502
contexts. When used with a client, it will automatically be propagated to any workers created with that client.
1503
1503
1504
1504
#### Client Plugins
1505
1505
1506
- Client plugins can intercept and modify client configuration and service connections. They are useful for adding
1506
+ Client plugins can intercept and modify client configuration and service connections. They are useful for adding
1507
1507
authentication, modifying connection parameters, or adding custom behavior during client creation.
1508
1508
1509
1509
Here's an example of a client plugin that adds custom authentication:
@@ -1515,7 +1515,7 @@ import temporalio.service
1515
1515
class AuthenticationPlugin(Plugin):
1516
1516
def __init__(self, api_key: str):
1517
1517
self.api_key = api_key
1518
-
1518
+
1519
1519
def init_client_plugin(self, next: Plugin) -> None:
1520
1520
self.next_client_plugin = next
1521
1521
@@ -1540,10 +1540,10 @@ client = await Client.connect(
1540
1540
1541
1541
#### Worker Plugins
1542
1542
1543
- Worker plugins can modify worker configuration and intercept worker execution. They are useful for adding monitoring,
1544
- custom lifecycle management, or modifying worker settings. Worker plugins can also configure replay.
1545
- They should do this in the case that they modified the worker in a way which would also need to be present
1546
- for replay to function. For instance, changing the data converter or adding workflows.
1543
+ Worker plugins can modify worker configuration and intercept worker execution. They are useful for adding monitoring,
1544
+ custom lifecycle management, or modifying worker settings. Worker plugins can also configure replay.
1545
+ They should do this in the case that they modified the worker in a way which would also need to be present
1546
+ for replay to function. For instance, changing the data converter or adding workflows.
1547
1547
1548
1548
Here's an example of a worker plugin that adds custom monitoring:
1549
1549
@@ -1560,7 +1560,7 @@ class MonitoringPlugin(Plugin):
1560
1560
1561
1561
def init_worker_plugin (self , next : Plugin) -> None :
1562
1562
self .next_worker_plugin = next
1563
-
1563
+
1564
1564
def configure_worker (self , config : WorkerConfig) -> WorkerConfig:
1565
1565
# Modify worker configuration
1566
1566
original_task_queue = config[" task_queue" ]
@@ -1574,22 +1574,22 @@ class MonitoringPlugin(Plugin):
1574
1574
await self .next_worker_plugin.run_worker(worker)
1575
1575
finally :
1576
1576
self .logger.info(" Worker execution completed" )
1577
-
1578
- def configure_replayer (self , config : ReplayerConfig) -> ReplayerConfig:
1579
- return self .next_worker_plugin.configure_replayer(config)
1580
-
1581
- @asynccontextmanager
1582
- async def run_replayer (
1583
- self ,
1584
- replayer : Replayer,
1585
- histories : AsyncIterator[temporalio.client.WorkflowHistory],
1586
- ) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]:
1587
- self .logger.info(" Starting replay execution" )
1588
- try :
1589
- async with self .next_worker_plugin.run_replayer(replayer, histories) as results:
1590
- yield results
1591
- finally :
1592
- self .logger.info(" Replay execution completed" )
1577
+
1578
+ def configure_replayer (self , config : ReplayerConfig) -> ReplayerConfig:
1579
+ return self .next_worker_plugin.configure_replayer(config)
1580
+
1581
+ @asynccontextmanager
1582
+ async def run_replayer (
1583
+ self ,
1584
+ replayer : Replayer,
1585
+ histories : AsyncIterator[temporalio.client.WorkflowHistory],
1586
+ ) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]:
1587
+ self .logger.info(" Starting replay execution" )
1588
+ try :
1589
+ async with self .next_worker_plugin.run_replayer(replayer, histories) as results:
1590
+ yield results
1591
+ finally :
1592
+ self .logger.info(" Replay execution completed" )
1593
1593
1594
1594
# Use the plugin when creating a worker
1595
1595
worker = Worker(
@@ -1617,38 +1617,38 @@ class UnifiedPlugin(ClientPlugin, WorkerPlugin):
1617
1617
1618
1618
def init_worker_plugin (self , next : WorkerPlugin) -> None :
1619
1619
self .next_worker_plugin = next
1620
-
1620
+
1621
1621
def configure_client (self , config : ClientConfig) -> ClientConfig:
1622
1622
# Client-side customization
1623
1623
config[" data_converter" ] = pydantic_data_converter
1624
1624
return self .next_client_plugin.configure_client(config)
1625
-
1625
+
1626
1626
async def connect_service_client (
1627
1627
self , config : temporalio.service.ConnectConfig
1628
1628
) -> temporalio.service.ServiceClient:
1629
1629
# Add authentication to the connection
1630
1630
config.api_key = self .api_key
1631
1631
return await self .next_client_plugin.connect_service_client(config)
1632
-
1632
+
1633
1633
def configure_worker (self , config : WorkerConfig) -> WorkerConfig:
1634
1634
# Worker-side customization
1635
1635
return self .next_worker_plugin.configure_worker(config)
1636
-
1636
+
1637
1637
async def run_worker (self , worker : Worker) -> None :
1638
1638
print (" Starting unified worker" )
1639
1639
await self .next_worker_plugin.run_worker(worker)
1640
-
1640
+
1641
1641
def configure_replayer (self , config : ReplayerConfig) -> ReplayerConfig:
1642
1642
config[" data_converter" ] = pydantic_data_converter
1643
- return config
1644
-
1643
+ return self .next_worker_plugin.configure_replayer( config)
1644
+
1645
1645
async def run_replayer (
1646
1646
self ,
1647
1647
replayer : Replayer,
1648
1648
histories : AsyncIterator[temporalio.client.WorkflowHistory],
1649
1649
) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]:
1650
1650
return self .next_worker_plugin.run_replayer(replayer, histories)
1651
-
1651
+
1652
1652
# Create client with the unified plugin
1653
1653
client = await Client.connect(
1654
1654
" localhost:7233" ,
@@ -1902,20 +1902,18 @@ poe test -s --log-cli-level=DEBUG -k test_sync_activity_thread_cancel_caught
1902
1902
1903
1903
#### Proto Generation and Testing
1904
1904
1905
- To allow for backwards compatibility, protobuf code is generated on the 3.x series of the protobuf library. To generate
1906
- protobuf code, you must be on Python <= 3.10, and then run ` uv add "protobuf<4" ` + ` uv sync --all-extras ` . Then the
1907
- protobuf files can be generated via ` poe gen-protos ` . Tests can be run for protobuf version 3 by setting the
1908
- ` TEMPORAL_TEST_PROTO3 ` env var to ` 1 ` prior to running tests.
1909
-
1910
- Do not commit ` uv.lock ` or ` pyproject.toml ` changes. To go back from this downgrade, restore both of those files and run
1911
- ` uv sync --all-extras ` . Make sure you ` poe format ` the results.
1905
+ If you have docker available, run
1912
1906
1913
- For a less system-intrusive approach, you can:
1914
- ``` shell
1915
- docker build -f scripts/_proto/Dockerfile .
1916
- docker run --rm -v " ${PWD} /temporalio/api:/api_new" -v " ${PWD} /temporalio/bridge/proto:/bridge_new" < just built image sha>
1917
- poe format
1918
1907
```
1908
+ poe gen-protos-docker
1909
+ ```
1910
+
1911
+ Alternatively: to generate protobuf code, you must be on Python <= 3.10, and then run `uv add
1912
+ "protobuf<4"` + ` uv sync --all-extras` . Then the protobuf files can be generated via ` poe
1913
+ gen-protos` followed by ` poe format` . Do not commit ` uv.lock` or ` pyproject.toml` changes. To go
1914
+ back from this downgrade, restore both of those files and run ` uv sync --all-extras ` . Tests can be
1915
+ run for protobuf version 3 by setting the ` TEMPORAL_TEST_PROTO3 ` env var to ` 1 ` prior to running
1916
+ tests.
1919
1917
1920
1918
### Style
1921
1919
0 commit comments