Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
## [v0.9.17] - 2026-05-07

### Fixed
- Improved reclaim, preempt, and consolidation performance by skipping solver work for jobs blocked by victim-invariant pre-predicate failures such as missing PVCs, missing required ConfigMaps, and tasks larger than any node. [#1502](https://github.com/kai-scheduler/KAI-Scheduler/issues/1502)
- Suppressed noisy `Reconciler error` logs and `PodGrouperWarning` events on transient PodGroup update conflicts. The podgrouper now treats `IsConflict` errors as expected and silently requeues the reconcile instead of surfacing the apiserver's "object has been modified" message.

## [v0.9.16] - 2026-04-29
Expand Down
43 changes: 43 additions & 0 deletions pkg/scheduler/actions/common/action_eligibility.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2026 NVIDIA CORPORATION
// SPDX-License-Identifier: Apache-2.0

package common

import (
"fmt"

"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/common_info"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_info"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/podgroup_info"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/framework"
)

func VictimInvariantPrePredicateFailureForTasks(
ssn *framework.Session,
tasks []*pod_info.PodInfo,
) (*pod_info.PodInfo, *api.VictimInvariantPrePredicateFailure) {
for _, task := range tasks {
if failure := ssn.VictimInvariantPrePredicateFailure(task); failure != nil {
return task, failure
}
}

return nil, nil
}

func RecordVictimInvariantPrePredicateFailure(
job *podgroup_info.PodGroupInfo,
task *pod_info.PodInfo,
failure *api.VictimInvariantPrePredicateFailure,
) {
fitErrors := common_info.NewFitErrors()
fitErrors.SetError(failure.Err.Error())
job.SetTaskFitError(task, fitErrors)
job.SetJobFitError(
podgroup_info.PodSchedulingErrors,
fmt.Sprintf("Resources were not found for pod %s/%s due to: %s",
task.Namespace, task.Name, fitErrors.Error()),
nil,
)
}
60 changes: 60 additions & 0 deletions pkg/scheduler/actions/common/action_eligibility_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2026 NVIDIA CORPORATION
// SPDX-License-Identifier: Apache-2.0

package common

import (
"errors"
"testing"

"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/types"

"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/common_info"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_info"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/podgroup_info"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/framework"
)

func TestVictimInvariantPrePredicateFailureForTasks(t *testing.T) {
task1 := &pod_info.PodInfo{UID: common_info.PodID(types.UID("task-1")), Name: "task-1", Namespace: "ns1"}
task2 := &pod_info.PodInfo{UID: common_info.PodID(types.UID("task-2")), Name: "task-2", Namespace: "ns1"}
expectedErr := errors.New("missing dependency")

var seenTasks []common_info.PodID
ssn := &framework.Session{}
ssn.AddVictimInvariantPrePredicateFn(func(task *pod_info.PodInfo) *api.VictimInvariantPrePredicateFailure {
seenTasks = append(seenTasks, task.UID)
if task.UID != task2.UID {
return nil
}
return &api.VictimInvariantPrePredicateFailure{
Err: expectedErr,
}
})

blockedTask, failure := VictimInvariantPrePredicateFailureForTasks(ssn, []*pod_info.PodInfo{task1, task2})
require.NotNil(t, failure)
require.Same(t, task2, blockedTask)
require.Same(t, expectedErr, failure.Err)
require.Equal(t, []common_info.PodID{task1.UID, task2.UID}, seenTasks)
}

func TestRecordVictimInvariantPrePredicateFailure(t *testing.T) {
job := podgroup_info.NewPodGroupInfo("job-1")
job.Name = "job-1"
job.Namespace = "ns1"
task := &pod_info.PodInfo{UID: common_info.PodID(types.UID("task-1")), Name: "task-1", Namespace: "ns1"}
failure := &api.VictimInvariantPrePredicateFailure{
Err: errors.New("persistentvolumeclaim \"missing\" not found"),
}

RecordVictimInvariantPrePredicateFailure(job, task, failure)

taskFitError, found := job.NodesFitErrors[task.UID]
require.True(t, found)
require.Contains(t, taskFitError.Error(), `persistentvolumeclaim "missing" not found`)
require.Len(t, job.JobFitErrors, 1)
require.Contains(t, job.JobFitErrors[0].Message, "Resources were not found for pod ns1/task-1 due to:")
}
5 changes: 5 additions & 0 deletions pkg/scheduler/actions/consolidation/consolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ func (alloc *consolidationAction) Execute(ssn *framework.Session) {
continue
}
}
tasks := podgroup_info.GetTasksToAllocate(job, ssn.SubGroupOrderFn, ssn.TaskOrderFn, false)
if task, failure := common.VictimInvariantPrePredicateFailureForTasks(ssn, tasks); failure != nil {
common.RecordVictimInvariantPrePredicateFailure(job, task, failure)
continue
}

