Skip to content

Commit

Permalink
Add proactive scaleup
Browse files Browse the repository at this point in the history
  • Loading branch information
abdelrahman882 committed Aug 8, 2024
1 parent 522c6fc commit 294b959
Show file tree
Hide file tree
Showing 14 changed files with 1,583 additions and 0 deletions.
26 changes: 26 additions & 0 deletions cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ import (
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
"k8s.io/autoscaler/cluster-autoscaler/processors/podinjection"
podinjectionbackoff "k8s.io/autoscaler/cluster-autoscaler/processors/podinjection/backoff"
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
"k8s.io/autoscaler/cluster-autoscaler/processors/provreq"
"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates"
"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates/emptycandidates"
Expand Down Expand Up @@ -265,6 +268,9 @@ var (
"Eg. flag usage: '10000:20,1000:100,0:60'")
provisioningRequestsEnabled = flag.Bool("enable-provisioning-requests", false, "Whether the clusterautoscaler will be handling the ProvisioningRequest CRs.")
frequentLoopsEnabled = flag.Bool("frequent-loops-enabled", false, "Whether clusterautoscaler triggers new iterations more frequently when it's needed")
proactiveScaleupEnabled = flag.Bool("enable-proactive-scaleup", false, "Whether to enable/disable proactive scale-ups, defaults to false")
podInjectionLimit = flag.Int("pod-injection-limit", 5000, "Limits total number of pods while injecting fake pods. If unschedulable pods already exceeds the limit, pod injection is disabled but pods are not truncated.")
nodeLimit = flag.Int("node-limit", 15000, "Limits total number of nodes in cluster to avoid overloading KCP, only used when --enable-proactive-scaleup is set to true. Defaults to 15k nodes")
)

func isFlagPassed(name string) bool {
Expand Down Expand Up @@ -525,6 +531,26 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
podListProcessor.AddProcessor(injector)
podListProcessor.AddProcessor(provreqProcesor)
}

if *proactiveScaleupEnabled {
podInjectionBackoffRegistry := podinjectionbackoff.NewFakePodControllerRegistry()
if *nodeLimit <= 0 {
klog.Warningf("Disabling Node limiting for proactive-scaleups. Expected valid positive value for node-limit, found: %v", *nodeLimit)
} else if *maxNodesTotal > 0 {
*maxNodesTotal = min(*nodeLimit, *maxNodesTotal)
} else {
*maxNodesTotal = *nodeLimit
}
podInjectionPodListProcessor := podinjection.NewPodInjectionPodListProcessor(podInjectionBackoffRegistry)
enforceInjectedPodsLimitProcessor := podinjection.NewEnforceInjectedPodsLimitProcessor(*podInjectionLimit)

podListProcessor = pods.NewCombinedPodListProcessor([]pods.PodListProcessor{podInjectionPodListProcessor, podListProcessor, enforceInjectedPodsLimitProcessor})

// FakePodsScaleUpStatusProcessor processor needs to be the first processor in ScaleUpStatusProcessor as it filters out fake pods from
// Scale Up status so that we don't emit events, visibility logs, ..etc for them.
opts.Processors.ScaleUpStatusProcessor = podinjection.NewFakePodsScaleUpStatusProcessor(podInjectionBackoffRegistry)
}

