Skip to content

Commit a004faa

Browse files
authored
Python support for activity reset (#1065)
* WIP activity reset * Fix test for activity reset * Cleanup, remove bad test * Add test and error case * Remove investigation code accidentally added * Add test * Skip on time skipping
1 parent c8b0b78 commit a004faa

File tree

7 files changed

+140
-2
lines changed

7 files changed

+140
-2
lines changed

temporalio/activity.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ class ActivityCancellationDetails:
154154
not_found: bool = False
155155
cancel_requested: bool = False
156156
paused: bool = False
157+
reset: bool = False
157158
timed_out: bool = False
158159
worker_shutdown: bool = False
159160

@@ -167,6 +168,7 @@ def _from_proto(
167168
paused=proto.is_paused,
168169
timed_out=proto.is_timed_out,
169170
worker_shutdown=proto.is_worker_shutdown,
171+
reset=proto.is_reset,
170172
)
171173

172174

temporalio/bridge/src/client.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,9 @@ impl ClientRef {
262262
"request_cancel_workflow_execution" => {
263263
rpc_call!(retry_client, call, request_cancel_workflow_execution)
264264
}
265+
"reset_activity" => {
266+
rpc_call!(retry_client, call, reset_activity)
267+
}
265268
"reset_sticky_task_queue" => {
266269
rpc_call!(retry_client, call, reset_sticky_task_queue)
267270
}

temporalio/client.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6363,11 +6363,16 @@ async def heartbeat_async_activity(
63636363
metadata=input.rpc_metadata,
63646364
timeout=input.rpc_timeout,
63656365
)
6366-
if resp_by_id.cancel_requested or resp_by_id.activity_paused:
6366+
if (
6367+
resp_by_id.cancel_requested
6368+
or resp_by_id.activity_paused
6369+
or resp_by_id.activity_reset
6370+
):
63676371
raise AsyncActivityCancelledError(
63686372
details=ActivityCancellationDetails(
63696373
cancel_requested=resp_by_id.cancel_requested,
63706374
paused=resp_by_id.activity_paused,
6375+
reset=resp_by_id.activity_reset,
63716376
)
63726377
)
63736378

@@ -6388,6 +6393,7 @@ async def heartbeat_async_activity(
63886393
details=ActivityCancellationDetails(
63896394
cancel_requested=resp.cancel_requested,
63906395
paused=resp.activity_paused,
6396+
reset=resp.activity_reset,
63916397
)
63926398
)
63936399

temporalio/worker/_activity.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,24 @@ async def _handle_start_activity_task(
334334
),
335335
completion.result.failed.failure,
336336
)
337+
elif (
338+
isinstance(
339+
err,
340+
(asyncio.CancelledError, temporalio.exceptions.CancelledError),
341+
)
342+
and running_activity.cancellation_details.details
343+
and running_activity.cancellation_details.details.reset
344+
):
345+
temporalio.activity.logger.warning(
346+
"Completing as failure due to unhandled cancel error produced by activity reset",
347+
)
348+
await self._data_converter.encode_failure(
349+
temporalio.exceptions.ApplicationError(
350+
type="ActivityReset",
351+
message="Unhandled activity cancel error produced by activity reset",
352+
),
353+
completion.result.failed.failure,
354+
)
337355
elif (
338356
isinstance(
339357
err,

temporalio/worker/_workflow_instance.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2791,6 +2791,7 @@ def _apply_schedule_command(
27912791
command.user_metadata.summary.CopyFrom(
27922792
self._instance._payload_converter.to_payload(self._input.summary)
27932793
)
2794+
print("Activity summary: ", command.user_metadata.summary)
27942795
if self._input.priority:
27952796
command.schedule_activity.priority.CopyFrom(
27962797
self._input.priority._to_proto()

tests/worker/test_activity.py

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,15 @@
99
import threading
1010
import time
1111
import uuid
12+
from concurrent.futures import ThreadPoolExecutor
1213
from concurrent.futures.process import BrokenProcessPool
1314
from contextvars import ContextVar
1415
from dataclasses import dataclass
1516
from datetime import datetime, timedelta, timezone
17+
from time import sleep
1618
from typing import Any, Callable, List, NoReturn, Optional, Sequence, Type
1719

20+
import temporalio.api.workflowservice.v1
1821
from temporalio import activity, workflow
1922
from temporalio.client import (
2023
AsyncActivityHandle,
@@ -1486,3 +1489,105 @@ async def h():
14861489
client, worker, heartbeat, retry_max_attempts=2
14871490
)
14881491
assert result.result == "details: Some detail"
1492+
1493+
1494+
async def test_activity_reset_catch(
1495+
client: Client, worker: ExternalWorker, env: WorkflowEnvironment
1496+
):
1497+
if env.supports_time_skipping:
1498+
pytest.skip("Time skipping server doesn't support activity reset")
1499+
1500+
@activity.defn
1501+
async def wait_cancel() -> str:
1502+
req = temporalio.api.workflowservice.v1.ResetActivityRequest(
1503+
namespace=client.namespace,
1504+
execution=temporalio.api.common.v1.WorkflowExecution(
1505+
workflow_id=activity.info().workflow_id,
1506+
run_id=activity.info().workflow_run_id,
1507+
),
1508+
id=activity.info().activity_id,
1509+
)
1510+
await client.workflow_service.reset_activity(req)
1511+
try:
1512+
while True:
1513+
await asyncio.sleep(0.3)
1514+
activity.heartbeat()
1515+
except asyncio.CancelledError:
1516+
details = activity.cancellation_details()
1517+
assert details is not None
1518+
return "Got cancelled error, reset? " + str(details.reset)
1519+
1520+
@activity.defn
1521+
def sync_wait_cancel() -> str:
1522+
req = temporalio.api.workflowservice.v1.ResetActivityRequest(
1523+
namespace=client.namespace,
1524+
execution=temporalio.api.common.v1.WorkflowExecution(
1525+
workflow_id=activity.info().workflow_id,
1526+
run_id=activity.info().workflow_run_id,
1527+
),
1528+
id=activity.info().activity_id,
1529+
)
1530+
asyncio.run(client.workflow_service.reset_activity(req))
1531+
try:
1532+
while True:
1533+
sleep(0.3)
1534+
activity.heartbeat()
1535+
except temporalio.exceptions.CancelledError:
1536+
details = activity.cancellation_details()
1537+
assert details is not None
1538+
return "Got cancelled error, reset? " + str(details.reset)
1539+
except Exception as e:
1540+
return str(type(e)) + str(e)
1541+
1542+
result = await _execute_workflow_with_activity(
1543+
client,
1544+
worker,
1545+
wait_cancel,
1546+
)
1547+
assert result.result == "Got cancelled error, reset? True"
1548+
1549+
config = WorkerConfig(
1550+
activity_executor=ThreadPoolExecutor(max_workers=1),
1551+
)
1552+
result = await _execute_workflow_with_activity(
1553+
client,
1554+
worker,
1555+
sync_wait_cancel,
1556+
worker_config=config,
1557+
)
1558+
assert result.result == "Got cancelled error, reset? True"
1559+
1560+
1561+
async def test_activity_reset_history(
1562+
client: Client, worker: ExternalWorker, env: WorkflowEnvironment
1563+
):
1564+
if env.supports_time_skipping:
1565+
pytest.skip("Time skipping server doesn't support activity reset")
1566+
1567+
@activity.defn
1568+
async def wait_cancel() -> str:
1569+
req = temporalio.api.workflowservice.v1.ResetActivityRequest(
1570+
namespace=client.namespace,
1571+
execution=temporalio.api.common.v1.WorkflowExecution(
1572+
workflow_id=activity.info().workflow_id,
1573+
run_id=activity.info().workflow_run_id,
1574+
),
1575+
id=activity.info().activity_id,
1576+
)
1577+
await client.workflow_service.reset_activity(req)
1578+
while True:
1579+
await asyncio.sleep(0.3)
1580+
activity.heartbeat()
1581+
1582+
with pytest.raises(WorkflowFailureError) as e:
1583+
result = await _execute_workflow_with_activity(
1584+
client,
1585+
worker,
1586+
wait_cancel,
1587+
)
1588+
assert isinstance(e.value.cause, ActivityError)
1589+
assert isinstance(e.value.cause.cause, ApplicationError)
1590+
assert (
1591+
e.value.cause.cause.message
1592+
== "Unhandled activity cancel error produced by activity reset"
1593+
)

tests/worker/test_workflow.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -814,7 +814,10 @@ class SimpleActivityWorkflow:
814814
@workflow.run
815815
async def run(self, name: str) -> str:
816816
return await workflow.execute_activity(
817-
say_hello, name, schedule_to_close_timeout=timedelta(seconds=5)
817+
say_hello,
818+
name,
819+
schedule_to_close_timeout=timedelta(seconds=5),
820+
summary="Do a thing",
818821
)
819822

820823

0 commit comments

Comments
 (0)