Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Cassandra scaler and tests #2211

Merged
merged 4 commits into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add scalerIndex
Signed-off-by: nilayasiktoprak <[email protected]>
  • Loading branch information
nilayasiktoprak committed Oct 26, 2021
commit c2c83c416c13aae371880255c56e18a1cf8dd6d4
16 changes: 10 additions & 6 deletions pkg/scalers/cassandra_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ import (
"strings"

"github.com/gocql/gocql"
kedautil "github.com/kedacore/keda/v2/pkg/util"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)

// cassandraScaler exposes a data pointer to CassandraMetadata and gocql.Session connection.
Expand All @@ -34,6 +35,7 @@ type CassandraMetadata struct {
query string
targetQueryValue int
metricName string
scalerIndex int
}

var cassandraLog = logf.Log.WithName("cassandra_scaler")
Expand Down Expand Up @@ -137,6 +139,8 @@ func ParseCassandraMetadata(config *ScalerConfig) (*CassandraMetadata, error) {
return nil, fmt.Errorf("no password given")
}

meta.scalerIndex = config.ScalerIndex

return &meta, nil
}

Expand All @@ -161,7 +165,7 @@ func NewCassandraSession(meta *CassandraMetadata) (*gocql.Session, error) {

// IsActive returns true if there are pending events to be processed.
func (s *cassandraScaler) IsActive(ctx context.Context) (bool, error) {
messages, err := s.GetQueryResult()
messages, err := s.GetQueryResult(ctx)
if err != nil {
return false, fmt.Errorf("error inspecting cassandra: %s", err)
}
Expand All @@ -174,7 +178,7 @@ func (s *cassandraScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetQueryValue := resource.NewQuantity(int64(s.metadata.targetQueryValue), resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: s.metadata.metricName,
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, s.metadata.metricName),
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
Expand All @@ -190,7 +194,7 @@ func (s *cassandraScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {

// GetMetrics returns a value for a supported metric or an error if there is a problem getting the metric.
func (s *cassandraScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
num, err := s.GetQueryResult()
num, err := s.GetQueryResult(ctx)
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting cassandra: %s", err)
}
Expand All @@ -205,9 +209,9 @@ func (s *cassandraScaler) GetMetrics(ctx context.Context, metricName string, met
}

// GetQueryResult returns the result of the scaler query.
func (s *cassandraScaler) GetQueryResult() (int, error) {
func (s *cassandraScaler) GetQueryResult(ctx context.Context) (int, error) {
var value int
if err := s.session.Query(s.metadata.query).Scan(&value); err != nil {
if err := s.session.Query(s.metadata.query).WithContext(ctx).Scan(&value); err != nil {
if err != gocql.ErrNotFound {
cassandraLog.Error(err, "query failed")
return 0, err
Expand Down
186 changes: 56 additions & 130 deletions pkg/scalers/cassandra_scaler_test.go
Original file line number Diff line number Diff line change
@@ -1,153 +1,79 @@
package scalers

import (
"errors"
"fmt"
"testing"

"github.com/gocql/gocql"
)

type cassandraTestData struct {
// test inputs
type parseCassandraMetadataTestData struct {
metadata map[string]string
isError bool
authParams map[string]string

// expected outputs
expectedMetricName string
expectedConsistency gocql.Consistency
expectedProtocolVersion int
expectedClusterIPAddress string
expectedError error
}

var testCassandraInputs = []cassandraTestData{
// metricName written
{
metadata: map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "targetQueryValue": "1", "username": "cassandra", "clusterIPAddress": "cassandra.test:9042", "keyspace": "test_keyspace", "metricName": "myMetric"},
authParams: map[string]string{"password": "Y2Fzc2FuZHJhCg=="},
expectedMetricName: "cassandra-myMetric",
},

// keyspace written, no metricName
{
metadata: map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "targetQueryValue": "1", "username": "cassandra", "clusterIPAddress": "cassandra.test:9042", "keyspace": "test_keyspace"},
authParams: map[string]string{"password": "Y2Fzc2FuZHJhCg=="},
expectedMetricName: "cassandra-test_keyspace",
},

// metricName and keyspace written
{
metadata: map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "targetQueryValue": "1", "username": "cassandra", "clusterIPAddress": "cassandra.test:9042", "metricName": "myMetric", "keyspace": "test_keyspace"},
authParams: map[string]string{"password": "Y2Fzc2FuZHJhCg=="},
expectedMetricName: "cassandra-myMetric",
},

// consistency and protocolVersion not written
{
metadata: map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "targetQueryValue": "1", "username": "cassandra", "clusterIPAddress": "cassandra.test:9042", "keyspace": "test_keyspace"},
authParams: map[string]string{"password": "Y2Fzc2FuZHJhCg=="},
expectedConsistency: gocql.One,
expectedProtocolVersion: 4,
},

// Error: keyspace not written
{
metadata: map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "targetQueryValue": "1", "username": "cassandra", "clusterIPAddress": "cassandra.test:9042", "metricName": "myMetric"},
authParams: map[string]string{"password": "Y2Fzc2FuZHJhCg=="},
expectedError: errors.New("no keyspace given"),
},

// Error: missing query
{
metadata: map[string]string{"targetQueryValue": "1", "username": "cassandra", "clusterIPAddress": "cassandra.test:9042", "keyspace": "test_keyspace"},
authParams: map[string]string{"password": "Y2Fzc2FuZHJhCg=="},
expectedError: errors.New("no query given"),
},

// Error: missing targetQueryValue
{
metadata: map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "username": "cassandra", "clusterIPAddress": "cassandra.test:9042", "keyspace": "test_keyspace"},
authParams: map[string]string{"password": "Y2Fzc2FuZHJhCg=="},
expectedError: errors.New("no targetQueryValue given"),
},

// Error: missing username
{
metadata: map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "targetQueryValue": "1", "clusterIPAddress": "cassandra.test:9042", "keyspace": "test_keyspace"},
authParams: map[string]string{"password": "Y2Fzc2FuZHJhCg=="},
expectedError: errors.New("no username given"),
},

// Error: missing clusterIPAddress
{
metadata: map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "targetQueryValue": "1", "username": "cassandra", "keyspace": "test_keyspace"},
authParams: map[string]string{"password": "Y2Fzc2FuZHJhCg=="},
expectedError: errors.New("no cluster IP address given"),
},

// Error: missing port
{
metadata: map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "clusterIPAddress": "cassandra.test", "targetQueryValue": "1", "username": "cassandra", "keyspace": "test_keyspace"},
authParams: map[string]string{"password": "Y2Fzc2FuZHJhCg=="},
expectedError: errors.New("no port given"),
},

// Error: missing password
{
metadata: map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "targetQueryValue": "1", "username": "cassandra", "clusterIPAddress": "cassandra.test:9042", "keyspace": "test_keyspace"},
authParams: map[string]string{},
expectedError: errors.New("no password given"),
},
type cassandraMetricIdentifier struct {
metadataTestData *parseCassandraMetadataTestData
scalerIndex int
name string
}

func TestParseCassandraMetadata(t *testing.T) {
for _, testData := range testCassandraInputs {
var config = ScalerConfig{
TriggerMetadata: testData.metadata,
AuthParams: testData.authParams,
}

outputMetadata, err := ParseCassandraMetadata(&config)
fmt.Printf("Expected error '%v'\n", testData.expectedError)
fmt.Printf("Got error '%v'\n", err)
if err != nil {
if testData.expectedError == nil {
t.Errorf("Unexpected error parsing input metadata: %v", err)
} else if testData.expectedError.Error() != err.Error() {
t.Errorf("Expected error '%v' but got '%v'", testData.expectedError, err)
}

continue
}

expectedQuery := "SELECT COUNT(*) FROM test_keyspace.test_table;"
if outputMetadata.query != expectedQuery {
t.Errorf("Wrong query. Expected '%s' but got '%s'", expectedQuery, outputMetadata.query)
}
var testCassandraMetadata = []parseCassandraMetadataTestData{
// nothing passed
{map[string]string{}, true, map[string]string{}},
// everything is passed in verbatim
{map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "targetQueryValue": "1", "username": "cassandra", "port": "9042", "clusterIPAddress": "cassandra.test", "keyspace": "test_keyspace", "ScalerIndex": "0", "metricName": "myMetric"}, false, map[string]string{"password": "Y2Fzc2FuZHJhCg=="}},
// no metricName passed, metricName is generated from keyspace
{map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "targetQueryValue": "1", "username": "cassandra", "clusterIPAddress": "cassandra.test:9042", "keyspace": "test_keyspace", "ScalerIndex": "0"}, false, map[string]string{"password": "Y2Fzc2FuZHJhCg=="}},
// no query passed
{map[string]string{"targetQueryValue": "1", "username": "cassandra", "clusterIPAddress": "cassandra.test:9042", "keyspace": "test_keyspace", "ScalerIndex": "0", "metricName": "myMetric"}, true, map[string]string{"password": "Y2Fzc2FuZHJhCg=="}},
// no targetQueryValue passed
{map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "username": "cassandra", "clusterIPAddress": "cassandra.test:9042", "keyspace": "test_keyspace", "ScalerIndex": "0", "metricName": "myMetric"}, true, map[string]string{"password": "Y2Fzc2FuZHJhCg=="}},
// no username passed
{map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "targetQueryValue": "1", "clusterIPAddress": "cassandra.test:9042", "keyspace": "test_keyspace", "ScalerIndex": "0", "metricName": "myMetric"}, true, map[string]string{"password": "Y2Fzc2FuZHJhCg=="}},
// no port passed
{map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "targetQueryValue": "1", "username": "cassandra", "clusterIPAddress": "cassandra.test", "keyspace": "test_keyspace", "ScalerIndex": "0", "metricName": "myMetric"}, true, map[string]string{"password": "Y2Fzc2FuZHJhCg=="}},
// no clusterIPAddress passed
{map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "targetQueryValue": "1", "username": "cassandra", "port": "9042", "keyspace": "test_keyspace", "ScalerIndex": "0", "metricName": "myMetric"}, true, map[string]string{"password": "Y2Fzc2FuZHJhCg=="}},
// no keyspace passed
{map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "targetQueryValue": "1", "username": "cassandra", "clusterIPAddress": "cassandra.test:9042", "ScalerIndex": "0", "metricName": "myMetric"}, true, map[string]string{"password": "Y2Fzc2FuZHJhCg=="}},
// no password passed
{map[string]string{"query": "SELECT COUNT(*) FROM test_keyspace.test_table;", "targetQueryValue": "1", "username": "cassandra", "clusterIPAddress": "cassandra.test:9042", "keyspace": "test_keyspace", "ScalerIndex": "0", "metricName": "myMetric"}, true, map[string]string{}},
}