metrics.IncPodgroupsConsideredByAction()
if succeeded, stmt := attemptToConsolidateForPreemptor(ssn, job); succeeded {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2026 NVIDIA CORPORATION
// SPDX-License-Identifier: Apache-2.0

package consolidation_test

import (
"errors"
"testing"

"github.com/stretchr/testify/require"
. "go.uber.org/mock/gomock"

"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/actions/consolidation"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/common_info"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_info"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_status"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/constants"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/test_utils"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/test_utils/jobs_fake"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/test_utils/nodes_fake"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/test_utils/tasks_fake"
)

func TestConsolidationSkipsSolverForVictimInvariantPrePredicateFailure(t *testing.T) {
test_utils.InitTestingInfrastructure()
controller := NewController(t)
defer controller.Finish()

ssn := test_utils.BuildSession(test_utils.TestTopologyBasic{
Name: "consolidation victim invariant blocker",
Jobs: []*jobs_fake.TestJobBasic{
{
Name: "running-job-0",
RequiredGPUsPerTask: 2,
Priority: constants.PriorityTrainNumber,
QueueName: "queue0",
Tasks: []*tasks_fake.TestTaskBasic{{
NodeName: "node0",
State: pod_status.Running,
}},
},
{
Name: "running-job-1",
RequiredGPUsPerTask: 2,
Priority: constants.PriorityTrainNumber,
QueueName: "queue0",
Tasks: []*tasks_fake.TestTaskBasic{{
NodeName: "node1",
State: pod_status.Running,
}},
},
{
Name: "pending-job",
RequiredGPUsPerTask: 3,
Priority: constants.PriorityTrainNumber,
QueueName: "queue0",
Tasks: []*tasks_fake.TestTaskBasic{{
State: pod_status.Pending,
}},
},
},
Nodes: map[string]nodes_fake.TestNodeBasic{
"node0": {GPUs: 4},
"node1": {GPUs: 4},
},
Queues: []test_utils.TestQueueBasic{{
Name: "queue0",
DeservedGPUs: 2,
}},
}, controller)

job := ssn.PodGroupInfos[common_info.PodGroupID("pending-job")]
task := job.GetAllPodsMap()[common_info.PodID("pending-job-0")]
blockerErr := errors.New("blocked before consolidation")
calls := 0
ssn.AddVictimInvariantPrePredicateFn(func(gotTask *pod_info.PodInfo) *api.VictimInvariantPrePredicateFailure {
calls++
require.Same(t, task, gotTask)
return &api.VictimInvariantPrePredicateFailure{
Err: blockerErr,
}
})

consolidation.New().Execute(ssn)

require.Positive(t, calls)
require.Equal(t, pod_status.Pending, task.Status)
require.Contains(t, job.NodesFitErrors[task.UID].Error(), blockerErr.Error())
require.NotEmpty(t, job.JobFitErrors)
require.Contains(t, job.JobFitErrors[0].Message, "Resources were not found for pod /pending-job-0 due to:")
}
5 changes: 5 additions & 0 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {
continue
}
}
tasks := podgroup_info.GetTasksToAllocate(job, ssn.SubGroupOrderFn, ssn.TaskOrderFn, false)
if task, failure := common.VictimInvariantPrePredicateFailureForTasks(ssn, tasks); failure != nil {
common.RecordVictimInvariantPrePredicateFailure(job, task, failure)
continue
}

metrics.IncPodgroupsConsideredByAction()
succeeded, statement, preemptedTasksNames := attemptToPreemptForPreemptor(ssn, job)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2026 NVIDIA CORPORATION
// SPDX-License-Identifier: Apache-2.0

package preempt_test

import (
"errors"
"testing"

"github.com/stretchr/testify/require"
. "go.uber.org/mock/gomock"

"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/actions/preempt"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/common_info"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_info"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_status"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/constants"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/test_utils"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/test_utils/jobs_fake"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/test_utils/nodes_fake"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/test_utils/tasks_fake"
)

