diff --git a/pkg/scheduler/actions/reclaim/reclaim_test.go b/pkg/scheduler/actions/reclaim/reclaim_test.go
index a2aa26acc..1f1850a98 100644
--- a/pkg/scheduler/actions/reclaim/reclaim_test.go
+++ b/pkg/scheduler/actions/reclaim/reclaim_test.go
@@ -2595,6 +2595,99 @@ func getTestsMetadata() []integration_tests_utils.TestTopologyMetadata {
 				},
 			},
 		},
+		{
+			TestTopologyBasic: test_utils.TestTopologyBasic{
+				Name: "over-deserved queue should not consolidate-reclaim from under-deserved queue",
+				Jobs: []*jobs_fake.TestJobBasic{
+					{
+						Name: "mid_job0",
+						RequiredGPUsPerTask: 2,
+						Priority: constants.PriorityTrainNumber,
+						QueueName: "mid-training",
+						Tasks: []*tasks_fake.TestTaskBasic{
+							{
+								NodeName: "node0",
+								State: pod_status.Running,
+							},
+						},
+					},
+					{
+						Name: "mid_job1",
+						RequiredGPUsPerTask: 1,
+						Priority: constants.PriorityTrainNumber,
+						QueueName: "mid-training",
+						Tasks: []*tasks_fake.TestTaskBasic{
+							{
+								NodeName: "node1",
+								State: pod_status.Running,
+							},
+						},
+					},
+					{
+						Name: "pre_job0",
+						RequiredGPUsPerTask: 2,
+						Priority: constants.PriorityTrainNumber,
+						QueueName: "pre-training",
+						Tasks: []*tasks_fake.TestTaskBasic{
+							{
+								State: pod_status.Pending,
+							},
+						},
+					},
+				},
+				Nodes: map[string]nodes_fake.TestNodeBasic{
+					"node0": {
+						GPUs: 3,
+					},
+					"node1": {
+						GPUs: 2,
+					},
+				},
+				Queues: []test_utils.TestQueueBasic{
+					{
+						Name: "mid-training",
+						DeservedGPUs: 4,
+						GPUOverQuotaWeight: 0,
+					},
+					{
+						Name: "pre-training",
+						DeservedGPUs: 1,
+						GPUOverQuotaWeight: 1,
+					},
+				},
+				// mid-training: 3 GPUs allocated, 4 deserved → under its deserved quota.
+				// pre-training: 0 GPUs allocated, 1 deserved, requests 2 GPUs.
+				// FairShare expands pre-training to ~2 (absorbing mid-training's unused capacity),
+				// so CanReclaimResources passes (0+2 ≤ 2).
+				// Without the fix: the solver consolidates mid_job1 onto node0 and pipelines
+				// pre_job0 onto node1. All victims are Pipelined → empty resource map →
+				// the validator trivially approves → mid-training is disrupted.
+				// With the fix: handleAllVictimsConsolidated checks 0+2=2 > 1 (deserved)
+				// → rejects, and the mid-training jobs stay Running.
+				JobExpectedResults: map[string]test_utils.TestExpectedResultBasic{
+					"mid_job0": {
+						NodeName: "node0",
+						GPUsRequired: 2,
+						Status: pod_status.Running,
+						DontValidateGPUGroup: true,
+					},
+					"mid_job1": {
+						NodeName: "node1",
+						GPUsRequired: 1,
+						Status: pod_status.Running,
+						DontValidateGPUGroup: true,
+					},
+					"pre_job0": {
+						GPUsRequired: 2,
+						Status: pod_status.Pending,
+						DontValidateGPUGroup: true,
+					},
+				},
+				Mocks: &test_utils.TestMock{
+					CacheRequirements: &test_utils.CacheMocking{},
+				},
+			},
+		},
 		{
 			TestTopologyBasic: test_utils.TestTopologyBasic{
 				Name: "Attempt to reclaim from an inference workload",
diff --git a/pkg/scheduler/metrics/metrics.go b/pkg/scheduler/metrics/metrics.go
index eae920878..901677941 100644
--- a/pkg/scheduler/metrics/metrics.go
+++ b/pkg/scheduler/metrics/metrics.go
@@ -55,7 +55,8 @@ var (
 	queueMemoryUsage *prometheus.GaugeVec
 	queueGPUUsage *prometheus.GaugeVec
 	usageQueryLatency *prometheus.HistogramVec
-	podGroupEvictedPodsTotal *prometheus.CounterVec
+	podGroupEvictedPodsTotal *prometheus.CounterVec
+	reclaimConsolidationEmptyVictimMap *prometheus.CounterVec
 )
 
 func init() {
@@ -207,6 +208,13 @@ func InitMetrics(namespace string) {
 		Name: "pod_group_evicted_pods_total",
 		Help: "Total number of pods evicted per pod group",
 	}, []string{"podgroup", "namespace", "uid", "nodepool", "action"})
+
+	reclaimConsolidationEmptyVictimMap = promauto.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: namespace,
+			Name: "reclaim_consolidation_empty_victim_map_total",
+			Help: "Reclaim scenarios where victims existed but all were excluded from fair-share accounting (all re-pipelined).",
+		}, []string{"reclaimer_queue", "outcome"})
 }
 
 // UpdateOpenSessionDuration updates latency for open session, including all plugins
