Skip to content

Commit f66b53f

Browse files
committed
refactor the internal logic of podGroupManager
1 parent 8b7cc50 commit f66b53f

File tree

4 files changed

+68
-39
lines changed

4 files changed

+68
-39
lines changed

pkg/coscheduling/core/core.go

Lines changed: 59 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ func (s *PermitState) Clone() framework.StateData {
6666
type Manager interface {
6767
PreFilter(context.Context, *corev1.Pod) error
6868
Permit(context.Context, *framework.CycleState, *corev1.Pod) Status
69+
Reserve(context.Context, *corev1.Pod)
6970
Unreserve(context.Context, *corev1.Pod)
7071
GetPodGroup(context.Context, *corev1.Pod) (string, *v1alpha1.PodGroup)
7172
GetAssignedPodCount(string) int
@@ -95,7 +96,7 @@ type PodGroupManager struct {
9596
sync.RWMutex
9697
}
9798

98-
func AddPodFactory(pgMgr *PodGroupManager) func(obj interface{}) {
99+
func AddPodFactory(ctx context.Context, pgMgr *PodGroupManager) func(obj interface{}) {
99100
return func(obj interface{}) {
100101
p, ok := obj.(*corev1.Pod)
101102
if !ok {
@@ -104,22 +105,52 @@ func AddPodFactory(pgMgr *PodGroupManager) func(obj interface{}) {
104105
if p.Spec.NodeName == "" {
105106
return
106107
}
107-
pgFullName, _ := pgMgr.GetPodGroup(context.Background(), p)
108-
if pgFullName == "" {
108+
pgMgr.Reserve(ctx, p)
109+
}
110+
}
111+
112+
func UpdatePodFactory(ctx context.Context, pgMgr *PodGroupManager) func(oldObj interface{}, newObj interface{}) {
113+
return func(oldObj interface{}, newObj interface{}) {
114+
oldPod, ok := oldObj.(*corev1.Pod)
115+
if !ok {
109116
return
110117
}
111-
pgMgr.RWMutex.Lock()
112-
defer pgMgr.RWMutex.Unlock()
113-
if assigned, exist := pgMgr.assignedPodsByPG[pgFullName]; exist {
114-
assigned.Insert(p.Name)
115-
} else {
116-
pgMgr.assignedPodsByPG[pgFullName] = sets.New(p.Name)
118+
newPod, ok := newObj.(*corev1.Pod)
119+
if !ok {
120+
return
121+
}
122+
// If the pod is assumed or bound, it should be reserved.
123+
if oldPod.Spec.NodeName == "" && newPod.Spec.NodeName != "" {
124+
pgMgr.Reserve(ctx, newPod)
125+
}
126+
}
127+
}
128+
129+
func DelPodFactory(ctx context.Context, pgMgr *PodGroupManager) func(obj interface{}) {
130+
return func(obj interface{}) {
131+
var pod *corev1.Pod
132+
switch t := obj.(type) {
133+
case *corev1.Pod:
134+
pod = t
135+
case cache.DeletedFinalStateUnknown:
136+
var ok bool
137+
if pod, ok = t.Obj.(*corev1.Pod); !ok {
138+
return
139+
}
140+
default:
141+
return
142+
}
143+
if pod.Spec.NodeName == "" {
144+
return
117145
}
146+
pgMgr.Unreserve(ctx, pod)
118147
}
119148
}
120149

121150
// NewPodGroupManager creates a new operation object.
122-
func NewPodGroupManager(client client.Client, snapshotSharedLister framework.SharedLister, scheduleTimeout *time.Duration, podInformer informerv1.PodInformer) *PodGroupManager {
151+
func NewPodGroupManager(
152+
ctx context.Context, client client.Client, snapshotSharedLister framework.SharedLister,
153+
scheduleTimeout *time.Duration, podInformer informerv1.PodInformer) *PodGroupManager {
123154
pgMgr := &PodGroupManager{
124155
client: client,
125156
snapshotSharedLister: snapshotSharedLister,
@@ -130,30 +161,9 @@ func NewPodGroupManager(client client.Client, snapshotSharedLister framework.Sha
130161
assignedPodsByPG: map[string]sets.Set[string]{},
131162
}
132163
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
133-
AddFunc: AddPodFactory(pgMgr),
134-
DeleteFunc: func(obj interface{}) {
135-
switch t := obj.(type) {
136-
case *corev1.Pod:
137-
pod := t
138-
if pod.Spec.NodeName == "" {
139-
return
140-
}
141-
pgMgr.Unreserve(context.Background(), pod)
142-
return
143-
case cache.DeletedFinalStateUnknown:
144-
pod, ok := t.Obj.(*corev1.Pod)
145-
if !ok {
146-
return
147-
}
148-
if pod.Spec.NodeName == "" {
149-
return
150-
}
151-
pgMgr.Unreserve(context.Background(), pod)
152-
return
153-
default:
154-
return
155-
}
156-
},
164+
AddFunc: AddPodFactory(ctx, pgMgr),
165+
UpdateFunc: UpdatePodFactory(ctx, pgMgr),
166+
DeleteFunc: DelPodFactory(ctx, pgMgr),
157167
})
158168
return pgMgr
159169
}
@@ -312,6 +322,21 @@ func (pgMgr *PodGroupManager) Permit(ctx context.Context, state *framework.Cycle
312322
return Wait
313323
}
314324

325+
// Reserve adds Pod to the assignedPodsByPG map when it is scheduled
326+
func (pgMgr *PodGroupManager) Reserve(ctx context.Context, pod *corev1.Pod) {
327+
pgFullName, _ := pgMgr.GetPodGroup(ctx, pod)
328+
if pgFullName == "" {
329+
return
330+
}
331+
pgMgr.RWMutex.Lock()
332+
defer pgMgr.RWMutex.Unlock()
333+
if assigned, exist := pgMgr.assignedPodsByPG[pgFullName]; exist {
334+
assigned.Insert(pod.Name)
335+
} else {
336+
pgMgr.assignedPodsByPG[pgFullName] = sets.New(pod.Name)
337+
}
338+
}
339+
315340
// Unreserve invalidates assigned pod from assignedPodsByPG when schedule or bind failed.
316341
func (pgMgr *PodGroupManager) Unreserve(ctx context.Context, pod *corev1.Pod) {
317342
pgFullName, _ := pgMgr.GetPodGroup(ctx, pod)

pkg/coscheduling/core/core_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -266,13 +266,13 @@ func TestPermit(t *testing.T) {
266266
informerFactory := informers.NewSharedInformerFactory(cs, 0)
267267
podInformer := informerFactory.Core().V1().Pods()
268268

269-
pgMgr := NewPodGroupManager(client, tu.NewFakeSharedLister(tt.existingPods, nodes), &scheduleTimeout, podInformer)
269+
pgMgr := NewPodGroupManager(ctx, client, tu.NewFakeSharedLister(tt.existingPods, nodes), &scheduleTimeout, podInformer)
270270

271271
informerFactory.Start(ctx.Done())
272272
if !clicache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced) {
273273
t.Fatal("WaitForCacheSync failed")
274274
}
275-
addFunc := AddPodFactory(pgMgr)
275+
addFunc := AddPodFactory(ctx, pgMgr)
276276
for _, p := range tt.existingPods {
277277
podInformer.Informer().GetStore().Add(p)
278278
// we call add func here because we can not ensure existing pods are added before premit are called

pkg/coscheduling/coscheduling.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ func New(ctx context.Context, obj runtime.Object, handle framework.Handle) (fram
8484

8585
scheduleTimeDuration := time.Duration(args.PermitWaitingTimeSeconds) * time.Second
8686
pgMgr := core.NewPodGroupManager(
87+
ctx,
8788
client,
8889
handle.SnapshotSharedLister(),
8990
&scheduleTimeDuration,
@@ -246,6 +247,7 @@ func (cs *Coscheduling) Permit(ctx context.Context, state *framework.CycleState,
246247

247248
// Reserve is the functions invoked by the framework at "reserve" extension point.
248249
func (cs *Coscheduling) Reserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
250+
cs.pgMgr.Reserve(ctx, pod)
249251
return nil
250252
}
251253

pkg/coscheduling/coscheduling_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ func TestPodGroupBackoffTime(t *testing.T) {
114114
}
115115

116116
pgMgr := core.NewPodGroupManager(
117+
ctx,
117118
client,
118119
tu.NewFakeSharedLister(tt.pods, nodes),
119120
// In this UT, 5 seconds should suffice to test the PreFilter's return code.
@@ -421,7 +422,7 @@ func TestLess(t *testing.T) {
421422
informerFactory := informers.NewSharedInformerFactory(cs, 0)
422423
podInformer := informerFactory.Core().V1().Pods()
423424

424-
pl := &Coscheduling{pgMgr: core.NewPodGroupManager(client, nil, nil, podInformer)}
425+
pl := &Coscheduling{pgMgr: core.NewPodGroupManager(ctx, client, nil, nil, podInformer)}
425426

426427
informerFactory.Start(ctx.Done())
427428
if !clicache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced) {
@@ -516,7 +517,7 @@ func TestPermit(t *testing.T) {
516517

517518
pl := &Coscheduling{
518519
frameworkHandler: f,
519-
pgMgr: core.NewPodGroupManager(client, tu.NewFakeSharedLister(nil, nodes), nil, podInformer),
520+
pgMgr: core.NewPodGroupManager(ctx, client, tu.NewFakeSharedLister(nil, nodes), nil, podInformer),
520521
scheduleTimeout: &scheduleTimeout,
521522
}
522523

@@ -623,6 +624,7 @@ func TestPostFilter(t *testing.T) {
623624
informerFactory := informers.NewSharedInformerFactory(cs, 0)
624625
podInformer := informerFactory.Core().V1().Pods()
625626
pgMgr := core.NewPodGroupManager(
627+
ctx,
626628
client,
627629
tu.NewFakeSharedLister(tt.existingPods, nodes),
628630
&scheduleTimeout,
@@ -638,7 +640,7 @@ func TestPostFilter(t *testing.T) {
638640
if !clicache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced) {
639641
t.Fatal("WaitForCacheSync failed")
640642
}
641-
addFunc := core.AddPodFactory(pgMgr)
643+
addFunc := core.AddPodFactory(ctx, pgMgr)
642644
for _, p := range tt.existingPods {
643645
podInformer.Informer().GetStore().Add(p)
644646
// we call add func here because we can not ensure existing pods are added before premit are called

0 commit comments

Comments
 (0)