Skip to content

Commit

Permalink
consider in-progress instances for UniformAcrossAZ update strategy (#438)
Browse files Browse the repository at this point in the history

* consider in-progress instances for UniformAcrossAZ update strategy

Signed-off-by: sbadiger <[email protected]>

* refactor SelectTargets

Signed-off-by: sbadiger <[email protected]>

* additional log messages

Signed-off-by: sbadiger <[email protected]>

* add log message
  • Loading branch information
shreyas-badiger authored Mar 15, 2024
1 parent 7511a22 commit b48d119
Show file tree
Hide file tree
Showing 6 changed files with 1,077 additions and 293 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ jobs:
uses: actions/checkout@v4

- name: Golangci-lint
uses: golangci/golangci-lint-action@v3
uses: golangci/golangci-lint-action@v4
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.53.2
version: v1.54
args: --timeout 3m

- name: Get kubebuilder
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ all: manager
# Run tests
ENVTEST_ASSETS_DIR=$(shell pwd)/testbin
test: manifests generate fmt vet envtest
go test ./controllers/... ./api/...
go test ./controllers/... ./api/... -coverprofile=coverage.txt
go tool cover -html=./coverage.txt -o cover.html

# Build manager binary
Expand Down
36 changes: 22 additions & 14 deletions controllers/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,15 +192,23 @@ type MockEC2 struct {

var _ ec2iface.EC2API = &MockEC2{}

func createASGInstance(instanceID string, launchConfigName string) *autoscaling.Instance {
func createASGInstance(instanceID string, launchConfigName string, az string) *autoscaling.Instance {
return &autoscaling.Instance{
InstanceId: &instanceID,
LaunchConfigurationName: &launchConfigName,
AvailabilityZone: aws.String("az-1"),
AvailabilityZone: aws.String(az),
LifecycleState: aws.String("InService"),
}
}

// createDriftedASGInstance builds a mock in-service ASG instance with the
// given instance ID and availability zone. Unlike createASGInstance, it sets
// no launch configuration name (and no launch template) on the instance.
// NOTE(review): presumably the missing launch reference is what makes the
// instance count as drifted in tests — confirm against IsInstanceDrifted.
func createDriftedASGInstance(instanceID string, az string) *autoscaling.Instance {
	instance := &autoscaling.Instance{}
	instance.InstanceId = &instanceID
	instance.AvailabilityZone = aws.String(az)
	instance.LifecycleState = aws.String("InService")
	return instance
}

func createASGInstanceWithLaunchTemplate(instanceID string, launchTemplateName string) *autoscaling.Instance {
return &autoscaling.Instance{
InstanceId: &instanceID,
Expand Down Expand Up @@ -231,9 +239,9 @@ func createASG(asgName string, launchConfigName string) *autoscaling.Group {
AutoScalingGroupName: &asgName,
LaunchConfigurationName: &launchConfigName,
Instances: []*autoscaling.Instance{
createASGInstance("mock-instance-1", launchConfigName),
createASGInstance("mock-instance-2", launchConfigName),
createASGInstance("mock-instance-3", launchConfigName),
createASGInstance("mock-instance-1", launchConfigName, "az-1"),
createASGInstance("mock-instance-2", launchConfigName, "az-2"),
createASGInstance("mock-instance-3", launchConfigName, "az-3"),
},
DesiredCapacity: func(x int) *int64 { i := int64(x); return &i }(3),
}
Expand All @@ -246,9 +254,9 @@ func createASGWithLaunchTemplate(asgName string, launchTemplate string) *autosca
LaunchTemplateName: &asgName,
},
Instances: []*autoscaling.Instance{
createASGInstance("mock-instance-1", launchTemplate),
createASGInstance("mock-instance-2", launchTemplate),
createASGInstance("mock-instance-3", launchTemplate),
createASGInstance("mock-instance-1", launchTemplate, "az-1"),
createASGInstance("mock-instance-2", launchTemplate, "az-2"),
createASGInstance("mock-instance-3", launchTemplate, "az-3"),
},
DesiredCapacity: func(x int) *int64 { i := int64(x); return &i }(3),
}
Expand All @@ -265,9 +273,9 @@ func createASGWithMixedInstanceLaunchTemplate(asgName string, launchTemplate str
},
},
Instances: []*autoscaling.Instance{
createASGInstance("mock-instance-1", launchTemplate),
createASGInstance("mock-instance-2", launchTemplate),
createASGInstance("mock-instance-3", launchTemplate),
createASGInstance("mock-instance-1", launchTemplate, "az-1"),
createASGInstance("mock-instance-2", launchTemplate, "az-2"),
createASGInstance("mock-instance-3", launchTemplate, "az-3"),
},
DesiredCapacity: func(x int) *int64 { i := int64(x); return &i }(3),
}
Expand All @@ -278,9 +286,9 @@ func createDriftedASG(asgName string, launchConfigName string) *autoscaling.Grou
AutoScalingGroupName: &asgName,
LaunchConfigurationName: &launchConfigName,
Instances: []*autoscaling.Instance{
createASGInstance("mock-instance-1", "different-launch-config"),
createASGInstance("mock-instance-2", "different-launch-config"),
createASGInstance("mock-instance-3", "different-launch-config"),
createDriftedASGInstance("mock-instance-1", "az-1"),
createDriftedASGInstance("mock-instance-2", "az-2"),
createDriftedASGInstance("mock-instance-3", "az-3"),
},
DesiredCapacity: func(x int) *int64 { i := int64(x); return &i }(3),
}
Expand Down
92 changes: 55 additions & 37 deletions controllers/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,9 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)
inProcessingNodes = make(map[string]*v1alpha1.NodeInProcessing)
}

