Skip to content

Commit 6305d0f

Browse files
committed
[Refactor] ♻️ Avoid Unlocking Prematurely When Apply Fails Because Resources Are Insufficient, Which Can Lead to Concurrency Issues #5
1 parent 12a412f commit 6305d0f

File tree

4 files changed

+32
-12
lines changed

4 files changed

+32
-12
lines changed

internal/schedulers/cpuscheduler.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ func (cs *cpuScheduler) Apply(num int) (string, error) {
8080
}
8181

8282
cs.Lock()
83+
defer cs.Unlock()
8384

8485
keys := make([]int, 0, len(cs.CpuStatusMap))
8586
for k := range cs.CpuStatusMap {
@@ -103,29 +104,36 @@ func (cs *cpuScheduler) Apply(num int) (string, error) {
103104
}
104105

105106
if len(applyCpus) < num {
106-
cs.Unlock()
107-
cs.Restore(applyCpus)
107+
cs.restore(applyCpus)
108108
return "", xerrors.NewCpuNotEnoughError()
109109
}
110110

111111
cpuSet := strings.Trim(strings.Join(applyCpus, ","), ",")
112112

113-
cs.Unlock()
114113
go cs.putToEtcd()
115114

116115
return cpuSet, nil
117116
}
118117

119118
func (cs *cpuScheduler) Restore(cpuSet []string) error {
119+
120120
cs.Lock()
121121
defer cs.Unlock()
122122

123+
err := cs.restore(cpuSet)
124+
if err != nil {
125+
return errors.Wrap(err, "restore failed")
126+
}
127+
go cs.putToEtcd()
128+
129+
return nil
130+
}
131+
132+
func (cs *cpuScheduler) restore(cpuSet []string) error {
123133
for _, cpu := range cpuSet {
124134
cs.CpuStatusMap[cpu] = 0
125135
}
126136

127-
go cs.putToEtcd()
128-
129137
return nil
130138
}
131139

internal/schedulers/gpuscheduler.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ func (gs *gpuScheduler) Apply(num int) ([]string, error) {
8686
}
8787

8888
gs.Lock()
89+
defer gs.Unlock()
8990

9091
var availableGpus []string
9192
for k, v := range gs.GpuStatusMap {
@@ -99,12 +100,10 @@ func (gs *gpuScheduler) Apply(num int) ([]string, error) {
99100
}
100101

101102
if len(availableGpus) < num {
102-
gs.Unlock()
103-
gs.Restore(availableGpus)
103+
gs.restore(availableGpus)
104104
return nil, xerrors.NewGpuNotEnoughError()
105105
}
106106

107-
gs.Unlock()
108107
go gs.putToEtcd()
109108

110109
return availableGpus, nil
@@ -119,11 +118,19 @@ func (gs *gpuScheduler) Restore(gpus []string) {
119118
gs.Lock()
120119
defer gs.Unlock()
121120

121+
gs.restore(gpus)
122+
123+
go gs.putToEtcd()
124+
}
125+
126+
func (gs *gpuScheduler) restore(gpus []string) {
127+
if len(gpus) <= 0 || len(gpus) > gs.AvailableGpuNums {
128+
return
129+
}
130+
122131
for _, gpu := range gpus {
123132
gs.GpuStatusMap[gpu] = 0
124133
}
125-
126-
go gs.putToEtcd()
127134
}
128135

129136
func (gs *gpuScheduler) serialize() *string {

internal/schedulers/portscheduler.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,11 +113,15 @@ func (ps *portScheduler) Restore(ports []string) {
113113
ps.Lock()
114114
defer ps.Unlock()
115115

116+
ps.restore(ports)
117+
118+
go ps.putToEtcd()
119+
}
120+
121+
func (ps *portScheduler) restore(ports []string) {
116122
for _, port := range ports {
117123
delete(ps.UsedPortSet, port)
118124
}
119-
120-
go ps.putToEtcd()
121125
}
122126

123127
func (ps *portScheduler) serialize() *string {

internal/schedulers/scheduler.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ type Scheduler interface {
55
Restore([]string)
66
serialize() *string
77
putToEtcd()
8+
restore([]string)
89
}

0 commit comments

Comments
 (0)