3 changes: 3 additions & 0 deletions .changelog/26662.txt
@@ -0,0 +1,3 @@
```release-note:bug
scheduler: Fixed a bug where nodes were selected incorrectly for updated jobs with ephemeral disks when the job's node pool or datacenters changed
```
16 changes: 7 additions & 9 deletions nomad/client_agent_endpoint_test.go
@@ -840,10 +840,7 @@ func TestAgentHost_Server(t *testing.T) {
defer cleanup()

TestJoin(t, s1, s2)
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForKeyring(t, s1.RPC, s1.Region())
testutil.WaitForLeader(t, s2.RPC)
testutil.WaitForKeyring(t, s2.RPC, s2.Region())

// determine leader and nonleader
servers := []*Server{s1, s2}
@@ -865,9 +862,9 @@ func TestAgentHost_Server(t *testing.T) {

testutil.WaitForResult(func() (bool, error) {
numNodes := len(s1.connectedNodes()) + len(s2.connectedNodes())
return numNodes == 1, nil
return numNodes > 0, nil
}, func(err error) {
t.Fatalf("should have a clients")
t.Fatalf("client should be connected to a server")
})

cases := []struct {
@@ -937,13 +934,14 @@ func TestAgentHost_Server(t *testing.T) {

err := tc.origin.RPC("Agent.Host", &req, &reply)
if tc.expectedErr != "" {
require.Contains(t, err.Error(), tc.expectedErr)
must.ErrorContains(t, err, tc.expectedErr)
} else {
require.Nil(t, err)
require.NotEmpty(t, reply.HostData)
must.NoError(t, err)
must.NotNil(t, reply.HostData)
}

require.Equal(t, tc.expectedAgentID, reply.AgentID)
// note: we expect this to be populated even on error
must.Eq(t, tc.expectedAgentID, reply.AgentID)
})
}
}
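The hunks above swap testify's `require` assertions for the `must` package. As a reference, here is a minimal, self-contained sketch of the mapping; the import path `github.com/shoenig/test/must` and the sample values are assumptions inferred from the assertion names in the diff, and the sketch is not part of the PR.

```go
package example

import (
	"errors"
	"testing"

	"github.com/shoenig/test/must" // assumed import path for the must assertions used above
)

// TestRequireToMustMapping mirrors the assertion swaps in the hunks above
// using stand-in values instead of real RPC replies.
func TestRequireToMustMapping(t *testing.T) {
	err := errors.New("rpc error: Permission denied")

	// require.Contains(t, err.Error(), "Permission denied") becomes:
	must.ErrorContains(t, err, "Permission denied")

	// require.Nil(t, err) on an error value becomes:
	var noErr error
	must.NoError(t, noErr)

	// require.NotEmpty(t, reply.HostData) is tightened to a non-nil check in the diff:
	hostData := map[string]string{"hostname": "example"}
	must.NotNil(t, hostData)

	// require.Equal(t, want, got) becomes:
	must.Eq(t, "agent-1", "agent-1")
}
```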
12 changes: 12 additions & 0 deletions scheduler/generic_sched.go
@@ -6,6 +6,7 @@ package scheduler
import (
"fmt"
"runtime/debug"
"slices"
"sort"
"time"

@@ -886,6 +887,17 @@ func (s *GenericScheduler) findPreferredNode(place placementResult) (*structs.No
if prev == nil {
return nil, nil
}

// When a job's node pool or datacenters are updated, skip setting a preferred node even if the
// task group uses ephemeral disk; honoring the previous node would bypass the normal node
// pool/datacenter selection logic and the alloc could be placed on an invalid node.
if prev.Job != nil && prev.Job.NodePool != s.job.NodePool {
return nil, nil
}
if prev.Job != nil && !slices.Equal(prev.Job.Datacenters, s.job.Datacenters) {
return nil, nil
}

if place.TaskGroup().EphemeralDisk.Sticky || place.TaskGroup().EphemeralDisk.Migrate {
var preferredNode *structs.Node
ws := memdb.NewWatchSet()
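For readers skimming the diff, the guard above can be read as a single predicate: the preferred-node shortcut for sticky or migrating ephemeral disks only applies when the updated job still targets the same node pool and datacenters as the previous allocation's job. A minimal, self-contained sketch of that predicate follows; `jobTarget` and `placementTargetChanged` are hypothetical stand-ins for illustration, not names from the Nomad codebase.

```go
// Sketch only (not part of the PR): the node pool / datacenter guard expressed
// as a standalone helper, with a stand-in type mirroring the structs.Job fields
// the real code reads (NodePool and Datacenters).
package main

import (
	"fmt"
	"slices"
)

// jobTarget is a hypothetical stand-in for the fields the guard inspects.
type jobTarget struct {
	NodePool    string
	Datacenters []string
}

// placementTargetChanged reports whether the previous job targeted a different
// node pool or datacenter set than the updated job; if so, the scheduler should
// fall through to normal node selection instead of preferring the old node.
func placementTargetChanged(prev, updated *jobTarget) bool {
	if prev == nil || updated == nil {
		return false
	}
	return prev.NodePool != updated.NodePool ||
		!slices.Equal(prev.Datacenters, updated.Datacenters)
}

func main() {
	prev := &jobTarget{NodePool: "default", Datacenters: []string{"dc1"}}
	updated := &jobTarget{NodePool: "test", Datacenters: []string{"dc1"}}
	fmt.Println(placementTargetChanged(prev, updated)) // true: the node pool changed
}
```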
297 changes: 222 additions & 75 deletions scheduler/generic_sched_test.go
@@ -122,98 +122,245 @@ func TestServiceSched_JobRegister(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}

func TestServiceSched_JobRegister_StickyAllocs(t *testing.T) {
func TestServiceSched_JobRegister_EphemeralDisk(t *testing.T) {
ci.Parallel(t)

h := NewHarness(t)
createEphemeralJob := func(t *testing.T, h *Harness, sticky, migrate bool) *structs.Job {
job := mock.Job()
job.TaskGroups[0].EphemeralDisk.Sticky = sticky
job.TaskGroups[0].EphemeralDisk.Migrate = migrate
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))

// Create some nodes
for i := 0; i < 10; i++ {
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))

// Process the evaluation
must.NoError(t, h.Process(NewServiceScheduler, eval))

return job
}

t.Run("sticky ephemeral allocs in same node pool does not change nodes", func(t *testing.T) {
h := NewHarness(t)

// Create some nodes
for range 10 {
node := mock.Node()
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
}

// create a job
job := createEphemeralJob(t, h, true, false)

// Ensure the plan allocated
plan := h.Plans[0]
planned := make(map[string]*structs.Allocation)
for _, allocList := range plan.NodeAllocation {
for _, alloc := range allocList {
planned[alloc.ID] = alloc
}
}
must.MapLen(t, 10, planned)

// Update the job to force a rolling upgrade
updated := job.Copy()
updated.TaskGroups[0].Tasks[0].Resources.CPU += 10
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, updated))

// Create a mock evaluation to handle the update
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
h1 := NewHarnessWithState(t, h.State)
must.NoError(t, h1.Process(NewServiceScheduler, eval))

// Ensure we have created only one new allocation
// Ensure a single plan
must.SliceLen(t, 1, h1.Plans)

plan = h1.Plans[0]
var newPlanned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
newPlanned = append(newPlanned, allocList...)
}
must.SliceLen(t, 10, newPlanned)
// Ensure that the new allocations were placed on the same node as the older
// ones
for _, new := range newPlanned {
// new alloc should have a previous allocation
must.NotEq(t, new.PreviousAllocation, "")

// new allocs PreviousAllocation must be a valid previously placed alloc
old, ok := planned[new.PreviousAllocation]
must.True(t, ok)

// new alloc should be placed in the same node
must.Eq(t, new.NodeID, old.NodeID)
}
})

t.Run("ephemeral alloc should migrate if node pool changes", func(t *testing.T) {
h := NewHarness(t)

// Create some nodes
for range 5 {
node := mock.Node()
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
}

testNodePool := "test"
node := mock.Node()
node.NodePool = testNodePool
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
}

// Create a job
job := mock.Job()
job.TaskGroups[0].EphemeralDisk.Sticky = true
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
// Create a test node pool for the updated job to target.
testPool := mock.NodePool()
testPool.Name = "test"

// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
nodePools := []*structs.NodePool{
testPool,
}
must.NoError(t, h.State.UpsertNodePools(structs.MsgTypeTestSetup, h.NextIndex(), nodePools))

// Process the evaluation
if err := h.Process(NewServiceScheduler, eval); err != nil {
t.Fatalf("err: %v", err)
}
// Create a job
job := createEphemeralJob(t, h, true, true)

// Ensure the plan allocated
plan := h.Plans[0]
planned := make(map[string]*structs.Allocation)
for _, allocList := range plan.NodeAllocation {
for _, alloc := range allocList {
planned[alloc.ID] = alloc
// Ensure the plan allocated
plan := h.Plans[0]
planned := make(map[string]*structs.Allocation)
for _, allocList := range plan.NodeAllocation {
for _, alloc := range allocList {
planned[alloc.ID] = alloc
}
}
}
if len(planned) != 10 {
t.Fatalf("bad: %#v", plan)
}
must.MapLen(t, 10, planned)

// Update the job to force a rolling upgrade
updated := job.Copy()
updated.TaskGroups[0].Tasks[0].Resources.CPU += 10
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, updated))
// Update the job to force a rolling upgrade
updated := job.Copy()
updated.NodePool = "test"
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, updated))

// Create a mock evaluation to handle the update
eval = &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
h1 := NewHarnessWithState(t, h.State)
if err := h1.Process(NewServiceScheduler, eval); err != nil {
t.Fatalf("err: %v", err)
}
// Create a mock evaluation to handle the update
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
h1 := NewHarnessWithState(t, h.State)
must.NoError(t, h1.Process(NewServiceScheduler, eval))

// Ensure we have created only one new allocation
// Ensure a single plan
if len(h1.Plans) != 1 {
t.Fatalf("bad: %#v", h1.Plans)
}
plan = h1.Plans[0]
var newPlanned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
newPlanned = append(newPlanned, allocList...)
}
if len(newPlanned) != 10 {
t.Fatalf("bad plan: %#v", plan)
}
// Ensure that the new allocations were placed on the same node as the older
// ones
for _, new := range newPlanned {
if new.PreviousAllocation == "" {
t.Fatalf("new alloc %q doesn't have a previous allocation", new.ID)
// Ensure we have created only one new allocation
// Ensure a single plan
must.SliceLen(t, 1, h1.Plans)

plan = h1.Plans[0]
var newPlanned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
newPlanned = append(newPlanned, allocList...)
}
must.SliceLen(t, 10, newPlanned)

// ensure new allocation has expected fields
for _, new := range newPlanned {
// new alloc should have a previous allocation
must.NotEq(t, new.PreviousAllocation, "")

// new allocs PreviousAllocation must be a valid previously placed alloc
_, ok := planned[new.PreviousAllocation]
must.True(t, ok)

old, ok := planned[new.PreviousAllocation]
if !ok {
t.Fatalf("new alloc %q previous allocation doesn't match any prior placed alloc (%q)", new.ID, new.PreviousAllocation)
// new alloc's job should target the new node pool
must.Eq(t, new.Job.NodePool, testNodePool)
}
if new.NodeID != old.NodeID {
t.Fatalf("new alloc and old alloc node doesn't match; got %q; want %q", new.NodeID, old.NodeID)
})

t.Run("ephemeral alloc should migrate if datacenter changes", func(t *testing.T) {
h := NewHarness(t)

// Create some nodes
for range 5 {
node := mock.Node()
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
}
}

testDatacenter := "test"
node := mock.Node()
node.Datacenter = testDatacenter
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))

// Create a job
job := createEphemeralJob(t, h, true, true)

// Ensure the plan allocated
plan := h.Plans[0]
planned := make(map[string]*structs.Allocation)
for _, allocList := range plan.NodeAllocation {
for _, alloc := range allocList {
planned[alloc.ID] = alloc
}
}
must.MapLen(t, 10, planned)

// Update the job to force a rolling upgrade
updated := job.Copy()
updated.Datacenters = []string{"test"}
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, updated))

// Create a mock evaluation to handle the update
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
h1 := NewHarnessWithState(t, h.State)
must.NoError(t, h1.Process(NewServiceScheduler, eval))

// Ensure we have created only one new allocation
// Ensure a single plan
must.SliceLen(t, 1, h1.Plans)

plan = h1.Plans[0]
var newPlanned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
newPlanned = append(newPlanned, allocList...)
}
must.SliceLen(t, 10, newPlanned)

// ensure new allocation has expected fields
for _, new := range newPlanned {
// new alloc should have a previous allocation
must.NotEq(t, new.PreviousAllocation, "")

// new allocs PreviousAllocation must be a valid previously placed alloc
_, ok := planned[new.PreviousAllocation]
must.True(t, ok)

// new alloc should be placed on the node in the new datacenter
must.Eq(t, new.NodeID, node.ID)
}
})
}

func TestServiceSched_JobRegister_StickyHostVolumes(t *testing.T) {