@@ -308,6 +316,10 @@ func RecordPodGroupEvictedPods(name, namespace, uid, nodepool, action string, co
 	podGroupEvictedPodsTotal.WithLabelValues(name, namespace, uid, nodepool, action).Add(float64(count))
 }
 
+func RecordReclaimConsolidationEmptyVictimMap(reclaimerQueue, outcome string) {
+	reclaimConsolidationEmptyVictimMap.WithLabelValues(reclaimerQueue, outcome).Inc()
+}
+
 // Duration get the time since specified start
 func Duration(start time.Time) time.Duration {
 	return time.Since(start)
diff --git a/pkg/scheduler/plugins/proportion/proportion.go b/pkg/scheduler/plugins/proportion/proportion.go
index 6405f3239..f53648fcc 100644
--- a/pkg/scheduler/plugins/proportion/proportion.go
+++ b/pkg/scheduler/plugins/proportion/proportion.go
@@ -146,9 +146,11 @@ func (pp *proportionPlugin) reclaimableFn(
 	reclaimerInfo := pp.buildReclaimerInfo(scenario.GetPreemptor(), pp.minNodeGPUMemory)
 	totalVictimsResources := make(map[common_info.QueueID][]resource_info.ResourceVector)
 	victims := scenario.GetVictims()
+	skippedVictimCount := 0
 	for _, victim := range victims {
 		totalJobResources := pp.getVictimResources(victim)
 		if len(totalJobResources) == 0 {
+			skippedVictimCount++
 			continue
 		}
 
@@ -158,9 +160,72 @@ func (pp *proportionPlugin) reclaimableFn(
 		)
 	}
 
+	if len(victims) > 0 && len(totalVictimsResources) == 0 {
+		return pp.handleAllVictimsConsolidated(reclaimerInfo, len(victims), skippedVictimCount)
+	}
+
 	return pp.reclaimablePlugin.Reclaimable(pp.jobSimulationQueues, reclaimerInfo, totalVictimsResources)
 }
 
+// handleAllVictimsConsolidated handles the case where victims exist but all were
+// re-pipelined and excluded from resource accounting. Allows the reclaim only if
+// the reclaimer queue stays within its deserved quota for managed resources;
+// this prevents an over-fed queue from disrupting another queue's running work
+// under the guise of defragmentation.
+func (pp *proportionPlugin) handleAllVictimsConsolidated(
+	reclaimerInfo *rec.ReclaimerInfo, totalVictims, skippedVictims int,
+) bool {
+	reclaimerQueue, found := pp.jobSimulationQueues[reclaimerInfo.Queue]
+	if !found {
+		log.InfraLogger.Errorf("Reclaim consolidation check: reclaimer queue <%s> not found", reclaimerInfo.Queue)
+		return false
+	}
+
+	allocated := reclaimerQueue.GetAllocatedShare()
+	allocated.Add(utils.QuantifyVector(reclaimerInfo.RequiredResources, reclaimerInfo.VectorMap))
+	deserved := reclaimerQueue.GetDeservedShare()
+	underDeserved := isUnderDeservedForManagedResources(allocated, deserved)
+
+	log.InfraLogger.V(3).Infof(
+		"Reclaim consolidation check for <%s/%s> in queue <%s>: "+
+			"totalVictims=%d skippedVictims=%d (all re-pipelined). "+
+			"Reclaimer allocated+request=%s deserved=%s underDeserved=%v",
+		reclaimerInfo.Namespace, reclaimerInfo.Name, reclaimerQueue.Name,
+		totalVictims, skippedVictims, allocated, deserved, underDeserved)
+
+	if !underDeserved {
+		log.InfraLogger.V(2).Infof(
+			"Rejecting consolidating reclaim for <%s/%s>: reclaimer queue <%s> "+
+				"would exceed its deserved quota. %d victims were re-pipelined, but the disruption is not justified",
+			reclaimerInfo.Namespace, reclaimerInfo.Name, reclaimerQueue.Name, totalVictims)
+		metrics.RecordReclaimConsolidationEmptyVictimMap(reclaimerQueue.Name, "rejected")
+		return false
+	}
+
+	log.InfraLogger.V(3).Infof(
+		"Allowing consolidating reclaim for <%s/%s>: reclaimer queue <%s> is under its deserved quota",
+		reclaimerInfo.Namespace, reclaimerInfo.Name, reclaimerQueue.Name)
+	metrics.RecordReclaimConsolidationEmptyVictimMap(reclaimerQueue.Name, "approved")
+	return true
+}
+
+// isUnderDeservedForManagedResources checks allocated <= deserved only for
+// resources where the queue has a real quota (deserved > 0 and not unlimited).
+// Resources with deserved=0 or an unlimited quota are not managed by the quota
+// system and should not block consolidation.
+func isUnderDeservedForManagedResources(allocated, deserved rs.ResourceQuantities) bool {
+	for _, resource := range rs.AllResources {
+		d := deserved[resource]
+		if d <= 0 || d == commonconstants.UnlimitedResourceQuantity {
+			continue
+		}
+		if allocated[resource] > d {
+			return false
+		}
+	}
+	return true
+}
+
 func (pp *proportionPlugin) getVictimResources(victim *api.VictimInfo) []resource_info.ResourceVector {
 	var victimResources []resource_info.ResourceVector
diff --git a/pkg/scheduler/plugins/proportion/proportion_test.go b/pkg/scheduler/plugins/proportion/proportion_test.go
index 8748f6d8e..e825a20cf 100644
--- a/pkg/scheduler/plugins/proportion/proportion_test.go
+++ b/pkg/scheduler/plugins/proportion/proportion_test.go
@@ -29,6 +29,7 @@ import (
 	"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/conf"
 	"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/framework"
 	k8splugins "github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/k8s_internal/plugins"
+	rec "github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/plugins/proportion/reclaimable"
 	rs "github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/plugins/proportion/resource_share"
 )
 
@@ -1004,4 +1005,103 @@ var _ = Describe("New", func() {
 			Expect(plugin.relcaimerSaturationMultiplier).To(Equal(1.0))
 		})
 	})
