2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -12,6 +12,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
- Added validation for `subgroup` name in podgroup [faizanexe](https://github.com/faizan-exe)
- Added memory profile and run duration to snapshot tool [#1411](https://github.com/NVIDIA/KAI-Scheduler/issues/1411)
- Added support for configuring pod and container security contexts on resource reservation pods via CLI flags [AdheipSingh](https://github.com/AdheipSingh)
+- Added a retry for pod patching (labels or status) by the scheduler, following the same pattern as the existing podgroup patching retries on failure. [#1437](https://github.com/kai-scheduler/KAI-Scheduler/pull/1437) - [davidLif](https://github.com/davidLif)
+- Do not retry podgroup status updates when the failure is "podgroup not found". [#1437](https://github.com/kai-scheduler/KAI-Scheduler/pull/1437) - [davidLif](https://github.com/davidLif)

### Changed
- **Breaking:** JobSet PodGroups no longer auto-calculate `minAvailable` from `parallelism × replicas`. The default is now 1. Use the `kai.scheduler/batch-min-member` annotation to set a custom value.
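The two retry-related changelog entries above describe the new behavior: transient patch failures are re-queued, while a NotFound failure means the object was deleted, so the in-flight update is dropped. A minimal sketch of that decision, using a hypothetical `shouldRetryPatch` helper that is not code from this PR:

```go
package main

// Illustrative sketch of the retry decision described in the changelog
// entries above; shouldRetryPatch is a hypothetical helper, not part of
// this PR's code.

import (
	"errors"
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// shouldRetryPatch reports whether a failed patch should be re-queued:
// NotFound means the object was deleted, so retrying is pointless.
func shouldRetryPatch(err error) bool {
	return err != nil && !apierrors.IsNotFound(err)
}

func main() {
	transient := errors.New("etcdserver: request timed out")
	gone := apierrors.NewNotFound(schema.GroupResource{Resource: "pods"}, "my-pod")

	fmt.Println(shouldRetryPatch(transient)) // true: re-queue the patch
	fmt.Println(shouldRetryPatch(gone))      // false: drop the in-flight update
	fmt.Println(shouldRetryPatch(nil))       // false: nothing to retry
}
```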
30 changes: 23 additions & 7 deletions pkg/scheduler/cache/status_updater/concurrency.go
@@ -9,7 +9,6 @@ import (
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"

enginev2alpha2 "github.com/kai-scheduler/KAI-scheduler/pkg/apis/scheduling/v2alpha2"
@@ -43,7 +42,7 @@ func (su *defaultStatusUpdater) processPayload(ctx context.Context, payload *upd

switch payload.objectType {
case podType:
-su.updatePod(ctx, payload.key, updateData.patchData, updateData.subResources, updateData.object)
+su.updatePod(ctx, payload.key, updateData)
case podGroupType:
su.updatePodGroup(ctx, payload.key, updateData)
}
@@ -66,15 +65,26 @@ func (su *defaultStatusUpdater) loadInflightUpdate(payload *updatePayload) (*inf
}

func (su *defaultStatusUpdater) updatePod(
-ctx context.Context, key updatePayloadKey, patchData []byte, subResources []string, object runtime.Object,
+ctx context.Context, key updatePayloadKey, updateData *inflightUpdate,
) {
-pod := object.(*v1.Pod)
+pod := updateData.object.(*v1.Pod)
_, err := su.kubeClient.CoreV1().Pods(pod.Namespace).Patch(
-ctx, pod.Name, types.StrategicMergePatchType, patchData, metav1.PatchOptions{}, subResources...,
+ctx, pod.Name, types.StrategicMergePatchType, updateData.patchData, metav1.PatchOptions{}, updateData.subResources...,
)

if err != nil {
log.StatusUpdaterLogger.V(1).Errorf("Failed to patch pod %s/%s: %v", pod.Namespace, pod.Name, err)
if apierrors.IsNotFound(err) {
log.StatusUpdaterLogger.V(5).Infof("Pod %s/%s not found, skipping pod patch: %v",
pod.Namespace, pod.Name, err)
su.inFlightPods.Delete(key)
return
}
log.StatusUpdaterLogger.V(1).Errorf("Failed to patch pod %s/%s, retrying: %v",
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be a limit to the retries

pod.Namespace, pod.Name, err)
su.pushToUpdateQueue(
&updatePayload{key: key, objectType: podType},
updateData,
)
return
}
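One way to address the reviewer's point, sketched purely as an illustration: carry a retry counter on the in-flight entry and stop re-queueing past a cap. `retryCount`, `maxPatchRetries`, and `handlePatchError` are assumed names for this sketch; none of them exist in the PR.

```go
package main

// Hypothetical bounded-retry variant of the error handling above.
// retryCount and maxPatchRetries are illustrative names, not part of the PR.

import (
	"errors"
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
)

const maxPatchRetries = 5 // assumed cap; the PR currently retries without a limit

type inflightPatch struct {
	retryCount int // assumed field; would need to be added to inflightUpdate
}

// handlePatchError mirrors the branch structure of updatePod's error path:
// drop on NotFound, retry on transient errors, but stop once the cap is hit.
func handlePatchError(u *inflightPatch, err error, requeue func()) {
	if apierrors.IsNotFound(err) {
		return // object is gone: drop the update, no retry
	}
	u.retryCount++
	if u.retryCount > maxPatchRetries {
		fmt.Printf("giving up after %d retries: %v\n", maxPatchRetries, err)
		return
	}
	requeue()
}

func main() {
	u := &inflightPatch{}
	transient := errors.New("transient API server error")
	for i := 0; i < 7; i++ { // only the first maxPatchRetries attempts re-queue
		handlePatchError(u, transient, func() { fmt.Println("re-queued, attempt", u.retryCount) })
	}
}
```

The same cap could apply to the podgroup path below, which shares the retry pattern.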

@@ -103,6 +113,12 @@ func (su *defaultStatusUpdater) updatePodGroup(
}

if statusErr != nil || patchErr != nil {
+if apierrors.IsNotFound(statusErr) || apierrors.IsNotFound(patchErr) {
+log.StatusUpdaterLogger.V(5).Infof("Pod group %s/%s not found, skipping podgroup update: %v",
+podGroup.Namespace, podGroup.Name, statusErr)
+su.inFlightPodGroups.Delete(key)
+return
+}

if statusErr != nil {
if apierrors.IsConflict(statusErr) {
@@ -118,7 +134,7 @@
podGroup.Namespace, podGroup.Name, statusErr)
}
if patchErr != nil {
log.StatusUpdaterLogger.V(1).Errorf("Failed to patch pod group %s/%s: %v",
log.StatusUpdaterLogger.V(1).Errorf("Failed to patch pod group %s/%s, retrying: %v",
podGroup.Namespace, podGroup.Name, patchErr)
}

94 changes: 94 additions & 0 deletions pkg/scheduler/cache/status_updater/concurrency_test.go
@@ -170,4 +170,98 @@ var _ = Describe("Status Updater Concurrency - large scale: increase queue size"
// Expected - queue is empty, meaning no retry was scheduled
}
})

It("updatePod - Retry after transient error", func() {
patchCalls := 0
kubeClient.CoreV1().(*fakecorev1.FakeCoreV1).PrependReactor(
"patch", "pods", func(action faketesting.Action) (handled bool, ret runtime.Object, err error) {
patchCalls++
return true, nil, errors.New("transient API server error")
},
)

pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Namespace: "test-ns",
UID: "test-uid",
},
}

key := statusUpdater.keyForPodStatusPayload(pod.Name, pod.Namespace, pod.UID)
updateData := &inflightUpdate{
object: pod,
patchData: []byte(`{"status":{"conditions":[{"type":"PodScheduled","status":"False","reason":"Unschedulable"}]}}`),
subResources: []string{"status"},
}

statusUpdater.inFlightPods.Store(key, updateData)
statusUpdater.Run(make(chan struct{}))

ctx := context.Background()
statusUpdater.updatePod(ctx, key, updateData)

Expect(patchCalls).To(Equal(1), "Patch should be called once")

// Inflight entry should still exist (not deleted on error)
_, exists := statusUpdater.inFlightPods.Load(key)
Expect(exists).To(BeTrue(), "Inflight entry should remain after transient error")

// Verify retry was queued
select {
case payload := <-statusUpdater.updateQueueOut:
Expect(payload.key).To(Equal(key))
Expect(payload.objectType).To(Equal(podType))
case <-time.After(100 * time.Millisecond):
Fail("Expected retry payload in the update queue after transient error")
}
})

It("updatePod - No retry after NotFound error", func() {
patchCalls := 0
kubeClient.CoreV1().(*fakecorev1.FakeCoreV1).PrependReactor(
"patch", "pods", func(action faketesting.Action) (handled bool, ret runtime.Object, err error) {
patchCalls++
return true, nil, apierrors.NewNotFound(
schema.GroupResource{Group: "", Resource: "pods"},
"test-pod",
)
},
)

pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Namespace: "test-ns",
UID: "test-uid",
},
}

key := statusUpdater.keyForPodStatusPayload(pod.Name, pod.Namespace, pod.UID)
updateData := &inflightUpdate{
object: pod,
patchData: []byte(`{"status":{"conditions":[{"type":"PodScheduled","status":"False","reason":"Unschedulable"}]}}`),
subResources: []string{"status"},
}

statusUpdater.inFlightPods.Store(key, updateData)
statusUpdater.Run(make(chan struct{}))

ctx := context.Background()
statusUpdater.updatePod(ctx, key, updateData)

Expect(patchCalls).To(Equal(1), "Patch should be called once")

// Inflight entry should be cleaned up
_, exists := statusUpdater.inFlightPods.Load(key)
Expect(exists).To(BeFalse(), "Inflight entry should be deleted after NotFound error")

// No retry should be queued
select {
case <-statusUpdater.updateQueueOut:
Fail("Update queue should be empty - no retry should be queued for NotFound errors")
case <-time.After(100 * time.Millisecond):
// Expected - queue is empty
}
})
})
@@ -750,11 +750,10 @@ func TestDefaultStatusUpdater_RetryAfterError(t *testing.T) {
statusUpdater := New(kubeClient, kubeAiSchedClient, recorder, 1, false, nodePoolLabelKey)

updateCalls := 0
-// wait with pod groups update until signal is given.
kubeAiSchedClient.SchedulingV2alpha2().(*fakeschedulingv2alpha2.FakeSchedulingV2alpha2).PrependReactor(
"update", "podgroups", func(action faketesting.Action) (handled bool, ret runtime.Object, err error) {
updateCalls += 1
return false, nil, errors.New("test")
return true, nil, errors.New("retryable error")
},
)

@@ -769,20 +768,6 @@
},
Status: enginev2alpha2.PodGroupStatus{},
}
-jobCopy := job.DeepCopy()

-jobCopy.Status.SchedulingConditions = []enginev2alpha2.SchedulingCondition{
-{
-TransitionID: "1",
-Type: enginev2alpha2.UnschedulableOnNodePool,
-NodePool: "test",
-Reason: "test",
-Message: "test",
-},
-}

-patchData, err := getPodGroupPatch(job, jobCopy)
-assert.NoError(t, err)

go func() {
time.Sleep(time.Millisecond * 75)
@@ -791,7 +776,6 @@
objectType: "podgroup",
}, &inflightUpdate{
object: job,
-patchData: patchData,
updateStatus: true,
subResources: nil,
})