Skip to content

feat: per-message worker gating#76

Open
temporaer wants to merge 6 commits into
mainfrom
feat/per-message-gating
Open

feat: per-message worker gating#76
temporaer wants to merge 6 commits into
mainfrom
feat/per-message-gating

Conversation

@temporaer

Copy link
Copy Markdown
Contributor

Adds an optional gate_cmd field to Task. When set, each worker runs the gate command before the task; non-zero exit causes the worker to return the message to the queue (with a short visibility delay) without consuming a retry, so a different worker can pick it up. Gate results are cached per-worker (LRU keyed on sha256(gate_cmd)) so subsequent messages with the same gate are resolved without spawning another subprocess.

Public API additions

  • ai4s.jobq.GateNotPassedError
  • JobQ.push(..., gate_cmd=...)
  • batch_enqueue(..., gate_cmd=...)
  • ai4s-jobq push --gate-cmd
  • Envelope.requeue(visibility_timeout=...)
  • New module ai4s.jobq.gating

Backward compatibility

Tasks without a gate keep schema v1 and stable deterministic IDs. Tasks with a gate use schema v2.

Backend behavior

  • Storage Queue — honors the requested visibility delay.
  • Service Bus REST — logs a one-time warning that the delay degrades to immediate redelivery (no native scheduled-redelivery primitive).

GateNotPassedError is treated by launch_workers like LockLostError: not counted toward --max-consecutive-failures.

Configuration

Tunable via JOBQ_GATE_CACHE_SIZE, JOBQ_GATE_TIMEOUT_S, and JOBQ_GATE_REQUEUE_DELAY_S.

Adds an optional gate_cmd field to Task. When set, each worker runs the
gate command before the task; non-zero exit causes the worker to return
the message to the queue (with a short visibility delay) without
consuming a retry, so a different worker can pick it up. Gate results
are cached per-worker (LRU keyed on sha256(gate_cmd)) so subsequent
messages with the same gate are resolved without spawning another
subprocess.

Public API additions:

* ai4s.jobq.GateNotPassedError
* JobQ.push(..., gate_cmd=...)
* batch_enqueue(..., gate_cmd=...)
* ai4s-jobq push --gate-cmd
* Envelope.requeue(visibility_timeout=...)
* New module ai4s.jobq.gating

Backward compatibility: tasks without a gate keep schema v1 and stable
deterministic IDs. Tasks with a gate use schema v2.

Storage Queue honors the requested visibility delay; the Service Bus
REST backend logs a one-time warning that the delay degrades to
immediate redelivery (no native scheduled-redelivery primitive).

GateNotPassedError is treated by launch_workers like LockLostError:
not counted toward --max-consecutive-failures.

Tunable via JOBQ_GATE_CACHE_SIZE, JOBQ_GATE_TIMEOUT_S, and
JOBQ_GATE_REQUEUE_DELAY_S.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@temporaer temporaer requested a review from a team May 15, 2026 13:51
@temporaer

Copy link
Copy Markdown
Contributor Author

@copilot , tests are failing: make sure they pass. Then:

  1. create & document a subcommand ai4s-jobq verify-installed '{version constraing}' that the user can leverage in the new gating command
  2. when a worker wasn't qualified for let's say 500 times (make it configurable using an envvar), it should exit.

Copilot AI commented May 18, 2026

Copy link
Copy Markdown
Contributor

@copilot , tests are failing: make sure they pass. Then:

  1. create & document a subcommand ai4s-jobq verify-installed '{version constraing}' that the user can leverage in the new gating command
    2...

Implemented in af9d9bc.

  • Added ai4s-jobq verify-installed '<version constraint>' (supports both pkg>=x.y and bare specifier like >=3.12 for ai4s-jobq itself), and documented using it with --gate-cmd in docs/misc/45-gating.md.
  • Added configurable worker exit behavior for repeated gate-not-qualified outcomes via JOBQ_GATE_MAX_NOT_QUALIFIED (default 500).
  • Added/updated tests for both behaviors and verified passing (tests/test_gating.py, tests/test_cli.py).

No UI changes were made, so there is no screenshot to provide.

Warning

Firewall rules blocked me from connecting to one or more addresses (expand for details)

I tried to connect to the following addresses, but was blocked by firewall rules:

  • eastus2-3.in.applicationinsights.azure.com
    • Triggering command: /usr/local/bin/node node ./node_modules/.bin/azurite-queue --skipApiVersionCheck --inMemoryPersistence --queuePort 10001 -j DROP (dns block)
    • Triggering command: /usr/local/bin/node node ./node_modules/.bin/azurite-blob --skipApiVersionCheck --inMemoryPersistence --blobPort 10000 (dns block)
  • westus-0.in.applicationinsights.azure.com
    • Triggering command: /usr/local/bin/node node ./node_modules/.bin/azurite-queue --skipApiVersionCheck --inMemoryPersistence --queuePort 10001 -j DROP (dns block)
    • Triggering command: /usr/local/bin/node node ./node_modules/.bin/azurite-blob --skipApiVersionCheck --inMemoryPersistence --blobPort 10000 (dns block)

If you need me to access, download, or install something from one of these locations, you can either:

@temporaer temporaer left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot please address my comments.

Comment thread ai4s/jobq/orchestration/manager.py Outdated

max_gate_not_qualified = DEFAULT_MAX_NOT_QUALIFIED
raw_max_gate_not_qualified = os.environ.get("JOBQ_GATE_MAX_NOT_QUALIFIED")
if raw_max_gate_not_qualified:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extract this whole validation piece into a helper, see if you can field it in other places. Something like

max_gate_not_qualified = validate_envvar("JOBQ_GATE_MAX_NOT_QUALIFIED", int, DEFAULT_MAX_NOT_QUALIFIED)

I see you already created _env_int in another file.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in c1e21e1 (main changes in d8fb7ec). I extracted env-var parsing/validation into validate_envvar(...) in ai4s/jobq/gating.py and now launch_workers uses it for JOBQ_GATE_MAX_NOT_QUALIFIED exactly as requested. No UI changes were made, so there is no screenshot.

Comment thread ai4s/jobq/gating.py
return sha256(gate_cmd.encode("utf-8")).hexdigest()


class GateCache:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we already depend on cachetools, no need to reinvent the wheel!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in c1e21e1 (cache change in d8fb7ec). GateCache now uses cachetools.LRUCache instead of a custom OrderedDict LRU implementation. No UI changes were made, so there is no screenshot.

Comment thread docs/misc/45-gating.md Outdated
## How it works

1. When pushing a task, the producer sets `gate_cmd` to any shell command.
2. When a worker dequeues the message, it computes

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think users would know how a cache works, just mention it in passing.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in c1e21e1 (docs update in d8fb7ec). I simplified the “How it works” section to mention caching briefly without deep cache internals. No UI changes were made, so there is no screenshot.

Copilot AI commented May 20, 2026

Copy link
Copy Markdown
Contributor

@copilot please address my comments.

Addressed in c1e21e1 (with primary implementation in d8fb7ec). I applied all your review comments: refactored env-var validation into a helper, switched gate cache to cachetools, and simplified the cache docs, with tests/lint passing. No UI changes were made, so there is no screenshot.

Warning

Firewall rules blocked me from connecting to one or more addresses (expand for details)

I tried to connect to the following addresses, but was blocked by firewall rules:

  • eastus2-3.in.applicationinsights.azure.com
    • Triggering command: /home/REDACTED/work/_temp/ghcca-node/node/bin/node node ./node_modules/.bin/azurite-queue --skipApiVersionCheck --inMemoryPersistence --queuePort 10001 (dns block)
    • Triggering command: /home/REDACTED/work/_temp/ghcca-node/node/bin/node node ./node_modules/.bin/azurite-blob --skipApiVersionCheck --inMemoryPersistence --blobPort 10000 (dns block)
  • westus-0.in.applicationinsights.azure.com
    • Triggering command: /home/REDACTED/work/_temp/ghcca-node/node/bin/node node ./node_modules/.bin/azurite-queue --skipApiVersionCheck --inMemoryPersistence --queuePort 10001 (dns block)
    • Triggering command: /home/REDACTED/work/_temp/ghcca-node/node/bin/node node ./node_modules/.bin/azurite-blob --skipApiVersionCheck --inMemoryPersistence --blobPort 10000 (dns block)

If you need me to access, download, or install something from one of these locations, you can either:

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants