Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions pkg/scheduler/actions/reclaim/reclaim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2595,6 +2595,99 @@ func getTestsMetadata() []integration_tests_utils.TestTopologyMetadata {
},
},
},
{
TestTopologyBasic: test_utils.TestTopologyBasic{
Name: "over-deserved queue should not consolidate-reclaim from under-deserved queue",
Jobs: []*jobs_fake.TestJobBasic{
{
Name: "mid_job0",
RequiredGPUsPerTask: 2,
Priority: constants.PriorityTrainNumber,
QueueName: "mid-training",
Tasks: []*tasks_fake.TestTaskBasic{
{
NodeName: "node0",
State: pod_status.Running,
},
},
},
{
Name: "mid_job1",
RequiredGPUsPerTask: 1,
Priority: constants.PriorityTrainNumber,
QueueName: "mid-training",
Tasks: []*tasks_fake.TestTaskBasic{
{
NodeName: "node1",
State: pod_status.Running,
},
},
},
{
Name: "pre_job0",
RequiredGPUsPerTask: 2,
Priority: constants.PriorityTrainNumber,
QueueName: "pre-training",
Tasks: []*tasks_fake.TestTaskBasic{
{
State: pod_status.Pending,
},
},
},
},
Nodes: map[string]nodes_fake.TestNodeBasic{
"node0": {
GPUs: 3,
},
"node1": {
GPUs: 2,
},
},
Queues: []test_utils.TestQueueBasic{
{
Name: "mid-training",
DeservedGPUs: 4,
GPUOverQuotaWeight: 0,
},
{
Name: "pre-training",
DeservedGPUs: 1,
GPUOverQuotaWeight: 1,
},
},
// mid-training: 3 GPU allocated, 4 deserved → under deserved
// pre-training: 0 GPU allocated, 1 deserved, requests 2 GPU
// FairShare expands pre-training to ~2 (absorbs mid's unused capacity),
// so CanReclaimResources passes (0+2 ≤ 2).
// Without the fix: solver consolidates mid_job1 to node0, pipelines
// pre_job0 to node1. All victims are Pipelined → empty resource map →
// validator trivially approves → mid-training is disrupted.
// With the fix: handleAllVictimsConsolidated checks 0+2=2 > 1 (deserved)
// → rejects. mid-training jobs stay Running.
JobExpectedResults: map[string]test_utils.TestExpectedResultBasic{
"mid_job0": {
NodeName: "node0",
GPUsRequired: 2,
Status: pod_status.Running,
DontValidateGPUGroup: true,
},
"mid_job1": {
NodeName: "node1",
GPUsRequired: 1,
Status: pod_status.Running,
DontValidateGPUGroup: true,
},
"pre_job0": {
GPUsRequired: 2,
Status: pod_status.Pending,
DontValidateGPUGroup: true,
},
},
Mocks: &test_utils.TestMock{
CacheRequirements: &test_utils.CacheMocking{},
},
},
},
{
TestTopologyBasic: test_utils.TestTopologyBasic{
Name: "Attempt to reclaim from an inference workload",
Expand Down
14 changes: 13 additions & 1 deletion pkg/scheduler/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ var (
queueMemoryUsage *prometheus.GaugeVec
queueGPUUsage *prometheus.GaugeVec
usageQueryLatency *prometheus.HistogramVec
podGroupEvictedPodsTotal *prometheus.CounterVec
podGroupEvictedPodsTotal *prometheus.CounterVec
reclaimConsolidationEmptyVictimMap *prometheus.CounterVec
)

func init() {
Expand Down Expand Up @@ -207,6 +208,13 @@ func InitMetrics(namespace string) {
Name: "pod_group_evicted_pods_total",
Help: "Total number of pods evicted per pod group",
}, []string{"podgroup", "namespace", "uid", "nodepool", "action"})

reclaimConsolidationEmptyVictimMap = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "reclaim_consolidation_empty_victim_map_total",
Help: "Reclaim scenarios where victims existed but all were excluded from fair-share accounting (all re-pipelined).",
}, []string{"reclaimer_queue", "outcome"})
}

