Durable workflows for modern Python. Build resilient async applications with native support for streaming and interruption.
- π¬ Interactive workflows β AI agents, chatbots, and human-in-the-loop automation with bidirectional streaming
- β‘ Crash recovery β Deterministic replay from append-only logs means workflows survive restarts
- π― Graceful interruption β Cancel or redirect operations mid-execution with signals
- π Zero dependencies β Pure Python built on asyncio, fully typed
- π§© Pluggable storage β Bring your own database or filesystem backend
Duron requires Python 3.10+.
uv pip install duron# /// script
# dependencies = ["duron"]
# ///
import asyncio
from pathlib import Path
from typing import Optional, TypedDict
import duron
from duron.contrib.storage import FileLogStorage
class Event(TypedDict):
"""
Event type for communicating workflow progress and approvals.
Fields:
message: Log or status message for the user.
approval_id: Durable future ID to request approval (None for normal logs).
"""
message: str
approval_id: Optional[str]
# -----------------------
# Effect definitions
# -----------------------
@duron.effect
async def check_fraud(amount: float, recipient: str) -> float:
"""Simulate a risk engine returning a fraud probability."""
print("Executing risk check...")
await asyncio.sleep(0.5)
return 0.85
@duron.effect
async def execute_transfer(amount: float, recipient: str) -> str:
"""Simulate a real transfer execution."""
print("Executing transfer...")
await asyncio.sleep(1)
return f"Transferred ${amount} to {recipient}"
# -----------------------
# Durable workflow
# -----------------------
@duron.durable
async def transfer_workflow(
ctx: duron.Context,
amount: float,
recipient: str,
events: duron.StreamWriter[Event] = duron.Provided,
) -> str:
"""
Durable workflow to execute a transfer with fraud detection
and optional manager approval.
"""
async with events:
# Log start of transfer
await events.send({
"message": f"Checking transfer: ${amount} β {recipient}",
"approval_id": None,
})
# Step 1: Fraud check
risk = await ctx.run(check_fraud, amount, recipient)
# Step 2: Approval required if high risk
if risk > 0.8:
approval_id, approval = await ctx.create_future(bool)
await events.send({
"message": "β οΈ High risk - approval required",
"approval_id": approval_id,
})
if not await approval:
await events.send({
"message": "β Transfer rejected by manager",
"approval_id": None,
})
return "Transfer rejected"
# Step 3: Execute transfer
result = await ctx.run(execute_transfer, amount, recipient)
await events.send({"message": f"β {result}", "approval_id": None})
return result
# -----------------------
# Host process
# -----------------------
async def main():
"""
Run the workflow locally with file-based state storage.
"""
async with duron.Session(FileLogStorage(Path("transfer.jsonl"))) as session:
task = await session.start(transfer_workflow, 10000.0, "suspicious-account")
stream = await task.open_stream("events", "r")
async def handle_events():
async for event in stream:
# Always print message
print(event["message"])
# If approval_id is present, prompt for manager decision
# If the future is not pending, it means it was already resolved (e.g., workflow resumed)
if event["approval_id"] and task.is_future_pending(
event["approval_id"]
):
decision = await asyncio.to_thread(input, "Approve? (y/n): ")
await task.complete_future(
event["approval_id"], result=(decision.lower() == "y")
)
await asyncio.gather(task.result(), handle_events())
if __name__ == "__main__":
asyncio.run(main())- Read the getting started guide
- Explore a more advanced example with streams and signals: examples/agent.py