expectedTargetQueryValue := 1
if outputMetadata.targetQueryValue != expectedTargetQueryValue {
t.Errorf("Wrong targetQueryValue. Expected %d but got %d", expectedTargetQueryValue, outputMetadata.targetQueryValue)
}
var cassandraMetricIdentifiers = []cassandraMetricIdentifier{
{&testCassandraMetadata[1], 0, "s0-cassandra-myMetric"},
{&testCassandraMetadata[2], 1, "s1-cassandra-test_keyspace"},
}

expectedConsistency := gocql.One
if testData.expectedConsistency != 0 && testData.expectedConsistency != outputMetadata.consistency {
t.Errorf("Wrong consistency. Expected %d but got %d", expectedConsistency, outputMetadata.consistency)
func TestCassandraParseMetadata(t *testing.T) {
testCaseNum := 1
for _, testData := range testCassandraMetadata {
_, err := ParseCassandraMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams})
if err != nil && !testData.isError {
t.Errorf("Expected success but got error for unit test # %v", testCaseNum)
}

expectedProtocolVersion := 4
if testData.expectedProtocolVersion != 0 && testData.expectedProtocolVersion != outputMetadata.protocolVersion {
t.Errorf("Wrong protocol version. Expected %d but got %d", expectedProtocolVersion, outputMetadata.protocolVersion)
if testData.isError && err == nil {
t.Errorf("Expected error but got success for unit test # %v", testCaseNum)
}
testCaseNum++
}
}

expectedClusterIPAddress := "cassandra.test:9042"
if testData.expectedClusterIPAddress != "" && testData.expectedClusterIPAddress != outputMetadata.clusterIPAddress {
t.Errorf("Wrong clusterIPAddress. Expected %s but got %s", expectedClusterIPAddress, outputMetadata.clusterIPAddress)
func TestCassandraGetMetricSpecForScaling(t *testing.T) {
for _, testData := range cassandraMetricIdentifiers {
meta, err := ParseCassandraMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ScalerIndex: testData.scalerIndex, AuthParams: testData.metadataTestData.authParams})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}

if testData.expectedMetricName != "" && testData.expectedMetricName != outputMetadata.metricName {
t.Errorf("Wrong metric name. Expected '%s' but got '%s'", testData.expectedMetricName, outputMetadata.metricName)
cluster := gocql.NewCluster(meta.clusterIPAddress)
session, _ := cluster.CreateSession()
mockCassandraScaler := cassandraScaler{meta, session}

metricSpec := mockCassandraScaler.GetMetricSpecForScaling()
metricName := metricSpec[0].External.Metric.Name
if metricName != testData.name {
t.Errorf("Wrong External metric source name: %s, expected: %s", metricName, testData.name)
}
}
}