3 changes: 3 additions & 0 deletions .changelog/26662.txt
@@ -0,0 +1,3 @@
```release-note:bug
scheduler: fixed a bug in node selection for updated jobs with ephemeral disks when the job's node pool or datacenters change
```
12 changes: 12 additions & 0 deletions scheduler/generic_sched.go
@@ -6,6 +6,7 @@ package scheduler
import (
"fmt"
"runtime/debug"
"slices"
"sort"
"time"

@@ -860,6 +861,17 @@ func (s *GenericScheduler) findPreferredNode(place reconciler.PlacementResult) (
if prev == nil {
return nil, nil
}

// When a job's node pool or datacenters are updated, we should not set a preferred node,
// even if a task group has a sticky or migrating ephemeral disk, as doing so would bypass
// the normal node pool/datacenter selection logic and could place the alloc incorrectly.
if prev.Job != nil {
if prev.Job.NodePool != s.job.NodePool {
return nil, nil
}
if !slices.Equal(prev.Job.Datacenters, s.job.Datacenters) {
return nil, nil
}
}

if place.TaskGroup().EphemeralDisk.Sticky || place.TaskGroup().EphemeralDisk.Migrate {
var preferredNode *structs.Node
ws := memdb.NewWatchSet()
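The guard above is the core of the fix: the previous allocation only pins a preferred node when the updated job still targets the same node pool and datacenters. As a minimal sketch of that decision in isolation, the snippet below uses simplified stand-in types (`Job`, `Allocation`) and a hypothetical `skipPreferredNode` helper rather than the real `structs` package; the real implementation simply returns early from `findPreferredNode`, which amounts to the same decision.

```go
package main

import (
	"fmt"
	"slices"
)

// Job and Allocation are illustrative stand-ins, not the real Nomad
// structs.Job / structs.Allocation types.
type Job struct {
	NodePool    string
	Datacenters []string
}

type Allocation struct {
	Job *Job
}

// skipPreferredNode mirrors the guard added to findPreferredNode: if the
// previous allocation's job targeted a different node pool or datacenter set
// than the updated job, the scheduler should not pin the placement to the
// previous node, even for sticky or migrating ephemeral disks.
func skipPreferredNode(prev *Allocation, updated *Job) bool {
	if prev == nil || prev.Job == nil {
		return false
	}
	if prev.Job.NodePool != updated.NodePool {
		return true
	}
	return !slices.Equal(prev.Job.Datacenters, updated.Datacenters)
}

func main() {
	prev := &Allocation{Job: &Job{NodePool: "default", Datacenters: []string{"dc1"}}}

	samePool := &Job{NodePool: "default", Datacenters: []string{"dc1"}}
	movedPool := &Job{NodePool: "test", Datacenters: []string{"dc1"}}

	fmt.Println(skipPreferredNode(prev, samePool))  // false: keep the sticky node
	fmt.Println(skipPreferredNode(prev, movedPool)) // true: fall back to normal node selection
}
```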
297 changes: 222 additions & 75 deletions scheduler/generic_sched_test.go
@@ -127,98 +127,245 @@ func TestServiceSched_JobRegister(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}

func TestServiceSched_JobRegister_StickyAllocs(t *testing.T) {
func TestServiceSched_JobRegister_EphemeralDisk(t *testing.T) {
ci.Parallel(t)

h := tests.NewHarness(t)
createEphemeralJob := func(t *testing.T, h *tests.Harness, sticky, migrate bool) *structs.Job {
job := mock.Job()
job.TaskGroups[0].EphemeralDisk.Sticky = sticky
job.TaskGroups[0].EphemeralDisk.Migrate = migrate
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))

// Create some nodes
for i := 0; i < 10; i++ {
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))

// Process the evaluation
must.NoError(t, h.Process(NewServiceScheduler, eval))

return job
}

t.Run("sticky ephemeral allocs in same node pool does not change nodes", func(t *testing.T) {
h := tests.NewHarness(t)

// Create some nodes
for range 10 {
node := mock.Node()
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
}

// create a job
job := createEphemeralJob(t, h, true, false)

// Ensure the plan allocated
plan := h.Plans[0]
planned := make(map[string]*structs.Allocation)
for _, allocList := range plan.NodeAllocation {
for _, alloc := range allocList {
planned[alloc.ID] = alloc
}
}
must.MapLen(t, 10, planned)

// Update the job to force a rolling upgrade
updated := job.Copy()
updated.TaskGroups[0].Tasks[0].Resources.CPU += 10
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, updated))

// Create a mock evaluation to handle the update
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
h1 := tests.NewHarnessWithState(t, h.State)
must.NoError(t, h1.Process(NewServiceScheduler, eval))

// Ensure a single plan was created
must.SliceLen(t, 1, h1.Plans)

plan = h1.Plans[0]
var newPlanned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
newPlanned = append(newPlanned, allocList...)
}
must.SliceLen(t, 10, newPlanned)
// Ensure that the new allocations were placed on the same node as the older
// ones
for _, new := range newPlanned {
// new alloc should have a previous allocation
must.NotEq(t, new.PreviousAllocation, "")

// new alloc's PreviousAllocation must be a valid previously placed alloc
old, ok := planned[new.PreviousAllocation]
must.True(t, ok)

// new alloc should be placed in the same node
must.Eq(t, new.NodeID, old.NodeID)
}
})

t.Run("ephemeral alloc should migrate if node pool changes", func(t *testing.T) {
h := tests.NewHarness(t)

// Create some nodes
for range 5 {
node := mock.Node()
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
}

testNodePool := "test"
node := mock.Node()
node.NodePool = testNodePool
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
}

// Create a job
job := mock.Job()
job.TaskGroups[0].EphemeralDisk.Sticky = true
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
// Create a test node pool.
testPool := mock.NodePool()
testPool.Name = "test"

// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
nodePools := []*structs.NodePool{
testPool,
}
must.NoError(t, h.State.UpsertNodePools(structs.MsgTypeTestSetup, h.NextIndex(), nodePools))

// Process the evaluation
if err := h.Process(NewServiceScheduler, eval); err != nil {
t.Fatalf("err: %v", err)
}
// Create a job
job := createEphemeralJob(t, h, true, true)

// Ensure the plan allocated
plan := h.Plans[0]
planned := make(map[string]*structs.Allocation)
for _, allocList := range plan.NodeAllocation {
for _, alloc := range allocList {
planned[alloc.ID] = alloc
// Ensure the plan allocated
plan := h.Plans[0]
planned := make(map[string]*structs.Allocation)
for _, allocList := range plan.NodeAllocation {
for _, alloc := range allocList {
planned[alloc.ID] = alloc
}
}
}
if len(planned) != 10 {
t.Fatalf("bad: %#v", plan)
}
must.MapLen(t, 10, planned)

// Update the job to force a rolling upgrade
updated := job.Copy()
updated.TaskGroups[0].Tasks[0].Resources.CPU += 10
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, updated))
// Update the job's node pool to force the allocations to move
updated := job.Copy()
updated.NodePool = "test"
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, updated))

// Create a mock evaluation to handle the update
eval = &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
h1 := tests.NewHarnessWithState(t, h.State)
if err := h1.Process(NewServiceScheduler, eval); err != nil {
t.Fatalf("err: %v", err)
}
// Create a mock evaluation to handle the update
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
h1 := tests.NewHarnessWithState(t, h.State)
must.NoError(t, h1.Process(NewServiceScheduler, eval))

// Ensure we have created only one new allocation
// Ensure a single plan
if len(h1.Plans) != 1 {
t.Fatalf("bad: %#v", h1.Plans)
}
plan = h1.Plans[0]
var newPlanned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
newPlanned = append(newPlanned, allocList...)
}
if len(newPlanned) != 10 {
t.Fatalf("bad plan: %#v", plan)
}
// Ensure that the new allocations were placed on the same node as the older
// ones
for _, new := range newPlanned {
if new.PreviousAllocation == "" {
t.Fatalf("new alloc %q doesn't have a previous allocation", new.ID)
// Ensure a single plan was created
must.SliceLen(t, 1, h1.Plans)

plan = h1.Plans[0]
var newPlanned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
newPlanned = append(newPlanned, allocList...)
}
must.SliceLen(t, 10, newPlanned)

// Ensure the new allocations have the expected fields
for _, new := range newPlanned {
// new alloc should have a previous allocation
must.NotEq(t, new.PreviousAllocation, "")

// new alloc's PreviousAllocation must be a valid previously placed alloc
_, ok := planned[new.PreviousAllocation]
must.True(t, ok)

old, ok := planned[new.PreviousAllocation]
if !ok {
t.Fatalf("new alloc %q previous allocation doesn't match any prior placed alloc (%q)", new.ID, new.PreviousAllocation)
// new alloc should be placed in the correct node pool
must.Eq(t, new.Job.NodePool, testNodePool)
}
if new.NodeID != old.NodeID {
t.Fatalf("new alloc and old alloc node doesn't match; got %q; want %q", new.NodeID, old.NodeID)
})

t.Run("ephemeral alloc should migrate if datacenter changes", func(t *testing.T) {
h := tests.NewHarness(t)

// Create some nodes
for range 5 {
node := mock.Node()
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
}
}

testDatacenter := "test"
node := mock.Node()
node.Datacenter = testDatacenter
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))

// Create a job
job := createEphemeralJob(t, h, true, true)

// Ensure the plan allocated
plan := h.Plans[0]
planned := make(map[string]*structs.Allocation)
for _, allocList := range plan.NodeAllocation {
for _, alloc := range allocList {
planned[alloc.ID] = alloc
}
}
must.MapLen(t, 10, planned)

// Update the job's datacenters to force the allocations to move
updated := job.Copy()
updated.Datacenters = []string{"test"}
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, updated))

// Create a mock evaluation to handle the update
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
h1 := tests.NewHarnessWithState(t, h.State)
must.NoError(t, h1.Process(NewServiceScheduler, eval))

// Ensure a single plan was created
must.SliceLen(t, 1, h1.Plans)

plan = h1.Plans[0]
var newPlanned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
newPlanned = append(newPlanned, allocList...)
}
must.SliceLen(t, 10, newPlanned)

// Ensure the new allocations have the expected fields
for _, new := range newPlanned {
// new alloc should have a previous allocation
must.NotEq(t, new.PreviousAllocation, "")

// new alloc's PreviousAllocation must be a valid previously placed alloc
_, ok := planned[new.PreviousAllocation]
must.True(t, ok)

// new alloc should be placed on the node in the test datacenter
must.Eq(t, new.NodeID, node.ID)
}
})
}

func TestServiceSched_JobRegister_StickyHostVolumes(t *testing.T) {