feat: initial work on flow control with dispatch budget #73
evacchi wants to merge 5 commits into llm-d-incubation:main
Signed-off-by: Edoardo Vacchi <evacchi@users.noreply.github.com>
Pull request overview
This PR introduces an initial flow control mechanism for the batch processor using a "dispatch budget" concept to control queue pulling behavior based on system capacity. The implementation adds a metrics client interface that returns a budget value between 0.0 and 1.0, representing available capacity.
Changes:
- Added inference/metrics.Client interface with Budget() method for capacity management
- Integrated budget checking into the processor polling loop to skip queue pulls when budget is zero
- Created mock implementations (MockClient with sine wave, NoopClient with static value, errorMetricsClient for testing)
- Fixed WorkerPool synchronization issue by adding wg.Add(1) when acquiring worker from channel
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 12 comments.
| File | Description |
|---|---|
| internal/processor/worker/worker.go | Adds metricsClient field to Processor, integrates budget checking in polling loop, fixes wg.Add(1) synchronization |
| internal/processor/worker/flowcontrol_integration_test.go | Comprehensive integration tests for zero budget, full budget, and error scenarios using synctest |
| internal/inference/metrics/client.go | Defines Client interface for dispatch budget retrieval |
| internal/inference/metrics/noop.go | Static value implementation for testing/production |
| internal/inference/metrics/mock.go | Sine wave implementation for dynamic testing |
| internal/inference/metrics/mock_test.go | Tests for MockClient sine wave behavior |
| cmd/batch-processor/main.go | Initializes NoopClient for flow control |
    // Initialize metrics client for flow control (for now, noop)
    metricsClient := &inferencemetrics.NoopClient{}
    logger.V(logging.INFO).Info("Initialized metrics client (mock)")
The logging message says "(mock)" but the client is actually a NoopClient, not a MockClient. MockClient is a different implementation with sine wave behavior. Consider updating the log message to "(noop)" or "(static)" to accurately reflect the client type being used.
Suggested change:
    - logger.V(logging.INFO).Info("Initialized metrics client (mock)")
    + logger.V(logging.INFO).Info("Initialized metrics client (noop)")
    if budget > 0.0 {
        // check queue for available tasks
        task = p.getTaskFromQueue(ctx)
        logger.V(logging.TRACE).Info("Dispatch budget check passed", "budget", budget)
There's a potential issue with logging order and clarity. The log "Dispatch budget check passed" at line 150 is always logged when budget > 0.0, even if no task is available. This could be misleading. Consider moving this log after checking if a task was successfully retrieved, or adjusting the message to clarify that it only indicates budget availability, not task availability.
Suggested change:
    - logger.V(logging.TRACE).Info("Dispatch budget check passed", "budget", budget)
    + logger.V(logging.TRACE).Info("Dispatch budget available, attempting to fetch task", "budget", budget)
    if budget <= 0.0 {
        logger.V(logging.DEBUG).Info("Dispatch budget at 0, skipping queue pull", "budget", budget)
    } else {
        logger.V(logging.TRACE).Info("No jobs to fetch")
The log message at line 159 "No jobs to fetch" is duplicated from the getTaskFromQueue function (line 211 in the same file). When there's budget available but no tasks in the queue, both log messages will be emitted, which could cause confusion. Consider removing this duplicate log message or adjusting the message to be more specific about the context (e.g., "No jobs available after budget check passed").
Suggested change:
    - logger.V(logging.TRACE).Info("No jobs to fetch")
    + logger.V(logging.TRACE).Info("No jobs available after budget check passed")
    go proc.RunPollingLoop(procCtx)

    // Let it run for a bit (should do 2-3 poll cycles)
This comment is misleading. The comment states the processor "should do 2-3 poll cycles" but the test uses time.Sleep(300 * time.Millisecond) with a timeout of 250ms. The context would already be cancelled before the sleep completes, so the test is not actually letting the processor run through multiple poll cycles. The test relies on synctest.Wait() to wait for goroutines to complete, not the sleep duration. Consider removing or clarifying this comment.
Suggested change:
    - // Let it run for a bit (should do 2-3 poll cycles)
    + // Allow the processor to run until the context timeout and give goroutines time to settle.
    go proc.RunPollingLoop(procCtx)

    // Let it run for a bit (should do 2-3 poll cycles)
This comment is misleading. The comment states the processor "should do 2-3 poll cycles" but the test uses time.Sleep(300 * time.Millisecond) with a timeout of 250ms. The context would already be cancelled before the sleep completes, so the test is not actually letting the processor run through multiple poll cycles. The test relies on synctest.Wait() to wait for goroutines to complete, not the sleep duration. Consider removing or clarifying this comment.
Suggested change:
    - // Let it run for a bit (should do 2-3 poll cycles)
    + // Allow time for the context timeout to elapse before waiting for goroutines to complete.
    // Initialize metrics client for flow control (for now, noop)
    metricsClient := &inferencemetrics.NoopClient{}
    logger.V(logging.INFO).Info("Initialized metrics client (mock)")
The default value of 0.0 for NoopClient in main.go means the processor will never pull tasks from the queue by default. This could lead to a silent failure where the system appears to be running but doesn't process any jobs. Consider setting a more reasonable default like 1.0 (full capacity) or making it explicit in configuration that the system is in a restricted mode.
Suggested change:
    - // Initialize metrics client for flow control (for now, noop)
    - metricsClient := &inferencemetrics.NoopClient{}
    - logger.V(logging.INFO).Info("Initialized metrics client (mock)")
    + // Initialize metrics client for flow control (for now, noop).
    + // NOTE: The NoopClient may report zero effective capacity, which can prevent
    + // the processor from pulling any tasks. This is intended for restricted or
    + // test modes only and should not be used in normal production deployments.
    + metricsClient := &inferencemetrics.NoopClient{}
    + logger.V(logging.INFO).Info("Initialized noop metrics client; processor may run in restricted (non-processing) mode")
ah great point. I guess it might make sense to invert the semantics of Value (i.e. Budget := 1-Value)
    // Let it run for a bit (should do 2-3 poll cycles)
    time.Sleep(300 * time.Millisecond)
This comment is misleading. The comment states the processor "should do 2-3 poll cycles" but the test uses time.Sleep(300 * time.Millisecond) with a timeout of 250ms. The context would already be cancelled before the sleep completes, so the test is not actually letting the processor run through multiple poll cycles. The test relies on synctest.Wait() to wait for goroutines to complete, not the sleep duration. Consider removing or clarifying this comment.
Suggested change:
    - // Let it run for a bit (should do 2-3 poll cycles)
    - time.Sleep(300 * time.Millisecond)
    + // Give the processor some time to start polling before the context times out;
    + // synctest.Wait() below will wait for all goroutines to complete.
    + time.Sleep(50 * time.Millisecond)
    // Set verbosity level to capture WARNING level logs (level 2)
The comment says this is a duplicate that was set at line 167, but this appears to be the only place where the verbosity level is set. Consider removing this misleading comment.
    // Set verbosity level to capture WARNING level logs (level 2)
    } else {
        logger.V(logging.TRACE).Info("No jobs to fetch")
    }
    // wait for poll interval to protect db from frequent queueing / rate-limit retries
The comment mentions "rate-limit retries" but the flow control logic based on dispatch budget is different from rate limiting. The budget check is about system capacity, not about retrying failed operations. Consider clarifying this comment to distinguish between protecting the database from frequent polling when there's no work available versus rate-limiting logic for retries.
Suggested change:
    - // wait for poll interval to protect db from frequent queueing / rate-limit retries
    + // wait for poll interval to throttle queue polling and protect the DB when there's no capacity or no work (not retry rate limiting)
    // NoopClient implements Client interface with a simple constant value
    type NoopClient struct {
        Value float64
    }
The Value field in NoopClient is exported but there's no documentation explaining that this field can be set to control the returned budget value. Consider adding a comment explaining that users can set this field to control the static budget value, or consider providing a constructor function like NewNoopClient(value float64) to make the usage more explicit.
Suggested change:
    - // NoopClient implements Client interface with a simple constant value
    - type NoopClient struct {
    -     Value float64
    - }
    + // NoopClient implements Client interface with a simple constant value.
    + type NoopClient struct {
    +     // Value is the static budget amount returned by Budget. Callers may set this
    +     // field to control the mock dispatch budget value.
    +     Value float64
    + }
    + // NewNoopClient creates a NoopClient that always returns the provided budget
    + // value from Budget.
    + func NewNoopClient(value float64) *NoopClient {
    +     return &NoopClient{Value: value}
    + }
I decided to start contributing some code here to move faster, while we figure out whether we can eventually leverage shared logic in llm-d-async.
This is an initial draft flow control mechanism for the batch processor that uses the "dispatch budget" concept to control queue pulling behavior. This ensures the processor only pulls work from the queue when there is available capacity, preventing system overload.
Changes
- adds the inference/metrics.Client interface computing the "dispatch budget", i.e. a FLOAT value in [0,1] for now. It's an interface exposing one single method:
  floor([0,1]*MAX)): MAX could be num requests, bytes, tokens etc.
- integrates budget check in processor polling loop, skips pulling when budget <= 0.0
- for now, it falls back to full capacity (budget=1.0) if metrics retrieval fails (never happens though, all impls are mocks)
- notice this is just pulling 1 item at a time to keep the PR simple, because right now getTaskFromQueue(ctx) only pulls one item
Metrics Clients
Added a mock "metrics client" with no real implementation for now.
Currently there are 2 simple implementations:
- NoopClient: returns a static value
- MockClient: returns a value that follows a sine wave
Tests
- uses testing/synctest for deterministic time simulation (we can also use k8s' utils/clock if preferred; LLM-D/GAIE use this, while synctest is a recent addition to the Go stdlib)
Notes
- fixed a case in RunPollingLoop() where a work ID would be pulled without incrementing the WaitGroup with wg.Add(1) (Release() would fail with a negative number otherwise); should be fixed now
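The wg.Add(1) point can be illustrated with a small, hypothetical worker pool. This is not the WorkerPool from this repository: the pool type and its Acquire, Release, and Wait methods are invented for the sketch. The idea is that every ID taken from the channel is paired with exactly one Add(1), so the later Done() in Release never drives the sync.WaitGroup counter negative (which panics).

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical worker pool: free worker IDs live in a buffered
// channel, and a WaitGroup tracks workers that are currently in use.
type pool struct {
	ids chan int
	wg  sync.WaitGroup
}

func newPool(n int) *pool {
	p := &pool{ids: make(chan int, n)}
	for i := 0; i < n; i++ {
		p.ids <- i
	}
	return p
}

// Acquire blocks until a worker ID is free and records it as in use.
// Omitting the Add(1) here is the kind of bug described above: the matching
// Done() in Release would then make the counter negative.
func (p *pool) Acquire() int {
	id := <-p.ids
	p.wg.Add(1)
	return id
}

// Release returns the worker ID to the pool and marks the worker as finished.
func (p *pool) Release(id int) {
	p.ids <- id
	p.wg.Done()
}

// Wait blocks until every acquired worker has been released.
func (p *pool) Wait() { p.wg.Wait() }

func main() {
	p := newPool(2)
	results := make([]int, 4)
	for i := range results {
		id := p.Acquire() // at most 2 tasks run concurrently
		go func(slot, id int) {
			defer p.Release(id)
			results[slot] = slot * slot
		}(i, id)
	}
	p.Wait()
	fmt.Println(results)
}
```

Calling Add in the acquiring goroutine, before the worker goroutine starts, also avoids racing Add against Wait.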