//Early-Cordon - Cordon all the nodes to avoid any further scheduling of new pods.
//Early-Cordon - Cordon all the nodes to prevent scheduling of new pods on older nodes.
if r.EarlyCordonNodes {
r.Info("early-cordon has been enabled, all the instances in the node group will be cordoned", "name", r.RollingUpgrade.NamespacedName())
if ok, err := r.CordonUncordonAllNodes(true); !ok {
return ok, err
}
Expand Down Expand Up @@ -443,9 +444,11 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)

func (r *RollingUpgradeContext) SelectTargets(scalingGroup *autoscaling.Group, excludedInstances []string) []*autoscaling.Instance {
var (
batchSize = r.RollingUpgrade.MaxUnavailable()
totalNodes = int(aws.Int64Value(scalingGroup.DesiredCapacity))
targets = make([]*autoscaling.Instance, 0)
batchSize = r.RollingUpgrade.MaxUnavailable()
totalNodes = int(aws.Int64Value(scalingGroup.DesiredCapacity))
inprogressTargets = make([]*autoscaling.Instance, 0)
unrpocessedTargets = make([]*autoscaling.Instance, 0)
finalTargets = make([]*autoscaling.Instance, 0)
)
unavailableInt := CalculateMaxUnavailable(batchSize, totalNodes)

Expand All @@ -458,64 +461,73 @@ func (r *RollingUpgradeContext) SelectTargets(scalingGroup *autoscaling.Group, e
if selectedInstance := awsprovider.SelectScalingGroupInstance(instance, scalingGroup); !reflect.DeepEqual(selectedInstance, &autoscaling.Instance{}) {
//In-progress instances shouldn't be considered if they are in terminating state.
if !common.ContainsEqualFold(awsprovider.TerminatingInstanceStates, aws.StringValue(selectedInstance.LifecycleState)) {
targets = append(targets, selectedInstance)
inprogressTargets = append(inprogressTargets, selectedInstance)
}
}
}

if len(targets) > 0 {
r.Info("found in-progress instances", "instances", awsprovider.GetInstanceIDs(targets))
if len(inprogressTargets) > 0 {
r.Info("found in-progress instances", "instances", awsprovider.GetInstanceIDs(inprogressTargets), "name", r.RollingUpgrade.NamespacedName())
}

// select via strategy if there are no in-progress instances
if r.RollingUpgrade.UpdateStrategyType() == v1alpha1.RandomUpdateStrategy {
for _, instance := range scalingGroup.Instances {
if r.IsInstanceDrifted(instance) && !common.ContainsEqualFold(awsprovider.GetInstanceIDs(targets), aws.StringValue(instance.InstanceId)) && !common.ContainsEqualFold(excludedInstances, aws.StringValue(instance.InstanceId)) {
targets = append(targets, instance)
// continue to select other instances, if any.
instances, err := r.Cloud.AmazonClientSet.DescribeInstancesWithoutTagValue(instanceStateTagKey, inProgressTagValue)
if err != nil {
r.Info("unable to select targets, will retry.", "name", r.RollingUpgrade.NamespacedName())
return nil
}

for _, instance := range instances {
//don't consider instances that are terminating, empty, duplicates, excluded (errored out previously), or not drifted.
if selectedInstance := awsprovider.SelectScalingGroupInstance(instance, scalingGroup); !reflect.DeepEqual(selectedInstance, &autoscaling.Instance{}) {
if r.IsInstanceDrifted(selectedInstance) && !common.ContainsEqualFold(awsprovider.TerminatingInstanceStates, aws.StringValue(selectedInstance.LifecycleState)) {
if !common.ContainsEqualFold(awsprovider.GetInstanceIDs(unrpocessedTargets), aws.StringValue(selectedInstance.InstanceId)) && !common.ContainsEqualFold(excludedInstances, aws.StringValue(selectedInstance.InstanceId)) {
unrpocessedTargets = append(unrpocessedTargets, selectedInstance)
}
}
}
if unavailableInt > len(targets) {
unavailableInt = len(targets)
}
return targets[:unavailableInt]
}

r.Info("found unprocessed instances", "instances", unrpocessedTargets, "name", r.RollingUpgrade.NamespacedName())

if r.RollingUpgrade.UpdateStrategyType() == v1alpha1.RandomUpdateStrategy {

finalTargets = append(inprogressTargets, unrpocessedTargets...)

} else if r.RollingUpgrade.UpdateStrategyType() == v1alpha1.UniformAcrossAzUpdateStrategy {
for _, instance := range scalingGroup.Instances {
if r.IsInstanceDrifted(instance) && !common.ContainsEqualFold(awsprovider.GetInstanceIDs(targets), aws.StringValue(instance.InstanceId)) && !common.ContainsEqualFold(excludedInstances, aws.StringValue(instance.InstanceId)) {
targets = append(targets, instance)
}
}

var AZtargets = make([]*autoscaling.Instance, 0)
var uniformAZTargets = make([]*autoscaling.Instance, 0)

// split targets into groups based on their AZ
targetsByAZ := map[string][]*autoscaling.Instance{}
for _, target := range targets {
targetsByAZMap := map[string][]*autoscaling.Instance{}
for _, target := range unrpocessedTargets {
az := aws.StringValue(target.AvailabilityZone)
targetsByAZ[az] = append(targetsByAZ[az], target)
targetsByAZMap[az] = append(targetsByAZMap[az], target)
}

// round-robin across the AZs with targets uniformly first and then best effort with remaining
for {
if len(AZtargets) == len(targets) {
if len(uniformAZTargets) == len(unrpocessedTargets) {
break
}

for az := range targetsByAZ {
targetsByAZGroupSize := len(targetsByAZ[az])
for az := range targetsByAZMap {
targetsByAZGroupSize := len(targetsByAZMap[az])
if targetsByAZGroupSize > 0 {
AZtargets = append(AZtargets, targetsByAZ[az][targetsByAZGroupSize-1]) // append last target
targetsByAZ[az] = targetsByAZ[az][:targetsByAZGroupSize-1] // pop last target
uniformAZTargets = append(uniformAZTargets, targetsByAZMap[az][targetsByAZGroupSize-1]) // append last target
targetsByAZMap[az] = targetsByAZMap[az][:targetsByAZGroupSize-1] // pop last target
}
}
}

if unavailableInt > len(AZtargets) {
unavailableInt = len(AZtargets)
}
return AZtargets[:unavailableInt]
finalTargets = append(inprogressTargets, uniformAZTargets...)
}

if unavailableInt > len(finalTargets) {
unavailableInt = len(finalTargets)
}
return targets

return finalTargets[:unavailableInt]
}

func (r *RollingUpgradeContext) IsInstanceDrifted(instance *autoscaling.Instance) bool {
Expand Down Expand Up @@ -559,6 +571,7 @@ func (r *RollingUpgradeContext) IsInstanceDrifted(instance *autoscaling.Instance
}
} else if scalingGroup.LaunchTemplate != nil {
if instance.LaunchTemplate == nil {
r.Info("instance is drifted, instance launchtemplate is empty", "name", r.RollingUpgrade.NamespacedName())
return true
}

Expand All @@ -575,13 +588,16 @@ func (r *RollingUpgradeContext) IsInstanceDrifted(instance *autoscaling.Instance
}

if !strings.EqualFold(launchTemplateName, instanceTemplateName) {
r.Info("instance is drifted, mismatch in launchtemplate name", "instanceID", instanceID, "instanceLT", instanceTemplateName, "asgLT", launchTemplateName, "name", r.RollingUpgrade.NamespacedName())
return true
} else if !strings.EqualFold(instanceTemplateVersion, templateVersion) {
r.Info("instance is drifted, mismatch in launchtemplate version", "instanceID", instanceID, "instanceLT-version", instanceTemplateVersion, "asgLT-version", templateVersion, "name", r.RollingUpgrade.NamespacedName())
return true
}

} else if scalingGroup.MixedInstancesPolicy != nil {
if instance.LaunchTemplate == nil {
r.Info("instance is drifted, instance launchtemplate is empty", "name", r.RollingUpgrade.NamespacedName())
return true
}

Expand All @@ -598,8 +614,10 @@ func (r *RollingUpgradeContext) IsInstanceDrifted(instance *autoscaling.Instance
}

if !strings.EqualFold(launchTemplateName, instanceTemplateName) {
r.Info("instance is drifted, mismatch in launchtemplate name", "instanceID", instanceID, "instanceLT", instanceTemplateName, "asgLT", launchTemplateName, "name", r.RollingUpgrade.NamespacedName())
return true
} else if !strings.EqualFold(instanceTemplateVersion, templateVersion) {
r.Info("instance is drifted, mismatch in launchtemplate version", "instanceID", instanceID, "instanceLT-version", instanceTemplateVersion, "asgLT-version", templateVersion, "name", r.RollingUpgrade.NamespacedName())
return true
}
}
Expand Down Expand Up @@ -773,12 +791,12 @@ func (r *RollingUpgradeContext) CordonUncordonAllNodes(cordonNode bool) (bool, e
} else {
instanceIDs, err = r.Auth.DescribeTaggedInstanceIDs(instanceStateTagKey, earlyCordonedTagValue)
if err != nil {
r.Error(err, "failed to discover ec2 instances with early-cordoned tag", "name", r.RollingUpgrade.NamespacedName())
r.Info("failed to discover ec2 instances with early-cordoned tag", "name", r.RollingUpgrade.NamespacedName())
}

r.Info("removing early-cordoning tag while uncordoning instances", "name", r.RollingUpgrade.NamespacedName())
if err := r.Auth.UntagEC2instances(instanceIDs, instanceStateTagKey, earlyCordonedTagValue); err != nil {
r.Error(err, "failed to delete early-cordoned tag for instances", "name", r.RollingUpgrade.NamespacedName())
r.Info("failed to delete early-cordoned tag for instances", "name", r.RollingUpgrade.NamespacedName())
}
// add unit test as well.

Expand Down
Loading

0 comments on commit b48d119

Please sign in to comment.