diff --git a/sdk/cliproxy/auth/conductor.go b/sdk/cliproxy/auth/conductor.go index b29e04db8c..1152bca0b1 100644 --- a/sdk/cliproxy/auth/conductor.go +++ b/sdk/cliproxy/auth/conductor.go @@ -233,6 +233,84 @@ func (m *Manager) RefreshSchedulerEntry(authID string) { m.scheduler.upsertAuth(snapshot) } +// ReconcileRegistryModelStates aligns per-model runtime state with the current +// registry snapshot for one auth. +// +// Supported models are reset to a clean state because re-registration already +// cleared the registry-side cooldown/suspension snapshot. ModelStates for +// models that are no longer present in the registry are pruned entirely so +// renamed/removed models cannot keep auth-level status stale. +func (m *Manager) ReconcileRegistryModelStates(ctx context.Context, authID string) { + if m == nil || authID == "" { + return + } + + supportedModels := registry.GetGlobalRegistry().GetModelsForClient(authID) + supported := make(map[string]struct{}, len(supportedModels)) + for _, model := range supportedModels { + if model == nil { + continue + } + modelKey := canonicalModelKey(model.ID) + if modelKey == "" { + continue + } + supported[modelKey] = struct{}{} + } + + var snapshot *Auth + now := time.Now() + + m.mu.Lock() + auth, ok := m.auths[authID] + if ok && auth != nil && len(auth.ModelStates) > 0 { + changed := false + for modelKey, state := range auth.ModelStates { + baseModel := canonicalModelKey(modelKey) + if baseModel == "" { + baseModel = strings.TrimSpace(modelKey) + } + if _, supportedModel := supported[baseModel]; !supportedModel { + // Drop state for models that disappeared from the current registry + // snapshot. Keeping them around leaks stale errors into auth-level + // status, management output, and websocket fallback checks. + delete(auth.ModelStates, modelKey) + changed = true + continue + } + if state == nil { + continue + } + if modelStateIsClean(state) { + continue + } + resetModelState(state, now) + changed = true + } + if len(auth.ModelStates) == 0 { + auth.ModelStates = nil + } + if changed { + updateAggregatedAvailability(auth, now) + if !hasModelError(auth, now) { + auth.LastError = nil + auth.StatusMessage = "" + auth.Status = StatusActive + } + auth.UpdatedAt = now + if errPersist := m.persist(ctx, auth); errPersist != nil { + logEntryWithRequestID(ctx).WithField("auth_id", auth.ID).Warnf("failed to persist auth changes during model state reconciliation: %v", errPersist) + } + snapshot = auth.Clone() + } + } + m.mu.Unlock() + + if m.scheduler != nil && snapshot != nil { + m.scheduler.upsertAuth(snapshot) + } +} + func (m *Manager) SetSelector(selector Selector) { if m == nil { return @@ -1735,8 +1813,28 @@ func resetModelState(state *ModelState, now time.Time) { state.UpdatedAt = now } +func modelStateIsClean(state *ModelState) bool { + if state == nil { + return true + } + if state.Status != StatusActive { + return false + } + if state.Unavailable || state.StatusMessage != "" || !state.NextRetryAfter.IsZero() || state.LastError != nil { + return false + } + if state.Quota.Exceeded || state.Quota.Reason != "" || !state.Quota.NextRecoverAt.IsZero() || state.Quota.BackoffLevel != 0 { + return false + } + return true +} + func updateAggregatedAvailability(auth *Auth, now time.Time) { - if auth == nil || len(auth.ModelStates) == 0 { + if auth == nil { + return + } + if len(auth.ModelStates) == 0 { + clearAggregatedAvailability(auth) return } allUnavailable := true @@ -1744,10 +1842,12 @@ func updateAggregatedAvailability(auth *Auth, now time.Time) { quotaExceeded := false quotaRecover := time.Time{} maxBackoffLevel := 0 + hasState := false for _, state := range auth.ModelStates { if state == nil { continue } + hasState = true stateUnavailable := false if state.Status == StatusDisabled { stateUnavailable = true @@ -1777,6 +1877,10 @@ func updateAggregatedAvailability(auth *Auth, now time.Time) { } } } + if !hasState { + clearAggregatedAvailability(auth) + return + } auth.Unavailable = allUnavailable if allUnavailable { auth.NextRetryAfter = earliestRetry @@ -1796,6 +1900,15 @@ func updateAggregatedAvailability(auth *Auth, now time.Time) { } } +func clearAggregatedAvailability(auth *Auth) { + if auth == nil { + return + } + auth.Unavailable = false + auth.NextRetryAfter = time.Time{} + auth.Quota = QuotaState{} +} + func hasModelError(auth *Auth, now time.Time) bool { if auth == nil || len(auth.ModelStates) == 0 { return false diff --git a/sdk/cliproxy/service.go b/sdk/cliproxy/service.go index abe1deed5f..a562cfb317 100644 --- a/sdk/cliproxy/service.go +++ b/sdk/cliproxy/service.go @@ -310,6 +310,7 @@ func (s *Service) applyCoreAuthAddOrUpdate(ctx context.Context, auth *coreauth.A // This operation may block on network calls, but the auth configuration // is already effective at this point. s.registerModelsForAuth(auth) + s.coreManager.ReconcileRegistryModelStates(ctx, auth.ID) // Refresh the scheduler entry so that the auth's supportedModelSet is rebuilt // from the now-populated global model registry. Without this, newly added auths @@ -1019,6 +1020,7 @@ func (s *Service) refreshModelRegistrationForAuth(current *coreauth.Auth) bool { s.ensureExecutorsForAuth(current) } s.registerModelsForAuth(current) + s.coreManager.ReconcileRegistryModelStates(context.Background(), current.ID) latest, ok := s.latestAuthForModelRegistration(current.ID) if !ok || latest.Disabled { @@ -1032,6 +1034,7 @@ func (s *Service) refreshModelRegistrationForAuth(current *coreauth.Auth) bool { // no auth fields changed, but keeps the refresh path simple and correct. s.ensureExecutorsForAuth(latest) s.registerModelsForAuth(latest) + s.coreManager.ReconcileRegistryModelStates(context.Background(), latest.ID) s.coreManager.RefreshSchedulerEntry(current.ID) return true }