Skip to content

Commit 51f4b66

Browse files
authored
Update schedule search attributes (#753)
Also fix flaky test test_workflow_failure_types_configured: run tests sequentially and fix reference to workflow.NondeterminismError
1 parent 35a0e6c commit 51f4b66

File tree

3 files changed

+185
-36
lines changed

3 files changed

+185
-36
lines changed

Diff for: temporalio/client.py

+17-7
Original file line numberDiff line numberDiff line change
@@ -4047,6 +4047,7 @@ async def _to_proto(
40474047
temporalio.converter.encode_search_attributes(
40484048
untyped_not_in_typed, action.start_workflow.search_attributes
40494049
)
4050+
# TODO (dan): confirm whether this be `is not None`
40504051
if self.typed_search_attributes:
40514052
temporalio.converter.encode_search_attributes(
40524053
self.typed_search_attributes, action.start_workflow.search_attributes
@@ -4499,6 +4500,9 @@ class ScheduleUpdate:
44994500
schedule: Schedule
45004501
"""Schedule to update."""
45014502

4503+
search_attributes: Optional[temporalio.common.TypedSearchAttributes] = None
4504+
"""Search attributes to update."""
4505+
45024506

45034507
@dataclass
45044508
class ScheduleListDescription:
@@ -6520,14 +6524,20 @@ async def update_schedule(self, input: UpdateScheduleInput) -> None:
65206524
if not update:
65216525
return
65226526
assert isinstance(update, ScheduleUpdate)
6527+
request = temporalio.api.workflowservice.v1.UpdateScheduleRequest(
6528+
namespace=self._client.namespace,
6529+
schedule_id=input.id,
6530+
schedule=await update.schedule._to_proto(self._client),
6531+
identity=self._client.identity,
6532+
request_id=str(uuid.uuid4()),
6533+
)
6534+
if update.search_attributes is not None:
6535+
request.search_attributes.indexed_fields.clear() # Ensure that we at least create an empty map
6536+
temporalio.converter.encode_search_attributes(
6537+
update.search_attributes, request.search_attributes
6538+
)
65236539
await self._client.workflow_service.update_schedule(
6524-
temporalio.api.workflowservice.v1.UpdateScheduleRequest(
6525-
namespace=self._client.namespace,
6526-
schedule_id=input.id,
6527-
schedule=await update.schedule._to_proto(self._client),
6528-
identity=self._client.identity,
6529-
request_id=str(uuid.uuid4()),
6530-
),
6540+
request,
65316541
retry=True,
65326542
metadata=input.rpc_metadata,
65336543
timeout=input.rpc_timeout,

Diff for: tests/test_client.py

+146-5
Original file line numberDiff line numberDiff line change
@@ -1188,18 +1188,16 @@ async def test_schedule_create_limited_actions_validation(
11881188
assert "are remaining actions set" in str(err.value)
11891189

11901190

1191-
async def test_schedule_search_attribute_update(
1191+
async def test_schedule_workflow_search_attribute_update(
11921192
client: Client, env: WorkflowEnvironment
11931193
):
11941194
if env.supports_time_skipping:
11951195
pytest.skip("Java test server doesn't support schedules")
11961196
await assert_no_schedules(client)
11971197

11981198
# Put search attribute on server
1199-
text_attr_key = SearchAttributeKey.for_text(f"python-test-schedule-text")
1200-
untyped_keyword_key = SearchAttributeKey.for_keyword(
1201-
f"python-test-schedule-keyword"
1202-
)
1199+
text_attr_key = SearchAttributeKey.for_text("python-test-schedule-text")
1200+
untyped_keyword_key = SearchAttributeKey.for_keyword("python-test-schedule-keyword")
12031201
await ensure_search_attributes_present(client, text_attr_key, untyped_keyword_key)
12041202

12051203
# Create a schedule with search attributes on the schedule and on the
@@ -1273,6 +1271,7 @@ def update_schedule_typed_attrs(
12731271
# Check that it changed
12741272
desc = await handle.describe()
12751273
assert isinstance(desc.schedule.action, ScheduleActionStartWorkflow)
1274+
# Check that the workflow search attributes were changed
12761275
# This assertion has changed since server 1.24. Now, even untyped search
12771276
# attributes are given a type server side
12781277
assert (
@@ -1283,6 +1282,148 @@ def update_schedule_typed_attrs(
12831282
and desc.schedule.action.typed_search_attributes[untyped_keyword_key]
12841283
== "some-untyped-attr1"
12851284
)
1285+
# Check that the schedule search attributes were not changed
1286+
assert desc.search_attributes[text_attr_key.name] == ["some-schedule-attr1"]
1287+
assert desc.typed_search_attributes[text_attr_key] == "some-schedule-attr1"
1288+
1289+
await handle.delete()
1290+
await assert_no_schedules(client)
1291+
1292+
1293+
@pytest.mark.parametrize(
1294+
"test_case",
1295+
[
1296+
"none-is-noop",
1297+
"empty-but-non-none-clears",
1298+
"all-new-values-overwrites",
1299+
"partial-new-values-overwrites-and-drops",
1300+
],
1301+
)
1302+
async def test_schedule_search_attribute_update(
1303+
client: Client, env: WorkflowEnvironment, test_case: str
1304+
):
1305+
if env.supports_time_skipping:
1306+
pytest.skip("Java test server doesn't support schedules")
1307+
await assert_no_schedules(client)
1308+
1309+
# Put search attributes on server
1310+
key_1 = SearchAttributeKey.for_text("python-test-schedule-sa-update-key-1")
1311+
key_2 = SearchAttributeKey.for_keyword("python-test-schedule-sa-update-key-2")
1312+
await ensure_search_attributes_present(client, key_1, key_2)
1313+
val_1 = "val-1"
1314+
val_2 = "val-2"
1315+
1316+
# Create a schedule with search attributes
1317+
create_action = ScheduleActionStartWorkflow(
1318+
"some workflow",
1319+
[],
1320+
id=f"workflow-{uuid.uuid4()}",
1321+
task_queue=f"tq-{uuid.uuid4()}",
1322+
)
1323+
handle = await client.create_schedule(
1324+
f"schedule-{uuid.uuid4()}",
1325+
Schedule(action=create_action, spec=ScheduleSpec()),
1326+
search_attributes=TypedSearchAttributes(
1327+
[
1328+
SearchAttributePair(key_1, val_1),
1329+
SearchAttributePair(key_2, val_2),
1330+
]
1331+
),
1332+
)
1333+
1334+
def update_search_attributes(
1335+
input: ScheduleUpdateInput,
1336+
) -> Optional[ScheduleUpdate]:
1337+
# Make sure the initial search attributes are present
1338+
assert input.description.search_attributes[key_1.name] == [val_1]
1339+
assert input.description.search_attributes[key_2.name] == [val_2]
1340+
assert input.description.typed_search_attributes[key_1] == val_1
1341+
assert input.description.typed_search_attributes[key_2] == val_2
1342+
1343+
if test_case == "none-is-noop":
1344+
# Passing None makes no changes
1345+
return ScheduleUpdate(input.description.schedule, search_attributes=None)
1346+
elif test_case == "empty-but-non-none-clears":
1347+
# Pass empty but non-None to clear all attributes
1348+
return ScheduleUpdate(
1349+
input.description.schedule,
1350+
search_attributes=TypedSearchAttributes.empty,
1351+
)
1352+
elif test_case == "all-new-values-overwrites":
1353+
# Pass all new values to overwrite existing
1354+
return ScheduleUpdate(
1355+
input.description.schedule,
1356+
search_attributes=input.description.typed_search_attributes.updated(
1357+
SearchAttributePair(key_1, val_1 + "-new"),
1358+
SearchAttributePair(key_2, val_2 + "-new"),
1359+
),
1360+
)
1361+
elif test_case == "partial-new-values-overwrites-and-drops":
1362+
# Only update key_1, which should drop key_2
1363+
return ScheduleUpdate(
1364+
input.description.schedule,
1365+
search_attributes=TypedSearchAttributes(
1366+
[
1367+
SearchAttributePair(key_1, val_1 + "-new"),
1368+
]
1369+
),
1370+
)
1371+
else:
1372+
raise ValueError(f"Invalid test case: {test_case}")
1373+
1374+
await handle.update(update_search_attributes)
1375+
1376+
if test_case == "none-is-noop":
1377+
1378+
async def expectation() -> bool:
1379+
desc = await handle.describe()
1380+
return (
1381+
desc.search_attributes[key_1.name] == [val_1]
1382+
and desc.search_attributes[key_2.name] == [val_2]
1383+
and desc.typed_search_attributes[key_1] == val_1
1384+
and desc.typed_search_attributes[key_2] == val_2
1385+
)
1386+
1387+
await assert_eq_eventually(True, expectation)
1388+
elif test_case == "empty-but-non-none-clears":
1389+
1390+
async def expectation() -> bool:
1391+
desc = await handle.describe()
1392+
return (
1393+
len(desc.typed_search_attributes) == 0
1394+
and len(desc.search_attributes) == 0
1395+
)
1396+
1397+
await assert_eq_eventually(True, expectation)
1398+
elif test_case == "all-new-values-overwrites":
1399+
1400+
async def expectation() -> bool:
1401+
desc = await handle.describe()
1402+
return (
1403+
desc.search_attributes[key_1.name] == [val_1 + "-new"]
1404+
and desc.search_attributes[key_2.name] == [val_2 + "-new"]
1405+
and desc.typed_search_attributes[key_1] == val_1 + "-new"
1406+
and desc.typed_search_attributes[key_2] == val_2 + "-new"
1407+
)
1408+
1409+
await assert_eq_eventually(True, expectation)
1410+
elif test_case == "partial-new-values-overwrites-and-drops":
1411+
1412+
async def expectation() -> bool:
1413+
desc = await handle.describe()
1414+
return (
1415+
desc.search_attributes[key_1.name] == [val_1 + "-new"]
1416+
and desc.typed_search_attributes[key_1] == val_1 + "-new"
1417+
and key_2.name not in desc.search_attributes
1418+
and key_2 not in desc.typed_search_attributes
1419+
)
1420+
1421+
await assert_eq_eventually(True, expectation)
1422+
else:
1423+
raise ValueError(f"Invalid test case: {test_case}")
1424+
1425+
await handle.delete()
1426+
await assert_no_schedules(client)
12861427

12871428

12881429
async def assert_no_schedules(client: Client) -> None:

Diff for: tests/worker/test_workflow.py

+22-24
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
from typing_extensions import Literal, Protocol, runtime_checkable
3939

4040
import temporalio.worker
41+
import temporalio.workflow
4142
from temporalio import activity, workflow
4243
from temporalio.api.common.v1 import Payload, Payloads, WorkflowExecution
4344
from temporalio.api.enums.v1 import EventType
@@ -5040,61 +5041,58 @@ async def run_scenario(
50405041
update_scenario=scenario,
50415042
)
50425043

5043-
# Run all tasks concurrently
5044-
await asyncio.gather(
50455044
# When unconfigured completely, confirm task fails as normal
5046-
run_scenario(
5045+
await run_scenario(
50475046
FailureTypesUnconfiguredWorkflow,
50485047
FailureTypesScenario.THROW_CUSTOM_EXCEPTION,
50495048
expect_task_fail=True,
5050-
),
5051-
run_scenario(
5049+
)
5050+
await run_scenario(
50525051
FailureTypesUnconfiguredWorkflow,
50535052
FailureTypesScenario.CAUSE_NON_DETERMINISM,
50545053
expect_task_fail=True,
5055-
),
5054+
)
50565055
# When configured at the worker level explicitly, confirm not task fail
50575056
# but rather expected exceptions
5058-
run_scenario(
5057+
await run_scenario(
50595058
FailureTypesUnconfiguredWorkflow,
50605059
FailureTypesScenario.THROW_CUSTOM_EXCEPTION,
50615060
worker_level_failure_exception_type=FailureTypesCustomException,
5062-
),
5063-
run_scenario(
5061+
)
5062+
await run_scenario(
50645063
FailureTypesUnconfiguredWorkflow,
50655064
FailureTypesScenario.CAUSE_NON_DETERMINISM,
5066-
worker_level_failure_exception_type=workflow.NondeterminismError,
5067-
),
5065+
worker_level_failure_exception_type=temporalio.workflow.NondeterminismError,
5066+
)
50685067
# When configured at the worker level inherited
5069-
run_scenario(
5068+
await run_scenario(
50705069
FailureTypesUnconfiguredWorkflow,
50715070
FailureTypesScenario.THROW_CUSTOM_EXCEPTION,
50725071
worker_level_failure_exception_type=Exception,
5073-
),
5074-
run_scenario(
5072+
)
5073+
await run_scenario(
50755074
FailureTypesUnconfiguredWorkflow,
50765075
FailureTypesScenario.CAUSE_NON_DETERMINISM,
50775076
worker_level_failure_exception_type=Exception,
5078-
),
5077+
)
50795078
# When configured at the workflow level explicitly
5080-
run_scenario(
5079+
await run_scenario(
50815080
FailureTypesConfiguredExplicitlyWorkflow,
50825081
FailureTypesScenario.THROW_CUSTOM_EXCEPTION,
5083-
),
5084-
run_scenario(
5082+
)
5083+
await run_scenario(
50855084
FailureTypesConfiguredExplicitlyWorkflow,
50865085
FailureTypesScenario.CAUSE_NON_DETERMINISM,
5087-
),
5086+
)
50885087
# When configured at the workflow level inherited
5089-
run_scenario(
5088+
await run_scenario(
50905089
FailureTypesConfiguredInheritedWorkflow,
50915090
FailureTypesScenario.THROW_CUSTOM_EXCEPTION,
5092-
),
5093-
run_scenario(
5091+
)
5092+
await run_scenario(
50945093
FailureTypesConfiguredInheritedWorkflow,
50955094
FailureTypesScenario.CAUSE_NON_DETERMINISM,
5096-
),
5097-
)
5095+
)
50985096

50995097

51005098
@workflow.defn(failure_exception_types=[Exception])

0 commit comments

Comments
 (0)