CLOUDP-328219: Flex to Dedicated reconciliation #2526

Merged: 3 commits, Jul 18, 2025

1 change: 1 addition & 0 deletions .github/workflows/test-e2e.yml
@@ -109,6 +109,7 @@ jobs:
"dry-run",
"networkcontainer-controller",
"networkpeering-controller",
"flex-to-dedicated",
]
steps:
- uses: actions/checkout@v4

1 change: 1 addition & 0 deletions internal/controller/atlas/api_error.go
@@ -40,6 +40,7 @@ const (
// ServerlessClusterFromClusterAPI indicates that we are trying to access
// a serverless instance from the cluster API, which is not allowed
ServerlessInstanceFromClusterAPI = "CANNOT_USE_SERVERLESS_INSTANCE_IN_CLUSTER_API"
FlexFromClusterAPI = "CANNOT_USE_FLEX_CLUSTER_IN_CLUSTER_API"

ClusterInstanceFromServerlessAPI = "CANNOT_USE_CLUSTER_IN_SERVERLESS_INSTANCE_API"


20 changes: 19 additions & 1 deletion internal/controller/atlasdeployment/advanced_deployment.go
@@ -41,11 +41,29 @@ import (
const FreeTier = "M0"

func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(ctx *workflow.Context, projectService project.ProjectService, deploymentService deployment.AtlasDeploymentsService, akoDeployment, atlasDeployment deployment.Deployment) (ctrl.Result, error) {
if akoDeployment.GetCustomResource().Spec.UpgradeToDedicated && !atlasDeployment.IsDedicated() {
if atlasDeployment.GetState() == status.StateUPDATING {
return r.inProgress(ctx, akoDeployment.GetCustomResource(), atlasDeployment, workflow.DeploymentUpdating, "deployment is updating")
}

updatedDeployment, err := deploymentService.UpgradeToDedicated(ctx.Context, atlasDeployment, akoDeployment)

if err != nil {
return r.terminate(ctx, workflow.DedicatedMigrationFailed, fmt.Errorf("failed to upgrade cluster: %w", err))
}

return r.inProgress(ctx, akoDeployment.GetCustomResource(), updatedDeployment, workflow.DedicatedMigrationProgressing, "Cluster upgrade to dedicated instance initiated in Atlas. The process may take several minutes")
}

akoCluster, ok := akoDeployment.(*deployment.Cluster)
if !ok {
return r.terminate(ctx, workflow.Internal, errors.New("deployment in AKO is not an advanced cluster"))
}
-atlasCluster, _ := atlasDeployment.(*deployment.Cluster)
+
+var atlasCluster *deployment.Cluster
+if atlasCluster, ok = atlasDeployment.(*deployment.Cluster); atlasDeployment != nil && !ok {
+return r.terminate(ctx, workflow.Internal, errors.New("deployment in Atlas is not an advanced cluster"))
+}

if atlasCluster == nil {
ctx.Log.Infof("Advanced Deployment %s doesn't exist in Atlas - creating", akoCluster.GetName())

177 changes: 176 additions & 1 deletion internal/controller/atlasdeployment/advanced_deployment_test.go
@@ -17,6 +17,7 @@ package atlasdeployment
import (
"context"
"errors"
"fmt"
"testing"

"github.com/google/go-cmp/cmp"
@@ -49,7 +50,7 @@ func TestHandleAdvancedDeployment(t *testing.T) {
}
tests := map[string]struct {
atlasDeployment *akov2.AtlasDeployment
-deploymentInAtlas *deployment.Cluster
+deploymentInAtlas deployment.Deployment
deploymentService func() deployment.AtlasDeploymentsService
sdkMock func() *admin.APIClient
expectedResult workflowRes
@@ -779,6 +780,180 @@ func TestHandleAdvancedDeployment(t *testing.T) {
WithMessageRegexp("deployment is updating"),
},
},
"fail to upgrade a shared cluster to dedicated": {
atlasDeployment: &akov2.AtlasDeployment{
Spec: akov2.AtlasDeploymentSpec{
UpgradeToDedicated: true,
DeploymentSpec: &akov2.AdvancedDeploymentSpec{
Name: "cluster0",
ClusterType: "REPLICASET",
ReplicationSpecs: []*akov2.AdvancedReplicationSpec{
{
RegionConfigs: []*akov2.AdvancedRegionConfig{
{
ProviderName: "AWS",
RegionName: "US_WEST_1",
Priority: pointer.MakePtr(7),
ElectableSpecs: &akov2.Specs{
InstanceSize: "M20",
NodeCount: pointer.MakePtr(3),
},
},
},
},
},
},
},
},
deploymentInAtlas: &deployment.Flex{
ProjectID: "project-id",
State: "IDLE",
FlexSpec: &akov2.FlexSpec{
Name: "cluster0",
ProviderSettings: &akov2.FlexProviderSettings{
RegionName: "US_EAST_1",
BackingProviderName: "AWS",
},
},
},
deploymentService: func() deployment.AtlasDeploymentsService {
service := translation.NewAtlasDeploymentsServiceMock(t)
service.EXPECT().UpgradeToDedicated(context.Background(), mock.AnythingOfType("*deployment.Flex"), mock.AnythingOfType("*deployment.Cluster")).
Return(nil, errors.New("failed to update cluster"))

return service
},
sdkMock: func() *admin.APIClient {
return &admin.APIClient{}
},
expectedResult: workflowRes{
res: ctrl.Result{RequeueAfter: workflow.DefaultRetry},
err: fmt.Errorf("failed to upgrade cluster: %w", errors.New("failed to update cluster")),
},
expectedConditions: []api.Condition{
api.FalseCondition(api.DeploymentReadyType).
WithReason(string(workflow.DedicatedMigrationFailed)).
WithMessageRegexp("failed to upgrade cluster: failed to update cluster"),
},
},
"watch upgrade progress": {
atlasDeployment: &akov2.AtlasDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "cluster0",
Namespace: "test",
},
Spec: akov2.AtlasDeploymentSpec{
UpgradeToDedicated: true,
DeploymentSpec: &akov2.AdvancedDeploymentSpec{
Name: "cluster0",
ClusterType: "REPLICASET",
ReplicationSpecs: []*akov2.AdvancedReplicationSpec{
{
RegionConfigs: []*akov2.AdvancedRegionConfig{
{
ProviderName: "AWS",
RegionName: "US_WEST_1",
Priority: pointer.MakePtr(7),
ElectableSpecs: &akov2.Specs{
InstanceSize: "M20",
NodeCount: pointer.MakePtr(3),
},
},
},
},
},
},
},
},
deploymentInAtlas: &deployment.Flex{
ProjectID: "project-id",
State: "UPDATING",
FlexSpec: &akov2.FlexSpec{
Name: "cluster0",
ProviderSettings: &akov2.FlexProviderSettings{
RegionName: "US_EAST_1",
BackingProviderName: "AWS",
},
},
},
deploymentService: func() deployment.AtlasDeploymentsService {
service := translation.NewAtlasDeploymentsServiceMock(t)

return service
},
sdkMock: func() *admin.APIClient {
return &admin.APIClient{}
},
expectedResult: workflowRes{
res: ctrl.Result{RequeueAfter: workflow.DefaultRetry},
err: nil,
},
expectedConditions: []api.Condition{
api.FalseCondition(api.DeploymentReadyType).
WithReason(string(workflow.DeploymentUpdating)).
WithMessageRegexp("deployment is updating"),
},
},
"upgrade a flex to dedicated cluster": {
atlasDeployment: &akov2.AtlasDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "cluster0",
Namespace: "test",
},
Spec: akov2.AtlasDeploymentSpec{
UpgradeToDedicated: true,
DeploymentSpec: &akov2.AdvancedDeploymentSpec{
Name: "cluster0",
ClusterType: "REPLICASET",
ReplicationSpecs: []*akov2.AdvancedReplicationSpec{
{
RegionConfigs: []*akov2.AdvancedRegionConfig{
{
ProviderName: "AWS",
RegionName: "US_WEST_1",
Priority: pointer.MakePtr(7),
ElectableSpecs: &akov2.Specs{
InstanceSize: "M20",
NodeCount: pointer.MakePtr(3),
},
},
},
},
},
},
},
},
deploymentInAtlas: &deployment.Flex{
ProjectID: "project-id",
State: "IDLE",
FlexSpec: &akov2.FlexSpec{
Name: "cluster0",
ProviderSettings: &akov2.FlexProviderSettings{
RegionName: "US_EAST_1",
BackingProviderName: "AWS",
},
},
},
deploymentService: func() deployment.AtlasDeploymentsService {
service := translation.NewAtlasDeploymentsServiceMock(t)
service.EXPECT().UpgradeToDedicated(context.Background(), mock.AnythingOfType("*deployment.Flex"), mock.AnythingOfType("*deployment.Cluster")).
Return(&deployment.Flex{}, nil)

return service
},
sdkMock: func() *admin.APIClient {
return &admin.APIClient{}
},
expectedResult: workflowRes{
res: ctrl.Result{RequeueAfter: workflow.DefaultRetry},
err: nil,
},
expectedConditions: []api.Condition{
api.FalseCondition(api.DeploymentReadyType).
WithReason(string(workflow.DedicatedMigrationProgressing)).
WithMessageRegexp("Cluster upgrade to dedicated instance initiated in Atlas. The process may take several minutes"),
},
},
}

for name, tt := range tests {

44 changes: 25 additions & 19 deletions internal/controller/atlasdeployment/atlasdeployment_controller.go
@@ -159,10 +159,10 @@ func (r *AtlasDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Requ

deploymentInAKO := deployment.NewDeployment(atlasProject.ID, atlasDeployment)

-if ok, deprecationMsg := deploymentInAKO.Deprecated(); ok {
+if ok, notificationReason, notificationMsg := deploymentInAKO.Notifications(); ok {
// emit Log and event
-r.Log.Log(zapcore.WarnLevel, deprecationMsg)
-r.EventRecorder.Event(deploymentInAKO.GetCustomResource(), corev1.EventTypeWarning, "DeprecationWarning", deprecationMsg)
+r.Log.Log(zapcore.WarnLevel, notificationMsg)
+r.EventRecorder.Event(deploymentInAKO.GetCustomResource(), corev1.EventTypeWarning, notificationReason, notificationMsg)
}
deploymentInAtlas, err := deploymentService.GetDeployment(workflowCtx.Context, atlasProject.ID, atlasDeployment)

@@ -173,7 +173,7 @@ func (r *AtlasDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Requ
existsInAtlas := deploymentInAtlas != nil
if !atlasDeployment.GetDeletionTimestamp().IsZero() {
if existsInAtlas {
-return r.delete(workflowCtx, deploymentService, deploymentInAKO)
+return r.delete(workflowCtx, deploymentService, deploymentInAKO, deploymentInAtlas)
}
return r.unmanage(workflowCtx, deploymentInAKO)
}
@@ -195,28 +195,29 @@ func (r *AtlasDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Requ
func (r *AtlasDeploymentReconciler) delete(
ctx *workflow.Context,
deploymentService deployment.AtlasDeploymentsService,
-deployment deployment.Deployment, // this must be the original non converted deployment
+deploymentInAKO deployment.Deployment, // this must be the original non converted deployment
+deploymentInAtlas deployment.Deployment, // this must be the original non converted deployment
Comment on lines +198 to +199

Collaborator: why 2 entries now?

Collaborator (Author): We use the type defined in the manifest to decide which API to call on deletion (regular/flex/serverless). Since Atlas and AKO can now differ from each other, even during a short transition period (for example, Atlas is flex while AKO is dedicated), we should still be able to call the right API based on what is in Atlas, not on what the manifest defines.

) (ctrl.Result, error) {
-if err := r.cleanupBindings(ctx.Context, deployment); err != nil {
+if err := r.cleanupBindings(ctx.Context, deploymentInAKO); err != nil {
return r.terminate(ctx, workflow.Internal, fmt.Errorf("failed to cleanup deployment bindings (backups): %w", err))
}

switch {
-case customresource.IsResourcePolicyKeepOrDefault(deployment.GetCustomResource(), r.ObjectDeletionProtection):
+case customresource.IsResourcePolicyKeepOrDefault(deploymentInAKO.GetCustomResource(), r.ObjectDeletionProtection):
ctx.Log.Info("Not removing Atlas deployment from Atlas as per configuration")
-case customresource.IsResourcePolicyKeep(deployment.GetCustomResource()):
+case customresource.IsResourcePolicyKeep(deploymentInAKO.GetCustomResource()):
ctx.Log.Infof("Not removing Atlas deployment from Atlas as the '%s' annotation is set", customresource.ResourcePolicyAnnotation)
-case isTerminationProtectionEnabled(deployment.GetCustomResource()):
-msg := fmt.Sprintf("Termination protection for %s deployment enabled. Deployment in Atlas won't be removed", deployment.GetName())
+case isTerminationProtectionEnabled(deploymentInAKO.GetCustomResource()):
+msg := fmt.Sprintf("Termination protection for %s deployment enabled. Deployment in Atlas won't be removed", deploymentInAKO.GetName())
ctx.Log.Info(msg)
-r.EventRecorder.Event(deployment.GetCustomResource(), "Warning", "AtlasDeploymentTermination", msg)
+r.EventRecorder.Event(deploymentInAKO.GetCustomResource(), "Warning", "AtlasDeploymentTermination", msg)
default:
-if err := r.deleteDeploymentFromAtlas(ctx, deploymentService, deployment); err != nil {
+if err := r.deleteDeploymentFromAtlas(ctx, deploymentService, deploymentInAKO, deploymentInAtlas); err != nil {
return r.terminate(ctx, workflow.Internal, fmt.Errorf("failed to remove deployment from Atlas: %w", err))
}
}

-if err := customresource.ManageFinalizer(ctx.Context, r.Client, deployment.GetCustomResource(), customresource.UnsetFinalizer); err != nil {
+if err := customresource.ManageFinalizer(ctx.Context, r.Client, deploymentInAKO.GetCustomResource(), customresource.UnsetFinalizer); err != nil {
return r.terminate(ctx, workflow.Internal, fmt.Errorf("failed to remove finalizer: %w", err))
}
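
The review thread above captures the design point in this file: deletion is driven by the deployment as it exists in Atlas, while bindings, finalizers, and events still use the AKO-side resource. Below is a minimal, self-contained sketch of that dispatch idea; the Deployment, Cluster, and Flex types and the deleteByAtlasType helper are simplified stand-ins for illustration, not the operator's actual deployment package or service code.

package main

import (
	"context"
	"fmt"
)

// Simplified stand-ins for the operator's deployment types (illustrative only).
type Deployment interface{ GetName() string }

type Cluster struct{ Name string }

type Flex struct{ Name string }

func (c *Cluster) GetName() string { return c.Name }

func (f *Flex) GetName() string { return f.Name }

// deleteByAtlasType is a hypothetical helper: it chooses the delete call based
// on what the deployment currently is in Atlas, so a cluster that is still Flex
// in Atlas while the manifest already describes a dedicated cluster is removed
// through the Flex API.
func deleteByAtlasType(ctx context.Context, deploymentInAtlas Deployment) error {
	switch d := deploymentInAtlas.(type) {
	case *Flex:
		fmt.Printf("deleting %q via the Flex clusters API\n", d.GetName())
	case *Cluster:
		fmt.Printf("deleting %q via the dedicated clusters API\n", d.GetName())
	default:
		return fmt.Errorf("unsupported deployment type %T", deploymentInAtlas)
	}
	return nil
}

func main() {
	_ = deleteByAtlasType(context.Background(), &Flex{Name: "cluster0"})
}

This mirrors why delete() and deleteDeploymentFromAtlas() now take both deploymentInAKO and deploymentInAtlas: the former is used to clean up bindings, finalizers, and events, the latter for the DeleteDeployment call against the correct API.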

@@ -235,15 +236,20 @@ func isTerminationProtectionEnabled(deployment *akov2.AtlasDeployment) bool {
deployment.Spec.ServerlessSpec.TerminationProtectionEnabled)
}

-func (r *AtlasDeploymentReconciler) deleteDeploymentFromAtlas(ctx *workflow.Context, deploymentService deployment.AtlasDeploymentsService, deployment deployment.Deployment) error {
-ctx.Log.Infow("-> Starting AtlasDeployment deletion", "spec", deployment)
+func (r *AtlasDeploymentReconciler) deleteDeploymentFromAtlas(
+ctx *workflow.Context,
+deploymentService deployment.AtlasDeploymentsService,
+deploymentInAKO deployment.Deployment,
+deploymentInAtlas deployment.Deployment,
+) error {
+ctx.Log.Infow("-> Starting AtlasDeployment deletion", "spec", deploymentInAKO)

-err := r.deleteConnectionStrings(ctx, deployment)
+err := r.deleteConnectionStrings(ctx, deploymentInAKO)
if err != nil {
return err
}

-err = deploymentService.DeleteDeployment(ctx.Context, deployment)
+err = deploymentService.DeleteDeployment(ctx.Context, deploymentInAtlas)
if err != nil {
ctx.Log.Errorw("Cannot delete Atlas deployment", "error", err)
return err
@@ -349,10 +355,10 @@ func (r *AtlasDeploymentReconciler) ready(ctx *workflow.Context, deploymentInAKO
return r.terminate(ctx, workflow.AtlasFinalizerNotSet, err)
}

-_, deprecationMsg := deploymentInAKO.Deprecated()
+_, _, notificationMsg := deploymentInAKO.Notifications()

ctx.SetConditionTrue(api.DeploymentReadyType).
-SetConditionTrueMsg(api.ReadyType, deprecationMsg).
+SetConditionTrueMsg(api.ReadyType, notificationMsg).
EnsureStatusOption(status.AtlasDeploymentStateNameOption(deploymentInAtlas.GetState())).
EnsureStatusOption(status.AtlasDeploymentReplicaSet(deploymentInAtlas.GetReplicaSet())).
EnsureStatusOption(status.AtlasDeploymentMongoDBVersionOption(deploymentInAtlas.GetMongoDBVersion())).