Implement batch processing for check capacity class with combined status
Duke0404 committed Oct 24, 2024
1 parent 878c982 commit 63d0275
Showing 5 changed files with 284 additions and 60 deletions.
6 changes: 6 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -301,6 +301,12 @@ type AutoscalingOptions struct {
     ProvisioningRequestMaxBackoffTime time.Duration
     // ProvisioningRequestMaxCacheSize is the max size for ProvisioningRequest cache that is stored for retry backoff.
     ProvisioningRequestMaxBackoffCacheSize int
+    // CheckCapacityBatchProcessing is used to enable/disable batch processing of check capacity provisioning class
+    CheckCapacityBatchProcessing bool
+    // CheckCapacityProvisioningRequestMaxBatchSize is the maximum number of provisioning requests to process in a single batch
+    CheckCapacityProvisioningRequestMaxBatchSize int
+    // CheckCapacityProvisioningRequestBatchTimebox is the maximum time to spend processing a batch of provisioning requests
+    CheckCapacityProvisioningRequestBatchTimebox time.Duration
 }

 // KubeClientOptions specify options for kube client
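For orientation, a minimal sketch, not part of this commit, of how the three new fields added above could be populated; the field names and types come from the hunk, and the values mirror the defaults of the new flags introduced in main.go below.

package main

import (
    "time"

    "k8s.io/autoscaler/cluster-autoscaler/config"
)

func main() {
    // Illustrative only: enable batch processing of check capacity
    // ProvisioningRequests and bound each batch by a count and a timebox.
    opts := config.AutoscalingOptions{
        CheckCapacityBatchProcessing:                 true,
        CheckCapacityProvisioningRequestMaxBatchSize: 10,               // matches the new flag default below
        CheckCapacityProvisioningRequestBatchTimebox: 10 * time.Second, // matches the new flag default below
    }
    _ = opts
}

In the autoscaler itself this struct is built from command line flags in createAutoscalingOptions, shown in the main.go diff below.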
59 changes: 37 additions & 22 deletions cluster-autoscaler/main.go
@@ -268,14 +268,17 @@
         "--max-graceful-termination-sec flag should not be set when this flag is set. Not setting this flag will use unordered evictor by default."+
         "Priority evictor reuses the concepts of drain logic in kubelet(https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/2712-pod-priority-based-graceful-node-shutdown#migration-from-the-node-graceful-shutdown-feature)."+
         "Eg. flag usage: '10000:20,1000:100,0:60'")
-    provisioningRequestsEnabled = flag.Bool("enable-provisioning-requests", false, "Whether the clusterautoscaler will be handling the ProvisioningRequest CRs.")
-    provisioningRequestInitialBackoffTime = flag.Duration("provisioning-request-initial-backoff-time", 1*time.Minute, "Initial backoff time for ProvisioningRequest retry after failed ScaleUp.")
-    provisioningRequestMaxBackoffTime = flag.Duration("provisioning-request-max-backoff-time", 10*time.Minute, "Max backoff time for ProvisioningRequest retry after failed ScaleUp.")
-    provisioningRequestMaxBackoffCacheSize = flag.Int("provisioning-request-max-backoff-cache-size", 1000, "Max size for ProvisioningRequest cache size used for retry backoff mechanism.")
-    frequentLoopsEnabled = flag.Bool("frequent-loops-enabled", false, "Whether clusterautoscaler triggers new iterations more frequently when it's needed")
-    asyncNodeGroupsEnabled = flag.Bool("async-node-groups", false, "Whether clusterautoscaler creates and deletes node groups asynchronously. Experimental: requires cloud provider supporting async node group operations, enable at your own risk.")
-    proactiveScaleupEnabled = flag.Bool("enable-proactive-scaleup", false, "Whether to enable/disable proactive scale-ups, defaults to false")
-    podInjectionLimit = flag.Int("pod-injection-limit", 5000, "Limits total number of pods while injecting fake pods. If unschedulable pods already exceeds the limit, pod injection is disabled but pods are not truncated.")
+    provisioningRequestsEnabled = flag.Bool("enable-provisioning-requests", false, "Whether the clusterautoscaler will be handling the ProvisioningRequest CRs.")
+    provisioningRequestInitialBackoffTime = flag.Duration("provisioning-request-initial-backoff-time", 1*time.Minute, "Initial backoff time for ProvisioningRequest retry after failed ScaleUp.")
+    provisioningRequestMaxBackoffTime = flag.Duration("provisioning-request-max-backoff-time", 10*time.Minute, "Max backoff time for ProvisioningRequest retry after failed ScaleUp.")
+    provisioningRequestMaxBackoffCacheSize = flag.Int("provisioning-request-max-backoff-cache-size", 1000, "Max size for ProvisioningRequest cache size used for retry backoff mechanism.")
+    frequentLoopsEnabled = flag.Bool("frequent-loops-enabled", false, "Whether clusterautoscaler triggers new iterations more frequently when it's needed")
+    asyncNodeGroupsEnabled = flag.Bool("async-node-groups", false, "Whether clusterautoscaler creates and deletes node groups asynchronously. Experimental: requires cloud provider supporting async node group operations, enable at your own risk.")
+    proactiveScaleupEnabled = flag.Bool("enable-proactive-scaleup", false, "Whether to enable/disable proactive scale-ups, defaults to false")
+    podInjectionLimit = flag.Int("pod-injection-limit", 5000, "Limits total number of pods while injecting fake pods. If unschedulable pods already exceeds the limit, pod injection is disabled but pods are not truncated.")
+    checkCapacityBatchProcessing = flag.Bool("check-capacity-batch-processing", false, "Whether to enable batch processing for check capacity requests.")
+    checkCapacityProvisioningRequestMaxBatchSize = flag.Int("check-capacity-provisioning-request-max-batch-size", 10, "Maximum number of provisioning requests to process in a single batch.")
+    checkCapacityProvisioningRequestBatchTimebox = flag.Duration("check-capacity-provisioning-request-batch-timebox", 10*time.Second, "Maximum time to process a batch of provisioning requests.")
 )

 func isFlagPassed(name string) bool {
@@ -444,13 +447,16 @@ func createAutoscalingOptions() config.AutoscalingOptions {
             MaxAllocatableDifferenceRatio: *maxAllocatableDifferenceRatio,
             MaxFreeDifferenceRatio: *maxFreeDifferenceRatio,
         },
-        DynamicNodeDeleteDelayAfterTaintEnabled: *dynamicNodeDeleteDelayAfterTaintEnabled,
-        BypassedSchedulers: scheduler_util.GetBypassedSchedulersMap(*bypassedSchedulers),
-        ProvisioningRequestEnabled: *provisioningRequestsEnabled,
-        AsyncNodeGroupsEnabled: *asyncNodeGroupsEnabled,
-        ProvisioningRequestInitialBackoffTime: *provisioningRequestInitialBackoffTime,
-        ProvisioningRequestMaxBackoffTime: *provisioningRequestMaxBackoffTime,
-        ProvisioningRequestMaxBackoffCacheSize: *provisioningRequestMaxBackoffCacheSize,
+        DynamicNodeDeleteDelayAfterTaintEnabled: *dynamicNodeDeleteDelayAfterTaintEnabled,
+        BypassedSchedulers: scheduler_util.GetBypassedSchedulersMap(*bypassedSchedulers),
+        ProvisioningRequestEnabled: *provisioningRequestsEnabled,
+        AsyncNodeGroupsEnabled: *asyncNodeGroupsEnabled,
+        ProvisioningRequestInitialBackoffTime: *provisioningRequestInitialBackoffTime,
+        ProvisioningRequestMaxBackoffTime: *provisioningRequestMaxBackoffTime,
+        ProvisioningRequestMaxBackoffCacheSize: *provisioningRequestMaxBackoffCacheSize,
+        CheckCapacityBatchProcessing: *checkCapacityBatchProcessing,
+        CheckCapacityProvisioningRequestMaxBatchSize: *checkCapacityProvisioningRequestMaxBatchSize,
+        CheckCapacityProvisioningRequestBatchTimebox: *checkCapacityProvisioningRequestBatchTimebox,
     }
 }

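As the hunk above shows, the new struct fields are filled by dereferencing the corresponding flag pointers. Below is a small standalone sketch of that flag-to-options wiring pattern; the batchOptions struct is a hypothetical stand-in for the relevant part of config.AutoscalingOptions, while the flag names, defaults, and descriptions are taken from the diff.

package main

import (
    "flag"
    "fmt"
    "time"
)

// batchOptions is a hypothetical stand-in used only to illustrate the wiring pattern.
type batchOptions struct {
    Enabled      bool
    MaxBatchSize int
    Timebox      time.Duration
}

var (
    batchEnabled = flag.Bool("check-capacity-batch-processing", false, "Whether to enable batch processing for check capacity requests.")
    maxBatchSize = flag.Int("check-capacity-provisioning-request-max-batch-size", 10, "Maximum number of provisioning requests to process in a single batch.")
    batchTimebox = flag.Duration("check-capacity-provisioning-request-batch-timebox", 10*time.Second, "Maximum time to process a batch of provisioning requests.")
)

func main() {
    flag.Parse()
    // Dereference the flag pointers into the options struct, mirroring
    // createAutoscalingOptions in the hunk above.
    opts := batchOptions{
        Enabled:      *batchEnabled,
        MaxBatchSize: *maxBatchSize,
        Timebox:      *batchTimebox,
    }
    fmt.Printf("%+v\n", opts)
}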
@@ -518,20 +524,29 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot
         if err != nil {
             return nil, nil, err
         }

+        ProvisioningRequestInjector, err = provreq.NewProvisioningRequestPodsInjector(restConfig, opts.ProvisioningRequestInitialBackoffTime, opts.ProvisioningRequestMaxBackoffTime, opts.ProvisioningRequestMaxBackoffCacheSize)
+        if err != nil {
+            return nil, nil, err
+        }
+        podListProcessor.AddProcessor(ProvisioningRequestInjector)
+
+        var provisioningRequestPodsInjector *provreq.ProvisioningRequestPodsInjector
+        if autoscalingOptions.CheckCapacityBatchProcessing {
+            klog.Infof("Batch processing for check capacity requests is enabled. Passing provisioning request injector to check capacity processor.")
+            provisioningRequestPodsInjector = ProvisioningRequestInjector
+        }
+
         provreqOrchestrator := provreqorchestrator.New(client, []provreqorchestrator.ProvisioningClass{
-            checkcapacity.New(client),
+            checkcapacity.New(client, provisioningRequestPodsInjector),
             besteffortatomic.New(client),
         })
-        scaleUpOrchestrator := provreqorchestrator.NewWrapperOrchestrator(provreqOrchestrator)
+
+        scaleUpOrchestrator := provreqorchestrator.NewWrapperOrchestrator(provreqOrchestrator)
         opts.ScaleUpOrchestrator = scaleUpOrchestrator
         provreqProcesor := provreq.NewProvReqProcessor(client, opts.PredicateChecker)
         opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor})
-        ProvisioningRequestInjector, err = provreq.NewProvisioningRequestPodsInjector(restConfig, opts.ProvisioningRequestInitialBackoffTime, opts.ProvisioningRequestMaxBackoffTime, opts.ProvisioningRequestMaxBackoffCacheSize)
-        if err != nil {
-            return nil, nil, err
-        }
-        podListProcessor.AddProcessor(ProvisioningRequestInjector)
+
         podListProcessor.AddProcessor(provreqProcesor)
     }

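To make the meaning of the two new limits concrete, here is a rough, hypothetical sketch of the batching behaviour they describe: a batch of check capacity ProvisioningRequests stops at whichever comes first, the size cap or the timebox. The helper below and its names are illustrative only and are not the checkcapacity code touched by this commit.

package main

import (
    "fmt"
    "time"
)

// processBatch is a hypothetical helper: it handles pending requests until
// either maxBatchSize requests have been processed or the timebox expires,
// whichever happens first.
func processBatch(pending []string, maxBatchSize int, timebox time.Duration, handle func(string)) int {
    deadline := time.Now().Add(timebox)
    processed := 0
    for _, req := range pending {
        if processed >= maxBatchSize || time.Now().After(deadline) {
            break
        }
        handle(req)
        processed++
    }
    return processed
}

func main() {
    pending := []string{"check-capacity-pr-1", "check-capacity-pr-2", "check-capacity-pr-3"}
    n := processBatch(pending, 10, 10*time.Second, func(name string) {
        fmt.Println("evaluating capacity for", name)
    })
    fmt.Println("processed", n, "provisioning requests in this batch")
}

With the flag defaults above (--check-capacity-provisioning-request-max-batch-size=10, --check-capacity-provisioning-request-batch-timebox=10s), at most 10 check capacity requests would be handled per batch, and a slow batch would be cut short after roughly 10 seconds.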