From b8913a1c820111952bc3dddcbb7b06eaeeecf7cf Mon Sep 17 00:00:00 2001 From: Ram Cohen Date: Mon, 14 Mar 2022 10:04:10 +0200 Subject: [PATCH] Add a scaler based on number of objects in a GCP Storage bucket (#2648) Signed-off-by: Ram Cohen Co-authored-by: Zbynek Roubalik --- CHANGELOG.md | 1 + go.mod | 11 +- go.sum | 27 ++++ pkg/scalers/gcp_common.go | 42 +++++ pkg/scalers/gcp_pub_sub_scaler.go | 36 +---- pkg/scalers/gcp_storage_scaler.go | 214 +++++++++++++++++++++++++ pkg/scalers/gcp_storage_scaler_test.go | 75 +++++++++ pkg/scaling/scale_handler.go | 2 + tests/scalers/gcp-storage.test.ts | 195 ++++++++++++++++++++++ 9 files changed, 565 insertions(+), 38 deletions(-) create mode 100644 pkg/scalers/gcp_common.go create mode 100644 pkg/scalers/gcp_storage_scaler.go create mode 100644 pkg/scalers/gcp_storage_scaler_test.go create mode 100644 tests/scalers/gcp-storage.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 5c0c9efa79c..a8e8c62859d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ - **General:** Automatically release container image for ARM ([#2263]https://github.com/kedacore/keda/issues/2263)) - **General:** Automatically run end-to-end tests on ARM ([#2262]https://github.com/kedacore/keda/issues/2262)) - **General:** Introduce new Azure Data Explorer Scaler ([#1488](https://github.com/kedacore/keda/issues/1488)) +- **General:** Introduce new GCP Storage Scaler ([#2628](https://github.com/kedacore/keda/issues/2628)) ### Improvements diff --git a/go.mod b/go.mod index cba453a2db8..93fc65d607e 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/kedacore/keda/v2 go 1.17 require ( - cloud.google.com/go/compute v1.1.0 + cloud.google.com/go/compute v1.3.0 cloud.google.com/go/monitoring v1.2.0 github.com/Azure/azure-amqp-common-go/v3 v3.2.2 github.com/Azure/azure-event-hubs-go/v3 v3.3.16 @@ -51,8 +51,8 @@ require ( github.com/xdg/scram v1.0.5 github.com/xhit/go-str2duration/v2 v2.0.0 go.mongodb.org/mongo-driver v1.8.2 - google.golang.org/api v0.66.0 - google.golang.org/genproto v0.0.0-20220126215142-9970aeb2e350 + google.golang.org/api v0.69.0 + google.golang.org/genproto v0.0.0-20220218161850-94dd64e39d7c google.golang.org/grpc v1.44.0 google.golang.org/protobuf v1.27.1 k8s.io/api v0.23.3 @@ -84,6 +84,9 @@ replace ( ) require ( + cloud.google.com/go v0.100.2 // indirect + cloud.google.com/go/iam v0.2.0 // indirect + cloud.google.com/go/storage v1.21.0 // indirect github.com/Azure/azure-pipeline-go v0.2.3 // indirect github.com/Azure/go-amqp v0.16.4 // indirect github.com/Azure/go-autorest v14.2.0+incompatible // indirect @@ -235,7 +238,7 @@ require ( golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect - golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect + golang.org/x/sys v0.0.0-20220209214540-3681064d5158 // indirect golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect diff --git a/go.sum b/go.sum index 9090ba7148c..d3bf17648d1 100644 --- a/go.sum +++ b/go.sum @@ -27,6 +27,7 @@ cloud.google.com/go v0.94.1/go.mod h1:qAlAugsXlC+JWO+Bke5vCtc9ONxjQT3drlTTnAplMW cloud.google.com/go v0.97.0/go.mod h1:GF7l59pYBVlXQIBLx3a761cZ41F9bBH3JUlihCt2Udc= cloud.google.com/go v0.98.0/go.mod h1:ua6Ush4NALrHk5QXDWnjvZHN93OuF0HfuEPq9I1X0cM= cloud.google.com/go v0.99.0/go.mod 
h1:w0Xx2nLzqWJPuozYQX+hFfCSI8WioryfRDzkoI/Y2ZA= +cloud.google.com/go v0.100.1/go.mod h1:fs4QogzfH5n2pBXBP9vRiU+eCny7lD2vmFZy79Iuw1U= cloud.google.com/go v0.100.2 h1:t9Iw5QH5v4XtlEQaCtUY7x6sCABps8sW0acw7e2WQ6Y= cloud.google.com/go v0.100.2/go.mod h1:4Xra9TjzAeYHrl5+oeLlzbM2k3mjVhZh4UqTZ//w99A= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= @@ -38,9 +39,16 @@ cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM7 cloud.google.com/go/compute v0.1.0/go.mod h1:GAesmwr110a34z04OlxYkATPBEfVhkymfTBXtfbBFow= cloud.google.com/go/compute v1.1.0 h1:pyPhehLfZ6pVzRgJmXGYvCY4K7WSWRhVw0AwhgVvS84= cloud.google.com/go/compute v1.1.0/go.mod h1:2NIffxgWfORSI7EOYMFatGTfjMLnqrOKBEyYb6NoRgA= +cloud.google.com/go/compute v1.2.0/go.mod h1:xlogom/6gr8RJGBe7nT2eGsQYAFUbbv8dbC29qE3Xmw= +cloud.google.com/go/compute v1.3.0 h1:mPL/MzDDYHsh5tHRS9mhmhWlcgClCrCa6ApQCU6wnHI= +cloud.google.com/go/compute v1.3.0/go.mod h1:cCZiE1NHEtai4wiufUhW8I8S1JKkAnhnQJWM7YD99wM= cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk= cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk= +cloud.google.com/go/iam v0.1.1 h1:4CapQyNFjiksks1/x7jsvsygFPhihslYk5GptIrlX68= +cloud.google.com/go/iam v0.1.1/go.mod h1:CKqrcnI/suGpybEHxZ7BMehL0oA4LpdyJdUlTl9jVMw= +cloud.google.com/go/iam v0.2.0 h1:Ouq6qif4mZdXkb3SiFMpxvu0JQJB1Yid9TsZ23N6hg8= +cloud.google.com/go/iam v0.2.0/go.mod h1:BCK88+tmjAwnZYfOSizmKCTSFjJHCa18t3DpdGEY13Y= cloud.google.com/go/monitoring v1.2.0 h1:fEvQITrhVcPM6vuDQcgPMbU5kZFeQFwZmE7v6+S8BPo= cloud.google.com/go/monitoring v1.2.0/go.mod h1:tE8I08OzjWmXLhCopnPaUDpfGOEJOonfWXGR9E9SsFo= cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I= @@ -53,6 +61,10 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.18.2/go.mod h1:AiIj7BWXyhO5gGVmYJ+S8tbkCx3yb0IMjua8Aw4naVM= +cloud.google.com/go/storage v1.20.0 h1:kv3rQ3clEQdxqokkCCgQo+bxPqcuXiROjxvnKb8Oqdk= +cloud.google.com/go/storage v1.20.0/go.mod h1:TiC1o6FxNCG8y5gB7rqCsFZCIYPMPZCO81ppOoEPLGI= +cloud.google.com/go/storage v1.21.0 h1:HwnT2u2D309SFDHQII6m18HlrCi3jAXhUMTLOWXYH14= +cloud.google.com/go/storage v1.21.0/go.mod h1:XmRlxkgPjlBONznT2dDUU/5XlpU2OjMnKuqnZI01LAA= contrib.go.opencensus.io/exporter/ocagent v0.7.1-0.20200907061046-05415f1de66d/go.mod h1:IshRmMJBhDfFj5Y67nVhMYTTIze91RUeT73ipWKs/GY= contrib.go.opencensus.io/exporter/prometheus v0.4.0/go.mod h1:o7cosnyfuPVK0tB8q0QmaQNhGnptITnPQB+z1+qeFB0= contrib.go.opencensus.io/exporter/zipkin v0.1.2/go.mod h1:mP5xM3rrgOjpn79MM8fZbj3gsxcuytSqtH0dxSWW1RE= @@ -1267,6 +1279,9 @@ golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 h1:XfKQ4OlFl8okEOr5UvAqFRVj8pY/4yfcXrddB8qAbU0= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220209214540-3681064d5158 
h1:rm+CHSpPEEW2IsXUib1ThaHIjuBVZjxNgSKmBLFfD4c= +golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -1389,9 +1404,13 @@ google.golang.org/api v0.57.0/go.mod h1:dVPlbZyBo2/OjBpmvNdpn2GRm6rPy75jyU7bmhdr google.golang.org/api v0.58.0/go.mod h1:cAbP2FsxoGVNwtgNAmmn3y5G1TWAiVYRmg4yku3lv+E= google.golang.org/api v0.61.0/go.mod h1:xQRti5UdCmoCEqFxcz93fTl338AVqDgyaDRuOZ3hg9I= google.golang.org/api v0.63.0/go.mod h1:gs4ij2ffTRXwuzzgJl/56BdwJaA194ijkfn++9tDuPo= +google.golang.org/api v0.64.0/go.mod h1:931CdxA8Rm4t6zqTFGSsgwbAEZ2+GMYurbndwSimebM= google.golang.org/api v0.65.0/go.mod h1:ArYhxgGadlWmqO1IqVujw6Cs8IdD33bTmzKo2Sh+cbg= google.golang.org/api v0.66.0 h1:CbGy4LEiXCVCiNEDFgGpWOVwsDT7E2Qej1ZvN1P7KPg= google.golang.org/api v0.66.0/go.mod h1:I1dmXYpX7HGwz/ejRxwQp2qj5bFAz93HiCU1C1oYd9M= +google.golang.org/api v0.67.0/go.mod h1:ShHKP8E60yPsKNw/w8w+VYaj9H6buA5UqDp8dhbQZ6g= +google.golang.org/api v0.69.0 h1:yHW5s2SFyDapr/43kYtIQmoaaFVW4baLMLwqV4auj2A= +google.golang.org/api v0.69.0/go.mod h1:boanBiw+h5c3s+tBPgEzLDRHfFLWV0qXxRHz3ws7C80= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -1468,11 +1487,19 @@ google.golang.org/genproto v0.0.0-20211129164237-f09f9a12af12/go.mod h1:5CzLGKJ6 google.golang.org/genproto v0.0.0-20211206160659-862468c7d6e0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20211221195035-429b39de9b1c/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= +google.golang.org/genproto v0.0.0-20211223182754-3ac035c7e7cb/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20220107163113-42d7afdf6368/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20220111164026-67b88f271998/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20220114231437-d2e6a121cae0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20220126215142-9970aeb2e350 h1:YxHp5zqIcAShDEvRr5/0rVESVS+njYF68PSdazrNLJo= google.golang.org/genproto v0.0.0-20220126215142-9970aeb2e350/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= +google.golang.org/genproto v0.0.0-20220201184016-50beb8ab5c44 h1:0UVUC7VWA/mIU+5a4hVWH6xa234gLcRX8ZcrFKmWWKA= +google.golang.org/genproto v0.0.0-20220201184016-50beb8ab5c44/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= +google.golang.org/genproto v0.0.0-20220207164111-0872dc986b00/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= +google.golang.org/genproto v0.0.0-20220211171837-173942840c17/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI= +google.golang.org/genproto v0.0.0-20220216160803-4663080d8bc8/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI= +google.golang.org/genproto v0.0.0-20220218161850-94dd64e39d7c h1:TU4rFa5APdKTq0s6B7WTsH6Xmx0Knj86s6Biz56mErE= 
+google.golang.org/genproto v0.0.0-20220218161850-94dd64e39d7c/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI= google.golang.org/grpc v1.8.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.0/go.mod h1:chYK+tFQF0nDUGJgXMSgLCQk3phJEuONr2DCgLDdAQM= diff --git a/pkg/scalers/gcp_common.go b/pkg/scalers/gcp_common.go new file mode 100644 index 00000000000..090d154b2c5 --- /dev/null +++ b/pkg/scalers/gcp_common.go @@ -0,0 +1,42 @@ +package scalers + +import ( + "fmt" + + kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" +) + +type gcpAuthorizationMetadata struct { + GoogleApplicationCredentials string + GoogleApplicationCredentialsFile string + podIdentityOwner bool + podIdentityProviderEnabled bool +} + +func getGcpAuthorization(config *ScalerConfig, resolvedEnv map[string]string) (*gcpAuthorizationMetadata, error) { + metadata := config.TriggerMetadata + authParams := config.AuthParams + meta := gcpAuthorizationMetadata{} + if metadata["identityOwner"] == "operator" { + meta.podIdentityOwner = false + } else if metadata["identityOwner"] == "" || metadata["identityOwner"] == "pod" { + meta.podIdentityOwner = true + switch { + case config.PodIdentity == kedav1alpha1.PodIdentityProviderGCP: + // do nothing, rely on underneath metadata google + meta.podIdentityProviderEnabled = true + case authParams["GoogleApplicationCredentials"] != "": + meta.GoogleApplicationCredentials = authParams["GoogleApplicationCredentials"] + default: + switch { + case metadata["credentialsFromEnv"] != "": + meta.GoogleApplicationCredentials = resolvedEnv[metadata["credentialsFromEnv"]] + case metadata["credentialsFromEnvFile"] != "": + meta.GoogleApplicationCredentialsFile = resolvedEnv[metadata["credentialsFromEnvFile"]] + default: + return nil, fmt.Errorf("GoogleApplicationCredentials not found") + } + } + } + return &meta, nil +} diff --git a/pkg/scalers/gcp_pub_sub_scaler.go b/pkg/scalers/gcp_pub_sub_scaler.go index 94a3a384156..254cc549eec 100644 --- a/pkg/scalers/gcp_pub_sub_scaler.go +++ b/pkg/scalers/gcp_pub_sub_scaler.go @@ -15,7 +15,6 @@ import ( "k8s.io/metrics/pkg/apis/external_metrics" logf "sigs.k8s.io/controller-runtime/pkg/log" - kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" kedautil "github.com/kedacore/keda/v2/pkg/util" ) @@ -32,12 +31,6 @@ const ( var regexpCompositeSubscriptionIDPrefix = regexp.MustCompile(compositeSubscriptionIDPrefix) -type gcpAuthorizationMetadata struct { - GoogleApplicationCredentials string - podIdentityOwner bool - podIdentityProviderEnabled bool -} - type pubsubScaler struct { client *StackDriverClient metadata *pubsubMetadata @@ -48,7 +41,7 @@ type pubsubMetadata struct { value int subscriptionName string - gcpAuthorization gcpAuthorizationMetadata + gcpAuthorization *gcpAuthorizationMetadata scalerIndex int } @@ -121,7 +114,7 @@ func parsePubSubMetadata(config *ScalerConfig) (*pubsubMetadata, error) { if err != nil { return nil, err } - meta.gcpAuthorization = *auth + meta.gcpAuthorization = auth meta.scalerIndex = config.ScalerIndex return &meta, nil } @@ -255,28 +248,3 @@ func getSubscriptionData(s *pubsubScaler) (string, string) { } return subscriptionID, projectID } - -func getGcpAuthorization(config *ScalerConfig, resolvedEnv map[string]string) (*gcpAuthorizationMetadata, error) { - metadata := config.TriggerMetadata - authParams := config.AuthParams - meta := gcpAuthorizationMetadata{} - if 
metadata["identityOwner"] == "operator" { - meta.podIdentityOwner = false - } else if metadata["identityOwner"] == "" || metadata["identityOwner"] == "pod" { - meta.podIdentityOwner = true - switch { - case config.PodIdentity == kedav1alpha1.PodIdentityProviderGCP: - // do nothing, rely on underneath metadata google - meta.podIdentityProviderEnabled = true - case authParams["GoogleApplicationCredentials"] != "": - meta.GoogleApplicationCredentials = authParams["GoogleApplicationCredentials"] - default: - if metadata["credentialsFromEnv"] != "" { - meta.GoogleApplicationCredentials = resolvedEnv[metadata["credentialsFromEnv"]] - } else { - return nil, fmt.Errorf("GoogleApplicationCredentials not found") - } - } - } - return &meta, nil -} diff --git a/pkg/scalers/gcp_storage_scaler.go b/pkg/scalers/gcp_storage_scaler.go new file mode 100644 index 00000000000..4716348e39b --- /dev/null +++ b/pkg/scalers/gcp_storage_scaler.go @@ -0,0 +1,214 @@ +package scalers + +import ( + "context" + "fmt" + "strconv" + "strings" + + "cloud.google.com/go/storage" + "google.golang.org/api/iterator" + option "google.golang.org/api/option" + + "k8s.io/api/autoscaling/v2beta2" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/metrics/pkg/apis/external_metrics" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + kedautil "github.com/kedacore/keda/v2/pkg/util" +) + +const ( + // Default for how many objects per a single scaled processor + defaultTargetObjectCount = 100 + // A limit on iterating bucket objects + defaultMaxBucketItemsToScan = 1000 +) + +type gcsScaler struct { + client *storage.Client + bucket *storage.BucketHandle + metadata *gcsMetadata +} + +type gcsMetadata struct { + bucketName string + gcpAuthorization *gcpAuthorizationMetadata + maxBucketItemsToScan int + metricName string + targetObjectCount int +} + +var gcsLog = logf.Log.WithName("gcp_storage_scaler") + +// NewGcsScaler creates a new gcsScaler +func NewGcsScaler(config *ScalerConfig) (Scaler, error) { + meta, err := parseGcsMetadata(config) + if err != nil { + return nil, fmt.Errorf("error parsing GCP storage metadata: %s", err) + } + + ctx := context.Background() + + var client *storage.Client + + switch { + case meta.gcpAuthorization.podIdentityProviderEnabled: + client, err = storage.NewClient(ctx) + case meta.gcpAuthorization.GoogleApplicationCredentialsFile != "": + client, err = storage.NewClient( + ctx, option.WithCredentialsFile(meta.gcpAuthorization.GoogleApplicationCredentialsFile)) + default: + client, err = storage.NewClient( + ctx, option.WithCredentialsJSON([]byte(meta.gcpAuthorization.GoogleApplicationCredentials))) + } + + if err != nil { + return nil, fmt.Errorf("storage.NewClient: %v", err) + } + + bucket := client.Bucket(meta.bucketName) + if bucket == nil { + return nil, fmt.Errorf("failed to create a handle to bucket %s", meta.bucketName) + } + + gcsLog.Info(fmt.Sprintf("Metadata %v", meta)) + + return &gcsScaler{ + client: client, + bucket: bucket, + metadata: meta, + }, nil +} + +func parseGcsMetadata(config *ScalerConfig) (*gcsMetadata, error) { + meta := gcsMetadata{} + meta.targetObjectCount = defaultTargetObjectCount + meta.maxBucketItemsToScan = defaultMaxBucketItemsToScan + + if val, ok := config.TriggerMetadata["bucketName"]; ok { + if val == "" { + gcsLog.Error(nil, "no bucket name given") + return nil, fmt.Errorf("no bucket name given") + } + + meta.bucketName = val + } else { + gcsLog.Error(nil, "no bucket name given") + 
return nil, fmt.Errorf("no bucket name given") + } + + if val, ok := config.TriggerMetadata["targetObjectCount"]; ok { + targetObjectCount, err := strconv.Atoi(val) + if err != nil { + gcsLog.Error(err, "Error parsing targetObjectCount") + return nil, fmt.Errorf("error parsing targetObjectCount: %s", err.Error()) + } + + meta.targetObjectCount = targetObjectCount + } + + if val, ok := config.TriggerMetadata["maxBucketItemsToScan"]; ok { + maxBucketItemsToScan, err := strconv.Atoi(val) + if err != nil { + gcsLog.Error(err, "Error parsing maxBucketItemsToScan") + return nil, fmt.Errorf("error parsing maxBucketItemsToScan: %s", err.Error()) + } + + meta.maxBucketItemsToScan = maxBucketItemsToScan + } + + auth, err := getGcpAuthorization(config, config.ResolvedEnv) + if err != nil { + return nil, err + } + meta.gcpAuthorization = auth + + var metricName = kedautil.NormalizeString(fmt.Sprintf("gcp-storage-%s", meta.bucketName)) + meta.metricName = GenerateMetricNameWithIndex(config.ScalerIndex, metricName) + + return &meta, nil +} + +// IsActive checks if there are any messages in the subscription +func (s *gcsScaler) IsActive(ctx context.Context) (bool, error) { + items, err := s.getItemCount(ctx, 1) + if err != nil { + return false, err + } + + return items > 0, nil +} + +func (s *gcsScaler) Close(context.Context) error { + if s.client != nil { + return s.client.Close() + } + return nil +} + +// GetMetricSpecForScaling returns the metric spec for the HPA +func (s *gcsScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec { + targetValueQty := resource.NewQuantity(int64(s.metadata.targetObjectCount), resource.DecimalSI) + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: s.metadata.metricName, + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetValueQty, + }, + } + metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: externalMetricType} + return []v2beta2.MetricSpec{metricSpec} +} + +// GetMetrics returns the number of items in the bucket (up to s.metadata.maxBucketItemsToScan) +func (s *gcsScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + items, err := s.getItemCount(ctx, s.metadata.maxBucketItemsToScan) + if err != nil { + return []external_metrics.ExternalMetricValue{}, err + } + + metric := external_metrics.ExternalMetricValue{ + MetricName: metricName, + Value: *resource.NewQuantity(int64(items), resource.DecimalSI), + Timestamp: metav1.Now(), + } + + return append([]external_metrics.ExternalMetricValue{}, metric), nil +} + +// getItemCount gets the number of items in the bucket, up to maxCount +func (s *gcsScaler) getItemCount(ctx context.Context, maxCount int) (int, error) { + query := &storage.Query{Prefix: ""} + err := query.SetAttrSelection([]string{"Name"}) + if err != nil { + gcsLog.Error(err, "failed to set attribute selection") + return 0, err + } + + it := s.bucket.Objects(ctx, query) + count := 0 + + for count < maxCount { + _, err := it.Next() + if err == iterator.Done { + break + } + if err != nil { + if strings.Contains(err.Error(), "bucket doesn't exist") { + gcsLog.Info("Bucket " + s.metadata.bucketName + " doesn't exist") + return 0, nil + } + gcsLog.Error(err, "failed to enumerate items in bucket "+s.metadata.bucketName) + return count, err + } + count++ + } + + gcsLog.V(1).Info(fmt.Sprintf("Counted %d items with a limit of %d", count, maxCount)) + return count, 
nil +} diff --git a/pkg/scalers/gcp_storage_scaler_test.go b/pkg/scalers/gcp_storage_scaler_test.go new file mode 100644 index 00000000000..02a90888d4a --- /dev/null +++ b/pkg/scalers/gcp_storage_scaler_test.go @@ -0,0 +1,75 @@ +package scalers + +import ( + "context" + "testing" +) + +var testGcsResolvedEnv = map[string]string{ + "SAMPLE_CREDS": "", +} + +type parseGcsMetadataTestData struct { + authParams map[string]string + metadata map[string]string + isError bool +} + +type gcpGcsMetricIdentifier struct { + metadataTestData *parseGcsMetadataTestData + scalerIndex int + name string +} + +var testGcsMetadata = []parseGcsMetadataTestData{ + {map[string]string{}, map[string]string{}, true}, + // all properly formed + {nil, map[string]string{"bucketName": "test-bucket", "targetObjectCount": "7", "maxBucketItemsToScan": "100", "credentialsFromEnv": "SAMPLE_CREDS"}, false}, + // all properly formed while using defaults + {nil, map[string]string{"bucketName": "test-bucket", "credentialsFromEnv": "SAMPLE_CREDS"}, false}, + // missing bucketName + {nil, map[string]string{"bucketName": "", "targetObjectCount": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true}, + // missing credentials + {nil, map[string]string{"bucketName": "test-bucket", "targetObjectCount": "7", "credentialsFromEnv": ""}, true}, + // malformed targetObjectCount + {nil, map[string]string{"bucketName": "test-bucket", "targetObjectCount": "AA", "credentialsFromEnv": "SAMPLE_CREDS"}, true}, + // malformed maxBucketItemsToScan + {nil, map[string]string{"bucketName": "test-bucket", "targetObjectCount": "7", "maxBucketItemsToScan": "AA", "credentialsFromEnv": "SAMPLE_CREDS"}, true}, + // Credentials from AuthParams + {map[string]string{"GoogleApplicationCredentials": "Creds", "podIdentityOwner": ""}, map[string]string{"bucketName": "test-bucket", "targetLength": "7"}, false}, + // Credentials from AuthParams with empty creds + {map[string]string{"GoogleApplicationCredentials": "", "podIdentityOwner": ""}, map[string]string{"bucketName": "test-bucket", "subscriptionSize": "7"}, true}, +} + +var gcpGcsMetricIdentifiers = []gcpGcsMetricIdentifier{ + {&testGcsMetadata[1], 0, "s0-gcp-storage-test-bucket"}, + {&testGcsMetadata[1], 1, "s1-gcp-storage-test-bucket"}, +} + +func TestGcsParseMetadata(t *testing.T) { + for _, testData := range testGcsMetadata { + _, err := parseGcsMetadata(&ScalerConfig{AuthParams: testData.authParams, TriggerMetadata: testData.metadata, ResolvedEnv: testGcsResolvedEnv}) + if err != nil && !testData.isError { + t.Error("Expected success but got error", err) + } + if testData.isError && err == nil { + t.Error("Expected error but got success") + } + } +} + +func TestGcsGetMetricSpecForScaling(t *testing.T) { + for _, testData := range gcpGcsMetricIdentifiers { + meta, err := parseGcsMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testGcsResolvedEnv, ScalerIndex: testData.scalerIndex}) + if err != nil { + t.Fatal("Could not parse metadata:", err) + } + mockGcsScaler := gcsScaler{nil, nil, meta} + + metricSpec := mockGcsScaler.GetMetricSpecForScaling(context.Background()) + metricName := metricSpec[0].External.Metric.Name + if metricName != testData.name { + t.Error("Wrong External metric source name:", metricName) + } + } +} diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 3e98f28b39f..df3874ae20a 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -392,6 +392,8 @@ func buildScaler(ctx context.Context, client 
client.Client, triggerType string, return scalers.NewExternalPushScaler(config) case "gcp-pubsub": return scalers.NewPubSubScaler(config) + case "gcp-storage": + return scalers.NewGcsScaler(config) case "graphite": return scalers.NewGraphiteScaler(config) case "huawei-cloudeye": diff --git a/tests/scalers/gcp-storage.test.ts b/tests/scalers/gcp-storage.test.ts new file mode 100644 index 00000000000..2523704711b --- /dev/null +++ b/tests/scalers/gcp-storage.test.ts @@ -0,0 +1,195 @@ +import * as fs from 'fs' +import * as sh from 'shelljs' +import * as tmp from 'tmp' +import test from 'ava' +import { createNamespace, waitForDeploymentReplicaCount } from './helpers'; + +const gcpKey = process.env['GCP_SP_KEY'] +const testNamespace = 'gcp-storage-test' +const bucketName = 'keda-test-storage-bucket' +const deploymentName = 'dummy-consumer' +const maxReplicaCount = '3' +const gsPrefix = `kubectl exec --namespace ${testNamespace} deploy/gcp-sdk -- ` + +test.before(t => { + createNamespace(testNamespace) + + // deploy dummy consumer app, scaled object etc. + const tmpFile = tmp.fileSync() + fs.writeFileSync(tmpFile.name, deployYaml.replace("{{GCP_CREDS}}", Buffer.from(gcpKey).toString("base64"))) + + t.is( + 0, + sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${testNamespace}`).code, + 'creating a deployment should work..' + ) +}) + +test.serial('Deployment should have 0 replicas on start', async t => { + t.true(await waitForDeploymentReplicaCount(0, deploymentName, testNamespace, 30, 2000), 'replica count should start out as 0') +}) + +test.serial('creating the gcp-sdk pod should work..', async t => { + let tmpFile = tmp.fileSync() + fs.writeFileSync(tmpFile.name, gcpSdkYaml) + t.is( + 0, + sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${testNamespace}`).code, + 'creating the gcp-sdk pod should work..' + ) + + // wait for the gcp-sdk pod to be ready + t.true(await waitForDeploymentReplicaCount(1, 'gcp-sdk', testNamespace, 30, 2000), 'GCP-SDK pod is not in a ready state') +}) + +test.serial('initializing the gcp-sdk pod should work..', t => { + sh.exec(`kubectl wait --for=condition=ready --namespace ${testNamespace} pod -l app=gcp-sdk --timeout=30s`) + sh.exec('sleep 5s') + + // Authenticate to GCP + const creds = JSON.parse(gcpKey) + t.is( + 0, + sh.exec(gsPrefix + `gcloud auth activate-service-account ${creds.client_email} --key-file /etc/secret-volume/creds.json --project=${creds.project_id}`).code, + 'Setting GCP authentication on gcp-sdk should work..' + ) + + // Cleanup the bucket + sh.exec(gsPrefix + `gsutil -m rm -r gs://${bucketName}`) + + // Create bucket + t.is( + 0, + sh.exec(gsPrefix + `gsutil mb gs://${bucketName}`).code, + 'Creating GCS bucket should work' + ) +}) + +test.serial(`Uploading objects to GCS bucket`, t => { + for (let i = 0; i < 30; i++) { + t.is( + 0, + sh.exec(gsPrefix + `gsutil cp -n /usr/lib/google-cloud-sdk/bin/gsutil gs://${bucketName}/gsutil` + i).code, + 'Copying an object should work..' + ) + } +}) + +test.serial(`Deployment should scale to ${maxReplicaCount} (the max) then back to 0`, async t => { + // Wait for the number of replicas to be scaled up to maxReplicaCount + t.true( + await waitForDeploymentReplicaCount(parseInt(maxReplicaCount, 10), deploymentName, testNamespace, 60, 2000), + `Replica count should be ${maxReplicaCount} after 120 seconds`) +}) + +test.serial(`Deleting objects from GCS bucket`, t => { + t.is( + 0, + sh.exec(gsPrefix + `gsutil -m rm -a gs://${bucketName}/**`).code, + 'Deleting objects should work..' 
+ ) +}) + +test.serial(`Deployment should scale back to 0`, async t => { + t.true( + await waitForDeploymentReplicaCount(0, deploymentName, testNamespace, 30, 10000), + `Replica count should be 0 after 5 minutes`) +}) + +test.after.always.cb('clean up', t => { + // Cleanup the bucket + sh.exec(gsPrefix + `gsutil -m rm -r gs://${bucketName}`) + + sh.exec(`kubectl delete deployment.apps/${deploymentName} --namespace ${testNamespace}`) + sh.exec(`kubectl delete namespace ${testNamespace}`) + + t.end() +}) + + +const deployYaml = `apiVersion: apps/v1 +kind: Deployment +metadata: + name: ${deploymentName} + namespace: ${testNamespace} + labels: + app: ${deploymentName} +spec: + replicas: 0 + selector: + matchLabels: + app: ${deploymentName} + template: + metadata: + labels: + app: ${deploymentName} + spec: + containers: + - name: noop-processor + image: ubuntu:20.04 + command: ["/bin/bash", "-c", "--"] + args: ["sleep 10"] + env: + - name: GOOGLE_APPLICATION_CREDENTIALS_JSON + valueFrom: + secretKeyRef: + name: gcp-storage-secrets + key: creds.json +--- +apiVersion: v1 +kind: Secret +metadata: + name: gcp-storage-secrets +type: Opaque +data: + creds.json: {{GCP_CREDS}} +--- +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: test-scaledobject +spec: + scaleTargetRef: + name: ${deploymentName} + pollingInterval: 5 + maxReplicaCount: ${maxReplicaCount} + cooldownPeriod: 10 + triggers: + - type: gcp-storage + metadata: + bucketName: ${bucketName} + targetObjectCount: '5' + credentialsFromEnv: GOOGLE_APPLICATION_CREDENTIALS_JSON +` + +const gcpSdkYaml = `apiVersion: apps/v1 +kind: Deployment +metadata: + name: gcp-sdk + namespace: ${testNamespace} + labels: + app: gcp-sdk +spec: + replicas: 1 + selector: + matchLabels: + app: gcp-sdk + template: + metadata: + labels: + app: gcp-sdk + spec: + containers: + - name: gcp-sdk-container + image: google/cloud-sdk:slim + # Just spin & wait forever + command: [ "/bin/bash", "-c", "--" ] + args: [ "ls /tmp && while true; do sleep 30; done;" ] + volumeMounts: + - name: secret-volume + mountPath: /etc/secret-volume + volumes: + - name: secret-volume + secret: + secretName: gcp-storage-secrets +`
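
Usage note (not part of the patch): the end-to-end test above drives the new trigger through a ScaledObject, but the same scaler can also be exercised programmatically through the exported NewGcsScaler constructor added in pkg/scalers/gcp_storage_scaler.go. The following is a minimal sketch, assuming a valid GCP service-account JSON is supplied via the environment variable referenced by credentialsFromEnv; the bucket name and env values are placeholders, not defined by this patch.

package main

import (
	"context"
	"fmt"
	"os"

	"github.com/kedacore/keda/v2/pkg/scalers"
)

func main() {
	// Trigger metadata mirrors the keys parsed by parseGcsMetadata in
	// pkg/scalers/gcp_storage_scaler.go; the bucket name is a placeholder.
	config := &scalers.ScalerConfig{
		TriggerMetadata: map[string]string{
			"bucketName":           "keda-test-storage-bucket",
			"targetObjectCount":    "5",
			"maxBucketItemsToScan": "1000",
			"credentialsFromEnv":   "GOOGLE_APPLICATION_CREDENTIALS_JSON",
		},
		// ResolvedEnv stands in for the container environment; a real
		// service-account JSON document must be present here for the GCS
		// client to authenticate.
		ResolvedEnv: map[string]string{
			"GOOGLE_APPLICATION_CREDENTIALS_JSON": os.Getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON"),
		},
		ScalerIndex: 0,
	}

	scaler, err := scalers.NewGcsScaler(config)
	if err != nil {
		fmt.Println("failed to create scaler:", err)
		return
	}
	defer scaler.Close(context.Background())

	// IsActive reports true as soon as at least one object is found in the bucket.
	active, err := scaler.IsActive(context.Background())
	fmt.Println("active:", active, "err:", err)
}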