Minor refactor to scale-up orchestrator for more re-usability #7649

Merged
merged 1 commit on Jan 21, 2025
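At a glance, the commit: exports the async initializer constructor (newAsyncNodeGroupInitializer becomes NewAsyncNodeGroupInitializer), splits ScaleUpOrchestrator.CreateNodeGroup into a synchronous CreateNodeGroup and a new CreateNodeGroupAsync that both delegate to a shared processCreateNodeGroupResult helper, moves the AsyncNodeGroupsEnabled branching up into ScaleUp, replaces the local combineConcurrentScaleUpErrors/formatMessageFromConcurrentErrors helpers in executor.go with errors.Combine, and deletes executor_test.go. The combined error message drops the word "concurrent", so the test expectation at the bottom of the diff changes accordingly.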
@@ -50,7 +50,8 @@ type AsyncNodeGroupInitializer struct {
 	atomicScaleUp bool
 }
 
-func newAsyncNodeGroupInitializer(
+// NewAsyncNodeGroupInitializer creates a new AsyncNodeGroupInitializer instance.
+func NewAsyncNodeGroupInitializer(
 	nodeGroup cloudprovider.NodeGroup,
 	nodeInfo *framework.NodeInfo,
 	scaleUpExecutor *scaleUpExecutor,
63 changes: 1 addition & 62 deletions cluster-autoscaler/core/scaleup/orchestrator/executor.go
@@ -18,8 +18,6 @@ package orchestrator
 
 import (
 	"fmt"
-	"sort"
-	"strings"
 	"sync"
 	"time"
 
@@ -138,7 +136,7 @@ func (e *scaleUpExecutor) executeScaleUpsParallel(
 			failedNodeGroups[i] = result.info.Group
 			scaleUpErrors[i] = result.err
 		}
-		return combineConcurrentScaleUpErrors(scaleUpErrors), failedNodeGroups
+		return errors.Combine(scaleUpErrors), failedNodeGroups
 	}
 	return nil, nil
 }
@@ -188,65 +186,6 @@ func (e *scaleUpExecutor) executeScaleUp(
 	return nil
 }
 
-func combineConcurrentScaleUpErrors(errs []errors.AutoscalerError) errors.AutoscalerError {
-	if len(errs) == 0 {
-		return nil
-	}
-	if len(errs) == 1 {
-		return errs[0]
-	}
-	uniqueMessages := make(map[string]bool)
-	uniqueTypes := make(map[errors.AutoscalerErrorType]bool)
-	for _, err := range errs {
-		uniqueTypes[err.Type()] = true
-		uniqueMessages[err.Error()] = true
-	}
-	if len(uniqueTypes) == 1 && len(uniqueMessages) == 1 {
-		return errs[0]
-	}
-	// sort to stabilize the results and easier log aggregation
-	sort.Slice(errs, func(i, j int) bool {
-		errA := errs[i]
-		errB := errs[j]
-		if errA.Type() == errB.Type() {
-			return errs[i].Error() < errs[j].Error()
-		}
-		return errA.Type() < errB.Type()
-	})
-	firstErr := errs[0]
-	printErrorTypes := len(uniqueTypes) > 1
-	message := formatMessageFromConcurrentErrors(errs, printErrorTypes)
-	return errors.NewAutoscalerError(firstErr.Type(), message)
-}
-
-func formatMessageFromConcurrentErrors(errs []errors.AutoscalerError, printErrorTypes bool) string {
-	firstErr := errs[0]
-	var builder strings.Builder
-	builder.WriteString(firstErr.Error())
-	builder.WriteString(" ...and other concurrent errors: [")
-	formattedErrs := map[errors.AutoscalerError]bool{
-		firstErr: true,
-	}
-	for _, err := range errs {
-		if _, has := formattedErrs[err]; has {
-			continue
-		}
-		formattedErrs[err] = true
-		var message string
-		if printErrorTypes {
-			message = fmt.Sprintf("[%s] %s", err.Type(), err.Error())
-		} else {
-			message = err.Error()
-		}
-		if len(formattedErrs) > 2 {
-			builder.WriteString(", ")
-		}
-		builder.WriteString(fmt.Sprintf("%q", message))
-	}
-	builder.WriteString("]")
-	return builder.String()
-}
-
 // Checks if all groups are scaled only once.
 // Scaling one group multiple times concurrently may cause problems.
 func checkUniqueNodeGroups(scaleUpInfos []nodegroupset.ScaleUpInfo) errors.AutoscalerError {
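The two error-combining helpers removed above now sit behind errors.Combine in the shared autoscaler errors package; note the call-site change to errors.Combine(scaleUpErrors) earlier in this file and the test expectation change at the bottom of the PR from "...and other concurrent errors" to "...and other errors". As a rough illustration of the combining behavior, here is a minimal standalone sketch using plain Go errors instead of the real AutoscalerError type; it is not the actual errors.Combine implementation.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"strings"
)

// combine collapses a slice of concurrent errors into one error: nil for an empty
// slice, the error itself when there is only one distinct message, and otherwise a
// single error listing the first (sorted) message plus the remaining unique ones.
// This mirrors the idea of the removed helpers; it is not the real errors.Combine.
func combine(errs []error) error {
	if len(errs) == 0 {
		return nil
	}
	unique := make(map[string]bool)
	for _, err := range errs {
		unique[err.Error()] = true
	}
	if len(unique) == 1 {
		return errs[0]
	}
	// Sort to stabilize the result and make log aggregation easier.
	msgs := make([]string, 0, len(unique))
	for m := range unique {
		msgs = append(msgs, m)
	}
	sort.Strings(msgs)
	quoted := make([]string, 0, len(msgs)-1)
	for _, m := range msgs[1:] {
		quoted = append(quoted, fmt.Sprintf("%q", m))
	}
	return errors.New(msgs[0] + " ...and other errors: [" + strings.Join(quoted, ", ") + "]")
}

func main() {
	errs := []error{
		errors.New("quota exceeded"),
		errors.New("timeout"),
		errors.New("quota exceeded"),
	}
	// Prints: quota exceeded ...and other errors: ["timeout"]
	fmt.Println(combine(errs))
}
```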
127 changes: 0 additions & 127 deletions cluster-autoscaler/core/scaleup/orchestrator/executor_test.go

This file was deleted.

72 changes: 47 additions & 25 deletions cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
@@ -222,7 +222,13 @@ func (o *ScaleUpOrchestrator) ScaleUp(
 		return buildNoOptionsAvailableStatus(markedEquivalenceGroups, skippedNodeGroups, nodeGroups), nil
 	}
 	var scaleUpStatus *status.ScaleUpStatus
-	createNodeGroupResults, scaleUpStatus, aErr = o.CreateNodeGroup(bestOption, nodeInfos, schedulablePodGroups, podEquivalenceGroups, daemonSets, allOrNothing)
+	oldId := bestOption.NodeGroup.Id()
+	if o.autoscalingContext.AsyncNodeGroupsEnabled {
+		initializer := NewAsyncNodeGroupInitializer(bestOption.NodeGroup, nodeInfos[oldId], o.scaleUpExecutor, o.taintConfig, daemonSets, o.processors.ScaleUpStatusProcessor, o.autoscalingContext, allOrNothing)
+		createNodeGroupResults, scaleUpStatus, aErr = o.CreateNodeGroupAsync(bestOption, nodeInfos, schedulablePodGroups, podEquivalenceGroups, daemonSets, initializer)
+	} else {
+		createNodeGroupResults, scaleUpStatus, aErr = o.CreateNodeGroup(bestOption, nodeInfos, schedulablePodGroups, podEquivalenceGroups, daemonSets)
+	}
 	if aErr != nil {
 		return scaleUpStatus, aErr
 	}
@@ -501,46 +507,62 @@ func (o *ScaleUpOrchestrator) CreateNodeGroup(
 	schedulablePodGroups map[string][]estimator.PodEquivalenceGroup,
 	podEquivalenceGroups []*equivalence.PodGroup,
 	daemonSets []*appsv1.DaemonSet,
-	allOrNothing bool,
 ) ([]nodegroups.CreateNodeGroupResult, *status.ScaleUpStatus, errors.AutoscalerError) {
-	createNodeGroupResults := make([]nodegroups.CreateNodeGroupResult, 0)
 	oldId := initialOption.NodeGroup.Id()
+	res, aErr := o.processors.NodeGroupManager.CreateNodeGroup(o.autoscalingContext, initialOption.NodeGroup)
+	return o.processCreateNodeGroupResult(initialOption, oldId, nodeInfos, schedulablePodGroups, podEquivalenceGroups, daemonSets, res, aErr)
+}
+
+// CreateNodeGroupAsync will try to create a new node group asynchronously based on the initialOption.
+func (o *ScaleUpOrchestrator) CreateNodeGroupAsync(
+	initialOption *expander.Option,
+	nodeInfos map[string]*framework.NodeInfo,
+	schedulablePodGroups map[string][]estimator.PodEquivalenceGroup,
+	podEquivalenceGroups []*equivalence.PodGroup,
+	daemonSets []*appsv1.DaemonSet,
+	initializer nodegroups.AsyncNodeGroupInitializer,
+) ([]nodegroups.CreateNodeGroupResult, *status.ScaleUpStatus, errors.AutoscalerError) {
+	oldId := initialOption.NodeGroup.Id()
-	var createNodeGroupResult nodegroups.CreateNodeGroupResult
-	var aErr errors.AutoscalerError
-	if o.autoscalingContext.AsyncNodeGroupsEnabled {
-		initializer := newAsyncNodeGroupInitializer(initialOption.NodeGroup, nodeInfos[oldId], o.scaleUpExecutor, o.taintConfig, daemonSets, o.processors.ScaleUpStatusProcessor, o.autoscalingContext, allOrNothing)
-		createNodeGroupResult, aErr = o.processors.NodeGroupManager.CreateNodeGroupAsync(o.autoscalingContext, initialOption.NodeGroup, initializer)
-	} else {
-		createNodeGroupResult, aErr = o.processors.NodeGroupManager.CreateNodeGroup(o.autoscalingContext, initialOption.NodeGroup)
-	}
+	res, aErr := o.processors.NodeGroupManager.CreateNodeGroupAsync(o.autoscalingContext, initialOption.NodeGroup, initializer)
+	return o.processCreateNodeGroupResult(initialOption, oldId, nodeInfos, schedulablePodGroups, podEquivalenceGroups, daemonSets, res, aErr)
+}
+
+func (o *ScaleUpOrchestrator) processCreateNodeGroupResult(
+	initialOption *expander.Option,
+	initialOptionId string,
+	nodeInfos map[string]*framework.NodeInfo,
+	schedulablePodGroups map[string][]estimator.PodEquivalenceGroup,
+	podEquivalenceGroups []*equivalence.PodGroup,
+	daemonSets []*appsv1.DaemonSet,
+	result nodegroups.CreateNodeGroupResult,
+	aErr errors.AutoscalerError,
+) ([]nodegroups.CreateNodeGroupResult, *status.ScaleUpStatus, errors.AutoscalerError) {
 	if aErr != nil {
 		status, err := status.UpdateScaleUpError(
 			&status.ScaleUpStatus{FailedCreationNodeGroups: []cloudprovider.NodeGroup{initialOption.NodeGroup}, PodsTriggeredScaleUp: initialOption.Pods},
 			aErr)
-		return createNodeGroupResults, status, err
+		return []nodegroups.CreateNodeGroupResult{}, status, err
 	}
 
-	createNodeGroupResults = append(createNodeGroupResults, createNodeGroupResult)
-	initialOption.NodeGroup = createNodeGroupResult.MainCreatedNodeGroup
+	initialOption.NodeGroup = result.MainCreatedNodeGroup
 
 	// If possible replace candidate node-info with node info based on crated node group. The latter
 	// one should be more in line with nodes which will be created by node group.
-	mainCreatedNodeInfo, aErr := simulator.SanitizedTemplateNodeInfoFromNodeGroup(createNodeGroupResult.MainCreatedNodeGroup, daemonSets, o.taintConfig)
+	mainCreatedNodeInfo, aErr := simulator.SanitizedTemplateNodeInfoFromNodeGroup(result.MainCreatedNodeGroup, daemonSets, o.taintConfig)
 	if aErr == nil {
-		nodeInfos[createNodeGroupResult.MainCreatedNodeGroup.Id()] = mainCreatedNodeInfo
-		schedulablePodGroups[createNodeGroupResult.MainCreatedNodeGroup.Id()] = o.SchedulablePodGroups(podEquivalenceGroups, createNodeGroupResult.MainCreatedNodeGroup, mainCreatedNodeInfo)
+		nodeInfos[result.MainCreatedNodeGroup.Id()] = mainCreatedNodeInfo
+		schedulablePodGroups[result.MainCreatedNodeGroup.Id()] = o.SchedulablePodGroups(podEquivalenceGroups, result.MainCreatedNodeGroup, mainCreatedNodeInfo)
 	} else {
-		klog.Warningf("Cannot build node info for newly created main node group %v; balancing similar node groups may not work; err=%v", createNodeGroupResult.MainCreatedNodeGroup.Id(), aErr)
+		klog.Warningf("Cannot build node info for newly created main node group %v; balancing similar node groups may not work; err=%v", result.MainCreatedNodeGroup.Id(), aErr)
 		// Use node info based on expansion candidate but update Id which likely changed when node group was created.
-		nodeInfos[createNodeGroupResult.MainCreatedNodeGroup.Id()] = nodeInfos[oldId]
-		schedulablePodGroups[createNodeGroupResult.MainCreatedNodeGroup.Id()] = schedulablePodGroups[oldId]
+		nodeInfos[result.MainCreatedNodeGroup.Id()] = nodeInfos[initialOptionId]
+		schedulablePodGroups[result.MainCreatedNodeGroup.Id()] = schedulablePodGroups[initialOptionId]
 	}
-	if oldId != createNodeGroupResult.MainCreatedNodeGroup.Id() {
-		delete(nodeInfos, oldId)
-		delete(schedulablePodGroups, oldId)
+	if initialOptionId != result.MainCreatedNodeGroup.Id() {
+		delete(nodeInfos, initialOptionId)
+		delete(schedulablePodGroups, initialOptionId)
 	}
-	for _, nodeGroup := range createNodeGroupResult.ExtraCreatedNodeGroups {
+	for _, nodeGroup := range result.ExtraCreatedNodeGroups {
 		nodeInfo, aErr := simulator.SanitizedTemplateNodeInfoFromNodeGroup(nodeGroup, daemonSets, o.taintConfig)
 		if aErr != nil {
 			klog.Warningf("Cannot build node info for newly created extra node group %v; balancing similar node groups will not work; err=%v", nodeGroup.Id(), aErr)
@@ -554,7 +576,7 @@ func (o *ScaleUpOrchestrator) CreateNodeGroup(
 	// TODO(lukaszos) when pursuing scalability update this call with one which takes list of changed node groups so we do not
 	// do extra API calls. (the call at the bottom of ScaleUp() could be also changed then)
 	o.clusterStateRegistry.Recalculate()
-	return createNodeGroupResults, nil, nil
+	return []nodegroups.CreateNodeGroupResult{result}, nil, nil
 }
 
 // SchedulablePodGroups returns a list of pods that could be scheduled
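A brief note on the reuse angle from the title: with the constructor exported and the async path split out, a caller inside this package no longer has to go through the AsyncNodeGroupsEnabled branch in CreateNodeGroup. A hypothetical sketch follows; it is not part of this PR, the helper name is invented, and the types and calls come from the diff above.

```go
// createNodeGroupAlwaysAsync is a hypothetical wrapper showing how the pieces can now
// be composed directly: build the initializer via the exported constructor, then call
// the new CreateNodeGroupAsync instead of relying on branching inside CreateNodeGroup.
func (o *ScaleUpOrchestrator) createNodeGroupAlwaysAsync(
	bestOption *expander.Option,
	nodeInfos map[string]*framework.NodeInfo,
	schedulablePodGroups map[string][]estimator.PodEquivalenceGroup,
	podEquivalenceGroups []*equivalence.PodGroup,
	daemonSets []*appsv1.DaemonSet,
	allOrNothing bool,
) ([]nodegroups.CreateNodeGroupResult, *status.ScaleUpStatus, errors.AutoscalerError) {
	oldId := bestOption.NodeGroup.Id()
	initializer := NewAsyncNodeGroupInitializer(bestOption.NodeGroup, nodeInfos[oldId], o.scaleUpExecutor, o.taintConfig, daemonSets, o.processors.ScaleUpStatusProcessor, o.autoscalingContext, allOrNothing)
	return o.CreateNodeGroupAsync(bestOption, nodeInfos, schedulablePodGroups, podEquivalenceGroups, daemonSets, initializer)
}
```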
@@ -625,7 +625,7 @@ func TestCloudProviderFailingToScaleUpGroups(t *testing.T) {
 			assert.False(t, result.ScaleUpStatus.WasSuccessful())
 			assert.Equal(t, errors.CloudProviderError, result.ScaleUpError.Type())
 			assert.Equal(t, tc.expectedTotalTargetSizes, result.GroupTargetSizes["ng1"]+result.GroupTargetSizes["ng2"])
-			assert.Equal(t, tc.expectConcurrentErrors, strings.Contains(result.ScaleUpError.Error(), "...and other concurrent errors"))
+			assert.Equal(t, tc.expectConcurrentErrors, strings.Contains(result.ScaleUpError.Error(), "...and other errors"))
 		})
 	}
 }