// UpdateOpenSessionDuration updates latency for open session, including all plugins
Expand Down Expand Up @@ -308,6 +316,10 @@ func RecordPodGroupEvictedPods(name, namespace, uid, nodepool, action string, co
podGroupEvictedPodsTotal.WithLabelValues(name, namespace, uid, nodepool, action).Add(float64(count))
}

// RecordReclaimConsolidationEmptyVictimMap increments the counter tracking
// reclaim scenarios in which victims existed but all were excluded from
// fair-share accounting (all re-pipelined). The counter is labeled by the
// reclaimer's queue name and the decision outcome ("approved" or "rejected").
func RecordReclaimConsolidationEmptyVictimMap(reclaimerQueue, outcome string) {
	reclaimConsolidationEmptyVictimMap.WithLabelValues(reclaimerQueue, outcome).Inc()
}

// Duration get the time since specified start
func Duration(start time.Time) time.Duration {
return time.Since(start)
Expand Down
65 changes: 65 additions & 0 deletions pkg/scheduler/plugins/proportion/proportion.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,11 @@ func (pp *proportionPlugin) reclaimableFn(
reclaimerInfo := pp.buildReclaimerInfo(scenario.GetPreemptor(), pp.minNodeGPUMemory)
totalVictimsResources := make(map[common_info.QueueID][]resource_info.ResourceVector)
victims := scenario.GetVictims()
skippedVictimCount := 0
for _, victim := range victims {
totalJobResources := pp.getVictimResources(victim)
if len(totalJobResources) == 0 {
skippedVictimCount++
continue
}

Expand All @@ -158,9 +160,72 @@ func (pp *proportionPlugin) reclaimableFn(
)
}

if len(victims) > 0 && len(totalVictimsResources) == 0 {
return pp.handleAllVictimsConsolidated(reclaimerInfo, len(victims), skippedVictimCount)
}

return pp.reclaimablePlugin.Reclaimable(pp.jobSimulationQueues, reclaimerInfo, totalVictimsResources)
}

// handleAllVictimsConsolidated decides whether a reclaim may proceed when every
// victim was re-pipelined and therefore contributed nothing to the fair-share
// resource accounting. The reclaim is approved only while the reclaimer's queue
// stays within its deserved quota for quota-managed resources, so an over-fed
// queue cannot disrupt another queue's running work under the guise of
// defragmentation.
func (pp *proportionPlugin) handleAllVictimsConsolidated(
	reclaimerInfo *rec.ReclaimerInfo, totalVictims, skippedVictims int,
) bool {
	queue, ok := pp.jobSimulationQueues[reclaimerInfo.Queue]
	if !ok {
		log.InfraLogger.Errorf("Reclaim consolidation check: reclaimer queue <%s> not found", reclaimerInfo.Queue)
		return false
	}

	// Project the queue's allocation as if the reclaimer were already placed.
	// NOTE(review): assumes GetAllocatedShare returns a copy — if it aliases the
	// queue's internal state, Add would mutate it; confirm against its definition.
	projected := queue.GetAllocatedShare()
	projected.Add(utils.QuantifyVector(reclaimerInfo.RequiredResources, reclaimerInfo.VectorMap))
	quota := queue.GetDeservedShare()
	withinQuota := isUnderDeservedForManagedResources(projected, quota)

	log.InfraLogger.V(3).Infof(
		"Reclaim consolidation check for <%s/%s> in queue <%s>: "+
			"totalVictims=%d skippedVictims=%d (all re-pipelined). "+
			"Reclaimer allocated+request=%s deserved=%s underDeserved=%v",
		reclaimerInfo.Namespace, reclaimerInfo.Name, queue.Name,
		totalVictims, skippedVictims, projected, quota, withinQuota)

	if withinQuota {
		log.InfraLogger.V(3).Infof(
			"Allowing consolidating reclaim for <%s/%s>: reclaimer queue <%s> is under deserved quota",
			reclaimerInfo.Namespace, reclaimerInfo.Name, queue.Name)
		metrics.RecordReclaimConsolidationEmptyVictimMap(queue.Name, "approved")
		return true
	}

	log.InfraLogger.V(2).Infof(
		"Rejecting consolidating reclaim for <%s/%s>: reclaimer queue <%s> "+
			"would exceed deserved quota. %d victims were re-pipelined but disruption is not justified",
		reclaimerInfo.Namespace, reclaimerInfo.Name, queue.Name, totalVictims)
	metrics.RecordReclaimConsolidationEmptyVictimMap(queue.Name, "rejected")
	return false
}

// isUnderDeservedForManagedResources reports whether allocated stays within
// deserved for every quota-managed resource. A resource counts as managed only
// when its deserved value is a real, positive quota: entries with deserved <= 0
// or deserved == UnlimitedResourceQuantity are skipped entirely and can never
// block consolidation.
//
// NOTE(review): because deserved == 0 is treated as "unmanaged" rather than
// "nothing deserved", lowering a queue's quota to zero removes that resource
// from this check instead of tightening it — confirm this is the intended
// semantics (this exact concern was raised in code review).
func isUnderDeservedForManagedResources(allocated, deserved rs.ResourceQuantities) bool {
	for _, resource := range rs.AllResources {
		d := deserved[resource]
		if d <= 0 || d == commonconstants.UnlimitedResourceQuantity {
			// Unmanaged resource: no finite positive quota to enforce here.
			continue
		}
		if allocated[resource] > d {
			return false
		}
	}
	return true
}

func (pp *proportionPlugin) getVictimResources(victim *api.VictimInfo) []resource_info.ResourceVector {
var victimResources []resource_info.ResourceVector

Expand Down
100 changes: 100 additions & 0 deletions pkg/scheduler/plugins/proportion/proportion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/conf"
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/framework"
k8splugins "github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/k8s_internal/plugins"
rec "github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/plugins/proportion/reclaimable"
rs "github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/plugins/proportion/resource_share"
)

Expand Down Expand Up @@ -1004,4 +1005,103 @@ var _ = Describe("New", func() {
Expect(plugin.relcaimerSaturationMultiplier).To(Equal(1.0))
})
})

