diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7b7be1df..f5a9d8ea 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -4,7 +4,6 @@ on: push: branches: [master, main] pull_request: - branches: [master, main] jobs: test-coordinator: diff --git a/coordinator/Dockerfile b/coordinator/Dockerfile index 56af7cdd..31e05bbc 100644 --- a/coordinator/Dockerfile +++ b/coordinator/Dockerfile @@ -3,13 +3,14 @@ WORKDIR /app COPY go.mod go.sum ./ RUN go mod download COPY . . -RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -o coordinator ./coordinator/cmd/coordinator +RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -o /app/coordinator-bin ./coordinator/cmd/coordinator # Pre-built base with MicroMDM v1.13.1, step-ca v0.28.3, step CLI v0.28.3. # Rebuild base only when tool versions change: docker build -f Dockerfile.base -t eigengajesh/d-inference-base:latest . && docker push FROM eigengajesh/d-inference-base:v1-amd64 -COPY --from=builder /app/coordinator /usr/local/bin/coordinator +COPY --from=builder /app/coordinator-bin /usr/local/bin/coordinator +RUN chmod +x /usr/local/bin/coordinator RUN mkdir -p /var/www/html/dl diff --git a/coordinator/api/cache.go b/coordinator/api/cache.go index 4038ee42..70a1eb48 100644 --- a/coordinator/api/cache.go +++ b/coordinator/api/cache.go @@ -69,8 +69,8 @@ func (c *ttlCache) PurgeExpired() { // writeCachedJSON writes pre-serialized JSON bytes with the standard // Content-Type header. Used on cache hit to skip json.Marshal. -func writeCachedJSON(w http.ResponseWriter, status int, body []byte) { +func writeCachedJSON(w http.ResponseWriter, body []byte) { w.Header().Set("Content-Type", "application/json") - w.WriteHeader(status) + w.WriteHeader(http.StatusOK) _, _ = w.Write(body) } diff --git a/coordinator/api/consumer.go b/coordinator/api/consumer.go index 817ed8e8..c96a7d16 100644 --- a/coordinator/api/consumer.go +++ b/coordinator/api/consumer.go @@ -595,11 +595,14 @@ func (s *Server) handleChatCompletions(w http.ResponseWriter, r *http.Request) { if s.billing != nil { consumerKey := consumerKeyFromContext(r.Context()) reservedMicroUSD = s.reservationCost(model, estimatedPromptTokens, requestedMaxTokens) + start := time.Now() if err := s.ledger.Charge(consumerKey, reservedMicroUSD, "reserve:"+consumerKey); err != nil { writeJSON(w, http.StatusPaymentRequired, errorResponse("insufficient_funds", "your balance is too low for this request — add funds at /billing or lower max_tokens")) return } + s.ddHistogram("billing.reserved_micro_usd", float64(reservedMicroUSD), []string{"model:" + model}) + s.ddHistogram("store.debit.latency_ms", float64(time.Since(start).Milliseconds()), []string{"op:reserve"}) } timing.ReservedAt = time.Now() @@ -607,7 +610,10 @@ func (s *Server) handleChatCompletions(w http.ResponseWriter, r *http.Request) { refundReservation := func() { if reservedMicroUSD > 0 { consumerKey := consumerKeyFromContext(r.Context()) + start := time.Now() _ = s.store.Credit(consumerKey, reservedMicroUSD, store.LedgerRefund, "reservation_refund") + s.ddIncr("billing.reservation_refunds", []string{"model:" + model}) + s.ddHistogram("store.credit.latency_ms", float64(time.Since(start).Milliseconds()), []string{"op:reservation_refund"}) } } @@ -678,7 +684,7 @@ func (s *Server) handleChatCompletions(w http.ResponseWriter, r *http.Request) { // "fleet over-subscribed for this model size". outcome = "over_capacity" } - s.ddIncr("routing.decisions", []string{"model:" + model, "outcome:" + outcome}) + s.ddIncr("routing.decisions", []string{"model:" + model, "model_type:" + s.registry.ModelType(model), "outcome:" + outcome}) break } // No idle provider — try queueing. @@ -690,12 +696,12 @@ func (s *Server) handleChatCompletions(w http.ResponseWriter, r *http.Request) { } pr.Timing.QueuedAt = time.Now() if err := s.registry.Queue().Enqueue(queuedReq); err != nil { - s.ddIncr("routing.decisions", []string{"model:" + model, "outcome:over_capacity"}) + s.ddIncr("routing.decisions", []string{"model:" + model, "model_type:" + s.registry.ModelType(model), "outcome:over_capacity"}) refundReservation() writeJSON(w, http.StatusServiceUnavailable, errorResponse("model_not_available", fmt.Sprintf("no hardware-trusted provider available for model %q and queue is full", model))) return } - s.ddIncr("routing.decisions", []string{"model:" + model, "outcome:queued"}) + s.ddIncr("routing.decisions", []string{"model:" + model, "model_type:" + s.registry.ModelType(model), "outcome:queued"}) s.logger.Info("request queued, waiting for provider", "model", model, @@ -710,7 +716,7 @@ func (s *Server) handleChatCompletions(w http.ResponseWriter, r *http.Request) { return } refundReservation() - s.ddIncr("request_queue.timeout", []string{"model:" + model}) + s.ddIncr("request_queue.timeout", []string{"model:" + model, "model_type:" + s.registry.ModelType(model)}) writeJSON(w, http.StatusServiceUnavailable, errorResponse("model_not_available", fmt.Sprintf("no hardware-trusted provider became available for model %q (queue timeout)", model))) return } @@ -719,7 +725,7 @@ func (s *Server) handleChatCompletions(w http.ResponseWriter, r *http.Request) { timing.RoutedAt = time.Now() s.ddIncr("routing.decisions", []string{"model:" + model, "outcome:selected"}) s.ddIncr("routing.provider_selected", []string{"provider_id:" + provider.ID, "model:" + model}) - s.ddHistogram("routing.cost_ms", decision.CostMs, []string{"model:" + model}) + s.ddHistogram("routing.cost_ms", decision.CostMs, []string{"model:" + model, "provider_id:" + provider.ID}) if decision.EffectiveTPS > 0 { s.ddGauge("routing.effective_decode_tps", decision.EffectiveTPS, []string{"provider_id:" + provider.ID}) } @@ -839,7 +845,7 @@ func (s *Server) handleChatCompletions(w http.ResponseWriter, r *http.Request) { "attempt", attempt+1, "error", errMsg.Error, ) - s.emitRequest(r.Context(), protocol.SeverityWarn, protocol.KindInferenceError, requestID, + s.emitRequest(r.Context(), protocol.SeverityWarn, requestID, "provider failed, retrying", map[string]any{ "provider_id": provider.ID, @@ -875,7 +881,7 @@ func (s *Server) handleChatCompletions(w http.ResponseWriter, r *http.Request) { "provider_id", provider.ID, "attempt", attempt+1, ) - s.emitRequest(r.Context(), protocol.SeverityWarn, protocol.KindInferenceError, requestID, + s.emitRequest(r.Context(), protocol.SeverityWarn, requestID, "provider first-chunk timeout", map[string]any{ "provider_id": provider.ID, @@ -924,7 +930,7 @@ func (s *Server) handleChatCompletions(w http.ResponseWriter, r *http.Request) { "attempt", attempt+1, "error", errMsg.Error, ) - s.emitRequest(r.Context(), protocol.SeverityWarn, protocol.KindInferenceError, requestID, + s.emitRequest(r.Context(), protocol.SeverityWarn, requestID, "provider failed after accepting request, retrying", map[string]any{ "provider_id": provider.ID, @@ -956,7 +962,7 @@ func (s *Server) handleChatCompletions(w http.ResponseWriter, r *http.Request) { "attempt", attempt+1, "error", errMsg.Error, ) - s.emitRequest(r.Context(), protocol.SeverityWarn, protocol.KindInferenceError, requestID, + s.emitRequest(r.Context(), protocol.SeverityWarn, requestID, "provider failed after accepting request, retrying", map[string]any{ "provider_id": provider.ID, @@ -983,7 +989,7 @@ func (s *Server) handleChatCompletions(w http.ResponseWriter, r *http.Request) { "provider_id", provider.ID, "attempt", attempt+1, ) - s.emitRequest(r.Context(), protocol.SeverityWarn, protocol.KindInferenceError, requestID, + s.emitRequest(r.Context(), protocol.SeverityWarn, requestID, "provider accepted timeout", map[string]any{ "provider_id": provider.ID, @@ -1015,7 +1021,7 @@ func (s *Server) handleChatCompletions(w http.ResponseWriter, r *http.Request) { if statusCode == 0 { statusCode = http.StatusServiceUnavailable } - s.emitRequest(r.Context(), protocol.SeverityError, protocol.KindInferenceError, requestID, + s.emitRequest(r.Context(), protocol.SeverityError, requestID, fmt.Sprintf("inference failed after %d attempt(s)", maxDispatchAttempts), map[string]any{ "reason": "dispatch_exhausted", @@ -2223,7 +2229,7 @@ func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { func (s *Server) handleVersion(w http.ResponseWriter, r *http.Request) { const cacheKey = "api_version:v1" if cached, ok := s.readCache.Get(cacheKey); ok { - writeCachedJSON(w, http.StatusOK, cached) + writeCachedJSON(w, cached) return } @@ -2258,7 +2264,7 @@ func (s *Server) handleVersion(w http.ResponseWriter, r *http.Request) { return } s.readCache.Set(cacheKey, body, time.Minute) - writeCachedJSON(w, http.StatusOK, body) + writeCachedJSON(w, body) } // --- payment handlers --- @@ -2457,16 +2463,21 @@ func (s *Server) handleGenericInference(w http.ResponseWriter, r *http.Request, var reservedMicroUSD int64 if s.billing != nil { reservedMicroUSD = s.reservationCost(model, estimatedPromptTokens, requestedMaxTokens) + start := time.Now() if err := s.ledger.Charge(consumerKey, reservedMicroUSD, "reserve:"+consumerKey); err != nil { writeJSON(w, http.StatusPaymentRequired, errorResponse("insufficient_funds", "your balance is too low for this request — add funds at /billing or lower max_tokens")) return } + s.ddHistogram("billing.reserved_micro_usd", float64(reservedMicroUSD), []string{"model:" + model}) + s.ddHistogram("store.debit.latency_ms", float64(time.Since(start).Milliseconds()), []string{"op:reserve"}) } - // Refund the reservation on any early failure before dispatch. refundReservation := func() { if reservedMicroUSD > 0 { + start := time.Now() _ = s.store.Credit(consumerKey, reservedMicroUSD, store.LedgerRefund, "reservation_refund") + s.ddIncr("billing.reservation_refunds", []string{"model:" + model}) + s.ddHistogram("store.credit.latency_ms", float64(time.Since(start).Milliseconds()), []string{"op:reservation_refund"}) } } @@ -2494,12 +2505,12 @@ func (s *Server) handleGenericInference(w http.ResponseWriter, r *http.Request, } if err := s.registry.Queue().Enqueue(queuedReq); err != nil { refundReservation() - s.ddIncr("routing.decisions", []string{"model:" + model, "outcome:over_capacity"}) + s.ddIncr("routing.decisions", []string{"model:" + model, "model_type:" + s.registry.ModelType(model), "outcome:over_capacity"}) writeJSON(w, http.StatusServiceUnavailable, errorResponse("model_not_available", fmt.Sprintf("no provider available for model %q", model))) return } - s.ddIncr("routing.decisions", []string{"model:" + model, "outcome:queued"}) + s.ddIncr("routing.decisions", []string{"model:" + model, "model_type:" + s.registry.ModelType(model), "outcome:queued"}) provider, err = s.registry.Queue().WaitForProviderContext(r.Context(), queuedReq) if err != nil { if errors.Is(err, context.Canceled) { @@ -2513,9 +2524,9 @@ func (s *Server) handleGenericInference(w http.ResponseWriter, r *http.Request, } decision = queuedReq.Decision } - s.ddIncr("routing.decisions", []string{"model:" + model, "outcome:selected"}) + s.ddIncr("routing.decisions", []string{"model:" + model, "model_type:" + s.registry.ModelType(model), "outcome:selected"}) s.ddIncr("routing.provider_selected", []string{"provider_id:" + provider.ID, "model:" + model}) - s.ddHistogram("routing.cost_ms", decision.CostMs, []string{"model:" + model}) + s.ddHistogram("routing.cost_ms", decision.CostMs, []string{"model:" + model, "provider_id:" + provider.ID}) if decision.EffectiveTPS > 0 { s.ddGauge("routing.effective_decode_tps", decision.EffectiveTPS, []string{"provider_id:" + provider.ID}) } diff --git a/coordinator/api/edge_case_test.go b/coordinator/api/edge_case_test.go index b762bf09..d0bfee0e 100644 --- a/coordinator/api/edge_case_test.go +++ b/coordinator/api/edge_case_test.go @@ -7,9 +7,14 @@ package api // (no real backends needed) and run in CI. import ( + "archive/tar" + "bytes" + "compress/gzip" "context" "crypto/rand" + "crypto/sha256" "encoding/base64" + "encoding/hex" "encoding/json" "fmt" "io" @@ -769,9 +774,18 @@ func TestEdge_ReleaseRegisterAndRetrieve(t *testing.T) { srv, st := testServer(t) srv.SetReleaseKey("release-key") - // Register a release - body := fmt.Sprintf(`{"version":"1.0.0","platform":"macos-arm64","backend":"mlx-swift","binary_hash":%q,"bundle_hash":%q,"metallib_hash":%q,"url":"http://example.com/bundle.tar.gz","changelog":"First release"}`, - strings.Repeat("a", 64), strings.Repeat("b", 64), strings.Repeat("c", 64)) + bundle, binaryHash, bundleHash := buildReleaseBundleForTest(t, []byte("provider-binary")) + cdn := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/releases/v1.0.0/eigeninference-bundle-macos-arm64.tar.gz" { + http.NotFound(w, r) + return + } + w.Write(bundle) + })) + defer cdn.Close() + srv.SetR2CDNURL(cdn.URL + "/") + + body := fmt.Sprintf(`{"version":"1.0.0","platform":"macos-arm64","backend":"mlx-swift","binary_hash":%q,"bundle_hash":%q,"metallib_hash":%q,"url":%q,"changelog":"First release"}`, binaryHash, bundleHash, strings.Repeat("c", 64), cdn.URL+"/releases/v1.0.0/eigeninference-bundle-macos-arm64.tar.gz") req := httptest.NewRequest(http.MethodPost, "/v1/releases", strings.NewReader(body)) req.Header.Set("Authorization", "Bearer release-key") w := httptest.NewRecorder() @@ -803,6 +817,351 @@ func TestEdge_ReleaseRegisterAndRetrieve(t *testing.T) { } } +func TestEdge_ReleaseRegisterRejectsInvalidHashMetadata(t *testing.T) { + srv, _ := testServer(t) + srv.SetReleaseKey("release-key") + + body := `{"version":"1.0.0","platform":"macos-arm64","binary_hash":"abc123","bundle_hash":"def456","url":"http://example.com/bundle.tar.gz"}` + req := httptest.NewRequest(http.MethodPost, "/v1/releases", strings.NewReader(body)) + req.Header.Set("Authorization", "Bearer release-key") + w := httptest.NewRecorder() + srv.Handler().ServeHTTP(w, req) + + if w.Code != http.StatusBadRequest { + t.Fatalf("release register with invalid hashes: status = %d, want 400, body = %s", w.Code, w.Body.String()) + } +} + +func TestEdge_ReleaseRegisterRejectsStoreOnlyFields(t *testing.T) { + srv, _ := testServer(t) + srv.SetReleaseKey("release-key") + + binaryHash := strings.Repeat("a", 64) + bundleHash := strings.Repeat("b", 64) + body := fmt.Sprintf(`{"version":"1.0.0","platform":"macos-arm64","binary_hash":%q,"bundle_hash":%q,"url":"https://r2.example.com/releases/v1.0.0/eigeninference-bundle-macos-arm64.tar.gz","active":true,"created_at":"2099-01-01T00:00:00Z"}`, binaryHash, bundleHash) + req := httptest.NewRequest(http.MethodPost, "/v1/releases", strings.NewReader(body)) + req.Header.Set("Authorization", "Bearer release-key") + w := httptest.NewRecorder() + srv.Handler().ServeHTTP(w, req) + + if w.Code != http.StatusBadRequest { + t.Fatalf("release register with store-only fields: status = %d, want 400, body = %s", w.Code, w.Body.String()) + } +} + +func TestEdge_ReleaseRegisterRejectsOffOriginURLWhenR2Configured(t *testing.T) { + srv, _ := testServer(t) + srv.SetReleaseKey("release-key") + srv.SetR2CDNURL("https://r2.example.com") + + binaryHash := strings.Repeat("a", 64) + bundleHash := strings.Repeat("b", 64) + body := fmt.Sprintf(`{"version":"1.0.0","platform":"macos-arm64","binary_hash":%q,"bundle_hash":%q,"url":"https://evil.example.com/releases/v1.0.0/eigeninference-bundle-macos-arm64.tar.gz"}`, binaryHash, bundleHash) + req := httptest.NewRequest(http.MethodPost, "/v1/releases", strings.NewReader(body)) + req.Header.Set("Authorization", "Bearer release-key") + w := httptest.NewRecorder() + srv.Handler().ServeHTTP(w, req) + + if w.Code != http.StatusBadRequest { + t.Fatalf("release register with off-origin URL: status = %d, want 400, body = %s", w.Code, w.Body.String()) + } +} + +func TestEdge_ReleaseRegisterRejectsHTTPArtifactOrigin(t *testing.T) { + srv, _ := testServer(t) + srv.SetReleaseKey("release-key") + srv.SetR2CDNURL("http://r2.example.com") + + binaryHash := strings.Repeat("a", 64) + bundleHash := strings.Repeat("b", 64) + body := fmt.Sprintf(`{"version":"1.0.0","platform":"macos-arm64","binary_hash":%q,"bundle_hash":%q,"url":"http://r2.example.com/releases/v1.0.0/eigeninference-bundle-macos-arm64.tar.gz"}`, binaryHash, bundleHash) + req := httptest.NewRequest(http.MethodPost, "/v1/releases", strings.NewReader(body)) + req.Header.Set("Authorization", "Bearer release-key") + w := httptest.NewRecorder() + srv.Handler().ServeHTTP(w, req) + + if w.Code != http.StatusBadRequest { + t.Fatalf("release register with http artifact origin: status = %d, want 400, body = %s", w.Code, w.Body.String()) + } +} + +func TestEdge_ReleaseRegisterRejectsCredentialedArtifactURL(t *testing.T) { + srv, _ := testServer(t) + srv.SetReleaseKey("release-key") + srv.SetR2CDNURL("https://r2.example.com") + + binaryHash := strings.Repeat("a", 64) + bundleHash := strings.Repeat("b", 64) + body := fmt.Sprintf(`{"version":"1.0.0","platform":"macos-arm64","binary_hash":%q,"bundle_hash":%q,"url":"https://user:pass@r2.example.com/releases/v1.0.0/eigeninference-bundle-macos-arm64.tar.gz"}`, binaryHash, bundleHash) + req := httptest.NewRequest(http.MethodPost, "/v1/releases", strings.NewReader(body)) + req.Header.Set("Authorization", "Bearer release-key") + w := httptest.NewRecorder() + srv.Handler().ServeHTTP(w, req) + + if w.Code != http.StatusBadRequest { + t.Fatalf("release register with credentialed artifact URL: status = %d, want 400, body = %s", w.Code, w.Body.String()) + } +} + +func TestEdge_ReleaseRegisterVerifiesBundleArtifact(t *testing.T) { + srv, st := testServer(t) + srv.SetReleaseKey("release-key") + + bundle, binaryHash, bundleHash := buildReleaseBundleForTest(t, []byte("provider-binary")) + cdn := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/releases/v1.0.0/eigeninference-bundle-macos-arm64.tar.gz" { + http.NotFound(w, r) + return + } + w.Write(bundle) + })) + defer cdn.Close() + srv.SetR2CDNURL(cdn.URL) + + body := fmt.Sprintf(`{"version":"1.0.0","platform":"macos-arm64","binary_hash":%q,"bundle_hash":%q,"url":%q}`, binaryHash, bundleHash, cdn.URL+"/releases/v1.0.0/eigeninference-bundle-macos-arm64.tar.gz") + req := httptest.NewRequest(http.MethodPost, "/v1/releases", strings.NewReader(body)) + req.Header.Set("Authorization", "Bearer release-key") + w := httptest.NewRecorder() + srv.Handler().ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("release register with verified artifact: status = %d, want 200, body = %s", w.Code, w.Body.String()) + } + releases := st.ListReleases() + if len(releases) != 1 || releases[0].BinaryHash != binaryHash { + t.Fatalf("release was not stored with verified binary hash: %+v", releases) + } +} + +func TestEdge_ReleaseRegisterAcceptsLegacyRegularBundleEntry(t *testing.T) { + srv, st := testServer(t) + srv.SetReleaseKey("release-key") + + bundle, binaryHash, bundleHash := buildReleaseBundleWithEntryForTest(t, "bin/darkbloom", tar.TypeRegA, []byte("provider-binary"), "") + cdn := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/releases/v1.0.0/eigeninference-bundle-macos-arm64.tar.gz" { + http.NotFound(w, r) + return + } + w.Write(bundle) + })) + defer cdn.Close() + srv.SetR2CDNURL(cdn.URL) + + body := fmt.Sprintf(`{"version":"1.0.0","platform":"macos-arm64","binary_hash":%q,"bundle_hash":%q,"url":%q}`, binaryHash, bundleHash, cdn.URL+"/releases/v1.0.0/eigeninference-bundle-macos-arm64.tar.gz") + req := httptest.NewRequest(http.MethodPost, "/v1/releases", strings.NewReader(body)) + req.Header.Set("Authorization", "Bearer release-key") + w := httptest.NewRecorder() + srv.Handler().ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("release register with legacy regular bundle entry: status = %d, want 200, body = %s", w.Code, w.Body.String()) + } + releases := st.ListReleases() + if len(releases) != 1 || releases[0].BinaryHash != binaryHash { + t.Fatalf("release was not stored with legacy regular bundle entry: %+v", releases) + } +} + +func TestEdge_ReleaseRegisterRejectsBundledBinaryHashMismatch(t *testing.T) { + srv, _ := testServer(t) + srv.SetReleaseKey("release-key") + + bundle, _, bundleHash := buildReleaseBundleForTest(t, []byte("provider-binary")) + cdn := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/releases/v1.0.0/eigeninference-bundle-macos-arm64.tar.gz" { + http.NotFound(w, r) + return + } + w.Write(bundle) + })) + defer cdn.Close() + srv.SetR2CDNURL(cdn.URL) + + wrongBinaryHash := strings.Repeat("c", 64) + body := fmt.Sprintf(`{"version":"1.0.0","platform":"macos-arm64","binary_hash":%q,"bundle_hash":%q,"url":%q}`, wrongBinaryHash, bundleHash, cdn.URL+"/releases/v1.0.0/eigeninference-bundle-macos-arm64.tar.gz") + req := httptest.NewRequest(http.MethodPost, "/v1/releases", strings.NewReader(body)) + req.Header.Set("Authorization", "Bearer release-key") + w := httptest.NewRecorder() + srv.Handler().ServeHTTP(w, req) + + if w.Code != http.StatusBadRequest { + t.Fatalf("release register with mismatched binary hash: status = %d, want 400, body = %s", w.Code, w.Body.String()) + } +} + +func TestEdge_ReleaseRegisterRejectsOversizedBundledBinary(t *testing.T) { + srv, _ := testServer(t) + srv.SetReleaseKey("release-key") + + bundle, bundleHash := buildOversizedBinaryReleaseBundleForTest(t) + cdn := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/releases/v1.0.0/eigeninference-bundle-macos-arm64.tar.gz" { + http.NotFound(w, r) + return + } + w.Write(bundle) + })) + defer cdn.Close() + srv.SetR2CDNURL(cdn.URL) + + binaryHash := strings.Repeat("d", 64) + body := fmt.Sprintf(`{"version":"1.0.0","platform":"macos-arm64","binary_hash":%q,"bundle_hash":%q,"url":%q}`, binaryHash, bundleHash, cdn.URL+"/releases/v1.0.0/eigeninference-bundle-macos-arm64.tar.gz") + req := httptest.NewRequest(http.MethodPost, "/v1/releases", strings.NewReader(body)) + req.Header.Set("Authorization", "Bearer release-key") + w := httptest.NewRecorder() + srv.Handler().ServeHTTP(w, req) + + if w.Code != http.StatusBadRequest { + t.Fatalf("release register with oversized bundled binary: status = %d, want 400, body = %s", w.Code, w.Body.String()) + } +} + +func TestEdge_ReleaseRegisterRejectsRedirectedBundleDownload(t *testing.T) { + srv, _ := testServer(t) + srv.SetReleaseKey("release-key") + + bundle, binaryHash, bundleHash := buildReleaseBundleForTest(t, []byte("provider-binary")) + target := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write(bundle) + })) + defer target.Close() + + cdn := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Redirect(w, r, target.URL+"/bundle.tar.gz", http.StatusFound) + })) + defer cdn.Close() + srv.SetR2CDNURL(cdn.URL) + + body := fmt.Sprintf(`{"version":"1.0.0","platform":"macos-arm64","binary_hash":%q,"bundle_hash":%q,"url":%q}`, binaryHash, bundleHash, cdn.URL+"/releases/v1.0.0/eigeninference-bundle-macos-arm64.tar.gz") + req := httptest.NewRequest(http.MethodPost, "/v1/releases", strings.NewReader(body)) + req.Header.Set("Authorization", "Bearer release-key") + w := httptest.NewRecorder() + srv.Handler().ServeHTTP(w, req) + + if w.Code != http.StatusBadRequest { + t.Fatalf("release register with redirected bundle: status = %d, want 400, body = %s", w.Code, w.Body.String()) + } +} + +func TestEdge_ReleaseRegisterRejectsUnsafeBundlePath(t *testing.T) { + srv, _ := testServer(t) + srv.SetReleaseKey("release-key") + + bundle, binaryHash, bundleHash := buildReleaseBundleWithEntryForTest(t, "../bin/darkbloom", tar.TypeReg, []byte("provider-binary"), "") + cdn := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/releases/v1.0.0/eigeninference-bundle-macos-arm64.tar.gz" { + http.NotFound(w, r) + return + } + w.Write(bundle) + })) + defer cdn.Close() + srv.SetR2CDNURL(cdn.URL) + + body := fmt.Sprintf(`{"version":"1.0.0","platform":"macos-arm64","binary_hash":%q,"bundle_hash":%q,"url":%q}`, binaryHash, bundleHash, cdn.URL+"/releases/v1.0.0/eigeninference-bundle-macos-arm64.tar.gz") + req := httptest.NewRequest(http.MethodPost, "/v1/releases", strings.NewReader(body)) + req.Header.Set("Authorization", "Bearer release-key") + w := httptest.NewRecorder() + srv.Handler().ServeHTTP(w, req) + + if w.Code != http.StatusBadRequest { + t.Fatalf("release register with unsafe bundle path: status = %d, want 400, body = %s", w.Code, w.Body.String()) + } +} + +func TestEdge_ReleaseRegisterRejectsNonRegularProviderBinary(t *testing.T) { + srv, _ := testServer(t) + srv.SetReleaseKey("release-key") + + bundle, _, bundleHash := buildReleaseBundleWithEntryForTest(t, "bin/darkbloom", tar.TypeSymlink, nil, "darkbloom.real") + cdn := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/releases/v1.0.0/eigeninference-bundle-macos-arm64.tar.gz" { + http.NotFound(w, r) + return + } + w.Write(bundle) + })) + defer cdn.Close() + srv.SetR2CDNURL(cdn.URL) + + binaryHash := strings.Repeat("e", 64) + body := fmt.Sprintf(`{"version":"1.0.0","platform":"macos-arm64","binary_hash":%q,"bundle_hash":%q,"url":%q}`, binaryHash, bundleHash, cdn.URL+"/releases/v1.0.0/eigeninference-bundle-macos-arm64.tar.gz") + req := httptest.NewRequest(http.MethodPost, "/v1/releases", strings.NewReader(body)) + req.Header.Set("Authorization", "Bearer release-key") + w := httptest.NewRecorder() + srv.Handler().ServeHTTP(w, req) + + if w.Code != http.StatusBadRequest { + t.Fatalf("release register with non-regular provider binary: status = %d, want 400, body = %s", w.Code, w.Body.String()) + } +} + +func buildReleaseBundleForTest(t *testing.T, binary []byte) ([]byte, string, string) { + t.Helper() + + return buildReleaseBundleWithEntryForTest(t, "bin/darkbloom", tar.TypeReg, binary, "") +} + +func buildReleaseBundleWithEntryForTest(t *testing.T, name string, typeflag byte, binary []byte, linkname string) ([]byte, string, string) { + t.Helper() + + var buf bytes.Buffer + gz := gzip.NewWriter(&buf) + tw := tar.NewWriter(gz) + + header := &tar.Header{ + Name: name, + Mode: 0o755, + Typeflag: typeflag, + Linkname: linkname, + } + if typeflag == tar.TypeReg || typeflag == tar.TypeRegA { + header.Size = int64(len(binary)) + } + if err := tw.WriteHeader(header); err != nil { + t.Fatalf("write tar header: %v", err) + } + if len(binary) > 0 { + if _, err := tw.Write(binary); err != nil { + t.Fatalf("write binary: %v", err) + } + } + if err := tw.Close(); err != nil { + t.Fatalf("close tar: %v", err) + } + if err := gz.Close(); err != nil { + t.Fatalf("close gzip: %v", err) + } + + return buf.Bytes(), sha256HexBytesForReleaseTest(binary), sha256HexBytesForReleaseTest(buf.Bytes()) +} + +func buildOversizedBinaryReleaseBundleForTest(t *testing.T) ([]byte, string) { + t.Helper() + + var buf bytes.Buffer + gz := gzip.NewWriter(&buf) + tw := tar.NewWriter(gz) + if err := tw.WriteHeader(&tar.Header{ + Name: "bin/darkbloom", + Mode: 0o755, + Size: maxReleaseProviderBinBytes + 1, + }); err != nil { + t.Fatalf("write oversized tar header: %v", err) + } + if err := gz.Close(); err != nil { + t.Fatalf("close gzip: %v", err) + } + + return buf.Bytes(), sha256HexBytesForReleaseTest(buf.Bytes()) +} + +func sha256HexBytesForReleaseTest(data []byte) string { + sum := sha256.Sum256(data) + return hex.EncodeToString(sum[:]) +} + // --------------------------------------------------------------------------- // Error response format // --------------------------------------------------------------------------- diff --git a/coordinator/api/leaderboard.go b/coordinator/api/leaderboard.go index 0d2e11b4..ee94e4d5 100644 --- a/coordinator/api/leaderboard.go +++ b/coordinator/api/leaderboard.go @@ -66,7 +66,7 @@ func (s *Server) handleLeaderboard(w http.ResponseWriter, r *http.Request) { cacheKey := fmt.Sprintf("leaderboard:%s:%s:%d", metric, windowParam, limit) if cached, ok := s.readCache.Get(cacheKey); ok { - writeCachedJSON(w, http.StatusOK, cached) + writeCachedJSON(w, cached) return } @@ -102,7 +102,7 @@ func (s *Server) handleLeaderboard(w http.ResponseWriter, r *http.Request) { return } s.readCache.Set(cacheKey, body, 5*time.Minute) - writeCachedJSON(w, http.StatusOK, body) + writeCachedJSON(w, body) } func windowParamOrDefault(s string) string { @@ -126,7 +126,7 @@ func (s *Server) handleNetworkTotals(w http.ResponseWriter, r *http.Request) { cacheKey := "network_totals:" + windowParamOrDefault(windowParam) if cached, ok := s.readCache.Get(cacheKey); ok { - writeCachedJSON(w, http.StatusOK, cached) + writeCachedJSON(w, cached) return } @@ -145,5 +145,5 @@ func (s *Server) handleNetworkTotals(w http.ResponseWriter, r *http.Request) { return } s.readCache.Set(cacheKey, body, time.Minute) - writeCachedJSON(w, http.StatusOK, body) + writeCachedJSON(w, body) } diff --git a/coordinator/api/provider.go b/coordinator/api/provider.go index 756ecb3a..37e1d3a5 100644 --- a/coordinator/api/provider.go +++ b/coordinator/api/provider.go @@ -265,6 +265,7 @@ func (s *Server) providerReadLoop(ctx context.Context, conn *websocket.Conn, pro "version", regMsg.Version, "min_version", s.minProviderVersion, ) + s.ddIncr("provider_version_below_minimum", []string{"gate:registration", "version:" + regMsg.Version}) provider.Mu().Lock() provider.RuntimeVerified = false provider.RuntimeManifestChecked = false @@ -284,7 +285,7 @@ func (s *Server) providerReadLoop(ctx context.Context, conn *websocket.Conn, pro case protocol.TypeInferenceAccepted: acceptMsg := msg.Payload.(*protocol.InferenceAcceptedMessage) - s.handleInferenceAccepted(providerID, provider, acceptMsg) + s.handleInferenceAccepted(provider, acceptMsg) case protocol.TypeInferenceResponseChunk: chunkMsg := msg.Payload.(*protocol.InferenceResponseChunkMessage) @@ -475,7 +476,7 @@ func (s *Server) sendChallenge(ctx context.Context, conn *websocket.Conn, provid tracker.remove(nonce) return } - s.ddIncr("attestation.challenges", []string{"outcome:sent"}) + s.ddIncr("attestation.challenges_sent", nil) s.logger.Debug("sent attestation challenge", "provider_id", providerID, "nonce", nonce[:8]+"...") @@ -814,6 +815,7 @@ func (s *Server) verifyChallengeResponse(providerID string, provider *registry.P "version", version, "min_version", s.minProviderVersion, ) + s.ddIncr("provider_version_below_minimum", []string{"gate:challenge_revalidation", "version:" + version}) provider.Mu().Lock() provider.RuntimeVerified = false provider.RuntimeManifestChecked = false @@ -965,7 +967,7 @@ func (e *textChunkViolationError) Error() string { return e.reason } -func (s *Server) handleInferenceAccepted(providerID string, provider *registry.Provider, msg *protocol.InferenceAcceptedMessage) { +func (s *Server) handleInferenceAccepted(provider *registry.Provider, msg *protocol.InferenceAcceptedMessage) { if provider == nil { return } @@ -1030,6 +1032,7 @@ func (s *Server) handleComplete(providerID string, provider *registry.Provider, "prompt_tokens", msg.Usage.PromptTokens, "completion_tokens", msg.Usage.CompletionTokens, ) + s.ddIncr("billing.cost_clamped", []string{"model:" + pr.Model}) totalCost = pr.ReservedMicroUSD } providerPayout := payments.ProviderPayout(totalCost) @@ -1041,10 +1044,13 @@ func (s *Server) handleComplete(providerID string, provider *registry.Provider, if pr.ReservedMicroUSD > 0 { if totalCost < pr.ReservedMicroUSD { refund := pr.ReservedMicroUSD - totalCost + start := time.Now() _ = s.store.Credit(pr.ConsumerKey, refund, store.LedgerRefund, msg.RequestID) + s.ddHistogram("billing.settlement_refund_micro_usd", float64(refund), []string{"model:" + pr.Model}) + s.ddHistogram("store.credit.latency_ms", float64(time.Since(start).Milliseconds()), []string{"op:settlement_refund"}) } } else { - // No reservation (billing not configured). Charge best-effort. + start := time.Now() if err := s.ledger.Charge(pr.ConsumerKey, totalCost, msg.RequestID); err != nil { s.logger.Warn("could not charge consumer (insufficient balance)", "consumer_key", pr.ConsumerKey, @@ -1052,6 +1058,7 @@ func (s *Server) handleComplete(providerID string, provider *registry.Provider, "error", err, ) } + s.ddHistogram("store.debit.latency_ms", float64(time.Since(start).Milliseconds()), []string{"op:charge"}) } // Record usage entry — both in-memory (for current session) and persisted @@ -1066,6 +1073,9 @@ func (s *Server) handleComplete(providerID string, provider *registry.Provider, }) s.store.RecordUsageWithCost(providerID, pr.ConsumerKey, pr.Model, msg.RequestID, msg.Usage.PromptTokens, msg.Usage.CompletionTokens, totalCost) s.ddIncr("inference.completions", []string{"model:" + pr.Model}) + s.ddCount("inference.prompt_tokens_total", int64(msg.Usage.PromptTokens), []string{"model:" + pr.Model}) + s.ddHistogram("inference.prompt_tokens", float64(msg.Usage.PromptTokens), []string{"model:" + pr.Model}) + s.ddCount("inference.completion_tokens_total", int64(msg.Usage.CompletionTokens), []string{"model:" + pr.Model}) s.ddHistogram("inference.completion_tokens", float64(msg.Usage.CompletionTokens), []string{"model:" + pr.Model}) // Credit the provider's pending payout. @@ -1073,8 +1083,7 @@ func (s *Server) handleComplete(providerID string, provider *registry.Provider, // Otherwise, fall back to the provider's self-reported wallet address. if p := s.registry.GetProvider(providerID); p != nil { if p.AccountID != "" { - // Provider is linked to a Privy account — atomically credit the - // account and record the per-node earning in one store transaction. + start := time.Now() if err := s.store.CreditProviderAccount(&store.ProviderEarning{ AccountID: p.AccountID, ProviderID: providerID, @@ -1093,8 +1102,10 @@ func (s *Server) handleComplete(providerID string, provider *registry.Provider, "error", err, ) } + s.ddHistogram("store.credit.latency_ms", float64(time.Since(start).Milliseconds()), []string{"op:provider_account_credit"}) + s.ddCount("billing.provider_credits_micro_usd", providerPayout, []string{"model:" + pr.Model, "type:account"}) } else if p.WalletAddress != "" { - // Unlinked provider — atomically credit the wallet and record payout history. + start := time.Now() if err := s.ledger.CreditProvider(p.WalletAddress, providerPayout, pr.Model, msg.RequestID); err != nil { s.logger.Error("failed to credit provider wallet payout", "provider_id", providerID, @@ -1103,6 +1114,8 @@ func (s *Server) handleComplete(providerID string, provider *registry.Provider, "error", err, ) } + s.ddHistogram("store.credit.latency_ms", float64(time.Since(start).Milliseconds()), []string{"op:provider_wallet_credit"}) + s.ddCount("billing.provider_credits_micro_usd", providerPayout, []string{"model:" + pr.Model, "type:wallet"}) } } @@ -1114,7 +1127,10 @@ func (s *Server) handleComplete(providerID string, provider *registry.Provider, if s.billing != nil && s.billing.Referral() != nil { platformFee = s.billing.Referral().DistributeReferralReward(pr.ConsumerKey, platformFee, msg.RequestID) } + start := time.Now() _ = s.store.Credit("platform", platformFee, store.LedgerPlatformFee, msg.RequestID) + s.ddHistogram("store.credit.latency_ms", float64(time.Since(start).Milliseconds()), []string{"op:platform_fee"}) + s.ddCount("billing.platform_fees_micro_usd", platformFee, []string{"model:" + pr.Model}) } // Signal completion to the consumer response handler. This must happen diff --git a/coordinator/api/provider_test.go b/coordinator/api/provider_test.go index 1d25f43a..37e31a1d 100644 --- a/coordinator/api/provider_test.go +++ b/coordinator/api/provider_test.go @@ -9,6 +9,7 @@ import ( "encoding/asn1" "encoding/base64" "encoding/json" + "fmt" "io" "log/slog" "math/big" @@ -420,6 +421,252 @@ func TestProviderRegistrationWithValidAttestation(t *testing.T) { } } +func TestProviderRegistrationRequiresBinaryHashWhenPolicyConfigured(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + st := store.NewMemory("test-key") + reg := registry.New(logger) + srv := NewServer(reg, st, logger) + srv.SetKnownBinaryHashes([]string{knownGoodBinaryHashForTest}) + + pubKey := testPublicKeyB64() + regMsg := &protocol.RegisterMessage{ + Type: protocol.TypeRegister, + Hardware: protocol.Hardware{ChipName: "Apple M3 Max", MemoryGB: 64}, + Models: []protocol.ModelInfo{{ID: "missing-binary-hash-model", ModelType: "chat", Quantization: "4bit"}}, + Backend: "inprocess-mlx", + PublicKey: pubKey, + EncryptedResponseChunks: true, + PrivacyCapabilities: testPrivacyCaps(), + Attestation: createTestAttestationJSON(t, pubKey), + } + p := reg.Register("provider-1", nil, regMsg) + + srv.verifyProviderAttestation("provider-1", p, regMsg) + + if p.AttestationResult == nil { + t.Fatal("expected attestation result") + } + if p.AttestationResult.Valid { + t.Fatal("attestation should be invalid when binary hash policy is configured and hash is missing") + } + if p.AttestationResult.Error != "binary hash missing" { + t.Fatalf("attestation error = %q, want %q", p.AttestationResult.Error, "binary hash missing") + } + p.Mu().Lock() + defer p.Mu().Unlock() + if p.Status != registry.StatusUntrusted { + t.Fatalf("provider status = %q, want %q", p.Status, registry.StatusUntrusted) + } + if p.TrustLevel != registry.TrustNone { + t.Fatalf("provider trust = %q, want %q", p.TrustLevel, registry.TrustNone) + } +} + +func TestProviderRegistrationAcceptsKnownBinaryHash(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + st := store.NewMemory("test-key") + reg := registry.New(logger) + srv := NewServer(reg, st, logger) + srv.SetKnownBinaryHashes([]string{knownGoodBinaryHashForTest}) + + pubKey := testPublicKeyB64() + regMsg := &protocol.RegisterMessage{ + Type: protocol.TypeRegister, + Hardware: protocol.Hardware{ChipName: "Apple M3 Max", MemoryGB: 64}, + Models: []protocol.ModelInfo{{ID: "known-binary-hash-model", ModelType: "chat", Quantization: "4bit"}}, + Backend: "inprocess-mlx", + PublicKey: pubKey, + EncryptedResponseChunks: true, + PrivacyCapabilities: testPrivacyCaps(), + Attestation: createTestAttestationJSONWithBinaryHash(t, pubKey, knownGoodBinaryHashForTest), + } + p := reg.Register("provider-1", nil, regMsg) + + srv.verifyProviderAttestation("provider-1", p, regMsg) + + if p.AttestationResult == nil { + t.Fatal("expected attestation result") + } + if !p.AttestationResult.Valid { + t.Fatalf("attestation should be valid with a known binary hash, got %q", p.AttestationResult.Error) + } + p.Mu().Lock() + defer p.Mu().Unlock() + if p.Status == registry.StatusUntrusted { + t.Fatal("provider should not be marked untrusted with a known binary hash") + } + if p.TrustLevel != registry.TrustSelfSigned { + t.Fatalf("provider trust = %q, want %q", p.TrustLevel, registry.TrustSelfSigned) + } +} + +func TestProviderRegistrationRejectsInvalidConfiguredBinaryHash(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + st := store.NewMemory("test-key") + reg := registry.New(logger) + srv := NewServer(reg, st, logger) + srv.SetKnownBinaryHashes([]string{"not-a-sha256"}) + + pubKey := testPublicKeyB64() + regMsg := &protocol.RegisterMessage{ + Type: protocol.TypeRegister, + Hardware: protocol.Hardware{ChipName: "Apple M3 Max", MemoryGB: 64}, + Models: []protocol.ModelInfo{{ID: "invalid-configured-hash-model", ModelType: "chat", Quantization: "4bit"}}, + Backend: "inprocess-mlx", + PublicKey: pubKey, + EncryptedResponseChunks: true, + PrivacyCapabilities: testPrivacyCaps(), + Attestation: createTestAttestationJSONWithBinaryHash(t, pubKey, "not-a-sha256"), + } + p := reg.Register("provider-1", nil, regMsg) + + srv.verifyProviderAttestation("provider-1", p, regMsg) + + policyConfigured, knownHashes := srv.binaryHashPolicySnapshot() + if !policyConfigured { + t.Fatal("binary hash policy should remain configured even when configured hashes are invalid") + } + if len(knownHashes) != 0 { + t.Fatalf("known binary hashes = %d, want 0 valid hashes", len(knownHashes)) + } + if p.AttestationResult == nil { + t.Fatal("expected attestation result") + } + if p.AttestationResult.Valid { + t.Fatal("attestation should be invalid when configured hash and reported hash are invalid") + } + p.Mu().Lock() + defer p.Mu().Unlock() + if p.Status != registry.StatusUntrusted { + t.Fatalf("provider status = %q, want %q", p.Status, registry.StatusUntrusted) + } +} + +func TestSyncBinaryHashesRejectsInvalidStoredReleaseHashWithoutFailingOpen(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + st := store.NewMemory("test-key") + reg := registry.New(logger) + srv := NewServer(reg, st, logger) + if err := st.SetRelease(&store.Release{ + Version: "1.0.0", + Platform: "macos-arm64", + BinaryHash: "not-a-sha256", + BundleHash: strings.Repeat("b", 64), + URL: "https://r2.example.com/releases/v1.0.0/eigeninference-bundle-macos-arm64.tar.gz", + }); err != nil { + t.Fatalf("SetRelease: %v", err) + } + + srv.SyncBinaryHashes() + + policyConfigured, knownHashes := srv.binaryHashPolicySnapshot() + if !policyConfigured { + t.Fatal("binary hash policy should remain configured when an active release has an invalid hash") + } + if len(knownHashes) != 0 { + t.Fatalf("known binary hashes = %d, want 0 valid hashes", len(knownHashes)) + } +} + +func TestSyncBinaryHashesPreservesAdditionalConfiguredHashes(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + st := store.NewMemory("test-key") + reg := registry.New(logger) + srv := NewServer(reg, st, logger) + + manualHash := strings.Repeat("a", 64) + releaseHash := strings.Repeat("b", 64) + srv.AddKnownBinaryHashes([]string{manualHash}) + if err := st.SetRelease(&store.Release{ + Version: "1.0.0", + Platform: "macos-arm64", + BinaryHash: releaseHash, + BundleHash: strings.Repeat("c", 64), + URL: "https://r2.example.com/releases/v1.0.0/eigeninference-bundle-macos-arm64.tar.gz", + }); err != nil { + t.Fatalf("SetRelease: %v", err) + } + + srv.SyncBinaryHashes() + policyConfigured, knownHashes := srv.binaryHashPolicySnapshot() + if !policyConfigured { + t.Fatal("binary hash policy should be configured after manual hash and active release") + } + if !knownHashes[manualHash] { + t.Fatal("manual binary hash was dropped during release sync") + } + if !knownHashes[releaseHash] { + t.Fatal("release binary hash was not synced") + } + + if err := st.DeleteRelease("1.0.0", "macos-arm64"); err != nil { + t.Fatalf("DeleteRelease: %v", err) + } + srv.SyncBinaryHashes() + policyConfigured, knownHashes = srv.binaryHashPolicySnapshot() + if !policyConfigured { + t.Fatal("binary hash policy should remain configured after release deletion because manual hash remains") + } + if !knownHashes[manualHash] { + t.Fatal("manual binary hash was dropped during release deletion sync") + } + if knownHashes[releaseHash] { + t.Fatal("inactive release binary hash should not remain after sync") + } +} + +func TestBinaryHashPolicySnapshotConcurrentSync(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + st := store.NewMemory("test-key") + reg := registry.New(logger) + srv := NewServer(reg, st, logger) + manualHash := strings.Repeat("a", 64) + srv.AddKnownBinaryHashes([]string{manualHash}) + + done := make(chan struct{}) + var wg sync.WaitGroup + for i := 0; i < 4; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-done: + return + default: + policyConfigured, knownHashes := srv.binaryHashPolicySnapshot() + if policyConfigured && !knownHashes[manualHash] { + t.Errorf("manual hash missing from policy snapshot") + return + } + } + } + }() + } + + for i := 0; i < 50; i++ { + version := fmt.Sprintf("1.0.%d", i) + releaseHash := fmt.Sprintf("%064x", i+1) + if err := st.SetRelease(&store.Release{ + Version: version, + Platform: "macos-arm64", + BinaryHash: releaseHash, + BundleHash: strings.Repeat("c", 64), + URL: "https://r2.example.com/releases/v" + version + "/eigeninference-bundle-macos-arm64.tar.gz", + }); err != nil { + t.Fatalf("SetRelease: %v", err) + } + srv.SyncBinaryHashes() + if err := st.DeleteRelease(version, "macos-arm64"); err != nil { + t.Fatalf("DeleteRelease: %v", err) + } + srv.SyncBinaryHashes() + } + + close(done) + wg.Wait() +} + // TestProviderRegistrationWithInvalidAttestation verifies that a provider // with an invalid attestation is still registered but not marked as attested. func TestProviderRegistrationWithInvalidAttestation(t *testing.T) { @@ -513,82 +760,6 @@ func TestProviderRegistrationWithoutAttestation(t *testing.T) { } } -func TestProviderRegistrationRequiresBinaryHashWhenPolicyConfigured(t *testing.T) { - logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) - st := store.NewMemory("test-key") - reg := registry.New(logger) - srv := NewServer(reg, st, logger) - srv.SetKnownBinaryHashes([]string{knownGoodBinaryHashForTest}) - - pubKey := testPublicKeyB64() - regMsg := &protocol.RegisterMessage{ - Type: protocol.TypeRegister, - Hardware: protocol.Hardware{ChipName: "Apple M3 Max", MemoryGB: 64}, - Models: []protocol.ModelInfo{{ID: "missing-binary-hash-model", ModelType: "chat", Quantization: "4bit"}}, - Backend: registry.BackendMLXSwift, - PublicKey: pubKey, - EncryptedResponseChunks: true, - PrivacyCapabilities: testPrivacyCaps(), - Attestation: createTestAttestationJSON(t, pubKey), - } - p := reg.Register("provider-1", nil, regMsg) - - srv.verifyProviderAttestation("provider-1", p, regMsg) - - if p.AttestationResult == nil { - t.Fatal("expected attestation result") - } - if p.AttestationResult.Valid { - t.Fatal("attestation should be invalid when binary hash policy is configured and hash is missing") - } - if p.AttestationResult.Error != "binary hash missing" { - t.Fatalf("attestation error = %q, want %q", p.AttestationResult.Error, "binary hash missing") - } - p.Mu().Lock() - defer p.Mu().Unlock() - if p.Status != registry.StatusUntrusted { - t.Fatalf("provider status = %q, want %q", p.Status, registry.StatusUntrusted) - } -} - -func TestProviderRegistrationAcceptsKnownBinaryHash(t *testing.T) { - logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) - st := store.NewMemory("test-key") - reg := registry.New(logger) - srv := NewServer(reg, st, logger) - srv.SetKnownBinaryHashes([]string{knownGoodBinaryHashForTest}) - - pubKey := testPublicKeyB64() - regMsg := &protocol.RegisterMessage{ - Type: protocol.TypeRegister, - Hardware: protocol.Hardware{ChipName: "Apple M3 Max", MemoryGB: 64}, - Models: []protocol.ModelInfo{{ID: "known-binary-hash-model", ModelType: "chat", Quantization: "4bit"}}, - Backend: registry.BackendMLXSwift, - PublicKey: pubKey, - EncryptedResponseChunks: true, - PrivacyCapabilities: testPrivacyCaps(), - Attestation: createTestAttestationJSONWithBinaryHash(t, pubKey, knownGoodBinaryHashForTest), - } - p := reg.Register("provider-1", nil, regMsg) - - srv.verifyProviderAttestation("provider-1", p, regMsg) - - if p.AttestationResult == nil { - t.Fatal("expected attestation result") - } - if !p.AttestationResult.Valid { - t.Fatalf("attestation should be valid with a known binary hash, got %q", p.AttestationResult.Error) - } - p.Mu().Lock() - defer p.Mu().Unlock() - if p.Status == registry.StatusUntrusted { - t.Fatal("provider should not be marked untrusted with a known binary hash") - } - if p.TrustLevel != registry.TrustSelfSigned { - t.Fatalf("provider trust = %q, want %q", p.TrustLevel, registry.TrustSelfSigned) - } -} - // TestListModelsWithAttestationInfo verifies that /v1/models includes // attestation metadata. func TestListModelsWithAttestationInfo(t *testing.T) { @@ -1186,6 +1357,54 @@ func TestChallengeResponseRejectsMissingSIPStatus(t *testing.T) { } } +func TestChallengeResponseRejectsUnsignedBinaryHashWhenPolicyConfigured(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + st := store.NewMemory("test-key") + reg := registry.New(logger) + srv := NewServer(reg, st, logger) + srv.SetKnownBinaryHashes([]string{knownGoodBinaryHashForTest}) + + pubKey := testPublicKeyB64() + p := reg.Register("provider-1", nil, &protocol.RegisterMessage{ + Type: protocol.TypeRegister, + Hardware: protocol.Hardware{ChipName: "Apple M3 Max", MemoryGB: 64}, + Models: []protocol.ModelInfo{{ID: "unsigned-challenge-binary-hash-model", ModelType: "chat", Quantization: "4bit"}}, + Backend: "inprocess-mlx", + PublicKey: pubKey, + EncryptedResponseChunks: true, + PrivacyCapabilities: testPrivacyCaps(), + }) + sipEnabled := true + secureBootEnabled := true + rdmaDisabled := true + + srv.verifyChallengeResponse("provider-1", p, &pendingChallenge{ + nonce: "nonce-1", + timestamp: "2026-04-24T12:00:00Z", + }, &protocol.AttestationResponseMessage{ + Type: protocol.TypeAttestationResponse, + Nonce: "nonce-1", + Signature: "dGVzdHNpZ25hdHVyZQ==", + PublicKey: pubKey, + SIPEnabled: &sipEnabled, + SecureBootEnabled: &secureBootEnabled, + RDMADisabled: &rdmaDisabled, + BinaryHash: knownGoodBinaryHashForTest, + }) + + p.Mu().Lock() + defer p.Mu().Unlock() + if p.Status != registry.StatusUntrusted { + t.Fatalf("provider status = %q, want %q", p.Status, registry.StatusUntrusted) + } + if p.FailedChallenges != 1 { + t.Fatalf("failed challenges = %d, want 1", p.FailedChallenges) + } + if !p.LastChallengeVerified.IsZero() { + t.Fatal("provider should not record challenge success for an unsigned binary hash") + } +} + func TestChallengeResponseMissingSIPClearsExistingRoutingEligibility(t *testing.T) { logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) st := store.NewMemory("test-key") diff --git a/coordinator/api/release_handlers.go b/coordinator/api/release_handlers.go index 32f766dd..5a91d764 100644 --- a/coordinator/api/release_handlers.go +++ b/coordinator/api/release_handlers.go @@ -1,72 +1,122 @@ package api import ( + "archive/tar" + "compress/gzip" + "context" + "crypto/sha256" "crypto/subtle" + "encoding/hex" "encoding/json" + "fmt" + "io" + "net" "net/http" + "net/url" + "os" + "path" + "regexp" + "strings" "time" "github.com/eigeninference/d-inference/coordinator/auth" "github.com/eigeninference/d-inference/coordinator/store" ) +const ( + maxReleaseRegisterBodyBytes = 64 * 1024 + maxReleaseArtifactBytes = 2 << 30 // 2 GiB + maxReleaseProviderBinBytes = 512 << 20 + releaseArtifactTimeout = 2 * time.Minute +) + +var ( + releaseVersionPattern = regexp.MustCompile(`^[0-9]+\.[0-9]+\.[0-9]+(?:[-+][0-9A-Za-z.-]+)?$`) + releasePlatformPattern = regexp.MustCompile(`^[a-z0-9][a-z0-9._-]{0,63}$`) + releaseTemplateNamePattern = regexp.MustCompile(`^[A-Za-z0-9._-]+$`) +) + +type registerReleaseRequest struct { + Version string `json:"version"` + Platform string `json:"platform"` + Backend string `json:"backend,omitempty"` + BinaryHash string `json:"binary_hash"` + BundleHash string `json:"bundle_hash"` + MetallibHash string `json:"metallib_hash,omitempty"` + PythonHash string `json:"python_hash,omitempty"` + RuntimeHash string `json:"runtime_hash,omitempty"` + TemplateHashes string `json:"template_hashes,omitempty"` + URL string `json:"url"` + Changelog string `json:"changelog"` +} + +func (req registerReleaseRequest) toRelease() store.Release { + return store.Release{ + Version: req.Version, + Platform: req.Platform, + Backend: req.Backend, + BinaryHash: req.BinaryHash, + BundleHash: req.BundleHash, + MetallibHash: req.MetallibHash, + PythonHash: req.PythonHash, + RuntimeHash: req.RuntimeHash, + TemplateHashes: req.TemplateHashes, + URL: req.URL, + Changelog: req.Changelog, + } +} + // handleRegisterRelease handles POST /v1/releases. // Called by GitHub Actions to register a new provider binary release. // Authenticated with a scoped release key (NOT admin credentials). func (s *Server) handleRegisterRelease(w http.ResponseWriter, r *http.Request) { // Verify scoped release key. token := extractBearerToken(r) - if s.releaseKey == "" || token != s.releaseKey { + if !s.releaseKeyAuthorized(token) { writeJSON(w, http.StatusUnauthorized, errorResponse("unauthorized", "invalid release key")) return } - var release store.Release - if err := json.NewDecoder(r.Body).Decode(&release); err != nil { + var req registerReleaseRequest + r.Body = http.MaxBytesReader(w, r.Body, maxReleaseRegisterBodyBytes) + dec := json.NewDecoder(r.Body) + dec.DisallowUnknownFields() + if err := dec.Decode(&req); err != nil { writeJSON(w, http.StatusBadRequest, errorResponse("invalid_request_error", "invalid JSON: "+err.Error())) return } - if release.Version == "" { - writeJSON(w, http.StatusBadRequest, errorResponse("invalid_request_error", "version is required")) + if err := dec.Decode(&struct{}{}); err != io.EOF { + writeJSON(w, http.StatusBadRequest, errorResponse("invalid_request_error", "invalid JSON: multiple JSON values")) return } + release := req.toRelease() if release.Platform == "" { release.Platform = "macos-arm64" // default } - if release.BinaryHash == "" { - writeJSON(w, http.StatusBadRequest, errorResponse("invalid_request_error", "binary_hash is required")) - return - } - if release.BundleHash == "" { - writeJSON(w, http.StatusBadRequest, errorResponse("invalid_request_error", "bundle_hash is required")) - return - } - if normalized, err := normalizeSHA256Hex(release.BinaryHash, "binary_hash"); err != nil { - writeJSON(w, http.StatusBadRequest, errorResponse("invalid_request_error", err.Error())) - return - } else { - release.BinaryHash = normalized - } - if normalized, err := normalizeSHA256Hex(release.BundleHash, "bundle_hash"); err != nil { + + if err := s.validateReleaseMetadata(&release); err != nil { writeJSON(w, http.StatusBadRequest, errorResponse("invalid_request_error", err.Error())) return - } else { - release.BundleHash = normalized } - if release.MetallibHash != "" { - if normalized, err := normalizeSHA256Hex(release.MetallibHash, "metallib_hash"); err != nil { - writeJSON(w, http.StatusBadRequest, errorResponse("invalid_request_error", err.Error())) - return - } else { - release.MetallibHash = normalized - } - } - if release.Backend == "mlx-swift" && release.MetallibHash == "" { - writeJSON(w, http.StatusBadRequest, errorResponse("invalid_request_error", "metallib_hash is required for mlx-swift releases")) + + if s.r2CDNURL == "" { + s.logger.Error("release: artifact verification unavailable because R2 CDN URL is not configured", + "version", release.Version, + "platform", release.Platform, + ) + writeJSON(w, http.StatusServiceUnavailable, errorResponse("not_configured", "release artifact verification requires R2 CDN URL")) return } - if release.URL == "" { - writeJSON(w, http.StatusBadRequest, errorResponse("invalid_request_error", "url is required")) + + ctx, cancel := context.WithTimeout(r.Context(), releaseArtifactTimeout) + defer cancel() + if err := s.verifyReleaseArtifact(ctx, &release); err != nil { + s.logger.Warn("release: artifact verification failed", + "version", release.Version, + "platform", release.Platform, + "error", err, + ) + writeJSON(w, http.StatusBadRequest, errorResponse("invalid_request_error", "release artifact verification failed: "+err.Error())) return } @@ -99,6 +149,308 @@ func (s *Server) handleRegisterRelease(w http.ResponseWriter, r *http.Request) { }) } +func (s *Server) releaseKeyAuthorized(token string) bool { + if s.releaseKey == "" || token == "" { + return false + } + return subtle.ConstantTimeCompare([]byte(token), []byte(s.releaseKey)) == 1 +} + +func (s *Server) validateReleaseMetadata(release *store.Release) error { + release.Version = strings.TrimSpace(release.Version) + release.Platform = strings.TrimSpace(release.Platform) + release.Backend = strings.TrimSpace(release.Backend) + release.BinaryHash = strings.TrimSpace(release.BinaryHash) + release.BundleHash = strings.TrimSpace(release.BundleHash) + release.MetallibHash = strings.TrimSpace(release.MetallibHash) + release.PythonHash = strings.TrimSpace(release.PythonHash) + release.RuntimeHash = strings.TrimSpace(release.RuntimeHash) + release.TemplateHashes = strings.TrimSpace(release.TemplateHashes) + release.URL = strings.TrimSpace(release.URL) + + if release.Version == "" { + return fmt.Errorf("version is required") + } + if !releaseVersionPattern.MatchString(release.Version) { + return fmt.Errorf("version must be semver, e.g. 1.2.3 or 1.2.3-dev.1") + } + if release.Platform == "" { + return fmt.Errorf("platform is required") + } + if !releasePlatformPattern.MatchString(release.Platform) { + return fmt.Errorf("platform contains invalid characters") + } + + var err error + if release.BinaryHash, err = normalizeSHA256Hex(release.BinaryHash, "binary_hash"); err != nil { + return err + } + if release.BundleHash, err = normalizeSHA256Hex(release.BundleHash, "bundle_hash"); err != nil { + return err + } + if release.MetallibHash != "" { + if release.MetallibHash, err = normalizeSHA256Hex(release.MetallibHash, "metallib_hash"); err != nil { + return err + } + } + if release.Backend == "mlx-swift" && release.MetallibHash == "" { + return fmt.Errorf("metallib_hash is required for mlx-swift releases") + } + if release.PythonHash != "" { + if release.PythonHash, err = normalizeSHA256Hex(release.PythonHash, "python_hash"); err != nil { + return err + } + } + if release.RuntimeHash != "" { + if release.RuntimeHash, err = normalizeSHA256Hex(release.RuntimeHash, "runtime_hash"); err != nil { + return err + } + } + if release.TemplateHashes != "" { + if release.TemplateHashes, err = normalizeTemplateHashes(release.TemplateHashes); err != nil { + return err + } + } + if release.URL == "" { + return fmt.Errorf("url is required") + } + if s.r2CDNURL != "" { + if _, err := s.trustedReleaseArtifactURL(release); err != nil { + return err + } + } + return nil +} + +func (s *Server) trustedReleaseArtifactURL(release *store.Release) (*url.URL, error) { + expectedURL, err := expectedReleaseArtifactURL(s.r2CDNURL, release.Version, release.Platform) + if err != nil { + return nil, err + } + if !sameReleaseArtifactURL(release.URL, expectedURL) { + return nil, fmt.Errorf("url must match configured release artifact path") + } + parsed, err := url.Parse(expectedURL) + if err != nil { + return nil, fmt.Errorf("configured release artifact URL is invalid") + } + return parsed, nil +} + +func expectedReleaseArtifactURL(baseURL, version, platform string) (string, error) { + version = strings.TrimSpace(version) + platform = strings.TrimSpace(platform) + if !releaseVersionPattern.MatchString(version) { + return "", fmt.Errorf("version must be semver, e.g. 1.2.3 or 1.2.3-dev.1") + } + if !releasePlatformPattern.MatchString(platform) { + return "", fmt.Errorf("platform contains invalid characters") + } + + u, err := url.Parse(strings.TrimSpace(baseURL)) + if err != nil { + return "", fmt.Errorf("configured R2 CDN URL is invalid") + } + if u.User != nil || u.RawQuery != "" || u.Fragment != "" { + return "", fmt.Errorf("configured R2 CDN URL must not include credentials, query, or fragment") + } + if u.Host == "" { + return "", fmt.Errorf("configured R2 CDN URL must include a host") + } + if u.Scheme != "https" && u.Scheme != "http" { + return "", fmt.Errorf("configured R2 CDN URL must be absolute") + } + if u.Scheme == "http" && !isLoopbackHost(u.Hostname()) { + return "", fmt.Errorf("configured R2 CDN URL must use https") + } + u.Path = path.Join(u.Path, "releases", "v"+version, "eigeninference-bundle-"+platform+".tar.gz") + u.RawQuery = "" + u.Fragment = "" + return u.String(), nil +} + +func isLoopbackHost(host string) bool { + if strings.EqualFold(host, "localhost") { + return true + } + ip := net.ParseIP(host) + return ip != nil && ip.IsLoopback() +} + +func sameReleaseArtifactURL(actual, expected string) bool { + actualURL, err := url.Parse(strings.TrimSpace(actual)) + if err != nil { + return false + } + expectedURL, err := url.Parse(expected) + if err != nil { + return false + } + if actualURL.User != nil || expectedURL.User != nil { + return false + } + return strings.EqualFold(actualURL.Scheme, expectedURL.Scheme) && + strings.EqualFold(actualURL.Host, expectedURL.Host) && + path.Clean(actualURL.EscapedPath()) == path.Clean(expectedURL.EscapedPath()) && + actualURL.RawQuery == "" && + actualURL.Fragment == "" +} + +func normalizeSHA256Hex(value, field string) (string, error) { + value = strings.ToLower(strings.TrimSpace(value)) + if len(value) != sha256.Size*2 { + return "", fmt.Errorf("%s must be a 64-character SHA-256 hex digest", field) + } + if _, err := hex.DecodeString(value); err != nil { + return "", fmt.Errorf("%s must be a valid SHA-256 hex digest", field) + } + return value, nil +} + +func normalizeTemplateHashes(raw string) (string, error) { + entries := strings.Split(raw, ",") + normalized := make([]string, 0, len(entries)) + for _, entry := range entries { + entry = strings.TrimSpace(entry) + if entry == "" { + continue + } + name, hash, ok := strings.Cut(entry, "=") + if !ok { + return "", fmt.Errorf("template_hashes entries must be name=sha256") + } + name = strings.TrimSpace(name) + if name == "" || !releaseTemplateNamePattern.MatchString(name) { + return "", fmt.Errorf("template_hashes contains an invalid template name") + } + hash, err := normalizeSHA256Hex(hash, "template_hashes") + if err != nil { + return "", err + } + normalized = append(normalized, name+"="+hash) + } + return strings.Join(normalized, ","), nil +} + +func (s *Server) verifyReleaseArtifact(ctx context.Context, release *store.Release) error { + downloadURL, err := s.trustedReleaseArtifactURL(release) + if err != nil { + return err + } + req := &http.Request{ + Method: http.MethodGet, + URL: downloadURL, + Header: make(http.Header), + } + req = req.WithContext(ctx) + + client := &http.Client{ + CheckRedirect: func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse + }, + } + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("download bundle: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("download bundle returned status %d", resp.StatusCode) + } + + tmp, err := os.CreateTemp("", "darkbloom-release-*.tar.gz") + if err != nil { + return fmt.Errorf("create temp bundle: %w", err) + } + defer func() { + tmp.Close() + os.Remove(tmp.Name()) + }() + + bundleHash := sha256.New() + limited := io.LimitReader(resp.Body, maxReleaseArtifactBytes+1) + n, err := io.Copy(io.MultiWriter(tmp, bundleHash), limited) + if err != nil { + return fmt.Errorf("read bundle: %w", err) + } + if n > maxReleaseArtifactBytes { + return fmt.Errorf("bundle exceeds maximum size") + } + actualBundleHash := hex.EncodeToString(bundleHash.Sum(nil)) + if actualBundleHash != release.BundleHash { + return fmt.Errorf("bundle_hash does not match release artifact") + } + + if _, err := tmp.Seek(0, io.SeekStart); err != nil { + return fmt.Errorf("rewind bundle: %w", err) + } + + gz, err := gzip.NewReader(tmp) + if err != nil { + return fmt.Errorf("open bundle gzip: %w", err) + } + defer gz.Close() + + tarReader := tar.NewReader(gz) + binaryHash := sha256.New() + foundBinary := false + for { + header, err := tarReader.Next() + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("read bundle tar: %w", err) + } + cleanName, err := cleanReleaseTarPath(header.Name) + if err != nil { + return err + } + if cleanName != "bin/darkbloom" { + continue + } + if header.Typeflag != tar.TypeReg && header.Typeflag != tar.TypeRegA { + return fmt.Errorf("bundled provider binary is not a regular file") + } + if foundBinary { + return fmt.Errorf("bundle contains multiple provider binaries") + } + if header.Size < 0 || header.Size > maxReleaseProviderBinBytes { + return fmt.Errorf("provider binary exceeds maximum size") + } + n, err := io.Copy(binaryHash, io.LimitReader(tarReader, maxReleaseProviderBinBytes+1)) + if err != nil { + return fmt.Errorf("read provider binary: %w", err) + } + if n > maxReleaseProviderBinBytes { + return fmt.Errorf("provider binary exceeds maximum size") + } + foundBinary = true + } + if !foundBinary { + return fmt.Errorf("bundle is missing bin/darkbloom") + } + + actualBinaryHash := hex.EncodeToString(binaryHash.Sum(nil)) + if actualBinaryHash != release.BinaryHash { + return fmt.Errorf("binary_hash does not match bundled provider binary") + } + return nil +} + +func cleanReleaseTarPath(name string) (string, error) { + if name == "" || strings.HasPrefix(name, "/") { + return "", fmt.Errorf("bundle contains unsafe path") + } + for _, part := range strings.Split(name, "/") { + if part == ".." { + return "", fmt.Errorf("bundle contains unsafe path") + } + } + return strings.TrimPrefix(path.Clean(name), "./"), nil +} + // handleLatestRelease handles GET /v1/releases/latest. // Public endpoint — returns the latest active release for a platform. // Used by install.sh to get the download URL and expected hash. @@ -110,7 +462,7 @@ func (s *Server) handleLatestRelease(w http.ResponseWriter, r *http.Request) { cacheKey := "latest_release:v1:" + platform if cached, ok := s.readCache.Get(cacheKey); ok { - writeCachedJSON(w, http.StatusOK, cached) + writeCachedJSON(w, cached) return } @@ -126,7 +478,7 @@ func (s *Server) handleLatestRelease(w http.ResponseWriter, r *http.Request) { return } s.readCache.Set(cacheKey, body, time.Minute) - writeCachedJSON(w, http.StatusOK, body) + writeCachedJSON(w, body) } // handleAdminListReleases handles GET /v1/admin/releases. diff --git a/coordinator/api/routing_metrics_test.go b/coordinator/api/routing_metrics_test.go index 96df419e..04d1e1e9 100644 --- a/coordinator/api/routing_metrics_test.go +++ b/coordinator/api/routing_metrics_test.go @@ -204,7 +204,7 @@ func TestRoutingMetrics_SelectedEmitsDecisionAndCost(t *testing.T) { srv.ddIncr("routing.decisions", []string{"model:" + model, "outcome:selected"}) srv.ddIncr("routing.provider_selected", []string{"provider_id:" + provider.ID, "model:" + model}) - srv.ddHistogram("routing.cost_ms", decision.CostMs, []string{"model:" + model}) + srv.ddHistogram("routing.cost_ms", decision.CostMs, []string{"model:" + model, "provider_id:" + provider.ID}) if decision.EffectiveTPS > 0 { srv.ddGauge("routing.effective_decode_tps", decision.EffectiveTPS, []string{"provider_id:" + provider.ID}) } @@ -415,18 +415,22 @@ func TestAttestationMetrics_AllOutcomes(t *testing.T) { defer ddClient.Close() srv.SetDatadog(ddClient) - for _, outcome := range []string{"sent", "passed", "failed", "status_sig_missing"} { + for _, outcome := range []string{"passed", "failed", "status_sig_missing"} { srv.ddIncr("attestation.challenges", []string{"outcome:" + outcome}) } + srv.ddIncr("attestation.challenges_sent", nil) _ = ddClient.Statsd.Flush() packets := collector.drain() - for _, outcome := range []string{"sent", "passed", "failed", "status_sig_missing"} { + for _, outcome := range []string{"passed", "failed", "status_sig_missing"} { if !hasMetric(packets, "outcome:"+outcome) { t.Errorf("missing attestation.challenges{outcome:%s}; got packets: %v", outcome, packets) } } + if !hasMetric(packets, "attestation.challenges_sent") { + t.Errorf("missing attestation.challenges_sent; got packets: %v", packets) + } } func TestInferenceMetrics_CompletionCounters(t *testing.T) { @@ -501,7 +505,7 @@ func TestRoutingMetrics_AllTagsOnSelection(t *testing.T) { srv.ddIncr("routing.decisions", []string{"model:" + model, "outcome:selected"}) srv.ddIncr("routing.provider_selected", []string{"provider_id:" + provider.ID, "model:" + model}) - srv.ddHistogram("routing.cost_ms", decision.CostMs, []string{"model:" + model}) + srv.ddHistogram("routing.cost_ms", decision.CostMs, []string{"model:" + model, "provider_id:" + provider.ID}) srv.ddGauge("routing.effective_decode_tps", decision.EffectiveTPS, []string{"provider_id:" + provider.ID}) _ = ddClient.Statsd.Flush() @@ -516,6 +520,7 @@ func TestRoutingMetrics_AllTagsOnSelection(t *testing.T) { {"routing.provider_selected", "provider_id:" + p.ID}, {"routing.provider_selected", "model:" + model}, {"routing.cost_ms", "model:" + model}, + {"routing.cost_ms", "provider_id:" + provider.ID}, {"routing.effective_decode_tps", "provider_id:" + p.ID}, } for _, c := range checks { diff --git a/coordinator/api/server.go b/coordinator/api/server.go index 53714010..527175a8 100644 --- a/coordinator/api/server.go +++ b/coordinator/api/server.go @@ -17,11 +17,9 @@ import ( "bufio" "context" "crypto/rand" - "crypto/sha256" "crypto/subtle" "crypto/x509" _ "embed" - "encoding/hex" "encoding/json" "errors" "fmt" @@ -320,13 +318,13 @@ func (s *Server) emit(ctx context.Context, severity protocol.TelemetrySeverity, } // emitRequest is like emit but preserves a request_id for correlation. -func (s *Server) emitRequest(ctx context.Context, severity protocol.TelemetrySeverity, kind protocol.TelemetryKind, requestID, message string, fields map[string]any) { +func (s *Server) emitRequest(ctx context.Context, severity protocol.TelemetrySeverity, requestID, message string, fields map[string]any) { if s.emitter == nil { return } s.emitter.Emit(ctx, telemetry.Event{ Severity: severity, - Kind: kind, + Kind: protocol.KindInferenceError, Message: message, Fields: fields, RequestID: requestID, @@ -340,6 +338,13 @@ func (s *Server) ddIncr(name string, tags []string) { } } +// ddCount increments a DogStatsD counter by the given value. No-op if DD is not configured. +func (s *Server) ddCount(name string, value int64, tags []string) { + if s.dd != nil { + s.dd.Count(name, value, tags) + } +} + // ddHistogram records a DogStatsD histogram value. No-op if DD is not configured. func (s *Server) ddHistogram(name string, value float64, tags []string) { if s.dd != nil { @@ -354,7 +359,6 @@ func (s *Server) ddGauge(name string, value float64, tags []string) { } } -// emitPanic is the panic-specific emit helper. Captures stack separately. func (s *Server) emitPanic(ctx context.Context, message, stack string, fields map[string]any) { if s.emitter == nil { return @@ -484,17 +488,6 @@ func hasConfiguredHashInput(hashes []string) bool { return false } -func normalizeSHA256Hex(value, field string) (string, error) { - value = strings.ToLower(strings.TrimSpace(value)) - if len(value) != sha256.Size*2 { - return "", fmt.Errorf("%s must be a 64-character SHA-256 hex digest", field) - } - if _, err := hex.DecodeString(value); err != nil { - return "", fmt.Errorf("%s must be a valid SHA-256 hex digest", field) - } - return value, nil -} - // SetConsoleURL sets the frontend URL for device auth verification links. func (s *Server) SetConsoleURL(url string) { s.consoleURL = url @@ -672,6 +665,7 @@ func (s *Server) revalidateConnectedProvidersAgainstRuntimePolicy() { semverLess(version, s.minProviderVersion): provider.RuntimeVerified = false provider.RuntimeManifestChecked = false + s.ddIncr("provider_version_below_minimum", []string{"gate:manifest_sync", "version:" + version}) default: runtimeOK, _ := s.verifyRuntimeHashesForBackend( backend, @@ -861,7 +855,7 @@ func (s *Server) verifyRuntimeHashesAgainstManifest(manifest *RuntimeManifest, p func (s *Server) handleRuntimeManifest(w http.ResponseWriter, r *http.Request) { const cacheKey = "runtime_manifest:v1" if cached, ok := s.readCache.Get(cacheKey); ok { - writeCachedJSON(w, http.StatusOK, cached) + writeCachedJSON(w, cached) return } var resp map[string]any @@ -881,7 +875,7 @@ func (s *Server) handleRuntimeManifest(w http.ResponseWriter, r *http.Request) { return } s.readCache.Set(cacheKey, body, time.Minute) - writeCachedJSON(w, http.StatusOK, body) + writeCachedJSON(w, body) } // HandleMDMWebhook processes a MicroMDM webhook callback. @@ -1101,6 +1095,12 @@ func (s *Server) registerDefaultGauges() { s.metrics.RegisterGauge("providers_online", func() float64 { return float64(s.registry.ProviderCount()) }) + s.metrics.RegisterGauge("min_provider_version_set", func() float64 { + if s.minProviderVersion != "" { + return 1 + } + return 0 + }) } // StartDDGaugeLoop periodically pushes gauge values to DogStatsD. Gauges @@ -1117,7 +1117,16 @@ func (s *Server) StartDDGaugeLoop(ctx context.Context) { case <-ctx.Done(): return case <-ticker.C: - s.ddGauge("providers.online", float64(s.registry.ProviderCount()), nil) + s.ddGauge("providers.online", float64(s.registry.OnlineCount()), nil) + for model, count := range s.registry.ModelProviderSnapshot() { + s.ddGauge("providers.per_model", float64(count), []string{"model:" + model}) + } + for ver, count := range s.registry.ProviderCountByVersion() { + s.ddGauge("providers.per_version", float64(count), []string{"version:" + ver}) + } + if s.minProviderVersion != "" { + s.ddGauge("coordinator.min_provider_version_set", 1, []string{"min_version:" + s.minProviderVersion}) + } if q := s.registry.Queue(); q != nil { s.ddGauge("request_queue.depth", float64(q.TotalSize()), nil) } @@ -1443,14 +1452,15 @@ func (s *Server) loggingMiddleware(next http.Handler) http.Handler { } // httpPathLabel returns a bounded label for HTTP metrics. -// We use the mux route pattern (e.g. "POST /v1/chat/completions") +// We use the mux route pattern (e.g. "POST-/v1/chat/completions") // instead of URL.Path so attacker-controlled unmatched paths cannot create -// unbounded metric cardinality. +// unbounded metric cardinality. Dashes replace spaces so DogStatsD tags +// parse cleanly (spaces break tag parsing). func httpPathLabel(route string) string { if route == "" { return "unmatched" } - return route + return strings.ReplaceAll(route, " ", "-") } // strconvItoa is a shim to avoid pulling strconv into every middleware file. diff --git a/coordinator/api/server_metrics_label_test.go b/coordinator/api/server_metrics_label_test.go index f2cb9083..24b2977f 100644 --- a/coordinator/api/server_metrics_label_test.go +++ b/coordinator/api/server_metrics_label_test.go @@ -8,7 +8,7 @@ func TestHTTPPathLabel_UsesBoundedRouteLabel(t *testing.T) { route string want string }{ - {name: "matched route", route: "POST /v1/chat/completions", want: "POST /v1/chat/completions"}, + {name: "matched route", route: "POST /v1/chat/completions", want: "POST-/v1/chat/completions"}, {name: "empty route", route: "", want: "unmatched"}, } diff --git a/coordinator/api/stats.go b/coordinator/api/stats.go index fbefbe4f..2034d567 100644 --- a/coordinator/api/stats.go +++ b/coordinator/api/stats.go @@ -15,7 +15,7 @@ import ( func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) { const cacheKey = "stats:v1" if cached, ok := s.readCache.Get(cacheKey); ok { - writeCachedJSON(w, http.StatusOK, cached) + writeCachedJSON(w, cached) return } var ( @@ -142,5 +142,5 @@ func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) { return } s.readCache.Set(cacheKey, body, time.Minute) - writeCachedJSON(w, http.StatusOK, body) + writeCachedJSON(w, body) } diff --git a/coordinator/datadog/datadog.go b/coordinator/datadog/datadog.go index e22d4e05..e9f4bfd5 100644 --- a/coordinator/datadog/datadog.go +++ b/coordinator/datadog/datadog.go @@ -147,6 +147,14 @@ func (c *Client) Incr(name string, tags []string) { _ = c.Statsd.Incr(name, tags, 1) } +// Count increments a DogStatsD counter by the given value. +func (c *Client) Count(name string, value int64, tags []string) { + if c == nil || c.Statsd == nil { + return + } + _ = c.Statsd.Count(name, value, tags, 1) +} + // Histogram records a histogram value. func (c *Client) Histogram(name string, value float64, tags []string) { if c == nil || c.Statsd == nil { diff --git a/coordinator/registry/registry.go b/coordinator/registry/registry.go index 77261cbb..aaa9dab1 100644 --- a/coordinator/registry/registry.go +++ b/coordinator/registry/registry.go @@ -25,6 +25,7 @@ import ( "math" "math/rand" "sync" + "sync/atomic" "time" "github.com/eigeninference/d-inference/coordinator/attestation" @@ -192,22 +193,22 @@ func providerSupportsPrivateTextLocked(p *Provider) bool { if !p.ChallengeVerifiedSIP { return false } + swiftRuntime := BackendUsesSwiftRuntime(p.Backend) caps := p.PrivacyCapabilities if caps == nil { return false } - // TextBackendInprocess, TextProxyDisabled, PythonRuntimeLocked, - // DangerousModulesBlocked, AntiDebugEnabled, CoreDumpsDisabled, EnvScrubbed - // remain provider-attested. They are gated by RuntimeManifestChecked - // (coordinator verifies the runtime binary hashes match known-good) and - // ChallengeVerifiedSIP (coordinator independently checks SIP status). - return caps.TextBackendInprocess && + base := caps.TextBackendInprocess && caps.TextProxyDisabled && - caps.PythonRuntimeLocked && - caps.DangerousModulesBlocked && caps.AntiDebugEnabled && caps.CoreDumpsDisabled && caps.EnvScrubbed + if swiftRuntime { + return base + } + return base && + caps.PythonRuntimeLocked && + caps.DangerousModulesBlocked } func privateTextBackendSupported(backend string) bool { @@ -365,32 +366,29 @@ type Registry struct { mu sync.RWMutex providers map[string]*Provider - // queue manages requests waiting for a provider to become available. queue *RequestQueue - // MinTrustLevel is the minimum trust level required for routing. - // Defaults to TrustHardware. Set to TrustNone for testing. MinTrustLevel TrustLevel - // modelCatalog maps active model IDs to their catalog metadata (including - // expected weight hashes). When non-empty, only models in this map are - // accepted from providers and routable by consumers. Updated via SetModelCatalog. modelCatalog map[string]CatalogEntry - // store provides persistence for provider fleet state. When non-nil, - // provider records and reputation are persisted across coordinator restarts. store store.Store logger *slog.Logger + + onlineCount atomic.Int64 + modelProviders map[string]*atomic.Int64 + modelProvidersMu sync.Mutex } // New creates a new Registry. func New(logger *slog.Logger) *Registry { return &Registry{ - providers: make(map[string]*Provider), - queue: NewRequestQueue(10, 120*time.Second), - MinTrustLevel: TrustHardware, - logger: logger, + providers: make(map[string]*Provider), + queue: NewRequestQueue(10, 120*time.Second), + MinTrustLevel: TrustHardware, + modelProviders: make(map[string]*atomic.Int64), + logger: logger, } } @@ -632,6 +630,24 @@ func (r *Registry) SetModelCatalog(entries []CatalogEntry) { r.modelCatalog = catalog } +// ModelType returns the model type string for the given model ID, or +// "unknown" if no provider is currently serving it. +func (r *Registry) ModelType(model string) string { + r.mu.RLock() + defer r.mu.RUnlock() + for _, p := range r.providers { + p.mu.Lock() + for _, m := range p.Models { + if m.ID == model && m.ModelType != "" { + p.mu.Unlock() + return m.ModelType + } + } + p.mu.Unlock() + } + return "unknown" +} + // IsModelInCatalog returns true if the model is in the active catalog, // or if no catalog is configured (all models allowed). func (r *Registry) IsModelInCatalog(model string) bool { @@ -854,6 +870,10 @@ func (r *Registry) Register(id string, conn *websocket.Conn, msg *protocol.Regis r.mu.Lock() r.providers[id] = p + r.onlineCount.Add(1) + for _, m := range models { + r.modelProviderInc(m.ID) + } r.mu.Unlock() r.logger.Info("provider registered", @@ -962,8 +982,13 @@ func (r *Registry) Heartbeat(id string, msg *protocol.HeartbeatMessage) { p.CurrentModel = *msg.ActiveModel } // Only update status from heartbeat if provider is not actively serving - // (serving status is managed by request lifecycle). - if p.Status != StatusServing || msg.Status == "idle" { + // (serving status is managed by request lifecycle). Crucially, an + // untrusted provider must NOT transition back to StatusOnline here — + // that would cause an onlineCount double-decrement when Disconnect + // later sees StatusOnline and decrements a second time. + if p.Status == StatusUntrusted { + // no status transitions allowed + } else if p.Status != StatusServing || msg.Status == "idle" { switch msg.Status { case "idle": p.Status = StatusOnline @@ -998,6 +1023,14 @@ func (r *Registry) Disconnect(id string) { p, ok := r.providers[id] if ok { delete(r.providers, id) + p.mu.Lock() + if p.Status != StatusUntrusted { + r.onlineCount.Add(-1) + for _, m := range p.Models { + r.modelProviderDec(m.ID) + } + } + p.mu.Unlock() } r.mu.Unlock() @@ -1035,16 +1068,23 @@ func (r *Registry) GetProvider(id string) *Provider { // receiving new jobs. This is called when a provider fails too many // challenge-response verifications. func (r *Registry) MarkUntrusted(providerID string) { - r.mu.RLock() + r.mu.Lock() p, ok := r.providers[providerID] - r.mu.RUnlock() if !ok { + r.mu.Unlock() return } p.mu.Lock() + if p.Status != StatusUntrusted { + r.onlineCount.Add(-1) + for _, m := range p.Models { + r.modelProviderDec(m.ID) + } + } p.Status = StatusUntrusted p.mu.Unlock() + r.mu.Unlock() r.logger.Warn("provider marked as untrusted", "provider_id", providerID, @@ -1601,12 +1641,110 @@ func (r *Registry) RecordJobFailure(providerID string) { } // ProviderCount returns the number of registered providers. +// modelProviderInc increments the provider count for a model. Must be called +// with r.mu held. +func (r *Registry) modelProviderInc(model string) { + r.modelProvidersMu.Lock() + c, ok := r.modelProviders[model] + if !ok { + c = &atomic.Int64{} + r.modelProviders[model] = c + } + r.modelProvidersMu.Unlock() + c.Add(1) +} + +// modelProviderDec decrements the provider count for a model. Must be called +// with r.mu held. +func (r *Registry) modelProviderDec(model string) { + r.modelProvidersMu.Lock() + c, ok := r.modelProviders[model] + r.modelProvidersMu.Unlock() + if ok { + v := c.Add(-1) + if v <= 0 { + r.modelProvidersMu.Lock() + delete(r.modelProviders, model) + r.modelProvidersMu.Unlock() + } + } +} + +// OnlineCount returns the number of online providers. +func (r *Registry) OnlineCount() int64 { + return r.onlineCount.Load() +} + +// ModelProviderSnapshot returns a snapshot of model_id -> provider count. +func (r *Registry) ModelProviderSnapshot() map[string]int64 { + r.modelProvidersMu.Lock() + snap := make(map[string]int64, len(r.modelProviders)) + for model, c := range r.modelProviders { + if v := c.Load(); v > 0 { + snap[model] = v + } + } + r.modelProvidersMu.Unlock() + return snap +} + +// ProviderCountByChip returns a map of chip_name -> count of online providers. +func (r *Registry) ProviderCountByChip() map[string]int { + r.mu.RLock() + defer r.mu.RUnlock() + counts := make(map[string]int) + for _, p := range r.providers { + p.mu.Lock() + online := p.Status != StatusOffline && p.Status != StatusUntrusted + p.mu.Unlock() + if online { + chip := p.Hardware.ChipName + if chip == "" { + chip = "unknown" + } + counts[chip]++ + } + } + return counts +} + +// ModelProviderCounts returns a map of model_id -> count of online providers +// serving that model. +func (r *Registry) ModelProviderCounts() map[string]int { + snap := r.ModelProviderSnapshot() + out := make(map[string]int, len(snap)) + for k, v := range snap { + out[k] = int(v) + } + return out +} + func (r *Registry) ProviderCount() int { r.mu.RLock() defer r.mu.RUnlock() return len(r.providers) } +func (r *Registry) ProviderCountByVersion() map[string]int { + r.mu.RLock() + defer r.mu.RUnlock() + counts := make(map[string]int) + for _, p := range r.providers { + p.mu.Lock() + online := p.Status != StatusOffline && p.Status != StatusUntrusted + p.mu.Unlock() + if !online { + continue + } + ver := p.Version + if ver == "" { + ver = "unknown" + } + counts[ver]++ + } + return counts +} + // FleetSnapshot is the read-only summary used by metrics polling. We // don't lock individual providers — counts may be off-by-one under // heavy churn — that's acceptable for gauges. diff --git a/coordinator/registry/registry_test.go b/coordinator/registry/registry_test.go index 6062f00a..79cfc046 100644 --- a/coordinator/registry/registry_test.go +++ b/coordinator/registry/registry_test.go @@ -171,6 +171,67 @@ func TestProviderWithoutChallengeVerifiedSIPExcluded(t *testing.T) { } } +func TestSwiftProviderPrivateTextWithoutPythonCaps(t *testing.T) { + reg := New(testLogger()) + msg := testRegisterMessage() + msg.Backend = BackendMLXSwift + msg.PrivacyCapabilities.PythonRuntimeLocked = false + msg.PrivacyCapabilities.DangerousModulesBlocked = false + + p := reg.Register("p-swift-nopython", nil, msg) + testMakeTextRoutable(p) + + if !providerSupportsPrivateTextLocked(p) { + t.Fatal("Swift provider should support private text without PythonRuntimeLocked/DangerousModulesBlocked") + } + + found := reg.FindProvider("mlx-community/Qwen3.5-9B-Instruct-4bit") + if found == nil { + t.Fatal("Swift provider without Python caps should be routable for text models") + } +} + +func TestPythonProviderRequiresPythonCaps(t *testing.T) { + reg := New(testLogger()) + msg := testRegisterMessage() + msg.Backend = BackendInprocessMLX + msg.PrivacyCapabilities.PythonRuntimeLocked = false + msg.PrivacyCapabilities.DangerousModulesBlocked = false + + p := reg.Register("p-python-nocaps", nil, msg) + testMakeTextRoutable(p) + + if providerSupportsPrivateTextLocked(p) { + t.Fatal("Python (inprocess-mlx) provider without PythonRuntimeLocked/DangerousModulesBlocked should NOT support private text") + } + + found := reg.FindProvider("mlx-community/Qwen3.5-9B-Instruct-4bit") + if found != nil { + t.Fatal("Python provider without Python caps should not be routable for text models") + } +} + +func TestSwiftProviderMissingBaseCapsExcluded(t *testing.T) { + reg := New(testLogger()) + msg := testRegisterMessage() + msg.Backend = BackendMLXSwift + msg.PrivacyCapabilities.PythonRuntimeLocked = false + msg.PrivacyCapabilities.DangerousModulesBlocked = false + msg.PrivacyCapabilities.AntiDebugEnabled = false + + p := reg.Register("p-swift-no-antidebug", nil, msg) + testMakeTextRoutable(p) + + if providerSupportsPrivateTextLocked(p) { + t.Fatal("Swift provider without AntiDebugEnabled should NOT support private text") + } + + found := reg.FindProvider("mlx-community/Qwen3.5-9B-Instruct-4bit") + if found != nil { + t.Fatal("Swift provider without base privacy caps should not be routable") + } +} + func TestProviderPartialPrivacyCapsExcluded(t *testing.T) { reg := New(testLogger()) msg := testRegisterMessage() @@ -721,6 +782,42 @@ func TestChallengeFailureThreshold(t *testing.T) { } } +func TestHeartbeatDoesNotReviveUntrusted(t *testing.T) { + reg := New(testLogger()) + msg := testRegisterMessage() + reg.Register("p1", nil, msg) + + if reg.OnlineCount() != 1 { + t.Fatalf("OnlineCount = %d, want 1 after register", reg.OnlineCount()) + } + + reg.MarkUntrusted("p1") + if reg.OnlineCount() != 0 { + t.Errorf("OnlineCount = %d, want 0 after MarkUntrusted", reg.OnlineCount()) + } + + p := reg.GetProvider("p1") + if p.Status != StatusUntrusted { + t.Fatalf("status = %q, want %q", p.Status, StatusUntrusted) + } + + // Heartbeat with idle status must not revive an untrusted provider + reg.Heartbeat("p1", &protocol.HeartbeatMessage{Status: "idle"}) + p = reg.GetProvider("p1") + if p.Status != StatusUntrusted { + t.Errorf("status = %q after heartbeat, want %q (untrusted must not revive)", p.Status, StatusUntrusted) + } + if reg.OnlineCount() != 0 { + t.Errorf("OnlineCount = %d after heartbeat on untrusted, want 0", reg.OnlineCount()) + } + + // Disconnect should NOT decrement again (no double-decrement) + reg.Disconnect("p1") + if reg.OnlineCount() != 0 { + t.Errorf("OnlineCount = %d after disconnect, want 0 (no double-decrement)", reg.OnlineCount()) + } +} + // --- scoring tests --- func TestScoringHigherDecodeTPS(t *testing.T) { @@ -1746,6 +1843,41 @@ func TestModelCatalogFilterOnRegister(t *testing.T) { } } +func TestModelTypeIncludesUntrusted(t *testing.T) { + reg := New(testLogger()) + reg.MinTrustLevel = TrustNone + + msg := &protocol.RegisterMessage{ + Type: protocol.TypeRegister, + Hardware: testRegisterMessage().Hardware, + Models: []protocol.ModelInfo{ + {ID: "model-a", SizeBytes: 1000, ModelType: "text", Quantization: "4bit"}, + {ID: "model-b", SizeBytes: 2000, ModelType: "image", Quantization: "8bit"}, + }, + Backend: "vllm_mlx", + } + p := reg.Register("p1", nil, msg) + + if got := reg.ModelType("model-a"); got != "text" { + t.Errorf("ModelType(model-a) = %q, want %q", got, "text") + } + if got := reg.ModelType("model-b"); got != "image" { + t.Errorf("ModelType(model-b) = %q, want %q", got, "image") + } + + reg.MarkUntrusted(p.ID) + + if got := reg.ModelType("model-a"); got != "text" { + t.Errorf("ModelType(model-a) after untrusted = %q, want %q", got, "text") + } + if got := reg.ModelType("model-b"); got != "image" { + t.Errorf("ModelType(model-b) after untrusted = %q, want %q", got, "image") + } + if got := reg.ModelType("nonexistent"); got != "unknown" { + t.Errorf("ModelType(nonexistent) = %q, want %q", got, "unknown") + } +} + func TestModelCatalogFilterOnRegisterNoCatalog(t *testing.T) { reg := New(testLogger()) reg.MinTrustLevel = TrustNone diff --git a/coordinator/registry/scheduler.go b/coordinator/registry/scheduler.go index 00dac4a1..b408cf1c 100644 --- a/coordinator/registry/scheduler.go +++ b/coordinator/registry/scheduler.go @@ -181,7 +181,7 @@ func (r *Registry) ReserveProviderEx(model string, pr *PendingRequest, excludeID // Re-check capacity under the provider lock in case another goroutine // changed the pending set between snapshot and reservation. - if !r.providerCanAdmitLocked(p, model, pr) { + if !r.providerCanAdmitLocked(p, model) { return nil, RoutingDecision{ Model: model, CandidateCount: candidateCount, @@ -249,7 +249,7 @@ func (r *Registry) selectBestCandidateLockedFull(model string, pr *PendingReques if _, excluded := excludeSet[p.ID]; excluded { continue } - snap, ok := r.snapshotProviderLocked(p, model, pr) + snap, ok := r.snapshotProviderLocked(p, model) if !ok { continue } @@ -355,7 +355,7 @@ func (r *Registry) logRoutingDecision(model string, pr *PendingRequest, winner * ) } -func (r *Registry) snapshotProviderLocked(p *Provider, model string, pr *PendingRequest) (routingSnapshot, bool) { +func (r *Registry) snapshotProviderLocked(p *Provider, model string) (routingSnapshot, bool) { now := time.Now() p.mu.Lock() @@ -643,7 +643,7 @@ func providerModelIDs(p *Provider) []string { return ids } -func (r *Registry) providerCanAdmitLocked(p *Provider, model string, pr *PendingRequest) bool { +func (r *Registry) providerCanAdmitLocked(p *Provider, model string) bool { if p.Status == StatusOffline || p.Status == StatusUntrusted { return false } diff --git a/deploy/datadog/dev-network-dashboard.json b/deploy/datadog/dev-network-dashboard.json index bc518947..95aca5fd 100644 --- a/deploy/datadog/dev-network-dashboard.json +++ b/deploy/datadog/dev-network-dashboard.json @@ -1,6 +1,6 @@ { - "title": "d-inference Dev", - "description": "Log-first operational dashboard for the d-inference dev coordinator. Provider telemetry is forwarded through the coordinator and indexed with source:provider; regular coordinator runtime logs are indexed with kind:coordinator_log.", + "title": "Darkbloom Observability", + "description": "Full observability dashboard for the d-inference dev coordinator. Metrics via DogStatsD (d_inference.*), logs via DD Agent journald collection + direct Logs API, traces via DD Agent APM, and system metrics from the host-level agent.", "layout_type": "ordered", "notify_list": [], "template_variables": [ @@ -13,13 +13,18 @@ "name": "service", "prefix": "service", "default": "d-inference-coordinator" + }, + { + "name": "model", + "prefix": "model", + "default": "*" } ], "widgets": [ { "definition": { "type": "note", - "content": "Dev d-inference observability. This dashboard is log-backed so it works without a Datadog Agent on the VM. Queries use `$env $service`. Provider telemetry appears as `source:provider`; coordinator runtime logs appear as `source:coordinator kind:coordinator_log`.", + "content": "## Darkbloom Observability\nMetrics: `d_inference.*` via DogStatsD (agent on 8125) | Logs: journald + direct Logs API | Traces: APM agent on 8126 | System: host-level DD agent checks\n\nTemplate vars: `$env`, `$service`, `$model`. Provider telemetry = `source:provider`; coordinator events = `source:coordinator`.", "background_color": "blue", "font_size": "14", "text_align": "left", @@ -28,316 +33,1443 @@ }, { "definition": { - "type": "query_value", - "title": "All Dev Logs", - "autoscale": true, - "precision": 0, - "requests": [ - { - "response_format": "scalar", - "queries": [ - { - "name": "query1", - "data_source": "logs", - "search": { - "query": "$env $service" - }, - "indexes": [ - "*" - ], - "compute": { - "aggregation": "count" - } - } - ], - "formulas": [ - { - "formula": "query1" - } - ] + "type": "group", + "title": "Overview", + "layout_type": "ordered", + "show_title": true, + "widgets": [ + { + "definition": { + "type": "timeseries", + "title": "Providers Online", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "avg:d_inference.providers.online{$env}" + } + ], + "formulas": [ + { + "formula": "query1" + } + ] + } + ] + } + }, + { + "definition": { + "type": "timeseries", + "title": "Queue Depth", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "avg:d_inference.request_queue.depth{$env}" + } + ], + "formulas": [ + { + "formula": "query1" + } + ] + } + ] + } + }, + { + "definition": { + "type": "query_table", + "title": "Providers per Model", + "has_search_bar": "auto", + "requests": [ + { + "response_format": "scalar", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "avg:d_inference.providers.per_model{$env} by {model}", + "aggregator": "last" + } + ], + "formulas": [ + { + "formula": "query1" + } + ] + } + ] + } + }, + { + "definition": { + "type": "query_value", + "title": "All Dev Logs", + "autoscale": true, + "precision": 0, + "requests": [ + { + "response_format": "scalar", + "queries": [ + { + "name": "query1", + "data_source": "logs", + "search": { + "query": "$env $service" + }, + "indexes": ["*"], + "compute": { + "aggregation": "count" + } + } + ], + "formulas": [ + { + "formula": "query1" + } + ] + } + ] + } + }, + { + "definition": { + "type": "query_value", + "title": "Warnings + Errors", + "autoscale": true, + "precision": 0, + "requests": [ + { + "response_format": "scalar", + "queries": [ + { + "name": "query1", + "data_source": "logs", + "search": { + "query": "$env $service (severity:warn OR severity:error OR severity:fatal OR status:warn OR status:error)" + }, + "indexes": ["*"], + "compute": { + "aggregation": "count" + } + } + ], + "formulas": [ + { + "formula": "query1" + } + ] + } + ] + } } ] } }, { "definition": { - "type": "query_value", - "title": "Provider Telemetry", - "autoscale": true, - "precision": 0, - "requests": [ - { - "response_format": "scalar", - "queries": [ - { - "name": "query1", - "data_source": "logs", - "search": { - "query": "$env $service source:provider" - }, - "indexes": [ - "*" - ], - "compute": { - "aggregation": "count" - } - } - ], - "formulas": [ - { - "formula": "query1" - } - ] + "type": "group", + "title": "Inference & Request Flow", + "layout_type": "ordered", + "show_title": true, + "widgets": [ + { + "definition": { + "type": "timeseries", + "title": "Inference Dispatches by Status", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "sum:d_inference.inference.dispatches{$env} by {status}.as_count()" + } + ], + "formulas": [ + { + "formula": "query1" + } + ], + "display_type": "bars" + } + ] + } + }, + { + "definition": { + "type": "timeseries", + "title": "HTTP Request Latency by Route (p50/p95/p99)", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "p50", + "data_source": "metrics", + "query": "avg:d_inference.http.latency_ms.50percentile{$env} by {path}" + }, + { + "name": "p95", + "data_source": "metrics", + "query": "avg:d_inference.http.latency_ms.95percentile{$env} by {path}" + }, + { + "name": "p99", + "data_source": "metrics", + "query": "avg:d_inference.http.latency_ms.99percentile{$env} by {path}" + } + ], + "formulas": [ + { + "formula": "p50" + }, + { + "formula": "p95" + }, + { + "formula": "p99" + } + ], + "style": { + "palette": "warm" + } + } + ] + } + }, + { + "definition": { + "type": "timeseries", + "title": "HTTP Requests by Route", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "sum:d_inference.http.requests{$env} by {path}.as_count()" + } + ], + "formulas": [ + { + "formula": "query1" + } + ], + "display_type": "bars" + } + ] + } + }, + { + "definition": { + "type": "timeseries", + "title": "Routing Decisions by Outcome", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "sum:d_inference.routing.decisions{$env} by {outcome}.as_count()" + } + ], + "formulas": [ + { + "formula": "query1" + } + ], + "display_type": "bars" + } + ] + } + }, + { + "definition": { + "type": "timeseries", + "title": "Completion Tokens by Model", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "sum:d_inference.inference.completion_tokens_total{$env,model:$model}.as_count() by {model}" + } + ], + "formulas": [ + { + "formula": "query1" + } + ], + "display_type": "bars" + } + ] + } + }, + { + "definition": { + "type": "timeseries", + "title": "Input Tokens by Model", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "sum:d_inference.inference.prompt_tokens_total{$env,model:$model}.as_count() by {model}" + } + ], + "formulas": [ + { + "formula": "query1" + } + ], + "display_type": "bars" + } + ] + } + }, + { + "definition": { + "type": "timeseries", + "title": "Queue Timeouts by Model", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "sum:d_inference.request_queue.timeout{$env,model:$model}.as_count()" + } + ], + "formulas": [ + { + "formula": "query1" + } + ], + "style": { + "palette": "warm" + } + } + ] + } + }, + { + "definition": { + "type": "timeseries", + "title": "Routing Cost (p95) by Provider", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "avg:d_inference.routing.cost_ms.95percentile{$env,model:$model} by {provider_id}" + } + ], + "formulas": [ + { + "formula": "query1" + } + ] + } + ] + } + }, + { + "definition": { + "type": "sunburst", + "title": "Requests by Model Type", + "requests": [ + { + "response_format": "scalar", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "sum:d_inference.routing.decisions{$env} by {model_type}.as_count()" + } + ], + "formulas": [ + { + "formula": "query1" + } + ] + } + ] + } } ] } }, { "definition": { - "type": "query_value", - "title": "Warnings + Errors", - "autoscale": true, - "precision": 0, - "requests": [ - { - "response_format": "scalar", - "queries": [ - { - "name": "query1", - "data_source": "logs", - "search": { - "query": "$env $service (severity:warn OR severity:error OR severity:fatal OR status:warn OR status:error)" - }, - "indexes": [ - "*" - ], - "compute": { - "aggregation": "count" - } - } - ], - "formulas": [ - { - "formula": "query1" - } - ] + "type": "group", + "title": "Attestation & Security", + "layout_type": "ordered", + "show_title": true, + "widgets": [ + { + "definition": { + "type": "timeseries", + "title": "Attestation Challenges Sent", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "sum:d_inference.attestation.challenges_sent{$env}.as_count()" + } + ], + "formulas": [ + { + "formula": "query1" + } + ], + "display_type": "bars" + } + ] + } + }, + { + "definition": { + "type": "timeseries", + "title": "Attestation Challenge Outcomes", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "sum:d_inference.attestation.challenges{$env} by {outcome}.as_count()" + } + ], + "formulas": [ + { + "formula": "query1" + } + ], + "display_type": "bars" + } + ] + } + }, + { + "definition": { + "type": "timeseries", + "title": "Attestation Failures by Reason", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "sum:d_inference.attestation.failures{$env} by {reason}.as_count()" + } + ], + "formulas": [ + { + "formula": "query1" + } + ], + "style": { + "palette": "warm" + } + } + ] + } + }, + { + "definition": { + "type": "timeseries", + "title": "Provider Registrations by Trust Level", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "sum:d_inference.providers.registrations{$env} by {trust_level}.as_count()" + } + ], + "formulas": [ + { + "formula": "query1" + } + ], + "display_type": "bars" + } + ] + } } ] } }, { "definition": { - "type": "timeseries", - "title": "Log Volume by Source", - "show_legend": true, - "legend_layout": "auto", - "requests": [ - { - "response_format": "timeseries", - "queries": [ - { - "name": "query1", - "data_source": "logs", - "search": { - "query": "$env $service" - }, - "indexes": [ - "*" - ], - "compute": { - "aggregation": "count" - }, - "group_by": [ - { - "facet": "source", - "limit": 10, - "sort": { - "aggregation": "count", - "order": "desc" + "type": "group", + "title": "Rate Limiting & Errors", + "layout_type": "ordered", + "show_title": true, + "widgets": [ + { + "definition": { + "type": "timeseries", + "title": "Rate Limit Rejections by Tier", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "sum:d_inference.ratelimit.rejections{$env} by {tier}.as_count()" + } + ], + "formulas": [ + { + "formula": "query1" } + ], + "style": { + "palette": "warm" } - ] - } - ], - "formulas": [ - { - "formula": "query1" - } - ], - "display_type": "bars" + } + ] + } + }, + { + "definition": { + "type": "timeseries", + "title": "WebSocket Disconnects by Reason", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "sum:d_inference.ws.disconnects{$env} by {reason}.as_count()" + } + ], + "formulas": [ + { + "formula": "query1" + } + ] + } + ] + } + }, + { + "definition": { + "type": "timeseries", + "title": "Telemetry Events Ingested by Source & Severity", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "sum:d_inference.telemetry.events_ingested{$env} by {source,severity}.as_count()" + } + ], + "formulas": [ + { + "formula": "query1" + } + ], + "display_type": "bars" + } + ] + } } ] } }, { "definition": { - "type": "timeseries", - "title": "Warnings + Errors by Kind", - "show_legend": true, - "legend_layout": "auto", - "requests": [ - { - "response_format": "timeseries", - "queries": [ - { - "name": "query1", - "data_source": "logs", - "search": { - "query": "$env $service (severity:warn OR severity:error OR severity:fatal OR status:warn OR status:error)" - }, - "indexes": [ - "*" - ], - "compute": { - "aggregation": "count" - }, - "group_by": [ - { - "facet": "kind", - "limit": 10, - "sort": { - "aggregation": "count", - "order": "desc" + "type": "group", + "title": "Logs", + "layout_type": "ordered", + "show_title": true, + "widgets": [ + { + "definition": { + "type": "timeseries", + "title": "Log Volume by Source", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "query1", + "data_source": "logs", + "search": { + "query": "$env $service" + }, + "indexes": ["*"], + "compute": { + "aggregation": "count" + }, + "group_by": [ + { + "facet": "source", + "limit": 10, + "sort": { + "aggregation": "count", + "order": "desc" + } + } + ] } - } - ] - } - ], - "formulas": [ - { - "formula": "query1" - } - ], - "display_type": "bars" + ], + "formulas": [ + { + "formula": "query1" + } + ], + "display_type": "bars" + } + ] + } + }, + { + "definition": { + "type": "timeseries", + "title": "Warnings + Errors by Kind", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "query1", + "data_source": "logs", + "search": { + "query": "$env $service (severity:warn OR severity:error OR severity:fatal OR status:warn OR status:error)" + }, + "indexes": ["*"], + "compute": { + "aggregation": "count" + }, + "group_by": [ + { + "facet": "kind", + "limit": 10, + "sort": { + "aggregation": "count", + "order": "desc" + } + } + ] + } + ], + "formulas": [ + { + "formula": "query1" + } + ], + "display_type": "bars" + } + ] + } + }, + { + "definition": { + "type": "list_stream", + "title": "Provider Telemetry Events", + "requests": [ + { + "query": { + "data_source": "logs_stream", + "query_string": "$env $service source:provider" + }, + "response_format": "event_list", + "columns": [ + { + "field": "timestamp", + "width": "auto" + }, + { + "field": "status", + "width": "auto" + }, + { + "field": "message", + "width": "compact" + } + ] + } + ] + } + }, + { + "definition": { + "type": "list_stream", + "title": "Coordinator Warnings + Errors", + "requests": [ + { + "query": { + "data_source": "logs_stream", + "query_string": "$env $service source:coordinator (severity:warn OR severity:error OR severity:fatal OR status:warn OR status:error)" + }, + "response_format": "event_list", + "columns": [ + { + "field": "timestamp", + "width": "auto" + }, + { + "field": "status", + "width": "auto" + }, + { + "field": "message", + "width": "compact" + } + ] + } + ] + } + }, + { + "definition": { + "type": "list_stream", + "title": "All Recent Dev Logs", + "requests": [ + { + "query": { + "data_source": "logs_stream", + "query_string": "$env $service" + }, + "response_format": "event_list", + "columns": [ + { + "field": "timestamp", + "width": "auto" + }, + { + "field": "status", + "width": "auto" + }, + { + "field": "message", + "width": "compact" + } + ] + } + ] + } } ] } }, { "definition": { - "type": "list_stream", - "title": "Provider Telemetry Events", - "requests": [ - { - "query": { - "data_source": "logs_stream", - "query_string": "$env $service source:provider" - }, - "response_format": "event_list", - "columns": [ - { - "field": "timestamp", - "width": "auto" - }, - { - "field": "status", - "width": "auto" - }, - { - "field": "message", - "width": "compact" - } - ] + "type": "group", + "title": "APM / Traces", + "layout_type": "ordered", + "show_title": true, + "widgets": [ + { + "definition": { + "type": "timeseries", + "title": "Trace Hits by Operation", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "sum:trace.http.request.hits{$env} by {operation_name}.as_count()" + } + ], + "formulas": [ + { + "formula": "query1" + } + ], + "display_type": "bars" + } + ] + } + }, + { + "definition": { + "type": "timeseries", + "title": "Trace Latency (p50/p95/p99)", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "p50", + "data_source": "metrics", + "query": "avg:trace.http.request.latency.50percentile{$env}" + }, + { + "name": "p95", + "data_source": "metrics", + "query": "avg:trace.http.request.latency.95percentile{$env}" + }, + { + "name": "p99", + "data_source": "metrics", + "query": "avg:trace.http.request.latency.99percentile{$env}" + } + ], + "formulas": [ + { + "formula": "p50" + }, + { + "formula": "p95" + }, + { + "formula": "p99" + } + ], + "style": { + "palette": "warm" + } + } + ] + } + }, + { + "definition": { + "type": "query_value", + "title": "Trace Errors", + "autoscale": true, + "precision": 0, + "requests": [ + { + "response_format": "scalar", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "sum:trace.http.request.errors{$env}.as_count()" + } + ], + "formulas": [ + { + "formula": "query1" + } + ] + } + ] + } + }, + { + "definition": { + "type": "toplist", + "title": "Slowest Operations (p99)", + "requests": [ + { + "response_format": "scalar", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "avg:trace.http.request.latency.99percentile{$env} by {operation_name}" + } + ], + "formulas": [ + { + "formula": "query1" + } + ] + } + ] + } } ] } }, { "definition": { - "type": "list_stream", - "title": "Coordinator Warnings + Errors", - "requests": [ - { - "query": { - "data_source": "logs_stream", - "query_string": "$env $service source:coordinator (severity:warn OR severity:error OR severity:fatal OR status:warn OR status:error)" - }, - "response_format": "event_list", - "columns": [ - { - "field": "timestamp", - "width": "auto" + "type": "group", + "title": "System Metrics (Host)", + "layout_type": "ordered", + "show_title": true, + "widgets": [ + { + "definition": { + "type": "timeseries", + "title": "CPU Usage", + "show_legend": true, + "legend_layout": "auto", + "yaxis": { + "label": "%", + "min": "0", + "max": "100" }, - { - "field": "status", - "width": "auto" + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "idle", + "data_source": "metrics", + "query": "avg:system.cpu.idle{host:d-inference-dev}" + } + ], + "formulas": [ + { + "formula": "100 - idle", + "limit": { + "count": 1, + "order": "desc" + } + } + ], + "style": { + "palette": "cool" + } + } + ] + } + }, + { + "definition": { + "type": "timeseries", + "title": "Memory Usage", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "used", + "data_source": "metrics", + "query": "avg:system.mem.used{host:d-inference-dev}" + }, + { + "name": "total", + "data_source": "metrics", + "query": "avg:system.mem.total{host:d-inference-dev}" + } + ], + "formulas": [ + { + "formula": "used" + }, + { + "formula": "total" + } + ] + } + ] + } + }, + { + "definition": { + "type": "timeseries", + "title": "Disk Usage", + "show_legend": true, + "legend_layout": "auto", + "yaxis": { + "label": "%", + "min": "0", + "max": "100" }, - { - "field": "message", - "width": "compact" - } - ] + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "avg:system.disk.in_use{host:d-inference-dev} by {device}" + } + ], + "formulas": [ + { + "formula": "query1 * 100" + } + ] + } + ] + } + }, + { + "definition": { + "type": "timeseries", + "title": "Network I/O", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "rcvd", + "data_source": "metrics", + "query": "avg:system.net.bytes_rcvd{host:d-inference-dev} by {device}" + }, + { + "name": "sent", + "data_source": "metrics", + "query": "avg:system.net.bytes_sent{host:d-inference-dev} by {device}" + } + ], + "formulas": [ + { + "formula": "rcvd" + }, + { + "formula": "sent" + } + ] + } + ] + } + }, + { + "definition": { + "type": "query_value", + "title": "Load Average (1m)", + "autoscale": true, + "precision": 2, + "requests": [ + { + "response_format": "scalar", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "avg:system.load.1{host:d-inference-dev}" + } + ], + "formulas": [ + { + "formula": "query1" + } + ] + } + ] + } } ] } }, { "definition": { - "type": "list_stream", - "title": "Recent Coordinator Runtime Logs", - "requests": [ - { - "query": { - "data_source": "logs_stream", - "query_string": "$env $service source:coordinator kind:coordinator_log" - }, - "response_format": "event_list", - "columns": [ - { - "field": "timestamp", - "width": "auto" - }, - { - "field": "status", - "width": "auto" - }, - { - "field": "message", - "width": "compact" - } - ] + "type": "group", + "title": "Billing & Store", + "layout_type": "ordered", + "show_title": true, + "widgets": [ + { + "definition": { + "type": "timeseries", + "title": "Reservation Amount (p50/p95/p99) [¢]", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "p50", + "data_source": "metrics", + "query": "avg:d_inference.billing.reserved_micro_usd.50percentile{$env,model:$model} by {model}" + }, + { + "name": "p95", + "data_source": "metrics", + "query": "avg:d_inference.billing.reserved_micro_usd.95percentile{$env,model:$model} by {model}" + }, + { + "name": "p99", + "data_source": "metrics", + "query": "avg:d_inference.billing.reserved_micro_usd.99percentile{$env,model:$model} by {model}" + } + ], + "formulas": [ + { "formula": "p50 / 10000" }, + { "formula": "p95 / 10000" }, + { "formula": "p99 / 10000" } + ], + "style": { "palette": "warm" } + } + ] + } + }, + { + "definition": { + "type": "timeseries", + "title": "Settlement Refund Amount (p50/p95/p99) [¢]", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "p50", + "data_source": "metrics", + "query": "avg:d_inference.billing.settlement_refund_micro_usd.50percentile{$env,model:$model} by {model}" + }, + { + "name": "p95", + "data_source": "metrics", + "query": "avg:d_inference.billing.settlement_refund_micro_usd.95percentile{$env,model:$model} by {model}" + }, + { + "name": "p99", + "data_source": "metrics", + "query": "avg:d_inference.billing.settlement_refund_micro_usd.99percentile{$env,model:$model} by {model}" + } + ], + "formulas": [ + { "formula": "p50 / 10000" }, + { "formula": "p95 / 10000" }, + { "formula": "p99 / 10000" } + ], + "style": { "palette": "cool" } + } + ] + } + }, + { + "definition": { + "type": "timeseries", + "title": "Provider Credits by Model & Type [¢]", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "sum:d_inference.billing.provider_credits_micro_usd{$env,model:$model} by {model,type}.as_count()" + } + ], + "formulas": [ + { "formula": "query1 / 10000" } + ], + "display_type": "bars" + } + ] + } + }, + { + "definition": { + "type": "timeseries", + "title": "Platform Fees by Model [¢]", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "sum:d_inference.billing.platform_fees_micro_usd{$env,model:$model} by {model}.as_count()" + } + ], + "formulas": [ + { "formula": "query1 / 10000" } + ], + "display_type": "bars" + } + ] + } + }, + { + "definition": { + "type": "timeseries", + "title": "Cost Clamped Events (settlement cost > reservation, capped to reserved)", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "sum:d_inference.billing.cost_clamped{$env,model:$model}.as_count()" + } + ], + "formulas": [ + { "formula": "query1" } + ], + "style": { "palette": "warm" } + } + ] + } + }, + { + "definition": { + "type": "timeseries", + "title": "Reservation Refunds", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "sum:d_inference.billing.reservation_refunds{$env,model:$model}.as_count()" + } + ], + "formulas": [ + { "formula": "query1" } + ], + "display_type": "bars" + } + ] + } + }, + { + "definition": { + "type": "timeseries", + "title": "Store Operation Latency (p50/p95/p99)", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "p50", + "data_source": "metrics", + "query": "avg:d_inference.store.debit.latency_ms.50percentile{$env} by {op}" + }, + { + "name": "p95", + "data_source": "metrics", + "query": "avg:d_inference.store.debit.latency_ms.95percentile{$env} by {op}" + }, + { + "name": "p99", + "data_source": "metrics", + "query": "avg:d_inference.store.debit.latency_ms.99percentile{$env} by {op}" + }, + { + "name": "c50", + "data_source": "metrics", + "query": "avg:d_inference.store.credit.latency_ms.50percentile{$env} by {op}" + }, + { + "name": "c95", + "data_source": "metrics", + "query": "avg:d_inference.store.credit.latency_ms.95percentile{$env} by {op}" + }, + { + "name": "c99", + "data_source": "metrics", + "query": "avg:d_inference.store.credit.latency_ms.99percentile{$env} by {op}" + } + ], + "formulas": [ + { "formula": "p50" }, + { "formula": "p95" }, + { "formula": "p99" }, + { "formula": "c50" }, + { "formula": "c95" }, + { "formula": "c99" } + ], + "style": { "palette": "warm" } + } + ] + } + }, + { + "definition": { + "type": "query_table", + "title": "Store Latency by Operation (p99)", + "has_search_bar": "auto", + "requests": [ + { + "response_format": "scalar", + "queries": [ + { + "name": "debit", + "data_source": "metrics", + "query": "avg:d_inference.store.debit.latency_ms.99percentile{$env} by {op}", + "aggregator": "last" + }, + { + "name": "credit", + "data_source": "metrics", + "query": "avg:d_inference.store.credit.latency_ms.99percentile{$env} by {op}", + "aggregator": "last" + } + ], + "formulas": [ + { "formula": "debit", "alias": "Debit p99 (ms)" }, + { "formula": "credit", "alias": "Credit p99 (ms)" } + ] + } + ] + } } ] } }, { "definition": { - "type": "list_stream", - "title": "All Recent Dev Logs", - "requests": [ - { - "query": { - "data_source": "logs_stream", - "query_string": "$env $service" - }, - "response_format": "event_list", - "columns": [ - { - "field": "timestamp", - "width": "auto" - }, - { - "field": "status", - "width": "auto" - }, - { - "field": "message", - "width": "compact" - } - ] + "type": "group", + "title": "Fleet Version & Binary Hash", + "layout_type": "ordered", + "show_title": true, + "widgets": [ + { + "definition": { + "type": "timeseries", + "title": "Providers by Version", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "sum:d_inference.providers.per_version{$env} by {version}" + } + ], + "formulas": [ + { + "formula": "query1" + } + ] + } + ] + } + }, + { + "definition": { + "type": "query_value", + "title": "Min Provider Version", + "autoscale": false, + "precision": 0, + "requests": [ + { + "response_format": "scalar", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "sum:d_inference.coordinator.min_provider_version_set{$env} by {min_version}" + } + ], + "formulas": [ + { + "formula": "query1" + } + ] + } + ] + } + }, + { + "definition": { + "type": "timeseries", + "title": "Providers Below Minimum Version", + "show_legend": true, + "legend_layout": "auto", + "requests": [ + { + "response_format": "timeseries", + "queries": [ + { + "name": "query1", + "data_source": "metrics", + "query": "sum:d_inference.provider_version_below_minimum{$env} by {gate,version}.as_count()" + } + ], + "formulas": [ + { + "formula": "query1" + } + ], + "display_type": "bars", + "style": { + "palette": "warm" + } + } + ] + } } ] } - }, - { - "definition": { - "type": "note", - "content": "APM and DogStatsD panels are intentionally omitted here until the dev VM runs a Datadog Agent on 8126/8125. The coordinator currently forwards logs directly via the Datadog Logs API, which is why this dashboard uses log analytics instead of `d_inference.*` metrics.", - "background_color": "gray", - "font_size": "12", - "text_align": "left", - "show_tick": false - } } ] } diff --git a/deploy/gcp/cloudbuild.yaml b/deploy/gcp/cloudbuild.yaml index 1abe6ca4..88697688 100644 --- a/deploy/gcp/cloudbuild.yaml +++ b/deploy/gcp/cloudbuild.yaml @@ -27,12 +27,11 @@ options: steps: - id: build name: gcr.io/cloud-builders/docker - dir: coordinator args: - build - --tag=${_IMAGE}:$SHORT_SHA - --tag=${_IMAGE}:latest - - --file=Dockerfile + - --file=coordinator/Dockerfile - . - id: push-sha diff --git a/deploy/gcp/refresh-env.sh b/deploy/gcp/refresh-env.sh index 176aa825..54e959c3 100644 --- a/deploy/gcp/refresh-env.sh +++ b/deploy/gcp/refresh-env.sh @@ -57,6 +57,7 @@ DD_API_KEY=$(fetch eigeninference-dd-api-key) DD_SITE=$(fetch eigeninference-dd-site) DD_ENV=development DD_SERVICE=d-inference-coordinator +DD_AGENT_HOST=localhost EOF # Validate critical secrets before overwriting the live env file. diff --git a/deploy/gcp/vm-startup.sh b/deploy/gcp/vm-startup.sh index f54d3686..033de585 100755 --- a/deploy/gcp/vm-startup.sh +++ b/deploy/gcp/vm-startup.sh @@ -9,6 +9,7 @@ # 3. Install a systemd unit for cloud-sql-proxy (Cloud SQL on 127.0.0.1:5432) # 4. Install a systemd unit for the coordinator container # 5. Fetch secrets from Secret Manager, write /etc/d-inference/env +# 6. Install Datadog Agent (metrics + traces + journald log collection) # # On subsequent boots: # - Re-fetch secrets (picks up rotations) @@ -138,6 +139,7 @@ DD_API_KEY=$(fetch eigeninference-dd-api-key) DD_SITE=$(fetch eigeninference-dd-site) DD_ENV=development DD_SERVICE=d-inference-coordinator +DD_AGENT_HOST=localhost EOF chmod 600 "$ENV_FILE" @@ -190,8 +192,9 @@ chmod +x /usr/local/bin/d-inference-run.sh cat > /etc/systemd/system/d-inference-coordinator.service </dev/null 2>&1 && ! dpkg -l datadog-agent >/dev/null 2>&1; then + DD_API_KEY="$DD_API_KEY_VAL" \ + DD_SITE="${DD_SITE_VAL:-datadoghq.com}" \ + bash -c "$(curl -fsSL https://s3.amazonaws.com/dd-agent/scripts/install_script_agent7.sh)" + fi + + # Ensure the agent is configured for this environment. + mkdir -p /etc/datadog-agent + cat > /etc/datadog-agent/datadog.yaml < /etc/datadog-agent/conf.d/journald.d/conf.yaml < MicroMDM (127.0.0.1:9002, HTTPS self-signed) # /acme/* -> step-ca (127.0.0.1:9000, HTTPS self-signed) @@ -261,8 +310,9 @@ api.dev.darkbloom.xyz { CADDYFILE systemctl daemon-reload -systemctl enable cloud-sql-proxy.service d-inference-coordinator.service caddy.service +systemctl enable cloud-sql-proxy.service datadog-agent.service d-inference-coordinator.service caddy.service systemctl restart cloud-sql-proxy.service +systemctl restart datadog-agent.service systemctl restart d-inference-coordinator.service systemctl restart caddy.service diff --git a/e2e/integration_test.go b/e2e/integration_test.go index 9d76da99..66c85f88 100644 --- a/e2e/integration_test.go +++ b/e2e/integration_test.go @@ -19,8 +19,8 @@ import ( "github.com/eigeninference/d-inference/coordinator/payments" "github.com/eigeninference/d-inference/e2e/testbed" tbassert "github.com/eigeninference/d-inference/e2e/testbed/assert" - tbprofile "github.com/eigeninference/d-inference/e2e/testbed/profile" "github.com/eigeninference/d-inference/e2e/testbed/profile" + tbprofile "github.com/eigeninference/d-inference/e2e/testbed/profile" ) var httpTimeout = 300 * time.Second @@ -573,6 +573,32 @@ func TestIntegration_AttestationHeaders(t *testing.T) { ) } +func TestIntegration_SwiftProviderRealRoutingGates(t *testing.T) { + ctx := context.Background() + s := testbed.NewSuite(testbed.SuiteConfig{}) + require.NoError(t, s.Start(ctx), "suite startup failed") + t.Cleanup(s.Stop) + + for _, id := range s.Coordinator.Registry.ProviderIDs() { + p := s.Coordinator.Registry.GetProvider(id) + require.NotNil(t, p) + p.ChallengeVerifiedSIP = true + p.RuntimeManifestChecked = true + s.Coordinator.Registry.RecordChallengeSuccess(id) + } + + model := s.PrimaryModelID() + found := s.Coordinator.Registry.FindProvider(model) + require.NotNil(t, found, "Swift provider should be routable after challenge success without ForceTrustProvider") + + resp := postChatCompletions(t, s, "What is 1+1? Answer with just the number.", false, 20) + defer resp.Body.Close() + respBody, _ := io.ReadAll(resp.Body) + require.Equal(t, http.StatusOK, resp.StatusCode, "body: %s", string(respBody[:min(len(respBody), 500)])) + + t.Logf("Swift provider real routing: status=200 via challenge-verified path") +} + func TestIntegration_ReferralRewardDistribution(t *testing.T) { s := startSuite(t) diff --git a/e2e/testbed/config.go b/e2e/testbed/config.go index 911da74e..196e1bf0 100644 --- a/e2e/testbed/config.go +++ b/e2e/testbed/config.go @@ -8,8 +8,8 @@ type ModelSpec struct { } var KnownModelSizes = map[string]string{ - "mlx-community/Qwen3.5-0.8B-MLX-4bit": "0.5 GB", - "mlx-community/gemma-3-270m-4bit": "0.2 GB", + "mlx-community/Qwen3.5-0.8B-MLX-4bit": "0.5 GB", + "mlx-community/gemma-3-270m-4bit": "0.2 GB", } type TrustLevel string diff --git a/e2e/testbed/load.go b/e2e/testbed/load.go index 5de87de7..e153f507 100644 --- a/e2e/testbed/load.go +++ b/e2e/testbed/load.go @@ -313,36 +313,36 @@ func (r *LoadResult) SummaryTable() string { if r.ProfileRun != nil && len(r.ProfileRun.SegmentTimings) > 0 { s.WriteString("\n") - s.WriteString(fmt.Sprintf("%-30s %8s %8s %8s %8s %8s\n", "SEGMENT", "COUNT", "MEAN", "P50", "P95", "MAX")) - s.WriteString("─────────────────────────────────────────────────────────────────────\n") - - for _, seg := range []Segment{ - SegmentTotalE2E, - SegmentParse, - SegmentReserve, - SegmentRoute, - SegmentQueueWait, - SegmentEncrypt, - SegmentDispatch, - SegmentCoordinatorToProvider, - SegmentTTFT, - } { - durations, ok := r.ProfileRun.SegmentTimings[seg] - if !ok || len(durations) == 0 { - continue - } - stats := computeStats(durations) - precision := time.Millisecond - if stats.Max < time.Millisecond { - precision = time.Microsecond - } - s.WriteString(fmt.Sprintf("%-30s %8d %8s %8s %8s %8s\n", - seg, stats.Count, - stats.Mean.Round(precision), - stats.Median.Round(precision), - stats.P95.Round(precision), - stats.Max.Round(precision), - )) + s.WriteString(fmt.Sprintf("%-30s %8s %8s %8s %8s %8s\n", "SEGMENT", "COUNT", "MEAN", "P50", "P95", "MAX")) + s.WriteString("─────────────────────────────────────────────────────────────────────\n") + + for _, seg := range []Segment{ + SegmentTotalE2E, + SegmentParse, + SegmentReserve, + SegmentRoute, + SegmentQueueWait, + SegmentEncrypt, + SegmentDispatch, + SegmentCoordinatorToProvider, + SegmentTTFT, + } { + durations, ok := r.ProfileRun.SegmentTimings[seg] + if !ok || len(durations) == 0 { + continue + } + stats := computeStats(durations) + precision := time.Millisecond + if stats.Max < time.Millisecond { + precision = time.Microsecond + } + s.WriteString(fmt.Sprintf("%-30s %8d %8s %8s %8s %8s\n", + seg, stats.Count, + stats.Mean.Round(precision), + stats.Median.Round(precision), + stats.P95.Round(precision), + stats.Max.Round(precision), + )) } } diff --git a/e2e/testbed/suite.go b/e2e/testbed/suite.go index cd2199ef..f76add49 100644 --- a/e2e/testbed/suite.go +++ b/e2e/testbed/suite.go @@ -44,9 +44,9 @@ func execCommandContext(ctx context.Context, name string, args ...string) *exec. } type Suite struct { - Ctx context.Context - Logger *slog.Logger - Config SuiteConfig + Ctx context.Context + Logger *slog.Logger + Config SuiteConfig Pg *deps.PostgresLifecycle PgStore store.Store