From 16acb304f660fd63eaa0cce95107b0d7860a29e4 Mon Sep 17 00:00:00 2001
From: Ram Cohen
Date: Thu, 10 Feb 2022 09:42:38 +0200
Subject: [PATCH 1/9] Return 0 instead of -1 when detecting an invalid offset in Kafka

When getting the consumer offset of a Kafka topic for which no offset was
committed yet, the scaler returns -1 instead of 0, which causes scaling to
the maximum number of replicas.

Also fixed some typos and used intermediate variables for error strings.

Fixes #2612

Signed-off-by: Ram Cohen
---
 pkg/scalers/kafka_scaler.go | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go
index 01a691f3552..11f83601b41 100644
--- a/pkg/scalers/kafka_scaler.go
+++ b/pkg/scalers/kafka_scaler.go
@@ -128,7 +128,7 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) {
 		meta.topic = config.TriggerMetadata["topic"]
 	default:
 		meta.topic = ""
-		kafkaLog.V(1).Info(fmt.Sprintf("cosumer group %s has no topic specified, "+
+		kafkaLog.V(1).Info(fmt.Sprintf("consumer group %s has no topic specified, "+
 			"will use all topics subscribed by the consumer group for scaling", meta.group))
 	}

@@ -343,17 +343,19 @@ func (s *kafkaScaler) getConsumerOffsets(topicPartitions map[string][]int32) (*s
 func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offsets *sarama.OffsetFetchResponse, topicPartitionOffsets map[string]map[int32]int64) (int64, error) {
 	block := offsets.GetBlock(topic, partitionID)
 	if block == nil {
-		kafkaLog.Error(fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partitionID), "")
-		return 0, fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partitionID)
+		errMsg := fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partitionID)
+		kafkaLog.Error(errMsg, "")
+		return 0, errMsg
 	}
 	consumerOffset := block.Offset
 	if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == latest {
-		kafkaLog.V(0).Info(fmt.Sprintf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", topic, s.metadata.group, partitionID))
-		return invalidOffset, fmt.Errorf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", topic, s.metadata.group, partitionID)
+		errMsg := fmt.Errorf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", topic, s.metadata.group, partitionID)
+		kafkaLog.V(0).Info(errMsg.Error())
+		return 0, errMsg
 	}

 	if _, found := topicPartitionOffsets[topic]; !found {
-		return 0, fmt.Errorf("error finding parition offset for topic %s", topic)
+		return 0, fmt.Errorf("error finding partition offset for topic %s", topic)
 	}
 	latestOffset := topicPartitionOffsets[topic][partitionID]
 	if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest {

From dfb0d57ef54594931d442dce99d800a162c59315 Mon Sep 17 00:00:00 2001
From: Ram Cohen
Date: Thu, 10 Feb 2022 16:33:24 +0200
Subject: [PATCH 2/9] Add a parameter to specify whether to scale to 1 or 0 on an invalid offset

Signed-off-by: Ram Cohen
---
 pkg/scalers/kafka_scaler.go | 23 +++++++++++++++++++++--
 1 file changed, 21 insertions(+), 2 deletions(-)

diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go
index 11f83601b41..03714536747 100644
--- a/pkg/scalers/kafka_scaler.go
+++ b/pkg/scalers/kafka_scaler.go
@@ -35,6 +35,10 @@ type kafkaMetadata struct {
 	allowIdleConsumers bool
 	version
sarama.KafkaVersion + // If an invalid offset is found, whether to scale to 1 (false - the default) so consumption can + // occur or scale to 0 (true). See discussion in https://github.com/kedacore/keda/issues/2612 + scaleToZeroOnInvalidOffset bool + // SASL saslType kafkaSaslType username string @@ -204,6 +208,15 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) { meta.allowIdleConsumers = t } + meta.scaleToZeroOnInvalidOffset = false + if val, ok := config.TriggerMetadata["scaleToZeroOnInvalidOffset"]; ok { + t, err := strconv.ParseBool(val) + if err != nil { + return meta, fmt.Errorf("error parsing scaleToZeroOnInvalidOffset: %s", err) + } + meta.scaleToZeroOnInvalidOffset = t + } + meta.version = sarama.V1_0_0_0 if val, ok := config.TriggerMetadata["version"]; ok { val = strings.TrimSpace(val) @@ -349,9 +362,15 @@ func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offset } consumerOffset := block.Offset if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == latest { - errMsg := fmt.Errorf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", topic, s.metadata.group, partitionID) + retVal := 1; + if s.metadata.scaleToZeroOnInvalidOffset { + retVal = 0; + } + errMsg := fmt.Errorf( + "invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet. Scaling to %d", + topic, s.metadata.group, partitionID, retVal) kafkaLog.V(0).Info(errMsg.Error()) - return 0, errMsg + return retVal, errMsg } if _, found := topicPartitionOffsets[topic]; !found { From 43c9e22f9e5a8a0b1ed2648b5561e5b03806b69c Mon Sep 17 00:00:00 2001 From: Ram Cohen Date: Thu, 10 Feb 2022 16:39:03 +0200 Subject: [PATCH 3/9] Fix int64 type Signed-off-by: Ram Cohen --- pkg/scalers/kafka_scaler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index 03714536747..319c14d78e9 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -362,7 +362,7 @@ func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offset } consumerOffset := block.Offset if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == latest { - retVal := 1; + retVal := int64(1) if s.metadata.scaleToZeroOnInvalidOffset { retVal = 0; } From 77eda45d31f41cda7bc4bbe0d0b8b0ad79725576 Mon Sep 17 00:00:00 2001 From: Ram Cohen Date: Thu, 10 Feb 2022 17:03:37 +0200 Subject: [PATCH 4/9] Remove semicolon Signed-off-by: Ram Cohen --- pkg/scalers/kafka_scaler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index 319c14d78e9..d0893d2b9f9 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -364,7 +364,7 @@ func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offset if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == latest { retVal := int64(1) if s.metadata.scaleToZeroOnInvalidOffset { - retVal = 0; + retVal = 0 } errMsg := fmt.Errorf( "invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet. 
Scaling to %d", From b0daec37470bfecc0d515fabda92326628909668 Mon Sep 17 00:00:00 2001 From: Ram Cohen Date: Wed, 16 Mar 2022 13:45:04 +0200 Subject: [PATCH 5/9] Update kafka test to use strimzi operator v0.23.0 Update to use strimzi operator to v0.23.0 and Kafka 2.6.0 in order to properly work on Kubernetes 1.21 and up due to deprecation of beta CRD api Also refactor common deploy and status checks to use internal methods Signed-off-by: Ram Cohen --- tests/scalers/kafka.test.ts | 213 +++++++++++------------------------- 1 file changed, 65 insertions(+), 148 deletions(-) diff --git a/tests/scalers/kafka.test.ts b/tests/scalers/kafka.test.ts index 102190c8148..45368034f34 100644 --- a/tests/scalers/kafka.test.ts +++ b/tests/scalers/kafka.test.ts @@ -1,8 +1,8 @@ import * as fs from 'fs' import * as sh from 'shelljs' import * as tmp from 'tmp' -import test from 'ava'; -import { createNamespace } from './helpers'; +import test, { Assertions } from 'ava'; +import { createNamespace, waitForDeploymentReplicaCount } from './helpers'; const defaultNamespace = 'kafka-test' const defaultCluster = 'kafka-cluster' @@ -10,8 +10,7 @@ const timeToWait = 300 const defaultTopic = 'kafka-topic' const defaultTopic2 = 'kafka-topic-2' const defaultKafkaClient = 'kafka-client' -const strimziOperatorVersion = '0.18.0' -const commandToCheckReplicas = `kubectl get deployments/kafka-consumer --namespace ${defaultNamespace} -o jsonpath="{.spec.replicas}"` +const strimziOperatorVersion = '0.23.0' const strimziOperatorYamlFile = tmp.fileSync() const kafkaClusterYamlFile = tmp.fileSync() @@ -24,123 +23,61 @@ const scaledObjectEarliestYamlFile = tmp.fileSync() const scaledObjectLatestYamlFile = tmp.fileSync() const scaledObjectMultipleTopicsYamlFile = tmp.fileSync() -test.before('Set up, create necessary resources.', t => { - sh.config.silent = true +function deployFromYaml(t: Assertions, filename: string, yaml: string, name: string) { + sh.exec(`echo Deploying ${name}`) + fs.writeFileSync(filename, yaml) + t.is(0, sh.exec(`kubectl apply -f ${filename} --namespace ${defaultNamespace}`).code, `Deploying ${name} should work.`) +} + +function waitForReady(t: Assertions, app: string, name: string, condition: string = 'Ready') { + sh.exec(`echo Waiting for ${app} for ${timeToWait} seconds to be ${condition}`) + t.is( + 0, + sh.exec(`kubectl wait ${app} --for=condition=${condition} --timeout=${timeToWait}s --namespace ${defaultNamespace}`).code, + `${name} should be ready within given time limit.` + ) +} + +test.before('Set up, create necessary resources.', async t => { createNamespace(defaultNamespace) + sh.config.silent = true const strimziOperatorYaml = sh.exec(`curl -L https://github.com/strimzi/strimzi-kafka-operator/releases/download/${strimziOperatorVersion}/strimzi-cluster-operator-${strimziOperatorVersion}.yaml`).stdout - fs.writeFileSync(strimziOperatorYamlFile.name, strimziOperatorYaml.replace(/myproject/g, `${defaultNamespace}`)) - t.is( - 0, - sh.exec(`kubectl apply -f ${strimziOperatorYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deploying Strimzi operator should work.' - ) + sh.config.silent = false - fs.writeFileSync(kafkaClusterYamlFile.name, kafkaClusterYaml) - t.is( - 0, - sh.exec(`kubectl apply -f ${kafkaClusterYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deploying Kafka cluster instance should work.' 
-    )
-    t.is(
-        0,
-        sh.exec(`kubectl wait kafka/${defaultCluster} --for=condition=Ready --timeout=${timeToWait}s --namespace ${defaultNamespace}`).code,
-        'Kafka instance should be ready within given time limit.'
-    )
+    deployFromYaml(t, strimziOperatorYamlFile.name, strimziOperatorYaml.replace(/myproject/g, `${defaultNamespace}`), 'Strimzi operator')
+    deployFromYaml(t, kafkaClusterYamlFile.name, kafkaClusterYaml, 'Kafka cluster')
+    waitForReady(t, `kafka/${defaultCluster}`,'Kafka instance')

-    fs.writeFileSync(kafkaTopicYamlFile.name, kafkaTopicsYaml)
-    t.is(
-        0,
-        sh.exec(`kubectl apply -f ${kafkaTopicYamlFile.name} --namespace ${defaultNamespace}`).code,
-        'Deploying Kafka topic should work.'
-    )
-    t.is(
-        0,
-        sh.exec(`kubectl wait kafkatopic/${defaultTopic} --for=condition=Ready --timeout=${timeToWait}s --namespace ${defaultNamespace}`).code,
-        'Kafka topic should be ready within given time limit.'
-    )
-    t.is(
-        0,
-        sh.exec(`kubectl wait kafkatopic/${defaultTopic2} --for=condition=Ready --timeout=${timeToWait}s --namespace ${defaultNamespace}`).code,
-        'Kafka topic2 should be ready within given time limit.'
-    )
+    deployFromYaml(t, kafkaTopicYamlFile.name, kafkaTopicsYaml, 'Kafka topic')
+    waitForReady(t, `kafkatopic/${defaultTopic}`,'Kafka topic')
+    waitForReady(t, `kafkatopic/${defaultTopic2}`,'Kafka topic2')

-    fs.writeFileSync(kafkaClientYamlFile.name, kafkaClientYaml)
-    t.is(
-        0,
-        sh.exec(`kubectl apply -f ${kafkaClientYamlFile.name} --namespace ${defaultNamespace}`).code,
-        'Deploying Kafka client should work.'
-    )
-    t.is(
-        0,
-        sh.exec(`kubectl wait pod/${defaultKafkaClient} --for=condition=Ready --timeout=${timeToWait}s --namespace ${defaultNamespace}`).code,
-        'Kafka client should be ready within given time limit.'
-    )
+    deployFromYaml(t, kafkaClientYamlFile.name, kafkaClientYaml, 'Kafka client')
+    waitForReady(t, `pod/${defaultKafkaClient}`,'Kafka client')

-    fs.writeFileSync(kafkaApplicationEarliestYamlFile.name, kafkaApplicationEarliestYaml)
+    deployFromYaml(t, kafkaApplicationEarliestYamlFile.name, kafkaApplicationEarliestYaml, 'Kafka application')
+    deployFromYaml(t, scaledObjectEarliestYamlFile.name, scaledObjectEarliestYaml, 'Scaled Object')
+    waitForReady(t, 'deployment/kafka-consumer','Kafka application', 'Available')

-    t.is(
-        0,
-        sh.exec(`kubectl apply -f ${kafkaApplicationEarliestYamlFile.name} --namespace ${defaultNamespace}`).code,
-        'Deploying Kafka application should work.'
-    )
-    fs.writeFileSync(scaledObjectEarliestYamlFile.name, scaledObjectEarliestYaml)
-    t.is(
-        0,
-        sh.exec(`kubectl apply -f ${scaledObjectEarliestYamlFile.name} --namespace ${defaultNamespace}`).code,
-        'Deploying Scaled Object should work.'
-    )
-    t.is(
-        0,
-        sh.exec(`kubectl wait deployment/kafka-consumer --for=condition=Available --timeout=${timeToWait}s --namespace ${defaultNamespace}`).code,
-        'Kafka application should be ready within given time limit.'
- ) - waitForReplicaCount(0, commandToCheckReplicas) - t.is('0', sh.exec(commandToCheckReplicas).stdout, 'Replica count should be 0.') + t.true(await waitForDeploymentReplicaCount(0, 'kafka-consumer', defaultNamespace, 30, 2000), 'replica count should start out as 0') }); -function waitForReplicaCount(desiredReplicaCount: number, commandToCheck: string) { - let replicaCount = undefined - let changed = undefined - for (let i = 0; i < 10; i++) { - changed = false - // checks the replica count 3 times, it tends to fluctuate from the beginning - for (let j = 0; j < 3; j++) { - replicaCount = sh.exec(commandToCheck).stdout - if (replicaCount === desiredReplicaCount.toString()) { - sh.exec('sleep 2s') - } else { - changed = true - break - } - } - if (changed === false) { - return - } else { - sh.exec('sleep 3s') - } - } -} - -test.serial('Scale application with kafka messages.', t => { +test.serial('Scale application with kafka messages.', async t => { for (let r = 1; r <= 3; r++) { sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`) sh.exec(`sleep 5s`) - waitForReplicaCount(r, commandToCheckReplicas) - - t.is(r.toString(), sh.exec(commandToCheckReplicas).stdout, `Replica count should be ${r}.`) + t.true(await waitForDeploymentReplicaCount(r, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be ${r}.`) } }) -test.serial('Scale application beyond partition max.', t => { +test.serial('Scale application beyond partition max.', async t => { sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`) sh.exec(`sleep 5s`) - waitForReplicaCount(3, commandToCheckReplicas) - - t.is('3', sh.exec(commandToCheckReplicas).stdout, `Replica count should be 3.`) + t.true(await waitForDeploymentReplicaCount(3, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 3.`) }) test.serial('cleanup after earliest policy test', t=> { @@ -158,40 +95,27 @@ test.serial('cleanup after earliest policy test', t=> { sh.exec(`sleep 30s`) }) -test.serial('Applying ScaledObject latest policy should not scale up pods', t => { +test.serial('Applying ScaledObject latest policy should not scale up pods', async t => { //Make the consumer commit the first offset for each partition. sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'kafka-console-consumer --bootstrap-server ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic} --group latest --from-beginning --consumer-property enable.auto.commit=true --timeout-ms 15000'`) - fs.writeFileSync(kafkaApplicationLatestYamlFile.name, kafkaApplicationLatestYaml) - t.is( - 0, - sh.exec(`kubectl apply -f ${kafkaApplicationLatestYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deploying Kafka application should work.' - ) + deployFromYaml(t, kafkaApplicationLatestYamlFile.name, kafkaApplicationLatestYaml, 'Kafka application') sh.exec(`sleep 10s`) - fs.writeFileSync(scaledObjectLatestYamlFile.name, scaledObjectLatestYaml) - t.is( - 0, - sh.exec(`kubectl apply -f ${scaledObjectLatestYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deploying Scaled Object should work.' 
- ) + deployFromYaml(t, scaledObjectLatestYamlFile.name, scaledObjectLatestYaml, 'Scaled Object') sh.exec(`sleep 5s`) - waitForReplicaCount(1, commandToCheckReplicas) - t.is('0', sh.exec(commandToCheckReplicas).stdout, 'Replica count should be 0.') + t.true(await waitForDeploymentReplicaCount(0, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 0.`) }) -test.serial('Latest Scale object should scale with new messages', t => { +test.serial('Latest Scale object should scale with new messages', async t => { for (let r = 1; r <= 3; r++) { sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`) sh.exec(`sleep 5s`) - waitForReplicaCount(r, commandToCheckReplicas) - - t.is(r.toString(), sh.exec(commandToCheckReplicas).stdout, `Replica count should be ${r}.`) + t.true(await waitForDeploymentReplicaCount(r, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be ${r}.`) } }) @@ -209,37 +133,24 @@ test.serial('Cleanup after latest policy test', t=> { sh.exec(`sleep 10s`) }) -test.serial('Applying ScaledObject with multiple topics should scale up pods', t => { +test.serial('Applying ScaledObject with multiple topics should scale up pods', async t => { // Make the consumer commit the all offsets for all topics in the group sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'kafka-console-consumer --bootstrap-server "${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092" --topic ${defaultTopic} --group multiTopic --from-beginning --consumer-property enable.auto.commit=true --timeout-ms 15000'`) sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'kafka-console-consumer --bootstrap-server "${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092" --topic ${defaultTopic2} --group multiTopic --from-beginning --consumer-property enable.auto.commit=true --timeout-ms 15000'`) - fs.writeFileSync(kafkaApplicationMultipleTopicsYamlFile.name, kafkaApplicationMultipleTopicsYaml) - t.is( - 0, - sh.exec(`kubectl apply -f ${kafkaApplicationMultipleTopicsYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deploying Kafka application should work.' - ) + deployFromYaml(t, kafkaApplicationMultipleTopicsYamlFile.name, kafkaApplicationMultipleTopicsYaml, 'Kafka application') sh.exec(`sleep 5s`) - fs.writeFileSync(scaledObjectMultipleTopicsYamlFile.name, scaledObjectMultipleTopicsYaml) - - t.is( - 0, - sh.exec(`kubectl apply -f ${scaledObjectMultipleTopicsYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deploying Scaled Object should work.' 
- ) + deployFromYaml(t, scaledObjectMultipleTopicsYamlFile.name, scaledObjectMultipleTopicsYaml, ' Scaled Object') sh.exec(`sleep 5s`) // when lag is 0, scaled object is not active, replica = 0 - waitForReplicaCount(0, commandToCheckReplicas) - t.is('0', sh.exec(commandToCheckReplicas).stdout, 'Replica count should be 0.') + t.true(await waitForDeploymentReplicaCount(0, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 0.`) // produce a single msg to the default topic // should turn scale object active, replica = 1 sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -exc 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`) sh.exec(`sleep 5s`) - waitForReplicaCount(1, commandToCheckReplicas) - t.is('1', sh.exec(commandToCheckReplicas).stdout, 'Replica count should be 1.') + t.true(await waitForDeploymentReplicaCount(1, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 1.`) // produce one more msg to the different topic within the same group // will turn total consumer group lag to 2. @@ -248,14 +159,12 @@ test.serial('Applying ScaledObject with multiple topics should scale up pods', t // as desiredReplicaCount = totalLag / avgThreshold sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -exc 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic2}'`) sh.exec(`sleep 5s`) - waitForReplicaCount(2, commandToCheckReplicas) - t.is('2', sh.exec(commandToCheckReplicas).stdout, 'Replica count should be 2.') + t.true(await waitForDeploymentReplicaCount(2, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 2.`) // make it 3 cause why not? 
sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -exc 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`) sh.exec(`sleep 5s`) - waitForReplicaCount(3, commandToCheckReplicas) - t.is('3', sh.exec(commandToCheckReplicas).stdout, 'Replica count should be 3.') + t.true(await waitForDeploymentReplicaCount(3, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 3.`) }) test.serial('Cleanup after multiple topics test', t=> { @@ -289,23 +198,31 @@ test.after.always('Clean up, delete created resources.', t => { ] for (const resource of resources) { - sh.exec(`kubectl delete ${resource} --namespace ${defaultNamespace}`) + sh.exec(`echo Deleting resource from file ${resource}`) + sh.exec(`kubectl delete -f ${resource} --namespace ${defaultNamespace}`) } + sh.exec(`echo Deleting namespace ${defaultNamespace}`) sh.exec(`kubectl delete namespace ${defaultNamespace}`) }) -const kafkaClusterYaml = `apiVersion: kafka.strimzi.io/v1beta1 +const kafkaClusterYaml = `apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: ${defaultCluster} namespace: ${defaultNamespace} spec: kafka: - version: 2.5.0 + version: "2.6.0" replicas: 3 listeners: - plain: {} - tls: {} + - name: plain + port: 9092 + type: internal + tls: false + - name: tls + port: 9093 + type: internal + tls: true config: offsets.topic.replication.factor: 1 transaction.state.log.replication.factor: 1 @@ -321,7 +238,7 @@ spec: topicOperator: {} userOperator: {}` -const kafkaTopicsYaml = `apiVersion: kafka.strimzi.io/v1beta1 +const kafkaTopicsYaml = `apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: ${defaultTopic} @@ -335,7 +252,7 @@ spec: retention.ms: 604800000 segment.bytes: 1073741824 --- -apiVersion: kafka.strimzi.io/v1beta1 +apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: ${defaultTopic2} From a79eb69a8e1c1c83d17fe47c2a6631fe480b0e34 Mon Sep 17 00:00:00 2001 From: Ram Cohen Date: Wed, 30 Mar 2022 12:49:14 +0300 Subject: [PATCH 6/9] Add tests for new scaleToZeroOnInvalidOffset configuration Signed-off-by: Ram Cohen --- pkg/scalers/kafka_scaler.go | 2 +- tests/scalers/kafka.test.ts | 263 +++++++++++++++++------------------- 2 files changed, 122 insertions(+), 143 deletions(-) diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index d0893d2b9f9..c9ebd4e3a18 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -367,7 +367,7 @@ func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offset retVal = 0 } errMsg := fmt.Errorf( - "invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet. Scaling to %d", + "invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet. 
Returning with lag of %d", topic, s.metadata.group, partitionID, retVal) kafkaLog.V(0).Info(errMsg.Error()) return retVal, errMsg diff --git a/tests/scalers/kafka.test.ts b/tests/scalers/kafka.test.ts index 45368034f34..a1fb8a077a5 100644 --- a/tests/scalers/kafka.test.ts +++ b/tests/scalers/kafka.test.ts @@ -9,19 +9,19 @@ const defaultCluster = 'kafka-cluster' const timeToWait = 300 const defaultTopic = 'kafka-topic' const defaultTopic2 = 'kafka-topic-2' +const zeroInvalidOffsetTopic = 'kafka-topic-zero-invalid-offset' +const oneInvalidOffsetTopic = 'kafka-topic-one-invalid-offset' +const invalidOffsetGroup = 'invalidOffset' const defaultKafkaClient = 'kafka-client' const strimziOperatorVersion = '0.23.0' +const bootstrapServer = `${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092` const strimziOperatorYamlFile = tmp.fileSync() const kafkaClusterYamlFile = tmp.fileSync() const kafkaTopicYamlFile = tmp.fileSync() const kafkaClientYamlFile = tmp.fileSync() -const kafkaApplicationLatestYamlFile = tmp.fileSync() -const kafkaApplicationEarliestYamlFile = tmp.fileSync() -const kafkaApplicationMultipleTopicsYamlFile = tmp.fileSync() -const scaledObjectEarliestYamlFile = tmp.fileSync() -const scaledObjectLatestYamlFile = tmp.fileSync() -const scaledObjectMultipleTopicsYamlFile = tmp.fileSync() +const kafkaApplicationYamlFile = tmp.fileSync() +const scaledObjectYamlFile = tmp.fileSync() function deployFromYaml(t: Assertions, filename: string, yaml: string, name: string) { sh.exec(`echo Deploying ${name}`) @@ -38,6 +38,30 @@ function waitForReady(t: Assertions, app: string, name: string, condition: strin ) } +function commitPartition(topic: string, group: string) { + sh.exec(`echo Committing partition for ${topic}:${group}`) + return sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'kafka-console-consumer --bootstrap-server "${bootstrapServer}" --topic ${topic} --group ${group} --from-beginning --consumer-property enable.auto.commit=true --timeout-ms 15000'`).code == 0 +} + +function publishMessage(topic: string) { + sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -exc 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${bootstrapServer} --topic ${topic}'`) + sh.exec(`sleep 5s`) +} + +function cleanup(t: Assertions) { + t.is( + 0, + sh.exec(`kubectl delete -f ${scaledObjectYamlFile.name} --namespace ${defaultNamespace}`).code, + 'Deleting Scaled Object should work.' + ) + t.is( + 0, + sh.exec(`kubectl delete -f ${kafkaApplicationYamlFile.name} --namespace ${defaultNamespace}`).code, + 'Deleting kafka application should work.' 
+ ) + sh.exec(`sleep 10s`) +} + test.before('Set up, create necessary resources.', async t => { createNamespace(defaultNamespace) @@ -49,15 +73,25 @@ test.before('Set up, create necessary resources.', async t => { deployFromYaml(t, kafkaClusterYamlFile.name, kafkaClusterYaml, 'Kafka cluster') waitForReady(t, `kafka/${defaultCluster}`,'Kafka instance') + var kafkaTopicsYaml = + kafkaTopicsTemplateYaml.replace('{{TOPIC}}', defaultTopic).replace('{{PARTITIONS}}', '3') + + kafkaTopicsTemplateYaml.replace('{{TOPIC}}', defaultTopic2).replace('{{PARTITIONS}}', '3') + + kafkaTopicsTemplateYaml.replace('{{TOPIC}}', zeroInvalidOffsetTopic).replace('{{PARTITIONS}}', '1') + + kafkaTopicsTemplateYaml.replace('{{TOPIC}}', oneInvalidOffsetTopic).replace('{{PARTITIONS}}', '1') deployFromYaml(t, kafkaTopicYamlFile.name, kafkaTopicsYaml, 'Kafka topic') - waitForReady(t, `kafkatopic/${defaultTopic}`,'Kafka topic') - waitForReady(t, `kafkatopic/${defaultTopic2}`,'Kafka topic2') + waitForReady(t, `kafkatopic/${defaultTopic}`,defaultTopic) + waitForReady(t, `kafkatopic/${defaultTopic2}`,defaultTopic2) + waitForReady(t, `kafkatopic/${zeroInvalidOffsetTopic}`,zeroInvalidOffsetTopic) + waitForReady(t, `kafkatopic/${oneInvalidOffsetTopic}`,oneInvalidOffsetTopic) deployFromYaml(t, kafkaClientYamlFile.name, kafkaClientYaml, 'Kafka client') waitForReady(t, `pod/${defaultKafkaClient}`,'Kafka client') +}); + +test.serial('Applying ScaledObject earliest policy should not scale up pods', async t => { - deployFromYaml(t, kafkaApplicationEarliestYamlFile.name, kafkaApplicationEarliestYaml, 'Kafka application') - deployFromYaml(t, scaledObjectEarliestYamlFile.name, scaledObjectEarliestYaml, 'Scaled Object') + deployFromYaml(t, kafkaApplicationYamlFile.name, kafkaApplicationEarliestYaml, 'Kafka application') + deployFromYaml(t, scaledObjectYamlFile.name, scaledObjectEarliestYaml, 'Scaled Object') waitForReady(t, 'deployment/kafka-consumer','Kafka application', 'Available') t.true(await waitForDeploymentReplicaCount(0, 'kafka-consumer', defaultNamespace, 30, 2000), 'replica count should start out as 0') @@ -65,82 +99,48 @@ test.before('Set up, create necessary resources.', async t => { test.serial('Scale application with kafka messages.', async t => { for (let r = 1; r <= 3; r++) { - - sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`) - sh.exec(`sleep 5s`) - + publishMessage(defaultTopic) t.true(await waitForDeploymentReplicaCount(r, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be ${r}.`) } }) test.serial('Scale application beyond partition max.', async t => { - sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`) - sh.exec(`sleep 5s`) - + publishMessage(defaultTopic) t.true(await waitForDeploymentReplicaCount(3, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 3.`) }) -test.serial('cleanup after earliest policy test', t=> { - t.is( - 0, - sh.exec(`kubectl delete -f ${scaledObjectEarliestYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deleting Scaled Object should work.' 
- ) - t.is( - 0, - sh.exec(`kubectl delete -f ${kafkaApplicationEarliestYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deleting kafka application should work.' - ) - - sh.exec(`sleep 30s`) -}) +test.serial('cleanup after earliest policy test', t => cleanup(t)) test.serial('Applying ScaledObject latest policy should not scale up pods', async t => { //Make the consumer commit the first offset for each partition. - sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'kafka-console-consumer --bootstrap-server ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic} --group latest --from-beginning --consumer-property enable.auto.commit=true --timeout-ms 15000'`) + t.true(commitPartition(defaultTopic, 'latest'), 'Commit partition should work') - deployFromYaml(t, kafkaApplicationLatestYamlFile.name, kafkaApplicationLatestYaml, 'Kafka application') + deployFromYaml(t, kafkaApplicationYamlFile.name, kafkaApplicationLatestYaml, 'Kafka application') sh.exec(`sleep 10s`) - deployFromYaml(t, scaledObjectLatestYamlFile.name, scaledObjectLatestYaml, 'Scaled Object') + deployFromYaml(t, scaledObjectYamlFile.name, scaledObjectLatestYaml, 'Scaled Object') sh.exec(`sleep 5s`) t.true(await waitForDeploymentReplicaCount(0, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 0.`) }) - test.serial('Latest Scale object should scale with new messages', async t => { for (let r = 1; r <= 3; r++) { - - sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`) - sh.exec(`sleep 5s`) - + publishMessage(defaultTopic) t.true(await waitForDeploymentReplicaCount(r, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be ${r}.`) } }) -test.serial('Cleanup after latest policy test', t=> { - t.is( - 0, - sh.exec(`kubectl delete -f ${scaledObjectLatestYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deleting Scaled Object should work.' - ) - t.is( - 0, - sh.exec(`kubectl delete -f ${kafkaApplicationLatestYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deleting kafka application should work.' 
- ) - sh.exec(`sleep 10s`) -}) +test.serial('Cleanup after latest policy test', t => cleanup(t)) test.serial('Applying ScaledObject with multiple topics should scale up pods', async t => { // Make the consumer commit the all offsets for all topics in the group - sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'kafka-console-consumer --bootstrap-server "${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092" --topic ${defaultTopic} --group multiTopic --from-beginning --consumer-property enable.auto.commit=true --timeout-ms 15000'`) - sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'kafka-console-consumer --bootstrap-server "${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092" --topic ${defaultTopic2} --group multiTopic --from-beginning --consumer-property enable.auto.commit=true --timeout-ms 15000'`) + t.true(commitPartition(defaultTopic, 'multiTopic'), 'Commit partition should work') + t.true(commitPartition(defaultTopic2, 'multiTopic'), 'Commit partition should work') - deployFromYaml(t, kafkaApplicationMultipleTopicsYamlFile.name, kafkaApplicationMultipleTopicsYaml, 'Kafka application') + deployFromYaml(t, kafkaApplicationYamlFile.name, kafkaApplicationMultipleTopicsYaml, 'Kafka application') sh.exec(`sleep 5s`) - deployFromYaml(t, scaledObjectMultipleTopicsYamlFile.name, scaledObjectMultipleTopicsYaml, ' Scaled Object') + deployFromYaml(t, scaledObjectYamlFile.name, scaledObjectMultipleTopicsYaml, 'Scaled Object') sh.exec(`sleep 5s`) // when lag is 0, scaled object is not active, replica = 0 @@ -148,8 +148,7 @@ test.serial('Applying ScaledObject with multiple topics should scale up pods', a // produce a single msg to the default topic // should turn scale object active, replica = 1 - sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -exc 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`) - sh.exec(`sleep 5s`) + publishMessage(defaultTopic) t.true(await waitForDeploymentReplicaCount(1, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 1.`) // produce one more msg to the different topic within the same group @@ -157,44 +156,50 @@ test.serial('Applying ScaledObject with multiple topics should scale up pods', a // with lagThreshold as 1 -> making hpa AverageValue to 1 // this should turn nb of replicas to 2 // as desiredReplicaCount = totalLag / avgThreshold - sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -exc 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic2}'`) - sh.exec(`sleep 5s`) + publishMessage(defaultTopic2) t.true(await waitForDeploymentReplicaCount(2, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 2.`) // make it 3 cause why not? 
- sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -exc 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`) - sh.exec(`sleep 5s`) + publishMessage(defaultTopic) t.true(await waitForDeploymentReplicaCount(3, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 3.`) }) -test.serial('Cleanup after multiple topics test', t=> { - t.is( - 0, - sh.exec(`kubectl delete -f ${scaledObjectMultipleTopicsYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deleting Scaled Object should work.' - ) - t.is( - 0, - sh.exec(`kubectl delete -f ${kafkaApplicationMultipleTopicsYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deleting kafka application should work.' - ) +test.serial('Cleanup after multiple topics test', t => cleanup(t)) + +test.serial('Applying ScaledObject zeroOnInvalidOffset policy should not scale up pods', async t => { + + deployFromYaml(t, kafkaApplicationYamlFile.name, kafkaApplicationZeroOnInvalidYaml, 'Kafka application') + deployFromYaml(t, scaledObjectYamlFile.name, scaledObjectZeroOnInvalidOffsetYaml, 'Scaled Object') + waitForReady(t, 'deployment/kafka-consumer','Kafka application', 'Available') + sh.exec(`sleep 30s`) + t.true(await waitForDeploymentReplicaCount(0, 'kafka-consumer', defaultNamespace, 30, 3000), `Replica count should be 0.`) }) +test.serial('cleanup after zeroOnInvalidOffset policy test', t => cleanup(t)) -test.after.always('Clean up, delete created resources.', t => { - const resources = [ - `${scaledObjectEarliestYamlFile.name}`, - `${scaledObjectLatestYamlFile.name}`, - `${scaledObjectMultipleTopicsYamlFile.name}`, +test.serial('Applying ScaledObject oneOnInvalidOffset policy should scale to one pod', async t => { + deployFromYaml(t, kafkaApplicationYamlFile.name, kafkaApplicationOneOnInvalidYaml, 'Kafka application') + deployFromYaml(t, scaledObjectYamlFile.name, scaledObjectOneOnInvalidOffsetYaml, 'Scaled Object') + waitForReady(t, 'deployment/kafka-consumer','Kafka application', 'Available') + sh.exec(`sleep 30s`) + t.true(await waitForDeploymentReplicaCount(1, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 1.`) +}) + +test.serial('oneOnInvalidOffset Scale object should scale to zero when offset is set', async t => { + t.true(commitPartition(oneInvalidOffsetTopic, invalidOffsetGroup), 'Commit partition should work') + publishMessage(oneInvalidOffsetTopic) + t.true(await waitForDeploymentReplicaCount(0, 'kafka-consumer', defaultNamespace, 60, 10000), `Replica count should scale down to 0.`) +}) - `${kafkaApplicationEarliestYamlFile.name}`, - `${kafkaApplicationLatestYamlFile.name}`, - `${kafkaApplicationMultipleTopicsYamlFile.name}`, +test.serial('cleanup after oneOnInvalidOffset policy test', t => cleanup(t)) + +test.after.always('Clean up, delete created resources.', t => { + const resources = [ `${kafkaClientYamlFile.name}`, `${kafkaTopicYamlFile.name}`, `${kafkaClusterYamlFile.name}`, - `${strimziOperatorYamlFile}` + `${strimziOperatorYamlFile.name}` ] for (const resource of resources) { @@ -213,7 +218,7 @@ metadata: spec: kafka: version: "2.6.0" - replicas: 3 + replicas: 1 listeners: - name: plain port: 9092 @@ -238,33 +243,21 @@ spec: topicOperator: {} userOperator: {}` -const kafkaTopicsYaml = `apiVersion: kafka.strimzi.io/v1beta2 +const kafkaTopicsTemplateYaml = `apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: - name: ${defaultTopic} + name: 
{{TOPIC}} labels: strimzi.io/cluster: ${defaultCluster} namespace: ${defaultNamespace} spec: - partitions: 3 + partitions: {{PARTITIONS}} replicas: 1 config: retention.ms: 604800000 segment.bytes: 1073741824 --- -apiVersion: kafka.strimzi.io/v1beta2 -kind: KafkaTopic -metadata: - name: ${defaultTopic2} - labels: - strimzi.io/cluster: ${defaultCluster} - namespace: ${defaultNamespace} -spec: - partitions: 3 - replicas: 1 - config: - retention.ms: 604800000 - segment.bytes: 1073741824` +` const kafkaClientYaml = `apiVersion: v1 kind: Pod @@ -280,7 +273,7 @@ spec: - -c - "exec tail -f /dev/null"` -const kafkaApplicationLatestYaml = ` +const kafkaApplicationTemplateYaml = ` apiVersion: apps/v1 kind: Deployment metadata: @@ -303,33 +296,12 @@ spec: command: - sh - -c - - "kafka-console-consumer --bootstrap-server ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic} --group latest --consumer-property enable.auto.commit=false"` - + - "kafka-console-consumer --bootstrap-server ${bootstrapServer} PARAMS --consumer-property enable.auto.commit=COMMIT"` -const kafkaApplicationEarliestYaml = ` -apiVersion: apps/v1 -kind: Deployment -metadata: - name: kafka-consumer - namespace: ${defaultNamespace} - labels: - app: kafka-consumer -spec: - selector: - matchLabels: - app: kafka-consumer - template: - metadata: - labels: - app: kafka-consumer - spec: - containers: - - name: kafka-consumer - image: confluentinc/cp-kafka:5.2.1 - command: - - sh - - -c - - "kafka-console-consumer --bootstrap-server ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic} --group earliest --from-beginning --consumer-property enable.auto.commit=false"` +const kafkaApplicationEarliestYaml = kafkaApplicationTemplateYaml.replace(/PARAMS/g, `--topic ${defaultTopic} --group earliest --from-beginning`).replace(/COMMIT/g, 'false') +const kafkaApplicationLatestYaml = kafkaApplicationTemplateYaml.replace(/PARAMS/g, `--topic ${defaultTopic} --group latest`).replace(/COMMIT/g, 'false') +const kafkaApplicationZeroOnInvalidYaml = kafkaApplicationTemplateYaml.replace(/PARAMS/g, `--topic ${zeroInvalidOffsetTopic} --group ${invalidOffsetGroup}`).replace(/COMMIT/g, 'true') +const kafkaApplicationOneOnInvalidYaml = kafkaApplicationTemplateYaml.replace(/PARAMS/g, `--topic ${oneInvalidOffsetTopic} --group ${invalidOffsetGroup} --from-beginning`).replace(/COMMIT/g, 'true') const kafkaApplicationMultipleTopicsYaml = ` apiVersion: apps/v1 @@ -357,18 +329,18 @@ spec: command: - sh - -c - - "kafka-console-consumer --bootstrap-server ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic '${defaultTopic}' --group multiTopic --from-beginning --consumer-property enable.auto.commit=false" + - "kafka-console-consumer --bootstrap-server ${bootstrapServer} --topic '${defaultTopic}' --group multiTopic --from-beginning --consumer-property enable.auto.commit=false" - name: kafka-consumer-2 image: confluentinc/cp-kafka:5.2.1 command: - sh - -c - - "kafka-console-consumer --bootstrap-server ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic '${defaultTopic2}' --group multiTopic --from-beginning --consumer-property enable.auto.commit=false"` + - "kafka-console-consumer --bootstrap-server ${bootstrapServer} --topic '${defaultTopic2}' --group multiTopic --from-beginning --consumer-property enable.auto.commit=false"` -const scaledObjectEarliestYaml = `apiVersion: keda.sh/v1alpha1 +const scaledObjectTemplateYaml = `apiVersion: keda.sh/v1alpha1 kind: ScaledObject metadata: - name: 
kafka-consumer-earliest + name: kafka-consumer-GROUP namespace: ${defaultNamespace} spec: scaleTargetRef: @@ -377,15 +349,18 @@ spec: - type: kafka metadata: topic: ${defaultTopic} - bootstrapServers: ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 - consumerGroup: earliest + bootstrapServers: ${bootstrapServer} + consumerGroup: GROUP lagThreshold: '1' - offsetResetPolicy: 'earliest'` + offsetResetPolicy: 'GROUP'` -const scaledObjectLatestYaml = `apiVersion: keda.sh/v1alpha1 +const scaledObjectEarliestYaml = scaledObjectTemplateYaml.replace(/GROUP/g, 'earliest') +const scaledObjectLatestYaml = scaledObjectTemplateYaml.replace(/GROUP/g, 'latest') + +const scaledObjectMultipleTopicsYaml = `apiVersion: keda.sh/v1alpha1 kind: ScaledObject metadata: - name: kafka-consumer-latest + name: kafka-consumer-multi-topic namespace: ${defaultNamespace} spec: scaleTargetRef: @@ -393,16 +368,15 @@ spec: triggers: - type: kafka metadata: - topic: ${defaultTopic} - bootstrapServers: ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 - consumerGroup: latest + bootstrapServers: ${bootstrapServer} + consumerGroup: multiTopic lagThreshold: '1' offsetResetPolicy: 'latest'` -const scaledObjectMultipleTopicsYaml = `apiVersion: keda.sh/v1alpha1 +const scaledObjectInvalidOffsetTemplateYaml = `apiVersion: keda.sh/v1alpha1 kind: ScaledObject metadata: - name: kafka-consumer-multi-topic + name: kafka-consumer-on-invalid namespace: ${defaultNamespace} spec: scaleTargetRef: @@ -410,7 +384,12 @@ spec: triggers: - type: kafka metadata: - bootstrapServers: ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 - consumerGroup: multiTopic + topic: TOPIC + bootstrapServers: ${bootstrapServer} + consumerGroup: ${invalidOffsetGroup} lagThreshold: '1' + scaleToZeroOnInvalidOffset: 'VALUE' offsetResetPolicy: 'latest'` + +const scaledObjectZeroOnInvalidOffsetYaml = scaledObjectInvalidOffsetTemplateYaml.replace(/TOPIC/g, zeroInvalidOffsetTopic).replace(/VALUE/g, 'true') +const scaledObjectOneOnInvalidOffsetYaml = scaledObjectInvalidOffsetTemplateYaml.replace(/TOPIC/g, oneInvalidOffsetTopic).replace(/VALUE/g, 'false') From 005859fcc9e87e8e5726f9a4566e4451a09e2b65 Mon Sep 17 00:00:00 2001 From: Ram Cohen Date: Wed, 30 Mar 2022 13:19:45 +0300 Subject: [PATCH 7/9] Extract auth metadata parsing to separate method Signed-off-by: Ram Cohen --- pkg/scalers/kafka_scaler.go | 90 +++++++++++++++++--------------- pkg/scalers/kafka_scaler_test.go | 11 ++++ 2 files changed, 60 insertions(+), 41 deletions(-) diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index c9ebd4e3a18..6eb5f60085f 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -105,6 +105,53 @@ func NewKafkaScaler(config *ScalerConfig) (Scaler, error) { }, nil } +func parseKafkaAuthParams(config *ScalerConfig, meta *kafkaMetadata) error { + meta.saslType = KafkaSASLTypeNone + if val, ok := config.AuthParams["sasl"]; ok { + val = strings.TrimSpace(val) + mode := kafkaSaslType(val) + + if mode == KafkaSASLTypePlaintext || mode == KafkaSASLTypeSCRAMSHA256 || mode == KafkaSASLTypeSCRAMSHA512 { + if config.AuthParams["username"] == "" { + return errors.New("no username given") + } + meta.username = strings.TrimSpace(config.AuthParams["username"]) + + if config.AuthParams["password"] == "" { + return errors.New("no password given") + } + meta.password = strings.TrimSpace(config.AuthParams["password"]) + meta.saslType = mode + } else { + return fmt.Errorf("err SASL mode %s given", mode) + } + } + + meta.enableTLS = false + 
if val, ok := config.AuthParams["tls"]; ok {
+		val = strings.TrimSpace(val)
+
+		if val == "enable" {
+			certGiven := config.AuthParams["cert"] != ""
+			keyGiven := config.AuthParams["key"] != ""
+			if certGiven && !keyGiven {
+				return errors.New("key must be provided with cert")
+			}
+			if keyGiven && !certGiven {
+				return errors.New("cert must be provided with key")
+			}
+			meta.ca = config.AuthParams["ca"]
+			meta.cert = config.AuthParams["cert"]
+			meta.key = config.AuthParams["key"]
+			meta.enableTLS = true
+		} else if val != "disable" {
+			return fmt.Errorf("err incorrect value for TLS given: %s", val)
+		}
+	}
+
+	return nil
+}
+
 func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) {
 	meta := kafkaMetadata{}
 	switch {
@@ -156,47 +203,8 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) {
 		meta.lagThreshold = t
 	}

-	meta.saslType = KafkaSASLTypeNone
-	if val, ok := config.AuthParams["sasl"]; ok {
-		val = strings.TrimSpace(val)
-		mode := kafkaSaslType(val)
-
-		if mode == KafkaSASLTypePlaintext || mode == KafkaSASLTypeSCRAMSHA256 || mode == KafkaSASLTypeSCRAMSHA512 {
-			if config.AuthParams["username"] == "" {
-				return meta, errors.New("no username given")
-			}
-			meta.username = strings.TrimSpace(config.AuthParams["username"])
-
-			if config.AuthParams["password"] == "" {
-				return meta, errors.New("no password given")
-			}
-			meta.password = strings.TrimSpace(config.AuthParams["password"])
-			meta.saslType = mode
-		} else {
-			return meta, fmt.Errorf("err SASL mode %s given", mode)
-		}
-	}
-
-	meta.enableTLS = false
-	if val, ok := config.AuthParams["tls"]; ok {
-		val = strings.TrimSpace(val)
-
-		if val == "enable" {
-			certGiven := config.AuthParams["cert"] != ""
-			keyGiven := config.AuthParams["key"] != ""
-			if certGiven && !keyGiven {
-				return meta, errors.New("key must be provided with cert")
-			}
-			if keyGiven && !certGiven {
-				return meta, errors.New("cert must be provided with key")
-			}
-			meta.ca = config.AuthParams["ca"]
-			meta.cert = config.AuthParams["cert"]
-			meta.key = config.AuthParams["key"]
-			meta.enableTLS = true
-		} else if val != "disable" {
-			return meta, fmt.Errorf("err incorrect value for TLS given: %s", val)
-		}
+	if err := parseKafkaAuthParams(config, &meta); err != nil {
+		return meta, err
 	}

 	meta.allowIdleConsumers = false
diff --git a/pkg/scalers/kafka_scaler_test.go b/pkg/scalers/kafka_scaler_test.go
index 7dfcae20ebe..166e37d628f 100644
--- a/pkg/scalers/kafka_scaler_test.go
+++ b/pkg/scalers/kafka_scaler_test.go
@@ -191,6 +191,17 @@ func TestKafkaAuthParams(t *testing.T) {
 		if meta.enableTLS != testData.enableTLS {
 			t.Errorf("Expected enableTLS to be set to %v but got %v\n", testData.enableTLS, meta.enableTLS)
 		}
+		if meta.enableTLS {
+			if meta.ca != testData.authParams["ca"] {
+				t.Errorf("Expected ca to be set to %v but got %v\n", testData.authParams["ca"], meta.ca)
+			}
+			if meta.cert != testData.authParams["cert"] {
+				t.Errorf("Expected cert to be set to %v but got %v\n", testData.authParams["cert"], meta.cert)
+			}
+			if meta.key != testData.authParams["key"] {
+				t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["key"], meta.key)
+			}
+		}
 	}
 }

From d49de59c1922b7aae0aacde8728abfc46df3f357 Mon Sep 17 00:00:00 2001
From: Ram Cohen
Date: Tue, 5 Apr 2022 08:57:43 +0300
Subject: [PATCH 8/9] Don't return an error on invalid offset

Signed-off-by: Ram Cohen
---
 pkg/scalers/kafka_scaler.go | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go
index 6eb5f60085f..170d7eed425 100644
--- a/pkg/scalers/kafka_scaler.go
+++ b/pkg/scalers/kafka_scaler.go
@@ -374,11 +374,11 @@ func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offset
 		if s.metadata.scaleToZeroOnInvalidOffset {
 			retVal = 0
 		}
-		errMsg := fmt.Errorf(
+		msg := fmt.Sprintf(
 			"invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet. Returning with lag of %d",
 			topic, s.metadata.group, partitionID, retVal)
-		kafkaLog.V(0).Info(errMsg.Error())
-		return retVal, errMsg
+		kafkaLog.V(0).Info(msg)
+		return retVal, nil
 	}

 	if _, found := topicPartitionOffsets[topic]; !found {

From e9b8e012ad5c00ec03474f5a863fdbace1b91438 Mon Sep 17 00:00:00 2001
From: Ram Cohen
Date: Tue, 5 Apr 2022 14:56:27 +0300
Subject: [PATCH 9/9] Changelog

Signed-off-by: Ram Cohen
---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9f61d750aed..b06c86dda1c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -54,6 +54,7 @@
 - **Datadog Scaler:** Several improvements, including a new optional parameter `metricUnavailableValue` to fill data when no Datadog metric was returned ([#2657](https://github.com/kedacore/keda/issues/2657))
 - **Datadog Scaler:** Rely on Datadog API to validate the query ([2761](https://github.com/kedacore/keda/issues/2761))
 - **Kafka Scaler:** Make "disable" a valid value for tls auth parameter ([#2608](https://github.com/kedacore/keda/issues/2608))
+- **Kafka Scaler:** New `scaleToZeroOnInvalidOffset` to control behavior when partitions have an invalid offset ([#2033](https://github.com/kedacore/keda/issues/2033), [#2612](https://github.com/kedacore/keda/issues/2612))
 - **Metric API Scaler:** Improve error handling on not-ok response ([#2317](https://github.com/kedacore/keda/issues/2317))
 - **Prometheus Scaler:** Check and properly inform user that `threshold` is not set ([#2793](https://github.com/kedacore/keda/issues/2793))
 - **Prometheus Scaler:** Support for `X-Scope-OrgID` header ([#2667](https://github.com/kedacore/keda/issues/2667))
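
For reference, a minimal ScaledObject using the `scaleToZeroOnInvalidOffset` parameter
introduced by this series might look like the sketch below. This is illustrative only,
assembled from the trigger templates in the test file above; the deployment name, topic,
bootstrap server, and consumer group are placeholders rather than values taken from the
patches.

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: kafka-consumer-invalid-offset-example
spec:
  scaleTargetRef:
    name: my-consumer                                    # placeholder deployment name
  triggers:
  - type: kafka
    metadata:
      topic: my-topic                                    # placeholder
      bootstrapServers: my-cluster-kafka-bootstrap:9092  # placeholder
      consumerGroup: my-group                            # placeholder
      lagThreshold: '1'
      # The setting below is only consulted when offsetResetPolicy is 'latest'.
      offsetResetPolicy: 'latest'
      # A partition has an "invalid offset" when the group has not committed any
      # offset for it yet. 'false' (the default) reports the partition's lag as 1,
      # so the workload scales to one replica and can start committing offsets;
      # 'true' reports a lag of 0 and keeps the workload scaled to zero.
      scaleToZeroOnInvalidOffset: 'true'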