// Unit tests for proportionPlugin.handleAllVictimsConsolidated: the fallback
// quota check applied when every reclaim victim was re-pipelined and the
// victim resource map came back empty.
Context("handleAllVictimsConsolidated", func() {
	// makeQueue builds a QueueAttributes with a finite GPU quota of `deserved`,
	// unlimited CPU/memory quotas, and `allocated` GPUs already in use.
	makeQueue := func(name string, allocated, deserved float64) *rs.QueueAttributes {
		q := &rs.QueueAttributes{
			UID:  common_info.QueueID(name),
			Name: name,
		}
		q.SetQuotaResources(rs.GpuResource, deserved, commonconstants.UnlimitedResourceQuantity, 0)
		q.SetQuotaResources(rs.CpuResource, commonconstants.UnlimitedResourceQuantity, commonconstants.UnlimitedResourceQuantity, 0)
		q.SetQuotaResources(rs.MemoryResource, commonconstants.UnlimitedResourceQuantity, commonconstants.UnlimitedResourceQuantity, 0)
		q.GPU.Allocated = allocated
		return q
	}

	// allocated 6215 + requested 32 = 6247 exceeds the 5120 GPU quota → rejected.
	It("should reject when reclaimer would exceed deserved quota", func() {
		plugin := &proportionPlugin{
			jobSimulationQueues: map[common_info.QueueID]*rs.QueueAttributes{
				"pre-training": makeQueue("pre-training", 6215, 5120),
			},
		}
		reclaimerInfo := &rec.ReclaimerInfo{
			Name:              "test-job",
			Namespace:         "pre-training",
			Queue:             "pre-training",
			RequiredResources: resource_info.NewResourceVectorWithValues(0, 0, 32, testVectorMap),
			VectorMap:         testVectorMap,
			IsPreemptable:     true,
		}
		result := plugin.handleAllVictimsConsolidated(reclaimerInfo, 15, 15)
		Expect(result).To(BeFalse())
	})

	// allocated 0 + requested 2 = 2 is exactly the 2 GPU quota → approved.
	It("should approve when reclaimer stays within deserved quota", func() {
		plugin := &proportionPlugin{
			jobSimulationQueues: map[common_info.QueueID]*rs.QueueAttributes{
				"queue1": makeQueue("queue1", 0, 2),
			},
		}
		reclaimerInfo := &rec.ReclaimerInfo{
			Name:              "test-job",
			Namespace:         "queue1",
			Queue:             "queue1",
			RequiredResources: resource_info.NewResourceVectorWithValues(0, 0, 2, testVectorMap),
			VectorMap:         testVectorMap,
			IsPreemptable:     true,
		}
		result := plugin.handleAllVictimsConsolidated(reclaimerInfo, 1, 1)
		Expect(result).To(BeTrue())
	})

	// GPU 1 + 2 = 3 fits the quota of 4; CPU/Memory have deserved=0 and are
	// skipped by isUnderDeservedForManagedResources, so they cannot block.
	It("should approve GPU-starved queue even when CPU/Memory deserved is zero", func() {
		q := &rs.QueueAttributes{
			UID:  "gpu-queue",
			Name: "gpu-queue",
		}
		q.SetQuotaResources(rs.GpuResource, 4, commonconstants.UnlimitedResourceQuantity, 0)
		q.SetQuotaResources(rs.CpuResource, 0, commonconstants.UnlimitedResourceQuantity, 0)
		q.SetQuotaResources(rs.MemoryResource, 0, commonconstants.UnlimitedResourceQuantity, 0)
		q.GPU.Allocated = 1
		q.CPU.Allocated = 500
		q.Memory.Allocated = 1000

		plugin := &proportionPlugin{
			jobSimulationQueues: map[common_info.QueueID]*rs.QueueAttributes{
				"gpu-queue": q,
			},
		}
		reclaimerInfo := &rec.ReclaimerInfo{
			Name:              "test-job",
			Namespace:         "ns",
			Queue:             "gpu-queue",
			RequiredResources: resource_info.NewResourceVectorWithValues(100, 500, 2, testVectorMap),
			VectorMap:         testVectorMap,
			IsPreemptable:     true,
		}
		result := plugin.handleAllVictimsConsolidated(reclaimerInfo, 1, 1)
		Expect(result).To(BeTrue())
	})
})

