Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: dropping data and some refactoring #307

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ vet: ## Run go vet against code.

.PHONY: test
test: manifests generate fmt vet envtest ## Run tests.
GOBIN=$(LOCALBIN) go install github.com/onsi/ginkgo/v2/ginkgo@v2.12.1
GOBIN=$(LOCALBIN) go install github.com/onsi/ginkgo/v2/ginkgo@v2.17.2
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" $(GINKGO) -vv -r -p -coverprofile cover.out

##@ Build
Expand Down Expand Up @@ -168,4 +168,4 @@ $(CONTROLLER_GEN): $(LOCALBIN)
.PHONY: envtest
envtest: $(ENVTEST) ## Download envtest-setup locally if necessary.
$(ENVTEST): $(LOCALBIN)
test -s $(LOCALBIN)/setup-envtest || GOBIN=$(LOCALBIN) go install sigs.k8s.io/controller-runtime/tools/setup-envtest@latest
test -s $(LOCALBIN)/setup-envtest || GOBIN=$(LOCALBIN) go install sigs.k8s.io/controller-runtime/tools/setup-envtest@release-0.18
16 changes: 10 additions & 6 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,22 @@ func main() {
defer eventBroadcaster.Shutdown()

if err = (&controller.DragonflyReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
EventRecorder: eventRecorder,
Reconciler: controller.Reconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
EventRecorder: eventRecorder,
},
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Dragonfly")
os.Exit(1)
}

