Allow to prefix provisioningClassName to filter provisioning requests #7676

Open: wants to merge 2 commits into base: master
11 changes: 9 additions & 2 deletions cluster-autoscaler/FAQ.md
@@ -629,6 +629,11 @@ When using this class, Cluster Autoscaler performs following actions:
Adds a Provisioned=True condition to the ProvReq if capacity is available.
Adds a BookingExpired=True condition when the 10-minute reservation period expires.

Since Cluster Autoscaler version 1.33, the autoscaler can be configured to process
only those check capacity ProvisioningRequests whose provisioningClassName carries the prefix set by the `--check-capacity-provisioning-class-prefix=<prefix>` flag.
This makes it possible to run two Cluster Autoscalers in the cluster, but the instance with the configured prefix
**should only** handle check capacity ProvisioningRequests and should not share node groups with the main instance.
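The matching this PR adds is an exact comparison of the configured prefix followed by the well-known check capacity class name. A minimal standalone sketch of that rule (the constant and helper names here are illustrative; in the PR itself this lives inside the injector as `matchesCheckCapacityClass`):

```go
package main

import "fmt"

// checkCapacityClass mirrors the well-known check capacity class name
// (v1.ProvisioningClassCheckCapacity in the autoscaler APIs).
const checkCapacityClass = "check-capacity.autoscaling.x-k8s.io"

// matchesConfiguredClass reports whether a ProvisioningRequest's
// provisioningClassName equals the configured prefix followed by the
// well-known class name. With an empty prefix this reduces to an exact
// match on checkCapacityClass, preserving the default behavior.
func matchesConfiguredClass(provisioningClassName, prefix string) bool {
	return provisioningClassName == prefix+checkCapacityClass
}

func main() {
	fmt.Println(matchesConfiguredClass("test-prefix."+checkCapacityClass, "test-prefix.")) // true
	fmt.Println(matchesConfiguredClass(checkCapacityClass, "test-prefix."))                // false
	fmt.Println(matchesConfiguredClass(checkCapacityClass, ""))                            // true
}
```

Note that the comparison is against the full class name, so an instance configured with a prefix ignores the unprefixed class, and vice versa; this is what keeps the two autoscaler instances from processing each other's requests.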

* `best-effort-atomic-scale-up.autoscaling.x-k8s.io` (supported from Cluster Autoscaler version 1.30.2 or later).
When using this class, Cluster Autoscaler performs following actions:

@@ -735,12 +740,12 @@ setting the following flag in your Cluster Autoscaler configuration:
3. **Batch Size**: Set the maximum number of CheckCapacity ProvisioningRequests
to process in a single iteration by setting the following flag in your Cluster
Autoscaler configuration:
`--max-batch-size=<batch-size>`. The default value is 10.
`--check-capacity-provisioning-request-max-batch-size=<batch-size>`. The default value is 10.

4. **Batch Timebox**: Set the maximum time in seconds that Cluster Autoscaler will
spend processing CheckCapacity ProvisioningRequests in a single iteration by
setting the following flag in your Cluster Autoscaler configuration:
`--batch-timebox=<timebox>`. The default value is 10s.
`--check-capacity-provisioning-request-batch-timebox=<timebox>`. The default value is 10s.

****************

@@ -973,13 +978,15 @@ The following startup parameters are supported for cluster autoscaler:
| `bulk-mig-instances-listing-enabled` | Fetch GCE mig instances in bulk instead of per mig | |
| `bypassed-scheduler-names` | Names of schedulers to bypass. If set to non-empty value, CA will not wait for pods to reach a certain age before triggering a scale-up. | |
| `check-capacity-batch-processing` | Whether to enable batch processing for check capacity requests. | |
| `check-capacity-provisioning-class-prefix` | Prefix of provisioningClassName that will be filtered by processors. Only ProvisioningRequests with this prefix in their class will be processed by this CA. It refers only to check capacity ProvisioningRequests. | |
| `check-capacity-provisioning-request-batch-timebox` | Maximum time to process a batch of provisioning requests. | 10s |
| `check-capacity-provisioning-request-max-batch-size` | Maximum number of provisioning requests to process in a single batch. | 10 |
| `cloud-config` | The path to the cloud provider configuration file. Empty string for no configuration file. | |
| `cloud-provider` | Cloud provider type. Available values: [aws,azure,gce,alicloud,cherryservers,cloudstack,baiducloud,magnum,digitalocean,exoscale,externalgrpc,huaweicloud,hetzner,oci,ovhcloud,clusterapi,ionoscloud,kamatera,kwok,linode,bizflycloud,brightbox,equinixmetal,vultr,tencentcloud,civo,scaleway,rancher,volcengine] | "gce" |
| `cloud-provider-gce-l7lb-src-cidrs` | CIDRs opened in GCE firewall for L7 LB traffic proxy & health checks | 130.211.0.0/22,35.191.0.0/16 |
| `cloud-provider-gce-lb-src-cidrs` | CIDRs opened in GCE firewall for L4 LB traffic proxy & health checks | 130.211.0.0/22,209.85.152.0/22,209.85.204.0/22,35.191.0.0/16 |
| `cluster-name` | Autoscaled cluster name, if available | |
| `cluster-snapshot-parallelism` | Maximum parallelism of cluster snapshot creation. | 16 |
| `clusterapi-cloud-config-authoritative` | Treat the cloud-config flag authoritatively (do not fallback to using kubeconfig flag). ClusterAPI only | |
| `cordon-node-before-terminating` | Should CA cordon nodes before terminating during downscale process | |
| `cores-total` | Minimum and maximum number of cores in cluster, in the format <min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers. | "0:320000" |
4 changes: 4 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -313,6 +313,10 @@ type AutoscalingOptions struct {
DynamicResourceAllocationEnabled bool
// ClusterSnapshotParallelism is the maximum parallelism of cluster snapshot creation.
ClusterSnapshotParallelism int
// CheckCapacityProvisioningClassPrefix is the prefix of provisioningClassName that will be filtered by processors.
// Only ProvisioningRequests with this prefix in their class will be processed by this CA.
// It only refers to check capacity ProvisioningRequests.
CheckCapacityProvisioningClassPrefix string
}

// KubeClientOptions specify options for kube client
6 changes: 4 additions & 2 deletions cluster-autoscaler/main.go
@@ -283,6 +283,7 @@ var (
forceDeleteLongUnregisteredNodes = flag.Bool("force-delete-unregistered-nodes", false, "Whether to enable force deletion of long unregistered nodes, regardless of the min size of the node group they belong to.")
enableDynamicResourceAllocation = flag.Bool("enable-dynamic-resource-allocation", false, "Whether logic for handling DRA (Dynamic Resource Allocation) objects is enabled.")
clusterSnapshotParallelism = flag.Int("cluster-snapshot-parallelism", 16, "Maximum parallelism of cluster snapshot creation.")
checkCapacityProvisioningClassPrefix = flag.String("check-capacity-provisioning-class-prefix", "", "Prefix of provisioningClassName that will be filtered by processors. Only ProvisioningRequests with this prefix in their class will be processed by this CA. It refers only to check capacity ProvisioningRequests.")
)

func isFlagPassed(name string) bool {
@@ -464,6 +465,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
ForceDeleteLongUnregisteredNodes: *forceDeleteLongUnregisteredNodes,
DynamicResourceAllocationEnabled: *enableDynamicResourceAllocation,
ClusterSnapshotParallelism: *clusterSnapshotParallelism,
CheckCapacityProvisioningClassPrefix: *checkCapacityProvisioningClassPrefix,
}
}

@@ -539,7 +541,7 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot
return nil, nil, err
}

ProvisioningRequestInjector, err = provreq.NewProvisioningRequestPodsInjector(restConfig, opts.ProvisioningRequestInitialBackoffTime, opts.ProvisioningRequestMaxBackoffTime, opts.ProvisioningRequestMaxBackoffCacheSize, opts.CheckCapacityBatchProcessing)
ProvisioningRequestInjector, err = provreq.NewProvisioningRequestPodsInjector(restConfig, opts.ProvisioningRequestInitialBackoffTime, opts.ProvisioningRequestMaxBackoffTime, opts.ProvisioningRequestMaxBackoffCacheSize, opts.CheckCapacityBatchProcessing, opts.CheckCapacityProvisioningClassPrefix)
if err != nil {
return nil, nil, err
}
@@ -558,7 +560,7 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot

scaleUpOrchestrator := provreqorchestrator.NewWrapperOrchestrator(provreqOrchestrator)
opts.ScaleUpOrchestrator = scaleUpOrchestrator
provreqProcesor := provreq.NewProvReqProcessor(client)
provreqProcesor := provreq.NewProvReqProcessor(client, opts.CheckCapacityProvisioningClassPrefix)
opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor})

podListProcessor.AddProcessor(provreqProcesor)
77 changes: 40 additions & 37 deletions cluster-autoscaler/processors/provreq/injector.go
@@ -37,13 +37,14 @@ import (

// ProvisioningRequestPodsInjector creates in-memory pods from ProvisioningRequest and injects them into the unscheduled pods list.
type ProvisioningRequestPodsInjector struct {
initialRetryTime time.Duration
maxBackoffTime time.Duration
backoffDuration *lru.Cache
clock clock.PassiveClock
client *provreqclient.ProvisioningRequestClient
lastProvisioningRequestProcessTime time.Time
checkCapacityBatchProcessing bool
initialRetryTime time.Duration
maxBackoffTime time.Duration
backoffDuration *lru.Cache
clock clock.PassiveClock
client *provreqclient.ProvisioningRequestClient
lastProvisioningRequestProcessTime time.Time
checkCapacityBatchProcessing bool
checkCapacityProvisioningClassPrefix string
}

// IsAvailableForProvisioning checks if the provisioning request is the correct state for processing and provisioning has not been attempted recently.
@@ -93,16 +94,24 @@ func (p *ProvisioningRequestPodsInjector) MarkAsFailed(pr *provreqwrapper.Provis
p.UpdateLastProcessTime()
}

func (p *ProvisioningRequestPodsInjector) isSupportedClass(pr *provreqwrapper.ProvisioningRequest) bool {
return provisioningrequest.SupportedProvisioningClass(pr.Spec.ProvisioningClassName, p.checkCapacityProvisioningClassPrefix)
}

func (p *ProvisioningRequestPodsInjector) shouldMarkAsAccepted(pr *provreqwrapper.ProvisioningRequest) bool {
// Don't mark as accepted the check capacity ProvReq when batch processing is enabled.
// It will be marked later, in parallel, during processing the requests.
return !p.checkCapacityBatchProcessing || !p.matchesCheckCapacityClass(pr.Spec.ProvisioningClassName)
}

// GetPodsFromNextRequest picks one ProvisioningRequest meeting the condition passed using isSupportedClass function, marks it as accepted and returns pods from it.
func (p *ProvisioningRequestPodsInjector) GetPodsFromNextRequest(
isSupportedClass func(*provreqwrapper.ProvisioningRequest) bool,
) ([]*apiv1.Pod, error) {
func (p *ProvisioningRequestPodsInjector) GetPodsFromNextRequest() ([]*apiv1.Pod, error) {
provReqs, err := p.client.ProvisioningRequests()
if err != nil {
return nil, err
}
for _, pr := range provReqs {
if !isSupportedClass(pr) {
if !p.isSupportedClass(pr) {
continue
}

@@ -117,16 +126,13 @@ func (p *ProvisioningRequestPodsInjector) GetPodsFromNextRequest(
p.MarkAsFailed(pr, provreqconditions.FailedToCreatePodsReason, err.Error())
continue
}
// Don't mark as accepted the check capacity ProvReq when batch processing is enabled.
// It will be marked later, in parallel, during processing the requests.
if pr.Spec.ProvisioningClassName == v1.ProvisioningClassCheckCapacity && p.checkCapacityBatchProcessing {
p.UpdateLastProcessTime()
if p.shouldMarkAsAccepted(pr) {
if err := p.MarkAsAccepted(pr); err != nil {
continue
}
return podsFromProvReq, nil
}
if err := p.MarkAsAccepted(pr); err != nil {
continue
}

p.UpdateLastProcessTime()
return podsFromProvReq, nil
}
return nil, nil
@@ -139,6 +145,10 @@ type ProvisioningRequestWithPods struct {
Pods []*apiv1.Pod
}

func (p *ProvisioningRequestPodsInjector) matchesCheckCapacityClass(provisioningClassName string) bool {
return provisioningClassName == p.checkCapacityProvisioningClassPrefix+v1.ProvisioningClassCheckCapacity
}

// GetCheckCapacityBatch returns up to the requested number of ProvisioningRequestWithPods.
// We do not mark the PRs as accepted here.
// If we fail to get the pods for a PR, we mark the PR as failed and issue an update.
@@ -152,7 +162,7 @@ func (p *ProvisioningRequestPodsInjector) GetCheckCapacityBatch(maxPrs int) ([]P
if len(prsWithPods) >= maxPrs {
break
}
if pr.Spec.ProvisioningClassName != v1.ProvisioningClassCheckCapacity {
if !p.matchesCheckCapacityClass(pr.Spec.ProvisioningClassName) {
continue
}
if !p.IsAvailableForProvisioning(pr) {
@@ -175,15 +185,7 @@ func (p *ProvisioningRequestPodsInjector) Process(
_ *context.AutoscalingContext,
unschedulablePods []*apiv1.Pod,
) ([]*apiv1.Pod, error) {
podsFromProvReq, err := p.GetPodsFromNextRequest(
func(pr *provreqwrapper.ProvisioningRequest) bool {
_, found := provisioningrequest.SupportedProvisioningClasses[pr.Spec.ProvisioningClassName]
if !found {
klog.Warningf("Provisioning Class %s is not supported for ProvReq %s/%s", pr.Spec.ProvisioningClassName, pr.Namespace, pr.Name)
}
return found
})

podsFromProvReq, err := p.GetPodsFromNextRequest()
if err != nil {
return unschedulablePods, err
}
@@ -195,19 +197,20 @@
func (p *ProvisioningRequestPodsInjector) CleanUp() {}

// NewProvisioningRequestPodsInjector creates a ProvisioningRequest filter processor.
func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config, initialBackoffTime, maxBackoffTime time.Duration, maxCacheSize int, checkCapacityBatchProcessing bool) (*ProvisioningRequestPodsInjector, error) {
func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config, initialBackoffTime, maxBackoffTime time.Duration, maxCacheSize int, checkCapacityBatchProcessing bool, checkCapacityProvisioningClassPrefix string) (*ProvisioningRequestPodsInjector, error) {
client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
if err != nil {
return nil, err
}
return &ProvisioningRequestPodsInjector{
initialRetryTime: initialBackoffTime,
maxBackoffTime: maxBackoffTime,
backoffDuration: lru.New(maxCacheSize),
client: client,
clock: clock.RealClock{},
lastProvisioningRequestProcessTime: time.Now(),
checkCapacityBatchProcessing: checkCapacityBatchProcessing,
initialRetryTime: initialBackoffTime,
maxBackoffTime: maxBackoffTime,
backoffDuration: lru.New(maxCacheSize),
client: client,
clock: clock.RealClock{},
lastProvisioningRequestProcessTime: time.Now(),
checkCapacityBatchProcessing: checkCapacityBatchProcessing,
checkCapacityProvisioningClassPrefix: checkCapacityProvisioningClassPrefix,
}, nil
}

33 changes: 25 additions & 8 deletions cluster-autoscaler/processors/provreq/injector_test.go
@@ -69,6 +69,7 @@ func TestProvisioningRequestPodsInjector(t *testing.T) {
podsA := 10
newProvReqA := testProvisioningRequestWithCondition("new", podsA, v1.ProvisioningClassCheckCapacity)
newAcceptedProvReqA := testProvisioningRequestWithCondition("new-accepted", podsA, v1.ProvisioningClassCheckCapacity, accepted)
newProvReqAPrefixed := testProvisioningRequestWithCondition("new-prefixed", podsA, "test-prefix.check-capacity.autoscaling.x-k8s.io")

podsB := 20
notProvisionedAcceptedProvReqB := testProvisioningRequestWithCondition("provisioned-false-B", podsB, v1.ProvisioningClassBestEffortAtomicScaleUp, notProvisioned, accepted)
@@ -79,20 +80,20 @@ func TestProvisioningRequestPodsInjector(t *testing.T) {
unknownClass := testProvisioningRequestWithCondition("new-accepted", podsA, "unknown-class", accepted)

testCases := []struct {
name string
provReqs []*provreqwrapper.ProvisioningRequest
existingUnsUnschedulablePodCount int
checkCapacityBatchProcessing bool
wantUnscheduledPodCount int
wantUpdatedConditionName string
name string
provReqs []*provreqwrapper.ProvisioningRequest
existingUnsUnschedulablePodCount int
checkCapacityBatchProcessing bool
checkCapacityProvisioningClassPrefix string
wantUnscheduledPodCount int
wantUpdatedConditionName string
}{
{
name: "New ProvisioningRequest, pods are injected and Accepted condition is added",
provReqs: []*provreqwrapper.ProvisioningRequest{newProvReqA, provisionedAcceptedProvReqB},
wantUnscheduledPodCount: podsA,
wantUpdatedConditionName: newProvReqA.Name,
},

{
name: "New check capacity ProvisioningRequest with batch processing, pods are injected and Accepted condition is not added",
provReqs: []*provreqwrapper.ProvisioningRequest{newProvReqA, provisionedAcceptedProvReqB},
@@ -106,6 +107,22 @@
wantUnscheduledPodCount: podsA,
wantUpdatedConditionName: newAcceptedProvReqA.Name,
},
{
name: "New ProvisioningRequest with not matching custom prefix, no pods are injected",
provReqs: []*provreqwrapper.ProvisioningRequest{newProvReqAPrefixed},
},
{
name: "New ProvisioningRequest with not matching prefix, no pods are injected",
provReqs: []*provreqwrapper.ProvisioningRequest{newProvReqA, provisionedAcceptedProvReqB},
checkCapacityProvisioningClassPrefix: "test-prefix.",
},
{
name: "New check capacity ProvisioningRequest with matching prefix, pods are injected and Accepted condition is added",
provReqs: []*provreqwrapper.ProvisioningRequest{newProvReqAPrefixed, provisionedAcceptedProvReqB},
checkCapacityProvisioningClassPrefix: "test-prefix.",
wantUnscheduledPodCount: podsA,
wantUpdatedConditionName: newProvReqAPrefixed.Name,
},
{
name: "Provisioned=False, pods are injected",
provReqs: []*provreqwrapper.ProvisioningRequest{notProvisionedAcceptedProvReqB, failedProvReq},
@@ -140,7 +157,7 @@ func TestProvisioningRequestPodsInjector(t *testing.T) {
client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...)
backoffTime := lru.New(100)
backoffTime.Add(key(notProvisionedRecentlyProvReqB), 2*time.Minute)
injector := ProvisioningRequestPodsInjector{1 * time.Minute, 10 * time.Minute, backoffTime, clock.NewFakePassiveClock(now), client, now, tc.checkCapacityBatchProcessing}
injector := ProvisioningRequestPodsInjector{1 * time.Minute, 10 * time.Minute, backoffTime, clock.NewFakePassiveClock(now), client, now, tc.checkCapacityBatchProcessing, tc.checkCapacityProvisioningClassPrefix}
getUnscheduledPods, err := injector.Process(nil, provreqwrapper.BuildTestPods("ns", "pod", tc.existingUnsUnschedulablePodCount))
if err != nil {
t.Errorf("%s failed: injector.Process return error %v", tc.name, err)
17 changes: 9 additions & 8 deletions cluster-autoscaler/processors/provreq/processor.go
@@ -50,15 +50,16 @@ type injector interface {
}

type provReqProcessor struct {
now func() time.Time
maxUpdated int
client *provreqclient.ProvisioningRequestClient
injector injector
now func() time.Time
maxUpdated int
client *provreqclient.ProvisioningRequestClient
injector injector
checkCapacityProvisioningClassPrefix string
}

// NewProvReqProcessor returns a ProvisioningRequestProcessor.
func NewProvReqProcessor(client *provreqclient.ProvisioningRequestClient) *provReqProcessor {
return &provReqProcessor{now: time.Now, maxUpdated: defaultMaxUpdated, client: client, injector: scheduling.NewHintingSimulator()}
func NewProvReqProcessor(client *provreqclient.ProvisioningRequestClient, checkCapacityProvisioningClassPrefix string) *provReqProcessor {
return &provReqProcessor{now: time.Now, maxUpdated: defaultMaxUpdated, client: client, injector: scheduling.NewHintingSimulator(), checkCapacityProvisioningClassPrefix: checkCapacityProvisioningClassPrefix}
}

// Refresh implements loop.Observer interface and will be run at the start
@@ -84,7 +85,7 @@ func (p *provReqProcessor) refresh(provReqs []*provreqwrapper.ProvisioningReques
if len(expiredProvReq) >= p.maxUpdated {
break
}
if ok, found := provisioningrequest.SupportedProvisioningClasses[provReq.Spec.ProvisioningClassName]; !ok || !found {
if !provisioningrequest.SupportedProvisioningClass(provReq.Spec.ProvisioningClassName, p.checkCapacityProvisioningClassPrefix) {
continue
}
conditions := provReq.Status.Conditions
@@ -144,7 +145,7 @@ func (p *provReqProcessor) bookCapacity(ctx *context.AutoscalingContext) error {
}
podsToCreate := []*apiv1.Pod{}
for _, provReq := range provReqs {
if !conditions.ShouldCapacityBeBooked(provReq) {
if !conditions.ShouldCapacityBeBooked(provReq, p.checkCapacityProvisioningClassPrefix) {
continue
}
pods, err := provreq_pods.PodsForProvisioningRequest(provReq)