Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
f7dab6f
Fix Darkbloom analytics tracking
Gajesh2007 Apr 28, 2026
e515244
Harden release workflow protections (#103)
Gajesh2007 Apr 30, 2026
b5dd048
Harden release registration and binary hash policy (#99)
anupsv Apr 30, 2026
e6d63a8
Remove stale Python integration test (#109)
hankbobtheresearchoor May 1, 2026
7ccc592
chore: remove unused dependencies (#112)
hankbobtheresearchoor May 2, 2026
98a3a02
ci: run CI on any PR, not just master/main (#119)
ethenotethan May 3, 2026
cf4c0ef
ci: remove racing deploy-dev-coordinator workflow (#137)
hankbobtheresearchoor May 7, 2026
ae24144
feat: add Datadog observability stack for dev coordinator
ethenotethan May 8, 2026
d57c8dd
fix: prevent double-decrement when untrusted provider disconnects
ethenotethan May 8, 2026
d625899
merge: resolve conflicts with swift-provider branch
ethenotethan May 12, 2026
1d581d0
feat: add fleet version and binary hash observability
ethenotethan May 12, 2026
87ed7ae
fix: update Dockerfile + cloudbuild for go.mod at repo root
ethenotethan May 12, 2026
75f3831
fix: chmod +x coordinator binary in Dockerfile
ethenotethan May 12, 2026
f049d24
fix: ensure coordinator binary is executable in builder stage
ethenotethan May 12, 2026
a0db387
fix: rename coordinator source dir in builder to avoid colliding with…
ethenotethan May 12, 2026
d524ece
fix: copy full repo in Dockerfile builder so go.mod resolves all pack…
ethenotethan May 13, 2026
030136f
fix: remove unused modelTypeTag and format Go files for CI
ethenotethan May 13, 2026
51b1ba0
fix: skip python/dangerous-modules check for swift runtime in private…
ethenotethan May 13, 2026
287f8e5
billing telemetry + MarkUntrusted race fix + Swift routing tests
ethenotethan May 13, 2026
971c988
fix Heartbeat reviving untrusted providers causing onlineCount double…
ethenotethan May 13, 2026
4a6c098
revert orthogonal landing/console-ui/provider changes
ethenotethan May 14, 2026
13e1338
remove unbounded binary_hash cardinality, add input token metrics + s…
ethenotethan May 14, 2026
6996109
fix review feedback: ModelType() untrusted filter, routing.cost_ms by…
ethenotethan May 14, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ on:
push:
branches: [master, main]
pull_request:
branches: [master, main]

jobs:
test-coordinator:
Expand Down
5 changes: 3 additions & 2 deletions coordinator/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -o coordinator ./coordinator/cmd/coordinator
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -o /app/coordinator-bin ./coordinator/cmd/coordinator

# Pre-built base with MicroMDM v1.13.1, step-ca v0.28.3, step CLI v0.28.3.
# Rebuild base only when tool versions change: docker build -f Dockerfile.base -t eigengajesh/d-inference-base:latest . && docker push
FROM eigengajesh/d-inference-base:v1-amd64

COPY --from=builder /app/coordinator /usr/local/bin/coordinator
COPY --from=builder /app/coordinator-bin /usr/local/bin/coordinator
RUN chmod +x /usr/local/bin/coordinator

RUN mkdir -p /var/www/html/dl

Expand Down
4 changes: 2 additions & 2 deletions coordinator/api/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ func (c *ttlCache) PurgeExpired() {

// writeCachedJSON writes pre-serialized JSON bytes with the standard
// Content-Type header. Used on cache hit to skip json.Marshal.
func writeCachedJSON(w http.ResponseWriter, status int, body []byte) {
func writeCachedJSON(w http.ResponseWriter, body []byte) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
w.WriteHeader(http.StatusOK)
_, _ = w.Write(body)
}
47 changes: 29 additions & 18 deletions coordinator/api/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,19 +595,25 @@ func (s *Server) handleChatCompletions(w http.ResponseWriter, r *http.Request) {
if s.billing != nil {
consumerKey := consumerKeyFromContext(r.Context())
reservedMicroUSD = s.reservationCost(model, estimatedPromptTokens, requestedMaxTokens)
start := time.Now()
if err := s.ledger.Charge(consumerKey, reservedMicroUSD, "reserve:"+consumerKey); err != nil {
writeJSON(w, http.StatusPaymentRequired, errorResponse("insufficient_funds",
"your balance is too low for this request — add funds at /billing or lower max_tokens"))
return
}
s.ddHistogram("billing.reserved_micro_usd", float64(reservedMicroUSD), []string{"model:" + model})
s.ddHistogram("store.debit.latency_ms", float64(time.Since(start).Milliseconds()), []string{"op:reserve"})
}
timing.ReservedAt = time.Now()

// Refund reservation on early errors (before inference starts).
refundReservation := func() {
if reservedMicroUSD > 0 {
consumerKey := consumerKeyFromContext(r.Context())
start := time.Now()
_ = s.store.Credit(consumerKey, reservedMicroUSD, store.LedgerRefund, "reservation_refund")
s.ddIncr("billing.reservation_refunds", []string{"model:" + model})
s.ddHistogram("store.credit.latency_ms", float64(time.Since(start).Milliseconds()), []string{"op:reservation_refund"})
}
}

Expand Down Expand Up @@ -678,7 +684,7 @@ func (s *Server) handleChatCompletions(w http.ResponseWriter, r *http.Request) {
// "fleet over-subscribed for this model size".
outcome = "over_capacity"
}
s.ddIncr("routing.decisions", []string{"model:" + model, "outcome:" + outcome})
s.ddIncr("routing.decisions", []string{"model:" + model, "model_type:" + s.registry.ModelType(model), "outcome:" + outcome})
break
}
// No idle provider — try queueing.
Expand All @@ -690,12 +696,12 @@ func (s *Server) handleChatCompletions(w http.ResponseWriter, r *http.Request) {
}
pr.Timing.QueuedAt = time.Now()
if err := s.registry.Queue().Enqueue(queuedReq); err != nil {
s.ddIncr("routing.decisions", []string{"model:" + model, "outcome:over_capacity"})
s.ddIncr("routing.decisions", []string{"model:" + model, "model_type:" + s.registry.ModelType(model), "outcome:over_capacity"})
refundReservation()
writeJSON(w, http.StatusServiceUnavailable, errorResponse("model_not_available", fmt.Sprintf("no hardware-trusted provider available for model %q and queue is full", model)))
return
}
s.ddIncr("routing.decisions", []string{"model:" + model, "outcome:queued"})
s.ddIncr("routing.decisions", []string{"model:" + model, "model_type:" + s.registry.ModelType(model), "outcome:queued"})

s.logger.Info("request queued, waiting for provider",
"model", model,
Expand All @@ -710,7 +716,7 @@ func (s *Server) handleChatCompletions(w http.ResponseWriter, r *http.Request) {
return
}
refundReservation()
s.ddIncr("request_queue.timeout", []string{"model:" + model})
s.ddIncr("request_queue.timeout", []string{"model:" + model, "model_type:" + s.registry.ModelType(model)})
writeJSON(w, http.StatusServiceUnavailable, errorResponse("model_not_available", fmt.Sprintf("no hardware-trusted provider became available for model %q (queue timeout)", model)))
return
}
Expand All @@ -719,7 +725,7 @@ func (s *Server) handleChatCompletions(w http.ResponseWriter, r *http.Request) {
timing.RoutedAt = time.Now()
s.ddIncr("routing.decisions", []string{"model:" + model, "outcome:selected"})
s.ddIncr("routing.provider_selected", []string{"provider_id:" + provider.ID, "model:" + model})
s.ddHistogram("routing.cost_ms", decision.CostMs, []string{"model:" + model})
s.ddHistogram("routing.cost_ms", decision.CostMs, []string{"model:" + model, "provider_id:" + provider.ID})
if decision.EffectiveTPS > 0 {
s.ddGauge("routing.effective_decode_tps", decision.EffectiveTPS, []string{"provider_id:" + provider.ID})
}
Expand Down Expand Up @@ -839,7 +845,7 @@ func (s *Server) handleChatCompletions(w http.ResponseWriter, r *http.Request) {
"attempt", attempt+1,
"error", errMsg.Error,
)
s.emitRequest(r.Context(), protocol.SeverityWarn, protocol.KindInferenceError, requestID,
s.emitRequest(r.Context(), protocol.SeverityWarn, requestID,
"provider failed, retrying",
map[string]any{
"provider_id": provider.ID,
Expand Down Expand Up @@ -875,7 +881,7 @@ func (s *Server) handleChatCompletions(w http.ResponseWriter, r *http.Request) {
"provider_id", provider.ID,
"attempt", attempt+1,
)
s.emitRequest(r.Context(), protocol.SeverityWarn, protocol.KindInferenceError, requestID,
s.emitRequest(r.Context(), protocol.SeverityWarn, requestID,
"provider first-chunk timeout",
map[string]any{
"provider_id": provider.ID,
Expand Down Expand Up @@ -924,7 +930,7 @@ func (s *Server) handleChatCompletions(w http.ResponseWriter, r *http.Request) {
"attempt", attempt+1,
"error", errMsg.Error,
)
s.emitRequest(r.Context(), protocol.SeverityWarn, protocol.KindInferenceError, requestID,
s.emitRequest(r.Context(), protocol.SeverityWarn, requestID,
"provider failed after accepting request, retrying",
map[string]any{
"provider_id": provider.ID,
Expand Down Expand Up @@ -956,7 +962,7 @@ func (s *Server) handleChatCompletions(w http.ResponseWriter, r *http.Request) {
"attempt", attempt+1,
"error", errMsg.Error,
)
s.emitRequest(r.Context(), protocol.SeverityWarn, protocol.KindInferenceError, requestID,
s.emitRequest(r.Context(), protocol.SeverityWarn, requestID,
"provider failed after accepting request, retrying",
map[string]any{
"provider_id": provider.ID,
Expand All @@ -983,7 +989,7 @@ func (s *Server) handleChatCompletions(w http.ResponseWriter, r *http.Request) {
"provider_id", provider.ID,
"attempt", attempt+1,
)
s.emitRequest(r.Context(), protocol.SeverityWarn, protocol.KindInferenceError, requestID,
s.emitRequest(r.Context(), protocol.SeverityWarn, requestID,
"provider accepted timeout",
map[string]any{
"provider_id": provider.ID,
Expand Down Expand Up @@ -1015,7 +1021,7 @@ func (s *Server) handleChatCompletions(w http.ResponseWriter, r *http.Request) {
if statusCode == 0 {
statusCode = http.StatusServiceUnavailable
}
s.emitRequest(r.Context(), protocol.SeverityError, protocol.KindInferenceError, requestID,
s.emitRequest(r.Context(), protocol.SeverityError, requestID,
fmt.Sprintf("inference failed after %d attempt(s)", maxDispatchAttempts),
map[string]any{
"reason": "dispatch_exhausted",
Expand Down Expand Up @@ -2223,7 +2229,7 @@ func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleVersion(w http.ResponseWriter, r *http.Request) {
const cacheKey = "api_version:v1"
if cached, ok := s.readCache.Get(cacheKey); ok {
writeCachedJSON(w, http.StatusOK, cached)
writeCachedJSON(w, cached)
return
}

Expand Down Expand Up @@ -2258,7 +2264,7 @@ func (s *Server) handleVersion(w http.ResponseWriter, r *http.Request) {
return
}
s.readCache.Set(cacheKey, body, time.Minute)
writeCachedJSON(w, http.StatusOK, body)
writeCachedJSON(w, body)
}

// --- payment handlers ---
Expand Down Expand Up @@ -2457,16 +2463,21 @@ func (s *Server) handleGenericInference(w http.ResponseWriter, r *http.Request,
var reservedMicroUSD int64
if s.billing != nil {
reservedMicroUSD = s.reservationCost(model, estimatedPromptTokens, requestedMaxTokens)
start := time.Now()
if err := s.ledger.Charge(consumerKey, reservedMicroUSD, "reserve:"+consumerKey); err != nil {
writeJSON(w, http.StatusPaymentRequired, errorResponse("insufficient_funds",
"your balance is too low for this request — add funds at /billing or lower max_tokens"))
return
}
s.ddHistogram("billing.reserved_micro_usd", float64(reservedMicroUSD), []string{"model:" + model})
s.ddHistogram("store.debit.latency_ms", float64(time.Since(start).Milliseconds()), []string{"op:reserve"})
}
// Refund the reservation on any early failure before dispatch.
refundReservation := func() {
if reservedMicroUSD > 0 {
start := time.Now()
_ = s.store.Credit(consumerKey, reservedMicroUSD, store.LedgerRefund, "reservation_refund")
s.ddIncr("billing.reservation_refunds", []string{"model:" + model})
s.ddHistogram("store.credit.latency_ms", float64(time.Since(start).Milliseconds()), []string{"op:reservation_refund"})
}
}

Expand Down Expand Up @@ -2494,12 +2505,12 @@ func (s *Server) handleGenericInference(w http.ResponseWriter, r *http.Request,
}
if err := s.registry.Queue().Enqueue(queuedReq); err != nil {
refundReservation()
s.ddIncr("routing.decisions", []string{"model:" + model, "outcome:over_capacity"})
s.ddIncr("routing.decisions", []string{"model:" + model, "model_type:" + s.registry.ModelType(model), "outcome:over_capacity"})
writeJSON(w, http.StatusServiceUnavailable, errorResponse("model_not_available",
fmt.Sprintf("no provider available for model %q", model)))
return
}
s.ddIncr("routing.decisions", []string{"model:" + model, "outcome:queued"})
s.ddIncr("routing.decisions", []string{"model:" + model, "model_type:" + s.registry.ModelType(model), "outcome:queued"})
provider, err = s.registry.Queue().WaitForProviderContext(r.Context(), queuedReq)
if err != nil {
if errors.Is(err, context.Canceled) {
Expand All @@ -2513,9 +2524,9 @@ func (s *Server) handleGenericInference(w http.ResponseWriter, r *http.Request,
}
decision = queuedReq.Decision
}
s.ddIncr("routing.decisions", []string{"model:" + model, "outcome:selected"})
s.ddIncr("routing.decisions", []string{"model:" + model, "model_type:" + s.registry.ModelType(model), "outcome:selected"})
s.ddIncr("routing.provider_selected", []string{"provider_id:" + provider.ID, "model:" + model})
s.ddHistogram("routing.cost_ms", decision.CostMs, []string{"model:" + model})
s.ddHistogram("routing.cost_ms", decision.CostMs, []string{"model:" + model, "provider_id:" + provider.ID})
if decision.EffectiveTPS > 0 {
s.ddGauge("routing.effective_decode_tps", decision.EffectiveTPS, []string{"provider_id:" + provider.ID})
}
Expand Down
Loading
Loading