diff --git a/.changelog/26662.txt b/.changelog/26662.txt
new file mode 100644
index 00000000000..c1ad01f2914
--- /dev/null
+++ b/.changelog/26662.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+scheduler: Fixed a bug where allocations for jobs with sticky or migrating ephemeral disks could be placed on their previous node even after the job's node pool or datacenters changed
+```
diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go
index 8bdd13e1b53..a5d85c889b7 100644
--- a/scheduler/generic_sched.go
+++ b/scheduler/generic_sched.go
@@ -6,6 +6,7 @@ package scheduler
 import (
 	"fmt"
 	"runtime/debug"
+	"slices"
 	"sort"
 	"time"
 
@@ -860,6 +861,17 @@ func (s *GenericScheduler) findPreferredNode(place reconciler.PlacementResult) (
 	if prev == nil {
 		return nil, nil
 	}
+
+	// When a job's node pool or datacenters change, skip setting a preferred node
+	// even if the task group has an ephemeral disk; reusing the previous node would
+	// bypass the normal node pool and datacenter selection logic and could place
+	// the alloc on an incorrect node.
+	if prev.Job != nil && prev.Job.NodePool != s.job.NodePool {
+		return nil, nil
+	}
+	if prev.Job != nil && !slices.Equal(prev.Job.Datacenters, s.job.Datacenters) {
+		return nil, nil
+	}
+
 	if place.TaskGroup().EphemeralDisk.Sticky || place.TaskGroup().EphemeralDisk.Migrate {
 		var preferredNode *structs.Node
 		ws := memdb.NewWatchSet()
diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go
index 0c6b88ab5e7..6557742b142 100644
--- a/scheduler/generic_sched_test.go
+++ b/scheduler/generic_sched_test.go
@@ -127,98 +127,245 @@ func TestServiceSched_JobRegister(t *testing.T) {
 	h.AssertEvalStatus(t, structs.EvalStatusComplete)
 }
 
-func TestServiceSched_JobRegister_StickyAllocs(t *testing.T) {
+func TestServiceSched_JobRegister_EphemeralDisk(t *testing.T) {
 	ci.Parallel(t)
 
-	h := tests.NewHarness(t)
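+	// createEphemeralJob registers a mock job whose task group uses an ephemeral
+	// disk with the given sticky/migrate settings and processes the registration
+	// eval, so each subtest starts from a set of placed allocations.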
+	createEphemeralJob := func(t *testing.T, h *tests.Harness, sticky, migrate bool) *structs.Job {
+		job := mock.Job()
+		job.TaskGroups[0].EphemeralDisk.Sticky = sticky
+		job.TaskGroups[0].EphemeralDisk.Migrate = migrate
+		must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
 
-	// Create some nodes
-	for i := 0; i < 10; i++ {
+		// Create a mock evaluation to register the job
+		eval := &structs.Evaluation{
+			Namespace:   structs.DefaultNamespace,
+			ID:          uuid.Generate(),
+			Priority:    job.Priority,
+			TriggeredBy: structs.EvalTriggerJobRegister,
+			JobID:       job.ID,
+			Status:      structs.EvalStatusPending,
+		}
+		must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+
+		// Process the evaluation
+		must.NoError(t, h.Process(NewServiceScheduler, eval))
+
+		return job
+	}
+
+	t.Run("sticky ephemeral allocs in the same node pool do not change nodes", func(t *testing.T) {
+		h := tests.NewHarness(t)
+
+		// Create some nodes
+		for range 10 {
+			node := mock.Node()
+			must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+		}
+
+		// Create a job
+		job := createEphemeralJob(t, h, true, false)
+
+		// Ensure the plan allocated
+		plan := h.Plans[0]
+		planned := make(map[string]*structs.Allocation)
+		for _, allocList := range plan.NodeAllocation {
+			for _, alloc := range allocList {
+				planned[alloc.ID] = alloc
+			}
+		}
+		must.MapLen(t, 10, planned)
+
+		// Update the job to force a rolling upgrade
+		updated := job.Copy()
+		updated.TaskGroups[0].Tasks[0].Resources.CPU += 10
+		must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, updated))
+
+		// Create a mock evaluation to handle the update
+		eval := &structs.Evaluation{
+			Namespace:   structs.DefaultNamespace,
+			ID:          uuid.Generate(),
+			Priority:    job.Priority,
+			TriggeredBy: structs.EvalTriggerNodeUpdate,
+			JobID:       job.ID,
+			Status:      structs.EvalStatusPending,
+		}
+		must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+		h1 := tests.NewHarnessWithState(t, h.State)
+		must.NoError(t, h1.Process(NewServiceScheduler, eval))
+
+		// Ensure a single plan was created
+		must.SliceLen(t, 1, h1.Plans)
+
+		plan = h1.Plans[0]
+		var newPlanned []*structs.Allocation
+		for _, allocList := range plan.NodeAllocation {
+			newPlanned = append(newPlanned, allocList...)
+		}
+		must.SliceLen(t, 10, newPlanned)
+
+		// Ensure that the new allocations were placed on the same node as the
+		// older ones
+		for _, new := range newPlanned {
+			// the new alloc should have a previous allocation
+			must.NotEq(t, new.PreviousAllocation, "")
+
+			// the new alloc's PreviousAllocation must be a valid previously placed alloc
+			old, ok := planned[new.PreviousAllocation]
+			must.True(t, ok)
+
+			// the new alloc should be placed on the same node
+			must.Eq(t, new.NodeID, old.NodeID)
+		}
+	})
+
+	t.Run("ephemeral alloc should migrate if node pool changes", func(t *testing.T) {
+		h := tests.NewHarness(t)
+
+		// Create some nodes
+		for range 5 {
+			node := mock.Node()
+			must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+		}
+
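+		// Create a single node in a separate node pool; once the job is moved to
+		// that pool, new allocations are expected to land on this node rather than
+		// on their previous nodes.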
+		testNodePool := "test"
 		node := mock.Node()
+		node.NodePool = testNodePool
 		must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
-	}
 
-	// Create a job
-	job := mock.Job()
-	job.TaskGroups[0].EphemeralDisk.Sticky = true
-	must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
+		// Create the test node pool
+		testPool := mock.NodePool()
+		testPool.Name = testNodePool
 
-	// Create a mock evaluation to register the job
-	eval := &structs.Evaluation{
-		Namespace:   structs.DefaultNamespace,
-		ID:          uuid.Generate(),
-		Priority:    job.Priority,
-		TriggeredBy: structs.EvalTriggerJobRegister,
-		JobID:       job.ID,
-		Status:      structs.EvalStatusPending,
-	}
-	must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+		nodePools := []*structs.NodePool{
+			testPool,
+		}
+		must.NoError(t, h.State.UpsertNodePools(structs.MsgTypeTestSetup, h.NextIndex(), nodePools))
 
-	// Process the evaluation
-	if err := h.Process(NewServiceScheduler, eval); err != nil {
-		t.Fatalf("err: %v", err)
-	}
+		// Create a job
+		job := createEphemeralJob(t, h, true, true)
 
-	// Ensure the plan allocated
-	plan := h.Plans[0]
-	planned := make(map[string]*structs.Allocation)
-	for _, allocList := range plan.NodeAllocation {
-		for _, alloc := range allocList {
-			planned[alloc.ID] = alloc
+		// Ensure the plan allocated
+		plan := h.Plans[0]
+		planned := make(map[string]*structs.Allocation)
+		for _, allocList := range plan.NodeAllocation {
+			for _, alloc := range allocList {
+				planned[alloc.ID] = alloc
+			}
 		}
-	}
-	if len(planned) != 10 {
-		t.Fatalf("bad: %#v", plan)
-	}
+		must.MapLen(t, 10, planned)
 
-	// Update the job to force a rolling upgrade
-	updated := job.Copy()
-	updated.TaskGroups[0].Tasks[0].Resources.CPU += 10
-	must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, updated))
+		// Update the job to move it to the new node pool
+		updated := job.Copy()
+		updated.NodePool = testNodePool
+		must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, updated))
 
-	// Create a mock evaluation to handle the update
-	eval = &structs.Evaluation{
-		Namespace:   structs.DefaultNamespace,
-		ID:          uuid.Generate(),
-		Priority:    job.Priority,
-		TriggeredBy: structs.EvalTriggerNodeUpdate,
-		JobID:       job.ID,
-		Status:      structs.EvalStatusPending,
-	}
-	must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
-	h1 := tests.NewHarnessWithState(t, h.State)
-	if err := h1.Process(NewServiceScheduler, eval); err != nil {
-		t.Fatalf("err: %v", err)
-	}
+		// Create a mock evaluation to handle the update
+		eval := &structs.Evaluation{
+			Namespace:   structs.DefaultNamespace,
+			ID:          uuid.Generate(),
+			Priority:    job.Priority,
+			TriggeredBy: structs.EvalTriggerNodeUpdate,
+			JobID:       job.ID,
+			Status:      structs.EvalStatusPending,
+		}
+		must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
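+
+		// Process the update in a second harness that shares the same state so
+		// h1.Plans only contains the plan produced by this evaluation.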
+		h1 := tests.NewHarnessWithState(t, h.State)
+		must.NoError(t, h1.Process(NewServiceScheduler, eval))
 
-	// Ensure we have created only one new allocation
-	// Ensure a single plan
-	if len(h1.Plans) != 1 {
-		t.Fatalf("bad: %#v", h1.Plans)
-	}
-	plan = h1.Plans[0]
-	var newPlanned []*structs.Allocation
-	for _, allocList := range plan.NodeAllocation {
-		newPlanned = append(newPlanned, allocList...)
-	}
-	if len(newPlanned) != 10 {
-		t.Fatalf("bad plan: %#v", plan)
-	}
-	// Ensure that the new allocations were placed on the same node as the older
-	// ones
-	for _, new := range newPlanned {
-		if new.PreviousAllocation == "" {
-			t.Fatalf("new alloc %q doesn't have a previous allocation", new.ID)
+		// Ensure a single plan was created
+		must.SliceLen(t, 1, h1.Plans)
+
+		plan = h1.Plans[0]
+		var newPlanned []*structs.Allocation
+		for _, allocList := range plan.NodeAllocation {
+			newPlanned = append(newPlanned, allocList...)
 		}
+		must.SliceLen(t, 10, newPlanned)
+
+		// Ensure the new allocations have the expected fields
+		for _, new := range newPlanned {
+			// the new alloc should have a previous allocation
+			must.NotEq(t, new.PreviousAllocation, "")
+
+			// the new alloc's PreviousAllocation must be a valid previously placed alloc
+			_, ok := planned[new.PreviousAllocation]
+			must.True(t, ok)
-		old, ok := planned[new.PreviousAllocation]
-		if !ok {
-			t.Fatalf("new alloc %q previous allocation doesn't match any prior placed alloc (%q)", new.ID, new.PreviousAllocation)
+
+			// the new alloc's job should use the new node pool
+			must.Eq(t, new.Job.NodePool, testNodePool)
+
+			// the new alloc should be placed on the node in the new node pool
+			must.Eq(t, node.ID, new.NodeID)
 		}
-		if new.NodeID != old.NodeID {
-			t.Fatalf("new alloc and old alloc node doesn't match; got %q; want %q", new.NodeID, old.NodeID)
+	})
+
+	t.Run("ephemeral alloc should migrate if datacenter changes", func(t *testing.T) {
+		h := tests.NewHarness(t)
+
+		// Create some nodes
+		for range 5 {
+			node := mock.Node()
+			must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
 		}
+
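+		// Create a single node in a different datacenter; once the job's
+		// Datacenters list is updated, new allocations are expected to be placed
+		// on this node.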
+		testDatacenter := "test"
+		node := mock.Node()
+		node.Datacenter = testDatacenter
+		must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+
+		// Create a job
+		job := createEphemeralJob(t, h, true, true)
+
+		// Ensure the plan allocated
+		plan := h.Plans[0]
+		planned := make(map[string]*structs.Allocation)
+		for _, allocList := range plan.NodeAllocation {
+			for _, alloc := range allocList {
+				planned[alloc.ID] = alloc
+			}
+		}
+		must.MapLen(t, 10, planned)
+
+		// Update the job to move it to the new datacenter
+		updated := job.Copy()
+		updated.Datacenters = []string{testDatacenter}
+		must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, updated))
+
+		// Create a mock evaluation to handle the update
+		eval := &structs.Evaluation{
+			Namespace:   structs.DefaultNamespace,
+			ID:          uuid.Generate(),
+			Priority:    job.Priority,
+			TriggeredBy: structs.EvalTriggerNodeUpdate,
+			JobID:       job.ID,
+			Status:      structs.EvalStatusPending,
+		}
+		must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+		h1 := tests.NewHarnessWithState(t, h.State)
+		must.NoError(t, h1.Process(NewServiceScheduler, eval))
+
+		// Ensure a single plan was created
+		must.SliceLen(t, 1, h1.Plans)
+
+		plan = h1.Plans[0]
+		var newPlanned []*structs.Allocation
+		for _, allocList := range plan.NodeAllocation {
+			newPlanned = append(newPlanned, allocList...)
+		}
+		must.SliceLen(t, 10, newPlanned)
+
+		// Ensure the new allocations have the expected fields
+		for _, new := range newPlanned {
+			// the new alloc should have a previous allocation
+			must.NotEq(t, new.PreviousAllocation, "")
+
+			// the new alloc's PreviousAllocation must be a valid previously placed alloc
+			_, ok := planned[new.PreviousAllocation]
+			must.True(t, ok)
+
+			// the new alloc should be placed on the node in the new datacenter
+			must.Eq(t, new.NodeID, node.ID)
+		}
+	})
 }
 
 func TestServiceSched_JobRegister_StickyHostVolumes(t *testing.T) {