Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

CHANGED

- Add/update type-hinting for various worker methods

## v1.2.0

ADDED:
Expand Down
4 changes: 2 additions & 2 deletions durabletask/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,8 +538,8 @@ def task_id(self) -> int:
return self._task_id


# Orchestrators are generators that yield tasks and receive/return any type
Orchestrator = Callable[[OrchestrationContext, TInput], Union[Generator[Task, Any, Any], TOutput]]
# Orchestrators are generators that yield tasks, recieve any type, and return TOutput
Copy link

Copilot AI Jan 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spelling error: "recieve" should be "receive"

Suggested change
# Orchestrators are generators that yield tasks, recieve any type, and return TOutput
# Orchestrators are generators that yield tasks, receive any type, and return TOutput

Copilot uses AI. Check for mistakes.
Orchestrator = Callable[[OrchestrationContext, TInput], Union[Generator[Task[Any], Any, TOutput], TOutput]]

# Activities are simple functions that can be scheduled by orchestrators
Activity = Callable[[ActivityContext, TInput], TOutput]
Expand Down
14 changes: 7 additions & 7 deletions durabletask/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,42 +150,42 @@ def __init__(self):
self.entities = {}
self.entity_instances = {}

def add_orchestrator(self, fn: task.Orchestrator) -> str:
def add_orchestrator(self, fn: task.Orchestrator[TInput, TOutput]) -> str:
if fn is None:
raise ValueError("An orchestrator function argument is required.")

name = task.get_name(fn)
self.add_named_orchestrator(name, fn)
return name

def add_named_orchestrator(self, name: str, fn: task.Orchestrator) -> None:
def add_named_orchestrator(self, name: str, fn: task.Orchestrator[TInput, TOutput]) -> None:
if not name:
raise ValueError("A non-empty orchestrator name is required.")
if name in self.orchestrators:
raise ValueError(f"A '{name}' orchestrator already exists.")

self.orchestrators[name] = fn

def get_orchestrator(self, name: str) -> Optional[task.Orchestrator]:
def get_orchestrator(self, name: str) -> Optional[task.Orchestrator[Any, Any]]:
return self.orchestrators.get(name)

def add_activity(self, fn: task.Activity) -> str:
def add_activity(self, fn: task.Activity[TInput, TOutput]) -> str:
if fn is None:
raise ValueError("An activity function argument is required.")

name = task.get_name(fn)
self.add_named_activity(name, fn)
return name

def add_named_activity(self, name: str, fn: task.Activity) -> None:
def add_named_activity(self, name: str, fn: task.Activity[TInput, TOutput]) -> None:
if not name:
raise ValueError("A non-empty activity name is required.")
if name in self.activities:
raise ValueError(f"A '{name}' activity already exists.")

self.activities[name] = fn

def get_activity(self, name: str) -> Optional[task.Activity]:
def get_activity(self, name: str) -> Optional[task.Activity[Any, Any]]:
return self.activities.get(name)

def add_entity(self, fn: task.Entity) -> str:
Expand Down Expand Up @@ -362,7 +362,7 @@ def __enter__(self):
def __exit__(self, type, value, traceback):
self.stop()

def add_orchestrator(self, fn: task.Orchestrator) -> str:
def add_orchestrator(self, fn: task.Orchestrator[TInput, TOutput]) -> str:
"""Registers an orchestrator function with the worker."""
if self._is_running:
raise RuntimeError(
Expand Down
Loading