Skip to content

Commit

Permalink
Return 1 or 0 instead of -1 when detecting an invalid offset in Kafka (
Browse files Browse the repository at this point in the history
…#2621)

Signed-off-by: Ram Cohen <[email protected]>
  • Loading branch information
RamCohen authored Apr 7, 2022
1 parent 8b972f9 commit d9172a7
Show file tree
Hide file tree
Showing 4 changed files with 264 additions and 327 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
- **Datadog Scaler:** Several improvements, including a new optional parameter `metricUnavailableValue` to fill data when no Datadog metric was returned ([#2657](https://github.com/kedacore/keda/issues/2657))
- **Datadog Scaler:** Rely on Datadog API to validate the query ([#2761](https://github.com/kedacore/keda/issues/2761))
- **Kafka Scaler:** Make "disable" a valid value for tls auth parameter ([#2608](https://github.com/kedacore/keda/issues/2608))
- **Kafka Scaler:** New `scaleToZeroOnInvalidOffset` to control behavior when partitions have an invalid offset ([#2033](https://github.com/kedacore/keda/issues/2033), [#2612](https://github.com/kedacore/keda/issues/2612))
- **Metric API Scaler:** Improve error handling on not-ok response ([#2317](https://github.com/kedacore/keda/issues/2317))
- **Prometheus Scaler:** Check and properly inform user that `threshold` is not set ([#2793](https://github.com/kedacore/keda/issues/2793))
- **Prometheus Scaler:** Support for `X-Scope-OrgID` header ([#2667](https://github.com/kedacore/keda/issues/2667))
Expand Down
123 changes: 76 additions & 47 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ type kafkaMetadata struct {
allowIdleConsumers bool
version sarama.KafkaVersion

// If an invalid offset is found, whether to scale to 1 (false - the default) so consumption can
// occur or scale to 0 (true). See discussion in https://github.com/kedacore/keda/issues/2612
scaleToZeroOnInvalidOffset bool

// SASL
saslType kafkaSaslType
username string
Expand Down Expand Up @@ -101,6 +105,53 @@ func NewKafkaScaler(config *ScalerConfig) (Scaler, error) {
}, nil
}

// parseKafkaAuthParams populates the SASL and TLS fields of meta from the
// trigger's authentication params.
//
// Supported "sasl" values are the plaintext and SCRAM modes, each of which
// requires a non-empty "username" and "password". Supported "tls" values are
// "enable" (with optional "ca", and "cert"/"key" which must be provided
// together) and "disable". Any other value for either key is an error.
func parseKafkaAuthParams(config *ScalerConfig, meta *kafkaMetadata) error {
	meta.saslType = KafkaSASLTypeNone
	if val, ok := config.AuthParams["sasl"]; ok {
		mode := kafkaSaslType(strings.TrimSpace(val))

		if mode == KafkaSASLTypePlaintext || mode == KafkaSASLTypeSCRAMSHA256 || mode == KafkaSASLTypeSCRAMSHA512 {
			// Trim BEFORE the emptiness check so a whitespace-only value is
			// rejected instead of silently becoming an empty credential.
			meta.username = strings.TrimSpace(config.AuthParams["username"])
			if meta.username == "" {
				return errors.New("no username given")
			}

			meta.password = strings.TrimSpace(config.AuthParams["password"])
			if meta.password == "" {
				return errors.New("no password given")
			}
			meta.saslType = mode
		} else {
			return fmt.Errorf("err SASL mode %s given", mode)
		}
	}

	meta.enableTLS = false
	if val, ok := config.AuthParams["tls"]; ok {
		val = strings.TrimSpace(val)

		if val == "enable" {
			// cert and key only make sense as a pair; reject one without the other.
			certGiven := config.AuthParams["cert"] != ""
			keyGiven := config.AuthParams["key"] != ""
			if certGiven && !keyGiven {
				return errors.New("key must be provided with cert")
			}
			if keyGiven && !certGiven {
				return errors.New("cert must be provided with key")
			}
			meta.ca = config.AuthParams["ca"]
			meta.cert = config.AuthParams["cert"]
			meta.key = config.AuthParams["key"]
			meta.enableTLS = true
		} else if val != "disable" {
			return fmt.Errorf("err incorrect value for TLS given: %s", val)
		}
	}

	return nil
}

func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) {
meta := kafkaMetadata{}
switch {
Expand Down Expand Up @@ -128,7 +179,7 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) {
meta.topic = config.TriggerMetadata["topic"]
default:
meta.topic = ""
kafkaLog.V(1).Info(fmt.Sprintf("cosumer group %s has no topic specified, "+
kafkaLog.V(1).Info(fmt.Sprintf("consumer group %s has no topic specified, "+
"will use all topics subscribed by the consumer group for scaling", meta.group))
}

Expand All @@ -152,47 +203,8 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) {
meta.lagThreshold = t
}

meta.saslType = KafkaSASLTypeNone
if val, ok := config.AuthParams["sasl"]; ok {
val = strings.TrimSpace(val)
mode := kafkaSaslType(val)

if mode == KafkaSASLTypePlaintext || mode == KafkaSASLTypeSCRAMSHA256 || mode == KafkaSASLTypeSCRAMSHA512 {
if config.AuthParams["username"] == "" {
return meta, errors.New("no username given")
}
meta.username = strings.TrimSpace(config.AuthParams["username"])

if config.AuthParams["password"] == "" {
return meta, errors.New("no password given")
}
meta.password = strings.TrimSpace(config.AuthParams["password"])
meta.saslType = mode
} else {
return meta, fmt.Errorf("err SASL mode %s given", mode)
}
}

meta.enableTLS = false
if val, ok := config.AuthParams["tls"]; ok {
val = strings.TrimSpace(val)

if val == "enable" {
certGiven := config.AuthParams["cert"] != ""
keyGiven := config.AuthParams["key"] != ""
if certGiven && !keyGiven {
return meta, errors.New("key must be provided with cert")
}
if keyGiven && !certGiven {
return meta, errors.New("cert must be provided with key")
}
meta.ca = config.AuthParams["ca"]
meta.cert = config.AuthParams["cert"]
meta.key = config.AuthParams["key"]
meta.enableTLS = true
} else if val != "disable" {
return meta, fmt.Errorf("err incorrect value for TLS given: %s", val)
}
if err := parseKafkaAuthParams(config, &meta); err != nil {
return meta, err
}

meta.allowIdleConsumers = false
Expand All @@ -204,6 +216,15 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) {
meta.allowIdleConsumers = t
}

meta.scaleToZeroOnInvalidOffset = false
if val, ok := config.TriggerMetadata["scaleToZeroOnInvalidOffset"]; ok {
t, err := strconv.ParseBool(val)
if err != nil {
return meta, fmt.Errorf("error parsing scaleToZeroOnInvalidOffset: %s", err)
}
meta.scaleToZeroOnInvalidOffset = t
}

meta.version = sarama.V1_0_0_0
if val, ok := config.TriggerMetadata["version"]; ok {
val = strings.TrimSpace(val)
Expand Down Expand Up @@ -343,17 +364,25 @@ func (s *kafkaScaler) getConsumerOffsets(topicPartitions map[string][]int32) (*s
func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offsets *sarama.OffsetFetchResponse, topicPartitionOffsets map[string]map[int32]int64) (int64, error) {
block := offsets.GetBlock(topic, partitionID)
if block == nil {
kafkaLog.Error(fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partitionID), "")
return 0, fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partitionID)
errMsg := fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partitionID)
kafkaLog.Error(errMsg, "")
return 0, errMsg
}
consumerOffset := block.Offset
if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == latest {
kafkaLog.V(0).Info(fmt.Sprintf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", topic, s.metadata.group, partitionID))
return invalidOffset, fmt.Errorf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", topic, s.metadata.group, partitionID)
retVal := int64(1)
if s.metadata.scaleToZeroOnInvalidOffset {
retVal = 0
}
msg := fmt.Sprintf(
"invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet. Returning with lag of %d",
topic, s.metadata.group, partitionID, retVal)
kafkaLog.V(0).Info(msg)
return retVal, nil
}

if _, found := topicPartitionOffsets[topic]; !found {
return 0, fmt.Errorf("error finding parition offset for topic %s", topic)
return 0, fmt.Errorf("error finding partition offset for topic %s", topic)
}
latestOffset := topicPartitionOffsets[topic][partitionID]
if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest {
Expand Down
11 changes: 11 additions & 0 deletions pkg/scalers/kafka_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,17 @@ func TestKafkaAuthParams(t *testing.T) {
if meta.enableTLS != testData.enableTLS {
t.Errorf("Expected enableTLS to be set to %v but got %v\n", testData.enableTLS, meta.enableTLS)
}
if meta.enableTLS {
if meta.ca != testData.authParams["ca"] {
t.Errorf("Expected ca to be set to %v but got %v\n", testData.authParams["ca"], meta.enableTLS)
}
if meta.cert != testData.authParams["cert"] {
t.Errorf("Expected cert to be set to %v but got %v\n", testData.authParams["cert"], meta.cert)
}
if meta.key != testData.authParams["key"] {
t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["key"], meta.key)
}
}
}
}

Expand Down
Loading

0 comments on commit d9172a7

Please sign in to comment.