@@ -37,6 +37,7 @@ import (
3737 "golang.org/x/crypto/ssh"
3838 batchv1 "k8s.io/api/batch/v1"
3939 corev1 "k8s.io/api/core/v1"
40+ v1 "k8s.io/api/core/v1"
4041 "k8s.io/apimachinery/pkg/api/equality"
4142 "k8s.io/apimachinery/pkg/api/errors"
4243 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -52,8 +53,10 @@ import (
 	batchlisters "k8s.io/client-go/listers/batch/v1"
 	corelisters "k8s.io/client-go/listers/core/v1"
 	schedulinglisters "k8s.io/client-go/listers/scheduling/v1"
+	restclientset "k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
+	"k8s.io/client-go/tools/remotecommand"
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog"
 	metrics "k8s.io/metrics/pkg/client/clientset/versioned"
@@ -269,7 +272,7 @@ func (pq PriorityQueue) Len() int { return len(pq) }
 
 func (pq PriorityQueue) Less(i, j int) bool {
-	// We want Pop to give us the highest, not lowest, priority so we use greater than here.
-	return pq[i].priority > pq[j].priority
+	// We want Pop to give us the lowest, not highest, priority so we use less than here.
+	return pq[i].priority < pq[j].priority
 }
 
 func compare(a *Item, b *Item) int {
@@ -300,6 +303,7 @@ func (pq *PriorityQueue) update(item *Item, priority int) {
 
 // MPIJobController is the controller implementation for MPIJob resources.
 type MPIJobController struct {
+	config restclientset.Config
 	// kubeClient is a standard kubernetes clientset.
 	kubeClient    kubernetes.Interface
 	metricsClient metrics.Interface
@@ -341,6 +345,7 @@ type MPIJobController struct {
 	clock clock.WithTicker
 
 	latestReplicas    map[string]int32
+	configMaps        map[string]string
 	jobStatus         map[string]string
 	deferredAction    map[string]string
 	oldExpandReplicas map[string]int32
@@ -353,6 +358,7 @@ type MPIJobController struct {
 
 // NewMPIJobController returns a new MPIJob controller.
 func NewMPIJobController(
+	config *restclientset.Config,
 	kubeClient kubernetes.Interface,
 	metricsClient metrics.Interface,
 	kubeflowClient clientset.Interface,
@@ -366,13 +372,14 @@ func NewMPIJobController(
 	priorityClassInformer schedulinginformers.PriorityClassInformer,
 	mpiJobInformer informers.MPIJobInformer,
 	namespace, gangSchedulingName string) (*MPIJobController, error) {
-	return NewMPIJobControllerWithClock(kubeClient, metricsClient, kubeflowClient, volcanoClient, schedClient,
+	return NewMPIJobControllerWithClock(config, kubeClient, metricsClient, kubeflowClient, volcanoClient, schedClient,
 		configMapInformer, secretInformer, serviceInformer, jobInformer, podInformer,
 		priorityClassInformer, mpiJobInformer, &clock.RealClock{}, namespace, gangSchedulingName)
 }
 
 // NewMPIJobControllerWithClock returns a new MPIJob controller.
 func NewMPIJobControllerWithClock(
+	config *restclientset.Config,
 	kubeClient kubernetes.Interface,
 	metricsClient metrics.Interface,
 	kubeflowClient clientset.Interface,
@@ -418,6 +425,7 @@ func NewMPIJobControllerWithClock(
 	pqQueued := make(PriorityQueue, 0)
 
 	controller := &MPIJobController{
+		config:            *config,
 		kubeClient:        kubeClient,
 		metricsClient:     metricsClient,
 		kubeflowClient:    kubeflowClient,
@@ -441,12 +449,13 @@ func NewMPIJobControllerWithClock(
 		recorder:          recorder,
 		clock:             clock,
 		latestReplicas:    make(map[string]int32),
+		configMaps:        make(map[string]string),
 		jobStatus:         make(map[string]string),
 		deferredAction:    make(map[string]string),
 		oldExpandReplicas: make(map[string]int32),
 		runningJobs:       pqRunning,
 		queuedJobs:        pqQueued,
-		freeSlots:         8,
+		freeSlots:         10,
 	}
 	// FIXME fix the free slots!
 
@@ -657,6 +666,10 @@ func (c *MPIJobController) processNextWorkItem() bool {
 func signalRescale(ipAddr string, port int32, oldProcs int32, newProcs int32) error {
 	klog.Infof("Running command: %s %s %s %s %s", "./opt/rescale_client", ipAddr, fmt.Sprint(port), fmt.Sprint(oldProcs), fmt.Sprint(newProcs))
 	out, err := exec.Command("./opt/rescale_client", ipAddr, fmt.Sprint(port), fmt.Sprint(oldProcs), fmt.Sprint(newProcs)).CombinedOutput()
+	if string(out) == "0" {
+		klog.Infof("Error when rescaling")
+		return fmt.Errorf("error when rescaling: client returned %q", string(out))
+	}
 	klog.Infof("Rescale signal output: %s", string(out))
 	if err != nil {
 		klog.Infof("Error when rescaling")
@@ -683,7 +696,46 @@ func getJobKey(mpiJob *kubeflow.MPIJob) string {
 }
 
 func (c *MPIJobController) printJobStatuses() {
-	klog.Infof("%s", fmt.Sprint(c.jobStatus))
+	klog.Infof("Free slots = %d, %s", c.freeSlots, fmt.Sprint(c.jobStatus))
+
+	qPrios := make([]int, 0)
+	for _, item := range c.queuedJobs {
+		qPrios = append(qPrios, item.priority)
+	}
+
+	//klog.Infof("Running jobs = %s", fmt.Sprint(c.runningJobs))
+	klog.Infof("Queued job prios = %s", fmt.Sprint(qPrios))
+}
+
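+// writeHostFile overwrites the hostfile mounted at configMountPath inside the
+// given pod by exec-ing an echo redirect, and returns the command's stdout and stderr.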
+func (c *MPIJobController) writeHostFile(pod *v1.Pod, hostFileString string) (string, string, error) {
+	buf := &bytes.Buffer{}
+	errBuf := &bytes.Buffer{}
+	echoCmd := fmt.Sprintf("echo '%s'", hostFileString)
+	command := fmt.Sprintf("%s > %s", echoCmd, configMountPath+"/"+hostfileName)
+	klog.Infof("Running command - %s", command)
+	request := c.kubeClient.CoreV1().RESTClient().
+		Post().
+		Namespace(pod.Namespace).
+		Resource("pods").
+		Name(pod.Name).
+		SubResource("exec").
+		VersionedParams(&v1.PodExecOptions{
+			Command: []string{"/bin/sh", "-c", command},
+			Stdin:   false,
+			Stdout:  true,
+			Stderr:  true,
+			TTY:     true,
+		}, scheme.ParameterCodec)
+
+	executor, err := remotecommand.NewSPDYExecutor(&c.config, "POST", request.URL())
+	if err != nil {
+		return "", "", fmt.Errorf("creating SPDY executor for %s/%s: %w", pod.Namespace, pod.Name, err)
+	}
+	err = executor.StreamWithContext(context.TODO(), remotecommand.StreamOptions{
+		Stdout: buf,
+		Stderr: errBuf,
+	})
+	if err != nil {
+		return "", "", fmt.Errorf("executing command %q on %s/%s: %w", command, pod.Namespace, pod.Name, err)
+	}
+	return buf.String(), errBuf.String(), nil
 }
 
 // syncHandler compares the actual state with the desired, and attempts to
@@ -753,6 +805,7 @@ func (c *MPIJobController) syncHandler(key string) error {
 	delete(c.deferredAction, key)
 	delete(c.latestReplicas, key)
 	delete(c.jobStatus, key)
+	delete(c.configMaps, key)
 
 	for idx, item := range c.runningJobs {
 		if getJobKey(&item.mpiJob) == getJobKey(mpiJob) {
@@ -922,16 +975,19 @@ func (c *MPIJobController) syncHandler(key string) error {
 			return err
 		}
 
-		if config, err := c.getOrCreateConfigMap(mpiJob); config == nil || err != nil {
+		config, err := c.getOrCreateConfigMap(mpiJob)
+		if config == nil || err != nil {
 			return fmt.Errorf("getting or creating ConfigMap: %w", err)
 		}
+		c.configMaps[getJobKey(mpiJob)] = config.Data[hostfileName]
 
 		if isExpand {
 			c.deferredAction[key] = expand
 		}
 
+		config, err = c.getConfigMap(mpiJob)
 		ready := c.countReadyWorkerPods(worker)
-		if ready == len(worker) {
+		if ready == len(worker) && err == nil && config.Data[hostfileName] == c.configMaps[getJobKey(mpiJob)] {
 			action, ok = c.deferredAction[key]
 			if !ok {
 				action = noop
@@ -941,7 +997,16 @@ func (c *MPIJobController) syncHandler(key string) error {
 				klog.Infof("Workers ready to expand %s, last replicas = %s, new pods = %s",
 					fmt.Sprint(ready), fmt.Sprint(lastReplicas), fmt.Sprint(newPods))
 
-				time.Sleep(5)
+				//launcherPods, err := c.jobPods(launcher)
+
+				//stdout, stderr, err := c.writeHostFile(launcherPods[0], config.Data[hostfileName])
+				//if err != nil {
+				//	fmt.Sprintf("%w", err)
+				//	return err
+				//}
+				//klog.Infof("Out: %s\nErr: %s", stdout, stderr)
+
+				time.Sleep(15 * time.Second)
 
 				// wait for workers to be ready and send expand signal
 				klog.Infof("Sending expand signal to job %q (%v)", key, c.clock.Since(startTime))
@@ -955,8 +1020,11 @@ func (c *MPIJobController) syncHandler(key string) error {
 			klog.Infof("Waiting for workers to be ready %s/%s", fmt.Sprint(ready), fmt.Sprint(len(worker)))
 		}
 	}
+
+	config, err := c.getConfigMap(mpiJob)
 	if launcher == nil {
-		if c.jobStatus[getJobKey(mpiJob)] == created && c.countReadyWorkerPods(worker) == len(worker) {
+		if err == nil && config.Data[hostfileName] == c.configMaps[getJobKey(mpiJob)] &&
+			c.jobStatus[getJobKey(mpiJob)] == created && c.countReadyWorkerPods(worker) == len(worker) {
 			launcher, err = c.kubeClient.BatchV1().Jobs(namespace).Create(context.TODO(), c.newLauncherJob(mpiJob, len(worker)), metav1.CreateOptions{})
 			if err != nil {
 				c.recorder.Eventf(mpiJob, corev1.EventTypeWarning, mpiJobFailedReason, "launcher pod created failed: %v", err)
@@ -1139,6 +1207,14 @@ func (c *MPIJobController) countReadyWorkerPods(workers []*corev1.Pod) int {
 	return ready
 }
 
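+// getConfigMap returns the MPIJob's ConfigMap from the lister, or an error if it does not exist.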
+func (c *MPIJobController) getConfigMap(mpiJob *kubeflow.MPIJob) (*corev1.ConfigMap, error) {
+	cm, err := c.configMapLister.ConfigMaps(mpiJob.Namespace).Get(mpiJob.Name + configSuffix)
+	if err != nil {
+		return nil, err
+	}
+	return cm, nil
+}
+
 // getOrCreateConfigMap gets the ConfigMap controlled by this MPIJob, or creates
 // one if it doesn't exist.
 func (c *MPIJobController) getOrCreateConfigMap(mpiJob *kubeflow.MPIJob) (*corev1.ConfigMap, error) {
@@ -1169,12 +1245,16 @@ func (c *MPIJobController) getOrCreateConfigMap(mpiJob *kubeflow.MPIJob) (*corev
 
 		// If the ConfigMap is changed, update it
 		if !equality.Semantic.DeepEqual(cm.Data, newCM.Data) {
+			klog.Infof("Update config map for job %s: %s", getJobKey(mpiJob), fmt.Sprint(newCM.Data))
 			cm = cm.DeepCopy()
 			cm.Data = newCM.Data
 			cm, err = c.kubeClient.CoreV1().ConfigMaps(mpiJob.Namespace).Update(context.TODO(), cm, metav1.UpdateOptions{})
 			if err != nil {
 				return nil, err
 			}
+
+			if cmCheck, getErr := c.configMapLister.ConfigMaps(mpiJob.Namespace).Get(mpiJob.Name + configSuffix); getErr == nil {
+				klog.Infof("After CM update: %s", fmt.Sprint(cmCheck.Data))
+			}
 		}
 	}
 	return cm, nil
@@ -1310,7 +1390,6 @@ func (c *MPIJobController) addWorker(mpiJob *kubeflow.MPIJob, workerIndex int) (
 }
 
 func (c *MPIJobController) enqueueJobInternal(mpiJob *kubeflow.MPIJob) {
-	klog.Infof("CALLED!")
 	for _, item := range c.queuedJobs {
 		if getJobKey(&item.mpiJob) == getJobKey(mpiJob) {
 			klog.Infof("Skipping enqueue since already in queue")
@@ -1365,6 +1444,9 @@ func (c *MPIJobController) calculateWorkerReplicas(mpiJob *kubeflow.MPIJob) (int
 		}
 		it := c.runningJobs[index]
 		index -= 1
+		if *it.mpiJob.Spec.Priority > *mpiJob.Spec.Priority {
+			break
+		}
 		workerPodList, err := c.getRunningWorkerPods(&it.mpiJob)
 		if err != nil {
 			return -1, err
@@ -2012,6 +2094,7 @@ func (c *MPIJobController) newWorker(mpiJob *kubeflow.MPIJob, index int) *corev1
 }
 
 func (c *MPIJobController) newLauncherJob(mpiJob *kubeflow.MPIJob, numWorkers int) *batchv1.Job {
+	klog.Infof("Creating launcher job for %s with %d workers", getJobKey(mpiJob), numWorkers)
 	job := &batchv1.Job{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: mpiJob.Name + launcherSuffix,
@@ -2064,7 +2147,7 @@ func (c *MPIJobController) newLauncherPodTemplate(mpiJob *kubeflow.MPIJob, numWo
 	container := &podTemplate.Spec.Containers[0]
 	container.Env = append(container.Env, launcherEnvVars...)
 	container.Args = append([]string{fmt.Sprint("+p", numWorkers)}, container.Args...)
-	container.Args = append(container.Args, "++nodelist", "/etc/mpi/hostfile", "++server", "++server-port", fmt.Sprint(ccsPort))
+	container.Args = append(container.Args, "++nodelist", configMountPath+"/"+hostfileName, "++server", "++server-port", fmt.Sprint(ccsPort))
 	slotsStr := strconv.Itoa(int(*mpiJob.Spec.SlotsPerWorker))
 	switch mpiJob.Spec.MPIImplementation {
 	case kubeflow.MPIImplementationOpenMPI: