Implement unit tests for batch processing of check capacity class
Duke0404 committed Sep 16, 2024
1 parent f22fcdf commit bca8b93
Showing 2 changed files with 213 additions and 19 deletions.
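Before the diff itself, a minimal sketch (not part of this commit) of the batch-processing configuration these tests exercise, assuming only the AutoscalingOptions fields that setupTest sets further down in the diff:

import (
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/config"
)

// batchTestOptions is a sketch of the options the batch tests toggle:
// enable batched check-capacity processing, cap the batch size, and
// bound each batch with a timebox.
func batchTestOptions() config.AutoscalingOptions {
	return config.AutoscalingOptions{
		CheckCapacityBatchProcessing: true,
		MaxBatchSize:                 3,
		BatchTimebox:                 5 * time.Minute,
	}
}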
11 changes: 11 additions & 0 deletions cluster-autoscaler/processors/provreq/testutils.go
@@ -0,0 +1,11 @@
package provreq

import (
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/utils/clock/testing"
)

// NewFakePodsInjector creates a new instance of ProvisioningRequestPodsInjector with the given client and clock for testing.
func NewFakePodsInjector(client *provreqclient.ProvisioningRequestClient, clock *testing.FakePassiveClock) *ProvisioningRequestPodsInjector {
return &ProvisioningRequestPodsInjector{client: client, clock: clock}
}
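As a usage note (a sketch, not part of the diff): the orchestrator test below builds the fake ProvisioningRequest client first and then pairs it with a fake passive clock when constructing the injector. A hypothetical helper wrapping those two steps might look like this:

import (
	"context"
	"testing"
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/processors/provreq"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
	clocktesting "k8s.io/utils/clock/testing"
)

// newTestInjector is a hypothetical helper mirroring how the orchestrator test
// below wires the fake injector: build the fake client from the test's
// ProvisioningRequests, then hand it to NewFakePodsInjector with a fake clock.
func newTestInjector(t *testing.T, prs ...*provreqwrapper.ProvisioningRequest) *provreq.ProvisioningRequestPodsInjector {
	client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, prs...)
	return provreq.NewFakePodsInjector(client, clocktesting.NewFakePassiveClock(time.Now()))
}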
221 changes: 202 additions & 19 deletions cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go
@@ -26,14 +26,15 @@ import (
appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/config"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
"k8s.io/autoscaler/cluster-autoscaler/processors/provreq"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/besteffortatomic"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity"
@@ -44,8 +45,9 @@ import (
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
"k8s.io/autoscaler/cluster-autoscaler/utils/units"
"k8s.io/client-go/kubernetes/fake"
clocktesting "k8s.io/utils/clock/testing"

schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

@@ -78,6 +80,15 @@ func TestScaleUp(t *testing.T) {
Class: v1.ProvisioningClassCheckCapacity,
})

anotherCheckCapacityCpuProvReq := provreqwrapper.BuildValidTestProvisioningRequestFromOptions(
provreqwrapper.TestProvReqOptions{
Name: "anotherCheckCapacityCpuProvReq",
CPU: "5m",
Memory: "5",
PodCount: int32(100),
Class: v1.ProvisioningClassCheckCapacity,
})

newCheckCapacityMemProvReq := provreqwrapper.BuildValidTestProvisioningRequestFromOptions(
provreqwrapper.TestProvReqOptions{
Name: "newCheckCapacityMemProvReq",
@@ -86,6 +97,23 @@ func TestScaleUp(t *testing.T) {
PodCount: int32(100),
Class: v1.ProvisioningClassCheckCapacity,
})
impossibleCheckCapacityReq := provreqwrapper.BuildValidTestProvisioningRequestFromOptions(
provreqwrapper.TestProvReqOptions{
Name: "impossibleCheckCapacityRequest",
CPU: "1m",
Memory: "1",
PodCount: int32(5001),
Class: v1.ProvisioningClassCheckCapacity,
})

anotherImpossibleCheckCapacityReq := provreqwrapper.BuildValidTestProvisioningRequestFromOptions(
provreqwrapper.TestProvReqOptions{
Name: "anotherImpossibleCheckCapacityRequest",
CPU: "1m",
Memory: "1",
PodCount: int32(5001),
Class: v1.ProvisioningClassCheckCapacity,
})

// Active atomic scale up requests.
atomicScaleUpProvReq := provreqwrapper.BuildValidTestProvisioningRequestFromOptions(
@@ -169,6 +197,10 @@ func TestScaleUp(t *testing.T) {
scaleUpResult status.ScaleUpResult
autoprovisioning bool
err bool
batchProcessing bool
maxBatchSize int
batchTimebox time.Duration
numProvisioned int
}{
{
name: "no ProvisioningRequests",
@@ -236,10 +268,129 @@ func TestScaleUp(t *testing.T) {
autoprovisioning: true,
scaleUpResult: status.ScaleUpSuccessful,
},
// Batch processing tests
{
name: "batch processing of check capacity requests with one request",
provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityCpuProvReq},
provReqToScaleUp: newCheckCapacityCpuProvReq,
scaleUpResult: status.ScaleUpSuccessful,
batchProcessing: true,
maxBatchSize: 3,
batchTimebox: 5 * time.Minute,
numProvisioned: 1,
},
{
name: "batch processing of check capacity requests with less requests than max batch size",
provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityCpuProvReq, newCheckCapacityMemProvReq},
provReqToScaleUp: newCheckCapacityCpuProvReq,
scaleUpResult: status.ScaleUpSuccessful,
batchProcessing: true,
maxBatchSize: 3,
batchTimebox: 5 * time.Minute,
numProvisioned: 2,
},
{
name: "batch processing of check capacity requests with requests equal to max batch size",
provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityCpuProvReq, newCheckCapacityMemProvReq},
provReqToScaleUp: newCheckCapacityCpuProvReq,
scaleUpResult: status.ScaleUpSuccessful,
batchProcessing: true,
maxBatchSize: 2,
batchTimebox: 5 * time.Minute,
numProvisioned: 2,
},
{
name: "batch processing of check capacity requests with more requests than max batch size",
provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityCpuProvReq, newCheckCapacityMemProvReq, anotherCheckCapacityCpuProvReq},
provReqToScaleUp: newCheckCapacityCpuProvReq,
scaleUpResult: status.ScaleUpSuccessful,
batchProcessing: true,
maxBatchSize: 2,
batchTimebox: 5 * time.Minute,
numProvisioned: 2,
},
{
name: "batch processing of check capacity requests where cluster contains already provisioned requests",
provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityCpuProvReq, bookedCapacityProvReq, anotherCheckCapacityCpuProvReq},
provReqToScaleUp: newCheckCapacityCpuProvReq,
scaleUpResult: status.ScaleUpSuccessful,
batchProcessing: true,
maxBatchSize: 2,
batchTimebox: 5 * time.Minute,
numProvisioned: 3,
},
{
name: "batch processing of check capacity requests where timebox is exceeded",
provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityCpuProvReq, newCheckCapacityMemProvReq},
provReqToScaleUp: newCheckCapacityCpuProvReq,
scaleUpResult: status.ScaleUpSuccessful,
batchProcessing: true,
maxBatchSize: 5,
batchTimebox: 1 * time.Nanosecond,
numProvisioned: 1,
},
{
name: "batch processing of check capacity requests where max batch size is invalid",
provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityCpuProvReq, newCheckCapacityMemProvReq},
provReqToScaleUp: newCheckCapacityCpuProvReq,
scaleUpResult: status.ScaleUpSuccessful,
batchProcessing: true,
maxBatchSize: 0,
batchTimebox: 5 * time.Minute,
numProvisioned: 1,
},
{
name: "batch processing of check capacity requests where best effort atomic scale-up request is also present in cluster",
provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityMemProvReq, newCheckCapacityCpuProvReq, atomicScaleUpProvReq},
provReqToScaleUp: newCheckCapacityMemProvReq,
scaleUpResult: status.ScaleUpSuccessful,
batchProcessing: true,
maxBatchSize: 2,
batchTimebox: 5 * time.Minute,
numProvisioned: 2,
},
{
name: "process atomic scale-up requests where batch processing of check capacity requests is enabled",
provReqs: []*provreqwrapper.ProvisioningRequest{possibleAtomicScaleUpReq},
provReqToScaleUp: possibleAtomicScaleUpReq,
scaleUpResult: status.ScaleUpSuccessful,
batchProcessing: true,
maxBatchSize: 3,
batchTimebox: 5 * time.Minute,
numProvisioned: 1,
},
{
name: "process atomic scale-up requests where batch processing of check capacity requests is enabled and check capacity requests are present in cluster",
provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityMemProvReq, newCheckCapacityCpuProvReq, atomicScaleUpProvReq},
provReqToScaleUp: atomicScaleUpProvReq,
scaleUpResult: status.ScaleUpNotNeeded,
batchProcessing: true,
maxBatchSize: 3,
batchTimebox: 5 * time.Minute,
numProvisioned: 1,
},
{
name: "batch processing of check capacity requests where some requests' capacity is not available",
provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityMemProvReq, impossibleCheckCapacityReq, newCheckCapacityCpuProvReq},
provReqToScaleUp: newCheckCapacityMemProvReq,
scaleUpResult: status.ScaleUpSuccessful,
batchProcessing: true,
maxBatchSize: 3,
batchTimebox: 5 * time.Minute,
numProvisioned: 3,
},
{
name: "batch processing of check capacity requests where all requests' capacity is not available",
provReqs: []*provreqwrapper.ProvisioningRequest{impossibleCheckCapacityReq, anotherImpossibleCheckCapacityReq},
provReqToScaleUp: impossibleCheckCapacityReq,
scaleUpResult: status.ScaleUpNoOptionsAvailable,
batchProcessing: true,
maxBatchSize: 3,
batchTimebox: 5 * time.Minute,
numProvisioned: 2,
},
}
for _, tc := range testCases {
tc := tc
allNodes := allNodes
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

@@ -252,7 +403,8 @@ func TestScaleUp(t *testing.T) {
}
return fmt.Errorf("unexpected scale-up of %s by %d", name, n)
}
orchestrator, nodeInfos := setupTest(t, allNodes, tc.provReqs, onScaleUpFunc, tc.autoprovisioning)
client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...)
orchestrator, nodeInfos := setupTest(t, client, allNodes, onScaleUpFunc, tc.autoprovisioning, tc.batchProcessing, tc.maxBatchSize, tc.batchTimebox)

st, err := orchestrator.ScaleUp(prPods, []*apiv1.Node{}, []*appsv1.DaemonSet{}, nodeInfos, false)
if !tc.err {
@@ -263,14 +415,26 @@ func TestScaleUp(t *testing.T) {
t.Errorf("noScaleUpInfo: %#v", st.PodsRemainUnschedulable[0].RejectedNodeGroups)
}
assert.Equal(t, tc.scaleUpResult, st.Result)

// Stopgap solution to ensure that the fake client has time to update the status of the provisioned requests.
time.Sleep(1 * time.Millisecond)

provReqsAfterScaleUp, err := client.ProvisioningRequests()
assert.NoError(t, err)
assert.Equal(t, len(tc.provReqs), len(provReqsAfterScaleUp))

if tc.batchProcessing {
// Since batch processing returns an aggregated result, check the number of requests that carry the Provisioned condition.
assert.Equal(t, tc.numProvisioned, NumProvisioningRequestsWithCondition(provReqsAfterScaleUp, v1.Provisioned))
}
} else {
assert.Error(t, err)
}
})
}
}

func setupTest(t *testing.T, nodes []*apiv1.Node, prs []*provreqwrapper.ProvisioningRequest, onScaleUpFunc func(string, int) error, autoprovisioning bool) (*provReqOrchestrator, map[string]*schedulerframework.NodeInfo) {
func setupTest(t *testing.T, client *provreqclient.ProvisioningRequestClient, nodes []*apiv1.Node, onScaleUpFunc func(string, int) error, autoprovisioning bool, batchProcessing bool, maxBatchSize int, batchTimebox time.Duration) (*provReqOrchestrator, map[string]*schedulerframework.NodeInfo) {
provider := testprovider.NewTestCloudProvider(onScaleUpFunc, nil)
if autoprovisioning {
machineTypes := []string{"large-machine"}
@@ -292,11 +456,18 @@ func setupTest(t *testing.T, nodes []*apiv1.Node, prs []*provreqwrapper.Provisio

podLister := kube_util.NewTestPodLister(nil)
listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil)
autoscalingContext, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, &fake.Clientset{}, listers, provider, nil, nil)

options := config.AutoscalingOptions{}
if batchProcessing {
options.CheckCapacityBatchProcessing = true
options.MaxBatchSize = maxBatchSize
options.BatchTimebox = batchTimebox
}

autoscalingContext, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
assert.NoError(t, err)

clustersnapshot.InitializeClusterSnapshotOrDie(t, autoscalingContext.ClusterSnapshot, nodes, nil)
client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, prs...)
processors := NewTestProcessors(&autoscalingContext)
if autoprovisioning {
processors.NodeGroupListProcessor = &MockAutoprovisioningNodeGroupListProcessor{T: t}
@@ -307,29 +478,41 @@ func setupTest(t *testing.T, nodes []*apiv1.Node, prs []*provreqwrapper.Provisio
nodeInfos, err := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&autoscalingContext, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
assert.NoError(t, err)

options := config.AutoscalingOptions{
EstimatorName: estimator.BinpackingEstimatorName,
MaxCoresTotal: config.DefaultMaxClusterCores,
MaxMemoryTotal: config.DefaultMaxClusterMemory * units.GiB,
MinCoresTotal: 0,
MinMemoryTotal: 0,
NodeAutoprovisioningEnabled: autoprovisioning,
MaxAutoprovisionedNodeGroupCount: 10,
}
estimatorBuilder, _ := estimator.NewEstimatorBuilder(
estimator.BinpackingEstimatorName,
estimator.NewThresholdBasedEstimationLimiter(nil),
estimator.NewDecreasingPodOrderer(),
nil,
)

clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingContext.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingContext.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(autoscalingContext.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
clusterState.UpdateNodes(nodes, nodeInfos, now)

var injector *provreq.ProvisioningRequestPodsInjector
if batchProcessing {
injector = provreq.NewFakePodsInjector(client, clocktesting.NewFakePassiveClock(now))
}

orchestrator := &provReqOrchestrator{
client: client,
provisioningClasses: []ProvisioningClass{checkcapacity.New(client, nil), besteffortatomic.New(client)},
provisioningClasses: []ProvisioningClass{checkcapacity.New(client, injector), besteffortatomic.New(client)},
}

orchestrator.Initialize(&autoscalingContext, processors, clusterState, estimatorBuilder, taints.TaintConfig{})
return orchestrator, nodeInfos
}

func NumProvisioningRequestsWithCondition(prList []*provreqwrapper.ProvisioningRequest, conditionType string) int {
count := 0

for _, pr := range prList {
for _, c := range pr.Status.Conditions {
if c.Type == conditionType {
count++
break
}
}
}

return count
}
