diff --git a/internal/controller/atlasbackupcompliancepolicy/atlasbackupcompliancepolicy_controller.go b/internal/controller/atlasbackupcompliancepolicy/atlasbackupcompliancepolicy_controller.go index ff90dfe672..f1543d32f0 100644 --- a/internal/controller/atlasbackupcompliancepolicy/atlasbackupcompliancepolicy_controller.go +++ b/internal/controller/atlasbackupcompliancepolicy/atlasbackupcompliancepolicy_controller.go @@ -52,6 +52,7 @@ type AtlasBackupCompliancePolicyReconciler struct { Log *zap.SugaredLogger ObjectDeletionProtection bool SubObjectDeletionProtection bool + maxConcurrentReconciles int } // +kubebuilder:rbac:groups=atlas.mongodb.com,resources=atlasbackupcompliancepolicies,verbs=get;list;watch;create;update;patch;delete @@ -125,18 +126,13 @@ func (r *AtlasBackupCompliancePolicyReconciler) SetupWithManager(mgr ctrl.Manage builder.WithPredicates(predicate.GenerationChangedPredicate{}), ). WithOptions(controller.TypedOptions[reconcile.Request]{ - RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), - SkipNameValidation: pointer.MakePtr(skipNameValidation)}). + RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), + SkipNameValidation: pointer.MakePtr(skipNameValidation), + MaxConcurrentReconciles: r.maxConcurrentReconciles}). Complete(r) } -func NewAtlasBackupCompliancePolicyReconciler( - c cluster.Cluster, - predicates []predicate.Predicate, - atlasProvider atlas.Provider, - deletionProtection bool, - logger *zap.Logger, -) *AtlasBackupCompliancePolicyReconciler { +func NewAtlasBackupCompliancePolicyReconciler(c cluster.Cluster, predicates []predicate.Predicate, atlasProvider atlas.Provider, deletionProtection bool, logger *zap.Logger, maxConcurrentReconciles int) *AtlasBackupCompliancePolicyReconciler { return &AtlasBackupCompliancePolicyReconciler{ Scheme: c.GetScheme(), Client: c.GetClient(), @@ -145,6 +141,7 @@ func NewAtlasBackupCompliancePolicyReconciler( Log: logger.Named("controllers").Named("AtlasBackupCompliancePolicy").Sugar(), AtlasProvider: atlasProvider, ObjectDeletionProtection: deletionProtection, + maxConcurrentReconciles: maxConcurrentReconciles, } } diff --git a/internal/controller/atlascustomrole/atlascustomrole_controller.go b/internal/controller/atlascustomrole/atlascustomrole_controller.go index 9b4a151a3d..2432b71b06 100644 --- a/internal/controller/atlascustomrole/atlascustomrole_controller.go +++ b/internal/controller/atlascustomrole/atlascustomrole_controller.go @@ -55,17 +55,10 @@ type AtlasCustomRoleReconciler struct { ObjectDeletionProtection bool SubObjectDeletionProtection bool independentSyncPeriod time.Duration + maxConcurrentReconciles int } -func NewAtlasCustomRoleReconciler( - c cluster.Cluster, - predicates []predicate.Predicate, - atlasProvider atlas.Provider, - deletionProtection bool, - independentSyncPeriod time.Duration, - logger *zap.Logger, - globalSecretRef client.ObjectKey, -) *AtlasCustomRoleReconciler { +func NewAtlasCustomRoleReconciler(c cluster.Cluster, predicates []predicate.Predicate, atlasProvider atlas.Provider, deletionProtection bool, independentSyncPeriod time.Duration, logger *zap.Logger, globalSecretRef client.ObjectKey, maxConcurrentReconciles int) *AtlasCustomRoleReconciler { return &AtlasCustomRoleReconciler{ AtlasReconciler: reconciler.AtlasReconciler{ Client: c.GetClient(), @@ -78,6 +71,7 @@ func NewAtlasCustomRoleReconciler( GlobalPredicates: predicates, ObjectDeletionProtection: deletionProtection, independentSyncPeriod: independentSyncPeriod, + maxConcurrentReconciles: maxConcurrentReconciles, } } @@ -218,8 +212,9 @@ func (r *AtlasCustomRoleReconciler) SetupWithManager(mgr ctrl.Manager, skipNameV handler.EnqueueRequestsFromMapFunc(r.customRolesCredentials()), builder.WithPredicates(predicate.ResourceVersionChangedPredicate{})). WithOptions(controller.TypedOptions[reconcile.Request]{ - RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), - SkipNameValidation: pointer.MakePtr(skipNameValidation)}). + RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), + SkipNameValidation: pointer.MakePtr(skipNameValidation), + MaxConcurrentReconciles: r.maxConcurrentReconciles}). Complete(r) } diff --git a/internal/controller/atlasdatabaseuser/atlasdatabaseuser_controller.go b/internal/controller/atlasdatabaseuser/atlasdatabaseuser_controller.go index bcffe50dde..a23fc733fd 100644 --- a/internal/controller/atlasdatabaseuser/atlasdatabaseuser_controller.go +++ b/internal/controller/atlasdatabaseuser/atlasdatabaseuser_controller.go @@ -60,6 +60,7 @@ type AtlasDatabaseUserReconciler struct { ObjectDeletionProtection bool SubObjectDeletionProtection bool independentSyncPeriod time.Duration + maxConcurrentReconciles int } // +kubebuilder:rbac:groups=atlas.mongodb.com,resources=atlasdatabaseusers,verbs=get;list;watch;create;update;patch;delete @@ -221,8 +222,9 @@ func (r *AtlasDatabaseUserReconciler) SetupWithManager(mgr ctrl.Manager, skipNam builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}), ). WithOptions(controller.TypedOptions[reconcile.Request]{ - RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), - SkipNameValidation: pointer.MakePtr(skipNameValidation)}). + RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), + SkipNameValidation: pointer.MakePtr(skipNameValidation), + MaxConcurrentReconciles: r.maxConcurrentReconciles}). Complete(r) } @@ -273,16 +275,7 @@ func (r *AtlasDatabaseUserReconciler) databaseUsersForCredentialMapFunc() handle ) } -func NewAtlasDatabaseUserReconciler( - c cluster.Cluster, - predicates []predicate.Predicate, - atlasProvider atlas.Provider, - deletionProtection bool, - independentSyncPeriod time.Duration, - featureFlags *featureflags.FeatureFlags, - logger *zap.Logger, - globalSecretRef client.ObjectKey, -) *AtlasDatabaseUserReconciler { +func NewAtlasDatabaseUserReconciler(c cluster.Cluster, predicates []predicate.Predicate, atlasProvider atlas.Provider, deletionProtection bool, independentSyncPeriod time.Duration, featureFlags *featureflags.FeatureFlags, logger *zap.Logger, globalSecretRef client.ObjectKey, maxConcurrentReconciles int) *AtlasDatabaseUserReconciler { return &AtlasDatabaseUserReconciler{ AtlasReconciler: reconciler.AtlasReconciler{ Client: c.GetClient(), @@ -295,5 +288,6 @@ func NewAtlasDatabaseUserReconciler( GlobalPredicates: predicates, ObjectDeletionProtection: deletionProtection, independentSyncPeriod: independentSyncPeriod, + maxConcurrentReconciles: maxConcurrentReconciles, } } diff --git a/internal/controller/atlasdatafederation/datafederation_controller.go b/internal/controller/atlasdatafederation/datafederation_controller.go index 74a5807022..cdede3734a 100644 --- a/internal/controller/atlasdatafederation/datafederation_controller.go +++ b/internal/controller/atlasdatafederation/datafederation_controller.go @@ -60,6 +60,7 @@ type AtlasDataFederationReconciler struct { ObjectDeletionProtection bool SubObjectDeletionProtection bool GlobalSecretRef client.ObjectKey + maxConcurrentReconciles int } // +kubebuilder:rbac:groups=atlas.mongodb.com,resources=atlasdatafederations,verbs=get;list;watch;create;update;patch;delete @@ -244,8 +245,9 @@ func (r *AtlasDataFederationReconciler) SetupWithManager(mgr ctrl.Manager, skipN builder.WithPredicates(predicate.GenerationChangedPredicate{}), ). WithOptions(controller.TypedOptions[reconcile.Request]{ - RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), - SkipNameValidation: pointer.MakePtr(skipNameValidation)}). + RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), + SkipNameValidation: pointer.MakePtr(skipNameValidation), + MaxConcurrentReconciles: r.maxConcurrentReconciles}). Complete(r) } @@ -276,14 +278,7 @@ func (r *AtlasDataFederationReconciler) findAtlasDataFederationForProjects(ctx c return requests } -func NewAtlasDataFederationReconciler( - c cluster.Cluster, - predicates []predicate.Predicate, - atlasProvider atlas.Provider, - deletionProtection bool, - logger *zap.Logger, - globalSecretRef client.ObjectKey, -) *AtlasDataFederationReconciler { +func NewAtlasDataFederationReconciler(c cluster.Cluster, predicates []predicate.Predicate, atlasProvider atlas.Provider, deletionProtection bool, logger *zap.Logger, globalSecretRef client.ObjectKey, maxConcurrentReconciles int) *AtlasDataFederationReconciler { return &AtlasDataFederationReconciler{ Scheme: c.GetScheme(), Client: c.GetClient(), @@ -293,6 +288,7 @@ func NewAtlasDataFederationReconciler( AtlasProvider: atlasProvider, ObjectDeletionProtection: deletionProtection, GlobalSecretRef: globalSecretRef, + maxConcurrentReconciles: maxConcurrentReconciles, } } diff --git a/internal/controller/atlasdeployment/atlasdeployment_controller.go b/internal/controller/atlasdeployment/atlasdeployment_controller.go index 0854bb6452..a81e7cb35b 100644 --- a/internal/controller/atlasdeployment/atlasdeployment_controller.go +++ b/internal/controller/atlasdeployment/atlasdeployment_controller.go @@ -64,6 +64,7 @@ type AtlasDeploymentReconciler struct { ObjectDeletionProtection bool SubObjectDeletionProtection bool independentSyncPeriod time.Duration + maxConcurrentReconciles int } // +kubebuilder:rbac:groups=atlas.mongodb.com,resources=atlasdeployments,verbs=get;list;watch;create;update;patch;delete @@ -405,20 +406,13 @@ func (r *AtlasDeploymentReconciler) SetupWithManager(mgr ctrl.Manager, skipNameV builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}), ). WithOptions(controller.TypedOptions[reconcile.Request]{ - RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), - SkipNameValidation: pointer.MakePtr(skipNameValidation)}). + RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), + SkipNameValidation: pointer.MakePtr(skipNameValidation), + MaxConcurrentReconciles: r.maxConcurrentReconciles}). Complete(r) } -func NewAtlasDeploymentReconciler( - c cluster.Cluster, - predicates []predicate.Predicate, - atlasProvider atlas.Provider, - deletionProtection bool, - independentSyncPeriod time.Duration, - logger *zap.Logger, - globalSecretref client.ObjectKey, -) *AtlasDeploymentReconciler { +func NewAtlasDeploymentReconciler(c cluster.Cluster, predicates []predicate.Predicate, atlasProvider atlas.Provider, deletionProtection bool, independentSyncPeriod time.Duration, logger *zap.Logger, globalSecretref client.ObjectKey, maxConcurrentReconciles int) *AtlasDeploymentReconciler { suggaredLogger := logger.Named("controllers").Named("AtlasDeployment").Sugar() return &AtlasDeploymentReconciler{ @@ -433,6 +427,7 @@ func NewAtlasDeploymentReconciler( GlobalPredicates: predicates, ObjectDeletionProtection: deletionProtection, independentSyncPeriod: independentSyncPeriod, + maxConcurrentReconciles: maxConcurrentReconciles, } } diff --git a/internal/controller/atlasfederatedauth/atlasfederated_auth_controller.go b/internal/controller/atlasfederatedauth/atlasfederated_auth_controller.go index 7a83273e03..8a06433680 100644 --- a/internal/controller/atlasfederatedauth/atlasfederated_auth_controller.go +++ b/internal/controller/atlasfederatedauth/atlasfederated_auth_controller.go @@ -57,6 +57,7 @@ type AtlasFederatedAuthReconciler struct { ObjectDeletionProtection bool SubObjectDeletionProtection bool GlobalSecretRef client.ObjectKey + maxConcurrentReconciles int } // +kubebuilder:rbac:groups=atlas.mongodb.com,resources=atlasfederatedauths,verbs=get;list;watch;create;update;patch;delete @@ -143,8 +144,9 @@ func (r *AtlasFederatedAuthReconciler) SetupWithManager(mgr ctrl.Manager, skipNa handler.EnqueueRequestsFromMapFunc(r.findAtlasFederatedAuthForSecret), ). WithOptions(controller.TypedOptions[reconcile.Request]{ - RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), - SkipNameValidation: pointer.MakePtr(skipNameValidation)}). + RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), + SkipNameValidation: pointer.MakePtr(skipNameValidation), + MaxConcurrentReconciles: r.maxConcurrentReconciles}). Complete(r) } @@ -185,14 +187,7 @@ func (r *AtlasFederatedAuthReconciler) findAtlasFederatedAuthForSecret(ctx conte return requests } -func NewAtlasFederatedAuthReconciler( - c cluster.Cluster, - predicates []predicate.Predicate, - atlasProvider atlas.Provider, - deletionProtection bool, - logger *zap.Logger, - globalSecretRef client.ObjectKey, -) *AtlasFederatedAuthReconciler { +func NewAtlasFederatedAuthReconciler(c cluster.Cluster, predicates []predicate.Predicate, atlasProvider atlas.Provider, deletionProtection bool, logger *zap.Logger, globalSecretRef client.ObjectKey, maxConcurrentReconciles int) *AtlasFederatedAuthReconciler { return &AtlasFederatedAuthReconciler{ Scheme: c.GetScheme(), Client: c.GetClient(), @@ -202,6 +197,7 @@ func NewAtlasFederatedAuthReconciler( AtlasProvider: atlasProvider, ObjectDeletionProtection: deletionProtection, GlobalSecretRef: globalSecretRef, + maxConcurrentReconciles: maxConcurrentReconciles, } } diff --git a/internal/controller/atlasipaccesslist/atlasipaccesslist_controller.go b/internal/controller/atlasipaccesslist/atlasipaccesslist_controller.go index c049a8fc88..e382b5b3b2 100644 --- a/internal/controller/atlasipaccesslist/atlasipaccesslist_controller.go +++ b/internal/controller/atlasipaccesslist/atlasipaccesslist_controller.go @@ -51,6 +51,7 @@ type AtlasIPAccessListReconciler struct { GlobalPredicates []predicate.Predicate ObjectDeletionProtection bool independentSyncPeriod time.Duration + maxConcurrentReconciles int } // +kubebuilder:rbac:groups=atlas.mongodb.com,resources=atlasipaccesslists,verbs=get;list;watch;create;update;patch;delete @@ -91,9 +92,9 @@ func (r *AtlasIPAccessListReconciler) SetupWithManager(mgr manager.Manager, skip builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}), ). WithOptions(controller.TypedOptions[reconcile.Request]{ - RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), - SkipNameValidation: pointer.MakePtr(skipNameValidation), - }). + RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), + SkipNameValidation: pointer.MakePtr(skipNameValidation), + MaxConcurrentReconciles: r.maxConcurrentReconciles}). Complete(r) } @@ -139,15 +140,7 @@ func (r *AtlasIPAccessListReconciler) ipAccessListForCredentialMapFunc() handler ) } -func NewAtlasIPAccessListReconciler( - c cluster.Cluster, - predicates []predicate.Predicate, - atlasProvider atlas.Provider, - deletionProtection bool, - independentSyncPeriod time.Duration, - logger *zap.Logger, - globalSecretRef client.ObjectKey, -) *AtlasIPAccessListReconciler { +func NewAtlasIPAccessListReconciler(c cluster.Cluster, predicates []predicate.Predicate, atlasProvider atlas.Provider, deletionProtection bool, independentSyncPeriod time.Duration, logger *zap.Logger, globalSecretRef client.ObjectKey, maxConcurrentReconciles int) *AtlasIPAccessListReconciler { return &AtlasIPAccessListReconciler{ AtlasReconciler: reconciler.AtlasReconciler{ Client: c.GetClient(), @@ -160,5 +153,6 @@ func NewAtlasIPAccessListReconciler( GlobalPredicates: predicates, ObjectDeletionProtection: deletionProtection, independentSyncPeriod: independentSyncPeriod, + maxConcurrentReconciles: maxConcurrentReconciles, } } diff --git a/internal/controller/atlasnetworkcontainer/atlasnetworkcontainer_controller.go b/internal/controller/atlasnetworkcontainer/atlasnetworkcontainer_controller.go index 0bc3939a3f..1ed79c3b6c 100644 --- a/internal/controller/atlasnetworkcontainer/atlasnetworkcontainer_controller.go +++ b/internal/controller/atlasnetworkcontainer/atlasnetworkcontainer_controller.go @@ -48,6 +48,7 @@ type AtlasNetworkContainerReconciler struct { GlobalPredicates []predicate.Predicate ObjectDeletionProtection bool independentSyncPeriod time.Duration + maxConcurrentReconciles int } // +kubebuilder:rbac:groups=atlas.mongodb.com,resources=atlasnetworkcontainers,verbs=get;list;watch;create;update;patch;delete @@ -86,8 +87,9 @@ func (r *AtlasNetworkContainerReconciler) SetupWithManager(mgr ctrl.Manager, ski builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}), ). WithOptions(controller.TypedOptions[reconcile.Request]{ - RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), - SkipNameValidation: pointer.MakePtr(skipNameValidation)}). + RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), + SkipNameValidation: pointer.MakePtr(skipNameValidation), + MaxConcurrentReconciles: r.maxConcurrentReconciles}). Complete(r) } @@ -111,15 +113,7 @@ func (r *AtlasNetworkContainerReconciler) networkContainerForCredentialMapFunc() ) } -func NewAtlasNetworkContainerReconciler( - c cluster.Cluster, - predicates []predicate.Predicate, - atlasProvider atlas.Provider, - deletionProtection bool, - logger *zap.Logger, - independentSyncPeriod time.Duration, - globalSecretRef client.ObjectKey, -) *AtlasNetworkContainerReconciler { +func NewAtlasNetworkContainerReconciler(c cluster.Cluster, predicates []predicate.Predicate, atlasProvider atlas.Provider, deletionProtection bool, logger *zap.Logger, independentSyncPeriod time.Duration, globalSecretRef client.ObjectKey, maxConcurrentReconciles int) *AtlasNetworkContainerReconciler { return &AtlasNetworkContainerReconciler{ AtlasReconciler: reconciler.AtlasReconciler{ Client: c.GetClient(), @@ -132,5 +126,6 @@ func NewAtlasNetworkContainerReconciler( GlobalPredicates: predicates, ObjectDeletionProtection: deletionProtection, independentSyncPeriod: independentSyncPeriod, + maxConcurrentReconciles: maxConcurrentReconciles, } } diff --git a/internal/controller/atlasnetworkpeering/atlasnetworkpeering_controller.go b/internal/controller/atlasnetworkpeering/atlasnetworkpeering_controller.go index f4105f6d2f..d84547ed64 100644 --- a/internal/controller/atlasnetworkpeering/atlasnetworkpeering_controller.go +++ b/internal/controller/atlasnetworkpeering/atlasnetworkpeering_controller.go @@ -53,17 +53,10 @@ type AtlasNetworkPeeringReconciler struct { GlobalPredicates []predicate.Predicate ObjectDeletionProtection bool independentSyncPeriod time.Duration + maxConcurrentReconciles int } -func NewAtlasNetworkPeeringsReconciler( - c cluster.Cluster, - predicates []predicate.Predicate, - atlasProvider atlas.Provider, - deletionProtection bool, - logger *zap.Logger, - independentSyncPeriod time.Duration, - globalSecretRef client.ObjectKey, -) *AtlasNetworkPeeringReconciler { +func NewAtlasNetworkPeeringsReconciler(c cluster.Cluster, predicates []predicate.Predicate, atlasProvider atlas.Provider, deletionProtection bool, logger *zap.Logger, independentSyncPeriod time.Duration, globalSecretRef client.ObjectKey, maxConcurrentReconciles int) *AtlasNetworkPeeringReconciler { return &AtlasNetworkPeeringReconciler{ AtlasReconciler: reconciler.AtlasReconciler{ Client: c.GetClient(), @@ -76,6 +69,7 @@ func NewAtlasNetworkPeeringsReconciler( GlobalPredicates: predicates, ObjectDeletionProtection: deletionProtection, independentSyncPeriod: independentSyncPeriod, + maxConcurrentReconciles: maxConcurrentReconciles, } } @@ -131,8 +125,9 @@ func (r *AtlasNetworkPeeringReconciler) SetupWithManager(mgr ctrl.Manager, skipN builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}), ). WithOptions(controller.TypedOptions[reconcile.Request]{ - RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), - SkipNameValidation: pointer.MakePtr(skipNameValidation)}). + RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), + SkipNameValidation: pointer.MakePtr(skipNameValidation), + MaxConcurrentReconciles: r.maxConcurrentReconciles}). Complete(r) } diff --git a/internal/controller/atlasprivateendpoint/atlasprivateendpoint_controller.go b/internal/controller/atlasprivateendpoint/atlasprivateendpoint_controller.go index 7f633ac33c..1d58a17c7d 100644 --- a/internal/controller/atlasprivateendpoint/atlasprivateendpoint_controller.go +++ b/internal/controller/atlasprivateendpoint/atlasprivateendpoint_controller.go @@ -57,6 +57,7 @@ type AtlasPrivateEndpointReconciler struct { ObjectDeletionProtection bool independentSyncPeriod time.Duration + maxConcurrentReconciles int } // +kubebuilder:rbac:groups=atlas.mongodb.com,resources=atlasprivateendpoints,verbs=get;list;watch;create;update;patch;delete @@ -240,8 +241,9 @@ func (r *AtlasPrivateEndpointReconciler) SetupWithManager(mgr ctrl.Manager, skip builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}), ). WithOptions(controller.TypedOptions[reconcile.Request]{ - RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), - SkipNameValidation: pointer.MakePtr(skipNameValidation)}). + RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), + SkipNameValidation: pointer.MakePtr(skipNameValidation), + MaxConcurrentReconciles: r.maxConcurrentReconciles}). Complete(r) } @@ -287,15 +289,7 @@ func (r *AtlasPrivateEndpointReconciler) privateEndpointForCredentialMapFunc() h ) } -func NewAtlasPrivateEndpointReconciler( - c cluster.Cluster, - predicates []predicate.Predicate, - atlasProvider atlas.Provider, - deletionProtection bool, - independentSyncPeriod time.Duration, - logger *zap.Logger, - globalSecretRef client.ObjectKey, -) *AtlasPrivateEndpointReconciler { +func NewAtlasPrivateEndpointReconciler(c cluster.Cluster, predicates []predicate.Predicate, atlasProvider atlas.Provider, deletionProtection bool, independentSyncPeriod time.Duration, logger *zap.Logger, globalSecretRef client.ObjectKey, maxConcurrentReconciles int) *AtlasPrivateEndpointReconciler { return &AtlasPrivateEndpointReconciler{ AtlasReconciler: reconciler.AtlasReconciler{ Client: c.GetClient(), @@ -308,5 +302,6 @@ func NewAtlasPrivateEndpointReconciler( GlobalPredicates: predicates, ObjectDeletionProtection: deletionProtection, independentSyncPeriod: independentSyncPeriod, + maxConcurrentReconciles: maxConcurrentReconciles, } } diff --git a/internal/controller/atlasproject/atlasproject_controller.go b/internal/controller/atlasproject/atlasproject_controller.go index 074251f4fd..d8f968148a 100644 --- a/internal/controller/atlasproject/atlasproject_controller.go +++ b/internal/controller/atlasproject/atlasproject_controller.go @@ -63,6 +63,7 @@ type AtlasProjectReconciler struct { ObjectDeletionProtection bool SubObjectDeletionProtection bool GlobalSecretRef client.ObjectKey + maxConcurrentReconciles int } type AtlasProjectServices struct { @@ -277,8 +278,9 @@ func (r *AtlasProjectReconciler) SetupWithManager(mgr ctrl.Manager, skipNameVali builder.WithPredicates(predicate.GenerationChangedPredicate{}), ). WithOptions(controller.TypedOptions[reconcile.Request]{ - RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), - SkipNameValidation: pointer.MakePtr(skipNameValidation)}). + RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), + SkipNameValidation: pointer.MakePtr(skipNameValidation), + MaxConcurrentReconciles: r.maxConcurrentReconciles}). Complete(r) } @@ -289,6 +291,7 @@ func NewAtlasProjectReconciler( deletionProtection bool, logger *zap.Logger, globalSecretRef client.ObjectKey, + maxConcurrentReconciles int, ) *AtlasProjectReconciler { return &AtlasProjectReconciler{ Scheme: c.GetScheme(), @@ -299,6 +302,7 @@ func NewAtlasProjectReconciler( AtlasProvider: atlasProvider, ObjectDeletionProtection: deletionProtection, GlobalSecretRef: globalSecretRef, + maxConcurrentReconciles: maxConcurrentReconciles, } } diff --git a/internal/controller/atlassearchindexconfig/atlassearchindexconfig_controller.go b/internal/controller/atlassearchindexconfig/atlassearchindexconfig_controller.go index 5b736616b8..91482f3c2a 100644 --- a/internal/controller/atlassearchindexconfig/atlassearchindexconfig_controller.go +++ b/internal/controller/atlassearchindexconfig/atlassearchindexconfig_controller.go @@ -61,6 +61,7 @@ type AtlasSearchIndexConfigReconciler struct { Log *zap.SugaredLogger ObjectDeletionProtection bool SubObjectDeletionProtection bool + maxConcurrentReconciles int } func (r *AtlasSearchIndexConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { @@ -125,18 +126,13 @@ func (r *AtlasSearchIndexConfigReconciler) SetupWithManager(mgr ctrl.Manager, sk builder.WithPredicates(predicate.GenerationChangedPredicate{}), ). WithOptions(controller.TypedOptions[reconcile.Request]{ - RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), - SkipNameValidation: pointer.MakePtr(skipNameValidation)}). + RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), + SkipNameValidation: pointer.MakePtr(skipNameValidation), + MaxConcurrentReconciles: r.maxConcurrentReconciles}). Complete(r) } -func NewAtlasSearchIndexConfigReconciler( - c cluster.Cluster, - predicates []predicate.Predicate, - atlasProvider atlas.Provider, - deletionProtection bool, - logger *zap.Logger, -) *AtlasSearchIndexConfigReconciler { +func NewAtlasSearchIndexConfigReconciler(c cluster.Cluster, predicates []predicate.Predicate, atlasProvider atlas.Provider, deletionProtection bool, logger *zap.Logger, maxConcurrentReconciles int) *AtlasSearchIndexConfigReconciler { return &AtlasSearchIndexConfigReconciler{ Scheme: c.GetScheme(), Client: c.GetClient(), @@ -145,6 +141,7 @@ func NewAtlasSearchIndexConfigReconciler( Log: logger.Named("controllers").Named("AtlasSearchIndexConfig").Sugar(), AtlasProvider: atlasProvider, ObjectDeletionProtection: deletionProtection, + maxConcurrentReconciles: maxConcurrentReconciles, } } diff --git a/internal/controller/atlasstream/atlasstream_connection_controller.go b/internal/controller/atlasstream/atlasstream_connection_controller.go index b3bf9739c3..059176805a 100644 --- a/internal/controller/atlasstream/atlasstream_connection_controller.go +++ b/internal/controller/atlasstream/atlasstream_connection_controller.go @@ -52,6 +52,7 @@ type AtlasStreamsConnectionReconciler struct { Log *zap.SugaredLogger ObjectDeletionProtection bool SubObjectDeletionProtection bool + maxConcurrentReconciles int } // +kubebuilder:rbac:groups=atlas.mongodb.com,resources=atlasstreamconnections,verbs=get;list;watch;create;update;patch;delete @@ -126,18 +127,13 @@ func (r *AtlasStreamsConnectionReconciler) SetupWithManager(mgr ctrl.Manager, sk builder.WithPredicates(predicate.GenerationChangedPredicate{}), ). WithOptions(controller.TypedOptions[reconcile.Request]{ - RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), - SkipNameValidation: pointer.MakePtr(skipNameValidation)}). + RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), + SkipNameValidation: pointer.MakePtr(skipNameValidation), + MaxConcurrentReconciles: r.maxConcurrentReconciles}). Complete(r) } -func NewAtlasStreamsConnectionReconciler( - c cluster.Cluster, - predicates []predicate.Predicate, - atlasProvider atlas.Provider, - deletionProtection bool, - logger *zap.Logger, -) *AtlasStreamsConnectionReconciler { +func NewAtlasStreamsConnectionReconciler(c cluster.Cluster, predicates []predicate.Predicate, atlasProvider atlas.Provider, deletionProtection bool, logger *zap.Logger, maxConcurrentReconciles int) *AtlasStreamsConnectionReconciler { return &AtlasStreamsConnectionReconciler{ Scheme: c.GetScheme(), Client: c.GetClient(), @@ -146,6 +142,7 @@ func NewAtlasStreamsConnectionReconciler( Log: logger.Named("controllers").Named("AtlasStreamsConnection").Sugar(), AtlasProvider: atlasProvider, ObjectDeletionProtection: deletionProtection, + maxConcurrentReconciles: maxConcurrentReconciles, } } diff --git a/internal/controller/atlasstream/atlasstream_instance_controller.go b/internal/controller/atlasstream/atlasstream_instance_controller.go index 284161289b..0b3e036566 100644 --- a/internal/controller/atlasstream/atlasstream_instance_controller.go +++ b/internal/controller/atlasstream/atlasstream_instance_controller.go @@ -57,6 +57,7 @@ type AtlasStreamsInstanceReconciler struct { ObjectDeletionProtection bool SubObjectDeletionProtection bool GlobalSecretRef client.ObjectKey + maxConcurrentReconciles int } // +kubebuilder:rbac:groups=atlas.mongodb.com,resources=atlasstreaminstances,verbs=get;list;watch;create;update;patch;delete @@ -177,19 +178,13 @@ func (r *AtlasStreamsInstanceReconciler) SetupWithManager(mgr ctrl.Manager, skip builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}), ). WithOptions(controller.TypedOptions[reconcile.Request]{ - RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), - SkipNameValidation: pointer.MakePtr(skipNameValidation)}). + RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), + SkipNameValidation: pointer.MakePtr(skipNameValidation), + MaxConcurrentReconciles: r.maxConcurrentReconciles}). Complete(r) } -func NewAtlasStreamsInstanceReconciler( - c cluster.Cluster, - predicates []predicate.Predicate, - atlasProvider atlas.Provider, - deletionProtection bool, - logger *zap.Logger, - globalSecretRef client.ObjectKey, -) *AtlasStreamsInstanceReconciler { +func NewAtlasStreamsInstanceReconciler(c cluster.Cluster, predicates []predicate.Predicate, atlasProvider atlas.Provider, deletionProtection bool, logger *zap.Logger, globalSecretRef client.ObjectKey, maxConcurrentReconciles int) *AtlasStreamsInstanceReconciler { return &AtlasStreamsInstanceReconciler{ Scheme: c.GetScheme(), Client: c.GetClient(), @@ -199,6 +194,7 @@ func NewAtlasStreamsInstanceReconciler( AtlasProvider: atlasProvider, ObjectDeletionProtection: deletionProtection, GlobalSecretRef: globalSecretRef, + maxConcurrentReconciles: maxConcurrentReconciles, } } diff --git a/internal/controller/registry.go b/internal/controller/registry.go index 2f611ed8cd..3ed247bc36 100644 --- a/internal/controller/registry.go +++ b/internal/controller/registry.go @@ -70,18 +70,20 @@ type Registry struct { reconcilers []Reconciler globalSecretRef client.ObjectKey - reapplySupport bool + reapplySupport bool + maxConcurrentReconciles int } -func NewRegistry(predicates []predicate.Predicate, deletionProtection bool, logger *zap.Logger, independentSyncPeriod time.Duration, featureFlags *featureflags.FeatureFlags, globalSecretRef client.ObjectKey) *Registry { +func NewRegistry(predicates []predicate.Predicate, deletionProtection bool, logger *zap.Logger, independentSyncPeriod time.Duration, featureFlags *featureflags.FeatureFlags, globalSecretRef client.ObjectKey, maxConcurrentReconciles int) *Registry { return &Registry{ - sharedPredicates: predicates, - deletionProtection: deletionProtection, - logger: logger, - independentSyncPeriod: independentSyncPeriod, - featureFlags: featureFlags, - globalSecretRef: globalSecretRef, - reapplySupport: DefaultReapplySupport, + sharedPredicates: predicates, + deletionProtection: deletionProtection, + logger: logger, + independentSyncPeriod: independentSyncPeriod, + featureFlags: featureFlags, + globalSecretRef: globalSecretRef, + reapplySupport: DefaultReapplySupport, + maxConcurrentReconciles: maxConcurrentReconciles, } } @@ -112,25 +114,25 @@ func (r *Registry) registerControllers(c cluster.Cluster, ap atlas.Provider) { } var reconcilers []Reconciler - reconcilers = append(reconcilers, atlasproject.NewAtlasProjectReconciler(c, r.deprecatedPredicates(), ap, r.deletionProtection, r.logger, r.globalSecretRef)) - reconcilers = append(reconcilers, atlasdeployment.NewAtlasDeploymentReconciler(c, r.deprecatedPredicates(), ap, r.deletionProtection, r.independentSyncPeriod, r.logger, r.globalSecretRef)) - reconcilers = append(reconcilers, atlasdatabaseuser.NewAtlasDatabaseUserReconciler(c, r.deprecatedPredicates(), ap, r.deletionProtection, r.independentSyncPeriod, r.featureFlags, r.logger, r.globalSecretRef)) - reconcilers = append(reconcilers, atlasdatafederation.NewAtlasDataFederationReconciler(c, r.deprecatedPredicates(), ap, r.deletionProtection, r.logger, r.globalSecretRef)) - reconcilers = append(reconcilers, atlasfederatedauth.NewAtlasFederatedAuthReconciler(c, r.deprecatedPredicates(), ap, r.deletionProtection, r.logger, r.globalSecretRef)) - reconcilers = append(reconcilers, atlasstream.NewAtlasStreamsInstanceReconciler(c, r.deprecatedPredicates(), ap, r.deletionProtection, r.logger, r.globalSecretRef)) - reconcilers = append(reconcilers, atlasstream.NewAtlasStreamsConnectionReconciler(c, r.deprecatedPredicates(), ap, r.deletionProtection, r.logger)) - reconcilers = append(reconcilers, atlassearchindexconfig.NewAtlasSearchIndexConfigReconciler(c, r.deprecatedPredicates(), ap, r.deletionProtection, r.logger)) - reconcilers = append(reconcilers, atlasbackupcompliancepolicy.NewAtlasBackupCompliancePolicyReconciler(c, r.deprecatedPredicates(), ap, r.deletionProtection, r.logger)) - reconcilers = append(reconcilers, atlascustomrole.NewAtlasCustomRoleReconciler(c, r.deprecatedPredicates(), ap, r.deletionProtection, r.independentSyncPeriod, r.logger, r.globalSecretRef)) - reconcilers = append(reconcilers, atlasprivateendpoint.NewAtlasPrivateEndpointReconciler(c, r.defaultPredicates(), ap, r.deletionProtection, r.independentSyncPeriod, r.logger, r.globalSecretRef)) - reconcilers = append(reconcilers, atlasipaccesslist.NewAtlasIPAccessListReconciler(c, r.defaultPredicates(), ap, r.deletionProtection, r.independentSyncPeriod, r.logger, r.globalSecretRef)) - reconcilers = append(reconcilers, atlasnetworkcontainer.NewAtlasNetworkContainerReconciler(c, r.defaultPredicates(), ap, r.deletionProtection, r.logger, r.independentSyncPeriod, r.globalSecretRef)) - reconcilers = append(reconcilers, atlasnetworkpeering.NewAtlasNetworkPeeringsReconciler(c, r.defaultPredicates(), ap, r.deletionProtection, r.logger, r.independentSyncPeriod, r.globalSecretRef)) + reconcilers = append(reconcilers, atlasproject.NewAtlasProjectReconciler(c, r.deprecatedPredicates(), ap, r.deletionProtection, r.logger, r.globalSecretRef, r.maxConcurrentReconciles)) + reconcilers = append(reconcilers, atlasdeployment.NewAtlasDeploymentReconciler(c, r.deprecatedPredicates(), ap, r.deletionProtection, r.independentSyncPeriod, r.logger, r.globalSecretRef, r.maxConcurrentReconciles)) + reconcilers = append(reconcilers, atlasdatabaseuser.NewAtlasDatabaseUserReconciler(c, r.deprecatedPredicates(), ap, r.deletionProtection, r.independentSyncPeriod, r.featureFlags, r.logger, r.globalSecretRef, r.maxConcurrentReconciles)) + reconcilers = append(reconcilers, atlasdatafederation.NewAtlasDataFederationReconciler(c, r.deprecatedPredicates(), ap, r.deletionProtection, r.logger, r.globalSecretRef, r.maxConcurrentReconciles)) + reconcilers = append(reconcilers, atlasfederatedauth.NewAtlasFederatedAuthReconciler(c, r.deprecatedPredicates(), ap, r.deletionProtection, r.logger, r.globalSecretRef, r.maxConcurrentReconciles)) + reconcilers = append(reconcilers, atlasstream.NewAtlasStreamsInstanceReconciler(c, r.deprecatedPredicates(), ap, r.deletionProtection, r.logger, r.globalSecretRef, r.maxConcurrentReconciles)) + reconcilers = append(reconcilers, atlasstream.NewAtlasStreamsConnectionReconciler(c, r.deprecatedPredicates(), ap, r.deletionProtection, r.logger, r.maxConcurrentReconciles)) + reconcilers = append(reconcilers, atlassearchindexconfig.NewAtlasSearchIndexConfigReconciler(c, r.deprecatedPredicates(), ap, r.deletionProtection, r.logger, r.maxConcurrentReconciles)) + reconcilers = append(reconcilers, atlasbackupcompliancepolicy.NewAtlasBackupCompliancePolicyReconciler(c, r.deprecatedPredicates(), ap, r.deletionProtection, r.logger, r.maxConcurrentReconciles)) + reconcilers = append(reconcilers, atlascustomrole.NewAtlasCustomRoleReconciler(c, r.deprecatedPredicates(), ap, r.deletionProtection, r.independentSyncPeriod, r.logger, r.globalSecretRef, r.maxConcurrentReconciles)) + reconcilers = append(reconcilers, atlasprivateendpoint.NewAtlasPrivateEndpointReconciler(c, r.defaultPredicates(), ap, r.deletionProtection, r.independentSyncPeriod, r.logger, r.globalSecretRef, r.maxConcurrentReconciles)) + reconcilers = append(reconcilers, atlasipaccesslist.NewAtlasIPAccessListReconciler(c, r.defaultPredicates(), ap, r.deletionProtection, r.independentSyncPeriod, r.logger, r.globalSecretRef, r.maxConcurrentReconciles)) + reconcilers = append(reconcilers, atlasnetworkcontainer.NewAtlasNetworkContainerReconciler(c, r.defaultPredicates(), ap, r.deletionProtection, r.logger, r.independentSyncPeriod, r.globalSecretRef, r.maxConcurrentReconciles)) + reconcilers = append(reconcilers, atlasnetworkpeering.NewAtlasNetworkPeeringsReconciler(c, r.defaultPredicates(), ap, r.deletionProtection, r.logger, r.independentSyncPeriod, r.globalSecretRef, r.maxConcurrentReconciles)) orgSettingsReconciler := atlasorgsettings.NewAtlasOrgSettingsReconciler(c, ap, r.logger, r.globalSecretRef, r.reapplySupport) - reconcilers = append(reconcilers, newCtrlStateReconciler(orgSettingsReconciler)) + reconcilers = append(reconcilers, newCtrlStateReconciler(orgSettingsReconciler, r.maxConcurrentReconciles)) integrationsReconciler := integrations.NewAtlasThirdPartyIntegrationsReconciler(c, ap, r.deletionProtection, r.logger, r.globalSecretRef, r.reapplySupport) - reconcilers = append(reconcilers, newCtrlStateReconciler(integrationsReconciler)) + reconcilers = append(reconcilers, newCtrlStateReconciler(integrationsReconciler, r.maxConcurrentReconciles)) if version.IsExperimental() { // Add experimental controllers here @@ -151,16 +153,18 @@ func (r *Registry) defaultPredicates() []predicate.Predicate { type ctrlStateReconciler[T any] struct { *ctrlstate.Reconciler[T] + maxConcurrentReconciles int } -func newCtrlStateReconciler[T any](r *ctrlstate.Reconciler[T]) *ctrlStateReconciler[T] { - return &ctrlStateReconciler[T]{Reconciler: r} +func newCtrlStateReconciler[T any](r *ctrlstate.Reconciler[T], maxConcurrentReconciles int) *ctrlStateReconciler[T] { + return &ctrlStateReconciler[T]{Reconciler: r, maxConcurrentReconciles: maxConcurrentReconciles} } func (nr *ctrlStateReconciler[T]) SetupWithManager(mgr ctrl.Manager, skipNameValidation bool) error { defaultReconcilerOptions := controller.TypedOptions[reconcile.Request]{ - RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), - SkipNameValidation: pointer.MakePtr(skipNameValidation), + RateLimiter: ratelimit.NewRateLimiter[reconcile.Request](), + SkipNameValidation: pointer.MakePtr(skipNameValidation), + MaxConcurrentReconciles: nr.maxConcurrentReconciles, } return nr.Reconciler.SetupWithManager(mgr, defaultReconcilerOptions) } diff --git a/internal/controller/registry_test.go b/internal/controller/registry_test.go index 08e87dd59f..6130c4b52b 100644 --- a/internal/controller/registry_test.go +++ b/internal/controller/registry_test.go @@ -34,7 +34,7 @@ func TestCtrlStateReconciler_SetupWithManager_NoGomock(t *testing.T) { fakeReconciler := ctrlstate.NewStateReconciler(&mock) skipNameValidation := true - r := newCtrlStateReconciler(fakeReconciler) + r := newCtrlStateReconciler(fakeReconciler, 0) require.NoError(t, r.SetupWithManager(fakeMgr, skipNameValidation)) require.Equal(t, fakeMgr, mock.ReceivedMgr) wantOpts := controller.TypedOptions[reconcile.Request]{ diff --git a/internal/operator/builder.go b/internal/operator/builder.go index b4a311c61d..576c3bc38b 100644 --- a/internal/operator/builder.go +++ b/internal/operator/builder.go @@ -81,14 +81,20 @@ type Builder struct { leaderElection bool leaderElectionID string - atlasDomain string - predicates []predicate.Predicate - apiSecret client.ObjectKey - atlasProvider atlas.Provider - featureFlags *featureflags.FeatureFlags - deletionProtection bool - skipNameValidation bool - dryRun bool + atlasDomain string + predicates []predicate.Predicate + apiSecret client.ObjectKey + atlasProvider atlas.Provider + featureFlags *featureflags.FeatureFlags + deletionProtection bool + skipNameValidation bool + dryRun bool + maxConcurrentReconciles int +} + +func (b *Builder) WithMaxConcurrentReconciles(maxConcurrentReconciles int) *Builder { + b.maxConcurrentReconciles = maxConcurrentReconciles + return b } func (b *Builder) WithConfig(config *rest.Config) *Builder { @@ -209,6 +215,7 @@ func (b *Builder) Build(ctx context.Context) (cluster.Cluster, error) { b.independentSyncPeriod, b.featureFlags, b.apiSecret, + b.maxConcurrentReconciles, ) var akoCluster cluster.Cluster diff --git a/internal/run/run.go b/internal/run/run.go index 4a2f3aea8a..cf058b631c 100644 --- a/internal/run/run.go +++ b/internal/run/run.go @@ -33,6 +33,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" "k8s.io/klog/v2" + "k8s.io/utils/env" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -94,6 +95,7 @@ func Run(ctx context.Context, fs *flag.FlagSet, args []string) error { WithDeletionProtection(config.ObjectDeletionProtection). WithIndependentSyncPeriod(time.Duration(config.IndependentSyncPeriod) * time.Minute). WithDryRun(config.DryRun). + WithMaxConcurrentReconciles(config.MaxConcurrentReconciles). Build(ctx) if err != nil { setupLog.Error(err, "unable to start operator") @@ -123,6 +125,7 @@ type Config struct { IndependentSyncPeriod int FeatureFlags *featureflags.FeatureFlags DryRun bool + MaxConcurrentReconciles int } // ParseConfiguration fills the 'OperatorConfig' from the flags passed to the program @@ -150,6 +153,7 @@ func parseConfiguration(fs *flag.FlagSet, args []string) (Config, error) { fmt.Sprintf("The default time, in minutes, between reconciliations for independent custom resources. (default %d, minimum %d)", independentSyncPeriod, minimumIndependentSyncPeriod), ) fs.BoolVar(&config.DryRun, "dry-run", false, "If set, the operator will not perform any changes to the Atlas resources, run all reconcilers only Once and emit events for all planned changes") + config.MaxConcurrentReconciles, _ = env.GetInt("MDB_MAX_CONCURRENT_RECONCILES", 5) // errors yield default value appVersion := fs.Bool("v", false, "prints application version") if err := fs.Parse(args); err != nil { diff --git a/internal/run/run_test.go b/internal/run/run_test.go index 8d420fa1c3..2dacecdf52 100644 --- a/internal/run/run_test.go +++ b/internal/run/run_test.go @@ -288,6 +288,7 @@ func TestParseConfiguration(t *testing.T) { IndependentSyncPeriod: 15, FeatureFlags: featureflags.NewFeatureFlags(os.Environ), DryRun: false, + MaxConcurrentReconciles: 5, }, }, { @@ -315,6 +316,7 @@ func TestParseConfiguration(t *testing.T) { IndependentSyncPeriod: 15, FeatureFlags: featureflags.NewFeatureFlags(os.Environ), DryRun: false, + MaxConcurrentReconciles: 5, }, }, } {