// Unit tests for the pure quota-comparison helper. Resources whose deserved
// value is <= 0 or unlimited are excluded from the comparison.
Context("isUnderDeservedForManagedResources", func() {
	It("should return true when all managed resources are under deserved", func() {
		allocated := rs.ResourceQuantities{rs.GpuResource: 3, rs.CpuResource: 100, rs.MemoryResource: 200}
		deserved := rs.ResourceQuantities{rs.GpuResource: 5, rs.CpuResource: commonconstants.UnlimitedResourceQuantity, rs.MemoryResource: commonconstants.UnlimitedResourceQuantity}
		Expect(isUnderDeservedForManagedResources(allocated, deserved)).To(BeTrue())
	})

	// GPU is the only managed resource here and 6 > 5, so the check fails.
	It("should return false when GPU exceeds deserved", func() {
		allocated := rs.ResourceQuantities{rs.GpuResource: 6, rs.CpuResource: 100, rs.MemoryResource: 200}
		deserved := rs.ResourceQuantities{rs.GpuResource: 5, rs.CpuResource: commonconstants.UnlimitedResourceQuantity, rs.MemoryResource: commonconstants.UnlimitedResourceQuantity}
		Expect(isUnderDeservedForManagedResources(allocated, deserved)).To(BeFalse())
	})

	// CPU/Memory allocations far above their deserved=0 values are ignored;
	// only the GPU quota (2 <= 4) is evaluated.
	It("should skip resources with deserved=0", func() {
		allocated := rs.ResourceQuantities{rs.GpuResource: 2, rs.CpuResource: 500, rs.MemoryResource: 1000}
		deserved := rs.ResourceQuantities{rs.GpuResource: 4, rs.CpuResource: 0, rs.MemoryResource: 0}
		Expect(isUnderDeservedForManagedResources(allocated, deserved)).To(BeTrue())
	})
})
})