Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions coordinator/internal/api/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,16 @@ const (
// backend crashed, model not loaded after idle shutdown).
maxDispatchAttempts = 3

// firstChunkTimeout is how long to wait for the first chunk from a provider
// before considering the attempt failed and retrying.
firstChunkTimeout = 10 * time.Second
// defaultFirstResponseTimeout is the baseline TTFT deadline before the
// coordinator retries on another provider. Keep this short: retries are
// invisible until the first chunk, so this is our Valorant-style timing
// authority guardrail for fast TTFT.
defaultFirstResponseTimeout = 4 * time.Second

// maxFirstResponseTimeout caps adaptive first-response deadlines for
// intentionally queued/backlogged routes. A route can earn extra time from
// its measured queue/backlog/network cost, but never an unbounded wait.
maxFirstResponseTimeout = 15 * time.Second

// cancelWriteTimeout bounds how long a cancel write to the provider can
// block. Using context.Background() unbounded here risks hanging the HTTP
Expand All @@ -65,6 +72,18 @@ const (

var thinkBlockPattern = regexp.MustCompile(`(?is)<think>(.*?)</think>\s*`)

func firstResponseTimeout(decision registry.RoutingDecision) time.Duration {
costMs := decision.StateMs + decision.QueueMs + decision.PendingMs + decision.BacklogMs + decision.NetworkMs
if costMs <= 0 {
return defaultFirstResponseTimeout
}
adaptive := defaultFirstResponseTimeout + time.Duration(costMs*float64(time.Millisecond))
if adaptive > maxFirstResponseTimeout {
return maxFirstResponseTimeout
}
return adaptive
}

// sendProviderCancel sends a Cancel message for the given request to the
// provider with a bounded timeout so a half-dead WebSocket doesn't hang the
// caller. Errors are logged at debug level because a disconnect race is the
Expand Down Expand Up @@ -786,7 +805,7 @@ func (s *Server) handleChatCompletions(w http.ResponseWriter, r *http.Request) {

// Wait for an accepted signal, first chunk, or error before committing.
// No HTTP response has been written yet, so retries are invisible.
timer := time.NewTimer(firstChunkTimeout)
timer := time.NewTimer(firstResponseTimeout(decision))
accepted := false
select {
case <-pr.AcceptedCh:
Expand Down Expand Up @@ -888,6 +907,7 @@ func (s *Server) handleChatCompletions(w http.ResponseWriter, r *http.Request) {
}

// Provider accepted or sent first chunk — commit to this provider.
s.dispatchStandbyPreloads(r.Context(), model, provider)
// If only accepted (no chunk yet), wait for the first chunk with
// the full inference timeout since the backend may be reloading.
if accepted && !committed {
Expand Down
27 changes: 27 additions & 0 deletions coordinator/internal/api/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,33 @@ func testServer(t *testing.T) (*Server, *store.MemoryStore) {
return srv, st
}

func TestFirstResponseTimeoutUsesFastDefaultForHealthyRoutes(t *testing.T) {
got := firstResponseTimeout(registry.RoutingDecision{})
if got != defaultFirstResponseTimeout {
t.Fatalf("firstResponseTimeout(zero)=%s, want %s", got, defaultFirstResponseTimeout)
}
}

func TestFirstResponseTimeoutAccountsForQueueAndNetworkButCaps(t *testing.T) {
decision := registry.RoutingDecision{
StateMs: 500,
QueueMs: 3_000,
PendingMs: 750,
BacklogMs: 2_000,
NetworkMs: 1_000,
}
got := firstResponseTimeout(decision)
if got <= defaultFirstResponseTimeout {
t.Fatalf("firstResponseTimeout(degraded)=%s, want > %s", got, defaultFirstResponseTimeout)
}

decision.BacklogMs = 100_000
got = firstResponseTimeout(decision)
if got != maxFirstResponseTimeout {
t.Fatalf("firstResponseTimeout(huge backlog)=%s, want cap %s", got, maxFirstResponseTimeout)
}
}

func TestHealthEndpoint(t *testing.T) {
srv, _ := testServer(t)

Expand Down
90 changes: 90 additions & 0 deletions coordinator/internal/api/prewarm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package api

import (
"context"
"encoding/json"
"time"

"github.com/eigeninference/coordinator/internal/protocol"
"github.com/eigeninference/coordinator/internal/registry"
"nhooyr.io/websocket"
)

const (
standbyPrewarmTimeout = 2 * time.Second
maxStandbyPrewarmHints = 2
)

func providerIsReadyForModel(p *registry.Provider, model string) bool {
if p == nil || model == "" {
return false
}
p.Mu().Lock()
defer p.Mu().Unlock()
if p.CurrentModel == model {
return true
}
for _, warm := range p.WarmModels {
if warm == model {
return true
}
}
return false
}

func (s *Server) dispatchStandbyPreloads(ctx context.Context, model string, primary *registry.Provider) {
if s == nil || s.registry == nil || model == "" {
return
}
writer := s.providerPrewarmWriter
if writer == nil {
writer = s.writeProviderPrewarm
}
primaryID := ""
if primary != nil {
primaryID = primary.ID
}
count := 0
s.registry.ForEachProvider(func(p *registry.Provider) {
if count >= maxStandbyPrewarmHints || p == nil || p.ID == primaryID {
return
}
p.Mu().Lock()
eligible := p.Backend == registry.BackendMLXSwift &&
p.Status != registry.StatusOffline && p.Status != registry.StatusUntrusted &&
p.RuntimeVerified && providerHasModelLocked(p, model)
p.Mu().Unlock()
if !eligible || providerIsReadyForModel(p, model) {
return
}
count++
go func(provider *registry.Provider) {
prewarmCtx, cancel := context.WithTimeout(ctx, standbyPrewarmTimeout)
defer cancel()
msg := protocol.LoadModelMessage{Type: protocol.TypeLoadModel, ModelID: model}
if err := writer(prewarmCtx, provider, msg); err != nil && s.logger != nil {
s.logger.Debug("failed to dispatch standby preload", "provider_id", provider.ID, "model", model, "error", err)
}
}(p)
})
}

func providerHasModelLocked(p *registry.Provider, model string) bool {
for _, m := range p.Models {
if m.ID == model {
return true
}
}
return false
}

func (s *Server) writeProviderPrewarm(ctx context.Context, p *registry.Provider, msg protocol.LoadModelMessage) error {
if p == nil || p.Conn == nil {
return nil
}
data, err := json.Marshal(msg)
if err != nil {
return err
}
return p.Conn.Write(ctx, websocket.MessageText, data)
}
96 changes: 96 additions & 0 deletions coordinator/internal/api/prewarm_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package api

import (
"context"
"log/slog"
"os"
"testing"
"time"

"github.com/eigeninference/coordinator/internal/protocol"
"github.com/eigeninference/coordinator/internal/registry"
)

func TestProviderIsReadyForModel(t *testing.T) {
p := &registry.Provider{CurrentModel: "m1", WarmModels: []string{"m2"}}
if !providerIsReadyForModel(p, "m1") {
t.Fatal("current model should be ready")
}
if !providerIsReadyForModel(p, "m2") {
t.Fatal("warm model should be ready")
}
if providerIsReadyForModel(p, "m3") {
t.Fatal("unknown model should not be ready")
}
}

func TestDispatchStandbyPreloadsColdSwiftProviders(t *testing.T) {
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
reg := registry.New(logger)
model := "standby-model"
primary := &registry.Provider{
ID: "primary",
Backend: registry.BackendMLXSwift,
Models: []protocol.ModelInfo{{ID: model}},
WarmModels: []string{model},
Status: registry.StatusServing,
TrustLevel: registry.TrustHardware,
RuntimeVerified: true,
LastHeartbeat: time.Now(),
}
cold := &registry.Provider{
ID: "cold",
Backend: registry.BackendMLXSwift,
Models: []protocol.ModelInfo{{ID: model}},
Status: registry.StatusOnline,
TrustLevel: registry.TrustHardware,
RuntimeVerified: true,
LastHeartbeat: time.Now(),
}

primaryMsg := protocol.RegisterMessage{
Type: protocol.TypeRegister,
Models: []protocol.ModelInfo{{ID: model}},
Backend: registry.BackendMLXSwift,
EncryptedResponseChunks: true,
PublicKey: "pub-primary",
PrivacyCapabilities: testPrivacyCaps(),
}
coldMsg := primaryMsg
coldMsg.PublicKey = "pub-cold"
primary = reg.Register("primary", nil, &primaryMsg)
cold = reg.Register("cold", nil, &coldMsg)
primary.WarmModels = []string{model}
primary.RuntimeVerified = true
cold.RuntimeVerified = true
reg.SetTrustLevel(primary.ID, registry.TrustHardware)
reg.SetTrustLevel(cold.ID, registry.TrustHardware)
reg.RecordChallengeSuccess(primary.ID)
reg.RecordChallengeSuccess(cold.ID)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
sent := make(chan protocol.LoadModelMessage, 1)
server := &Server{registry: reg, logger: logger, providerPrewarmWriter: func(ctx context.Context, p *registry.Provider, msg protocol.LoadModelMessage) error {
sent <- msg
return nil
}}

server.dispatchStandbyPreloads(ctx, model, primary)
select {
case msg := <-sent:
if msg.Type != protocol.TypeLoadModel || msg.ModelID != model {
t.Fatalf("unexpected preload message: %+v", msg)
}
case <-time.After(time.Second):
t.Fatal("expected standby preload message")
}

cold.WarmModels = []string{model}
server.dispatchStandbyPreloads(ctx, model, primary)
select {
case msg := <-sent:
t.Fatalf("unexpected preload for already-warm provider: %+v", msg)
default:
}
}
23 changes: 14 additions & 9 deletions coordinator/internal/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ type Server struct {
// and used by internal counters/histograms. Never nil.
metrics *Metrics

// providerPrewarmWriter sends coordinator-driven preload hints. Tests can
// replace it to assert standby behavior without a real WebSocket.
providerPrewarmWriter func(context.Context, *registry.Provider, protocol.LoadModelMessage) error

// telemetryLimiter throttles telemetry ingestion per submitter.
telemetryLimiter *telemetryLimiter

Expand Down Expand Up @@ -230,15 +234,16 @@ func NewServer(reg *registry.Registry, st store.Store, logger *slog.Logger) *Ser
reg.SetStore(st)

s := &Server{
registry: reg,
store: st,
ledger: payments.NewLedger(st),
logger: logger,
mux: http.NewServeMux(),
knownRuntimeManifest: &RuntimeManifest{},
metrics: NewMetrics(),
telemetryLimiter: newTelemetryLimiter(),
readCache: newTTLCache(),
registry: reg,
store: st,
ledger: payments.NewLedger(st),
logger: logger,
mux: http.NewServeMux(),
knownRuntimeManifest: &RuntimeManifest{},
metrics: NewMetrics(),
providerPrewarmWriter: nil,
telemetryLimiter: newTelemetryLimiter(),
readCache: newTTLCache(),
}
s.registerDefaultGauges()
s.routes()
Expand Down
21 changes: 14 additions & 7 deletions coordinator/internal/api/telemetry_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,20 @@ var telemetryFieldAllowlist = map[string]struct{}{
"error": {},
"target": {},
// Provider / backend
"model": {},
"backend": {},
"exit_code": {},
"signal": {},
"hardware_chip": {},
"memory_gb": {},
"macos_version": {},
"model": {},
"backend": {},
"queue_ms": {},
"admit_ms": {},
"prompt_tokens": {},
"completion_tokens": {},
"ttft_ms": {},
"total_ms": {},
"active_count": {},
"exit_code": {},
"signal": {},
"hardware_chip": {},
"memory_gb": {},
"macos_version": {},
// Coordinator
"handler": {},
"provider_id": {},
Expand Down
12 changes: 12 additions & 0 deletions coordinator/internal/protocol/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ type HeartbeatMessage struct {
Stats HeartbeatStats `json:"stats"`
WarmModels []string `json:"warm_models,omitempty"` // models currently loaded in memory
SystemMetrics SystemMetrics `json:"system_metrics"` // live resource utilization
NetworkQuality NetworkQuality `json:"network_quality"` // provider-observed coordinator transport quality
BackendCapacity *BackendCapacity `json:"backend_capacity,omitempty"` // live backend capacity (nil for old providers)
}

Expand Down Expand Up @@ -166,6 +167,17 @@ type SystemMetrics struct {
ThermalState string `json:"thermal_state"` // nominal, fair, serious, critical
}

// NetworkQuality contains provider-observed coordinator WebSocket transport
// health. All fields default to zero for backward compatibility with older
// providers, and zero means "no measured penalty" rather than "bad".
type NetworkQuality struct {
RTTMs float64 `json:"rtt_ms"` // latest WebSocket ping/pong round-trip time
JitterMs float64 `json:"jitter_ms"` // absolute delta between latest and previous RTT
ReconnectCount int64 `json:"reconnect_count"` // reconnect attempts since provider process start
WebSocketWriteFailures int64 `json:"websocket_write_failures"` // failed WebSocket writes since provider process start
LastWriteLatencyMs float64 `json:"last_write_latency_ms"` // duration of most recent successful WebSocket write
}

// HeartbeatStats contains counters reported in heartbeats.
type HeartbeatStats struct {
RequestsServed int64 `json:"requests_served"`
Expand Down
Loading