Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 13 additions & 17 deletions durabletask/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -980,23 +980,6 @@ def execute(
for old_event in old_events:
self.process_event(ctx, old_event)

# Process versioning if applicable
execution_started_events = [e.executionStarted for e in old_events if e.HasField("executionStarted")]
# We only check versioning if there are executionStarted events - otherwise, on the first replay when
# ctx.version will be Null, we may invalidate orchestrations early depending on the versioning strategy.
if self._registry.versioning and len(execution_started_events) > 0:
version_failure = self.evaluate_orchestration_versioning(
self._registry.versioning,
ctx.version
)
if version_failure:
self._logger.warning(
f"Orchestration version did not meet worker versioning requirements. "
f"Error action = '{self._registry.versioning.failure_strategy}'. "
f"Version error = '{version_failure}'"
)
raise pe.VersionFailureException

# Get new actions by executing newly received events into the orchestrator function
if self._logger.level <= logging.DEBUG:
summary = _get_new_event_summary(new_events)
Expand Down Expand Up @@ -1068,6 +1051,19 @@ def process_event(
if event.executionStarted.version:
ctx._version = event.executionStarted.version.value

if self._registry.versioning:
version_failure = self.evaluate_orchestration_versioning(
self._registry.versioning,
ctx.version
)
if version_failure:
self._logger.warning(
f"Orchestration version did not meet worker versioning requirements. "
f"Error action = '{self._registry.versioning.failure_strategy}'. "
f"Version error = '{version_failure}'"
)
raise pe.VersionFailureException

# deserialize the input, if any
input = None
if (
Expand Down
Loading