diff --git a/api/v1alpha1/worker_types.go b/api/v1alpha1/worker_types.go
index a107ce85..b2b26844 100644
--- a/api/v1alpha1/worker_types.go
+++ b/api/v1alpha1/worker_types.go
@@ -346,6 +346,82 @@ type TemporalWorkerDeploymentList struct {
 	Items []TemporalWorkerDeployment `json:"items"`
 }
 
+// TemporalWorkerDeploymentPatchSpec defines version-specific overrides for a TemporalWorkerDeployment
+type TemporalWorkerDeploymentPatchSpec struct {
+	// TemporalWorkerDeploymentName is the name of the TemporalWorkerDeployment this patch applies to.
+	// The patch must be in the same namespace as the target deployment.
+	TemporalWorkerDeploymentName string `json:"temporalWorkerDeploymentName"`
+
+	// VersionID specifies which version this patch applies to.
+	VersionID string `json:"versionID"`
+
+	// Replicas overrides the number of desired pods for this specific version.
+	// If not specified, the version will use the replicas from the main TemporalWorkerDeployment spec.
+	// +optional
+	Replicas *int32 `json:"replicas,omitempty"`
+
+	// SunsetStrategy overrides how to manage sunsetting this specific version.
+	// If not specified, the version will use the sunset strategy from the main TemporalWorkerDeployment spec.
+	// +optional
+	SunsetStrategy *SunsetStrategy `json:"sunsetStrategy,omitempty"`
+}
+
+// PatchStatus indicates the current state of the patch
+// +enum
+type PatchStatus string
+
+const (
+	// PatchStatusActive indicates the patch is currently applied to an existing version
+	PatchStatusActive PatchStatus = "Active"
+
+	// PatchStatusOrphaned indicates the referenced version no longer exists
+	PatchStatusOrphaned PatchStatus = "Orphaned"
+
+	// PatchStatusInvalid indicates the patch references a TemporalWorkerDeployment that doesn't exist
+	PatchStatusInvalid PatchStatus = "Invalid"
+)
+
+// TemporalWorkerDeploymentPatchStatus defines the observed state of TemporalWorkerDeploymentPatch
+type TemporalWorkerDeploymentPatchStatus struct {
+	// Status indicates the current state of this patch
+	Status PatchStatus `json:"status"`
+
+	// AppliedAt indicates when this patch was last successfully applied
+	// +optional
+	AppliedAt *metav1.Time `json:"appliedAt,omitempty"`
+
+	// ObservedGeneration reflects the generation of the most recently observed patch spec
+	// +optional
+	ObservedGeneration int64 `json:"observedGeneration,omitempty"`
+
+	// Message provides additional human-readable detail about the current status
+	// +optional
+	Message string `json:"message,omitempty"`
+}
+
+//+kubebuilder:object:root=true
+//+kubebuilder:subresource:status
+//+kubebuilder:resource:shortName=twdpatch;twd-patch;temporalworkerpatch
+//+kubebuilder:printcolumn:name="Target",type="string",JSONPath=".spec.temporalWorkerDeploymentName",description="Target TemporalWorkerDeployment"
+//+kubebuilder:printcolumn:name="Version",type="string",JSONPath=".spec.versionID",description="Target Version ID"
+//+kubebuilder:printcolumn:name="Replicas",type="integer",JSONPath=".spec.replicas",description="Override Replicas"
+//+kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.status",description="Patch Status"
+//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
+
+// TemporalWorkerDeploymentPatch is the Schema for the temporalworkerdeploymentpatches API
+type TemporalWorkerDeploymentPatch struct {
+	metav1.TypeMeta   `json:",inline"`
+	metav1.ObjectMeta `json:"metadata,omitempty"`
+
+	Spec   TemporalWorkerDeploymentPatchSpec   `json:"spec,omitempty"`
+	Status TemporalWorkerDeploymentPatchStatus `json:"status,omitempty"`
+}
+
+//+kubebuilder:object:root=true
+
+// TemporalWorkerDeploymentPatchList contains a list of TemporalWorkerDeploymentPatch
+type TemporalWorkerDeploymentPatchList struct {
+	metav1.TypeMeta `json:",inline"`
+	metav1.ListMeta `json:"metadata,omitempty"`
+	Items           []TemporalWorkerDeploymentPatch `json:"items"`
+}
+
 func init() {
 	SchemeBuilder.Register(&TemporalWorkerDeployment{}, &TemporalWorkerDeploymentList{})
+	SchemeBuilder.Register(&TemporalWorkerDeploymentPatch{}, &TemporalWorkerDeploymentPatchList{})
 }
diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go
index 911e9524..a864c45d 100644
--- a/api/v1alpha1/zz_generated.deepcopy.go
+++ b/api/v1alpha1/zz_generated.deepcopy.go
@@ -399,6 +399,109 @@ func (in *TemporalWorkerDeploymentList) DeepCopyObject() runtime.Object {
 	return nil
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *TemporalWorkerDeploymentPatch) DeepCopyInto(out *TemporalWorkerDeploymentPatch) {
+	*out = *in
+	out.TypeMeta = in.TypeMeta
+	in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
+	in.Spec.DeepCopyInto(&out.Spec)
+	in.Status.DeepCopyInto(&out.Status)
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TemporalWorkerDeploymentPatch.
+func (in *TemporalWorkerDeploymentPatch) DeepCopy() *TemporalWorkerDeploymentPatch {
+	if in == nil {
+		return nil
+	}
+	out := new(TemporalWorkerDeploymentPatch)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
+func (in *TemporalWorkerDeploymentPatch) DeepCopyObject() runtime.Object {
+	if c := in.DeepCopy(); c != nil {
+		return c
+	}
+	return nil
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *TemporalWorkerDeploymentPatchList) DeepCopyInto(out *TemporalWorkerDeploymentPatchList) {
+	*out = *in
+	out.TypeMeta = in.TypeMeta
+	in.ListMeta.DeepCopyInto(&out.ListMeta)
+	if in.Items != nil {
+		in, out := &in.Items, &out.Items
+		*out = make([]TemporalWorkerDeploymentPatch, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TemporalWorkerDeploymentPatchList.
+func (in *TemporalWorkerDeploymentPatchList) DeepCopy() *TemporalWorkerDeploymentPatchList {
+	if in == nil {
+		return nil
+	}
+	out := new(TemporalWorkerDeploymentPatchList)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
+func (in *TemporalWorkerDeploymentPatchList) DeepCopyObject() runtime.Object {
+	if c := in.DeepCopy(); c != nil {
+		return c
+	}
+	return nil
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *TemporalWorkerDeploymentPatchSpec) DeepCopyInto(out *TemporalWorkerDeploymentPatchSpec) {
+	*out = *in
+	if in.Replicas != nil {
+		in, out := &in.Replicas, &out.Replicas
+		*out = new(int32)
+		**out = **in
+	}
+	if in.SunsetStrategy != nil {
+		in, out := &in.SunsetStrategy, &out.SunsetStrategy
+		*out = new(SunsetStrategy)
+		(*in).DeepCopyInto(*out)
+	}
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TemporalWorkerDeploymentPatchSpec.
+func (in *TemporalWorkerDeploymentPatchSpec) DeepCopy() *TemporalWorkerDeploymentPatchSpec {
+	if in == nil {
+		return nil
+	}
+	out := new(TemporalWorkerDeploymentPatchSpec)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *TemporalWorkerDeploymentPatchStatus) DeepCopyInto(out *TemporalWorkerDeploymentPatchStatus) {
+	*out = *in
+	if in.AppliedAt != nil {
+		in, out := &in.AppliedAt, &out.AppliedAt
+		*out = (*in).DeepCopy()
+	}
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TemporalWorkerDeploymentPatchStatus.
+func (in *TemporalWorkerDeploymentPatchStatus) DeepCopy() *TemporalWorkerDeploymentPatchStatus {
+	if in == nil {
+		return nil
+	}
+	out := new(TemporalWorkerDeploymentPatchStatus)
+	in.DeepCopyInto(out)
+	return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *TemporalWorkerDeploymentSpec) DeepCopyInto(out *TemporalWorkerDeploymentSpec) {
 	*out = *in
diff --git a/docs/version-patches.md b/docs/version-patches.md
new file mode 100644
index 00000000..a78666f0
--- /dev/null
+++ b/docs/version-patches.md
@@ -0,0 +1,156 @@
+# Version-Specific Configuration Patches
+
+The Temporal Worker Controller supports version-specific configuration overrides through the `TemporalWorkerDeploymentPatch` custom resource. This allows fine-grained control over individual worker deployment versions without modifying the main `TemporalWorkerDeployment` resource.
+
+## Overview
+
+Version patches enable you to:
+
+- **Scale specific versions independently**: Adjust replica counts for individual versions based on their workload or importance
+- **Customize sunset strategies per version**: Apply different cleanup policies to versions based on their lifecycle needs
+- **Maintain operational flexibility**: Make per-version adjustments without affecting the overall deployment strategy
+
+## Use Cases
+
+### 1. Scaling Down Deprecated Versions
+
+When a version is deprecated but still processing workflows, you might want to reduce its resource consumption:
+
+```yaml
+apiVersion: temporal.io/v1alpha1
+kind: TemporalWorkerDeploymentPatch
+metadata:
+  name: scale-down-deprecated-v1
+spec:
+  temporalWorkerDeploymentName: my-worker-deployment
+  versionID: "my-worker-deployment.v1.0.0"
+  replicas: 1 # Reduce from the default 3 to 1
+  sunsetStrategy:
+    scaledownDelay: 30m # Faster scaledown
+    deleteDelay: 2h # Quicker cleanup
+```
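+Given this patch, the controller resolves the effective replica count per
+version: the patched version gets 1 replica, while every other version keeps
+the deployment-wide default. Conceptually, the resolution is a per-version
+lookup with a fallback; the simplified, self-contained sketch below mirrors
+the planner's `getEffectiveReplicas` helper (types trimmed for illustration):
+
+```go
+package main
+
+import "fmt"
+
+// patchSpec carries only the field needed for replica resolution
+// (an illustrative stand-in for TemporalWorkerDeploymentPatchSpec).
+type patchSpec struct {
+	replicas *int32
+}
+
+// effectiveReplicas returns the override when a patch exists for the
+// version and sets replicas; otherwise it falls back to the default.
+func effectiveReplicas(versionID string, defaultReplicas int32, patches map[string]*patchSpec) int32 {
+	if p, ok := patches[versionID]; ok && p.replicas != nil {
+		return *p.replicas
+	}
+	return defaultReplicas
+}
+
+func main() {
+	one := int32(1)
+	patches := map[string]*patchSpec{
+		"my-worker-deployment.v1.0.0": {replicas: &one},
+	}
+	fmt.Println(effectiveReplicas("my-worker-deployment.v1.0.0", 3, patches)) // 1 (patched)
+	fmt.Println(effectiveReplicas("my-worker-deployment.v2.1.0", 3, patches)) // 3 (default)
+}
+```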
+### 2. Scaling Up Critical Versions
+
+For versions handling critical workflows that require higher availability:
+
+```yaml
+apiVersion: temporal.io/v1alpha1
+kind: TemporalWorkerDeploymentPatch
+metadata:
+  name: scale-up-critical-v2
+spec:
+  temporalWorkerDeploymentName: my-worker-deployment
+  versionID: "my-worker-deployment.v2.1.0"
+  replicas: 10 # Increase from the default 3 to 10
+  sunsetStrategy:
+    scaledownDelay: 24h # Keep running longer
+    deleteDelay: 168h # Preserve for a week (Go duration strings have no "d" unit)
+```
+
+## Patch Status
+
+The controller automatically updates patch status to indicate whether patches are successfully applied:
+
+### Status Types
+
+- **Active**: The patch is applied to an existing version
+- **Orphaned**: The referenced version no longer exists
+- **Invalid**: The referenced TemporalWorkerDeployment doesn't exist
+
+### Status Fields
+
+```yaml
+status:
+  status: Active
+  appliedAt: "2024-01-15T10:30:00Z"
+  observedGeneration: 1
+```
+
+## Controller Behavior
+
+### Reconciliation
+
+1. **Patch Discovery**: The controller watches for changes to `TemporalWorkerDeploymentPatch` resources
+2. **Status Updates**: Patch statuses are updated during each reconciliation loop
+3. **Continuous Application**: During every reconciliation, patches are applied to compute the effective configuration for all operations
+4. **Event Propagation**: Changes to patches trigger reconciliation of the target `TemporalWorkerDeployment`
+
+### How Patches Are Applied
+
+Patches are applied **continuously during each reconciliation loop**, affecting:
+
+- **Scaling Operations**: Existing deployments are scaled immediately when patch replica counts differ from the current state
+- **Sunset Decisions**: Patch sunset strategies determine when to scale down and delete deployments
+- **New Deployment Creation**: New deployments are created with the effective configuration, patches included
+
+The controller continuously compares the current deployment state against the effective configuration (including patches) and acts to reconcile any differences.
+
+### Conflict Resolution
+
+- Patches are applied in alphabetical order by name, so application order is deterministic
+- If multiple patches target the same version, the patch whose name sorts last wins, because each patch applied later overrides the earlier ones
+- Invalid patches are marked as such and do not affect deployment behavior
+
+The sketch after this section makes the precedence rule concrete.
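+Here is a minimal, self-contained sketch of the documented ordering contract
+(illustrative types; the controller applies whole patch specs per version
+rather than bare replica counts):
+
+```go
+package main
+
+import (
+	"fmt"
+	"sort"
+)
+
+// lastWins applies replica overrides for one version in ascending name
+// order, so the patch whose name sorts last provides the final value.
+func lastWins(patchesByName map[string]int32) int32 {
+	names := make([]string, 0, len(patchesByName))
+	for name := range patchesByName {
+		names = append(names, name)
+	}
+	sort.Strings(names) // deterministic, alphabetical application order
+
+	var effective int32
+	for _, name := range names {
+		effective = patchesByName[name] // each later patch overrides the previous one
+	}
+	return effective
+}
+
+func main() {
+	// Both patches override replicas for the same version; "b-scale-up" wins.
+	fmt.Println(lastWins(map[string]int32{
+		"a-scale-down": 1,
+		"b-scale-up":   5,
+	})) // 5
+}
+```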
+## Best Practices
+
+### Naming Conventions
+
+Use descriptive names that indicate the purpose and target:
+
+```yaml
+metadata:
+  name: scale-down-v1-deprecated
+  # or:
+  # name: extend-sunset-critical-v2
+```
+
+### Lifecycle Management
+
+1. **Create patches proactively** for versions you know will need special handling
+2. **Monitor patch statuses** to identify orphaned patches that can be cleaned up
+3. **Use labels** to group related patches for easier management:
+
+```yaml
+metadata:
+  labels:
+    temporal.io/version-class: deprecated
+    temporal.io/scaling-policy: conservative
+```
+
+### Resource Management
+
+- **Clean up orphaned patches** regularly to avoid resource accumulation
+- **Co-locate patches** with their target deployments in the same namespace
+- **Set appropriate RBAC** to control who can create/modify patches
+
+## Limitations
+
+1. **Namespace Scope**: Patches must be in the same namespace as their target `TemporalWorkerDeployment`
+2. **Supported Fields**: Currently only `replicas` and `sunsetStrategy` fields can be overridden
+3. **Version Scope**: Patches apply to specific version IDs, not version patterns
+4. **Propagation Delay**: Patch changes take effect on the next reconciliation loop (typically within 10 seconds), not instantly
+
+## Monitoring
+
+### Kubectl Commands
+
+```bash
+# List all patches
+kubectl get temporalworkerdeploymentpatches
+
+# Show patch details
+kubectl describe twdpatch scale-down-old-version
+
+# Check patch status
+kubectl get twdpatch -o custom-columns=NAME:.metadata.name,STATUS:.status.status,VERSION:.spec.versionID
+```
+
+### Metrics
+
+The controller provides metrics for:
+
+- Number of active/orphaned/invalid patches
+- Patch application success/failure rates
+- Time since the last patch status update
+
+This enables monitoring and alerting on patch health and effectiveness.
\ No newline at end of file
diff --git a/helm/temporal-worker-controller/templates/crds/temporal.io_temporalworkerdeploymentpatches.yaml b/helm/temporal-worker-controller/templates/crds/temporal.io_temporalworkerdeploymentpatches.yaml
new file mode 100644
index 00000000..9799ed7c
--- /dev/null
+++ b/helm/temporal-worker-controller/templates/crds/temporal.io_temporalworkerdeploymentpatches.yaml
@@ -0,0 +1,90 @@
+---
+apiVersion: apiextensions.k8s.io/v1
+kind: CustomResourceDefinition
+metadata:
+  annotations:
+    controller-gen.kubebuilder.io/version: v0.16.2
+  name: temporalworkerdeploymentpatches.temporal.io
+spec:
+  group: temporal.io
+  names:
+    kind: TemporalWorkerDeploymentPatch
+    listKind: TemporalWorkerDeploymentPatchList
+    plural: temporalworkerdeploymentpatches
+    shortNames:
+    - twdpatch
+    - twd-patch
+    - temporalworkerpatch
+    singular: temporalworkerdeploymentpatch
+  scope: Namespaced
+  versions:
+  - additionalPrinterColumns:
+    - description: Target TemporalWorkerDeployment
+      jsonPath: .spec.temporalWorkerDeploymentName
+      name: Target
+      type: string
+    - description: Target Version ID
+      jsonPath: .spec.versionID
+      name: Version
+      type: string
+    - description: Override Replicas
+      jsonPath: .spec.replicas
+      name: Replicas
+      type: integer
+    - description: Patch Status
+      jsonPath: .status.status
+      name: Status
+      type: string
+    - jsonPath: .metadata.creationTimestamp
+      name: Age
+      type: date
+    name: v1alpha1
+    schema:
+      openAPIV3Schema:
+        properties:
+          apiVersion:
+            type: string
+          kind:
+            type: string
+          metadata:
+            type: object
+          spec:
+            properties:
+              replicas:
+                format: int32
+                type: integer
+              sunsetStrategy:
+                properties:
+                  deleteDelay:
+                    type: string
+                  scaledownDelay:
+                    type: string
+                type: object
+              temporalWorkerDeploymentName:
+                type: string
+              versionID:
+                type: string
+            required:
+            - temporalWorkerDeploymentName
+            - versionID
+            type: object
+          status:
+            properties:
+              appliedAt:
+                format: date-time
+                type: string
+              message:
+                type: string
+              observedGeneration:
+                format: int64
+                type: integer
+              status:
+                type: string
+            required:
+            - status
+            type: object
+        type: object
+    served: true
+    storage: true
+    subresources:
+      status: {}
diff --git a/internal/controller/genplan.go b/internal/controller/genplan.go
index cd83b383..0f72c8e0 100644
--- a/internal/controller/genplan.go
+++ b/internal/controller/genplan.go
@@ -13,6 +13,7 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
 	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
 	"github.com/temporalio/temporal-worker-controller/internal/k8s"
@@ -68,6 +69,20 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
 		return nil, fmt.Errorf("unable to get Kubernetes deployment state: %w", err)
 	}
 
+	// Gather version-specific patches
+	versionPatches := make(map[string]*temporaliov1alpha1.TemporalWorkerDeploymentPatchSpec)
+	patchList := &temporaliov1alpha1.TemporalWorkerDeploymentPatchList{}
+	err = r.Client.List(ctx, patchList, client.MatchingFields{
+		patchTargetKey: w.Name,
+	}, client.InNamespace(w.Namespace))
+	if err != nil {
+		l.Error(err, "failed to list version patches, continuing without them")
+	} else {
+		// Index the patch specs by version ID. Iterate by index so the map
+		// stores pointers into the list's backing array rather than the
+		// address of a reused loop variable.
+		for i := range patchList.Items {
+			versionPatches[patchList.Items[i].Spec.VersionID] = &patchList.Items[i].Spec
+		}
+	}
+
 	// Create a simple plan structure
 	plan := &plan{
 		TemporalNamespace: w.Spec.WorkerOptions.TemporalNamespace,
@@ -85,6 +100,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
 	// Generate the plan using the planner package
 	plannerConfig := &planner.Config{
 		RolloutStrategy: rolloutStrategy,
+		VersionPatches:  versionPatches,
 	}
 
 	planResult, err := planner.GeneratePlan(
@@ -119,7 +135,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
 	// Handle deployment creation if needed
 	if planResult.ShouldCreateDeployment {
 		_, buildID, _ := k8s.SplitVersionID(targetVersionID)
-		d, err := r.newDeployment(w, buildID, connection)
+		d, err := r.newDeployment(ctx, w, buildID, connection)
 		if err != nil {
 			return nil, err
 		}
@@ -131,6 +147,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
 // Create a new deployment with owner reference
 func (r *TemporalWorkerDeploymentReconciler) newDeployment(
+	ctx context.Context,
 	w *temporaliov1alpha1.TemporalWorkerDeployment,
 	buildID string,
 	connection temporaliov1alpha1.TemporalConnectionSpec,
diff --git a/internal/controller/patch_management.go b/internal/controller/patch_management.go
new file mode 100644
index 00000000..5884c1fa
--- /dev/null
+++ b/internal/controller/patch_management.go
@@ -0,0 +1,197 @@
+// Unless explicitly stated otherwise all files in this repository are licensed under the MIT License.
+//
+// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2024 Datadog, Inc.
+
+package controller
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/go-logr/logr"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/util/workqueue"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/event"
+	"sigs.k8s.io/controller-runtime/pkg/handler"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+
+	temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
+)
+
+// updatePatchStatuses updates the status of patches based on the current state of versions
+func updatePatchStatuses(
+	ctx context.Context,
+	k8sClient client.Client,
+	logger logr.Logger,
+	workerDeployment *temporaliov1alpha1.TemporalWorkerDeployment,
+) error {
+	// List all patches targeting this worker deployment
+	patchList := &temporaliov1alpha1.TemporalWorkerDeploymentPatchList{}
+	err := k8sClient.List(ctx, patchList, client.MatchingFields{
+		patchTargetKey: workerDeployment.Name,
+	}, client.InNamespace(workerDeployment.Namespace))
+	if err != nil {
+		return fmt.Errorf("failed to list patches: %w", err)
+	}
+
+	// Get current version IDs from the worker deployment status
+	activeVersions := getActiveVersionIDs(workerDeployment)
+
+	// Iterate by index so mutations are made to the list items themselves,
+	// not to a copy held in a loop variable.
+	for i := range patchList.Items {
+		patch := &patchList.Items[i]
+		patchNeedsUpdate := false
+		statusNeedsUpdate := false
+
+		// Check if we need to set OwnerReference
+		if !hasOwnerReference(patch, workerDeployment) {
+			setOwnerReference(patch, workerDeployment)
+			patchNeedsUpdate = true
+		}
+
+		newStatus := determinePatchStatus(patch, workerDeployment, activeVersions)
+
+		// Record a new status, and keep ObservedGeneration in sync even when
+		// a spec change leaves the computed status unchanged.
+		if patch.Status.Status != newStatus || patch.Status.ObservedGeneration != patch.Generation {
+			patch.Status.Status = newStatus
+			patch.Status.ObservedGeneration = patch.Generation
+			statusNeedsUpdate = true
+		}
+
+		// Update the patch object if OwnerReference was added
+		if patchNeedsUpdate {
+			if err := k8sClient.Update(ctx, patch); err != nil {
+				logger.Error(err, "Failed to update patch", "patch", patch.Name)
+				continue
+			}
+		}
+
+		// Update status separately if status was changed
+		if statusNeedsUpdate {
+			if err := k8sClient.Status().Update(ctx, patch); err != nil {
+				logger.Error(err, "Failed to update patch status", "patch", patch.Name)
+				continue
+			}
+		}
+	}
+
+	return nil
+}
+
+// determinePatchStatus determines the appropriate status for a patch
+func determinePatchStatus(
+	patch *temporaliov1alpha1.TemporalWorkerDeploymentPatch,
+	workerDeployment *temporaliov1alpha1.TemporalWorkerDeployment,
+	activeVersions map[string]bool,
+) temporaliov1alpha1.PatchStatus {
+	// Check if the target deployment name matches (namespace is assumed to be the same)
+	if workerDeployment.Name != patch.Spec.TemporalWorkerDeploymentName {
+		return temporaliov1alpha1.PatchStatusInvalid
+	}
+
+	// Check if the version exists
+	if activeVersions[patch.Spec.VersionID] {
+		return temporaliov1alpha1.PatchStatusActive
+	}
+
+	return temporaliov1alpha1.PatchStatusOrphaned
+}
+
+// getActiveVersionIDs extracts all active version IDs from the worker deployment status
+func getActiveVersionIDs(
+	workerDeployment *temporaliov1alpha1.TemporalWorkerDeployment,
+) map[string]bool {
+	versions := make(map[string]bool)
+
+	versions[workerDeployment.Status.TargetVersion.VersionID] = true
+
+	if workerDeployment.Status.CurrentVersion != nil {
+		versions[workerDeployment.Status.CurrentVersion.VersionID] = true
+	}
+
+	for _, deprecatedVersion := range workerDeployment.Status.DeprecatedVersions {
+		versions[deprecatedVersion.VersionID] = true
+	}
+
+	return versions
+}
+
+// enqueuePatchHandler handles events
from TemporalWorkerDeploymentPatch resources +// and enqueues reconciliation requests for the target TemporalWorkerDeployment +type enqueuePatchHandler struct { + client client.Client +} + +// Create implements handler.TypedEventHandler +func (h *enqueuePatchHandler) Create(ctx context.Context, e event.TypedCreateEvent[client.Object], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { + h.enqueuePatch(e.Object, q) +} + +// Update implements handler.TypedEventHandler +func (h *enqueuePatchHandler) Update(ctx context.Context, e event.TypedUpdateEvent[client.Object], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { + h.enqueuePatch(e.ObjectNew, q) +} + +// Delete implements handler.TypedEventHandler +func (h *enqueuePatchHandler) Delete(ctx context.Context, e event.TypedDeleteEvent[client.Object], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { + h.enqueuePatch(e.Object, q) +} + +// Generic implements handler.TypedEventHandler +func (h *enqueuePatchHandler) Generic(ctx context.Context, e event.TypedGenericEvent[client.Object], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { + h.enqueuePatch(e.Object, q) +} + +// enqueuePatch enqueues a reconciliation request for the TemporalWorkerDeployment +// that this patch targets +func (h *enqueuePatchHandler) enqueuePatch(obj client.Object, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { + patch, ok := obj.(*temporaliov1alpha1.TemporalWorkerDeploymentPatch) + if !ok { + return + } + + req := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: patch.Spec.TemporalWorkerDeploymentName, + Namespace: patch.Namespace, + }, + } + + q.Add(req) +} + +// Ensure enqueuePatchHandler implements the correct event handler interface +var _ handler.TypedEventHandler[client.Object, reconcile.Request] = &enqueuePatchHandler{} + +// hasOwnerReference checks if the patch already has an OwnerReference to the specified TemporalWorkerDeployment +func hasOwnerReference( + patch *temporaliov1alpha1.TemporalWorkerDeploymentPatch, + workerDeployment *temporaliov1alpha1.TemporalWorkerDeployment, +) bool { + for _, ownerRef := range patch.GetOwnerReferences() { + if ownerRef.UID == workerDeployment.GetUID() && + ownerRef.Kind == "TemporalWorkerDeployment" && + ownerRef.APIVersion == temporaliov1alpha1.GroupVersion.String() { + return true + } + } + return false +} + +// setOwnerReference sets the OwnerReference on the patch to point to the TemporalWorkerDeployment +func setOwnerReference( + patch *temporaliov1alpha1.TemporalWorkerDeploymentPatch, + workerDeployment *temporaliov1alpha1.TemporalWorkerDeployment, +) { + blockOwnerDeletion := true + + ownerRef := metav1.OwnerReference{ + APIVersion: temporaliov1alpha1.GroupVersion.String(), + Kind: "TemporalWorkerDeployment", + Name: workerDeployment.GetName(), + UID: workerDeployment.GetUID(), + BlockOwnerDeletion: &blockOwnerDeletion, + Controller: nil, + } + + patch.SetOwnerReferences(append(patch.GetOwnerReferences(), ownerRef)) +} diff --git a/internal/controller/worker_controller.go b/internal/controller/worker_controller.go index 9ac13ad7..24146dc3 100644 --- a/internal/controller/worker_controller.go +++ b/internal/controller/worker_controller.go @@ -31,6 +31,7 @@ var ( const ( // TODO(jlegrone): add this everywhere + // Index key for deployments owned by TemporalWorkerDeployment deployOwnerKey = ".metadata.controller" buildIDLabel = "temporal.io/build-id" ) @@ -45,6 +46,9 @@ type TemporalWorkerDeploymentReconciler struct { 
//+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeployments,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeployments/status,verbs=get;update;patch //+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeployments/finalizers,verbs=update +//+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeploymentpatches,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeploymentpatches/status,verbs=get;update;patch +//+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeploymentpatches/finalizers,verbs=update //+kubebuilder:rbac:groups=temporal.io,resources=temporalconnections,verbs=get;list;watch //+kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch //+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete @@ -175,6 +179,12 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req return ctrl.Result{}, err } + // Update patch statuses after successful plan execution + if err := updatePatchStatuses(ctx, r.Client, l, &workerDeploy); err != nil { + l.Error(err, "failed to update patch statuses") + // Don't fail reconciliation if patch status update fails + } + return ctrl.Result{ Requeue: true, // TODO(jlegrone): Consider increasing this value if the only thing we need to check for is unreachable versions. @@ -184,6 +194,11 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req }, nil } +const ( + // Index key for patches targeting a TemporalWorkerDeployment + patchTargetKey = ".spec.temporalWorkerDeploymentName" +) + // SetupWithManager sets up the controller with the Manager. func (r *TemporalWorkerDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error { if err := mgr.GetFieldIndexer().IndexField(context.Background(), &appsv1.Deployment{}, deployOwnerKey, func(rawObj client.Object) []string { @@ -206,9 +221,18 @@ func (r *TemporalWorkerDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) return err } + // Index patches by their target TemporalWorkerDeployment + if err := mgr.GetFieldIndexer().IndexField(context.Background(), &temporaliov1alpha1.TemporalWorkerDeploymentPatch{}, patchTargetKey, func(rawObj client.Object) []string { + patch := rawObj.(*temporaliov1alpha1.TemporalWorkerDeploymentPatch) + return []string{patch.Spec.TemporalWorkerDeploymentName} + }); err != nil { + return err + } + return ctrl.NewControllerManagedBy(mgr). For(&temporaliov1alpha1.TemporalWorkerDeployment{}). Owns(&appsv1.Deployment{}). + Watches(&temporaliov1alpha1.TemporalWorkerDeploymentPatch{}, &enqueuePatchHandler{client: mgr.GetClient()}). WithOptions(controller.Options{ MaxConcurrentReconciles: 100, }). 
diff --git a/internal/planner/planner.go b/internal/planner/planner.go index 51a42b04..0feae3ca 100644 --- a/internal/planner/planner.go +++ b/internal/planner/planner.go @@ -55,6 +55,32 @@ type WorkflowConfig struct { type Config struct { // RolloutStrategy to use RolloutStrategy temporaliov1alpha1.RolloutStrategy + // Version-specific patches that override replicas and sunset strategies + VersionPatches map[string]*temporaliov1alpha1.TemporalWorkerDeploymentPatchSpec +} + +// getEffectiveReplicas returns the effective replicas for a version, considering patches +func getEffectiveReplicas(versionID string, spec *temporaliov1alpha1.TemporalWorkerDeploymentSpec, patches map[string]*temporaliov1alpha1.TemporalWorkerDeploymentPatchSpec) int32 { + if patch, exists := patches[versionID]; exists && patch.Replicas != nil { + return *patch.Replicas + } + return *spec.Replicas +} + +// getEffectiveScaledownDelay returns the effective scaledown delay for a version, considering patches +func getEffectiveScaledownDelay(versionID string, spec *temporaliov1alpha1.TemporalWorkerDeploymentSpec, patches map[string]*temporaliov1alpha1.TemporalWorkerDeploymentPatchSpec) time.Duration { + if patch, exists := patches[versionID]; exists && patch.SunsetStrategy != nil && patch.SunsetStrategy.ScaledownDelay != nil { + return patch.SunsetStrategy.ScaledownDelay.Duration + } + return spec.SunsetStrategy.ScaledownDelay.Duration +} + +// getEffectiveDeleteDelay returns the effective delete delay for a version, considering patches +func getEffectiveDeleteDelay(versionID string, spec *temporaliov1alpha1.TemporalWorkerDeploymentSpec, patches map[string]*temporaliov1alpha1.TemporalWorkerDeploymentPatchSpec) time.Duration { + if patch, exists := patches[versionID]; exists && patch.SunsetStrategy != nil && patch.SunsetStrategy.DeleteDelay != nil { + return patch.SunsetStrategy.DeleteDelay.Duration + } + return spec.SunsetStrategy.DeleteDelay.Duration } // GeneratePlan creates a plan for updating the worker deployment @@ -71,8 +97,8 @@ func GeneratePlan( } // Add delete/scale operations based on version status - plan.DeleteDeployments = getDeleteDeployments(k8sState, status, spec) - plan.ScaleDeployments = getScaleDeployments(k8sState, status, spec) + plan.DeleteDeployments = getDeleteDeployments(k8sState, status, spec, config) + plan.ScaleDeployments = getScaleDeployments(k8sState, status, spec, config) plan.ShouldCreateDeployment = shouldCreateDeployment(status, spec) // Determine if we need to start any test workflows @@ -92,6 +118,7 @@ func getDeleteDeployments( k8sState *k8s.DeploymentState, status *temporaliov1alpha1.TemporalWorkerDeploymentStatus, spec *temporaliov1alpha1.TemporalWorkerDeploymentSpec, + config *Config, ) []*appsv1.Deployment { var deleteDeployments []*appsv1.Deployment @@ -111,7 +138,7 @@ func getDeleteDeployments( // Deleting a deployment is only possible when: // 1. The deployment has been drained for deleteDelay + scaledownDelay. // 2. The deployment is scaled to 0 replicas. 
- if (time.Since(version.DrainedSince.Time) > spec.SunsetStrategy.DeleteDelay.Duration+spec.SunsetStrategy.ScaledownDelay.Duration) && + if (time.Since(version.DrainedSince.Time) > getEffectiveDeleteDelay(version.VersionID, spec, config.VersionPatches)+getEffectiveScaledownDelay(version.VersionID, spec, config.VersionPatches)) && *d.Spec.Replicas == 0 { deleteDeployments = append(deleteDeployments, d) } @@ -132,26 +159,17 @@ func getScaleDeployments( k8sState *k8s.DeploymentState, status *temporaliov1alpha1.TemporalWorkerDeploymentStatus, spec *temporaliov1alpha1.TemporalWorkerDeploymentSpec, + config *Config, ) map[*v1.ObjectReference]uint32 { scaleDeployments := make(map[*v1.ObjectReference]uint32) - replicas := *spec.Replicas // Scale the current version if needed if status.CurrentVersion != nil && status.CurrentVersion.Deployment != nil { ref := status.CurrentVersion.Deployment if d, exists := k8sState.Deployments[status.CurrentVersion.VersionID]; exists { - if d.Spec.Replicas != nil && *d.Spec.Replicas != replicas { - scaleDeployments[ref] = uint32(replicas) - } - } - } - - // Scale the target version if it exists, and isn't current - if (status.CurrentVersion == nil || status.CurrentVersion.VersionID != status.TargetVersion.VersionID) && - status.TargetVersion.Deployment != nil { - if d, exists := k8sState.Deployments[status.TargetVersion.VersionID]; exists { - if d.Spec.Replicas == nil || *d.Spec.Replicas != replicas { - scaleDeployments[status.TargetVersion.Deployment] = uint32(replicas) + effectiveReplicas := getEffectiveReplicas(status.CurrentVersion.VersionID, spec, config.VersionPatches) + if d.Spec.Replicas != nil && *d.Spec.Replicas != effectiveReplicas { + scaleDeployments[ref] = uint32(effectiveReplicas) } } } @@ -173,11 +191,12 @@ func getScaleDeployments( temporaliov1alpha1.VersionStatusCurrent: // TODO(carlydf): Consolidate scale up cases and verify that scale up is the correct action for inactive versions // Scale up these deployments - if d.Spec.Replicas != nil && *d.Spec.Replicas != replicas { - scaleDeployments[version.Deployment] = uint32(replicas) + effectiveReplicas := getEffectiveReplicas(version.VersionID, spec, config.VersionPatches) + if d.Spec.Replicas != nil && *d.Spec.Replicas != effectiveReplicas { + scaleDeployments[version.Deployment] = uint32(effectiveReplicas) } case temporaliov1alpha1.VersionStatusDrained: - if time.Since(version.DrainedSince.Time) > spec.SunsetStrategy.ScaledownDelay.Duration { + if time.Since(version.DrainedSince.Time) > getEffectiveScaledownDelay(version.VersionID, spec, config.VersionPatches) { // TODO(jlegrone): Compute scale based on load? Or percentage of replicas? 
// Scale down drained deployments after delay if d.Spec.Replicas != nil && *d.Spec.Replicas != 0 { @@ -187,6 +206,17 @@ func getScaleDeployments( } } + // Scale the target version if it exists and is different from current version + if status.TargetVersion.Deployment != nil && + (status.CurrentVersion == nil || status.TargetVersion.VersionID != status.CurrentVersion.VersionID) { + if d, exists := k8sState.Deployments[status.TargetVersion.VersionID]; exists { + effectiveReplicas := getEffectiveReplicas(status.TargetVersion.VersionID, spec, config.VersionPatches) + if d.Spec.Replicas == nil || *d.Spec.Replicas != effectiveReplicas { + scaleDeployments[status.TargetVersion.Deployment] = uint32(effectiveReplicas) + } + } + } + return scaleDeployments } diff --git a/internal/planner/planner_test.go b/internal/planner/planner_test.go index aaf85d2e..71a0d0b9 100644 --- a/internal/planner/planner_test.go +++ b/internal/planner/planner_test.go @@ -331,6 +331,9 @@ func TestGetDeleteDeployments(t *testing.T) { }, Replicas: func() *int32 { r := int32(1); return &r }(), }, + config: &Config{ + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{}, + }, expectDeletes: 0, }, { @@ -362,6 +365,9 @@ func TestGetDeleteDeployments(t *testing.T) { spec: &temporaliov1alpha1.TemporalWorkerDeploymentSpec{ Replicas: func() *int32 { r := int32(1); return &r }(), }, + config: &Config{ + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{}, + }, expectDeletes: 1, }, } @@ -370,7 +376,7 @@ func TestGetDeleteDeployments(t *testing.T) { t.Run(tc.name, func(t *testing.T) { err := tc.spec.Default(context.Background()) require.NoError(t, err) - deletes := getDeleteDeployments(tc.k8sState, tc.status, tc.spec) + deletes := getDeleteDeployments(tc.k8sState, tc.status, tc.spec, tc.config) assert.Equal(t, tc.expectDeletes, len(deletes), "unexpected number of deletes") }) } @@ -383,6 +389,7 @@ func TestGetScaleDeployments(t *testing.T) { status *temporaliov1alpha1.TemporalWorkerDeploymentStatus spec *temporaliov1alpha1.TemporalWorkerDeploymentSpec state *temporal.TemporalWorkerState + config *Config expectScales int }{ { @@ -411,6 +418,9 @@ func TestGetScaleDeployments(t *testing.T) { spec: &temporaliov1alpha1.TemporalWorkerDeploymentSpec{ Replicas: func() *int32 { r := int32(2); return &r }(), }, + config: &Config{ + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{}, + }, expectScales: 1, }, { @@ -448,6 +458,9 @@ func TestGetScaleDeployments(t *testing.T) { }, Replicas: func() *int32 { r := int32(1); return &r }(), }, + config: &Config{ + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{}, + }, expectScales: 1, }, { @@ -481,6 +494,9 @@ func TestGetScaleDeployments(t *testing.T) { spec: &temporaliov1alpha1.TemporalWorkerDeploymentSpec{ Replicas: func() *int32 { r := int32(3); return &r }(), }, + config: &Config{ + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{}, + }, expectScales: 1, }, { @@ -508,6 +524,9 @@ func TestGetScaleDeployments(t *testing.T) { state: &temporal.TemporalWorkerState{ RampingVersionID: "test/namespace.b", }, + config: &Config{ + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{}, + }, expectScales: 1, }, { @@ -541,6 +560,9 @@ func TestGetScaleDeployments(t *testing.T) { spec: &temporaliov1alpha1.TemporalWorkerDeploymentSpec{ Replicas: func() *int32 { r := int32(3); return &r }(), }, + config: &Config{ + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{}, + }, expectScales: 1, }, { @@ -582,13 +604,16 @@ func TestGetScaleDeployments(t *testing.T) { }, Replicas: func() *int32 { r := 
int32(3); return &r }(), }, + config: &Config{ + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{}, + }, expectScales: 0, // No scaling yet because not enough time passed }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - scales := getScaleDeployments(tc.k8sState, tc.status, tc.spec) + scales := getScaleDeployments(tc.k8sState, tc.status, tc.spec, tc.config) assert.Equal(t, tc.expectScales, len(scales), "unexpected number of scales") }) } @@ -1709,3 +1734,214 @@ func rolloutStep(ramp float32, d time.Duration) temporaliov1alpha1.RolloutStep { PauseDuration: metav1Duration(d), } } + +func TestVersionPatches(t *testing.T) { + testCases := []struct { + name string + k8sState *k8s.DeploymentState + config *Config + status *temporaliov1alpha1.TemporalWorkerDeploymentStatus + spec *temporaliov1alpha1.TemporalWorkerDeploymentSpec + state *temporal.TemporalWorkerState + expectScales int + expectReplicas map[string]uint32 // version ID -> expected replicas + expectDeletes int + }{ + { + name: "patch overrides replicas for deprecated version", + k8sState: &k8s.DeploymentState{ + Deployments: map[string]*appsv1.Deployment{ + "test/namespace.v1": createDeploymentWithReplicas(5), // Current version already at desired replicas + "test/namespace.v2": createDeploymentWithReplicas(1), // Deprecated version with 1 replica + }, + DeploymentRefs: map[string]*v1.ObjectReference{ + "test/namespace.v1": {Name: "test-v1"}, + "test/namespace.v2": {Name: "test-v2"}, + }, + }, + config: &Config{ + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{}, + VersionPatches: map[string]*temporaliov1alpha1.TemporalWorkerDeploymentPatchSpec{ + "test/namespace.v2": { + VersionID: "test/namespace.v2", + Replicas: func() *int32 { r := int32(10); return &r }(), // Patch overrides to 10 replicas + }, + }, + }, + status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{ + TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + VersionID: "test/namespace.v1", + Status: temporaliov1alpha1.VersionStatusCurrent, + Deployment: &v1.ObjectReference{Name: "test-v1"}, + }, + }, + CurrentVersion: &temporaliov1alpha1.CurrentWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + VersionID: "test/namespace.v1", + Status: temporaliov1alpha1.VersionStatusCurrent, + Deployment: &v1.ObjectReference{Name: "test-v1"}, + }, + }, + DeprecatedVersions: []*temporaliov1alpha1.DeprecatedWorkerDeploymentVersion{ + { + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + VersionID: "test/namespace.v2", + Status: temporaliov1alpha1.VersionStatusInactive, + Deployment: &v1.ObjectReference{Name: "test-v2"}, + }, + }, + }, + }, + spec: &temporaliov1alpha1.TemporalWorkerDeploymentSpec{ + Replicas: func() *int32 { r := int32(5); return &r }(), + }, + state: &temporal.TemporalWorkerState{}, + expectScales: 1, // Only deprecated version needs scaling + expectReplicas: map[string]uint32{ + "test/namespace.v2": 10, // Should use patched value, not base replicas + }, + }, + { + name: "patch overrides sunset delay for drained version", + k8sState: &k8s.DeploymentState{ + Deployments: map[string]*appsv1.Deployment{ + "test/namespace.v1": createDeploymentWithReplicas(3), + "test/namespace.v2": createDeploymentWithReplicas(2), // Drained version still has replicas + }, + DeploymentRefs: map[string]*v1.ObjectReference{ + "test/namespace.v1": {Name: "test-v1"}, + 
"test/namespace.v2": {Name: "test-v2"}, + }, + }, + status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{ + TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + VersionID: "test/namespace.v1", + Status: temporaliov1alpha1.VersionStatusCurrent, + Deployment: &v1.ObjectReference{Name: "test-v1"}, + }, + }, + CurrentVersion: &temporaliov1alpha1.CurrentWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + VersionID: "test/namespace.v1", + Status: temporaliov1alpha1.VersionStatusCurrent, + Deployment: &v1.ObjectReference{Name: "test-v1"}, + }, + }, + DeprecatedVersions: []*temporaliov1alpha1.DeprecatedWorkerDeploymentVersion{ + { + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + VersionID: "test/namespace.v2", + Status: temporaliov1alpha1.VersionStatusDrained, + Deployment: &v1.ObjectReference{Name: "test-v2"}, + }, + DrainedSince: &metav1.Time{ + Time: time.Now().Add(-30 * time.Minute), // Drained 30 minutes ago + }, + }, + }, + }, + spec: &temporaliov1alpha1.TemporalWorkerDeploymentSpec{ + SunsetStrategy: temporaliov1alpha1.SunsetStrategy{ + ScaledownDelay: &metav1.Duration{Duration: 2 * time.Hour}, // Base delay: 2 hours + }, + Replicas: func() *int32 { r := int32(3); return &r }(), + }, + state: &temporal.TemporalWorkerState{}, + config: &Config{ + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{}, + VersionPatches: map[string]*temporaliov1alpha1.TemporalWorkerDeploymentPatchSpec{ + "test/namespace.v2": { + VersionID: "test/namespace.v2", + SunsetStrategy: &temporaliov1alpha1.SunsetStrategy{ + ScaledownDelay: &metav1.Duration{Duration: 15 * time.Minute}, // Patch: much shorter delay + }, + }, + }, + }, + expectScales: 1, // Should scale down because patched delay is shorter + expectReplicas: map[string]uint32{ + "test/namespace.v2": 0, // Should scale to 0 due to patch override + }, + }, + { + name: "no patches applied when none exist", + k8sState: &k8s.DeploymentState{ + Deployments: map[string]*appsv1.Deployment{ + "test/namespace.v1": createDeploymentWithReplicas(5), // Already at desired replicas + "test/namespace.v2": createDeploymentWithReplicas(1), + }, + DeploymentRefs: map[string]*v1.ObjectReference{ + "test/namespace.v1": {Name: "test-v1"}, + "test/namespace.v2": {Name: "test-v2"}, + }, + }, + status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{ + TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + VersionID: "test/namespace.v1", + Status: temporaliov1alpha1.VersionStatusCurrent, + Deployment: &v1.ObjectReference{Name: "test-v1"}, + }, + }, + CurrentVersion: &temporaliov1alpha1.CurrentWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + VersionID: "test/namespace.v1", + Status: temporaliov1alpha1.VersionStatusCurrent, + Deployment: &v1.ObjectReference{Name: "test-v1"}, + }, + }, + DeprecatedVersions: []*temporaliov1alpha1.DeprecatedWorkerDeploymentVersion{ + { + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + VersionID: "test/namespace.v2", + Status: temporaliov1alpha1.VersionStatusInactive, + Deployment: &v1.ObjectReference{Name: "test-v2"}, + }, + }, + }, + }, + spec: &temporaliov1alpha1.TemporalWorkerDeploymentSpec{ + Replicas: func() *int32 { r := int32(5); return &r }(), + }, + state: 
&temporal.TemporalWorkerState{}, + config: &Config{ + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{}, + VersionPatches: make(map[string]*temporaliov1alpha1.TemporalWorkerDeploymentPatchSpec), // No patches + }, + expectScales: 1, // Only deprecated version needs scaling + expectReplicas: map[string]uint32{ + "test/namespace.v2": 5, // Should use base replicas (no patch) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := tc.spec.Default(context.Background()) + require.NoError(t, err) + plan, err := GeneratePlan(logr.Discard(), tc.k8sState, tc.status, tc.spec, tc.state, tc.config) + require.NoError(t, err) + + assert.Equal(t, tc.expectScales, len(plan.ScaleDeployments), "unexpected number of scales") + assert.Equal(t, tc.expectDeletes, len(plan.DeleteDeployments), "unexpected number of deletes") + + // Verify the specific replicas for each deployment + for versionID, expectedReplicas := range tc.expectReplicas { + found := false + for objRef, actualReplicas := range plan.ScaleDeployments { + // Find the deployment reference that matches this version + if tc.k8sState.DeploymentRefs[versionID] != nil && + tc.k8sState.DeploymentRefs[versionID].Name == objRef.Name { + assert.Equal(t, expectedReplicas, actualReplicas, + "unexpected replicas for version %s", versionID) + found = true + break + } + } + assert.True(t, found, "expected scaling operation for version %s not found", versionID) + } + }) + } +}