opts.Processors.PodListProcessor = podListProcessor
scaleDownCandidatesComparers := []scaledowncandidates.CandidatesComparer{}
if autoscalingOptions.ParallelDrain {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package podinjectionbackoff

import (
"time"

"github.com/cenkalti/backoff/v4"
"k8s.io/apimachinery/pkg/types"
)

const (
// baseBackoff is the initial interval given to the exponential backoff of a controller.
baseBackoff = 5 * time.Minute
// backoffThreshold is the maximum interval the exponential backoff of a controller may grow to.
backoffThreshold = 30 * time.Minute
)

// controllerEntry describes a backed off controller.
type controllerEntry struct {
// until is the point in time computed by the most recent back-off of this controller.
until time.Time
// backoff yields the successive back-off intervals used to compute until.
backoff backoff.ExponentialBackOff
}

// ControllerRegistry is a registry containing backed off controllers, to be used in
// time-based backing off of controllers considered in fake pod injection.
type ControllerRegistry struct {
// backedOffControllers maps a controller UID to its back-off entry.
backedOffControllers map[types.UID]controllerEntry
}

// NewFakePodControllerRegistry creates and returns a ControllerRegistry that
// holds no backed off controllers yet.
func NewFakePodControllerRegistry() *ControllerRegistry {
	registry := new(ControllerRegistry)
	registry.backedOffControllers = map[types.UID]controllerEntry{}
	return registry
}

// newExponentialBackOff builds an ExponentialBackOff configured with this
// package's intervals instead of the library defaults, driven by the given clock.
func newExponentialBackOff(clock backoff.Clock) backoff.ExponentialBackOff {
	var expBackoff backoff.ExponentialBackOff

	// Intervals start at baseBackoff and grow up to backoffThreshold.
	expBackoff.InitialInterval = baseBackoff
	expBackoff.MaxInterval = backoffThreshold
	expBackoff.Multiplier = backoff.DefaultMultiplier
	// No randomization: easier testing and better predictability.
	expBackoff.RandomizationFactor = 0
	// A zero MaxElapsedTime keeps the backoff from stopping once it reaches the threshold.
	expBackoff.MaxElapsedTime = 0
	expBackoff.Stop = backoff.Stop
	expBackoff.Clock = clock

	// Reset puts the backoff into its initial state before it is handed out.
	expBackoff.Reset()
	return expBackoff
}

// BackoffController backs off the controller identified by ownerUID, starting at now.
//
//   - An empty ownerUID is ignored.
//   - If the controller has no entry yet, or now is after the end of its previous
//     back-off, a fresh entry is created and the back-off restarts from the
//     initial interval.
//   - Otherwise the existing entry is reused, so the next (larger) interval applies.
func (r *ControllerRegistry) BackoffController(ownerUID types.UID, now time.Time) {
	if ownerUID == "" {
		return
	}

	entry, known := r.backedOffControllers[ownerUID]
	stillBackedOff := known && !now.After(entry.until)
	if !stillBackedOff {
		entry = controllerEntry{backoff: newExponentialBackOff(backoff.SystemClock)}
	}

	// NextBackOff() both returns the current interval and advances to the next one.
	interval := entry.backoff.NextBackOff()
	entry.until = now.Add(interval)

	// entry is a copy of the map value, so it has to be written back.
	r.backedOffControllers[ownerUID] = entry
}

// BackOffUntil returns the time recorded as the end of the back-off of the
// controller with id uid, or the zero time.Time when the registry has no entry
// for it. The now argument is currently not used.
func (r *ControllerRegistry) BackOffUntil(uid types.UID, now time.Time) time.Time {
	if entry, ok := r.backedOffControllers[uid]; ok {
		return entry.until
	}
	return time.Time{}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package podinjectionbackoff

import (
"testing"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/types"
)

// TestBackoffControllerOfPod checks the back-off deadlines recorded by
// ControllerRegistry.BackoffController: the first back-off, exponential growth on
// repeated back-offs, independence between controllers, reset after the previous
// back-off has expired, and capping at backoffThreshold.
func TestBackoffControllerOfPod(t *testing.T) {
	c1 := types.UID("c1")
	c2 := types.UID("c2")
	// Named testClock so the variable does not shadow the clock type.
	// Its zero time is the start time every expectation below is relative to.
	testClock := &clock{}

	testCases := map[string]struct {
		// backoffCounts is how many times each controller is backed off at the start time.
		backoffCounts map[types.UID]int
		// spendTime is how far the clock advances after the initial back-offs.
		spendTime time.Duration
		// backoffCountsAfterSpendTime is how many times each controller is backed off
		// once spendTime has elapsed.
		backoffCountsAfterSpendTime  map[types.UID]int
		expectedBackedoffControllers map[types.UID]controllerEntry
	}{
		"backing-off a controller adds its controller UID in backoff correctly": {
			backoffCounts: map[types.UID]int{
				c1: 1,
			},
			expectedBackedoffControllers: map[types.UID]controllerEntry{
				c1: {
					until: testClock.now.Add(baseBackoff),
				},
			},
		},
		"backing-off an already backed-off controller exponentially increases backoff duration": {
			backoffCounts: map[types.UID]int{
				c1: 2,
			},
			expectedBackedoffControllers: map[types.UID]controllerEntry{
				c1: {
					until: testClock.now.Add(time.Duration(float64(baseBackoff) * backoff.DefaultMultiplier)),
				},
			},
		},
		"backing-off a controller doesn't affect other controllers": {
			backoffCounts: map[types.UID]int{
				c1: 1,
				c2: 2,
			},
			expectedBackedoffControllers: map[types.UID]controllerEntry{
				c1: {
					until: testClock.now.Add(baseBackoff),
				},
				c2: {
					until: testClock.now.Add(time.Duration(float64(baseBackoff) * backoff.DefaultMultiplier)),
				},
			},
		},
		"backing-off a past backed-off controller resets backoff": {
			// Back off once, let that back-off expire, then back off again: the second
			// back-off must start over from baseBackoff instead of the increased interval.
			backoffCounts: map[types.UID]int{
				c1: 1,
			},
			spendTime: baseBackoff * 2,
			backoffCountsAfterSpendTime: map[types.UID]int{
				c1: 1,
			},
			expectedBackedoffControllers: map[types.UID]controllerEntry{
				c1: {
					until: testClock.now.Add(baseBackoff * 2).Add(baseBackoff),
				},
			},
		},
		"back-off duration doesn't exceed backoffThreshold": {
			backoffCounts: map[types.UID]int{
				c1: 15,
			},
			expectedBackedoffControllers: map[types.UID]controllerEntry{
				c1: {
					until: testClock.now.Add(backoffThreshold),
				},
			},
		},
	}

	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			// Reset time between test cases
			testClock.now = time.Time{}

			registry := NewFakePodControllerRegistry()

			// backoffAll backs off every controller the requested number of times at the current test time.
			backoffAll := func(counts map[types.UID]int) {
				for uid, backoffCount := range counts {
					for i := 0; i < backoffCount; i++ {
						registry.BackoffController(uid, testClock.now)
					}
				}
			}

			backoffAll(tc.backoffCounts)
			testClock.now = testClock.now.Add(tc.spendTime)
			backoffAll(tc.backoffCountsAfterSpendTime)

			assert.Equal(t, len(tc.expectedBackedoffControllers), len(registry.backedOffControllers))
			for uid, expected := range tc.expectedBackedoffControllers {
				// A map lookup yields a struct value, which is never nil, so presence
				// has to be checked through the second return value.
				actual, found := registry.backedOffControllers[uid]
				if assert.True(t, found, "controller %v is missing from the registry", uid) {
					assert.Equal(t, expected.until, actual.until)
				}
			}
		})
	}
}