+
+	Context("handleAllVictimsConsolidated", func() {
+		makeQueue := func(name string, allocated, deserved float64) *rs.QueueAttributes {
+			q := &rs.QueueAttributes{
+				UID: common_info.QueueID(name),
+				Name: name,
+			}
+			q.SetQuotaResources(rs.GpuResource, deserved, commonconstants.UnlimitedResourceQuantity, 0)
+			q.SetQuotaResources(rs.CpuResource, commonconstants.UnlimitedResourceQuantity, commonconstants.UnlimitedResourceQuantity, 0)
+			q.SetQuotaResources(rs.MemoryResource, commonconstants.UnlimitedResourceQuantity, commonconstants.UnlimitedResourceQuantity, 0)
+			q.GPU.Allocated = allocated
+			return q
+		}
+
+		It("should reject when reclaimer would exceed deserved quota", func() {
+			plugin := &proportionPlugin{
+				jobSimulationQueues: map[common_info.QueueID]*rs.QueueAttributes{
+					"pre-training": makeQueue("pre-training", 6215, 5120),
+				},
+			}
+			reclaimerInfo := &rec.ReclaimerInfo{
+				Name: "test-job",
+				Namespace: "pre-training",
+				Queue: "pre-training",
+				RequiredResources: resource_info.NewResourceVectorWithValues(0, 0, 32, testVectorMap),
+				VectorMap: testVectorMap,
+				IsPreemptable: true,
+			}
+			result := plugin.handleAllVictimsConsolidated(reclaimerInfo, 15, 15)
+			Expect(result).To(BeFalse())
+		})
+
+		It("should approve when reclaimer stays within deserved quota", func() {
+			plugin := &proportionPlugin{
+				jobSimulationQueues: map[common_info.QueueID]*rs.QueueAttributes{
+					"queue1": makeQueue("queue1", 0, 2),
+				},
+			}
+			reclaimerInfo := &rec.ReclaimerInfo{
+				Name: "test-job",
+				Namespace: "queue1",
+				Queue: "queue1",
+				RequiredResources: resource_info.NewResourceVectorWithValues(0, 0, 2, testVectorMap),
+				VectorMap: testVectorMap,
+				IsPreemptable: true,
+			}
+			result := plugin.handleAllVictimsConsolidated(reclaimerInfo, 1, 1)
+			Expect(result).To(BeTrue())
+		})
+
+		It("should approve GPU-starved queue even when CPU/Memory deserved is zero", func() {
+			q := &rs.QueueAttributes{
+				UID: "gpu-queue",
+				Name: "gpu-queue",
+			}
+			q.SetQuotaResources(rs.GpuResource, 4, commonconstants.UnlimitedResourceQuantity, 0)
+			q.SetQuotaResources(rs.CpuResource, 0, commonconstants.UnlimitedResourceQuantity, 0)
+			q.SetQuotaResources(rs.MemoryResource, 0, commonconstants.UnlimitedResourceQuantity, 0)
+			q.GPU.Allocated = 1
+			q.CPU.Allocated = 500
+			q.Memory.Allocated = 1000
+
+			plugin := &proportionPlugin{
+				jobSimulationQueues: map[common_info.QueueID]*rs.QueueAttributes{
+					"gpu-queue": q,
+				},
+			}
+			reclaimerInfo := &rec.ReclaimerInfo{
+				Name: "test-job",
+				Namespace: "ns",
+				Queue: "gpu-queue",
+				RequiredResources: resource_info.NewResourceVectorWithValues(100, 500, 2, testVectorMap),
+				VectorMap: testVectorMap,
+				IsPreemptable: true,
+			}
+			result := plugin.handleAllVictimsConsolidated(reclaimerInfo, 1, 1)
+			Expect(result).To(BeTrue())
+		})
+	})
+
+	Context("isUnderDeservedForManagedResources", func() {
+		It("should return true when all managed resources are under deserved", func() {
+			allocated := rs.ResourceQuantities{rs.GpuResource: 3, rs.CpuResource: 100, rs.MemoryResource: 200}
+			deserved := rs.ResourceQuantities{rs.GpuResource: 5, rs.CpuResource: commonconstants.UnlimitedResourceQuantity, rs.MemoryResource: commonconstants.UnlimitedResourceQuantity}
+			Expect(isUnderDeservedForManagedResources(allocated, deserved)).To(BeTrue())
+		})
+
+		It("should return false when GPU exceeds deserved", func() {
+			allocated := rs.ResourceQuantities{rs.GpuResource: 6, rs.CpuResource: 100, rs.MemoryResource: 200}
+			deserved := rs.ResourceQuantities{rs.GpuResource: 5, rs.CpuResource: commonconstants.UnlimitedResourceQuantity, rs.MemoryResource: commonconstants.UnlimitedResourceQuantity}
+			Expect(isUnderDeservedForManagedResources(allocated, deserved)).To(BeFalse())
+		})
+
+		It("should skip resources with deserved=0", func() {
+			allocated := rs.ResourceQuantities{rs.GpuResource: 2, rs.CpuResource: 500, rs.MemoryResource: 1000}
+			deserved := rs.ResourceQuantities{rs.GpuResource: 4, rs.CpuResource: 0, rs.MemoryResource: 0}
+			Expect(isUnderDeservedForManagedResources(allocated, deserved)).To(BeTrue())
+		})
+	})
 })