func TestPreemptSkipsSolverForVictimInvariantPrePredicateFailure(t *testing.T) {
test_utils.InitTestingInfrastructure()
controller := NewController(t)
defer controller.Finish()

ssn := test_utils.BuildSession(test_utils.TestTopologyBasic{
Name: "preempt victim invariant blocker",
Jobs: []*jobs_fake.TestJobBasic{
{
Name: "running-job",
RequiredGPUsPerTask: 1,
Priority: constants.PriorityTrainNumber,
QueueName: "queue0",
Tasks: []*tasks_fake.TestTaskBasic{{
NodeName: "node0",
State: pod_status.Running,
}},
},
{
Name: "pending-job",
RequiredGPUsPerTask: 1,
Priority: constants.PriorityBuildNumber,
QueueName: "queue0",
Tasks: []*tasks_fake.TestTaskBasic{{
State: pod_status.Pending,
}},
},
},
Nodes: map[string]nodes_fake.TestNodeBasic{
"node0": {GPUs: 1},
},
Queues: []test_utils.TestQueueBasic{{
Name: "queue0",
DeservedGPUs: 1,
}},
}, controller)

job := ssn.PodGroupInfos[common_info.PodGroupID("pending-job")]
task := job.GetAllPodsMap()[common_info.PodID("pending-job-0")]
blockerErr := errors.New("blocked before preempt")
calls := 0
ssn.AddVictimInvariantPrePredicateFn(func(gotTask *pod_info.PodInfo) *api.VictimInvariantPrePredicateFailure {
calls++
require.Same(t, task, gotTask)
return &api.VictimInvariantPrePredicateFailure{
Err: blockerErr,
}
})

preempt.New().Execute(ssn)

require.Positive(t, calls)
require.Equal(t, pod_status.Pending, task.Status)
require.Contains(t, job.NodesFitErrors[task.UID].Error(), blockerErr.Error())
require.NotEmpty(t, job.JobFitErrors)
require.Contains(t, job.JobFitErrors[0].Message, "Resources were not found for pod /pending-job-0 due to:")
}
5 changes: 5 additions & 0 deletions pkg/scheduler/actions/reclaim/reclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ func (ra *reclaimAction) Execute(ssn *framework.Session) {
continue
}
}
tasks := podgroup_info.GetTasksToAllocate(job, ssn.SubGroupOrderFn, ssn.TaskOrderFn, false)
if task, failure := common.VictimInvariantPrePredicateFailureForTasks(ssn, tasks); failure != nil {
common.RecordVictimInvariantPrePredicateFailure(job, task, failure)
continue
}
metrics.IncPodgroupsConsideredByAction()
succeeded, statement, reclaimeeTasksNames := ra.attemptToReclaimForSpecificJob(ssn, job)
if succeeded {
Expand Down
12 changes: 12 additions & 0 deletions pkg/scheduler/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ type PredicateFn func(*pod_info.PodInfo, *podgroup_info.PodGroupInfo, *node_info
// PrePredicateFn is used to prepare for predicate on pod.
type PrePredicateFn func(*pod_info.PodInfo, *podgroup_info.PodGroupInfo) error

// VictimInvariantPrePredicateFailure is an action-level pre-predicate blocker
// that cannot be resolved by changing the victim set in the current session.
type VictimInvariantPrePredicateFailure struct {
Err error
}

// VictimInvariantPrePredicateFn returns a victim-invariant blocker for a task,
// or nil when action handling should continue normally.
type VictimInvariantPrePredicateFn func(
*pod_info.PodInfo,
) *VictimInvariantPrePredicateFailure

// CanReclaimResourcesFn is a function that determines if a reclaimer can get more resources
type CanReclaimResourcesFn func(pendingJob *podgroup_info.PodGroupInfo) bool

Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type Session struct {
IsJobOverCapacityFns []api.IsJobOverCapacityFn
IsTaskAllocationOnNodeOverCapacityFns []api.IsTaskAllocationOverCapacityFn
PrePredicateFns []api.PrePredicateFn
VictimInvariantPrePredicateFns []api.VictimInvariantPrePredicateFn
PredicateFns []api.PredicateFn
BindRequestMutateFns []api.BindRequestMutateFn
CleanAllocationAttemptCacheFns []api.CleanAllocationAttemptCacheFn
Expand Down
15 changes: 15 additions & 0 deletions pkg/scheduler/framework/session_plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ func (ssn *Session) AddPrePredicateFn(pf api.PrePredicateFn) {
ssn.PrePredicateFns = append(ssn.PrePredicateFns, pf)
}

func (ssn *Session) AddVictimInvariantPrePredicateFn(pf api.VictimInvariantPrePredicateFn) {
ssn.VictimInvariantPrePredicateFns = append(ssn.VictimInvariantPrePredicateFns, pf)
}
func (ssn *Session) AddPredicateFn(pf api.PredicateFn) {
ssn.PredicateFns = append(ssn.PredicateFns, pf)
}
Expand Down Expand Up @@ -330,6 +333,18 @@ func (ssn *Session) PrePredicateFn(task *pod_info.PodInfo, job *podgroup_info.Po
return nil
}

func (ssn *Session) VictimInvariantPrePredicateFailure(
task *pod_info.PodInfo,
) *api.VictimInvariantPrePredicateFailure {
for _, prePredicate := range ssn.VictimInvariantPrePredicateFns {
if failure := prePredicate(task); failure != nil {
return failure
}
}

return nil
}

func (ssn *Session) PredicateFn(task *pod_info.PodInfo, job *podgroup_info.PodGroupInfo, node *node_info.NodeInfo) error {
for _, pfn := range ssn.PredicateFns {
err := pfn(task, job, node)
Expand Down
Loading
Loading