Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 68 additions & 52 deletions pkg/scheduler/actions/common/solvers/by_pod_solver.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,60 +40,65 @@ type solutionResult struct {
}

type byPodSolver struct {
feasibleNodes map[string]*node_info.NodeInfo
solutionValidator SolutionValidator
allowVictimConsolidation bool
actionType framework.ActionType
feasibleNodes map[string]*node_info.NodeInfo
amountOfNewPreemptorTasks int
solutionValidator SolutionValidator
allowVictimConsolidation bool
actionType framework.ActionType
}

func newByPodSolver(
feasibleNodes map[string]*node_info.NodeInfo,
amountOfNewPreemptorTasks int,
checkVictims SolutionValidator,
allowVictimConsolidation bool,
action framework.ActionType,
) *byPodSolver {
return &byPodSolver{
feasibleNodes: feasibleNodes,
solutionValidator: checkVictims,
allowVictimConsolidation: allowVictimConsolidation,
actionType: action,
feasibleNodes: feasibleNodes,
amountOfNewPreemptorTasks: amountOfNewPreemptorTasks,
solutionValidator: checkVictims,
allowVictimConsolidation: allowVictimConsolidation,
actionType: action,
}
}

func (s *byPodSolver) solve(
session *framework.Session, scenario *scenario.ByNodeScenario,
) *solutionResult {
statement := session.Statement()

pendingJob := scenario.GetPreemptor()
nextTaskToFindAllocation := scenario.PendingTasks()[len(scenario.PendingTasks())-1]
latestPotentialVictim := scenario.LatestPotentialVictim()
nextTask := scenario.PendingTasks()[len(scenario.PendingTasks())-1]

err := common.EvictAllPreemptees(session, scenario.RecordedVictimsTasks(), pendingJob, statement, s.actionType)
if err != nil {
return handleSolveError(pendingJob, nextTaskToFindAllocation, err, statement)
if err := common.EvictAllPreemptees(session, scenario.RecordedVictimsTasks(), pendingJob, statement, s.actionType); err != nil {
return handleSolveError(pendingJob, nextTask, err, statement)
}

if latestPotentialVictim == nil {
var result *solutionResult
var err error

latestPotentialVictims := scenario.LatestPotentialVictims()
if latestPotentialVictims == nil {
if hasRecordedVictimsForSimulation(scenario) {
log.InfraLogger.V(6).Infof("Trying to solve scenario with priviously calculated victims only")
result := s.runSimulation(session, scenario, statement, scenario.RecordedVictimsTasks())
if result != nil {
return result
}
log.InfraLogger.V(6).Infof("Trying to solve scenario with previously calculated victims only")
result = s.runSimulation(session, scenario, statement, scenario.RecordedVictimsTasks())
}
} else {
potentialVictimNodes := getNodesOfJob(latestPotentialVictim)
result, err := s.solveOnPotentialNodes(session, scenario, statement, potentialVictimNodes)
if err != nil {
return handleSolveError(pendingJob, nextTaskToFindAllocation, err, statement)
potentialVictimNodes := getNodesOfJobs(latestPotentialVictims)
if s.amountOfNewPreemptorTasks <= 1 {
result, err = s.solveFromSingleNodePreemption(session, scenario, statement, potentialVictimNodes)
} else {
result, err = s.solveOnPotentialNodes(session, scenario, statement, potentialVictimNodes)
}
if result != nil {
return result
if err != nil {
return handleSolveError(pendingJob, nextTask, err, statement)
}
}

statement.Discard() // No solution for scenario
if result != nil {
return result
}
statement.Discard()
return &solutionResult{false, nil, nil, nil}
}

Expand All @@ -115,29 +120,37 @@ func (s *byPodSolver) runSimulation(
return nil
}

func (s *byPodSolver) solveOnPotentialNodes(ssn *framework.Session, scenario *scenario.ByNodeScenario,
// solveFromSingleNodePreemption tries to solve the scenario by preempting the least amount of potential victims possible, while looking for the best cleanup of a single node.
// We can assume that cleaning a single node is enough only if we know that the preemptor job has only one more task then the previous scenario we solved.
// We try to remove potential victims from each node separately, and see if the simulation will succeed.
func (s *byPodSolver) solveFromSingleNodePreemption(ssn *framework.Session, scenario *scenario.ByNodeScenario,
statement *framework.Statement, potentialVictimNodeNames []string) (*solutionResult, error) {
for _, nodeToTest := range potentialVictimNodeNames {
log.InfraLogger.V(6).Infof(
"Trying to solve scenario with potantial victims from node: %s", nodeToTest)

nodeVictimsEvictionCheckpoint, potentialVictimsTasks, err :=
s.evictPotentialVictimsFromNode(ssn, scenario, statement, nodeToTest)
if err != nil {
return nil, err
for _, node := range potentialVictimNodeNames {
log.InfraLogger.V(6).Infof("Trying to solve scenario with potential victims from node: %s", node)
if result, err := s.solveOnPotentialNodes(ssn, scenario, statement, []string{node}); err != nil || result != nil {
return result, err
}
newFeasibleNodes := s.updateFeasibleNodes(ssn, potentialVictimsTasks)
}
return nil, nil
}

victimTasks := getVictimTasks(scenario.RecordedVictimsTasks(), potentialVictimsTasks)
result := s.runSimulation(ssn, scenario, statement, victimTasks)
if result != nil {
return result, nil
}
// solveOnPotentialNodes tries to solve the scenario by preempting all the potential victims from the given nodes, and running a simulation
func (s *byPodSolver) solveOnPotentialNodes(ssn *framework.Session, scenario *scenario.ByNodeScenario,
statement *framework.Statement, potentialVictimNodeNames []string) (*solutionResult, error) {
checkpoint, potentialVictimsTasks, err := s.evictPotentialVictimsFromNodes(ssn, scenario, statement, potentialVictimNodeNames...)
if err != nil {
return nil, err
}
newFeasibleNodes := s.updateFeasibleNodes(ssn, potentialVictimsTasks)

s.feasibleNodesRollback(newFeasibleNodes)
if err = statement.Rollback(*nodeVictimsEvictionCheckpoint); err != nil {
return nil, err
}
victimTasks := getVictimTasks(scenario.RecordedVictimsTasks(), potentialVictimsTasks)
if result := s.runSimulation(ssn, scenario, statement, victimTasks); result != nil {
return result, nil
}

s.feasibleNodesRollback(newFeasibleNodes)
if err = statement.Rollback(*checkpoint); err != nil {
return nil, err
}
return nil, nil
}
Expand All @@ -160,13 +173,13 @@ func (s *byPodSolver) updateFeasibleNodes(ssn *framework.Session, victimTasks []
return newFeasibleNodes
}

func (s *byPodSolver) evictPotentialVictimsFromNode(
session *framework.Session, scenario *scenario.ByNodeScenario, statement *framework.Statement, nodeToTest string,
func (s *byPodSolver) evictPotentialVictimsFromNodes(
session *framework.Session, scenario *scenario.ByNodeScenario, statement *framework.Statement, nodeToTest ...string,
) (*framework.Checkpoint, []*pod_info.PodInfo, error) {
recordedVictimsCheckpoint := statement.Checkpoint()
pendingJob := scenario.GetPreemptor()

potentialVictimsTasks := scenario.VictimsTasksFromNodes([]string{nodeToTest})
potentialVictimsTasks := scenario.VictimsTasksFromNodes(nodeToTest)
if err := common.EvictAllPreemptees(session, potentialVictimsTasks, pendingJob, statement, s.actionType); err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -201,15 +214,18 @@ func (s *byPodSolver) handleScenarioSolution(
return &solutionResult{true, victimsTasks, actualVictimJobs, statement}
}

func getNodesOfJob(pj *podgroup_info.PodGroupInfo) []string {
func getNodesOfJobs(pj []*podgroup_info.PodGroupInfo) []string {
if pj == nil {
return []string{}
}

pjNodeNames := map[string]string{}
for _, latestPotentialVictimTask := range pj.GetAllPodsMap() {
pjNodeNames[latestPotentialVictimTask.NodeName] = latestPotentialVictimTask.NodeName
for _, job := range pj {
for _, latestPotentialVictimTask := range job.GetAllPodsMap() {
pjNodeNames[latestPotentialVictimTask.NodeName] = latestPotentialVictimTask.NodeName
}
}

return maps.Keys(pjNodeNames)
}

Expand Down
19 changes: 11 additions & 8 deletions pkg/scheduler/actions/common/solvers/job_solver.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *JobSolver) Solve(
return false, nil, calcVictimNames(state.recordedVictimsTasks)
}

result := s.probeAtK(ssn, &state, pendingJob, tasksToAllocate, n)
result := s.probeAtK(ssn, &state, pendingJob, tasksToAllocate, n, 0)
if result == nil || !result.solved {
return false, nil, calcVictimNames(state.recordedVictimsTasks)
}
Expand Down Expand Up @@ -110,7 +110,7 @@ func (s *JobSolver) searchMaxSolvableK(
var hi int
k := 1
for {
if !s.tryProbeAndDiscard(ssn, state, pendingJob, tasksToAllocate, k) {
if !s.tryProbeAndDiscard(ssn, state, pendingJob, tasksToAllocate, k, lo) {
hi = k
break
}
Expand All @@ -126,7 +126,7 @@ func (s *JobSolver) searchMaxSolvableK(

for hi-lo > 1 {
mid := (lo + hi) / 2
if s.tryProbeAndDiscard(ssn, state, pendingJob, tasksToAllocate, mid) {
if s.tryProbeAndDiscard(ssn, state, pendingJob, tasksToAllocate, mid, lo) {
lo = mid
} else {
hi = mid
Expand All @@ -143,8 +143,9 @@ func (s *JobSolver) tryProbeAndDiscard(
pendingJob *podgroup_info.PodGroupInfo,
tasksToAllocate []*pod_info.PodInfo,
k int,
previousK int,
) bool {
result := s.probeAtK(ssn, state, pendingJob, tasksToAllocate, k)
result := s.probeAtK(ssn, state, pendingJob, tasksToAllocate, k, previousK)
if result == nil || !result.solved {
log.InfraLogger.V(5).Infof("No solution found for %d tasks out of %d tasks to allocate for %s",
k, len(tasksToAllocate), pendingJob.Name)
Expand All @@ -167,13 +168,15 @@ func (s *JobSolver) probeAtK(
pendingJob *podgroup_info.PodGroupInfo,
tasksToAllocate []*pod_info.PodInfo,
k int,
previousK int,
) *solutionResult {
pendingTasks := tasksToAllocate[:k]
partialPendingJob := getPartialJobRepresentative(pendingJob, pendingTasks)
return s.solvePartialJob(ssn, state, partialPendingJob)
return s.solvePartialJob(ssn, state, partialPendingJob, k-previousK)
}

func (s *JobSolver) solvePartialJob(ssn *framework.Session, state *solvingState, partialPendingJob *podgroup_info.PodGroupInfo) *solutionResult {
func (s *JobSolver) solvePartialJob(
ssn *framework.Session, state *solvingState, partialPendingJob *podgroup_info.PodGroupInfo, amountOfNewPreemptorTasks int) *solutionResult {
feasibleNodeMap := map[string]*node_info.NodeInfo{}
for _, node := range s.feasibleNodes {
feasibleNodeMap[node.Name] = node
Expand All @@ -188,8 +191,8 @@ func (s *JobSolver) solvePartialJob(ssn *framework.Session, state *solvingState,

for scenarioToSolve := scenarioBuilder.GetValidScenario(); scenarioToSolve != nil; scenarioToSolve =
scenarioBuilder.GetNextScenario() {
scenarioSolver := newByPodSolver(feasibleNodeMap, s.solutionValidator, ssn.AllowConsolidatingReclaim(),
s.actionType)
scenarioSolver := newByPodSolver(feasibleNodeMap, amountOfNewPreemptorTasks, s.solutionValidator,
ssn.AllowConsolidatingReclaim(), s.actionType)

log.InfraLogger.V(5).Infof("Trying to solve scenario: %s", scenarioToSolve)
metrics.IncScenarioSimulatedByAction()
Expand Down
6 changes: 6 additions & 0 deletions pkg/scheduler/actions/common/solvers/pod_scenario_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type PodAccumulatedScenarioBuilder struct {
victimsJobsQueue *utils.JobsOrderByQueues

recordedVictimsTasks map[common_info.PodID]*pod_info.PodInfo

amountOfNewPotentialTasksInCurrentScenario int
}

func NewPodAccumulatedScenarioBuilder(
Expand Down Expand Up @@ -73,6 +75,7 @@ func NewPodAccumulatedScenarioBuilder(
recordedVictimsTasks: recordedVictimsTasks,
lastScenario: scenario,
scenarioFilters: scenarioFilters,
amountOfNewPotentialTasksInCurrentScenario: 0,
}
}

Expand Down Expand Up @@ -130,6 +133,7 @@ func (asb *PodAccumulatedScenarioBuilder) addNextPotentialVictims() bool {

if asb.lastScenario != nil {
asb.lastScenario.AddPotentialVictimsTasks(potentialVictimTasks)
asb.amountOfNewPotentialTasksInCurrentScenario += len(potentialVictimTasks)
}
return true
}
Expand All @@ -141,6 +145,8 @@ func (asb *PodAccumulatedScenarioBuilder) GetValidScenario() *solverscenario.ByN

return asb.GetNextScenario()
}
asb.lastScenario.SetAmountOfNewPotentialTasks(asb.amountOfNewPotentialTasksInCurrentScenario)
asb.amountOfNewPotentialTasksInCurrentScenario = 0
return asb.lastScenario
}

Expand Down
48 changes: 32 additions & 16 deletions pkg/scheduler/actions/common/solvers/scenario/base_scenario.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ var _ api.ScenarioInfo = &BaseScenario{}
type BaseScenario struct {
session *framework.Session

preemptor *podgroup_info.PodGroupInfo
victims map[common_info.PodGroupID]*api.VictimInfo
pendingTasks []*pod_info.PodInfo
potentialVictimsTasks []*pod_info.PodInfo
recordedVictimsJobs []*podgroup_info.PodGroupInfo
recordedVictimsTasks []*pod_info.PodInfo
preemptor *podgroup_info.PodGroupInfo
victims map[common_info.PodGroupID]*api.VictimInfo
pendingTasks []*pod_info.PodInfo
potentialVictimsTasks []*pod_info.PodInfo
amountOfNewPotentialTasks int
recordedVictimsJobs []*podgroup_info.PodGroupInfo
recordedVictimsTasks []*pod_info.PodInfo

// Deprecated: Use preemptor instead
victimsJobsTaskGroups map[common_info.PodGroupID][]*podgroup_info.PodGroupInfo
Expand All @@ -34,14 +35,15 @@ func NewBaseScenario(
recordedVictimsJobs []*podgroup_info.PodGroupInfo,
) *BaseScenario {
s := &BaseScenario{
session: session,
preemptor: originalJob,
victims: make(map[common_info.PodGroupID]*api.VictimInfo),
pendingTasks: make([]*pod_info.PodInfo, 0),
potentialVictimsTasks: make([]*pod_info.PodInfo, 0),
recordedVictimsJobs: make([]*podgroup_info.PodGroupInfo, len(recordedVictimsJobs)),
recordedVictimsTasks: nil,
victimsJobsTaskGroups: make(map[common_info.PodGroupID][]*podgroup_info.PodGroupInfo),
session: session,
preemptor: originalJob,
victims: make(map[common_info.PodGroupID]*api.VictimInfo),
pendingTasks: make([]*pod_info.PodInfo, 0),
potentialVictimsTasks: make([]*pod_info.PodInfo, 0),
amountOfNewPotentialTasks: 0,
recordedVictimsJobs: make([]*podgroup_info.PodGroupInfo, len(recordedVictimsJobs)),
recordedVictimsTasks: nil,
victimsJobsTaskGroups: make(map[common_info.PodGroupID][]*podgroup_info.PodGroupInfo),
}

for _, task := range pendingTasks {
Expand All @@ -50,6 +52,7 @@ func NewBaseScenario(
for _, task := range victimsTasks {
s.AddPotentialVictimsTasks([]*pod_info.PodInfo{task})
}
s.SetAmountOfNewPotentialTasks(len(victimsTasks))
for index, recordedVictimJob := range recordedVictimsJobs {
s.recordedVictimsJobs[index] = recordedVictimJob
var tasks []*pod_info.PodInfo
Expand Down Expand Up @@ -86,9 +89,13 @@ func (s *BaseScenario) RecordedVictimsJobs() []*podgroup_info.PodGroupInfo {
return s.recordedVictimsJobs
}

func (s *BaseScenario) LatestPotentialVictim() *podgroup_info.PodGroupInfo {
func (s *BaseScenario) LatestPotentialVictims() []*podgroup_info.PodGroupInfo {
if len(s.potentialVictimsTasks) > 0 {
return s.getJobForTask(s.potentialVictimsTasks[len(s.potentialVictimsTasks)-1])
latestPotentialVictims := []*podgroup_info.PodGroupInfo{}
for i := len(s.potentialVictimsTasks) - s.amountOfNewPotentialTasks; i < len(s.potentialVictimsTasks); i++ {
latestPotentialVictims = append(latestPotentialVictims, s.getJobForTask(s.potentialVictimsTasks[i]))
}
return latestPotentialVictims
} else {
return nil
}
Expand All @@ -107,6 +114,15 @@ func (s *BaseScenario) AddPotentialVictimsTasks(tasks []*pod_info.PodInfo) {
s.appendTasksAsVictimJob(tasks)
}

// SetAmountOfNewPotentialTasks sets the amount of new potential tasks that have been added to the scenario in comparison to the previous scenario tested.
func (s *BaseScenario) SetAmountOfNewPotentialTasks(amount int) {
s.amountOfNewPotentialTasks = amount
}

func (s *BaseScenario) GetAmountOfNewPotentialTasks() int {
return s.amountOfNewPotentialTasks
}

func (s *BaseScenario) appendTasksAsVictimJob(tasks []*pod_info.PodInfo) {
originalJob := s.getJobForTask(tasks[0])
job := originalJob.CloneWithTasks(tasks)
Expand Down
Loading
Loading