
Commit 364bdee

Backport of: scheduler: skip setting preferred node when nodepool changes (#26662) (#26916)

A preferred node is used when a task group has an ephemeral disk, so that the allocation ideally stays on the same node. However, if the job's node pool changes, we should not select the current node as the preferred node; instead, the scheduler should decide which node to pick from the correct node pool.
1 parent df87a2b commit 364bdee
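
To make the intent concrete, here is a minimal sketch of the guard this commit adds (the helper name `preferPreviousNode` and its arguments are illustrative, not part of the change; the real diff to `findPreferredNode` follows below): if the previous allocation's job and the updated job disagree on node pool or datacenters, no preferred node is returned and placement falls through to the normal node pool/datacenter selection logic.

```go
package scheduler

import (
	"slices"

	"github.com/hashicorp/nomad/nomad/structs"
)

// preferPreviousNode is an illustrative stand-in for the guard added to
// findPreferredNode: it reports whether the previous allocation's node may
// still be used as the preferred node for the updated job.
func preferPreviousNode(prev *structs.Allocation, newJob *structs.Job) bool {
	if prev == nil {
		return false
	}
	if prev.Job != nil {
		// A changed node pool means the old node may not even belong to the
		// pool the job now targets, so skip the sticky/preferred-node shortcut.
		if prev.Job.NodePool != newJob.NodePool {
			return false
		}
		// Likewise for datacenters: a changed datacenter list must go through
		// normal feasibility checks rather than pinning to the previous node.
		if !slices.Equal(prev.Job.Datacenters, newJob.Datacenters) {
			return false
		}
	}
	return true
}
```

In the actual change these checks run inside `(*GenericScheduler).findPreferredNode` against `s.job`, as shown in the scheduler/generic_sched.go diff below.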

File tree

3 files changed: +237 additions, -75 deletions

.changelog/26662.txt
scheduler/generic_sched.go
scheduler/generic_sched_test.go

.changelog/26662.txt

Lines changed: 3 additions & 0 deletions

```release-note:bug
scheduler: fixes a bug selecting nodes for updated jobs with ephemeral disks when nodepool changes
```

scheduler/generic_sched.go

Lines changed: 12 additions & 0 deletions

```diff
@@ -6,6 +6,7 @@ package scheduler
 import (
 	"fmt"
 	"runtime/debug"
+	"slices"
 	"sort"
 	"time"
 
@@ -886,6 +887,17 @@ func (s *GenericScheduler) findPreferredNode(place placementResult) (*structs.No
 	if prev == nil {
 		return nil, nil
 	}
+
+	// when a job's node pool or datacenters are updated, we should not set a preferred node
+	// even if a task group has an ephemeral disk, as this would bypass the normal node pool/datacenter
+	// node selection logic and result in the alloc being placed incorrectly.
+	if prev.Job != nil && prev.Job.NodePool != s.job.NodePool {
+		return nil, nil
+	}
+	if !slices.Equal(prev.Job.Datacenters, s.job.Datacenters) {
+		return nil, nil
+	}
+
 	if place.TaskGroup().EphemeralDisk.Sticky || place.TaskGroup().EphemeralDisk.Migrate {
 		var preferredNode *structs.Node
 		ws := memdb.NewWatchSet()
```

scheduler/generic_sched_test.go

Lines changed: 222 additions & 75 deletions

```diff
@@ -122,98 +122,245 @@ func TestServiceSched_JobRegister(t *testing.T) {
 	h.AssertEvalStatus(t, structs.EvalStatusComplete)
 }
 
-func TestServiceSched_JobRegister_StickyAllocs(t *testing.T) {
+func TestServiceSched_JobRegister_EphemeralDisk(t *testing.T) {
 	ci.Parallel(t)
 
-	h := NewHarness(t)
+	createEphemeralJob := func(t *testing.T, h *Harness, sticky, migrate bool) *structs.Job {
+		job := mock.Job()
+		job.TaskGroups[0].EphemeralDisk.Sticky = sticky
+		job.TaskGroups[0].EphemeralDisk.Migrate = migrate
+		must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
 
-	// Create some nodes
-	for i := 0; i < 10; i++ {
+		// Create a mock evaluation to register the job
+		eval := &structs.Evaluation{
+			Namespace:   structs.DefaultNamespace,
+			ID:          uuid.Generate(),
+			Priority:    job.Priority,
+			TriggeredBy: structs.EvalTriggerJobRegister,
+			JobID:       job.ID,
+			Status:      structs.EvalStatusPending,
+		}
+		must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+
+		// Process the evaluation
+		must.NoError(t, h.Process(NewServiceScheduler, eval))
+
+		return job
+	}
+
+	t.Run("sticky ephemeral allocs in same node pool does not change nodes", func(t *testing.T) {
+		h := NewHarness(t)
+
+		// Create some nodes
+		for range 10 {
+			node := mock.Node()
+			must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+		}
+
+		// create a job
+		job := createEphemeralJob(t, h, true, false)
+
+		// Ensure the plan allocated
+		plan := h.Plans[0]
+		planned := make(map[string]*structs.Allocation)
+		for _, allocList := range plan.NodeAllocation {
+			for _, alloc := range allocList {
+				planned[alloc.ID] = alloc
+			}
+		}
+		must.MapLen(t, 10, planned)
+
+		// Update the job to force a rolling upgrade
+		updated := job.Copy()
+		updated.TaskGroups[0].Tasks[0].Resources.CPU += 10
+		must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, updated))
+
+		// Create a mock evaluation to handle the update
+		eval := &structs.Evaluation{
+			Namespace:   structs.DefaultNamespace,
+			ID:          uuid.Generate(),
+			Priority:    job.Priority,
+			TriggeredBy: structs.EvalTriggerNodeUpdate,
+			JobID:       job.ID,
+			Status:      structs.EvalStatusPending,
+		}
+		must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+		h1 := NewHarnessWithState(t, h.State)
+		must.NoError(t, h1.Process(NewServiceScheduler, eval))
+
+		// Ensure we have created only one new allocation
+		// Ensure a single plan
+		must.SliceLen(t, 1, h1.Plans)
+
+		plan = h1.Plans[0]
+		var newPlanned []*structs.Allocation
+		for _, allocList := range plan.NodeAllocation {
+			newPlanned = append(newPlanned, allocList...)
+		}
+		must.SliceLen(t, 10, newPlanned)
+		// Ensure that the new allocations were placed on the same node as the older
+		// ones
+		for _, new := range newPlanned {
+			// new alloc should have a previous allocation
+			must.NotEq(t, new.PreviousAllocation, "")
+
+			// new allocs PreviousAllocation must be a valid previously placed alloc
+			old, ok := planned[new.PreviousAllocation]
+			must.True(t, ok)
+
+			// new alloc should be placed in the same node
+			must.Eq(t, new.NodeID, old.NodeID)
+		}
+	})
+
+	t.Run("ephemeral alloc should migrate if node pool changes", func(t *testing.T) {
+		h := NewHarness(t)
+
+		// Create some nodes
+		for range 5 {
+			node := mock.Node()
+			must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+		}
+
+		testNodePool := "test"
 		node := mock.Node()
+		node.NodePool = testNodePool
 		must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
-	}
 
-	// Create a job
-	job := mock.Job()
-	job.TaskGroups[0].EphemeralDisk.Sticky = true
-	must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
+		// Create a test node pool.
+		testPool := mock.NodePool()
+		testPool.Name = "test"
 
-	// Create a mock evaluation to register the job
-	eval := &structs.Evaluation{
-		Namespace:   structs.DefaultNamespace,
-		ID:          uuid.Generate(),
-		Priority:    job.Priority,
-		TriggeredBy: structs.EvalTriggerJobRegister,
-		JobID:       job.ID,
-		Status:      structs.EvalStatusPending,
-	}
-	must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+		nodePools := []*structs.NodePool{
+			testPool,
+		}
+		h.State.UpsertNodePools(structs.MsgTypeTestSetup, h.NextIndex(), nodePools)
 
-	// Process the evaluation
-	if err := h.Process(NewServiceScheduler, eval); err != nil {
-		t.Fatalf("err: %v", err)
-	}
+		// Create a job
+		job := createEphemeralJob(t, h, true, true)
 
-	// Ensure the plan allocated
-	plan := h.Plans[0]
-	planned := make(map[string]*structs.Allocation)
-	for _, allocList := range plan.NodeAllocation {
-		for _, alloc := range allocList {
-			planned[alloc.ID] = alloc
+		// Ensure the plan allocated
+		plan := h.Plans[0]
+		planned := make(map[string]*structs.Allocation)
+		for _, allocList := range plan.NodeAllocation {
+			for _, alloc := range allocList {
+				planned[alloc.ID] = alloc
+			}
 		}
-	}
-	if len(planned) != 10 {
-		t.Fatalf("bad: %#v", plan)
-	}
+		must.MapLen(t, 10, planned)
 
-	// Update the job to force a rolling upgrade
-	updated := job.Copy()
-	updated.TaskGroups[0].Tasks[0].Resources.CPU += 10
-	must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, updated))
+		// Update the job to force a rolling upgrade
+		updated := job.Copy()
+		updated.NodePool = "test"
+		must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, updated))
 
-	// Create a mock evaluation to handle the update
-	eval = &structs.Evaluation{
-		Namespace:   structs.DefaultNamespace,
-		ID:          uuid.Generate(),
-		Priority:    job.Priority,
-		TriggeredBy: structs.EvalTriggerNodeUpdate,
-		JobID:       job.ID,
-		Status:      structs.EvalStatusPending,
-	}
-	must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
-	h1 := NewHarnessWithState(t, h.State)
-	if err := h1.Process(NewServiceScheduler, eval); err != nil {
-		t.Fatalf("err: %v", err)
-	}
+		// Create a mock evaluation to handle the update
+		eval := &structs.Evaluation{
+			Namespace:   structs.DefaultNamespace,
+			ID:          uuid.Generate(),
+			Priority:    job.Priority,
+			TriggeredBy: structs.EvalTriggerNodeUpdate,
+			JobID:       job.ID,
+			Status:      structs.EvalStatusPending,
+		}
+		must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+		h1 := NewHarnessWithState(t, h.State)
+		must.NoError(t, h1.Process(NewServiceScheduler, eval))
 
-	// Ensure we have created only one new allocation
-	// Ensure a single plan
-	if len(h1.Plans) != 1 {
-		t.Fatalf("bad: %#v", h1.Plans)
-	}
-	plan = h1.Plans[0]
-	var newPlanned []*structs.Allocation
-	for _, allocList := range plan.NodeAllocation {
-		newPlanned = append(newPlanned, allocList...)
-	}
-	if len(newPlanned) != 10 {
-		t.Fatalf("bad plan: %#v", plan)
-	}
-	// Ensure that the new allocations were placed on the same node as the older
-	// ones
-	for _, new := range newPlanned {
-		if new.PreviousAllocation == "" {
-			t.Fatalf("new alloc %q doesn't have a previous allocation", new.ID)
+		// Ensure we have created only one new allocation
+		// Ensure a single plan
+		must.SliceLen(t, 1, h1.Plans)
+
+		plan = h1.Plans[0]
+		var newPlanned []*structs.Allocation
+		for _, allocList := range plan.NodeAllocation {
+			newPlanned = append(newPlanned, allocList...)
 		}
+		must.SliceLen(t, 10, newPlanned)
+
+		// ensure new allocation has expected fields
+		for _, new := range newPlanned {
+			// new alloc should have a previous allocation
+			must.NotEq(t, new.PreviousAllocation, "")
+
+			// new allocs PreviousAllocation must be a valid previously placed alloc
+			_, ok := planned[new.PreviousAllocation]
+			must.True(t, ok)
 
-		old, ok := planned[new.PreviousAllocation]
-		if !ok {
-			t.Fatalf("new alloc %q previous allocation doesn't match any prior placed alloc (%q)", new.ID, new.PreviousAllocation)
+			// new alloc should be placed in the correct node pool
+			must.Eq(t, new.Job.NodePool, testNodePool)
 		}
-		if new.NodeID != old.NodeID {
-			t.Fatalf("new alloc and old alloc node doesn't match; got %q; want %q", new.NodeID, old.NodeID)
+	})
+
+	t.Run("ephemeral alloc should migrate if datacenter changes", func(t *testing.T) {
+		h := NewHarness(t)
+
+		// Create some nodes
+		for range 5 {
+			node := mock.Node()
+			must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
 		}
-	}
+
+		testDatacenter := "test"
+		node := mock.Node()
+		node.Datacenter = testDatacenter
+		must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+
+		// Create a job
+		job := createEphemeralJob(t, h, true, true)
+
+		// Ensure the plan allocated
+		plan := h.Plans[0]
+		planned := make(map[string]*structs.Allocation)
+		for _, allocList := range plan.NodeAllocation {
+			for _, alloc := range allocList {
+				planned[alloc.ID] = alloc
+			}
+		}
+		must.MapLen(t, 10, planned)
+
+		// Update the job to force a rolling upgrade
+		updated := job.Copy()
+		updated.Datacenters = []string{"test"}
+		must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, updated))
+
+		// Create a mock evaluation to handle the update
+		eval := &structs.Evaluation{
+			Namespace:   structs.DefaultNamespace,
+			ID:          uuid.Generate(),
+			Priority:    job.Priority,
+			TriggeredBy: structs.EvalTriggerNodeUpdate,
+			JobID:       job.ID,
+			Status:      structs.EvalStatusPending,
+		}
+		must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+		h1 := NewHarnessWithState(t, h.State)
+		must.NoError(t, h1.Process(NewServiceScheduler, eval))
+
+		// Ensure we have created only one new allocation
+		// Ensure a single plan
+		must.SliceLen(t, 1, h1.Plans)
+
+		plan = h1.Plans[0]
+		var newPlanned []*structs.Allocation
+		for _, allocList := range plan.NodeAllocation {
+			newPlanned = append(newPlanned, allocList...)
+		}
+		must.SliceLen(t, 10, newPlanned)
+
+		// ensure new allocation has expected fields
+		for _, new := range newPlanned {
+			// new alloc should have a previous allocation
+			must.NotEq(t, new.PreviousAllocation, "")
+
+			// new allocs PreviousAllocation must be a valid previously placed alloc
+			_, ok := planned[new.PreviousAllocation]
+			must.True(t, ok)
+
+			// new alloc should be placed on the node in the new datacenter
+			must.Eq(t, new.NodeID, node.ID)
+		}
+	})
 }
 
 func TestServiceSched_JobRegister_StickyHostVolumes(t *testing.T) {
```
