From 2f4c1312e386bf9d652e52217ad885873c8f89e8 Mon Sep 17 00:00:00 2001
From: hc-github-team-nomad-core <82989552+hc-github-team-nomad-core@users.noreply.github.com>
Date: Wed, 8 Oct 2025 12:19:35 -0400
Subject: [PATCH 1/2] Backport of testing: fix flake in Agent.Host RPC handler
 test into release/1.10.x (#26913)

Co-authored-by: Tim Gross
---
 nomad/client_agent_endpoint_test.go | 16 +++++++---------
 1 file changed, 7 insertions(+), 9 deletions(-)

diff --git a/nomad/client_agent_endpoint_test.go b/nomad/client_agent_endpoint_test.go
index 87fec367bf4..a218811ef34 100644
--- a/nomad/client_agent_endpoint_test.go
+++ b/nomad/client_agent_endpoint_test.go
@@ -840,10 +840,7 @@ func TestAgentHost_Server(t *testing.T) {
 	defer cleanup()
 	TestJoin(t, s1, s2)
 
-	testutil.WaitForLeader(t, s1.RPC)
 	testutil.WaitForKeyring(t, s1.RPC, s1.Region())
-	testutil.WaitForLeader(t, s2.RPC)
-	testutil.WaitForKeyring(t, s2.RPC, s2.Region())
 
 	// determine leader and nonleader
 	servers := []*Server{s1, s2}
@@ -865,9 +862,9 @@ func TestAgentHost_Server(t *testing.T) {
 
 	testutil.WaitForResult(func() (bool, error) {
 		numNodes := len(s1.connectedNodes()) + len(s2.connectedNodes())
-		return numNodes == 1, nil
+		return numNodes > 0, nil
 	}, func(err error) {
-		t.Fatalf("should have a clients")
+		t.Fatalf("client should be connected to a server")
 	})
 
 	cases := []struct {
@@ -937,13 +934,14 @@ func TestAgentHost_Server(t *testing.T) {
 
 			err := tc.origin.RPC("Agent.Host", &req, &reply)
 			if tc.expectedErr != "" {
-				require.Contains(t, err.Error(), tc.expectedErr)
+				must.ErrorContains(t, err, tc.expectedErr)
 			} else {
-				require.Nil(t, err)
-				require.NotEmpty(t, reply.HostData)
+				must.NoError(t, err)
+				must.NotNil(t, reply.HostData)
 			}
-			require.Equal(t, tc.expectedAgentID, reply.AgentID)
+			// note: we expect this to be populated even on error
+			must.Eq(t, tc.expectedAgentID, reply.AgentID)
 		})
 	}
 }
 

From 4dfac160486f55986d114e73c424cef230cf14ca Mon Sep 17 00:00:00 2001
From: Michael Smithhisler
Date: Wed, 8 Oct 2025 12:28:28 -0400
Subject: [PATCH 2/2] scheduler: skip setting preferred node when nodepool
 changes (#26662)

A preferred node is used when a task group has an ephemeral disk, so
that the allocation ideally stays on the same node. However, if the
job's node pool changes, we should not select the current node as the
preferred node, but instead let the scheduler decide which node to pick
from the correct node pool.
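
In short, the change adds an early return to findPreferredNode before the
ephemeral-disk check. Condensed from the diff below (prev is the allocation
being replaced and s.job is the updated job):

    // The job moved to a different node pool or set of datacenters: skip
    // the preferred-node shortcut and fall back to normal node selection.
    if prev.Job != nil && prev.Job.NodePool != s.job.NodePool {
        return nil, nil
    }
    if !slices.Equal(prev.Job.Datacenters, s.job.Datacenters) {
        return nil, nil
    }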
---
 .changelog/26662.txt            |   3 +
 scheduler/generic_sched.go      |  12 ++
 scheduler/generic_sched_test.go | 297 ++++++++++++++++++++++++--------
 3 files changed, 237 insertions(+), 75 deletions(-)
 create mode 100644 .changelog/26662.txt

diff --git a/.changelog/26662.txt b/.changelog/26662.txt
new file mode 100644
index 00000000000..c1ad01f2914
--- /dev/null
+++ b/.changelog/26662.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+scheduler: Fixed a bug in node selection for updated jobs with ephemeral disks when the job's node pool changes
+```
diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go
index 835434e7628..99a0962bbb8 100644
--- a/scheduler/generic_sched.go
+++ b/scheduler/generic_sched.go
@@ -6,6 +6,7 @@ package scheduler
 import (
 	"fmt"
 	"runtime/debug"
+	"slices"
 	"sort"
 	"time"
 
@@ -886,6 +887,17 @@ func (s *GenericScheduler) findPreferredNode(place placementResult) (*structs.No
 	if prev == nil {
 		return nil, nil
 	}
+
+	// When a job's node pool or datacenters are updated, we should skip setting a preferred
+	// node even if a task group has an ephemeral disk, as this would bypass the normal node
+	// pool/datacenter selection logic and result in the alloc being placed incorrectly.
+	if prev.Job != nil && prev.Job.NodePool != s.job.NodePool {
+		return nil, nil
+	}
+	if !slices.Equal(prev.Job.Datacenters, s.job.Datacenters) {
+		return nil, nil
+	}
+
 	if place.TaskGroup().EphemeralDisk.Sticky || place.TaskGroup().EphemeralDisk.Migrate {
 		var preferredNode *structs.Node
 		ws := memdb.NewWatchSet()
diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go
index f1702e8132e..eff4da89c3b 100644
--- a/scheduler/generic_sched_test.go
+++ b/scheduler/generic_sched_test.go
@@ -122,98 +122,245 @@ func TestServiceSched_JobRegister(t *testing.T) {
 	h.AssertEvalStatus(t, structs.EvalStatusComplete)
 }
 
-func TestServiceSched_JobRegister_StickyAllocs(t *testing.T) {
+func TestServiceSched_JobRegister_EphemeralDisk(t *testing.T) {
 	ci.Parallel(t)
-	h := NewHarness(t)
+	createEphemeralJob := func(t *testing.T, h *Harness, sticky, migrate bool) *structs.Job {
+		job := mock.Job()
+		job.TaskGroups[0].EphemeralDisk.Sticky = sticky
+		job.TaskGroups[0].EphemeralDisk.Migrate = migrate
+		must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
 
-	// Create some nodes
-	for i := 0; i < 10; i++ {
+		// Create a mock evaluation to register the job
+		eval := &structs.Evaluation{
+			Namespace:   structs.DefaultNamespace,
+			ID:          uuid.Generate(),
+			Priority:    job.Priority,
+			TriggeredBy: structs.EvalTriggerJobRegister,
+			JobID:       job.ID,
+			Status:      structs.EvalStatusPending,
+		}
+		must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+
+		// Process the evaluation
+		must.NoError(t, h.Process(NewServiceScheduler, eval))
+
+		return job
+	}
+
+	t.Run("sticky ephemeral allocs in same node pool does not change nodes", func(t *testing.T) {
+		h := NewHarness(t)
+
+		// Create some nodes
+		for range 10 {
+			node := mock.Node()
+			must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+		}
+
+		// create a job
+		job := createEphemeralJob(t, h, true, false)
+
+		// Ensure the plan allocated
+		plan := h.Plans[0]
+		planned := make(map[string]*structs.Allocation)
+		for _, allocList := range plan.NodeAllocation {
+			for _, alloc := range allocList {
+				planned[alloc.ID] = alloc
+			}
+		}
+		must.MapLen(t, 10, planned)
+
+		// Update the job to force a rolling upgrade
+		updated := job.Copy()
+		updated.TaskGroups[0].Tasks[0].Resources.CPU += 10
+		must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, updated))
+
+		// Create a mock evaluation to handle the update
+		eval := &structs.Evaluation{
+			Namespace:   structs.DefaultNamespace,
+			ID:          uuid.Generate(),
+			Priority:    job.Priority,
+			TriggeredBy: structs.EvalTriggerNodeUpdate,
+			JobID:       job.ID,
+			Status:      structs.EvalStatusPending,
+		}
+		must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+		h1 := NewHarnessWithState(t, h.State)
+		must.NoError(t, h1.Process(NewServiceScheduler, eval))
+
+		// Ensure we have created only one new allocation
+		// Ensure a single plan
+		must.SliceLen(t, 1, h1.Plans)
+
+		plan = h1.Plans[0]
+		var newPlanned []*structs.Allocation
+		for _, allocList := range plan.NodeAllocation {
+			newPlanned = append(newPlanned, allocList...)
+		}
+		must.SliceLen(t, 10, newPlanned)
+		// Ensure that the new allocations were placed on the same node as the older
+		// ones
+		for _, new := range newPlanned {
+			// new alloc should have a previous allocation
+			must.NotEq(t, new.PreviousAllocation, "")
+
+			// new allocs PreviousAllocation must be a valid previously placed alloc
+			old, ok := planned[new.PreviousAllocation]
+			must.True(t, ok)
+
+			// new alloc should be placed on the same node
+			must.Eq(t, new.NodeID, old.NodeID)
+		}
+	})
+
+	t.Run("ephemeral alloc should migrate if node pool changes", func(t *testing.T) {
+		h := NewHarness(t)
+
+		// Create some nodes
+		for range 5 {
+			node := mock.Node()
+			must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+		}
+
+		testNodePool := "test"
 		node := mock.Node()
+		node.NodePool = testNodePool
 		must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
-	}
 
-	// Create a job
-	job := mock.Job()
-	job.TaskGroups[0].EphemeralDisk.Sticky = true
-	must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
+		// Create the test node pool that the updated job will move to.
+ testPool := mock.NodePool() + testPool.Name = "test" - // Create a mock evaluation to register the job - eval := &structs.Evaluation{ - Namespace: structs.DefaultNamespace, - ID: uuid.Generate(), - Priority: job.Priority, - TriggeredBy: structs.EvalTriggerJobRegister, - JobID: job.ID, - Status: structs.EvalStatusPending, - } - must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval})) + nodePools := []*structs.NodePool{ + testPool, + } + h.State.UpsertNodePools(structs.MsgTypeTestSetup, h.NextIndex(), nodePools) - // Process the evaluation - if err := h.Process(NewServiceScheduler, eval); err != nil { - t.Fatalf("err: %v", err) - } + // Create a job + job := createEphemeralJob(t, h, true, true) - // Ensure the plan allocated - plan := h.Plans[0] - planned := make(map[string]*structs.Allocation) - for _, allocList := range plan.NodeAllocation { - for _, alloc := range allocList { - planned[alloc.ID] = alloc + // Ensure the plan allocated + plan := h.Plans[0] + planned := make(map[string]*structs.Allocation) + for _, allocList := range plan.NodeAllocation { + for _, alloc := range allocList { + planned[alloc.ID] = alloc + } } - } - if len(planned) != 10 { - t.Fatalf("bad: %#v", plan) - } + must.MapLen(t, 10, planned) - // Update the job to force a rolling upgrade - updated := job.Copy() - updated.TaskGroups[0].Tasks[0].Resources.CPU += 10 - must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, updated)) + // Update the job to force a rolling upgrade + updated := job.Copy() + updated.NodePool = "test" + must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, updated)) - // Create a mock evaluation to handle the update - eval = &structs.Evaluation{ - Namespace: structs.DefaultNamespace, - ID: uuid.Generate(), - Priority: job.Priority, - TriggeredBy: structs.EvalTriggerNodeUpdate, - JobID: job.ID, - Status: structs.EvalStatusPending, - } - must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval})) - h1 := NewHarnessWithState(t, h.State) - if err := h1.Process(NewServiceScheduler, eval); err != nil { - t.Fatalf("err: %v", err) - } + // Create a mock evaluation to handle the update + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerNodeUpdate, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval})) + h1 := NewHarnessWithState(t, h.State) + must.NoError(t, h1.Process(NewServiceScheduler, eval)) - // Ensure we have created only one new allocation - // Ensure a single plan - if len(h1.Plans) != 1 { - t.Fatalf("bad: %#v", h1.Plans) - } - plan = h1.Plans[0] - var newPlanned []*structs.Allocation - for _, allocList := range plan.NodeAllocation { - newPlanned = append(newPlanned, allocList...) 
- } - if len(newPlanned) != 10 { - t.Fatalf("bad plan: %#v", plan) - } - // Ensure that the new allocations were placed on the same node as the older - // ones - for _, new := range newPlanned { - if new.PreviousAllocation == "" { - t.Fatalf("new alloc %q doesn't have a previous allocation", new.ID) + // Ensure we have created only one new allocation + // Ensure a single plan + must.SliceLen(t, 1, h1.Plans) + + plan = h1.Plans[0] + var newPlanned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + newPlanned = append(newPlanned, allocList...) } + must.SliceLen(t, 10, newPlanned) + + // ensure new allocation has expected fields + for _, new := range newPlanned { + // new alloc should have a previous allocation + must.NotEq(t, new.PreviousAllocation, "") + + // new allocs PreviousAllocation must be a valid previously placed alloc + _, ok := planned[new.PreviousAllocation] + must.True(t, ok) - old, ok := planned[new.PreviousAllocation] - if !ok { - t.Fatalf("new alloc %q previous allocation doesn't match any prior placed alloc (%q)", new.ID, new.PreviousAllocation) + // new alloc should be placed in the correct node pool + must.Eq(t, new.Job.NodePool, testNodePool) } - if new.NodeID != old.NodeID { - t.Fatalf("new alloc and old alloc node doesn't match; got %q; want %q", new.NodeID, old.NodeID) + }) + + t.Run("ephemeral alloc should migrate if datacenter changes", func(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + for range 5 { + node := mock.Node() + must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) } - } + + testDatacenter := "test" + node := mock.Node() + node.Datacenter = testDatacenter + must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) + + // Create a job + job := createEphemeralJob(t, h, true, true) + + // Ensure the plan allocated + plan := h.Plans[0] + planned := make(map[string]*structs.Allocation) + for _, allocList := range plan.NodeAllocation { + for _, alloc := range allocList { + planned[alloc.ID] = alloc + } + } + must.MapLen(t, 10, planned) + + // Update the job to force a rolling upgrade + updated := job.Copy() + updated.Datacenters = []string{"test"} + must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, updated)) + + // Create a mock evaluation to handle the update + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerNodeUpdate, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval})) + h1 := NewHarnessWithState(t, h.State) + must.NoError(t, h1.Process(NewServiceScheduler, eval)) + + // Ensure we have created only one new allocation + // Ensure a single plan + must.SliceLen(t, 1, h1.Plans) + + plan = h1.Plans[0] + var newPlanned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + newPlanned = append(newPlanned, allocList...) 
+		}
+		must.SliceLen(t, 10, newPlanned)
+
+		// ensure new allocation has expected fields
+		for _, new := range newPlanned {
+			// new alloc should have a previous allocation
+			must.NotEq(t, new.PreviousAllocation, "")
+
+			// new allocs PreviousAllocation must be a valid previously placed alloc
+			_, ok := planned[new.PreviousAllocation]
+			must.True(t, ok)
+
+			// new alloc should be placed on the node in the new datacenter
+			must.Eq(t, new.NodeID, node.ID)
+		}
+	})
 }
 
 func TestServiceSched_JobRegister_StickyHostVolumes(t *testing.T) {