Skip to content

Commit

Permalink
Introduce activationThreshold for GCP scalers (#3414)
Browse files Browse the repository at this point in the history
* Introduce activationThreshold for GCP scalers

Signed-off-by: Ram Cohen <[email protected]>

* Fixes

Signed-off-by: Ram Cohen <[email protected]>

* formatting

Signed-off-by: Ram Cohen <[email protected]>

* Fix storage active trigger

Signed-off-by: Ram Cohen <[email protected]>
  • Loading branch information
RamCohen authored Jul 24, 2022
1 parent 6609084 commit eb9d5ce
Show file tree
Hide file tree
Showing 9 changed files with 283 additions and 153 deletions.
20 changes: 15 additions & 5 deletions pkg/scalers/gcp_pubsub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ type pubsubScaler struct {
}

type pubsubMetadata struct {
mode string
value int64
mode string
value int64
activationValue int64

subscriptionName string
gcpAuthorization *gcpAuthorizationMetadata
Expand Down Expand Up @@ -115,6 +116,15 @@ func parsePubSubMetadata(config *ScalerConfig) (*pubsubMetadata, error) {
return nil, fmt.Errorf("no subscription name given")
}

meta.activationValue = 0
if val, ok := config.TriggerMetadata["activationValue"]; ok {
activationValue, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("activationValue parsing error %s", err.Error())
}
meta.activationValue = activationValue
}

auth, err := getGcpAuthorization(config, config.ResolvedEnv)
if err != nil {
return nil, err
Expand All @@ -133,14 +143,14 @@ func (s *pubsubScaler) IsActive(ctx context.Context) (bool, error) {
gcpPubSubLog.Error(err, "error getting Active Status")
return false, err
}
return size > 0, nil
return size > s.metadata.activationValue, nil
case pubsubModeOldestUnackedMessageAge:
_, err := s.getMetrics(ctx, pubSubStackDriverOldestUnackedMessageAgeMetricName)
delay, err := s.getMetrics(ctx, pubSubStackDriverOldestUnackedMessageAgeMetricName)
if err != nil {
gcpPubSubLog.Error(err, "error getting Active Status")
return false, err
}
return true, nil
return delay > s.metadata.activationValue, nil
default:
return false, errors.New("unknown mode")
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/scalers/gcp_pubsub_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var testPubSubMetadata = []parsePubSubMetadataTestData{
// all properly formed with deprecated field
{nil, map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
// all properly formed
{nil, map[string]string{"subscriptionName": "mysubscription", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
{nil, map[string]string{"subscriptionName": "mysubscription", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS", "activationValue": "5"}, false},
// all properly formed with oldest unacked message age mode
{nil, map[string]string{"subscriptionName": "mysubscription", "mode": pubsubModeOldestUnackedMessageAge, "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
// missing subscriptionName
Expand All @@ -44,6 +44,8 @@ var testPubSubMetadata = []parsePubSubMetadataTestData{
{nil, map[string]string{"subscriptionName": "mysubscription", "value": "AA", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// malformed mode
{nil, map[string]string{"subscriptionName": "", "mode": "AA", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// malformed activationTargetValue
{nil, map[string]string{"subscriptionName": "mysubscription", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS", "activationValue": "AA"}, true},
// Credentials from AuthParams
{map[string]string{"GoogleApplicationCredentials": "Creds", "podIdentityOwner": ""}, map[string]string{"subscriptionName": "mysubscription", "value": "7"}, false},
// Credentials from AuthParams with empty creds
Expand All @@ -60,8 +62,8 @@ var gcpPubSubMetricIdentifiers = []gcpPubSubMetricIdentifier{
}

var gcpSubscriptionNameTests = []gcpPubSubSubscription{
{&testPubSubMetadata[10], 1, "mysubscription", "myproject"},
{&testPubSubMetadata[11], 1, "projects/myproject/mysubscription", ""},
{&testPubSubMetadata[11], 1, "mysubscription", "myproject"},
{&testPubSubMetadata[12], 1, "projects/myproject/mysubscription", ""},
}

func TestPubSubParseMetadata(t *testing.T) {
Expand Down
20 changes: 15 additions & 5 deletions pkg/scalers/gcp_stackdriver_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ type stackdriverScaler struct {
}

type stackdriverMetadata struct {
projectID string
filter string
targetValue int64
metricName string
projectID string
filter string
targetValue int64
activationTargetValue int64
metricName string

gcpAuthorization *gcpAuthorizationMetadata
aggregation *monitoringpb.Aggregation
Expand Down Expand Up @@ -98,6 +99,15 @@ func parseStackdriverMetadata(config *ScalerConfig) (*stackdriverMetadata, error
meta.targetValue = targetValue
}

meta.activationTargetValue = 0
if val, ok := config.TriggerMetadata["activationTargetValue"]; ok {
activationTargetValue, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("activationTargetValue parsing error %s", err.Error())
}
meta.activationTargetValue = activationTargetValue
}

auth, err := getGcpAuthorization(config, config.ResolvedEnv)
if err != nil {
return nil, err
Expand Down Expand Up @@ -156,7 +166,7 @@ func (s *stackdriverScaler) IsActive(ctx context.Context) (bool, error) {
gcpStackdriverLog.Error(err, "error getting metric value")
return false, err
}
return value > 0, nil
return value > s.metadata.activationTargetValue, nil
}

func (s *stackdriverScaler) Close(context.Context) error {
Expand Down
4 changes: 3 additions & 1 deletion pkg/scalers/gcp_stackdriver_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ var sdFilter = "metric.type=\"storage.googleapis.com/storage/object_count\" reso
var testStackdriverMetadata = []parseStackdriverMetadataTestData{
{map[string]string{}, map[string]string{}, true},
// all properly formed
{nil, map[string]string{"projectId": "myProject", "filter": sdFilter, "targetValue": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
{nil, map[string]string{"projectId": "myProject", "filter": sdFilter, "targetValue": "7", "credentialsFromEnv": "SAMPLE_CREDS", "activationTargetValue": "5"}, false},
// all required properly formed
{nil, map[string]string{"projectId": "myProject", "filter": sdFilter, "credentialsFromEnv": "SAMPLE_CREDS"}, false},
// missing projectId
Expand All @@ -37,6 +37,8 @@ var testStackdriverMetadata = []parseStackdriverMetadataTestData{
{nil, map[string]string{"projectId": "myProject", "filter": sdFilter, "targetValue": "7"}, true},
// malformed targetValue
{nil, map[string]string{"projectId": "myProject", "filter": sdFilter, "targetValue": "aa", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// malformed activationTargetValue
{nil, map[string]string{"projectId": "myProject", "filter": sdFilter, "credentialsFromEnv": "SAMPLE_CREDS", "activationTargetValue": "a"}, true},
// Credentials from AuthParams
{map[string]string{"GoogleApplicationCredentials": "Creds", "podIdentityOwner": ""}, map[string]string{"projectId": "myProject", "filter": sdFilter}, false},
// Credentials from AuthParams with empty creds
Expand Down
30 changes: 20 additions & 10 deletions pkg/scalers/gcp_storage_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ type gcsScaler struct {
}

type gcsMetadata struct {
bucketName string
gcpAuthorization *gcpAuthorizationMetadata
maxBucketItemsToScan int
metricName string
targetObjectCount int64
bucketName string
gcpAuthorization *gcpAuthorizationMetadata
maxBucketItemsToScan int64
metricName string
targetObjectCount int64
activationTargetObjectCount int64
}

var gcsLog = logf.Log.WithName("gcp_storage_scaler")
Expand Down Expand Up @@ -114,8 +115,17 @@ func parseGcsMetadata(config *ScalerConfig) (*gcsMetadata, error) {
meta.targetObjectCount = targetObjectCount
}

meta.activationTargetObjectCount = 0
if val, ok := config.TriggerMetadata["activationTargetObjectCount"]; ok {
activationTargetObjectCount, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("activationTargetObjectCount parsing error %s", err.Error())
}
meta.activationTargetObjectCount = activationTargetObjectCount
}

if val, ok := config.TriggerMetadata["maxBucketItemsToScan"]; ok {
maxBucketItemsToScan, err := strconv.Atoi(val)
maxBucketItemsToScan, err := strconv.ParseInt(val, 10, 64)
if err != nil {
gcsLog.Error(err, "Error parsing maxBucketItemsToScan")
return nil, fmt.Errorf("error parsing maxBucketItemsToScan: %s", err.Error())
Expand All @@ -138,12 +148,12 @@ func parseGcsMetadata(config *ScalerConfig) (*gcsMetadata, error) {

// IsActive checks if there are any messages in the subscription
func (s *gcsScaler) IsActive(ctx context.Context) (bool, error) {
items, err := s.getItemCount(ctx, 1)
items, err := s.getItemCount(ctx, s.metadata.activationTargetObjectCount+1)
if err != nil {
return false, err
}

return items > 0, nil
return items > s.metadata.activationTargetObjectCount, nil
}

func (s *gcsScaler) Close(context.Context) error {
Expand Down Expand Up @@ -178,7 +188,7 @@ func (s *gcsScaler) GetMetrics(ctx context.Context, metricName string, metricSel
}

// getItemCount gets the number of items in the bucket, up to maxCount
func (s *gcsScaler) getItemCount(ctx context.Context, maxCount int) (int64, error) {
func (s *gcsScaler) getItemCount(ctx context.Context, maxCount int64) (int64, error) {
query := &storage.Query{Prefix: ""}
err := query.SetAttrSelection([]string{"Name"})
if err != nil {
Expand All @@ -189,7 +199,7 @@ func (s *gcsScaler) getItemCount(ctx context.Context, maxCount int) (int64, erro
it := s.bucket.Objects(ctx, query)
var count int64

for count < int64(maxCount) {
for count < maxCount {
_, err := it.Next()
if err == iterator.Done {
break
Expand Down
4 changes: 3 additions & 1 deletion pkg/scalers/gcp_storage_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type gcpGcsMetricIdentifier struct {
var testGcsMetadata = []parseGcsMetadataTestData{
{map[string]string{}, map[string]string{}, true},
// all properly formed
{nil, map[string]string{"bucketName": "test-bucket", "targetObjectCount": "7", "maxBucketItemsToScan": "100", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
{nil, map[string]string{"bucketName": "test-bucket", "targetObjectCount": "7", "maxBucketItemsToScan": "100", "credentialsFromEnv": "SAMPLE_CREDS", "activationTargetObjectCount": "5"}, false},
// all properly formed while using defaults
{nil, map[string]string{"bucketName": "test-bucket", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
// missing bucketName
Expand All @@ -35,6 +35,8 @@ var testGcsMetadata = []parseGcsMetadataTestData{
{nil, map[string]string{"bucketName": "test-bucket", "targetObjectCount": "AA", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// malformed maxBucketItemsToScan
{nil, map[string]string{"bucketName": "test-bucket", "targetObjectCount": "7", "maxBucketItemsToScan": "AA", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// malformed activationTargetObjectCount
{nil, map[string]string{"bucketName": "test-bucket", "credentialsFromEnv": "SAMPLE_CREDS", "activationTargetObjectCount": "A"}, true},
// Credentials from AuthParams
{map[string]string{"GoogleApplicationCredentials": "Creds", "podIdentityOwner": ""}, map[string]string{"bucketName": "test-bucket", "targetLength": "7"}, false},
// Credentials from AuthParams with empty creds
Expand Down
97 changes: 57 additions & 40 deletions tests/scalers_go/gcp_pubsub/gcp_pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,32 @@ const (
)

var (
gcpKey = os.Getenv("GCP_SP_KEY")
creds = make(map[string]interface{})
errGcpKey = json.Unmarshal([]byte(gcpKey), &creds)
testNamespace = fmt.Sprintf("%s-ns", testName)
secretName = fmt.Sprintf("%s-secret", testName)
deploymentName = fmt.Sprintf("%s-deployment", testName)
scaledObjectName = fmt.Sprintf("%s-so", testName)
projectID = creds["project_id"]
topicID = fmt.Sprintf("projects/%s/topics/keda-test-topic-%d", projectID, now)
subscriptionName = fmt.Sprintf("keda-test-topic-sub-%d", now)
subscriptionID = fmt.Sprintf("projects/%s/subscriptions/%s", projectID, subscriptionName)
maxReplicaCount = 4
gsPrefix = fmt.Sprintf("kubectl exec --namespace %s deploy/gcp-sdk -- ", testNamespace)
gcpKey = os.Getenv("GCP_SP_KEY")
creds = make(map[string]interface{})
errGcpKey = json.Unmarshal([]byte(gcpKey), &creds)
testNamespace = fmt.Sprintf("%s-ns", testName)
secretName = fmt.Sprintf("%s-secret", testName)
deploymentName = fmt.Sprintf("%s-deployment", testName)
scaledObjectName = fmt.Sprintf("%s-so", testName)
projectID = creds["project_id"]
topicID = fmt.Sprintf("projects/%s/topics/keda-test-topic-%d", projectID, now)
subscriptionName = fmt.Sprintf("keda-test-topic-sub-%d", now)
subscriptionID = fmt.Sprintf("projects/%s/subscriptions/%s", projectID, subscriptionName)
maxReplicaCount = 4
activationThreshold = 5
gsPrefix = fmt.Sprintf("kubectl exec --namespace %s deploy/gcp-sdk -- ", testNamespace)
)

type templateData struct {
TestNamespace string
SecretName string
GcpCreds string
DeploymentName string
ScaledObjectName string
SubscriptionName string
SubscriptionID string
MaxReplicaCount int
TestNamespace string
SecretName string
GcpCreds string
DeploymentName string
ScaledObjectName string
SubscriptionName string
SubscriptionID string
MaxReplicaCount int
ActivationThreshold int
}

type templateValues map[string]string
Expand Down Expand Up @@ -127,6 +129,7 @@ spec:
subscriptionName: {{.SubscriptionName}}
mode: SubscriptionSize
value: "5"
activationValue: "{{.ActivationThreshold}}"
credentialsFromEnv: GOOGLE_APPLICATION_CREDENTIALS_JSON
`

Expand Down Expand Up @@ -185,6 +188,7 @@ func TestScaler(t *testing.T) {
if sdkReady {
if createPubsub(t) == nil {
// test scaling
testActivation(t, kc)
testScaleUp(t, kc)
testScaleDown(t, kc)

Expand Down Expand Up @@ -236,42 +240,55 @@ func getTemplateData() (templateData, templateValues) {
base64GcpCreds := base64.StdEncoding.EncodeToString([]byte(gcpKey))

return templateData{
TestNamespace: testNamespace,
SecretName: secretName,
GcpCreds: base64GcpCreds,
DeploymentName: deploymentName,
ScaledObjectName: scaledObjectName,
SubscriptionID: subscriptionID,
SubscriptionName: subscriptionName,
MaxReplicaCount: maxReplicaCount,
TestNamespace: testNamespace,
SecretName: secretName,
GcpCreds: base64GcpCreds,
DeploymentName: deploymentName,
ScaledObjectName: scaledObjectName,
SubscriptionID: subscriptionID,
SubscriptionName: subscriptionName,
MaxReplicaCount: maxReplicaCount,
ActivationThreshold: activationThreshold,
}, templateValues{
"secretTemplate": secretTemplate,
"deploymentTemplate": deploymentTemplate,
"scaledObjectTemplate": scaledObjectTemplate,
"gcpSdkTemplate": gcpSdkTemplate}
}

func publishMessages(t *testing.T, count int) {
t.Logf("--- publishing %d messages ---", count)
publish := fmt.Sprintf(
"%s/bin/bash -c -- 'for i in {1..%d}; do gcloud pubsub topics publish %s --message=AAAAAAAAAA;done'",
gsPrefix,
count,
topicID)
_, err := ExecuteCommand(publish)
assert.NoErrorf(t, err, "cannot publish messages to pubsub topic - %s", err)
}

func testActivation(t *testing.T, kc *kubernetes.Clientset) {
t.Log("--- testing not scaling if below threshold ---")

publishMessages(t, activationThreshold)

t.Log("--- waiting to see replicas are not scaled up ---")
AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 240)
}

func testScaleUp(t *testing.T, kc *kubernetes.Clientset) {
t.Log("--- testing scale up ---")

t.Log("--- publishing messages ---")
publish := fmt.Sprintf(" && gcloud pubsub topics publish %s --message=AAAAAAAAAA && sleep 1s", topicID)
cmd := gsPrefix + `/bin/bash -c -- 'cd .`
for i := 0; i < 30; i++ {
cmd += publish
}
cmd += `'`
_, err := ExecuteCommand(cmd)
assert.NoErrorf(t, err, "cannot publish messages to pubsub topic - %s", err)
publishMessages(t, 20-activationThreshold)

t.Log("--- waiting for replicas to scale up ---")
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 5),
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 30, 10),
fmt.Sprintf("replica count should be %d after five minutes", maxReplicaCount))
}

func testScaleDown(t *testing.T, kc *kubernetes.Clientset) {
t.Log("--- testing scale down ---")
cmd := fmt.Sprintf("%sgcloud pubsub subscriptions seek %s --time=p0s", gsPrefix, subscriptionID)
cmd := fmt.Sprintf("%sgcloud pubsub subscriptions seek %s --time=-P1S", gsPrefix, subscriptionID)
_, err := ExecuteCommand(cmd)
assert.NoErrorf(t, err, "cannot reset subscription position - %s", err)

Expand Down
Loading

0 comments on commit eb9d5ce

Please sign in to comment.