Skip to content

IWF-465: Initial search attributes #88

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 59 additions & 20 deletions iwf/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing_extensions import deprecated

from iwf.client_options import ClientOptions
from iwf.errors import InvalidArgumentError
from iwf.errors import InvalidArgumentError, WorkflowDefinitionError
from iwf.iwf_api.models import (
SearchAttribute,
SearchAttributeKeyAndType,
Expand All @@ -16,6 +16,7 @@
from iwf.stop_workflow_options import StopWorkflowOptions
from iwf.unregistered_client import UnregisteredClient, UnregisteredWorkflowOptions
from iwf.utils.iwf_typing import unset_to_none
from iwf.utils.persistence_utils import get_search_attribute_value
from iwf.workflow import ObjectWorkflow, get_workflow_type_by_class
from iwf.workflow_options import WorkflowOptions
from iwf.workflow_state import (
Expand Down Expand Up @@ -94,7 +95,12 @@ def start_workflow(
options.wait_for_completion_state_execution_ids
)

# TODO: set initial search attributes here
if options.initial_search_attributes:
sa_types = self._registry.get_search_attribute_types(wf_type)
converted_sas = convert_to_sa_list(
sa_types, options.initial_search_attributes
)
unreg_opts.initial_search_attributes = converted_sas

starting_state_id = None

Expand Down Expand Up @@ -404,22 +410,55 @@ def _do_set_workflow_search_attributes(
)


def get_search_attribute_value(
sa_type: SearchAttributeValueType, attribute: SearchAttribute
def convert_to_sa_list(
sa_types: dict[str, SearchAttributeValueType], initial_sas: dict[str, Any]
):
if (
sa_type == SearchAttributeValueType.KEYWORD
or sa_type == SearchAttributeValueType.DATETIME
or sa_type == SearchAttributeValueType.TEXT
):
return unset_to_none(attribute.string_value)
elif sa_type == SearchAttributeValueType.INT:
return unset_to_none(attribute.integer_value)
elif sa_type == SearchAttributeValueType.DOUBLE:
return unset_to_none(attribute.double_value)
elif sa_type == SearchAttributeValueType.BOOL:
return unset_to_none(attribute.bool_value)
elif sa_type == SearchAttributeValueType.KEYWORD_ARRAY:
return unset_to_none(attribute.string_array_value)
else:
raise ValueError(f"not supported search attribute value type, {sa_type}")
converted_sas: list[SearchAttribute] = []
if initial_sas:
for initial_sa_key, initial_sa_val in initial_sas.items():
if initial_sa_key not in sa_types:
raise WorkflowDefinitionError(
f"key {initial_sa_key} is not defined as search attribute, all keys are: {','.join(sa_types)}"
)

val_type = sa_types[initial_sa_key]
new_sa = SearchAttribute(key=initial_sa_key, value_type=val_type)
is_val_correct_type = False
if val_type == SearchAttributeValueType.INT:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know much Python, but could this be whole block with a switch type statement? Unsure if it's a feature available in Python though

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wish it could! https://www.datacamp.com/tutorial/python-switch-case

It's available in 3.10+

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense! Thank you 💯

Yes, I think it's best to keep the 3.9 support for now. We can do one big refactor a some point if we decide that the features are worth it. I don't think it's worth it just for the match feature 👍

if isinstance(initial_sa_val, int):
new_sa.integer_value = initial_sa_val
converted_sas.append(new_sa)
is_val_correct_type = True
elif val_type == SearchAttributeValueType.DOUBLE:
if isinstance(initial_sa_val, float):
new_sa.double_value = initial_sa_val
converted_sas.append(new_sa)
is_val_correct_type = True
elif val_type == SearchAttributeValueType.BOOL:
if isinstance(initial_sa_val, bool):
new_sa.bool_value = initial_sa_val
converted_sas.append(new_sa)
is_val_correct_type = True
elif (
val_type == SearchAttributeValueType.KEYWORD
or val_type == SearchAttributeValueType.TEXT
or val_type == SearchAttributeValueType.DATETIME
):
if isinstance(initial_sa_val, str):
new_sa.string_value = initial_sa_val
converted_sas.append(new_sa)
is_val_correct_type = True
elif val_type == SearchAttributeValueType.KEYWORD_ARRAY:
if isinstance(initial_sa_val, list):
new_sa.string_array_value = initial_sa_val
converted_sas.append(new_sa)
is_val_correct_type = True
else:
raise ValueError("unsupported type")

if not is_val_correct_type:
raise InvalidArgumentError(
f"search attribute value is not set correctly for key {initial_sa_key} with value type {val_type}"
)

return converted_sas
73 changes: 67 additions & 6 deletions iwf/tests/test_persistence_search_attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from time import sleep

from iwf.client import Client
from iwf.command_request import CommandRequest
from iwf.command_request import CommandRequest, TimerCommand
from iwf.command_results import CommandResults
from iwf.communication import Communication
from iwf.iwf_api.models import SearchAttributeValueType
Expand All @@ -25,6 +25,13 @@
sa_datetime_key = "CustomDatetimeField"
sa_keyword_array_key = "CustomKeywordArrayField"

initial_sa_keyword: str = "initial_keyword"
initial_sa_double: float = 1.11
initial_sa_int: int = 1
initial_sa_bool: bool = True
initial_sa_datetime: str = "2024-11-09T16:00:01.731455544-08:00"
initial_sa_keyword_array: list[str] = ["initial_keyword-1", "initial_keyword-2"]

sa_keyword: str = "keyword"
sa_double: float = 2.34
sa_int: int = 234
Expand All @@ -39,15 +46,17 @@
final_sa_keyword_array: list[str] = ["final_keyword-1", "final_keyword-2"]


class SearchAttributeState1(WorkflowState[None]):
class SearchAttributeStateInit(WorkflowState[None]):
def wait_until(
self,
ctx: WorkflowContext,
input: T,
persistence: Persistence,
communication: Communication,
) -> CommandRequest:
return CommandRequest.empty()
return CommandRequest.for_all_command_completed(
TimerCommand.by_seconds(2),
)

def execute(
self,
Expand All @@ -65,6 +74,29 @@ def execute(
)
persistence.set_search_attribute_int64(sa_int_key, sa_int)
persistence.set_search_attribute_datetime(sa_datetime_key, sa_datetime)
return StateDecision.single_next_state(SearchAttributeState1)


class SearchAttributeState1(WorkflowState[None]):
def wait_until(
self,
ctx: WorkflowContext,
input: T,
persistence: Persistence,
communication: Communication,
) -> CommandRequest:
return CommandRequest.for_all_command_completed(
TimerCommand.by_seconds(2),
)

def execute(
self,
ctx: WorkflowContext,
input: T,
command_results: CommandResults,
persistence: Persistence,
communication: Communication,
) -> StateDecision:
return StateDecision.single_next_state(SearchAttributeState2)


Expand All @@ -86,7 +118,7 @@ def execute(
persistence: Persistence,
communication: Communication,
) -> StateDecision:
# Delay updating search attributes to allow for the first assertion
# Delay updating search attributes to allow for the previous assertion
sleep(1)
persistence.set_search_attribute_keyword(sa_keyword_key, final_sa_keyword)
persistence.set_search_attribute_boolean(sa_bool_key, final_sa_bool)
Expand All @@ -101,7 +133,7 @@ def execute(
class PersistenceSearchAttributesWorkflow(ObjectWorkflow):
def get_workflow_states(self) -> StateSchema:
return StateSchema.with_starting_state(
SearchAttributeState1(), SearchAttributeState2()
SearchAttributeStateInit(), SearchAttributeState1(), SearchAttributeState2()
)

def get_persistence_schema(self) -> PersistenceSchema:
Expand Down Expand Up @@ -137,12 +169,41 @@ def setUpClass(cls):
def test_persistence_search_attributes_workflow(self):
wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}"

wf_opts = WorkflowOptions()
wf_opts = WorkflowOptions(
initial_search_attributes={
sa_keyword_key: initial_sa_keyword,
sa_double_key: initial_sa_double,
sa_int_key: initial_sa_int,
sa_bool_key: initial_sa_bool,
sa_datetime_key: initial_sa_datetime,
sa_keyword_array_key: initial_sa_keyword_array,
}
)
wf_opts.add_wait_for_completion_state_ids(SearchAttributeState1)
self.client.start_workflow(
PersistenceSearchAttributesWorkflow, wf_id, 100, None, wf_opts
)

initial_returned_search_attributes = self.client.get_all_search_attributes(
PersistenceSearchAttributesWorkflow,
wf_id,
)

initial_expected_search_attributes = dict()
initial_expected_search_attributes[sa_keyword_key] = initial_sa_keyword
initial_expected_search_attributes[sa_double_key] = initial_sa_double
initial_expected_search_attributes[sa_bool_key] = initial_sa_bool
initial_expected_search_attributes[sa_keyword_array_key] = (
initial_sa_keyword_array
)
initial_expected_search_attributes[sa_int_key] = initial_sa_int
initial_expected_search_attributes[sa_datetime_key] = (
"2024-11-10T00:00:01.731455544Z" # This is a bug. The iwf-server always returns utc time. See https://github.com/indeedeng/iwf/issues/261
# "2024-11-09T18:00:01.731455544-06:00"
)

assert initial_expected_search_attributes == initial_returned_search_attributes

self.client.wait_for_state_execution_completion_with_state_execution_id(
SearchAttributeState1, wf_id
)
Expand Down
15 changes: 15 additions & 0 deletions iwf/unregistered_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
parse_unexpected_error,
process_http_error,
process_workflow_abnormal_exit_error,
InvalidArgumentError,
)
from iwf.iwf_api import Client, errors
from iwf.iwf_api.api.default import (
Expand Down Expand Up @@ -62,6 +63,8 @@
from iwf.iwf_api.types import Response
from iwf.reset_workflow_type_and_options import ResetWorkflowTypeAndOptions
from iwf.stop_workflow_options import StopWorkflowOptions
from iwf.utils.iwf_typing import assert_not_unset
from iwf.utils.persistence_utils import get_search_attribute_value


@dataclass
Expand All @@ -76,6 +79,7 @@ class UnregisteredWorkflowOptions:
initial_data_attributes: Optional[dict[str, Any]] = None
wait_for_completion_state_execution_ids: Optional[list[str]] = None
wait_for_completion_state_ids: Optional[list[str]] = None
initial_search_attributes: Optional[list[SearchAttribute]] = None


T = TypeVar("T")
Expand Down Expand Up @@ -157,6 +161,17 @@ def start_workflow(
options.workflow_already_started_options
)

if options.initial_search_attributes:
for search_attribute in options.initial_search_attributes:
val = get_search_attribute_value(
assert_not_unset(search_attribute.value_type), search_attribute
)
if val is None:
raise InvalidArgumentError(
f"search attribute value is not set correctly for key {search_attribute.key} with value type {search_attribute.value_type}"
)
start_options.search_attributes = options.initial_search_attributes

if options.initial_data_attributes:
das = []
for key, value in options.initial_data_attributes.items():
Expand Down
23 changes: 23 additions & 0 deletions iwf/utils/persistence_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from iwf.iwf_api.models import SearchAttributeValueType, SearchAttribute
from iwf.utils.iwf_typing import unset_to_none


def get_search_attribute_value(
sa_type: SearchAttributeValueType, attribute: SearchAttribute
):
if (
sa_type == SearchAttributeValueType.KEYWORD
or sa_type == SearchAttributeValueType.DATETIME
or sa_type == SearchAttributeValueType.TEXT
):
return unset_to_none(attribute.string_value)
elif sa_type == SearchAttributeValueType.INT:
return unset_to_none(attribute.integer_value)
elif sa_type == SearchAttributeValueType.DOUBLE:
return unset_to_none(attribute.double_value)
elif sa_type == SearchAttributeValueType.BOOL:
return unset_to_none(attribute.bool_value)
elif sa_type == SearchAttributeValueType.KEYWORD_ARRAY:
return unset_to_none(attribute.string_array_value)
else:
raise ValueError(f"not supported search attribute value type, {sa_type}")
1 change: 1 addition & 0 deletions iwf/workflow_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class WorkflowOptions:
initial_data_attributes: Optional[dict[str, Any]] = None
_wait_for_completion_state_ids: list[str] = field(default_factory=list)
_wait_for_completion_state_execution_ids: list[str] = field(default_factory=list)
initial_search_attributes: Optional[dict[str, Any]] = None

@property
def wait_for_completion_state_ids(self) -> Optional[list[str]]:
Expand Down