feat: DAG-based workflow support#75
Conversation
|
Please make the following changes in this PR:
Why I’m asking for this:
|
|
Fixed in 731b253 — all 5 items addressed:
|
|
Here’s a concise comment you can paste into PR #75: This looks great. One capability we’ll need next is conditional downstream execution based on upstream outputs. For example, we need to express logic like:
So beyond pure dependency ordering, it would be helpful to support an optional condition on a task (or edge) that the coordinator evaluates once upstream dependencies are complete. Concretely, something along these lines:
I’d strongly prefer a structured predicate over arbitrary expression evaluation, e.g. a validated condition model rather than raw code/strings, since that will be safer and easier to test/document. Not asking to block this PR on it, but I’d like to make sure the workflow model can evolve in that direction, since conditional branching is a key use case for us. If you want, I can also give you:
|
|
Good call — conditional branching is a natural next step. The current model should extend cleanly:
Will track as a follow-up. |
|
This is great. One specific suggestion for the condition model: please make conditions evaluate against the outputs of all direct dependencies, exposed through a templated path syntax. For example, for a task
and then compose them normally:
I’d suggest these semantics:
I’d also keep the first version intentionally small and validated:
That would cover the cases we need — including conditions depending on two inputs or a set of inputs — while keeping the model predictable, safe, and easy to document/test. |
|
Implemented in ad02eb8 — replaced structured What changed:
|
There was a problem hiding this comment.
Pull request overview
Adds a new ai4s.jobq.workflow module that layers DAG-based orchestration on top of the existing JobQ backends, persisting workflow/task state in Azure Table Storage and advancing workflows via a coordinator consuming completion messages.
Changes:
- Introduces workflow core components (entities/definition validation, Table Storage store, coordinator loop, workflow-aware worker processor, local runner, condition evaluator).
- Adds CLI support for
ai4s-jobq workflow ...pluspush --workflow, along with extensive documentation/tutorials. - Adds tests for workflow entities/condition evaluation and Table Storage store behavior (Azurite).
Reviewed changes
Copilot reviewed 20 out of 20 changed files in this pull request and generated 12 comments.
Show a summary per file
| File | Description |
|---|---|
| vale-styles/config/vocabularies/JobQ/accept.txt | Adds workflow terminology to Vale accepted vocabulary. |
| tests/test_workflow_store.py | Adds Azurite-backed integration tests for WorkflowStore. |
| tests/test_workflow_entities.py | Adds unit tests for DAG validation, serialization, and condition evaluation. |
| pyproject.toml | Adds [workflow] optional dependency (azure-data-tables). |
| docs/workflows.md | Adds workflow architecture and user guide documentation. |
| docs/workflow-tutorial.md | Adds an end-to-end local tutorial using Azurite. |
| docs/workflow-races.md | Documents race conditions, failure modes, and mitigations. |
| docs/index.rst | Links new workflow documentation into Sphinx TOC. |
| ai4s/jobq/workflow/worker.py | Implements WorkflowShellCommandProcessor and workflow context helper for workers/scripts. |
| ai4s/jobq/workflow/store.py | Implements Table Storage persistence with ETag-guarded updates and query helpers. |
| ai4s/jobq/workflow/runner.py | Adds LocalRunner for in-process workflow execution/testing. |
| ai4s/jobq/workflow/entities.py | Defines workflow/task/completion dataclasses and (de)serialization helpers. |
| ai4s/jobq/workflow/data/workflow-definition.schema.json | Adds JSON Schema for workflow definition files. |
| ai4s/jobq/workflow/coordinator.py | Adds stateless coordinator loop to process completions and enqueue ready tasks. |
| ai4s/jobq/workflow/context.py | Adds WorkflowContext plus sync helpers for user scripts (get_upstream_output, etc.). |
| ai4s/jobq/workflow/condition.py | Adds restricted AST-based condition validator/evaluator with aggregates + globbing. |
| ai4s/jobq/workflow/client.py | Adds WorkflowClient API for submit/status/list/cancel/purge and summary aggregation. |
| ai4s/jobq/workflow/cli.py | Adds ai4s-jobq workflow subcommands (submit/status/list/tasks/summary/cancel/purge/run-local/coordinator). |
| ai4s/jobq/workflow/init.py | Exposes workflow public API surface. |
| ai4s/jobq/cli.py | Wires workflow commands into the main CLI and adds push --workflow plus auto processor upgrade. |
|
I think we should follow up on stage 1 + stage 2 coordinator throughput improvements before adding more coordinator parallelism. Stage 1
Stage 2The bigger issue is that the hot path currently does a full Please switch to:
Important detail: please document and keep consistent whether Defensive testing I’d want with this
My reason for pushing on this now: the current implementation is correct, but the hot path is optimized for simplicity, not throughput. These two stages seem like the best next step before scaling out coordinator concurrency. |
|
Implemented in 489aa4f — Stage 1 + Stage 2 coordinator throughput improvements with defensive tests. Stage 1: Definition cache, ETag retry on recount_summary(), CoordinatorMetrics instrumentation. Stage 2: Incremental increment_summary() replaces full recount in hot path. recount_summary() kept as repair path only. ready counts as running at summary level (documented, consistent). Initial counters fixed. Clamping prevents negative drift. 20 new tests: Definition cache hit/miss, incremental summary transition matrix, diamond fan-in, failure propagation, reconciliation (corrupt counters repaired), duplicate completion idempotency, wide fan-in (10→1), initial counter accuracy. |
`run-local` previously printed a single startup line, dropped silent during the run (apart from DEBUG "Running workflow X" / "Workflow X finished" entries from the runner), and emitted a final summary panel only after every workflow completed. With thousands of workflows the CLI looked stuck for minutes. `LocalRunner` already exposes an `on_progress(event, task_delta, wf_delta)` callback; the CLI just was not wiring it up. Add a Rich `Progress` with two bars (Workflows and Tasks) populated from the aggregate summary and updated from the runner callback. Users now see continuous task throughput plus per-bar elapsed/remaining estimates while the run is in flight. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
When `workflow run-local` is invoked with `--concurrency <= 5`, the user is almost always debugging a specific run rather than churning through a stress test. In that mode it's helpful to see which workflow is currently executing and how far along its task DAG it is, not just a summary of "N tasks done". Add a per-workflow bar that appears when each workflow is picked up by a worker and disappears when it finishes. Total task count comes from the first `get_workflow_status` round, so each bar is correctly sized. At higher concurrencies (the default 50, stress tests) we keep just the two summary bars to avoid screen thrash. To support this, replace `LocalRunner.on_progress`'s old tuple-style callback `(event, task_delta, wf_delta)` with a typed `ProgressEvent` dataclass that carries `workflow_id` and (for `workflow_started`) `total_tasks`. Only the workflow CLI consumed the old callback, so this is purely a refactor — no public-API impact. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
… with logger Two unrelated fixes that became visible together while watching `workflow run-local --concurrency 5`: 1. **Per-task hot-path was doing a full table scan.** `LocalRunner._apply_and_advance` was calling `recount_summary` after every task — a method whose own docstring labels it as the reconciliation/repair path that scans every task row in the workflow partition. It was also re-fetching and re-parsing the workflow definition for every task. Mirror the coordinator's hot path instead: cache the definition once in `_run_one`, then use atomic `increment_summary` deltas (running → completed/failed plus any children that flipped pending → ready/upstream_failed). This drops per-task latency from N table-scan round-trips to a single ETag-protected counter update — what made per-workflow progress bars look stuck. 2. **Live Rich progress bars were being clobbered by log records.** Both the `run-local` Progress and the batch-`submit` Progress created their own `Console()` instances, while the `RichHandler` installed by `setup_logging` rendered to a different console. Whenever a log record fired during a live region, the handler wrote to its own console and overwrote the bar (only briefly, because Rich repaints — but visibly enough that the user noticed "random log messages" flashing). Promote `manager._get_rich_console` to a public `logging_utils.get_rich_console()` and use it as the `console=` argument for every `Progress` in the workflow CLI (`run-local`, batch-`submit`, `purge`). When a Progress shares its console with the logger, Rich pauses the live region, prints the log record above the bars, and re-renders cleanly. While here, also tighten the `LocalRunner.on_progress` callback to the new `ProgressEvent` dataclass introduced in the previous commit (workflow_started carries `total_tasks` for per-workflow bar sizing). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…event Two bugs the live progress bars in `workflow run-local` were exposing once they finally got to render properly: 1. **Tasks bar advanced twice per task.** An earlier edit to `LocalRunner._apply_and_advance` left the original `self._on_progress(...)` call in place AND added a new one, so every task completion fired the callback twice. Caught by counting events vs `stats.tasks_completed` in a small repro (15 actual completions → 30 task_completed events). Remove the duplicate. 2. **Tasks bar total was wrong.** `workflow_run_local` set the bar's total to `agg.total_tasks`, which sums every workflow including already-completed and failed ones, so a fresh purge+submit+run could legitimately end at "25/40". Compute the bar total from the active (pending+running) workflows instead, and pre-credit the bar with tasks already finished in those workflows so resumed runs end at 100% as well. End-to-end on Azurite: a 5×5-task purge+submit+run-local now ends cleanly at 25/25 tasks, 5/5 workflows. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
`LocalRunner._run_one` was calling ``await asyncio.gather(*coros)`` on every round and only firing ``task_completed`` events from the post-gather loop. When a single round had many ready tasks (e.g. diamond width=20, wide_parallel up to 80) and the per-callback semaphore was tight (concurrency=5), the gather blocked for ``ceil(N/concurrency) * sleep_s`` seconds with zero progress events — per-workflow bars stayed at 0 then jumped straight to N the moment the round finished. Switch to ``asyncio.as_completed`` so each completion is applied (and its progress event emitted) the instant the underlying callback returns, even if siblings in the same round are still running. Per-workflow bars now tick smoothly during fan-outs. The change is safe because all per-task state mutations (``increment_child_deps``, ``increment_summary``) are already atomic via ETag-CAS, so interleaving completions from the same round behaves identically to applying them in batch. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Each ready task in _run_one needed two sequential Azure round-trips before its callback could even be scheduled — get_task_entity to fetch kwargs, plus update_entity to mark it running. For a wide_parallel-80 workflow against real Azure (~100 ms latency) that was 80 × 2 × 100 ms = ~16 s of zero progress before the first task even started running. Combined with the as_completed streaming fix in 2feb7d5, this is what was making per-workflow bars sit at 0/80 for tens of seconds before suddenly ticking. Hoist the prep into an inner _prep coroutine and run them in parallel via asyncio.gather, so all N reads + writes happen concurrently and the first callback can fire after a single round-trip's worth of latency rather than 2N. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* purge --drain-queues now drains the task queues and the coordinator's completion queue alongside the table purge. Useful after a coordinator-driven run where in-flight messages would otherwise surprise the next run. * The confirmation prompt now lists the discovered queues and the queue backend so users see exactly what's about to be drained. * monitor.py: change --prefix default from 'JobQ' to None, so JOBQ_WORKFLOW=<account>/<prefix> is actually honored. Previously the hard-coded default silently overrode the env var, causing monitor to read from the wrong tables (e.g. user submits to account/StressTest, monitor reads account/JobQ and sees stale data). * monitor.py: added a startup banner printing the storage account and table names being polled, plus a hint that queue depths are not tracked. * README: document the new --drain-queues flag and clarify what monitor.py covers. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…ning'
The workflow row's 'running' counter is initialised at submission to
the number of root tasks (tasks with no dependencies), so for
wide-fanout workflows every task shows as 'running' from t=0 even
before a worker has picked it up. Rename the column to match the
actual semantics ('ready or running') rather than overpromising.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Only the 'resilient_diamond' topology injects failures (about 30% of branches get fail_probability=0.5). The merge task uses dep_policy='any' so the workflow itself almost never fails, but individual branch tasks do. generate.py now prints how many tasks were tagged flaky and the expected number of task failures so users can sanity-check what they see in monitor.py. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
`WorkflowStore.increment_child_deps` short-circuits when the child is already in a non-pending state — a common case under `dep_policy='any'` (one success suffices) or when a late failure arrives at a merge that's already ready. But callers in both the LocalRunner and the Coordinator interpreted any "ready" / "upstream_failed" return as a fresh transition, so every non-first sibling parent ended up bumping the workflow's running counter and decrementing pending again. Symptoms: - Workflows with resilient diamonds (any-policy merges with flaky branches) finish all their tasks but the workflow summary row stays at status='running' with phantom running/pending counters, visible in monitor.py and `workflow list`. - For the coordinator path, the merge task was also enqueued multiple times — once per successful sibling parent beyond the first — risking duplicate work and queue-backed lock contention. Fix: - `increment_child_deps` now returns `(TaskStatus, transitioned)` where transitioned=False means the call observed an already- non-pending child and didn't mutate it. - Coordinator and runner only count / enqueue when transitioned is True and the new status is ready/upstream_failed. - LocalRunner._run_one runs a single recount_summary after each workflow as a safety net to self-heal any drift from previously affected workflows. - Added `test_any_policy_no_summary_drift` regression test: resilient diamond width=5, one flaky branch fails, merge runs, workflow must reach a terminal status with running=pending=0. - Tests using `await store.increment_child_deps(...)` updated to unpack the new tuple return. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…letions - Make Coordinator._process_completion defensive against ResourceNotFoundError on the workflow row: log a warning and return instead of crashing the consumer loop. This also helps real-world scenarios (a worker had a completion in-flight when an operator purged workflow data, or a queue retry delivers a duplicate after the workflow row was deleted). - TestRealShellCommands previously shared the global jobq-workflow-completions queue and the literal wf-test-shell task queue across runs. Azurite preserves in-flight messages (with active visibility leases) across queue delete+recreate, so stale messages from prior test runs were re-delivered to the new run's coordinator with an unknown workflow id. Each test now uses unique wf-test-shell-<rand> and wf-test-comp-<rand> queue names and passes the latter through both WorkflowShellCommandProcessor and Coordinator constructors. - Strip stale JOBQ_WORKFLOW_STATE/_PREFIX/_QUEUES/_BLOBS env vars in fixtures (TestWorkflowCLI.cli_env, TestRealShellCommands.workflow_env) so developer-shell leakage doesn't trip the CLI's hard-break check. Add an autouse _clean_env fixture to TestCLISubmitAndRunLocal which doesn't share cli_env. - Update unit-test callers in tests/test_workflow_store.py to unpack the new (TaskStatus, transitioned) tuple returned by increment_child_deps. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The completion queue was a single shared string ("jobq-workflow-completions")
regardless of which workflow prefix the deployment used. That meant
multiple workflow deployments sharing the same storage account but with
different table prefixes would cross-contaminate completions: a worker
in deployment A would push to the shared queue, and the coordinator in
deployment B would pick it up and try to look up the unknown workflow
in B's tables.
This change derives the completion queue name from the workflow prefix
(``<prefix>-workflow-completions``, lower-cased to satisfy Azure
Storage Queue naming rules). The default prefix ``"JobQ"`` yields
``"jobq-workflow-completions"``, so existing single-deployment users
keep the same queue name; users who set a custom prefix get their own
isolated queue automatically.
Changes:
- ``ai4s.jobq.workflow.store.completion_queue_name(prefix)`` new helper.
- ``WorkflowStore.prefix`` / ``.completion_queue`` properties so the
queue name is easily reachable from anywhere that has a store handle.
- ``Coordinator.__init__`` defaults ``completion_queue`` to
``store.completion_queue`` (was the hardcoded global constant).
- ``Coordinator.from_environment`` derives the queue from ``env.prefix``.
- ``WorkflowShellCommandProcessor`` derives the queue from its
``WorkflowEnv.prefix`` at startup (workers and coordinator pick the
same name automatically as long as ``JOBQ_WORKFLOW`` agrees).
- ``WorkflowClient.discover_queues`` (used by ``purge --drain-queues``)
uses ``store.completion_queue`` so the right queue gets drained.
- ``ai4s-jobq workflow doctor`` reports and probes the prefix-scoped
queue.
- Docs (workflows, workflow-races, workflow-tutorial) updated to
describe the new naming scheme.
- The ``COMPLETION_QUEUE`` module-level constant in ``coordinator.py``
is preserved for backwards compatibility but now resolves to the
default-prefix queue name (no behavioural change for default users).
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The in-process LocalRunner wrote 'started_at' as a '.isoformat()'
string, while WorkflowStore.mark_task_running wrote a 'datetime'.
Azure Tables roundtripped each value with the type it was written
as, so 'TaskStatus.started_at' could be either 'datetime' or 'str'
depending on which code path claimed the task.
Consumers that expected a 'datetime' tripped over this:
File '.../workflow/cli.py', line 976, in doctor
age_min = (now - t.started_at).total_seconds() / 60
TypeError: unsupported operand type(s) for -: 'datetime.datetime' and 'str'
The same latent bug exists in 'coordinator.py' (task-timeout sweep).
Fix in two parts so it's robust to existing rows already on disk:
1. 'runner.py' now writes a 'datetime' (matching the store), so new
rows are typed consistently.
2. '_row_to_task_status' / '_row_to_workflow_status' route every
timestamp field through a new '_to_datetime' helper that parses
legacy ISO strings (with or without a trailing 'Z') back into
aware UTC 'datetime's. Unparseable or missing values become
'None' (for optional fields) or fall back to 'now' (for required
workflow timestamps).
Adds focused unit tests for '_to_datetime' and a regression test that
'_row_to_task_status' coerces a string 'started_at' to a 'datetime'.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Replaces ~150 string-literal state comparisons across the workflow
subsystem with str-Enum members. Each enum subclasses str so existing
string-typed APIs (Azure Table rows, JSON serialisation, CLI flags)
keep working unchanged.
New enums in entities.py:
- TaskState (pending/ready/running/completed/failed/skipped/
upstream_failed/cancelled) with classmethod predicates
is_terminal/is_active/is_dep_satisfied/is_dep_failed backed by
ClassVar frozenset members.
- WorkflowState (pending/running/completed/failed/cancelled) with
is_terminal/is_active.
- DepPolicy (all/any).
Each enum overrides __str__ to return self.value so f-string and log
formatting produce the bare value (e.g. 'ready') rather than the enum
name (e.g. 'TaskState.READY'). This is needed because the project
targets Python 3.10, where StrEnum is unavailable.
Migrations:
- store.py: state checks and writes throughout; collapses
'all' if … else 'any' if … else str(...) ternary in
submit_workflow to plain str(t.dep_policy); replaces singleton
'in ("cancelled",)' / 'not in ("pending",)' with == / !=.
- coordinator.py: _process_completion and _process_synthetic_completion
now use 'match result.status', extracting _handle_task_completed
and _handle_task_failed helpers.
- runner.py: _apply_and_advance's child loop uses 'match
child_result.status'.
- context.py: get_available_upstream_outputs uses
TaskState.is_dep_satisfied.
- cli.py: adds _styled_status helper used at four sites;
--status options on 'workflow list' and 'workflow tasks' now use
click.Choice backed by the enums.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
….from_account Three near-identical 13-line blocks in coordinator.from_environment, client.from_environment, and context._store_from_env each dispatched an account string to the appropriate WorkflowStore factory based on its shape. They are now collapsed into a single classmethod, WorkflowStore.from_account(account, *, prefix), which performs the dispatch in one place. The 'workflow' CLI's _make_client also uses it. Also rewrites _jobq_for_queue as a 'match' statement for symmetry with the rest of the refactor. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
_is_dep_policy_satisfied and _is_dep_policy_doomed now dispatch on DepPolicy.ALL / DepPolicy.ANY / numeric (default case), making the two-branches-plus-fallback structure self-documenting. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Adds tests/test_workflow_perf.py: a stress-marked end-to-end test
that submits a 1000-task fan-out/fan-in workflow against Azurite
and asserts ≥50 cps single-workflow throughput on the coordinator
hot path.
Workflow shape: 1 root + 998 leaves (all depending on root) + 1
terminal joining all leaves. The test simulates workers by draining
the task queue and bulk-pushing synthetic WorkflowCompletion
messages directly to the completion queue. The measurement window
covers the steady-state batch-flush-ack loop in run_once(), which
is exactly the metric the clean-cut design promises (plan.md R4).
Local measurement: 998 completions in 12.18 s = 81.9 cps
(33 batches, no flush conflicts). Clears the 50 cps target with
~64 % headroom.
Run with:
pytest tests/test_workflow_perf.py --run-stress-tests -v
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Cuts the legacy workflow stack and the abandoned green-field A/B/C/D
modules now that R0-R4 land a working replacement (state + persistence
+ coordinator + client + worker, ~2.3 k LOC of new production code).
Production files removed (~13.5 k LOC):
* coordinator.py (legacy, 4 254 LOC)
* store.py (legacy, 3 502 LOC)
* worker_legacy.py, client_legacy.py (the R3 rename targets)
* _actor.py, _lease.py, summary_coalescer.py, child_dep_coalescer.py
* diagnostics.py, diagnostics_catalogue.py
* dag.py, snapshot.py, hot_path.py, control.py, replay.py
(abandoned green-field)
* _az_broker.py, _az_stores.py (untracked, also abandoned)
Test files removed (~19 k LOC):
* test_workflow_{actor,actor_integration,adversarial,batched_writes,
budget,bugs,causal_annotations,client_legacy,coalescers,consistency,
context,control,coordinator,dag,diagnostics,diagnostics_catalogue,
diagnostics_loop,e2e,explain_cli,hot_path,infra_recovery,lease,
replay,snapshot,store,summary_accumulator,terminal_index,
worker_legacy}.py
* test_result_policy.py (tested a deleted legacy helper)
Surviving rewrites:
* context.py: rewritten to drop the WorkflowStore dependency. The
user-facing surface (WorkflowContext, set_output, get_upstream_output,
get_upstream_outputs, is_cancelled) is preserved; under the hood it
now uses WorkflowPersistence and the worker's JOBQ_WORKFLOW_UPSTREAM_REFS
sidecar (no full-runtime load needed on the hot path).
* __init__.py: drop the WorkflowStore re-export.
* ai4s/jobq/track/: patch utils/workflow_store.py and
components/connection_info.py to use WorkflowPersistence and
ai4s.jobq.workflow.ids instead of the deleted store.py symbols.
File renames:
* coordinator_v2.py → coordinator.py
* test_workflow_coordinator_v2.py → test_workflow_coordinator.py
Test stash trimmed: TestBlobStashEndToEnd dropped (heavy legacy-store
coupling, redundant with the integration coverage in
test_workflow_worker.py); the wire-format + materialisation +
download tests stay and are pointed at the new
WorkflowShellCommandProcessor.
Verification:
* 371 passed, 17 skipped, 0 failures across the full non-stress
pytest suite (~193 s).
* test_workflow_perf still hits 81.9 cps with --run-stress-tests.
* ruff check + format clean.
* mypy: 0 new errors (3 pre-existing errors in
ai4s/jobq/track/components/workflow_graph.py remain, unrelated).
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Sync the documentation surface with the runtime-blob/single-coordinator
architecture shipped in R0–R5, restore the user-facing
get_real_upstream_tasks() helper so the --max-fan-in pattern keeps
working, and add a 3.14.0 CHANGELOG entry that itemises the breaking
changes vs 3.13.x.
Code changes:
* Restore get_real_upstream_tasks on WorkflowContext and
_LazyWorkflowContext (sync wrapper at module level, exported via
ai4s.jobq.workflow). Walks the runtime DAG, skipping __batch_merge
nodes inserted by sequentialize_fan_in to recover the original
upstream task names.
* Two new tests in tests/test_workflow_worker.py covering both the
with-merge and no-merge cases.
* Rewrite sequentialize_fan_in's docstring to drop the dead
ChildDepCoalescer / Tables 64 KB row-cap framing; new wording
describes the ETag-CAS contention on the runtime blob.
Docs:
* Delete docs/workflow-design.md (was a 562-line orphan spec of the
deleted per-task-row architecture).
* Rewrite docs/workflow.md and docs/workflow-races.md end-to-end for
the new model: one runtime-state blob per workflow, ETag-CAS on
every flush, exactly one coordinator process per prefix.
* Surgical edits to docs/workflows.md: new how-services-interact
table, diamond message-flow narrative, coordinator-driven retry
section, ready-repair sweep coverage, new tuning-flag table (7
flags), new env-var table. Drops all single-coordinator-mode,
coalescer, recount/sweeper, and 64-KB-property-limit language.
* docs/api.md: replace client._store.list_tasks(...) with the public
client.list_tasks(...) API.
* docs/workflow-tutorial.md: env-var table now references
{prefix}WorkflowsIndex Table + {prefix}-workflows blob container
(the old {prefix}Workflows / {prefix}WorkflowTasks tables are
gone).
CHANGELOG:
* Add 3.14.0 (2026-05-20) entry covering the breaking persistence
switch (state lives in a blob, not Tables), the single-coordinator
requirement, every removed CLI flag and env var, the removed
resubmit-task subcommand, the new coordinator-side retry
semantics, the restored get_real_upstream_tasks helper, and the
measured ~80 cps Azurite throughput. Includes a migration note:
drain in-flight workflows before upgrading; legacy tables and the
coordinator-locks container can be deleted manually after the
rollback window.
Other:
* Refresh .github/copilot-instructions.md "Workflow module" section
with the new module list, key patterns, single-coordinator
warning, and the new env vars.
* Add 'indexable', 'unacked', and dedup(s|ed|ping)? to the JobQ Vale
vocabulary so the prose lints clean.
Validation:
* ruff check + ruff format check both green.
* mypy ai4s/ --no-namespace-packages: no new errors (3 pre-existing
errors in track/components/workflow_graph.py).
* markdownlint and vale --minAlertLevel=error: clean across the
changed docs.
* pytest -q (full non-stress suite): 373 passed, 19 skipped, 0
failures (the lone error is examples/stress_test/test_wide_fanin.py,
a pre-existing example referencing the now-deleted WorkflowStore;
not part of the production test suite).
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Replace the inlined {parent_name: full_blob_ref} dict in task queue
messages with a compact form that strips the redundant
blob:{workflow_id}/ prefix and .json suffix for parents stashed under
the canonical name (the common case).
The new wire format is a dict with up to three optional sections:
{
"s": ["pA", "pB", ...], # canonical blob refs
"i": {"pC": "<json>", ...}, # non-canonical / inline refs
"n": ["pD", ...], # parents with no output (None)
}
At a 2 060-parent fan-in (cse-mindless's widest leaf) the queue
message shrinks from ~105 KiB (over Azure Storage Queue's ~48 KiB
raw cap) to ~25 KiB — a ~4x reduction that lifts the practical
fan-in ceiling without any extra IO on the hot path.
The coordinator and client now emit `__upstream_outputs_compact`;
the worker accepts either it or the legacy `__upstream_output_refs`
dict so in-flight messages from older coordinators continue to work.
Also corrects the `sequentialize_fan_in` and workflows.md sizing
narratives, which previously over-claimed CAS-contention as the
fan-in concern. In the v1 single-coordinator design CAS conflicts
are rare (per-workflow flushes are serialised), and the real cap
is queue-message size — which the compact encoding now addresses
directly.
Includes 12 new unit tests covering round-trip encoding, mixed
ref shapes, legacy-format fallback, and a size-budget assertion
that pins the 2 060-parent canonical case under the queue cap.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The pre-R5 DummyWorkflowProcessor subclassed Processor and pulled in WorkflowCompletionEmitter — both deleted in the workflow rewrite. The new implementation subclasses WorkflowShellCommandProcessor to inherit the completion-publish, cancel-poll, and queue-handle plumbing, then overrides __call__ to skip the per-task subprocess fork. Concretely it: * Pops the workflow metadata kwargs the parent class would normally read (__workflow_id / __workflow_task / __attempt_no plus the new __upstream_outputs_compact wire payload). * asyncio.sleeps for sleep_s (default 0). * Honours fail_probability for chaos testing. * Builds a WorkflowCompletion (with serialize_output for the tiny inline result) and publishes via the inherited _publish_completion (tenacity-wrapped). This keeps shell-fork overhead out of throughput measurements — the intended use is large stress workloads (cse-mindless, etc.) where the per-task work is itself trivial and the subprocess churn would mask the coordinator + queue + persistence numbers we actually want to measure. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- coordinator: add _sweep_stuck_running_loop, _sweep_stuck_running_once, and _timeout_single_workflow methods. RUNNING tasks that exceed their per-task timeout_s (or the coordinator-level running_timeout_s default) are failed by the sweep, triggering the normal retry/fail cascade. - coordinator: add running_sweeps and tasks_timed_out fields to _Stats. - cli: add --running-timeout-s and --running-sweep-interval-s options to the 'coordinator' subcommand (envvars JOBQ_COORDINATOR_RUNNING_TIMEOUT_S and JOBQ_COORDINATOR_RUNNING_SWEEP_INTERVAL_S); update startup banner. - tests/test_workflow_recovery.py: 16 Azurite-backed tests covering every infrastructure failure point in the coordinator/worker chain: crash-before-flush, crash-after-flush-before-ack, worker redelivery deduplication, stale attempt_no ignored, push-before-flush recovery, ETag conflict retry, output_ref=None handling, upstream blob missing, push failure leaves blob unchanged, index write failure, workflow deleted mid-run, cancel flush conflict, stuck-READY repair sweep, and stuck-RUNNING timeout sweep (coordinator default + per-task + no-op). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- persistence.py: gzip-compress state blobs at level 1 on write (15.5 MB → 1.4 MB for 16 k-task workflows, 91% reduction); decompress transparently on read via magic-bytes check so existing plain-JSON blobs still load. Added _encode_state / _decode_state helpers. - client.py _dispatch: replace serial for-loop with asyncio.gather + 64-wide semaphore so 2108 root-task pushes happen concurrently instead of one at a time. - coordinator.py _push_ready_tasks: same concurrency fix. - cli/__init__.py workflow_submit: wrap the load/validate/submit sequence in a Rich status spinner with live progress text. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
coordinator: skip the blob download on the hot path by caching (WorkflowRuntime, etag) after every flush. On a single 16k-task workflow the download+parse accounted for ~250ms per 32-completion cycle; eliminating it roughly doubles sustained throughput. Also accumulate up to 4 back-to-back receive calls before processing, amortising the blob round-trip over up to 128 completions instead of 32. Cache invariants: - Updated with fresh (runtime, etag) after every successful flush. - Evicted (not updated) by cancel / ready-repair / running-timeout sweepers, which load independently and could race the main loop. - Evicted on WorkflowConflictError, forcing a clean reload. - Evicted on workflow completion/failure to release memory. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Catch KeyboardInterrupt and CancelledError around coord.run(), call coord.stop(), and print a shutdown summary with final completions/tasks-pushed/batches stats. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
_render_status accepts an optional cps float. When provided: - Adds 'Rate: X.X completions/s' line to the panel header - Adds 'eta Xs/Xm/Xh' to the panel subtitle based on remaining tasks workflow_watch tracks rate using an exponential moving average (alpha=0.3) over per-poll samples, so the displayed rate smooths out rather than jumping on single-poll outliers. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
EMA on per-poll deltas was averaging in zero-delta polls between coordinator flushes, giving a misleadingly low rate when completions arrive in bursts. Overall average (total_completed / total_elapsed since watch started) gives the true sustained throughput. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Logs a one-liner every 10s at INFO level: coordinator stats: completions=1280 (+128, 102.4/s) pushed=384 batches=42 cached=1 conflicts=0 dupes=0 latency=0.021s Fields: - completions: cumulative + delta since last log + rate (completions/s) - pushed: tasks dispatched to worker queues (children + root) - batches: receive cycles processed - cached: workflows currently held in the in-memory runtime cache - conflicts: ETag flush conflicts (should be ~0 in single-coordinator) - dupes: stale/duplicate completions dropped - latency: last per-workflow processing time Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
When a single receive batch spans multiple workflow IDs (3 workflows seen in the stats log), the previous code processed them sequentially. Each blob flush takes 2-8 s, so N workflows × 3 s/flush = 9 s/batch, giving only 1 batch per 10 s window. Since each workflow has its own blob path and ETag guard there is no cross-workflow shared state, so asyncio.gather over _process_workflow is safe. The batch now takes max(latencies) instead of sum(latencies). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Previously `workflow purge` (without --id) only deleted terminal (completed/failed/cancelled) workflows via terminal_only=True. This left running workflows alive after a purge --drain-queues, causing their state blobs to persist; the coordinator's ready-repair sweep would then re-dispatch their stuck tasks, polluting the completion queue with completions for zombie workflows. Changes: - Add --all flag to `workflow purge` that sets terminal_only=False, deleting all workflows including those still in running state - Add terminal_only param to WorkflowClient.purge() (default True for backward compatibility) - Fix misleading confirmation message: now says "terminal (completed/failed/cancelled)" vs "ALL (including running)" to accurately reflect what will be deleted Usage for a full reset between test runs: ai4s-jobq workflow purge --all --drain-queues --yes Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Two changes to reduce flush latency for large workflows: 1. Cache BlobClient per workflow_id in WorkflowPersistence._blob_client_cache. Previously get_blob_client() was called on every load()/flush(), allocating a new BlobClient + Pipeline + TransportWrapper each time. The underlying aiohttp session is shared (AsyncTransportWrapper.close() is a no-op), but per-call object creation adds unnecessary overhead. 2. Add update_index: bool param to flush() (default True for backward compat). The coordinator now passes update_index=runtime.is_terminal() so the Table Storage upsert — including the O(N) to_status() scan over all tasks — only runs when the workflow reaches a terminal state. Sweepers and the cancel path retain update_index=True since those state changes are meaningful to watch/list/status users. For a 16,774-task workflow this removes one sequential HTTP round-trip from every intermediate batch flush. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Previously the coordinator flushed the state blob after every batch of
32 completions. For barrier DAGs (wave-2 tasks depend on all wave-1
tasks), intermediate flushes never unlock new work: no wave-2 task has
all its dependencies satisfied until all wave-1 completions arrive.
This caused ~17 sequential blob writes with zero tasks dispatched in
between ("pushed=0" starvation).
New approach:
* Receive up to _MAX_RECEIVE_LOOPS=64 batches (2,048 messages) before
writing anything to Blob Storage. Apply each batch in-memory and
push ready tasks to workers immediately (no write needed).
* After the inner loop drains (or hits the deadline), flush all dirty
workflows in parallel in a single gather().
* Each workflow's messages are acked only after its own flush succeeds,
preserving at-least-once durability.
* Deadline is visibility_timeout * 0.8 so early messages don't expire
before we ack them.
* ETag conflicts in the flush phase: reload, re-apply accumulated items,
re-push (duplicate pushes are idempotent), retry up to
flush_retry_limit times. On final failure, defer ack so messages are
redelivered by the broker.
* run_once() returns messages *received* (drives event loop); the
completions_handled stat counts only successfully acked messages.
* Removed dead _process_workflow() (superseded by the new path).
* Updated test mock flush signatures to accept **kw for update_index.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Replace random fail_probability with a deterministic hash-based model:
* Each task carries a fail_threshold int kwarg (0–100 = percent).
* DummyWorkflowProcessor fails a task on its **first attempt** when
sha256(task_name)[:2] % 100 < fail_threshold; succeeds on every
retry. Two bytes give ~0.01 pp accuracy vs the target rate.
* cse_workflow.py: new --fail-m PCT (default 0) and --fail-other PCT
(default 0) per-role thresholds. Typical usage:
--fail-m 7 --fail-other 3 --num-retries 1
* run_cse_stack.py: same --fail-m / --fail-other flags replacing the
old --fail-probability float arg.
Actual rates over 10k tasks: m- ≈7.24%, b-/sim/d- ≈3.0%.
The same task name always produces the same failure decision, making
runs reproducible and failure outcomes verifiable offline.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Remove the attempt_no guard so hash-selected tasks fail on every attempt, not just the first. Downstream tasks will cascade to upstream_failed as expected. Update all docstrings and help text accordingly. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Sequential delete_message calls dominated flush latency after a deferred-flush batch: 2048 messages × ~2ms/call ≈ 4s per flush cycle. Replace the for-loop with asyncio.gather + Semaphore(64), matching the existing _push_ready_tasks pattern. Expected flush latency for a 2048-message batch: 32 rounds × ~2ms ≈ 64ms (≈60× faster). Also clarify the EmptyQueue break comment to make it explicit that it falls through to the dirty-flush path (i.e. 'flush on empty poll' is already implemented). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Add a _progress_bar() helper that renders a 50-char Rich Text bar with four colour segments: green = completed red = failed / upstream-failed / cancelled cyan = running dim = pending magenta = skipped (only if > 0) Followed by a dim percentage of terminal tasks (completed + failed + skipped), so the bar converges to 100% as the workflow finishes regardless of how many tasks fail. The bar appears in both 'workflow watch' (live refresh) and 'workflow status' (one-shot), between the panel and the counters line. Also adds TYPE_CHECKING imports (WorkflowStatus, rich.text.Text) so mypy can resolve the annotations without paying the runtime import cost. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Update _layers._progress_bar() to accept failed, running, and skipped as separate kwargs, matching the colour scheme of the main workflow bar: green = completed red = failed / upstream-failed / cancelled cyan = running magenta = skipped (when > 0) dim = pending The percentage now shows terminal tasks (completed + failed + skipped) out of total, so the bar converges to 100% regardless of failure rate. Update the render_layer_table() call site to pass each count separately. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…kflow tests - Test 15: un-parseable completion is deleted and stats.poison_messages increments; coordinator does not crash. - Test 16: a RuntimeError from receive_messages_batch is caught by _main_loop; the coordinator retries on the next iteration. - Test 17: a stale completion arriving after the workflow reaches WorkflowState.COMPLETED is acked as a no-op with no state corruption. - Update module docstring failure-catalogue items 15-17. - Add two new sections to docs/workflow-races.md: 'Unparseable (poison) completion message' and 'Transient queue receive error'. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Coordinator lease: - Add _lease.py with CoordinatorLease (60s sentinel blob lease, 15s renewal) - Integrate into coordinator __aenter__ via AsyncExitStack - Add --break-lease flag to 'workflow coordinator' command - Add standalone 'workflow break-lease' subcommand - Export CoordinatorLeaseError from ai4s.jobq.workflow Documentation improvements (workflow.md, workflow-races.md): - Introduce deferred-flush / in-memory snapshot design explicitly - Document coordinator lease (replaces stale 'no lease' statements) - Add 'Two defense mechanisms' section (ETag CAS + queue redelivery) - Compact 'What can go wrong' into definition-list style - Rewrite 'Task-level retries' to explain coordinator ownership - Add 'Why a separate index?' rationale paragraph - Fix scaling notes: no N-way blob contention (batched flush) - Fix 'Two completions' race: sequential in-memory, not parallel handlers - Fix orphan blob recovery guidance - Cross-link defense mechanisms throughout Other: - Change WorkflowTask.num_retries default from 1 to 0 - Fix test helper to match new default - Add coroutine, misconfigured, subgraphs, unparseable to Vale vocabulary - Disable Google.Spacing rule (conflicts with double-space-after-period style) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- workflow-tutorial.md: rewrite Retries section (coordinator-driven, not worker queue redelivery) - workflows.md: fix default num_retries from 1 to 0 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Reduce per-flush write size by ~6x by separating the immutable workflow
definition (topology, kwargs, queues, dep policies) from the mutable
state (task states, counters, timestamps).
Storage layout:
- {wf_id}.def.bin: written once at submission (never overwritten)
- {wf_id}.state.bin: rewritten per coordinator flush (~5 KB vs 30 KB)
The load() path tries split format first, falls back to legacy single-
blob format transparently. New submissions always use split format.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Hard rename across all source, tests, docs, and examples. The old name is retained in LEGACY_ENV_VARS for a deprecation warning but no longer accepted silently. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Avoids keeping decoded dicts in memory for all tasks. The kwargs property decodes on demand (only needed at dispatch time). Saves ~13% memory for 30k-task workflows with typical kwargs, more for workflows with empty kwargs. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Replace unbounded dict with OrderedDict limited to 128 entries (configurable via cache_max_workflows). Eldest entries evicted on insert; terminal workflows evicted immediately after final flush. Prevents unbounded memory growth when the coordinator processes completions for many concurrent workflows. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
sys.intern() is applied to task names, parent/child references, and queue strings in all three construction paths (from_definition, from_json, from_split_json). Since the same strings repeat across thousands of tasks, CPython deduplicates the underlying storage, saving ~33% memory for large workflows (576 B/task vs 858 B/task). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…--single flags These CLI options never existed in the coordinator command. Removing them allows the stress/chaos tests to actually launch coordinators successfully. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Summary
Adds DAG-based workflow orchestration to jobq. Users define tasks with dependencies, submit them via the CLI or API, and a coordinator process advances the DAG — dispatching ready tasks through the standard JobQ backend (Storage Queues or Service Bus).
What this enables
depends_onrelationships; the coordinator handles scheduling, fan-out, and join semanticsset_output()and downstream tasks read it viaget_upstream_output()— small values inline in Table Storage, large values spilled to blobLocalRunnerexecutes workflows in-process for testing without needing queues or Table Storageworkflow doctorchecks connectivity, finds stuck workflows, and surfaces stale tasksArchitecture
{wf_id}.def.bin, mutable state (task states, counters) in{wf_id}.state.bin. Flushes are ~6x smaller than a full-state rewrite.{prefix}WorkflowsIndexrows for listings, cancel signalling, and sweeps (not source of truth)Scalability
The coordinator is designed for deployments with tens of thousands of concurrent workflows:
asyncio.gatherwith a configurable semaphore instead of sequential iterationupdated_athas not moved since the last sweep are skipped entirely--max-running-workflowslimit queues excess pending workflows and activates them in FIFO order as slots openSafety mechanisms
task_name + attempt_no)Configuration
Not included