diff --git a/CHANGELOG.md b/CHANGELOG.md index e348caa91..a83229041 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## [v0.9.17] - 2026-05-07 ### Fixed +- Improved reclaim, preempt, and consolidation performance by skipping solver work for jobs blocked by victim-invariant pre-predicate failures such as missing PVCs, missing required ConfigMaps, and tasks larger than any node. [#1502](https://github.com/kai-scheduler/KAI-Scheduler/issues/1502) - Suppressed noisy `Reconciler error` logs and `PodGrouperWarning` events on transient PodGroup update conflicts. The podgrouper now treats `IsConflict` errors as expected and silently requeues the reconcile instead of surfacing the apiserver's "object has been modified" message. ## [v0.9.16] - 2026-04-29 diff --git a/pkg/scheduler/actions/common/action_eligibility.go b/pkg/scheduler/actions/common/action_eligibility.go new file mode 100644 index 000000000..c42f45375 --- /dev/null +++ b/pkg/scheduler/actions/common/action_eligibility.go @@ -0,0 +1,43 @@ +// Copyright 2026 NVIDIA CORPORATION +// SPDX-License-Identifier: Apache-2.0 + +package common + +import ( + "fmt" + + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/common_info" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_info" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/podgroup_info" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/framework" +) + +func VictimInvariantPrePredicateFailureForTasks( + ssn *framework.Session, + tasks []*pod_info.PodInfo, +) (*pod_info.PodInfo, *api.VictimInvariantPrePredicateFailure) { + for _, task := range tasks { + if failure := ssn.VictimInvariantPrePredicateFailure(task); failure != nil { + return task, failure + } + } + + return nil, nil +} + +func RecordVictimInvariantPrePredicateFailure( + job *podgroup_info.PodGroupInfo, + task *pod_info.PodInfo, + failure *api.VictimInvariantPrePredicateFailure, +) { + fitErrors := common_info.NewFitErrors() + fitErrors.SetError(failure.Err.Error()) + job.SetTaskFitError(task, fitErrors) + job.SetJobFitError( + podgroup_info.PodSchedulingErrors, + fmt.Sprintf("Resources were not found for pod %s/%s due to: %s", + task.Namespace, task.Name, fitErrors.Error()), + nil, + ) +} diff --git a/pkg/scheduler/actions/common/action_eligibility_test.go b/pkg/scheduler/actions/common/action_eligibility_test.go new file mode 100644 index 000000000..581c60972 --- /dev/null +++ b/pkg/scheduler/actions/common/action_eligibility_test.go @@ -0,0 +1,60 @@ +// Copyright 2026 NVIDIA CORPORATION +// SPDX-License-Identifier: Apache-2.0 + +package common + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/types" + + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/common_info" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_info" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/podgroup_info" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/framework" +) + +func TestVictimInvariantPrePredicateFailureForTasks(t *testing.T) { + task1 := &pod_info.PodInfo{UID: common_info.PodID(types.UID("task-1")), Name: "task-1", Namespace: "ns1"} + task2 := &pod_info.PodInfo{UID: common_info.PodID(types.UID("task-2")), Name: "task-2", Namespace: "ns1"} + expectedErr := errors.New("missing dependency") + + var seenTasks []common_info.PodID + ssn := &framework.Session{} + ssn.AddVictimInvariantPrePredicateFn(func(task *pod_info.PodInfo) *api.VictimInvariantPrePredicateFailure { + seenTasks = append(seenTasks, task.UID) + if task.UID != task2.UID { + return nil + } + return &api.VictimInvariantPrePredicateFailure{ + Err: expectedErr, + } + }) + + blockedTask, failure := VictimInvariantPrePredicateFailureForTasks(ssn, []*pod_info.PodInfo{task1, task2}) + require.NotNil(t, failure) + require.Same(t, task2, blockedTask) + require.Same(t, expectedErr, failure.Err) + require.Equal(t, []common_info.PodID{task1.UID, task2.UID}, seenTasks) +} + +func TestRecordVictimInvariantPrePredicateFailure(t *testing.T) { + job := podgroup_info.NewPodGroupInfo("job-1") + job.Name = "job-1" + job.Namespace = "ns1" + task := &pod_info.PodInfo{UID: common_info.PodID(types.UID("task-1")), Name: "task-1", Namespace: "ns1"} + failure := &api.VictimInvariantPrePredicateFailure{ + Err: errors.New("persistentvolumeclaim \"missing\" not found"), + } + + RecordVictimInvariantPrePredicateFailure(job, task, failure) + + taskFitError, found := job.NodesFitErrors[task.UID] + require.True(t, found) + require.Contains(t, taskFitError.Error(), `persistentvolumeclaim "missing" not found`) + require.Len(t, job.JobFitErrors, 1) + require.Contains(t, job.JobFitErrors[0].Message, "Resources were not found for pod ns1/task-1 due to:") +} diff --git a/pkg/scheduler/actions/consolidation/consolidation.go b/pkg/scheduler/actions/consolidation/consolidation.go index 57a45a33d..53adc1d19 100644 --- a/pkg/scheduler/actions/consolidation/consolidation.go +++ b/pkg/scheduler/actions/consolidation/consolidation.go @@ -63,6 +63,11 @@ func (alloc *consolidationAction) Execute(ssn *framework.Session) { continue } } + tasks := podgroup_info.GetTasksToAllocate(job, ssn.SubGroupOrderFn, ssn.TaskOrderFn, false) + if task, failure := common.VictimInvariantPrePredicateFailureForTasks(ssn, tasks); failure != nil { + common.RecordVictimInvariantPrePredicateFailure(job, task, failure) + continue + } metrics.IncPodgroupsConsideredByAction() if succeeded, stmt := attemptToConsolidateForPreemptor(ssn, job); succeeded { diff --git a/pkg/scheduler/actions/consolidation/consolidation_victim_invariant_prefilter_test.go b/pkg/scheduler/actions/consolidation/consolidation_victim_invariant_prefilter_test.go new file mode 100644 index 000000000..b0f3de251 --- /dev/null +++ b/pkg/scheduler/actions/consolidation/consolidation_victim_invariant_prefilter_test.go @@ -0,0 +1,92 @@ +// Copyright 2026 NVIDIA CORPORATION +// SPDX-License-Identifier: Apache-2.0 + +package consolidation_test + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/require" + . "go.uber.org/mock/gomock" + + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/actions/consolidation" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/common_info" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_info" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_status" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/constants" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/test_utils" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/test_utils/jobs_fake" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/test_utils/nodes_fake" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/test_utils/tasks_fake" +) + +func TestConsolidationSkipsSolverForVictimInvariantPrePredicateFailure(t *testing.T) { + test_utils.InitTestingInfrastructure() + controller := NewController(t) + defer controller.Finish() + + ssn := test_utils.BuildSession(test_utils.TestTopologyBasic{ + Name: "consolidation victim invariant blocker", + Jobs: []*jobs_fake.TestJobBasic{ + { + Name: "running-job-0", + RequiredGPUsPerTask: 2, + Priority: constants.PriorityTrainNumber, + QueueName: "queue0", + Tasks: []*tasks_fake.TestTaskBasic{{ + NodeName: "node0", + State: pod_status.Running, + }}, + }, + { + Name: "running-job-1", + RequiredGPUsPerTask: 2, + Priority: constants.PriorityTrainNumber, + QueueName: "queue0", + Tasks: []*tasks_fake.TestTaskBasic{{ + NodeName: "node1", + State: pod_status.Running, + }}, + }, + { + Name: "pending-job", + RequiredGPUsPerTask: 3, + Priority: constants.PriorityTrainNumber, + QueueName: "queue0", + Tasks: []*tasks_fake.TestTaskBasic{{ + State: pod_status.Pending, + }}, + }, + }, + Nodes: map[string]nodes_fake.TestNodeBasic{ + "node0": {GPUs: 4}, + "node1": {GPUs: 4}, + }, + Queues: []test_utils.TestQueueBasic{{ + Name: "queue0", + DeservedGPUs: 2, + }}, + }, controller) + + job := ssn.PodGroupInfos[common_info.PodGroupID("pending-job")] + task := job.GetAllPodsMap()[common_info.PodID("pending-job-0")] + blockerErr := errors.New("blocked before consolidation") + calls := 0 + ssn.AddVictimInvariantPrePredicateFn(func(gotTask *pod_info.PodInfo) *api.VictimInvariantPrePredicateFailure { + calls++ + require.Same(t, task, gotTask) + return &api.VictimInvariantPrePredicateFailure{ + Err: blockerErr, + } + }) + + consolidation.New().Execute(ssn) + + require.Positive(t, calls) + require.Equal(t, pod_status.Pending, task.Status) + require.Contains(t, job.NodesFitErrors[task.UID].Error(), blockerErr.Error()) + require.NotEmpty(t, job.JobFitErrors) + require.Contains(t, job.JobFitErrors[0].Message, "Resources were not found for pod /pending-job-0 due to:") +} diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index e9cb0df73..f093e9a5b 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -76,6 +76,11 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) { continue } } + tasks := podgroup_info.GetTasksToAllocate(job, ssn.SubGroupOrderFn, ssn.TaskOrderFn, false) + if task, failure := common.VictimInvariantPrePredicateFailureForTasks(ssn, tasks); failure != nil { + common.RecordVictimInvariantPrePredicateFailure(job, task, failure) + continue + } metrics.IncPodgroupsConsideredByAction() succeeded, statement, preemptedTasksNames := attemptToPreemptForPreemptor(ssn, job) diff --git a/pkg/scheduler/actions/preempt/preempt_victim_invariant_prefilter_test.go b/pkg/scheduler/actions/preempt/preempt_victim_invariant_prefilter_test.go new file mode 100644 index 000000000..c89562109 --- /dev/null +++ b/pkg/scheduler/actions/preempt/preempt_victim_invariant_prefilter_test.go @@ -0,0 +1,81 @@ +// Copyright 2026 NVIDIA CORPORATION +// SPDX-License-Identifier: Apache-2.0 + +package preempt_test + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/require" + . "go.uber.org/mock/gomock" + + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/actions/preempt" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/common_info" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_info" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_status" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/constants" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/test_utils" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/test_utils/jobs_fake" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/test_utils/nodes_fake" + "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/test_utils/tasks_fake" +) + +func TestPreemptSkipsSolverForVictimInvariantPrePredicateFailure(t *testing.T) { + test_utils.InitTestingInfrastructure() + controller := NewController(t) + defer controller.Finish() + + ssn := test_utils.BuildSession(test_utils.TestTopologyBasic{ + Name: "preempt victim invariant blocker", + Jobs: []*jobs_fake.TestJobBasic{ + { + Name: "running-job", + RequiredGPUsPerTask: 1, + Priority: constants.PriorityTrainNumber, + QueueName: "queue0", + Tasks: []*tasks_fake.TestTaskBasic{{ + NodeName: "node0", + State: pod_status.Running, + }}, + }, + { + Name: "pending-job", + RequiredGPUsPerTask: 1, + Priority: constants.PriorityBuildNumber, + QueueName: "queue0", + Tasks: []*tasks_fake.TestTaskBasic{{ + State: pod_status.Pending, + }}, + }, + }, + Nodes: map[string]nodes_fake.TestNodeBasic{ + "node0": {GPUs: 1}, + }, + Queues: []test_utils.TestQueueBasic{{ + Name: "queue0", + DeservedGPUs: 1, + }}, + }, controller) + + job := ssn.PodGroupInfos[common_info.PodGroupID("pending-job")] + task := job.GetAllPodsMap()[common_info.PodID("pending-job-0")] + blockerErr := errors.New("blocked before preempt") + calls := 0 + ssn.AddVictimInvariantPrePredicateFn(func(gotTask *pod_info.PodInfo) *api.VictimInvariantPrePredicateFailure { + calls++ + require.Same(t, task, gotTask) + return &api.VictimInvariantPrePredicateFailure{ + Err: blockerErr, + } + }) + + preempt.New().Execute(ssn) + + require.Positive(t, calls) + require.Equal(t, pod_status.Pending, task.Status) + require.Contains(t, job.NodesFitErrors[task.UID].Error(), blockerErr.Error()) + require.NotEmpty(t, job.JobFitErrors) + require.Contains(t, job.JobFitErrors[0].Message, "Resources were not found for pod /pending-job-0 due to:") +} diff --git a/pkg/scheduler/actions/reclaim/reclaim.go b/pkg/scheduler/actions/reclaim/reclaim.go index 7771756c6..0e29f73a2 100644 --- a/pkg/scheduler/actions/reclaim/reclaim.go +++ b/pkg/scheduler/actions/reclaim/reclaim.go @@ -80,6 +80,11 @@ func (ra *reclaimAction) Execute(ssn *framework.Session) { continue } } + tasks := podgroup_info.GetTasksToAllocate(job, ssn.SubGroupOrderFn, ssn.TaskOrderFn, false) + if task, failure := common.VictimInvariantPrePredicateFailureForTasks(ssn, tasks); failure != nil { + common.RecordVictimInvariantPrePredicateFailure(job, task, failure) + continue + } metrics.IncPodgroupsConsideredByAction() succeeded, statement, reclaimeeTasksNames := ra.attemptToReclaimForSpecificJob(ssn, job) if succeeded { diff --git a/pkg/scheduler/api/types.go b/pkg/scheduler/api/types.go index eeb08e593..dbaf847f4 100644 --- a/pkg/scheduler/api/types.go +++ b/pkg/scheduler/api/types.go @@ -34,6 +34,18 @@ type PredicateFn func(*pod_info.PodInfo, *podgroup_info.PodGroupInfo, *node_info // PrePredicateFn is used to prepare for predicate on pod. type PrePredicateFn func(*pod_info.PodInfo, *podgroup_info.PodGroupInfo) error +// VictimInvariantPrePredicateFailure is an action-level pre-predicate blocker +// that cannot be resolved by changing the victim set in the current session. +type VictimInvariantPrePredicateFailure struct { + Err error +} + +// VictimInvariantPrePredicateFn returns a victim-invariant blocker for a task, +// or nil when action handling should continue normally. +type VictimInvariantPrePredicateFn func( + *pod_info.PodInfo, +) *VictimInvariantPrePredicateFailure + // CanReclaimResourcesFn is a function that determines if a reclaimer can get more resources type CanReclaimResourcesFn func(pendingJob *podgroup_info.PodGroupInfo) bool diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index 20a54ee2d..ba7f39222 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -82,6 +82,7 @@ type Session struct { IsJobOverCapacityFns []api.IsJobOverCapacityFn IsTaskAllocationOnNodeOverCapacityFns []api.IsTaskAllocationOverCapacityFn PrePredicateFns []api.PrePredicateFn + VictimInvariantPrePredicateFns []api.VictimInvariantPrePredicateFn PredicateFns []api.PredicateFn BindRequestMutateFns []api.BindRequestMutateFn CleanAllocationAttemptCacheFns []api.CleanAllocationAttemptCacheFn diff --git a/pkg/scheduler/framework/session_plugins.go b/pkg/scheduler/framework/session_plugins.go index 38a5466f2..646ee457d 100644 --- a/pkg/scheduler/framework/session_plugins.go +++ b/pkg/scheduler/framework/session_plugins.go @@ -49,6 +49,9 @@ func (ssn *Session) AddPrePredicateFn(pf api.PrePredicateFn) { ssn.PrePredicateFns = append(ssn.PrePredicateFns, pf) } +func (ssn *Session) AddVictimInvariantPrePredicateFn(pf api.VictimInvariantPrePredicateFn) { + ssn.VictimInvariantPrePredicateFns = append(ssn.VictimInvariantPrePredicateFns, pf) +} func (ssn *Session) AddPredicateFn(pf api.PredicateFn) { ssn.PredicateFns = append(ssn.PredicateFns, pf) } @@ -330,6 +333,18 @@ func (ssn *Session) PrePredicateFn(task *pod_info.PodInfo, job *podgroup_info.Po return nil } +func (ssn *Session) VictimInvariantPrePredicateFailure( + task *pod_info.PodInfo, +) *api.VictimInvariantPrePredicateFailure { + for _, prePredicate := range ssn.VictimInvariantPrePredicateFns { + if failure := prePredicate(task); failure != nil { + return failure + } + } + + return nil +} + func (ssn *Session) PredicateFn(task *pod_info.PodInfo, job *podgroup_info.PodGroupInfo, node *node_info.NodeInfo) error { for _, pfn := range ssn.PredicateFns { err := pfn(task, job, node) diff --git a/pkg/scheduler/framework/session_plugins_test.go b/pkg/scheduler/framework/session_plugins_test.go index c95b4eec0..731a4eb39 100644 --- a/pkg/scheduler/framework/session_plugins_test.go +++ b/pkg/scheduler/framework/session_plugins_test.go @@ -6,6 +6,7 @@ SPDX-License-Identifier: Apache-2.0 package framework import ( + "errors" "testing" "github.com/stretchr/testify/assert" @@ -86,3 +87,39 @@ func TestMutateBindRequestAnnotations(t *testing.T) { }) } } + +func TestVictimInvariantPrePredicateFailure(t *testing.T) { + task := &pod_info.PodInfo{Name: "task-1"} + expectedErr := errors.New("missing pvc") + + t.Run("returns nil when no functions are registered", func(t *testing.T) { + ssn := &Session{} + assert.Nil(t, ssn.VictimInvariantPrePredicateFailure(task)) + }) + + t.Run("returns the first non-nil failure", func(t *testing.T) { + ssn := &Session{} + secondCalled := false + ssn.AddVictimInvariantPrePredicateFn(func(_ *pod_info.PodInfo) *api.VictimInvariantPrePredicateFailure { + return nil + }) + ssn.AddVictimInvariantPrePredicateFn(func(gotTask *pod_info.PodInfo) *api.VictimInvariantPrePredicateFailure { + assert.Same(t, task, gotTask) + return &api.VictimInvariantPrePredicateFailure{ + Err: expectedErr, + } + }) + ssn.AddVictimInvariantPrePredicateFn(func(_ *pod_info.PodInfo) *api.VictimInvariantPrePredicateFailure { + secondCalled = true + return &api.VictimInvariantPrePredicateFailure{ + Err: errors.New("should not be returned"), + } + }) + + failure := ssn.VictimInvariantPrePredicateFailure(task) + if assert.NotNil(t, failure) { + assert.Same(t, expectedErr, failure.Err) + } + assert.False(t, secondCalled) + }) +} diff --git a/pkg/scheduler/k8s_internal/predicates/config_maps.go b/pkg/scheduler/k8s_internal/predicates/config_maps.go index bc9e19842..8b3d5d240 100644 --- a/pkg/scheduler/k8s_internal/predicates/config_maps.go +++ b/pkg/scheduler/k8s_internal/predicates/config_maps.go @@ -61,7 +61,8 @@ func (cmp *ConfigMapPredicate) PreFilter(ctx context.Context, _ ksf.CycleState, } if len(missingConfigMaps) > 0 { - return nil, ksf.NewStatus(ksf.Unschedulable, fmt.Sprintf("Missing required configmaps: %v", missingConfigMaps)) + return nil, ksf.NewStatus(ksf.UnschedulableAndUnresolvable, + fmt.Sprintf("Missing required configmaps: %v", missingConfigMaps)) } return nil, nil diff --git a/pkg/scheduler/k8s_internal/predicates/config_maps_test.go b/pkg/scheduler/k8s_internal/predicates/config_maps_test.go index e228ad2e8..4d2f8dba4 100644 --- a/pkg/scheduler/k8s_internal/predicates/config_maps_test.go +++ b/pkg/scheduler/k8s_internal/predicates/config_maps_test.go @@ -9,6 +9,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ksf "k8s.io/kube-scheduler/framework" "github.com/NVIDIA/KAI-scheduler/pkg/common/constants" "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/common_info" @@ -641,3 +642,42 @@ type FilterTest struct { pod *v1.Pod expectedError bool } + +func TestPreFilterMissingRequiredConfigMapsReturnsUnschedulableAndUnresolvable(t *testing.T) { + cmp := NewConfigMapPredicate(nil) + _, status := cmp.PreFilter(context.Background(), nil, &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "test-namespace", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + VolumeMounts: []v1.VolumeMount{ + { + Name: "required-configmap-volume", + }, + }, + }, + }, + Volumes: []v1.Volume{ + { + Name: "required-configmap-volume", + VolumeSource: v1.VolumeSource{ + ConfigMap: &v1.ConfigMapVolumeSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: "required-cm", + }, + }, + }, + }, + }, + }, + }, nil) + if status == nil { + t.Fatal("PreFilter() returned nil status for a missing required ConfigMap") + } + if status.Code() != ksf.UnschedulableAndUnresolvable { + t.Fatalf("PreFilter() code = %v, want %v", status.Code(), ksf.UnschedulableAndUnresolvable) + } +} diff --git a/pkg/scheduler/k8s_internal/predicates/maxNodeResources.go b/pkg/scheduler/k8s_internal/predicates/maxNodeResources.go index f14869e72..28d972565 100644 --- a/pkg/scheduler/k8s_internal/predicates/maxNodeResources.go +++ b/pkg/scheduler/k8s_internal/predicates/maxNodeResources.go @@ -53,23 +53,23 @@ func (mnr *MaxNodeResourcesPredicate) PreFilter(_ context.Context, _ ksf.CycleSt podInfo := pod_info.NewTaskInfo(pod) if podInfo.ResReq.GPUs() > mnr.maxResources.GPUs() { - return nil, ksf.NewStatus(ksf.Unschedulable, + return nil, ksf.NewStatus(ksf.UnschedulableAndUnresolvable, mnr.buildUnschedulableMessage(podInfo, "GPU", mnr.maxResources.GPUs(), "")) } if podInfo.ResReq.Cpu() > mnr.maxResources.Cpu() { - return nil, ksf.NewStatus(ksf.Unschedulable, + return nil, ksf.NewStatus(ksf.UnschedulableAndUnresolvable, mnr.buildUnschedulableMessage(podInfo, "CPU", mnr.maxResources.Cpu()/resource_info.MilliCPUToCores, "cores")) } if podInfo.ResReq.Memory() > mnr.maxResources.Memory() { - return nil, ksf.NewStatus(ksf.Unschedulable, + return nil, ksf.NewStatus(ksf.UnschedulableAndUnresolvable, mnr.buildUnschedulableMessage(podInfo, "memory", mnr.maxResources.Memory()/resource_info.MemoryToGB, "GB")) } for rName, rQuant := range podInfo.ResReq.ScalarResources() { rrQuant, found := mnr.maxResources.ScalarResources()[rName] if !found || rQuant > rrQuant { - return nil, ksf.NewStatus(ksf.Unschedulable, + return nil, ksf.NewStatus(ksf.UnschedulableAndUnresolvable, mnr.buildUnschedulableMessage(podInfo, string(rName), float64(rrQuant), "")) } } diff --git a/pkg/scheduler/k8s_internal/predicates/maxNodeResources_test.go b/pkg/scheduler/k8s_internal/predicates/maxNodeResources_test.go index ddfa6f12e..1c7109578 100644 --- a/pkg/scheduler/k8s_internal/predicates/maxNodeResources_test.go +++ b/pkg/scheduler/k8s_internal/predicates/maxNodeResources_test.go @@ -109,7 +109,7 @@ func Test_podToMaxNodeResourcesFiltering(t *testing.T) { }, }, expected{ - ksf.NewStatus(ksf.Unschedulable, + ksf.NewStatus(ksf.UnschedulableAndUnresolvable, "The pod n1/name1 requires GPU: 0, CPU: 1 (cores), memory: 0 (GB). Max CPU resources available in a single node in the default node-pool is topped at 0.5 cores"), }, }, @@ -154,7 +154,7 @@ func Test_podToMaxNodeResourcesFiltering(t *testing.T) { }, }, expected{ - ksf.NewStatus(ksf.Unschedulable, + ksf.NewStatus(ksf.UnschedulableAndUnresolvable, "The pod n1/name1 requires GPU: 0, CPU: 0 (cores), memory: 1 (GB). Max memory resources available in a single node in the default node-pool is topped at 0.419 GB"), }, }, @@ -199,7 +199,7 @@ func Test_podToMaxNodeResourcesFiltering(t *testing.T) { }, }, expected{ - ksf.NewStatus(ksf.Unschedulable, + ksf.NewStatus(ksf.UnschedulableAndUnresolvable, "The pod n1/name1 requires GPU: 2, CPU: 0 (cores), memory: 0 (GB). Max GPU resources available in a single node in the default node-pool is topped at 1"), }, }, @@ -243,7 +243,7 @@ func Test_podToMaxNodeResourcesFiltering(t *testing.T) { }, }, expected{ - ksf.NewStatus(ksf.Unschedulable, + ksf.NewStatus(ksf.UnschedulableAndUnresolvable, "The pod n1/name1 requires GPU: 0.5, CPU: 0 (cores), memory: 0 (GB). No node in the default node-pool has GPU resources"), }, }, diff --git a/pkg/scheduler/plugins/predicates/predicates.go b/pkg/scheduler/plugins/predicates/predicates.go index cd5c49cd8..a70b439d8 100644 --- a/pkg/scheduler/plugins/predicates/predicates.go +++ b/pkg/scheduler/plugins/predicates/predicates.go @@ -21,6 +21,7 @@ package predicates import ( "fmt" + "slices" "strings" "k8s.io/apimachinery/pkg/util/sets" @@ -46,6 +47,12 @@ const ( prePredicateReasonsFormat = " Reasons: %s" ) +var victimInvariantPrePredicateCandidates = []k8s_internal.PredicateName{ + predicates.VolumeBinding, + predicates.ConfigMap, + predicates.MaxNodePoolResources, +} + type prePredicateError struct { name string err error @@ -85,12 +92,24 @@ func (sp SkipPredicates) ShouldSKip(podID common_info.PodID, predicateName k8s_i return skip && found } +type cachedPrePredicateResult struct { + required bool + nodes sets.Set[string] + status *ksf.Status +} + +type prePredicateCacheKey struct { + podID common_info.PodID + predicateName k8s_internal.PredicateName +} + type predicatesPlugin struct { // Arguments given for the plugin pluginArguments map[string]string storageSchedulingEnabled bool - skipPredicates SkipPredicates + skipPredicates SkipPredicates + prePredicateCache map[prePredicateCacheKey]cachedPrePredicateResult } func New(arguments map[string]string) framework.Plugin { @@ -108,9 +127,13 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { pp.storageSchedulingEnabled = ssn.ScheduleCSIStorage() pp.skipPredicates = SkipPredicates{} + pp.resetPrePredicateCache() ssn.AddPrePredicateFn(func(task *pod_info.PodInfo, _ *podgroup_info.PodGroupInfo) error { - return evaluateTaskOnPrePredicate(task, k8sPredicates, pp.skipPredicates) + return pp.evaluateTaskOnPrePredicate(task, k8sPredicates) + }) + ssn.AddVictimInvariantPrePredicateFn(func(task *pod_info.PodInfo) *api.VictimInvariantPrePredicateFailure { + return pp.evaluateTaskOnVictimInvariantPrePredicates(task, k8sPredicates) }) ssn.AddPredicateFn(func(task *pod_info.PodInfo, job *podgroup_info.PodGroupInfo, node *node_info.NodeInfo) error { @@ -119,21 +142,67 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { }) } -func evaluateTaskOnPrePredicate(task *pod_info.PodInfo, k8sPredicates k8s_internal.SessionPredicates, - skipPredicates SkipPredicates, +func (pp *predicatesPlugin) evaluateSinglePrePredicate( + task *pod_info.PodInfo, + predicateName k8s_internal.PredicateName, + predicate k8s_internal.SessionPredicate, +) (sets.Set[string], *ksf.Status, bool) { + cacheKey := prePredicateCacheKey{podID: task.UID, predicateName: predicateName} + shouldCache := isVictimInvariantPrePredicateCandidate(predicateName) + if shouldCache { + if cachedResult, found := pp.prePredicateCache[cacheKey]; found { + if cachedResult.status != nil && cachedResult.status.IsSkip() { + pp.skipPredicates.Add(task.UID, predicateName) + } + return cachedResult.nodes, cachedResult.status, cachedResult.required + } + } + + if !predicate.IsPreFilterRequired(task.Pod) { + if shouldCache { + pp.storePrePredicateResult(cacheKey, cachedPrePredicateResult{required: false}) + } + return nil, nil, false + } + + nodes, status := predicate.PreFilter(task.Pod) + if status != nil && status.IsSkip() { + pp.skipPredicates.Add(task.UID, predicateName) + } + if shouldCache { + pp.storePrePredicateResult(cacheKey, cachedPrePredicateResult{ + required: true, + nodes: nodes, + status: status, + }) + } + return nodes, status, true +} + +func (pp *predicatesPlugin) storePrePredicateResult( + cacheKey prePredicateCacheKey, + result cachedPrePredicateResult, +) { + pp.prePredicateCache[cacheKey] = result +} + +func (pp *predicatesPlugin) resetPrePredicateCache() { + pp.prePredicateCache = map[prePredicateCacheKey]cachedPrePredicateResult{} +} + +func (pp *predicatesPlugin) evaluateTaskOnPrePredicate( + task *pod_info.PodInfo, + k8sPredicates k8s_internal.SessionPredicates, ) error { var allErrors []prePredicateError var allowedNodes sets.Set[string] = nil for name, predicate := range k8sPredicates { - if !predicate.IsPreFilterRequired(task.Pod) { + nodes, status, required := pp.evaluateSinglePrePredicate(task, name, predicate) + if !required { continue } - nodes, status := predicate.PreFilter(task.Pod) - if status.IsSkip() { - skipPredicates.Add(task.UID, name) - } - if status.AsError() != nil { + if status != nil && status.AsError() != nil { allErrors = append(allErrors, newPrePredicateError(string(name), *status)) } else { if allowedNodes == nil { @@ -153,6 +222,54 @@ func evaluateTaskOnPrePredicate(task *pod_info.PodInfo, k8sPredicates k8s_intern return nil } +func (pp *predicatesPlugin) evaluateTaskOnVictimInvariantPrePredicates( + task *pod_info.PodInfo, + k8sPredicates k8s_internal.SessionPredicates, +) *api.VictimInvariantPrePredicateFailure { + for _, name := range victimInvariantPrePredicateCandidates { + predicate, found := k8sPredicates[name] + if !found { + continue + } + + _, status, required := pp.evaluateSinglePrePredicate(task, name, predicate) + if !required || status == nil || status.IsSkip() { + continue + } + + if failure := classifyVictimInvariantPrePredicateFailure(name, status); failure != nil { + return failure + } + } + + return nil +} + +func classifyVictimInvariantPrePredicateFailure( + predicateName k8s_internal.PredicateName, + status *ksf.Status, +) *api.VictimInvariantPrePredicateFailure { + if !isVictimInvariantPrePredicateCandidate(predicateName) || status == nil { + return nil + } + + if status.Code() != ksf.UnschedulableAndUnresolvable { + return nil + } + + if err := status.AsError(); err != nil { + return &api.VictimInvariantPrePredicateFailure{ + Err: err, + } + } + + return nil +} + +func isVictimInvariantPrePredicateCandidate(predicateName k8s_internal.PredicateName) bool { + return slices.Contains(victimInvariantPrePredicateCandidates, predicateName) +} + func generateErrorLog(allErrors []prePredicateError) string { errorsLog := "" for _, prePredicateError := range allErrors { diff --git a/pkg/scheduler/plugins/predicates/predicates_test.go b/pkg/scheduler/plugins/predicates/predicates_test.go index da7a0c09f..71c7944de 100644 --- a/pkg/scheduler/plugins/predicates/predicates_test.go +++ b/pkg/scheduler/plugins/predicates/predicates_test.go @@ -11,6 +11,9 @@ import ( "testing" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + ksf "k8s.io/kube-scheduler/framework" "k8s.io/utils/pointer" "github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api" @@ -149,8 +152,11 @@ func Test_evaluateTaskOnPrePredicate(t *testing.T) { for _, tt := range tests { t.Logf("Running test: %s", tt.name) t.Run(tt.name, func(t *testing.T) { - skipPredicates := SkipPredicates{} - got := evaluateTaskOnPrePredicate(tt.args.task, tt.args.k8sPredicates, skipPredicates) + pp := &predicatesPlugin{ + skipPredicates: SkipPredicates{}, + prePredicateCache: map[prePredicateCacheKey]cachedPrePredicateResult{}, + } + got := pp.evaluateTaskOnPrePredicate(tt.args.task, tt.args.k8sPredicates) // I use this weird way to compare the 2 errors because sometimes the line order in // the returning error will change @@ -1151,3 +1157,239 @@ func isNonPreemptableTaskOnNodeOverCapacityFnAlwaysSchedulable( Details: nil, } } + +func TestClassifyVictimInvariantPrePredicateFailure(t *testing.T) { + tests := []struct { + name string + predicateName k8s_internal.PredicateName + status *ksf.Status + wantFailure bool + wantErrorPart string + }{ + { + name: "supported predicate with unresolvable failure", + predicateName: predicates.VolumeBinding, + status: ksf.NewStatus(ksf.UnschedulableAndUnresolvable, "persistentvolumeclaim \"missing\" not found"), + wantFailure: true, + wantErrorPart: "persistentvolumeclaim \"missing\" not found", + }, + { + name: "supported predicate with resolvable status is ignored", + predicateName: predicates.VolumeBinding, + status: ksf.NewStatus(ksf.Unschedulable, "resolvable later"), + wantFailure: false, + }, + { + name: "unsupported predicate is ignored", + predicateName: predicates.PodAffinity, + status: ksf.NewStatus(ksf.UnschedulableAndUnresolvable, "not used by the action guard"), + wantFailure: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + failure := classifyVictimInvariantPrePredicateFailure(tt.predicateName, tt.status) + if tt.wantFailure { + if failure == nil { + t.Fatal("classifyVictimInvariantPrePredicateFailure() returned nil, want failure") + } + if !strings.Contains(failure.Err.Error(), tt.wantErrorPart) { + t.Fatalf("classifyVictimInvariantPrePredicateFailure() error = %q, want substring %q", + failure.Err.Error(), tt.wantErrorPart) + } + return + } + + if failure != nil { + t.Fatalf("classifyVictimInvariantPrePredicateFailure() = %v, want nil", failure) + } + }) + } +} + +func TestEvaluateTaskOnVictimInvariantPrePredicates(t *testing.T) { + task := &pod_info.PodInfo{ + UID: common_info.PodID(types.UID("pod-1")), + Name: "p1", + Namespace: "ns1", + Pod: &v1.Pod{}, + } + + configMapCalled := false + maxNodeResourcesCalled := false + pp := &predicatesPlugin{ + skipPredicates: SkipPredicates{}, + prePredicateCache: map[prePredicateCacheKey]cachedPrePredicateResult{}, + } + k8sPredicates := k8s_internal.SessionPredicates{ + predicates.VolumeBinding: { + IsPreFilterRequired: func(_ *v1.Pod) bool { return true }, + PreFilter: func(_ *v1.Pod) (sets.Set[string], *ksf.Status) { + return nil, ksf.NewStatus(ksf.Skip) + }, + }, + predicates.ConfigMap: { + IsPreFilterRequired: func(_ *v1.Pod) bool { return true }, + PreFilter: func(_ *v1.Pod) (sets.Set[string], *ksf.Status) { + configMapCalled = true + return nil, ksf.NewStatus(ksf.UnschedulableAndUnresolvable, "Missing required configmaps: [required-cm]") + }, + }, + predicates.MaxNodePoolResources: { + IsPreFilterRequired: func(_ *v1.Pod) bool { return true }, + PreFilter: func(_ *v1.Pod) (sets.Set[string], *ksf.Status) { + maxNodeResourcesCalled = true + return nil, ksf.NewStatus(ksf.UnschedulableAndUnresolvable, "should not be reached") + }, + }, + } + + failure := pp.evaluateTaskOnVictimInvariantPrePredicates(task, k8sPredicates) + if failure == nil { + t.Fatal("evaluateTaskOnVictimInvariantPrePredicates() returned nil, want failure") + } + if !strings.Contains(failure.Err.Error(), "Missing required configmaps: [required-cm]") { + t.Fatalf("evaluateTaskOnVictimInvariantPrePredicates() error = %q", failure.Err.Error()) + } + if !configMapCalled { + t.Fatal("ConfigMap candidate was not evaluated") + } + if maxNodeResourcesCalled { + t.Fatal("evaluation did not stop after the first victim-invariant failure") + } + if !pp.skipPredicates.ShouldSKip(task.UID, predicates.VolumeBinding) { + t.Fatal("skip predicate state for VolumeBinding was not recorded") + } +} + +func TestPredicatesPluginCachesPrePredicateResultsBetweenGuardAndRegularPath(t *testing.T) { + task := &pod_info.PodInfo{ + UID: common_info.PodID(types.UID("pod-1")), + Name: "p1", + Namespace: "ns1", + Pod: &v1.Pod{}, + } + preFilterCalls := 0 + pp := &predicatesPlugin{ + skipPredicates: SkipPredicates{}, + prePredicateCache: map[prePredicateCacheKey]cachedPrePredicateResult{}, + } + k8sPredicates := k8s_internal.SessionPredicates{ + predicates.ConfigMap: { + IsPreFilterRequired: func(_ *v1.Pod) bool { return true }, + PreFilter: func(_ *v1.Pod) (sets.Set[string], *ksf.Status) { + preFilterCalls++ + return nil, nil + }, + }, + } + + failure := pp.evaluateTaskOnVictimInvariantPrePredicates(task, k8sPredicates) + if failure != nil { + t.Fatalf("evaluateTaskOnVictimInvariantPrePredicates() = %v, want nil", failure) + } + err := pp.evaluateTaskOnPrePredicate(task, k8sPredicates) + if err != nil { + t.Fatalf("evaluateTaskOnPrePredicate() = %v, want nil", err) + } + if preFilterCalls != 1 { + t.Fatalf("PreFilter() call count = %d, want 1", preFilterCalls) + } +} + +func TestPredicatesPluginCachesPrePredicateResultsBetweenRegularAndGuardPath(t *testing.T) { + task := &pod_info.PodInfo{ + UID: common_info.PodID(types.UID("pod-1")), + Name: "p1", + Namespace: "ns1", + Pod: &v1.Pod{}, + } + preFilterCalls := 0 + pp := &predicatesPlugin{ + skipPredicates: SkipPredicates{}, + prePredicateCache: map[prePredicateCacheKey]cachedPrePredicateResult{}, + } + k8sPredicates := k8s_internal.SessionPredicates{ + predicates.ConfigMap: { + IsPreFilterRequired: func(_ *v1.Pod) bool { return true }, + PreFilter: func(_ *v1.Pod) (sets.Set[string], *ksf.Status) { + preFilterCalls++ + return nil, nil + }, + }, + } + + err := pp.evaluateTaskOnPrePredicate(task, k8sPredicates) + if err != nil { + t.Fatalf("evaluateTaskOnPrePredicate() = %v, want nil", err) + } + failure := pp.evaluateTaskOnVictimInvariantPrePredicates(task, k8sPredicates) + if failure != nil { + t.Fatalf("evaluateTaskOnVictimInvariantPrePredicates() = %v, want nil", failure) + } + if preFilterCalls != 1 { + t.Fatalf("PreFilter() call count = %d, want 1", preFilterCalls) + } +} + +func TestPredicatesPluginCachedSkipStatusUpdatesSkipPredicates(t *testing.T) { + task := &pod_info.PodInfo{ + UID: common_info.PodID(types.UID("pod-1")), + Pod: &v1.Pod{}, + } + pp := &predicatesPlugin{ + skipPredicates: SkipPredicates{}, + prePredicateCache: map[prePredicateCacheKey]cachedPrePredicateResult{}, + } + k8sPredicates := k8s_internal.SessionPredicates{ + predicates.VolumeBinding: { + IsPreFilterRequired: func(_ *v1.Pod) bool { return true }, + PreFilter: func(_ *v1.Pod) (sets.Set[string], *ksf.Status) { + return nil, ksf.NewStatus(ksf.Skip) + }, + }, + } + + if failure := pp.evaluateTaskOnVictimInvariantPrePredicates(task, k8sPredicates); failure != nil { + t.Fatalf("evaluateTaskOnVictimInvariantPrePredicates() = %v, want nil", failure) + } + pp.skipPredicates = SkipPredicates{} + if err := pp.evaluateTaskOnPrePredicate(task, k8sPredicates); err != nil { + t.Fatalf("evaluateTaskOnPrePredicate() = %v, want nil", err) + } + if !pp.skipPredicates.ShouldSKip(task.UID, predicates.VolumeBinding) { + t.Fatal("cached skip status did not restore skipPredicates state") + } +} + +func TestPredicatesPluginDoesNotCacheNonCandidatePrePredicates(t *testing.T) { + task := &pod_info.PodInfo{ + UID: common_info.PodID(types.UID("pod-1")), + Pod: &v1.Pod{}, + } + preFilterCalls := 0 + pp := &predicatesPlugin{ + skipPredicates: SkipPredicates{}, + prePredicateCache: map[prePredicateCacheKey]cachedPrePredicateResult{}, + } + k8sPredicates := k8s_internal.SessionPredicates{ + predicates.PodFitsHostPorts: { + IsPreFilterRequired: func(_ *v1.Pod) bool { return true }, + PreFilter: func(_ *v1.Pod) (sets.Set[string], *ksf.Status) { + preFilterCalls++ + return nil, nil + }, + }, + } + + if err := pp.evaluateTaskOnPrePredicate(task, k8sPredicates); err != nil { + t.Fatalf("evaluateTaskOnPrePredicate() = %v, want nil", err) + } + if err := pp.evaluateTaskOnPrePredicate(task, k8sPredicates); err != nil { + t.Fatalf("evaluateTaskOnPrePredicate() = %v, want nil", err) + } + if preFilterCalls != 2 { + t.Fatalf("PreFilter() call count = %d, want 2 for a non-candidate predicate", preFilterCalls) + } +}