
Commit a825ee3

scheduler: skip setting preferred node when nodepool changes (#26662)

A preferred node is used when a task group has an ephemeral disk, so that the allocation ideally stays on the same node. However, if the job's node pool changes, we should not select the current node as the preferred node; instead, the scheduler should decide which node to pick from the correct node pool.

1 parent 933f12b commit a825ee3
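For readers skimming the change, here is a minimal, self-contained sketch of the decision described above. The type `jobPlacement` and the function `skipPreferredNode` are hypothetical stand-ins used only for illustration, not names from the Nomad codebase; the actual change lives in `findPreferredNode` in the diff below.

```go
package main

import (
	"fmt"
	"slices"
)

// jobPlacement is a hypothetical stand-in for the job fields the guard
// inspects (the scheduler's real code reads these from *structs.Job).
type jobPlacement struct {
	NodePool    string
	Datacenters []string
}

// skipPreferredNode mirrors the early return added in this commit: if the
// node pool or the datacenter list differs between the previous allocation's
// job and the updated job, no preferred node is set and the scheduler falls
// back to its normal node selection for the new node pool/datacenters.
func skipPreferredNode(prev, next jobPlacement) bool {
	if prev.NodePool != next.NodePool {
		return true
	}
	return !slices.Equal(prev.Datacenters, next.Datacenters)
}

func main() {
	prev := jobPlacement{NodePool: "default", Datacenters: []string{"dc1"}}
	samePool := jobPlacement{NodePool: "default", Datacenters: []string{"dc1"}}
	movedPool := jobPlacement{NodePool: "test", Datacenters: []string{"dc1"}}

	fmt.Println(skipPreferredNode(prev, samePool))  // false: keep the preferred (current) node
	fmt.Println(skipPreferredNode(prev, movedPool)) // true: let normal node selection run
}
```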

File tree

3 files changed (+237, -75 lines)


.changelog/26662.txt

Lines changed: 3 additions & 0 deletions

@@ -0,0 +1,3 @@
+```release-note:bug
+scheduler: fixes a bug selecting nodes for updated jobs with ephemeral disks when nodepool changes
+```

scheduler/generic_sched.go

Lines changed: 12 additions & 0 deletions

@@ -6,6 +6,7 @@ package scheduler
 import (
 	"fmt"
 	"runtime/debug"
+	"slices"
 	"sort"
 	"time"
 
@@ -860,6 +861,17 @@ func (s *GenericScheduler) findPreferredNode(place reconciler.PlacementResult) (
 	if prev == nil {
 		return nil, nil
 	}
+
+	// when a job's node pool or datacenters are updated, we should skip setting a preferred node
+	// even if a task has an ephemeral disk, as this would bypass the normal node pool/datacenter
+	// node selection logic, which would result in the alloc being placed incorrectly.
+	if prev.Job != nil && prev.Job.NodePool != s.job.NodePool {
+		return nil, nil
+	}
+	if !slices.Equal(prev.Job.Datacenters, s.job.Datacenters) {
+		return nil, nil
+	}
+
 	if place.TaskGroup().EphemeralDisk.Sticky || place.TaskGroup().EphemeralDisk.Migrate {
 		var preferredNode *structs.Node
 		ws := memdb.NewWatchSet()
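One property worth noting about the datacenter check above: `slices.Equal` from the Go standard library compares element by element, in order, so based on this diff a reordered `Datacenters` list (even with the same membership) would also cause the scheduler to skip the preferred node. A small illustration:

```go
package main

import (
	"fmt"
	"slices"
)

func main() {
	// slices.Equal is length- and order-sensitive.
	fmt.Println(slices.Equal([]string{"dc1", "dc2"}, []string{"dc1", "dc2"})) // true
	fmt.Println(slices.Equal([]string{"dc1", "dc2"}, []string{"dc2", "dc1"})) // false: treated as a change
}
```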

scheduler/generic_sched_test.go

Lines changed: 222 additions & 75 deletions

@@ -127,98 +127,245 @@ func TestServiceSched_JobRegister(t *testing.T) {
 	h.AssertEvalStatus(t, structs.EvalStatusComplete)
 }
 
-func TestServiceSched_JobRegister_StickyAllocs(t *testing.T) {
+func TestServiceSched_JobRegister_EphemeralDisk(t *testing.T) {
 	ci.Parallel(t)
 
-	h := tests.NewHarness(t)
+	createEphemeralJob := func(t *testing.T, h *tests.Harness, sticky, migrate bool) *structs.Job {
+		job := mock.Job()
+		job.TaskGroups[0].EphemeralDisk.Sticky = sticky
+		job.TaskGroups[0].EphemeralDisk.Migrate = migrate
+		must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
 
-	// Create some nodes
-	for i := 0; i < 10; i++ {
+		// Create a mock evaluation to register the job
+		eval := &structs.Evaluation{
+			Namespace:   structs.DefaultNamespace,
+			ID:          uuid.Generate(),
+			Priority:    job.Priority,
+			TriggeredBy: structs.EvalTriggerJobRegister,
+			JobID:       job.ID,
+			Status:      structs.EvalStatusPending,
+		}
+		must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+
+		// Process the evaluation
+		must.NoError(t, h.Process(NewServiceScheduler, eval))
+
+		return job
+	}
+
+	t.Run("sticky ephemeral allocs in same node pool does not change nodes", func(t *testing.T) {
+		h := tests.NewHarness(t)
+
+		// Create some nodes
+		for range 10 {
+			node := mock.Node()
+			must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+		}
+
+		// create a job
+		job := createEphemeralJob(t, h, true, false)
+
+		// Ensure the plan allocated
+		plan := h.Plans[0]
+		planned := make(map[string]*structs.Allocation)
+		for _, allocList := range plan.NodeAllocation {
+			for _, alloc := range allocList {
+				planned[alloc.ID] = alloc
+			}
+		}
+		must.MapLen(t, 10, planned)
+
+		// Update the job to force a rolling upgrade
+		updated := job.Copy()
+		updated.TaskGroups[0].Tasks[0].Resources.CPU += 10
+		must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, updated))
+
+		// Create a mock evaluation to handle the update
+		eval := &structs.Evaluation{
+			Namespace:   structs.DefaultNamespace,
+			ID:          uuid.Generate(),
+			Priority:    job.Priority,
+			TriggeredBy: structs.EvalTriggerNodeUpdate,
+			JobID:       job.ID,
+			Status:      structs.EvalStatusPending,
+		}
+		must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+		h1 := tests.NewHarnessWithState(t, h.State)
+		must.NoError(t, h1.Process(NewServiceScheduler, eval))
+
+		// Ensure we have created only one new allocation
+		// Ensure a single plan
+		must.SliceLen(t, 1, h1.Plans)
+
+		plan = h1.Plans[0]
+		var newPlanned []*structs.Allocation
+		for _, allocList := range plan.NodeAllocation {
+			newPlanned = append(newPlanned, allocList...)
+		}
+		must.SliceLen(t, 10, newPlanned)
+		// Ensure that the new allocations were placed on the same node as the older
+		// ones
+		for _, new := range newPlanned {
+			// new alloc should have a previous allocation
+			must.NotEq(t, new.PreviousAllocation, "")
+
+			// new alloc's PreviousAllocation must be a valid previously placed alloc
+			old, ok := planned[new.PreviousAllocation]
+			must.True(t, ok)
+
+			// new alloc should be placed on the same node
+			must.Eq(t, new.NodeID, old.NodeID)
+		}
+	})
+
+	t.Run("ephemeral alloc should migrate if node pool changes", func(t *testing.T) {
+		h := tests.NewHarness(t)
+
+		// Create some nodes
+		for range 5 {
+			node := mock.Node()
+			must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+		}
+
+		testNodePool := "test"
 		node := mock.Node()
+		node.NodePool = testNodePool
 		must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
-	}
 
-	// Create a job
-	job := mock.Job()
-	job.TaskGroups[0].EphemeralDisk.Sticky = true
-	must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
+		// Create a test node pool.
+		testPool := mock.NodePool()
+		testPool.Name = "test"
 
-	// Create a mock evaluation to register the job
-	eval := &structs.Evaluation{
-		Namespace:   structs.DefaultNamespace,
-		ID:          uuid.Generate(),
-		Priority:    job.Priority,
-		TriggeredBy: structs.EvalTriggerJobRegister,
-		JobID:       job.ID,
-		Status:      structs.EvalStatusPending,
-	}
-	must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+		nodePools := []*structs.NodePool{
+			testPool,
+		}
+		h.State.UpsertNodePools(structs.MsgTypeTestSetup, h.NextIndex(), nodePools)
 
-	// Process the evaluation
-	if err := h.Process(NewServiceScheduler, eval); err != nil {
-		t.Fatalf("err: %v", err)
-	}
+		// Create a job
+		job := createEphemeralJob(t, h, true, true)
 
-	// Ensure the plan allocated
-	plan := h.Plans[0]
-	planned := make(map[string]*structs.Allocation)
-	for _, allocList := range plan.NodeAllocation {
-		for _, alloc := range allocList {
-			planned[alloc.ID] = alloc
+		// Ensure the plan allocated
+		plan := h.Plans[0]
+		planned := make(map[string]*structs.Allocation)
+		for _, allocList := range plan.NodeAllocation {
+			for _, alloc := range allocList {
+				planned[alloc.ID] = alloc
+			}
 		}
-	}
-	if len(planned) != 10 {
-		t.Fatalf("bad: %#v", plan)
-	}
+		must.MapLen(t, 10, planned)
 
-	// Update the job to force a rolling upgrade
-	updated := job.Copy()
-	updated.TaskGroups[0].Tasks[0].Resources.CPU += 10
-	must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, updated))
+		// Update the job to force a rolling upgrade
+		updated := job.Copy()
+		updated.NodePool = "test"
+		must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, updated))
 
-	// Create a mock evaluation to handle the update
-	eval = &structs.Evaluation{
-		Namespace:   structs.DefaultNamespace,
-		ID:          uuid.Generate(),
-		Priority:    job.Priority,
-		TriggeredBy: structs.EvalTriggerNodeUpdate,
-		JobID:       job.ID,
-		Status:      structs.EvalStatusPending,
-	}
-	must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
-	h1 := tests.NewHarnessWithState(t, h.State)
-	if err := h1.Process(NewServiceScheduler, eval); err != nil {
-		t.Fatalf("err: %v", err)
-	}
+		// Create a mock evaluation to handle the update
+		eval := &structs.Evaluation{
+			Namespace:   structs.DefaultNamespace,
+			ID:          uuid.Generate(),
+			Priority:    job.Priority,
+			TriggeredBy: structs.EvalTriggerNodeUpdate,
+			JobID:       job.ID,
+			Status:      structs.EvalStatusPending,
+		}
+		must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+		h1 := tests.NewHarnessWithState(t, h.State)
+		must.NoError(t, h1.Process(NewServiceScheduler, eval))
 
-	// Ensure we have created only one new allocation
-	// Ensure a single plan
-	if len(h1.Plans) != 1 {
-		t.Fatalf("bad: %#v", h1.Plans)
-	}
-	plan = h1.Plans[0]
-	var newPlanned []*structs.Allocation
-	for _, allocList := range plan.NodeAllocation {
-		newPlanned = append(newPlanned, allocList...)
-	}
-	if len(newPlanned) != 10 {
-		t.Fatalf("bad plan: %#v", plan)
-	}
-	// Ensure that the new allocations were placed on the same node as the older
-	// ones
-	for _, new := range newPlanned {
-		if new.PreviousAllocation == "" {
-			t.Fatalf("new alloc %q doesn't have a previous allocation", new.ID)
+		// Ensure we have created only one new allocation
+		// Ensure a single plan
+		must.SliceLen(t, 1, h1.Plans)
+
+		plan = h1.Plans[0]
+		var newPlanned []*structs.Allocation
+		for _, allocList := range plan.NodeAllocation {
+			newPlanned = append(newPlanned, allocList...)
 		}
+		must.SliceLen(t, 10, newPlanned)
+
+		// ensure new allocations have the expected fields
+		for _, new := range newPlanned {
+			// new alloc should have a previous allocation
+			must.NotEq(t, new.PreviousAllocation, "")
+
+			// new alloc's PreviousAllocation must be a valid previously placed alloc
+			_, ok := planned[new.PreviousAllocation]
+			must.True(t, ok)
 
-		old, ok := planned[new.PreviousAllocation]
-		if !ok {
-			t.Fatalf("new alloc %q previous allocation doesn't match any prior placed alloc (%q)", new.ID, new.PreviousAllocation)
+			// new alloc should be placed in the correct node pool
+			must.Eq(t, new.Job.NodePool, testNodePool)
 		}
-		if new.NodeID != old.NodeID {
-			t.Fatalf("new alloc and old alloc node doesn't match; got %q; want %q", new.NodeID, old.NodeID)
+	})
+
+	t.Run("ephemeral alloc should migrate if datacenter changes", func(t *testing.T) {
+		h := tests.NewHarness(t)
+
+		// Create some nodes
+		for range 5 {
+			node := mock.Node()
+			must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
		}
-	}
+
+		testDatacenter := "test"
+		node := mock.Node()
+		node.Datacenter = testDatacenter
+		must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+
+		// Create a job
+		job := createEphemeralJob(t, h, true, true)
+
+		// Ensure the plan allocated
+		plan := h.Plans[0]
+		planned := make(map[string]*structs.Allocation)
+		for _, allocList := range plan.NodeAllocation {
+			for _, alloc := range allocList {
+				planned[alloc.ID] = alloc
+			}
+		}
+		must.MapLen(t, 10, planned)
+
+		// Update the job to force a rolling upgrade
+		updated := job.Copy()
+		updated.Datacenters = []string{"test"}
+		must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, updated))
+
+		// Create a mock evaluation to handle the update
+		eval := &structs.Evaluation{
+			Namespace:   structs.DefaultNamespace,
+			ID:          uuid.Generate(),
+			Priority:    job.Priority,
+			TriggeredBy: structs.EvalTriggerNodeUpdate,
+			JobID:       job.ID,
+			Status:      structs.EvalStatusPending,
+		}
+		must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+		h1 := tests.NewHarnessWithState(t, h.State)
+		must.NoError(t, h1.Process(NewServiceScheduler, eval))
+
+		// Ensure we have created only one new allocation
+		// Ensure a single plan
+		must.SliceLen(t, 1, h1.Plans)
+
+		plan = h1.Plans[0]
+		var newPlanned []*structs.Allocation
+		for _, allocList := range plan.NodeAllocation {
+			newPlanned = append(newPlanned, allocList...)
+		}
+		must.SliceLen(t, 10, newPlanned)
+
+		// ensure new allocations have the expected fields
+		for _, new := range newPlanned {
+			// new alloc should have a previous allocation
+			must.NotEq(t, new.PreviousAllocation, "")
+
+			// new alloc's PreviousAllocation must be a valid previously placed alloc
+			_, ok := planned[new.PreviousAllocation]
+			must.True(t, ok)
+
+			// new alloc should be placed on the node in the new datacenter
+			must.Eq(t, new.NodeID, node.ID)
+		}
+	})
 }
 
 func TestServiceSched_JobRegister_StickyHostVolumes(t *testing.T) {