if err = (&controller.DfPodLifeCycleReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
EventRecorder: eventRecorder,
Reconciler: controller.Reconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
EventRecorder: eventRecorder,
},
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Health")
os.Exit(1)
Expand Down
12 changes: 6 additions & 6 deletions e2e/dragonfly_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
// Check if there are relevant pods with expected roles
var pods corev1.PodList
err := k8sClient.List(ctx, &pods, client.InNamespace(namespace), client.MatchingLabels{
"app": name,
resources.DragonflyNameLabelKey: name,
resources.KubernetesPartOfLabelKey: "dragonfly",
})
Expect(err).To(BeNil())
Expand Down Expand Up @@ -231,7 +231,7 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
// Check if there are relevant pods with expected roles
var pods corev1.PodList
err = k8sClient.List(ctx, &pods, client.InNamespace(namespace), client.MatchingLabels{
"app": name,
resources.DragonflyNameLabelKey: name,
resources.KubernetesPartOfLabelKey: "dragonfly",
})
Expect(err).To(BeNil())
Expand Down Expand Up @@ -274,7 +274,7 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
// Check if there are relevant pods with expected roles
var pods corev1.PodList
err = k8sClient.List(ctx, &pods, client.InNamespace(namespace), client.MatchingLabels{
"app": name,
resources.DragonflyNameLabelKey: name,
resources.KubernetesPartOfLabelKey: "dragonfly",
})
Expect(err).To(BeNil())
Expand Down Expand Up @@ -331,7 +331,7 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
// Check if there are relevant pods with expected roles
var pods corev1.PodList
err = k8sClient.List(ctx, &pods, client.InNamespace(namespace), client.MatchingLabels{
"app": name,
resources.DragonflyNameLabelKey: name,
resources.KubernetesPartOfLabelKey: "dragonfly",
})
Expect(err).To(BeNil())
Expand Down Expand Up @@ -456,7 +456,7 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
// check for pods too
var pods corev1.PodList
err = k8sClient.List(ctx, &pods, client.InNamespace(namespace), client.MatchingLabels{
"app": name,
resources.DragonflyNameLabelKey: name,
resources.KubernetesPartOfLabelKey: "dragonfly",
})
Expect(err).To(BeNil())
Expand Down Expand Up @@ -716,7 +716,7 @@ var _ = Describe("Dragonfly PVC Test with single replica", Ordered, FlakeAttempt
// check if the pvc is created
var pvcs corev1.PersistentVolumeClaimList
err = k8sClient.List(ctx, &pvcs, client.InNamespace(namespace), client.MatchingLabels{
"app": name,
resources.DragonflyNameLabelKey: name,
resources.KubernetesPartOfLabelKey: "dragonfly",
})
Expect(err).To(BeNil())
Expand Down
6 changes: 3 additions & 3 deletions e2e/dragonfly_pod_lifecycle_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ var _ = Describe("DF Pod Lifecycle Reconciler", Ordered, FlakeAttempts(3), func(
// Check if there are relevant pods with expected roles
var pods corev1.PodList
err = k8sClient.List(ctx, &pods, client.InNamespace(namespace), client.MatchingLabels{
"app": name,
resources.DragonflyNameLabelKey: name,
resources.KubernetesPartOfLabelKey: "dragonfly",
})
Expect(err).To(BeNil())
Expand Down Expand Up @@ -136,7 +136,7 @@ var _ = Describe("DF Pod Lifecycle Reconciler", Ordered, FlakeAttempts(3), func(
// Check if there are relevant pods with expected roles
var pods corev1.PodList
err = k8sClient.List(ctx, &pods, client.InNamespace(namespace), client.MatchingLabels{
"app": name,
resources.DragonflyNameLabelKey: name,
resources.KubernetesPartOfLabelKey: "dragonfly",
})
Expect(err).To(BeNil())
Expand Down Expand Up @@ -184,7 +184,7 @@ var _ = Describe("DF Pod Lifecycle Reconciler", Ordered, FlakeAttempts(3), func(
// Check if there are relevant pods with expected roles
var pods corev1.PodList
err = k8sClient.List(ctx, &pods, client.InNamespace(namespace), client.MatchingLabels{
"app": name,
resources.DragonflyNameLabelKey: name,
resources.KubernetesPartOfLabelKey: "dragonfly",
})
Expect(err).To(BeNil())
Expand Down
63 changes: 63 additions & 0 deletions internal/controller/base_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
Copyright 2023 DragonflyDB authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
"context"
dfv1alpha1 "github.com/dragonflydb/dragonfly-operator/api/v1alpha1"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type (
Reconciler struct {
Client client.Client // Explicitly named
Scheme *runtime.Scheme
EventRecorder record.EventRecorder
}
// DragonflyInstance is an abstraction over the `Dragonfly` CRD
// and provides methods to handle replication.
DragonflyInstance struct {
// Dragonfly is the relevant Dragonfly CRD that it performs actions over
df *dfv1alpha1.Dragonfly

client client.Client
scheme *runtime.Scheme
eventRecorder record.EventRecorder
log logr.Logger
}
)

func (r *Reconciler) getDragonflyInstance(ctx context.Context, namespacedName types.NamespacedName, log logr.Logger) (*DragonflyInstance, error) {
// Retrieve the relevant Dragonfly object
var df dfv1alpha1.Dragonfly
err := r.Client.Get(ctx, namespacedName, &df)
if err != nil {
return nil, err
}

return &DragonflyInstance{
df: &df,
client: r.Client,
scheme: r.Scheme,
eventRecorder: r.EventRecorder,
log: log,
}, nil
}
37 changes: 16 additions & 21 deletions internal/controller/dragonfly_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -37,10 +35,7 @@ import (

// DragonflyReconciler reconciles a Dragonfly object
type DragonflyReconciler struct {
client.Client
Scheme *runtime.Scheme

EventRecorder record.EventRecorder
Reconciler
}

//+kubebuilder:rbac:groups=dragonflydb.io,resources=dragonflies,verbs=get;list;watch;create;update;patch;delete
Expand All @@ -59,7 +54,7 @@ type DragonflyReconciler struct {
func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)
var df dfv1alpha1.Dragonfly
if err := r.Get(ctx, req.NamespacedName, &df); err != nil {
if err := r.Client.Get(ctx, req.NamespacedName, &df); err != nil {
log.Info(fmt.Sprintf("could not get the Dragonfly object: %s", req.NamespacedName))
return ctrl.Result{}, client.IgnoreNotFound(err)
}
Expand All @@ -76,7 +71,7 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

// create all resources
for _, resource := range resources {
if err := r.Create(ctx, resource); err != nil {
if err := r.Client.Create(ctx, resource); err != nil {
log.Error(err, fmt.Sprintf("could not create resource %s/%s/%s", resource.GetObjectKind(), resource.GetNamespace(), resource.GetName()))
return ctrl.Result{}, err
}
Expand All @@ -85,7 +80,7 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
// Update Status
df.Status.Phase = PhaseResourcesCreated
log.Info("Created resources for object")
if err := r.Status().Update(ctx, &df); err != nil {
if err := r.Client.Status().Update(ctx, &df); err != nil {
log.Error(err, "could not update the Dragonfly object")
return ctrl.Result{}, err
}
Expand All @@ -102,14 +97,14 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}
for _, resource := range missingResources {
// recreate missing resources
if err := r.Create(ctx, resource); err != nil {
if err := r.Client.Create(ctx, resource); err != nil {
log.Error(err, fmt.Sprintf("could not create resource %s/%s/%s", resource.GetObjectKind(), resource.GetNamespace(), resource.GetName()))
return ctrl.Result{}, err
}
}

var statefulSet appsv1.StatefulSet
if err := r.Get(ctx, client.ObjectKey{Namespace: df.Namespace, Name: df.Name}, &statefulSet); err != nil {
if err := r.Client.Get(ctx, client.ObjectKey{Namespace: df.Namespace, Name: df.Name}, &statefulSet); err != nil {
log.Error(err, "could not get statefulset")
return ctrl.Result{}, err
}
Expand All @@ -125,7 +120,7 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

// update all resources
for _, resource := range newResources {
if err := r.Update(ctx, resource); err != nil {
if err := r.Client.Update(ctx, resource); err != nil {
log.Error(err, fmt.Sprintf("could not update resource %s/%s/%s", resource.GetObjectKind(), resource.GetNamespace(), resource.GetName()))
return ctrl.Result{}, err
}
Expand All @@ -138,15 +133,15 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
// This is a Rollout
log.Info("Rolling out new version")
var updatedStatefulset appsv1.StatefulSet
if err := r.Get(ctx, client.ObjectKey{Namespace: df.Namespace, Name: df.Name}, &updatedStatefulset); err != nil {
if err := r.Client.Get(ctx, client.ObjectKey{Namespace: df.Namespace, Name: df.Name}, &updatedStatefulset); err != nil {
log.Error(err, "could not get statefulset")
return ctrl.Result{Requeue: true}, err
}

// get pods of the statefulset
var pods corev1.PodList
if err := r.List(ctx, &pods, client.InNamespace(df.Namespace), client.MatchingLabels(map[string]string{
"app": df.Name,
if err := r.Client.List(ctx, &pods, client.InNamespace(df.Namespace), client.MatchingLabels(map[string]string{
resources.DragonflyNameLabelKey: df.Name,
resources.KubernetesAppNameLabelKey: "dragonfly",
})); err != nil {
log.Error(err, "could not list pods")
Expand All @@ -173,7 +168,7 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if isFailedToStart(&pod) {
// This is a new pod which is trying to be ready, but couldn't start due to misconfig.
// Delete the pod and create a new one.
if err := r.Delete(ctx, &pod); err != nil {
if err := r.Client.Delete(ctx, &pod); err != nil {
log.Error(err, "could not delete pod")
return ctrl.Result{RequeueAfter: 5 * time.Second}, err
}
Expand Down Expand Up @@ -228,7 +223,7 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
// delete the replica
log.Info("deleting replica", "pod", replica.Name)
r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Rollout", "Deleting replica")
if err := r.Delete(ctx, &replica); err != nil {
if err := r.Client.Delete(ctx, &replica); err != nil {
log.Error(err, "could not delete pod")
return ctrl.Result{RequeueAfter: 5 * time.Second}, err
}
Expand Down Expand Up @@ -268,7 +263,7 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

// delete the old master, so that it gets recreated with the new version
log.Info("deleting master", "pod", master.Name)
if err := r.Delete(ctx, &master); err != nil {
if err := r.Client.Delete(ctx, &master); err != nil {
log.Error(err, "could not delete pod")
return ctrl.Result{RequeueAfter: 5 * time.Second}, err
}
Expand All @@ -279,7 +274,7 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

// update status
df.Status.IsRollingUpdate = false
if err := r.Status().Update(ctx, &df); err != nil {
if err := r.Client.Status().Update(ctx, &df); err != nil {
log.Error(err, "could not update the Dragonfly object")
return ctrl.Result{Requeue: true}, err
}
Expand All @@ -295,7 +290,7 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
// Start rollout and update status
// update status so that we can track progress
df.Status.IsRollingUpdate = true
if err := r.Status().Update(ctx, &df); err != nil {
if err := r.Client.Status().Update(ctx, &df); err != nil {
log.Error(err, "could not update the Dragonfly object")
return ctrl.Result{Requeue: true}, err
}
Expand Down Expand Up @@ -332,7 +327,7 @@ func (r *DragonflyReconciler) getMissingResources(ctx context.Context, df *dfv1a
for _, resource := range resources {
obj := resource.DeepCopyObject().(client.Object)

err := r.Get(ctx, client.ObjectKey{
err := r.Client.Get(ctx, client.ObjectKey{
Namespace: df.Namespace,
Name: resource.GetName(),
}, obj)
Expand Down
Loading