// clock is a manually controlled time source for tests: it reports whatever
// time was last stored in now.
type clock struct {
	now time.Time
}

// Now returns the time currently stored in the clock.
func (fc *clock) Now() time.Time {
	current := fc.now
	return current
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package podinjection

import (
"time"

apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
)

const (
// EnforceFakePodsLimitDuration is the label passed to metrics.UpdateDurationFromStart
// by EnforceInjectedPodsLimitProcessor.Process.
EnforceFakePodsLimitDuration = "total-time-to-enforce-injected-pods-limit"
)

// EnforceInjectedPodsLimitProcessor is a PodListProcessor used to limit the number of injected fake pods.
type EnforceInjectedPodsLimitProcessor struct {
// podLimit is the total number of pods above which fake pods are dropped.
podLimit int
}

// NewEnforceInjectedPodsLimitProcessor returns an EnforceInjectedPodsLimitProcessor
// that enforces the given podLimit.
func NewEnforceInjectedPodsLimitProcessor(podLimit int) *EnforceInjectedPodsLimitProcessor {
	processor := new(EnforceInjectedPodsLimitProcessor)
	processor.podLimit = podLimit
	return processor
}

// Process drops fake pods from unschedulablePods, in list order, until the list
// no longer exceeds the processor's pod limit or no fake pods are left.
// Pods that are not fake are always kept, so the result can still exceed the
// limit. The returned error is always nil.
func (p *EnforceInjectedPodsLimitProcessor) Process(ctx *context.AutoscalingContext, unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, error) {
	// time.Now() is evaluated here, at function entry; the metrics call itself
	// runs when Process returns.
	defer metrics.UpdateDurationFromStart(EnforceFakePodsLimitDuration, time.Now())

	var kept []*apiv1.Pod
	// excess is how many fake pods still have to be dropped; it is non-positive
	// when the list already fits the limit.
	excess := len(unschedulablePods) - p.podLimit

	for i := range unschedulablePods {
		candidate := unschedulablePods[i]
		if !IsFake(candidate) || excess <= 0 {
			kept = append(kept, candidate)
			continue
		}
		excess--
	}

	return kept, nil
}

// CleanUp is a no-op: the processor only holds its pod limit.
func (p *EnforceInjectedPodsLimitProcessor) CleanUp() {
}
Loading

0 comments on commit 294b959

Please sign in to comment.