Skip to content

Commit

Permalink
use NamespacedName (#160)
Browse files Browse the repository at this point in the history
Signed-off-by: Eytan Avisror <[email protected]>

Co-authored-by: Shri Javadekar <[email protected]>
  • Loading branch information
eytan-avisror and shrinandj authored Dec 11, 2020
1 parent 6f57dcf commit aa2b73b
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 62 deletions.
6 changes: 6 additions & 0 deletions api/v1alpha1/rollingupgrade_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ limitations under the License.
package v1alpha1

import (
"fmt"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
Expand Down Expand Up @@ -135,6 +137,10 @@ func (c UpdateStrategyMode) String() string {
return string(c)
}

func (r RollingUpgrade) NamespacedName() string {
return fmt.Sprintf("%v/%v", r.Name, r.Namespace)
}

// UpdateStrategy holds the information needed to perform update based on different update strategies
type UpdateStrategy struct {
Type UpdateStrategyType `json:"type,omitempty"`
Expand Down
40 changes: 20 additions & 20 deletions controllers/rollingupgrade_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (r *RollingUpgradeReconciler) WaitForDesiredInstances(ruObj *upgrademgrv1al
return err
}

asg, err := r.GetAutoScalingGroup(ruObj.Name)
asg, err := r.GetAutoScalingGroup(ruObj.NamespacedName())
if err != nil {
return fmt.Errorf("Unable to load ASG with name: %s", ruObj.Name)
}
Expand Down Expand Up @@ -270,7 +270,7 @@ func (r *RollingUpgradeReconciler) WaitForDesiredNodes(ruObj *upgrademgrv1alpha1
r.error(ruObj, err, "unable to populate node list")
}

asg, err := r.GetAutoScalingGroup(ruObj.Name)
asg, err := r.GetAutoScalingGroup(ruObj.NamespacedName())
if err != nil {
return fmt.Errorf("Unable to load ASG with name: %s", ruObj.Name)
}
Expand Down Expand Up @@ -337,7 +337,7 @@ func (r *RollingUpgradeReconciler) GetAutoScalingGroup(rollupName string) (*auto
func (r *RollingUpgradeReconciler) SetStandby(ruObj *upgrademgrv1alpha1.RollingUpgrade, instanceID string) error {
r.info(ruObj, "Setting to stand-by", ruObj.Name, instanceID)

asg, err := r.GetAutoScalingGroup(ruObj.Name)
asg, err := r.GetAutoScalingGroup(ruObj.NamespacedName())
if err != nil {
return err
}
Expand Down Expand Up @@ -434,7 +434,7 @@ func (r *RollingUpgradeReconciler) getNodeFromAsg(i *autoscaling.Instance, nodeL

func (r *RollingUpgradeReconciler) populateAsg(ruObj *upgrademgrv1alpha1.RollingUpgrade) error {
// if value is still in cache, do nothing.
if !r.ruObjNameToASG.IsExpired(ruObj.Name) {
if !r.ruObjNameToASG.IsExpired(ruObj.NamespacedName()) {
return nil
}

Expand All @@ -458,7 +458,7 @@ func (r *RollingUpgradeReconciler) populateAsg(ruObj *upgrademgrv1alpha1.Rolling
}

asg := result.AutoScalingGroups[0]
r.ruObjNameToASG.Store(ruObj.Name, asg)
r.ruObjNameToASG.Store(ruObj.NamespacedName(), asg)

return nil
}
Expand Down Expand Up @@ -503,7 +503,7 @@ func (r *RollingUpgradeReconciler) getInProgressInstances(instances []*autoscali

func (r *RollingUpgradeReconciler) runRestack(ctx *context.Context, ruObj *upgrademgrv1alpha1.RollingUpgrade) (int, error) {

asg, err := r.GetAutoScalingGroup(ruObj.Name)
asg, err := r.GetAutoScalingGroup(ruObj.NamespacedName())
if err != nil {
return 0, fmt.Errorf("Unable to load ASG with name: %s", ruObj.Name)
}
Expand Down Expand Up @@ -612,7 +612,7 @@ func (r *RollingUpgradeReconciler) finishExecution(finalStatus string, nodesProc
r.ClusterState.deleteAllInstancesInAsg(ruObj.Spec.AsgName)
r.info(ruObj, "Deleted the entries of ASG in the cluster store", "asgName", ruObj.Spec.AsgName)
r.inProcessASGs.Delete(ruObj.Spec.AsgName)
r.admissionMap.Delete(ruObj.Name)
r.admissionMap.Delete(ruObj.NamespacedName())
r.info(ruObj, "Deleted from admission map ", "admissionMap", &r.admissionMap)
}

Expand All @@ -629,7 +629,7 @@ func (r *RollingUpgradeReconciler) Process(ctx *context.Context,
MarkObjForCleanup(ruObj)
}

r.admissionMap.Delete(ruObj.Name)
r.admissionMap.Delete(ruObj.NamespacedName())
r.info(ruObj, "Deleted object from admission map")
return
}
Expand Down Expand Up @@ -659,7 +659,7 @@ func (r *RollingUpgradeReconciler) Process(ctx *context.Context,
return
}

asg, err := r.GetAutoScalingGroup(ruObj.Name)
asg, err := r.GetAutoScalingGroup(ruObj.NamespacedName())
if err != nil {
r.error(ruObj, err, "Unable to load ASG for rolling upgrade")
r.finishExecution(StatusError, 0, ctx, ruObj)
Expand Down Expand Up @@ -703,7 +703,7 @@ func (r *RollingUpgradeReconciler) validateNodesLaunchDefinition(ruObj *upgradem
if err != nil {
return errors.New("Unable to populate the ASG object")
}
asg, err := r.GetAutoScalingGroup(ruObj.Name)
asg, err := r.GetAutoScalingGroup(ruObj.NamespacedName())
if err != nil {
return fmt.Errorf("Unable to load ASG with name: %s", ruObj.Name)
}
Expand Down Expand Up @@ -763,7 +763,7 @@ func (r *RollingUpgradeReconciler) Reconcile(req ctrl.Request) (ctrl.Result, err
if k8serrors.IsNotFound(err) {
// Object not found, return. Created objects are automatically garbage collected.
// For additional cleanup logic use finalizers.
r.admissionMap.Delete(req.Name)
r.admissionMap.Delete(req.NamespacedName)
r.info(ruObj, "Deleted object from map", "name", req.NamespacedName)
return ctrl.Result{}, nil
}
Expand All @@ -774,8 +774,8 @@ func (r *RollingUpgradeReconciler) Reconcile(req ctrl.Request) (ctrl.Result, err
// If the resource is being deleted, remove it from the admissionMap
if !ruObj.DeletionTimestamp.IsZero() {
r.info(ruObj, "Object is being deleted. No more processing")
r.admissionMap.Delete(ruObj.Name)
r.ruObjNameToASG.Delete(ruObj.Name)
r.admissionMap.Delete(ruObj.NamespacedName())
r.ruObjNameToASG.Delete(ruObj.NamespacedName())
r.info(ruObj, "Deleted object from admission map")
return reconcile.Result{}, nil
}
Expand All @@ -797,17 +797,17 @@ func (r *RollingUpgradeReconciler) Reconcile(req ctrl.Request) (ctrl.Result, err
return reconcile.Result{}, err
}

result, ok := r.admissionMap.Load(ruObj.Name)
result, ok := r.admissionMap.Load(ruObj.NamespacedName())
if ok {
if result == "processing" {
r.info(ruObj, "Found obj in map:", "name", ruObj.Name)
r.info(ruObj, "Object already being processed", "name", ruObj.Name)
r.info(ruObj, "Found obj in map:", "name", ruObj.NamespacedName())
r.info(ruObj, "Object already being processed", "name", ruObj.NamespacedName())
} else {
r.info(ruObj, "Sync map with invalid entry for ", "name", ruObj.Name)
r.info(ruObj, "Sync map with invalid entry for ", "name", ruObj.NamespacedName())
}
} else {
r.info(ruObj, "Adding obj to map: ", "name", ruObj.Name)
r.admissionMap.Store(ruObj.Name, "processing")
r.info(ruObj, "Adding obj to map: ", "name", ruObj.NamespacedName())
r.admissionMap.Store(ruObj.NamespacedName(), "processing")
go r.Process(&ctx, ruObj)
}

Expand Down Expand Up @@ -1011,7 +1011,7 @@ func (r *RollingUpgradeReconciler) UpdateInstance(ctx *context.Context,
defer r.ClusterState.markUpdateCompleted(targetInstanceID)

// Check if the rollingupgrade object still exists
_, ok := r.admissionMap.Load(ruObj.Name)
_, ok := r.admissionMap.Load(ruObj.NamespacedName())
if !ok {
r.info(ruObj, "Object either force completed or deleted. Ignoring node update")
ruObj.Status.NodesProcessed = ruObj.Status.NodesProcessed + 1
Expand Down
Loading

0 comments on commit aa2b73b

Please sign in to comment.