Skip to content

use context to create new loggers #184

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion controllers/appWrapper_controller_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controllers

import (
"context"
"testing"

"github.com/onsi/gomega"
Expand Down Expand Up @@ -99,7 +100,7 @@ func (r *AppWrapperReconciler) TestDiscoverInstanceTypes(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
result := r.discoverInstanceTypes(test.input)
result := r.discoverInstanceTypes(context.TODO(), test.input)
g.Expect(result).To(gomega.Equal(test.expected))
})
}
Expand Down
52 changes: 39 additions & 13 deletions controllers/appwrapper_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/klog"

"k8s.io/apimachinery/pkg/labels"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -81,7 +80,6 @@ const (
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {

_ = log.FromContext(ctx)
// todo: Move the getOCMClusterID call out of reconcile loop.
// Only reason we are calling it here is that the client is not able to make
Expand Down Expand Up @@ -122,7 +120,7 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
}

demandPerInstanceType := r.discoverInstanceTypes(&appwrapper)
demandPerInstanceType := r.discoverInstanceTypes(ctx, &appwrapper)
if ocmSecretRef := r.Config.OCMSecretRef; ocmSecretRef != nil {
return r.scaleMachinePool(ctx, &appwrapper, demandPerInstanceType)
} else {
Expand All @@ -137,6 +135,7 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

func (r *AppWrapperReconciler) finalizeScalingDownMachines(ctx context.Context, appwrapper *arbv1.AppWrapper) error {
logger := ctrl.LoggerFrom(ctx)
if appwrapper.Status.State == arbv1.AppWrapperStateCompleted {
deletionMessage = "completed"
} else {
Expand All @@ -147,24 +146,41 @@ func (r *AppWrapperReconciler) finalizeScalingDownMachines(ctx context.Context,
case "reuse":
matchedAw := r.findExactMatch(ctx, appwrapper)
if matchedAw != nil {
klog.Infof("Appwrapper %s %s, swapping machines to %s", appwrapper.Name, deletionMessage, matchedAw.Name)
logger.Info(
"AppWrapper deleted transferring machines",
"oldAppWrapper", appwrapper,
"deletionMessage", deletionMessage,
"newAppWrapper", matchedAw,
)
if err := r.swapNodeLabels(ctx, appwrapper, matchedAw); err != nil {
return err
}
} else {
klog.Infof("Appwrapper %s %s, scaling down machines", appwrapper.Name, deletionMessage)
logger.Info(
"Scaling down machines associated with deleted AppWrapper",
"appWrapper", appwrapper,
"deletionMessage", deletionMessage,
)
if err := r.annotateToDeleteMachine(ctx, appwrapper); err != nil {
return err
}
}
case "duplicate":
klog.Infof("Appwrapper %s scale-down machineset: %s ", deletionMessage, appwrapper.Name)
logger.Info(
"AppWrapper deleted, scaling down machineset",
"appWrapper", appwrapper,
"deletionMessage", deletionMessage,
)
if err := r.deleteMachineSet(ctx, appwrapper); err != nil {
return err
}
}
} else {
klog.Infof("Appwrapper %s scale-down machine pool: %s ", deletionMessage, appwrapper.Name)
logger.Info(
"AppWrapper deleted, scaling down machine pool",
"appWrapper", appwrapper,
"deletionMessage", deletionMessage,
)
if _, err := r.deleteMachinePool(ctx, appwrapper); err != nil {
return err
}
Expand All @@ -175,6 +191,7 @@ func (r *AppWrapperReconciler) finalizeScalingDownMachines(ctx context.Context,
// SetupWithManager sets up the controller with the Manager.
func (r *AppWrapperReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {

logger := ctrl.LoggerFrom(ctx)
restConfig := mgr.GetConfig()

var err error
Expand All @@ -197,20 +214,21 @@ func (r *AppWrapperReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Ma
if ok, err := r.machinePoolExists(); err != nil {
return err
} else if ok {
klog.Info("Using machine pools for cluster auto-scaling")
logger.Info("Using machine pools for cluster auto-scaling")
}
}

return ctrl.NewControllerManagedBy(mgr).
For(&arbv1.AppWrapper{}).
For(&arbv1.AppWrapper{}).Named("instascale").
Complete(r)
}

// getOCMSecret retrieves the OCM credentials Secret identified by secretRef,
// reading it from the referenced namespace via the typed Kubernetes client.
func (r *AppWrapperReconciler) getOCMSecret(ctx context.Context, secretRef *corev1.SecretReference) (*corev1.Secret, error) {
	secretsClient := r.kubeClient.CoreV1().Secrets(secretRef.Namespace)
	return secretsClient.Get(ctx, secretRef.Name, metav1.GetOptions{})
}

func (r *AppWrapperReconciler) discoverInstanceTypes(aw *arbv1.AppWrapper) map[string]int {
func (r *AppWrapperReconciler) discoverInstanceTypes(ctx context.Context, aw *arbv1.AppWrapper) map[string]int {
logger := ctrl.LoggerFrom(ctx)
demandMapPerInstanceType := make(map[string]int)
var instanceRequired []string
for k, v := range aw.Labels {
Expand All @@ -220,7 +238,10 @@ func (r *AppWrapperReconciler) discoverInstanceTypes(aw *arbv1.AppWrapper) map[s
}

if len(instanceRequired) < 1 {
klog.Infof("Found AW %s that cannot be scaled due to missing orderedinstance label", aw.ObjectMeta.Name)
logger.Info(
"AppWrapper cannot be scaled out due to missing orderedinstance label",
"appWrapper", aw,
)
return demandMapPerInstanceType
}

Expand All @@ -237,6 +258,7 @@ func (r *AppWrapperReconciler) discoverInstanceTypes(aw *arbv1.AppWrapper) map[s
}

func (r *AppWrapperReconciler) findExactMatch(ctx context.Context, aw *arbv1.AppWrapper) *arbv1.AppWrapper {
logger := ctrl.LoggerFrom(ctx)
var match *arbv1.AppWrapper = nil
appwrappers := arbv1.AppWrapperList{}

Expand All @@ -250,7 +272,7 @@ func (r *AppWrapperReconciler) findExactMatch(ctx context.Context, aw *arbv1.App

err := r.List(ctx, &appwrappers, listOptions)
if err != nil {
klog.Error("Cannot list queued appwrappers, associated machines will be deleted")
logger.Error(err, "Cannot list queued appwrappers, associated machines will be deleted")
return match
}
var existingAcquiredMachineTypes = ""
Expand All @@ -265,7 +287,11 @@ func (r *AppWrapperReconciler) findExactMatch(ctx context.Context, aw *arbv1.App
if eachAw.Status.State != arbv1.AppWrapperStateEnqueued {
if eachAw.Labels["orderedinstance"] == existingAcquiredMachineTypes {
match = &eachAw
klog.Infof("Found exact match, %v appwrapper has acquired machinetypes %v", eachAw.Name, existingAcquiredMachineTypes)
logger.Info(
"AppWrapper has successfully acquired requested machine types",
"appWrapper", eachAw,
"acquiredMachineTypes", existingAcquiredMachineTypes,
)
break
}
}
Expand Down
52 changes: 38 additions & 14 deletions controllers/machinepools.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package controllers
import (
"context"
"fmt"
"os"
"strings"

ocmsdk "github.com/openshift-online/ocm-sdk-go"
Expand All @@ -12,7 +11,6 @@ import (
arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"

"k8s.io/apimachinery/pkg/types"
"k8s.io/klog"
ctrl "sigs.k8s.io/controller-runtime"
)

Expand Down Expand Up @@ -44,9 +42,10 @@ func hasAwLabel(machinePool *cmv1.MachinePool, aw *arbv1.AppWrapper) bool {
}

func (r *AppWrapperReconciler) scaleMachinePool(ctx context.Context, aw *arbv1.AppWrapper, demandPerInstanceType map[string]int) (ctrl.Result, error) {
logger := ctrl.LoggerFrom(ctx)
connection, err := r.createOCMConnection()
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating OCM connection: %v", err)
logger.Error(err, "Error creating OCM connection")
return ctrl.Result{}, err
}
defer connection.Close()
Expand All @@ -72,28 +71,40 @@ func (r *AppWrapperReconciler) scaleMachinePool(ctx context.Context, aw *arbv1.A
if numberOfMachines != replicas {
m := make(map[string]string)
m[aw.Name] = aw.Name
klog.Infof("The instanceRequired array: %v", userRequestedInstanceType)

machinePoolID := strings.ReplaceAll(aw.Name+"-"+userRequestedInstanceType, ".", "-")
createMachinePool, err := cmv1.NewMachinePool().ID(machinePoolID).InstanceType(userRequestedInstanceType).Replicas(replicas).Labels(m).Build()
if err != nil {
klog.Errorf(`Error building MachinePool: %v`, err)
logger.Error(
err, "Error building MachinePool",
"userRequestedInstanceType", userRequestedInstanceType,
)
}
klog.Infof("Built MachinePool with instance type %v and name %v", userRequestedInstanceType, createMachinePool.ID())
logger.Info(
"Sending MachinePool creation request",
"instanceType", userRequestedInstanceType,
"machinePoolName", createMachinePool.ID(),
)
response, err := clusterMachinePools.Add().Body(createMachinePool).SendContext(ctx)
if err != nil {
klog.Errorf(`Error creating MachinePool: %v`, err)
logger.Error(err, "Error creating MachinePool")
} else {
logger.Info(
"Successfully created MachinePool",
"machinePoolName", createMachinePool.ID(),
"response", response,
)
}
klog.Infof("Created MachinePool: %v", response)
}
}
return ctrl.Result{Requeue: false}, nil
}

func (r *AppWrapperReconciler) deleteMachinePool(ctx context.Context, aw *arbv1.AppWrapper) (ctrl.Result, error) {
logger := ctrl.LoggerFrom(ctx)
connection, err := r.createOCMConnection()
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating OCM connection: %v", err)
logger.Error(err, "Error creating OCM connection")
return ctrl.Result{}, err
}
defer connection.Close()
Expand All @@ -107,9 +118,16 @@ func (r *AppWrapperReconciler) deleteMachinePool(ctx context.Context, aw *arbv1.
if strings.Contains(id, aw.Name) {
targetMachinePool, err := connection.ClustersMgmt().V1().Clusters().Cluster(r.ocmClusterID).MachinePools().MachinePool(id).Delete().SendContext(ctx)
if err != nil {
klog.Infof("Error deleting target machinepool %v", targetMachinePool)
logger.Error(
err, "Error deleting machinepool",
"machinePool", targetMachinePool,
)
} else {
logger.Info(
"Successfully scaled down target machinepool",
"machinePool", targetMachinePool,
)
}
klog.Infof("Successfully Scaled down target machinepool %v", id)
}
return true
})
Expand All @@ -129,6 +147,7 @@ func (r *AppWrapperReconciler) machinePoolExists() (bool, error) {

// getOCMClusterID determines the internal clusterID to be used for OCM API calls
func (r *AppWrapperReconciler) getOCMClusterID(ctx context.Context) error {
logger := ctrl.LoggerFrom(ctx)
cv := &configv1.ClusterVersion{}
err := r.Get(ctx, types.NamespacedName{Name: "version"}, cv)
if err != nil {
Expand All @@ -139,7 +158,7 @@ func (r *AppWrapperReconciler) getOCMClusterID(ctx context.Context) error {

connection, err := r.createOCMConnection()
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating OCM connection: %v", err)
logger.Error(err, "Error creating OCM connection")
}
defer connection.Close()

Expand All @@ -148,12 +167,17 @@ func (r *AppWrapperReconciler) getOCMClusterID(ctx context.Context) error {

response, err := collection.List().Search(fmt.Sprintf("external_id = '%s'", internalClusterID)).Size(1).Page(1).SendContext(ctx)
if err != nil {
klog.Errorf(`Error getting cluster id: %v`, err)
logger.Error(err, "Error getting cluster id")
}

response.Items().Each(func(cluster *cmv1.Cluster) bool {
r.ocmClusterID = cluster.ID()
fmt.Printf("%s - %s - %s\n", cluster.ID(), cluster.Name(), cluster.State())
logger.Info(
"Cluster Info",
"clusterId", cluster.ID(),
"clusterName", cluster.Name(),
"clusterState", cluster.State(),
)
return true
})
return nil
Expand Down
Loading