diff --git a/CHANGELOG.md b/CHANGELOG.md index 0fd6fd423..aa5b9d0e7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). - Allow the configuration of plugins in the binder service. [#1480](https://github.com/kai-scheduler/KAI-Scheduler/pull/1480) - [davidLif](https://github.com/davidLif) - Added support for configuring scheduler log level and custom scheduler args via Helm values (`scheduler.args`) [#1452](https://github.com/kai-scheduler/KAI-Scheduler/pull/1452) [dttung2905](https://github.com/dttung2905) - Added `crdupgrader.image.registry` Helm value to override `global.registry` for the `crd-upgrader` pre-install/pre-upgrade hook image, allowing the hook image to be served from a separate mirror without redirecting all chart images. [#1404](https://github.com/kai-scheduler/KAI-Scheduler/issues/1404) +- Added support for externally-created PodGroups. Workloads can opt out of podgrouper mutation with `kai.scheduler/skip-podgrouper: "true"` on the pod or its owner chain, join an existing PodGroup via `pod-group-name`, and receive a pod condition when they reference a non-existent subgroup. [#1420](https://github.com/kai-scheduler/KAI-Scheduler/issues/1420) ### Changed - **Breaking:** JobSet PodGroups no longer auto-calculate `minAvailable` from `parallelism × replicas`. The default is now 1. Use the `kai.scheduler/batch-min-member` annotation to set a custom value. diff --git a/docs/batch/README.md b/docs/batch/README.md index 8cd5e7bf8..0df8a4e62 100644 --- a/docs/batch/README.md +++ b/docs/batch/README.md @@ -18,6 +18,29 @@ This will create a job with parallelism of 6, but requires at least 2 pods to be For JobSets, the annotation overrides the calculated minAvailable for all PodGroups created by the JobSet. +## External PodGroups + +KAI also supports PodGroups that are created outside the podgrouper. This is useful when multiple workloads should join the same gang or when an external controller owns the PodGroup lifecycle. + +Use the following contract: + +- Create the `PodGroup` explicitly. +- Set the `pod-group-name` annotation on the pod template to join that PodGroup. +- Set the `kai.scheduler/subgroup-name` label on the pod template when using non-default subgroups. +- Set `kai.scheduler/skip-podgrouper: "true"` on the workload or any readable owner in the owner chain to prevent podgrouper from creating or rewriting PodGroup membership. + +Example: + +```bash +kubectl apply -f examples/batch/external-podgroup-job.yaml +``` + +Behavior notes: + +- `PodGroup.spec.queue` is authoritative for scheduling. +- If a pod references a PodGroup that does not exist yet, KAI leaves the pod unchanged and does not set a new pod condition. +- If a pod references a subgroup that does not exist in the PodGroup, KAI ignores only that pod for scheduling and sets a pod condition explaining the invalid subgroup. + ## PyTorchJob To run in a distributed way across multiple pods, you can use PyTorchJob. diff --git a/docs/developer/pod-grouper.md b/docs/developer/pod-grouper.md index a04b3f618..6316b4e9e 100644 --- a/docs/developer/pod-grouper.md +++ b/docs/developer/pod-grouper.md @@ -22,6 +22,16 @@ The Pod Grouper uses the PodGroup Custom Resource Definition (CRD) to represent While users or third-party tools can manually create PodGroup resources, the Pod Grouper automates this process by analyzing incoming pods and applying appropriate grouping logic based on the pod's characteristics and ownership.
+### External PodGroups + +The Pod Grouper can also be instructed to leave PodGroup membership unchanged. When a pod or any readable object in its owner chain has `kai.scheduler/skip-podgrouper: "true"`, the Pod Grouper does not create or update a PodGroup for that pod and does not patch `pod-group-name` or `kai.scheduler/subgroup-name`. + +This is the supported path for externally-created PodGroups. External controllers or manifests still need to: + +- Create the `PodGroup` resource explicitly. +- Set `pod-group-name` on the pod template annotations. +- Set `kai.scheduler/subgroup-name` on the pod template labels when using non-default subgroups. + ## Plugin Architecture The Pod Grouper uses a plugin-based architecture similar to the scheduler's plugin framework. Each plugin implements specific grouping logic for different types of workloads: diff --git a/examples/batch/README.md b/examples/batch/README.md index 22b30c594..50d6eacf7 100644 --- a/examples/batch/README.md +++ b/examples/batch/README.md @@ -9,3 +9,18 @@ kubectl apply -f batch-job-min-member.yaml ``` This creates a job with `parallelism: 6` but requires at least 2 pods to be schedulable before any pod starts running. The annotation value must be a positive integer. + +## External PodGroup + +Use `external-podgroup-job.yaml` when the PodGroup is created manually or by another controller and the Job should join it without podgrouper interference. + +```bash +kubectl apply -f external-podgroup-job.yaml +``` + +This example shows: + +- An explicit `PodGroup` resource with queue and subgroup definitions. +- `kai.scheduler/skip-podgrouper: "true"` on the Job. +- `pod-group-name` on the pod template annotations. +- `kai.scheduler/subgroup-name` on the pod template labels. diff --git a/examples/batch/external-podgroup-job.yaml b/examples/batch/external-podgroup-job.yaml new file mode 100644 index 000000000..3f0930ddf --- /dev/null +++ b/examples/batch/external-podgroup-job.yaml @@ -0,0 +1,46 @@ +# Copyright 2025 NVIDIA CORPORATION +# SPDX-License-Identifier: Apache-2.0 + +# This example demonstrates how to use an externally-created PodGroup with a +# batch Job. The PodGroup is created explicitly and the Job opts out of +# podgrouper reconciliation while attaching its pods to the external PodGroup.
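+# The names in this manifest must line up: the pod template's pod-group-name annotation +# must match the PodGroup's metadata.name, and the kai.scheduler/subgroup-name label must +# match a subgroup defined under spec.subGroups (pods referencing a missing subgroup are left unscheduled with a pod condition).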
+ +apiVersion: scheduling.run.ai/v2alpha2 +kind: PodGroup +metadata: + name: external-batch-job +spec: + minMember: 2 + queue: default-queue + subGroups: + - name: workers + minMember: 2 +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: external-batch-job + annotations: + kai.scheduler/skip-podgrouper: "true" +spec: + parallelism: 2 + completions: 2 + template: + metadata: + annotations: + pod-group-name: external-batch-job + labels: + kai.scheduler/subgroup-name: workers + spec: + schedulerName: kai-scheduler + restartPolicy: Never + containers: + - name: main + image: ubuntu + args: ["sleep", "infinity"] + resources: + limits: + nvidia.com/gpu: "1" diff --git a/pkg/common/constants/constants.go b/pkg/common/constants/constants.go index 1d710c0a8..18d309506 100644 --- a/pkg/common/constants/constants.go +++ b/pkg/common/constants/constants.go @@ -33,6 +33,7 @@ const ( // Annotations PodGroupAnnotationForPod = "pod-group-name" + SkipPodGrouperAnnotation = "kai.scheduler/skip-podgrouper" GpuFraction = "gpu-fraction" GpuFractionContainerName = "gpu-fraction-container-name" GpuMemory = "gpu-memory" diff --git a/pkg/podgrouper/pod_controller.go b/pkg/podgrouper/pod_controller.go index 0437a324d..36c84cd97 100644 --- a/pkg/podgrouper/pod_controller.go +++ b/pkg/podgrouper/pod_controller.go @@ -10,6 +10,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" @@ -96,6 +97,10 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R } }() + if shouldSkipPodGrouper(&pod) { + return ctrl.Result{}, nil + } + if isOrphanPodWithPodGroup(&pod) { return ctrl.Result{}, nil } @@ -110,11 +115,18 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R return ctrl.Result{}, err } + if shouldSkipAnyOwner(allOwners) { + return ctrl.Result{}, nil + } + metadata, err := r.podGrouper.GetPGMetadata(ctx, &pod, topOwner, allOwners) if err != nil { logger.V(1).Error(err, "Failed to create pod group metadata for pod", req.Namespace, req.Name) return ctrl.Result{}, err } + if metadata == nil { + return ctrl.Result{}, nil + } if len(r.configs.NodePoolLabelKey) > 0 { addNodePoolLabel(metadata, &pod, r.configs.NodePoolLabelKey) @@ -219,7 +231,24 @@ func addNodePoolLabel(metadata *podgroup.Metadata, pod *v1.Pod, nodePoolKey stri func isOrphanPodWithPodGroup(pod *v1.Pod) bool { _, foundPGAnnotation := pod.Annotations[constants.PodGroupAnnotationForPod] - return foundPGAnnotation && pod.OwnerReferences == nil + return foundPGAnnotation && len(pod.OwnerReferences) == 0 +} + +func shouldSkipAnyOwner(owners []*metav1.PartialObjectMetadata) bool { + for _, owner := range owners { + if shouldSkipPodGrouper(owner) { + return true + } + } + return false +} + +func shouldSkipPodGrouper(obj metav1.Object) bool { + if obj == nil { + return false + } + + return obj.GetAnnotations()[constants.SkipPodGrouperAnnotation] == "true" } // +kubebuilder:rbac:groups="",resources=namespaces,verbs=get;list;watch diff --git a/pkg/podgrouper/pod_controller_test.go b/pkg/podgrouper/pod_controller_test.go index f5569748b..c2d0ccd0b 100644 --- a/pkg/podgrouper/pod_controller_test.go +++ b/pkg/podgrouper/pod_controller_test.go @@ -109,6 +109,9 @@ func TestIsOrphanPodWithPodGroup(t *testing.T) { assert.True(t, isOrphanPodWithPodGroup(&pod)) + pod.OwnerReferences = []metav1.OwnerReference{} + assert.True(t, 
isOrphanPodWithPodGroup(&pod)) + pod.OwnerReferences = []metav1.OwnerReference{ { APIVersion: "v1", @@ -158,16 +161,268 @@ func TestEventOnFailure(t *testing.T) { } -type fakePodGrouper struct{} +type fakePodGrouper struct { + getPGMetadataFn func(ctx context.Context, pod *v1.Pod, topOwner *unstructured.Unstructured, allOwners []*metav1.PartialObjectMetadata) (*podgroup.Metadata, error) + getPodOwnersFn func(ctx context.Context, pod *v1.Pod) (*unstructured.Unstructured, []*metav1.PartialObjectMetadata, error) + getPGMetadataCalls int + getPodOwnersCalls int +} -func (*fakePodGrouper) GetPGMetadata(ctx context.Context, pod *v1.Pod, topOwner *unstructured.Unstructured, allOwners []*metav1.PartialObjectMetadata) (*podgroup.Metadata, error) { +func (f *fakePodGrouper) GetPGMetadata(ctx context.Context, pod *v1.Pod, topOwner *unstructured.Unstructured, allOwners []*metav1.PartialObjectMetadata) (*podgroup.Metadata, error) { + f.getPGMetadataCalls++ + if f.getPGMetadataFn != nil { + return f.getPGMetadataFn(ctx, pod, topOwner, allOwners) + } return nil, nil } -func (*fakePodGrouper) GetPodOwners(ctx context.Context, pod *v1.Pod) (*unstructured.Unstructured, []*metav1.PartialObjectMetadata, error) { +func (f *fakePodGrouper) GetPodOwners(ctx context.Context, pod *v1.Pod) (*unstructured.Unstructured, []*metav1.PartialObjectMetadata, error) { + f.getPodOwnersCalls++ + if f.getPodOwnersFn != nil { + return f.getPodOwnersFn(ctx, pod) + } return nil, nil, fmt.Errorf("failed") } +func TestShouldSkipPodGrouper(t *testing.T) { + pod := &v1.Pod{} + assert.False(t, shouldSkipPodGrouper(pod)) + + pod.Annotations = map[string]string{constants.SkipPodGrouperAnnotation: "false"} + assert.False(t, shouldSkipPodGrouper(pod)) + + pod.Annotations[constants.SkipPodGrouperAnnotation] = "true" + assert.True(t, shouldSkipPodGrouper(pod)) +} + +func TestShouldSkipAnyOwner(t *testing.T) { + owner := &metav1.PartialObjectMetadata{} + assert.False(t, shouldSkipAnyOwner([]*metav1.PartialObjectMetadata{owner})) + + owner.Annotations = map[string]string{constants.SkipPodGrouperAnnotation: "true"} + assert.True(t, shouldSkipAnyOwner([]*metav1.PartialObjectMetadata{owner})) +} + +func TestReconcileSkipsAnnotatedPod(t *testing.T) { + pod := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-pod", + Namespace: "my-namespace", + Annotations: map[string]string{ + constants.SkipPodGrouperAnnotation: "true", + }, + }, + Spec: v1.PodSpec{ + SchedulerName: "kai-scheduler", + }, + } + + fakeGrouper := &fakePodGrouper{ + getPodOwnersFn: func(ctx context.Context, pod *v1.Pod) (*unstructured.Unstructured, []*metav1.PartialObjectMetadata, error) { + return nil, nil, nil + }, + } + podReconciler := PodReconciler{ + Client: fake.NewClientBuilder().WithObjects(&pod).Build(), + Scheme: scheme.Scheme, + podGrouper: fakeGrouper, + configs: Configs{ + SchedulerName: "kai-scheduler", + }, + eventRecorder: record.NewFakeRecorder(10), + } + + _, err := podReconciler.Reconcile(context.TODO(), ctrl.Request{ + NamespacedName: types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, + }) + + assert.NoError(t, err) + assert.Zero(t, fakeGrouper.getPodOwnersCalls) +} + +func TestReconcileSkipsAnnotatedOwner(t *testing.T) { + pod := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-pod", + Namespace: "my-namespace", + }, + Spec: v1.PodSpec{ + SchedulerName: "kai-scheduler", + }, + } + + fakeGrouper := &fakePodGrouper{ + getPodOwnersFn: func(ctx context.Context, pod *v1.Pod) (*unstructured.Unstructured, []*metav1.PartialObjectMetadata, error) { + 
return nil, []*metav1.PartialObjectMetadata{{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{constants.SkipPodGrouperAnnotation: "true"}, + }, + }}, nil + }, + getPGMetadataFn: func(ctx context.Context, pod *v1.Pod, topOwner *unstructured.Unstructured, allOwners []*metav1.PartialObjectMetadata) (*podgroup.Metadata, error) { + return nil, nil + }, + } + podReconciler := PodReconciler{ + Client: fake.NewClientBuilder().WithObjects(&pod).Build(), + Scheme: scheme.Scheme, + podGrouper: fakeGrouper, + configs: Configs{ + SchedulerName: "kai-scheduler", + }, + eventRecorder: record.NewFakeRecorder(10), + } + + _, err := podReconciler.Reconcile(context.TODO(), ctrl.Request{ + NamespacedName: types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, + }) + + assert.NoError(t, err) + assert.Equal(t, 1, fakeGrouper.getPodOwnersCalls) + assert.Zero(t, fakeGrouper.getPGMetadataCalls) +} + +func TestReconcileSkipsIntermediateAnnotatedOwner(t *testing.T) { + pod := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-pod", + Namespace: "my-namespace", + }, + Spec: v1.PodSpec{ + SchedulerName: "kai-scheduler", + }, + } + + fakeGrouper := &fakePodGrouper{ + getPodOwnersFn: func(ctx context.Context, pod *v1.Pod) (*unstructured.Unstructured, []*metav1.PartialObjectMetadata, error) { + return nil, []*metav1.PartialObjectMetadata{ + {ObjectMeta: metav1.ObjectMeta{}}, + {ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{constants.SkipPodGrouperAnnotation: "true"}, + }}, + }, nil + }, + } + podReconciler := PodReconciler{ + Client: fake.NewClientBuilder().WithObjects(&pod).Build(), + Scheme: scheme.Scheme, + podGrouper: fakeGrouper, + configs: Configs{ + SchedulerName: "kai-scheduler", + }, + eventRecorder: record.NewFakeRecorder(10), + } + + _, err := podReconciler.Reconcile(context.TODO(), ctrl.Request{ + NamespacedName: types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, + }) + + assert.NoError(t, err) + assert.Equal(t, 1, fakeGrouper.getPodOwnersCalls) + assert.Zero(t, fakeGrouper.getPGMetadataCalls) +} + +func TestReconcileDoesNotSkipOnFalseAnnotationValue(t *testing.T) { + pod := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-pod", + Namespace: "my-namespace", + Annotations: map[string]string{ + constants.SkipPodGrouperAnnotation: "false", + }, + }, + Spec: v1.PodSpec{ + SchedulerName: "kai-scheduler", + }, + } + + fakeGrouper := &fakePodGrouper{} + podReconciler := PodReconciler{ + Client: fake.NewClientBuilder().WithObjects(&pod).Build(), + Scheme: scheme.Scheme, + podGrouper: fakeGrouper, + configs: Configs{ + SchedulerName: "kai-scheduler", + }, + eventRecorder: record.NewFakeRecorder(10), + } + + _, err := podReconciler.Reconcile(context.TODO(), ctrl.Request{ + NamespacedName: types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, + }) + + assert.Error(t, err) + assert.Equal(t, 1, fakeGrouper.getPodOwnersCalls) +} + +func TestReconcileSkipsOwnerlessPodGroupPod(t *testing.T) { + pod := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-pod", + Namespace: "my-namespace", + Annotations: map[string]string{ + constants.PodGroupAnnotationForPod: "external-pg", + }, + OwnerReferences: []metav1.OwnerReference{}, + }, + Spec: v1.PodSpec{ + SchedulerName: "kai-scheduler", + }, + } + + fakeGrouper := &fakePodGrouper{} + podReconciler := PodReconciler{ + Client: fake.NewClientBuilder().WithObjects(&pod).Build(), + Scheme: scheme.Scheme, + podGrouper: fakeGrouper, + configs: Configs{ + SchedulerName: "kai-scheduler", + }, + eventRecorder: 
record.NewFakeRecorder(10), + } + + _, err := podReconciler.Reconcile(context.TODO(), ctrl.Request{ + NamespacedName: types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, + }) + + assert.NoError(t, err) + assert.Zero(t, fakeGrouper.getPodOwnersCalls) +} + +func TestReconcileNoOpsOnNilMetadata(t *testing.T) { + pod := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-pod", + Namespace: "my-namespace", + }, + Spec: v1.PodSpec{ + SchedulerName: "kai-scheduler", + }, + } + + fakeGrouper := &fakePodGrouper{ + getPodOwnersFn: func(ctx context.Context, pod *v1.Pod) (*unstructured.Unstructured, []*metav1.PartialObjectMetadata, error) { + return nil, []*metav1.PartialObjectMetadata{{ObjectMeta: metav1.ObjectMeta{}}}, nil + }, + } + podReconciler := PodReconciler{ + Client: fake.NewClientBuilder().WithObjects(&pod).Build(), + Scheme: scheme.Scheme, + podGrouper: fakeGrouper, + configs: Configs{ + SchedulerName: "kai-scheduler", + }, + eventRecorder: record.NewFakeRecorder(10), + } + + _, err := podReconciler.Reconcile(context.TODO(), ctrl.Request{ + NamespacedName: types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, + }) + + assert.NoError(t, err) + assert.Equal(t, 1, fakeGrouper.getPGMetadataCalls) +} + func TestEventFilterFn(t *testing.T) { // Setup test cases tests := []struct { diff --git a/pkg/scheduler/api/podgroup_info/job_info.go b/pkg/scheduler/api/podgroup_info/job_info.go index 5abeba04a..297239e40 100644 --- a/pkg/scheduler/api/podgroup_info/job_info.go +++ b/pkg/scheduler/api/podgroup_info/job_info.go @@ -78,8 +78,9 @@ type PodGroupInfo struct { PodGroup *enginev2alpha2.PodGroup PodGroupUID types.UID - RootSubGroupSet *subgroup_info.SubGroupSet - PodSets map[string]*subgroup_info.PodSet + RootSubGroupSet *subgroup_info.SubGroupSet + PodSets map[string]*subgroup_info.PodSet + InvalidSubGroupTasks pod_info.PodsMap StalenessInfo @@ -115,8 +116,9 @@ func NewPodGroupInfoWithVectorMap(uid common_info.PodGroupID, vectorMap *resourc TimeStamp: nil, Stale: false, }, - RootSubGroupSet: defaultSubGroupSet, - PodSets: defaultSubGroupSet.GetDescendantPodSets(), + RootSubGroupSet: defaultSubGroupSet, + PodSets: defaultSubGroupSet.GetDescendantPodSets(), + InvalidSubGroupTasks: pod_info.PodsMap{}, LastStartTimestamp: nil, activeAllocatedCount: ptr.To(0), @@ -254,6 +256,7 @@ func (pgi *PodGroupInfo) AddTaskInfo(ti *pod_info.PodInfo) { podSet, found := pgi.PodSets[taskSubGroupName] if !found { log.InfraLogger.Warningf("AddTaskInfo for task <%s/%s> of podGroup: <%s/%s>: SubGroup not found <%s>", ti.Namespace, ti.Name, pgi.Namespace, pgi.Name, taskSubGroupName) + pgi.addInvalidSubGroupTask(ti, taskSubGroupName) return } @@ -511,6 +514,7 @@ func (pgi *PodGroupInfo) CloneWithTasks(tasks []*pod_info.PodInfo) *PodGroupInfo PodGroupUID: pgi.PodGroupUID, PodStatusIndex: map[pod_status.PodStatus]pod_info.PodsMap{}, + InvalidSubGroupTasks: pod_info.PodsMap{}, activeAllocatedCount: ptr.To(0), } @@ -553,6 +557,15 @@ func (pgi *PodGroupInfo) AddTaskFitErrors(task *pod_info.PodInfo, fitErrors *com } } +func (pgi *PodGroupInfo) GetInvalidSubGroupTasks() pod_info.PodsMap { + return pgi.InvalidSubGroupTasks +} + +func (pgi *PodGroupInfo) IsInvalidSubGroupTask(taskID common_info.PodID) bool { + _, found := pgi.InvalidSubGroupTasks[taskID] + return found +} + func (pgi *PodGroupInfo) AddSimpleJobFitError(reason enginev2alpha2.UnschedulableReason, message string) { pgi.AddJobFitError(common_info.NewJobFitError(pgi.Name, DefaultSubGroup, pgi.Namespace, reason, []string{message})) } @@ -585,3 
+598,16 @@ func (pgi *PodGroupInfo) generateSchedulingConstraintsSignature() common_info.Sc return common_info.SchedulingConstraintsSignature(fmt.Sprintf("%x", hash.Sum(nil))) } + +func (pgi *PodGroupInfo) addInvalidSubGroupTask(ti *pod_info.PodInfo, taskSubGroupName string) { + pgi.InvalidSubGroupTasks[ti.UID] = ti + + fitErrors := common_info.NewFitErrors() + fitErrors.SetError(fmt.Sprintf( + "Pod references subgroup %q, which does not exist in PodGroup %s/%s", + taskSubGroupName, + pgi.Namespace, + pgi.Name, + )) + pgi.AddTaskFitErrors(ti, fitErrors) +} diff --git a/pkg/scheduler/api/podgroup_info/job_info_test.go b/pkg/scheduler/api/podgroup_info/job_info_test.go index da460b70f..6b60cbc29 100644 --- a/pkg/scheduler/api/podgroup_info/job_info_test.go +++ b/pkg/scheduler/api/podgroup_info/job_info_test.go @@ -23,11 +23,13 @@ import ( "reflect" "testing" + "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/utils/ptr" + enginev2alpha2 "github.com/kai-scheduler/KAI-scheduler/pkg/apis/scheduling/v2alpha2" commonconstants "github.com/kai-scheduler/KAI-scheduler/pkg/common/constants" "github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/common_info" "github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/pod_info" @@ -47,6 +49,8 @@ func jobInfoEqual(l, r *PodGroupInfo) bool { rCopy.AllocatedVector = nil lCopy.VectorMap = nil rCopy.VectorMap = nil + lCopy.InvalidSubGroupTasks = nil + rCopy.InvalidSubGroupTasks = nil if !reflect.DeepEqual(lCopy, rCopy) { return false @@ -132,6 +136,47 @@ func TestAddTaskInfo(t *testing.T) { } } +func TestAddTaskInfoTracksInvalidSubGroupTask(t *testing.T) { + vectorMap := resource_info.BuildResourceVectorMap([]v1.ResourceList{common_info.BuildResourceList("1000m", "1G")}) + pod := common_info.BuildPod( + "ns-1", + "pod-1", + "", + v1.PodPending, + common_info.BuildResourceList("1000m", "1G"), + nil, + map[string]string{commonconstants.SubGroupLabelKey: "missing-subgroup"}, + map[string]string{ + commonconstants.PodGroupAnnotationForPod: "group-1", + }, + ) + task := pod_info.NewTaskInfo(pod, nil, vectorMap) + + info := NewPodGroupInfoWithVectorMap("group-1", vectorMap) + info.SetPodGroup(&enginev2alpha2.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "group-1", + Namespace: "ns-1", + }, + Spec: enginev2alpha2.PodGroupSpec{ + Queue: "queue-1", + SubGroups: []enginev2alpha2.SubGroup{ + { + Name: "valid-subgroup", + MinMember: ptr.To(int32(1)), + }, + }, + }, + }) + + info.AddTaskInfo(task) + + assert.Empty(t, info.GetAllPodsMap()) + assert.Len(t, info.GetInvalidSubGroupTasks(), 1) + assert.Equal(t, task, info.GetInvalidSubGroupTasks()[task.UID]) + assert.Contains(t, info.TasksFitErrors[task.UID].Error(), `missing-subgroup`) +} + func TestDeleteTaskInfo(t *testing.T) { // case1 case01_uid := common_info.PodGroupID("owner1") diff --git a/pkg/scheduler/cache/cluster_info/cluster_info_test.go b/pkg/scheduler/cache/cluster_info/cluster_info_test.go index 65e76f544..f84ef357c 100644 --- a/pkg/scheduler/cache/cluster_info/cluster_info_test.go +++ b/pkg/scheduler/cache/cluster_info/cluster_info_test.go @@ -962,9 +962,10 @@ func TestBindRequests(t *testing.T) { func TestSnapshotPodGroups(t *testing.T) { tests := map[string]struct { - objs []runtime.Object - kubeObjs []runtime.Object - results []*podgroup_info.PodGroupInfo + objs []runtime.Object + kubeObjs []runtime.Object + results []*podgroup_info.PodGroupInfo + invalidSubGroupTasks 
map[common_info.PodGroupID][]common_info.PodID }{ "BasicUsage": { objs: []runtime.Object{ @@ -1233,6 +1234,74 @@ func TestSnapshotPodGroups(t *testing.T) { }(), }, }, + "With invalid subgroup pod": { + objs: []runtime.Object{ + &enginev2alpha2.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "podGroup-0", + UID: "ABC", + }, + Spec: enginev2alpha2.PodGroupSpec{ + Queue: "queue-0", + SubGroups: []enginev2alpha2.SubGroup{ + { + Name: "SubGroup-0", + MinMember: ptr.To(int32(1)), + }, + }, + }, + }, + }, + kubeObjs: []runtime.Object{ + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNamespace, + Name: "pod-valid", + UID: types.UID(fmt.Sprintf("%s/pod-valid", testNamespace)), + Annotations: map[string]string{ + commonconstants.PodGroupAnnotationForPod: "podGroup-0", + }, + Labels: map[string]string{ + commonconstants.SubGroupLabelKey: "SubGroup-0", + }, + }, + }, + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNamespace, + Name: "pod-invalid", + UID: types.UID(fmt.Sprintf("%s/pod-invalid", testNamespace)), + Annotations: map[string]string{ + commonconstants.PodGroupAnnotationForPod: "podGroup-0", + }, + Labels: map[string]string{ + commonconstants.SubGroupLabelKey: "missing-subgroup", + }, + }, + }, + }, + results: []*podgroup_info.PodGroupInfo{ + func() *podgroup_info.PodGroupInfo { + subGroup0 := subgroup_info.NewPodSet("SubGroup-0", 1, nil) + subGroup0.AssignTask(&pod_info.PodInfo{UID: "pod-valid", SubGroupName: "SubGroup-0"}) + + subGroupSet := subgroup_info.NewSubGroupSet(subgroup_info.RootSubGroupSetName, nil) + subGroupSet.AddPodSet(subGroup0) + + return &podgroup_info.PodGroupInfo{ + Name: "podGroup-0", + Queue: "queue-0", + RootSubGroupSet: subGroupSet, + PodSets: map[string]*subgroup_info.PodSet{ + "SubGroup-0": subGroup0, + }, + } + }(), + }, + invalidSubGroupTasks: map[common_info.PodGroupID][]common_info.PodID{ + "podGroup-0": {common_info.PodID(fmt.Sprintf("%s/pod-invalid", testNamespace))}, + }, + }, } for name, test := range tests { @@ -1275,6 +1344,12 @@ func TestSnapshotPodGroups(t *testing.T) { } } } + + expectedInvalidTasks := test.invalidSubGroupTasks[common_info.PodGroupID(expected.Name)] + assert.Len(t, pg.GetInvalidSubGroupTasks(), len(expectedInvalidTasks)) + for _, taskID := range expectedInvalidTasks { + assert.Contains(t, pg.GetInvalidSubGroupTasks(), taskID) + } } } diff --git a/pkg/scheduler/cache/record_job_status_event_test.go b/pkg/scheduler/cache/record_job_status_event_test.go index 62cea4e83..cb836ded5 100644 --- a/pkg/scheduler/cache/record_job_status_event_test.go +++ b/pkg/scheduler/cache/record_job_status_event_test.go @@ -452,6 +452,104 @@ func TestRecordJobStatusEvent(t *testing.T) { } } +func TestRecordJobStatusEventInvalidSubGroupPod(t *testing.T) { + validPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-valid", + Namespace: "namespace-1", + UID: "pod-valid", + Annotations: map[string]string{ + "pod-group-name": "group-1", + }, + Labels: map[string]string{ + "kai.scheduler/subgroup-name": "valid-subgroup", + }, + }, + } + invalidPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-invalid", + Namespace: "namespace-1", + UID: "pod-invalid", + Annotations: map[string]string{ + "pod-group-name": "group-1", + }, + Labels: map[string]string{ + "kai.scheduler/subgroup-name": "missing-subgroup", + }, + }, + } + podGroup := &enginev2alpha2.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "group-1", + Namespace: "namespace-1", + UID: "group-1", + }, + Spec: enginev2alpha2.PodGroupSpec{ + Queue: 
"queue-1", + SubGroups: []enginev2alpha2.SubGroup{ + { + Name: "valid-subgroup", + MinMember: ptr.To(int32(2)), + }, + }, + }, + } + + kubeClient := fake.NewSimpleClientset(validPod, invalidPod) + kubeAiSchedulerClient := kubeaischedulerfake.NewSimpleClientset(podGroup) + cache := New(&SchedulerCacheParams{ + KubeClient: kubeClient, + KAISchedulerClient: kubeAiSchedulerClient, + NodePoolParams: &conf.SchedulingNodePoolParams{}, + DetailedFitErrors: false, + FullHierarchyFairness: true, + NumOfStatusRecordingWorkers: 4, + DiscoveryClient: kubeClient.Discovery(), + }) + + stopCh := make(chan struct{}) + cache.Run(stopCh) + cache.WaitForCacheSync(stopCh) + defer close(stopCh) + + vectorMap := resource_info.NewResourceVectorMap() + job := podgroup_info.NewPodGroupInfoWithVectorMap("group-1", vectorMap) + job.SetPodGroup(podGroup) + job.AddTaskInfo(pod_info.NewTaskInfo(validPod, nil, vectorMap)) + job.AddTaskInfo(pod_info.NewTaskInfo(invalidPod, nil, vectorMap)) + + err := cache.RecordJobStatusEvent(job) + assert.NoError(t, err) + + invalidPodObj, err := waitForCondition(func() (runtime.Object, error) { + pod, err := kubeClient.CoreV1().Pods("namespace-1").Get(context.TODO(), "pod-invalid", metav1.GetOptions{}) + if err != nil { + return nil, err + } + if len(pod.Status.Conditions) > 0 { + return pod, nil + } + return nil, fmt.Errorf("no conditions found for pod %s", pod.Name) + }) + if err != nil { + t.Fatal(err) + } + + updatedInvalidPod := invalidPodObj.(*v1.Pod) + assert.Len(t, updatedInvalidPod.Status.Conditions, 1) + assert.Equal(t, v1.PodReasonUnschedulable, updatedInvalidPod.Status.Conditions[0].Reason) + assert.Contains(t, updatedInvalidPod.Status.Conditions[0].Message, "missing-subgroup") + + updatedValidPod, err := kubeClient.CoreV1().Pods("namespace-1").Get(context.TODO(), "pod-valid", metav1.GetOptions{}) + assert.NoError(t, err) + assert.Empty(t, updatedValidPod.Status.Conditions) + + updatedPodGroup, err := kubeAiSchedulerClient.SchedulingV2alpha2().PodGroups("namespace-1").Get(context.TODO(), "group-1", metav1.GetOptions{}) + assert.NoError(t, err) + assert.Empty(t, updatedPodGroup.Status.SchedulingConditions) +} + func waitForCondition(condition func() (runtime.Object, error)) (runtime.Object, error) { timer := time.NewTimer(1 * time.Second) ticker := time.NewTicker(10 * time.Millisecond) diff --git a/pkg/scheduler/cache/status_updater/default_status_updater.go b/pkg/scheduler/cache/status_updater/default_status_updater.go index d09eb53a6..dc684b46b 100644 --- a/pkg/scheduler/cache/status_updater/default_status_updater.go +++ b/pkg/scheduler/cache/status_updater/default_status_updater.go @@ -205,6 +205,9 @@ func (su *defaultStatusUpdater) RecordJobStatusEvent(job *podgroup_info.PodGroup if job.StalenessInfo.Stale { su.recordStaleJobEvent(job) } + if err := su.recordInvalidSubGroupPodsEvents(job); err != nil { + return err + } updatePodgroupStatus := false if job.GetNumPendingTasks() > 0 || job.GetNumGatedTasks() > 0 { @@ -348,6 +351,10 @@ func (su *defaultStatusUpdater) recordUnschedulablePodsEvents(job *podgroup_info // Update podCondition for tasks Allocated and Pending before job discarded var errs []error for _, taskInfo := range job.PodStatusIndex[pod_status.Pending] { + if job.IsInvalidSubGroupTask(taskInfo.UID) { + continue + } + msg := common_info.DefaultPodError fitError := job.TasksFitErrors[taskInfo.UID] if fitError != nil { @@ -374,6 +381,28 @@ func (su *defaultStatusUpdater) recordUnschedulablePodsEvents(job *podgroup_info return errors.Join(errs...) 
} +func (su *defaultStatusUpdater) recordInvalidSubGroupPodsEvents(job *podgroup_info.PodGroupInfo) error { + var errs []error + + for _, taskInfo := range job.GetInvalidSubGroupTasks() { + msg := common_info.DefaultPodError + if fitError := job.TasksFitErrors[taskInfo.UID]; fitError != nil { + msg = fitError.Error() + if su.detailedFitErrors { + msg = fitError.DetailedError() + } + } + + msg = su.addNodePoolPrefixIfNeeded(job, msg) + if err := su.markTaskUnschedulable(taskInfo.Pod, msg, true); err != nil { + errs = append(errs, fmt.Errorf("failed to update invalid subgroup task status <%s/%s>: %v", + taskInfo.Namespace, taskInfo.Name, err)) + } + } + + return errors.Join(errs...) +} + func (su *defaultStatusUpdater) updatePodGroupAnnotations(job *podgroup_info.PodGroupInfo) ([]byte, error) { old := job.PodGroup.DeepCopy() updatedStaleTime := setPodGroupStaleTimeStamp(job.PodGroup, job.StalenessInfo.TimeStamp) diff --git a/test/e2e/suites/integrations/k8s_native/k8s_native_specs.go b/test/e2e/suites/integrations/k8s_native/k8s_native_specs.go index 91458b209..b3120414a 100644 --- a/test/e2e/suites/integrations/k8s_native/k8s_native_specs.go +++ b/test/e2e/suites/integrations/k8s_native/k8s_native_specs.go @@ -18,10 +18,12 @@ import ( "k8s.io/utils/ptr" v2 "github.com/kai-scheduler/KAI-scheduler/pkg/apis/scheduling/v2" + schedulingv2alpha2 "github.com/kai-scheduler/KAI-scheduler/pkg/apis/scheduling/v2alpha2" "github.com/kai-scheduler/KAI-scheduler/pkg/common/constants" testcontext "github.com/kai-scheduler/KAI-scheduler/test/e2e/modules/context" "github.com/kai-scheduler/KAI-scheduler/test/e2e/modules/resources/capacity" "github.com/kai-scheduler/KAI-scheduler/test/e2e/modules/resources/rd" + "github.com/kai-scheduler/KAI-scheduler/test/e2e/modules/resources/rd/pod_group" "github.com/kai-scheduler/KAI-scheduler/test/e2e/modules/resources/rd/queue" "github.com/kai-scheduler/KAI-scheduler/test/e2e/modules/utils" "github.com/kai-scheduler/KAI-scheduler/test/e2e/modules/wait" @@ -218,6 +220,67 @@ func DescribeK8sNativeSpecs() bool { wait.ForPodScheduled(ctx, testCtx.ControllerClient, &pods.Items[0]) }) + It("BatchJob with external PodGroup", func(ctx context.Context) { + namespace := queue.GetConnectedNamespaceToQueue(testCtx.Queues[0]) + externalPodGroupName := utils.GenerateRandomK8sName(10) + + externalPodGroup := pod_group.Create(namespace, externalPodGroupName, testCtx.Queues[0].Name) + externalPodGroup.Spec.MinMember = ptr.To(int32(1)) + externalPodGroup.Spec.SubGroups = []schedulingv2alpha2.SubGroup{ + { + Name: "workers", + MinMember: ptr.To(int32(1)), + }, + } + externalPodGroup, err := testCtx.KubeAiSchedClientset.SchedulingV2alpha2(). 
+ PodGroups(namespace).Create(ctx, externalPodGroup, metav1.CreateOptions{}) + Expect(err).To(Succeed()) + + job := rd.CreateBatchJobObject(testCtx.Queues[0], v1.ResourceRequirements{}) + job.Annotations = map[string]string{ + constants.SkipPodGrouperAnnotation: "true", + } + job.Spec.Template.Annotations[constants.PodGroupAnnotationForPod] = externalPodGroupName + job.Spec.Template.Labels[constants.SubGroupLabelKey] = "workers" + + job, err = testCtx.KubeClientset.BatchV1().Jobs(job.Namespace).Create(ctx, job, metav1.CreateOptions{}) + Expect(err).To(Succeed()) + + defer func() { + rd.DeleteJob(ctx, testCtx.KubeClientset, job) + }() + + labelSelector := metav1.LabelSelector{ + MatchLabels: map[string]string{ + rd.BatchJobAppLabel: job.Labels[rd.BatchJobAppLabel], + }, + } + wait.ForAtLeastOnePodCreation(ctx, testCtx.ControllerClient, labelSelector) + + Eventually(func(g Gomega) { + podGroups := &schedulingv2alpha2.PodGroupList{} + g.Expect(testCtx.ControllerClient.List(ctx, podGroups, client.InNamespace(namespace))).To(Succeed()) + g.Expect(podGroups.Items).To(HaveLen(1)) + g.Expect(podGroups.Items[0].Name).To(Equal(externalPodGroupName)) + }).WithTimeout(watcher.FlowTimeout).WithPolling(time.Second).WithContext(ctx).Should(Succeed()) + + pods := rd.GetJobPods(ctx, testCtx.KubeClientset, job) + Expect(pods).To(HaveLen(1)) + + Eventually(func(g Gomega) { + updatedPod := &v1.Pod{} + g.Expect(testCtx.ControllerClient.Get(ctx, client.ObjectKeyFromObject(&pods[0]), updatedPod)).To(Succeed()) + g.Expect(updatedPod.Annotations[constants.PodGroupAnnotationForPod]).To(Equal(externalPodGroupName)) + g.Expect(updatedPod.Labels[constants.SubGroupLabelKey]).To(Equal("workers")) + }).WithTimeout(watcher.FlowTimeout).WithPolling(time.Second).WithContext(ctx).Should(Succeed()) + + wait.ForPodScheduled(ctx, testCtx.ControllerClient, &pods[0]) + + createdPodGroup := &schedulingv2alpha2.PodGroup{} + Expect(testCtx.ControllerClient.Get(ctx, client.ObjectKeyFromObject(externalPodGroup), createdPodGroup)).To(Succeed()) + Expect(createdPodGroup.Name).To(Equal(externalPodGroupName)) + }) + It("CronJob", func(ctx context.Context) { namespace := queue.GetConnectedNamespaceToQueue(testCtx.Queues[0]) cronJob := rd.CreateCronJobObject(namespace, testCtx.Queues[0].Name)