@@ -250,6 +250,21 @@ type Item struct {
 // A PriorityQueue implements heap.Interface and holds Items.
 type PriorityQueue []*Item
 
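+// DeepCopy returns a copy of the Item whose embedded MPIJob is deep-copied,
+// so mutating the copy does not affect the original entry.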
+func (it *Item) DeepCopy() Item {
+	itNew := new(Item)
+	itNew.mpiJob = *it.mpiJob.DeepCopy()
+	itNew.priority = it.priority
+	return *itNew
+}
+
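+// DeepCopy returns a new PriorityQueue containing deep copies of every Item.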
+func (pq PriorityQueue) DeepCopy() PriorityQueue {
+	cpy := make(PriorityQueue, len(pq))
+	for idx, item := range pq {
+		// Point each slot at a fresh copy; the new slice starts out holding
+		// nil *Item pointers, so assigning through *cpy[idx] would panic.
+		itemCopy := item.DeepCopy()
+		cpy[idx] = &itemCopy
+	}
+	return cpy
+}
+
 func (pq PriorityQueue) Len() int { return len(pq) }
 
 func (pq PriorityQueue) Less(i, j int) bool {
@@ -652,11 +667,11 @@ func signalRescale(ipAddr string, port int32, oldProcs int32, newProcs int32) er
 
 func (c *MPIJobController) sendRescaleSignal(mpiJob *kubeflow.MPIJob, oldPodCount int32, newPodCount int32) error {
 	launcher, err := c.getLauncherJob(mpiJob)
-	if err != nil {
+	// Nothing to signal yet if the launcher Job or its pod does not exist.
+	if err != nil || launcher == nil {
 		return err
 	}
 	launcherPods, err := c.jobPods(launcher)
-	if err != nil {
+	if err != nil || len(launcherPods) == 0 {
 		return err
 	}
 	ipAddr := launcherPods[0].Status.PodIP
@@ -667,10 +682,16 @@ func getJobKey(mpiJob *kubeflow.MPIJob) string {
 	return mpiJob.Namespace + "/" + mpiJob.Name
 }
 
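+// printJobStatuses dumps the controller's jobStatus map to the log, which is
+// handy when tracing queueing decisions in syncHandler.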
+func (c *MPIJobController) printJobStatuses() {
+	klog.Infof("%v", c.jobStatus)
+}
+
 // syncHandler compares the actual state with the desired, and attempts to
 // converge the two. It then updates the Status block of the MPIJob resource
 // with the current status of the resource.
 func (c *MPIJobController) syncHandler(key string) error {
+	klog.Infof("syncHandler called for %s", key)
+	c.printJobStatuses()
 	startTime := c.clock.Now()
 	defer func() {
 		klog.Infof("Finished syncing job %q (%v)", key, c.clock.Since(startTime))
@@ -732,6 +753,7 @@ func (c *MPIJobController) syncHandler(key string) error {
 	delete(c.deferredAction, key)
 	delete(c.latestReplicas, key)
 	delete(c.jobStatus, key)
+
 	for idx, item := range c.runningJobs {
 		if getJobKey(&item.mpiJob) == getJobKey(mpiJob) {
 			c.runningJobs = append(c.runningJobs[:idx], c.runningJobs[idx+1:]...)
@@ -879,18 +901,22 @@ func (c *MPIJobController) syncHandler(key string) error {
 		klog.Infof("Replicas for %s set to %d", getJobKey(mpiJob), c.latestReplicas[getJobKey(mpiJob)])
 		if err == jobQueuedError {
 			klog.Infof("Queued a job due to low capacity from calculateWorkerReplicas")
+			// Queueing now happens here instead of inside calculateWorkerReplicas.
+			c.enqueueJobInternal(mpiJob)
 			return nil
 		} else if err != nil {
 			return fmt.Errorf("Error: %w", err)
 		}
 		c.jobStatus[getJobKey(mpiJob)] = created
-	} else if status == queued {
+		c.freeSlots -= 1 // This one is for the launcher
+	} else if status == queued && c.latestReplicas[getJobKey(mpiJob)] > 0 {
 		c.jobStatus[getJobKey(mpiJob)] = created
+		c.freeSlots -= 1 // This one is for the launcher
 	}
 
 	worker, err = c.getOrCreateWorker(mpiJob)
 	if err == jobQueuedError {
 		klog.Infof("Queued a job due to low capacity from getOrCreateWorker")
+		// Queueing now happens here instead of inside getOrCreateWorker.
+		c.enqueueJobInternal(mpiJob)
 		return nil
 	} else if err != nil {
 		return err
@@ -930,18 +956,27 @@ func (c *MPIJobController) syncHandler(key string) error {
 		}
 	}
 	if launcher == nil {
-		if c.countReadyWorkerPods(worker) == len(worker) {
+		if c.jobStatus[getJobKey(mpiJob)] == created && c.countReadyWorkerPods(worker) == len(worker) {
 			launcher, err = c.kubeClient.BatchV1().Jobs(namespace).Create(context.TODO(), c.newLauncherJob(mpiJob, len(worker)), metav1.CreateOptions{})
 			if err != nil {
 				c.recorder.Eventf(mpiJob, corev1.EventTypeWarning, mpiJobFailedReason, "launcher pod created failed: %v", err)
 				return fmt.Errorf("creating launcher Pod: %w", err)
 			}
 			c.jobStatus[getJobKey(mpiJob)] = running
 			c.runningJobs.Push(&Item{*mpiJob, int(*mpiJob.Spec.Priority)})
-			c.freeSlots -= 1
 		} else {
 			klog.V(4).Infof("Waiting for workers %s/%s to start.", mpiJob.Namespace, mpiJob.Name)
 		}
+	} else {
+		launcherPods, err := c.jobPods(launcher)
+		if err != nil {
+			return err
+		}
+		// Once the launcher pod is running, mark the job as running and see
+		// whether any queued jobs can now be admitted.
+		if len(launcherPods) > 0 && isPodRunning(launcherPods[0]) {
+			klog.Infof("Setting job status for %s to running", getJobKey(mpiJob))
+			c.jobStatus[getJobKey(mpiJob)] = running
+			c.checkJobQueue()
+		}
 	}
 }
 
@@ -1275,8 +1310,40 @@ func (c *MPIJobController) addWorker(mpiJob *kubeflow.MPIJob, workerIndex int) (
 }
 
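+// enqueueJobInternal records an MPIJob on the controller's internal queue of
+// jobs waiting for free capacity and marks its status as queued.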
 func (c *MPIJobController) enqueueJobInternal(mpiJob *kubeflow.MPIJob) {
+	// Skip jobs that are already waiting in the queue.
+	for _, item := range c.queuedJobs {
+		if getJobKey(&item.mpiJob) == getJobKey(mpiJob) {
+			klog.Infof("Skipping enqueue since already in queue")
+			return
+		}
+	}
 	c.queuedJobs.Push(&Item{*mpiJob, int(*mpiJob.Spec.Priority)})
 	c.jobStatus[getJobKey(mpiJob)] = queued
+	klog.Infof("enqueueJobInternal called for job %s. Queue size = %d", getJobKey(mpiJob), len(c.queuedJobs))
+}
+
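+// checkJobQueue scans the internally queued jobs and, for any job whose worker
+// replica count can now be satisfied, re-adds its key to the work queue.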
+func (c *MPIJobController) checkJobQueue() {
+	index := 0
+	klog.Infof("checkJobQueue called, queue size = %d", len(c.queuedJobs))
+	var err error
+	for index < len(c.queuedJobs) {
+		mpiJob := c.queuedJobs[index].mpiJob
+		c.latestReplicas[getJobKey(&mpiJob)], err = c.calculateWorkerReplicas(&mpiJob)
+		if err != nil {
+			// Still not enough capacity; leave this job queued and look at the next one.
+			index += 1
+			continue
+		}
+		// Capacity is available: hand the job back to the work queue and drop
+		// it from the internal queue. Do not advance index; the next entry
+		// shifts into this slot.
+		c.queue.AddRateLimited(getJobKey(&mpiJob))
+		c.queuedJobs = append(c.queuedJobs[:index], c.queuedJobs[index+1:]...)
+	}
 }
 
 func (c *MPIJobController) calculateWorkerReplicas(mpiJob *kubeflow.MPIJob) (int32, error) {
@@ -1311,7 +1378,7 @@ func (c *MPIJobController) calculateWorkerReplicas(mpiJob *kubeflow.MPIJob) (int
 
 	if numWorkersToFree > 0 {
 		// queue this job
-		c.enqueueJobInternal(mpiJob)
+		// c.enqueueJobInternal(mpiJob)
 		klog.Infof("Queued job 1 %s, %d", getJobKey(mpiJob), numWorkersToFree)
 		return -1, jobQueuedError
 	} else {
@@ -1321,9 +1388,18 @@ func (c *MPIJobController) calculateWorkerReplicas(mpiJob *kubeflow.MPIJob) (int
 			if numWorkersToFree == 0 || index < 0 {
 				break
 			}
-			// TODO - get head of running and queued and choose job
-			// with highest priority to run
+
 			it := c.runningJobs[index]
+
+			// If the running job's priority is higher than the new job's,
+			// don't shrink it.
+			if *it.mpiJob.Spec.Priority > *mpiJob.Spec.Priority {
+				break
+			}
+
+			if c.jobStatus[getJobKey(&it.mpiJob)] != running {
+				// Skip non-running jobs; decrement index so the loop does not
+				// spin forever on the same entry.
+				index -= 1
+				continue
+			}
 			index -= 1
 			workerPodList, err := c.getRunningWorkerPods(&it.mpiJob)
 			if err != nil {
@@ -1350,7 +1426,7 @@ func (c *MPIJobController) calculateWorkerReplicas(mpiJob *kubeflow.MPIJob) (int
 		}
 		if numWorkersToFree > 0 {
 			// queue this job
-			c.enqueueJobInternal(mpiJob)
+			// c.enqueueJobInternal(mpiJob)
 			klog.Infof("Queued job 2 %s, %d", getJobKey(mpiJob), numWorkersToFree)
 			return -1, jobQueuedError
 		}
@@ -1386,7 +1462,7 @@ func (c *MPIJobController) getOrCreateWorker(mpiJob *kubeflow.MPIJob) ([]*corev1
 		getJobKey(mpiJob), c.latestReplicas[getJobKey(mpiJob)], len(podRunningList))
 
 	if int(c.latestReplicas[getJobKey(mpiJob)])-len(podFullList) > c.freeSlots {
-		c.enqueueJobInternal(mpiJob)
+		// c.enqueueJobInternal(mpiJob)
 		klog.Infof("Queued job from getOrCreateWorker %s, %d, %d, %d", getJobKey(mpiJob),
 			int(c.latestReplicas[getJobKey(mpiJob)]), len(podFullList), c.freeSlots)
 		return podFullList, jobQueuedError
@@ -1856,6 +1932,45 @@ func (c *MPIJobController) newWorker(mpiJob *kubeflow.MPIJob, index int) *corev1
 	podTemplate.Labels[kubeflow.ReplicaIndexLabel] = strconv.Itoa(index)
 	podTemplate.Spec.Hostname = name
 	podTemplate.Spec.Subdomain = mpiJob.Name // Matches job's Service name.
+
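+	// Soft pod affinity: prefer placing this worker in the same
+	// topology.kubernetes.io/zone as the job's launcher (weight 50) and, more
+	// strongly, as its other workers (weight 100), so that MPI traffic tends
+	// to stay within a zone.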
+	launcherMatch := make(map[string]string)
+	launcherMatch[kubeflow.OperatorNameLabel] = kubeflow.OperatorName
+	launcherMatch[kubeflow.JobNameLabel] = mpiJob.Name
+	launcherMatch[kubeflow.JobRoleLabel] = launcher
+
+	workerMatch := make(map[string]string)
+	workerMatch[kubeflow.OperatorNameLabel] = kubeflow.OperatorName
+	workerMatch[kubeflow.JobNameLabel] = mpiJob.Name
+	workerMatch[kubeflow.JobRoleLabel] = worker
+
+	schedulingAffinity := make([]corev1.WeightedPodAffinityTerm, 0)
+	schedulingAffinity = append(schedulingAffinity, corev1.WeightedPodAffinityTerm{
+		Weight: 50,
+		PodAffinityTerm: corev1.PodAffinityTerm{
+			LabelSelector: &metav1.LabelSelector{
+				MatchLabels: launcherMatch,
+			},
+			TopologyKey: "topology.kubernetes.io/zone",
+		},
+	})
+	schedulingAffinity = append(schedulingAffinity, corev1.WeightedPodAffinityTerm{
+		Weight: 100,
+		PodAffinityTerm: corev1.PodAffinityTerm{
+			LabelSelector: &metav1.LabelSelector{
+				MatchLabels: workerMatch,
+			},
+			TopologyKey: "topology.kubernetes.io/zone",
+		},
+	})
+
+	if podTemplate.Spec.Affinity == nil {
+		podTemplate.Spec.Affinity = &corev1.Affinity{
+			PodAffinity: &corev1.PodAffinity{
+				PreferredDuringSchedulingIgnoredDuringExecution: schedulingAffinity,
+			},
+		}
+	}
+
 	if podTemplate.Spec.HostNetwork {
 		// Allows resolution of worker hostnames without needing to include the
 		// namespace or cluster domain.
@@ -2002,6 +2117,30 @@ func (c *MPIJobController) newLauncherPodTemplate(mpiJob *kubeflow.MPIJob, numWo
 		MountPath: configMountPath,
 	})
 
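+	// Soft pod affinity: prefer scheduling the launcher in the same
+	// topology.kubernetes.io/zone as this job's workers (weight 100).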
+	workerMatch := make(map[string]string)
+	workerMatch[kubeflow.OperatorNameLabel] = kubeflow.OperatorName
+	workerMatch[kubeflow.JobNameLabel] = mpiJob.Name
+	workerMatch[kubeflow.JobRoleLabel] = worker
+
+	schedulingAffinity := make([]corev1.WeightedPodAffinityTerm, 0)
+	schedulingAffinity = append(schedulingAffinity, corev1.WeightedPodAffinityTerm{
+		Weight: 100,
+		PodAffinityTerm: corev1.PodAffinityTerm{
+			LabelSelector: &metav1.LabelSelector{
+				MatchLabels: workerMatch,
+			},
+			TopologyKey: "topology.kubernetes.io/zone",
+		},
+	})
+
+	if podTemplate.Spec.Affinity == nil {
+		podTemplate.Spec.Affinity = &corev1.Affinity{
+			PodAffinity: &corev1.PodAffinity{
+				PreferredDuringSchedulingIgnoredDuringExecution: schedulingAffinity,
+			},
+		}
+	}
+
 	return corev1.PodTemplateSpec{
 		ObjectMeta: metav1.ObjectMeta{
 			Labels: podTemplate.Labels,