diff --git a/CHANGELOG.md b/CHANGELOG.md index 37e23adfa48..c7b4f18c9b6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -61,6 +61,8 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - **General:** Support disable keep http connection alive([#3874](https://github.com/kedacore/keda/issues/3874) - **General:** Improve the function used to normalize metric names ([#3789](https://github.com/kedacore/keda/issues/3789) - **Apache Kafka Scaler:** SASL/OAuthbearer Implementation ([#3681](https://github.com/kedacore/keda/issues/3681)) +- **Apache Kafka Scaler:** Limit Kafka Partitions KEDA operates on ([#3830](https://github.com/kedacore/keda/issues/3830)) +- **Apache Kafka Scaler:** Implementation for Excluding Persistent Lag ([#3904](https://github.com/kedacore/keda/issues/3904)) - **Azure AD Pod Identity Authentication:** Improve error messages to emphasize problems around the integration with aad-pod-identity itself ([#3610](https://github.com/kedacore/keda/issues/3610)) - **Azure Event Hub Scaler:** Support Azure Active Direcotry Pod & Workload Identity for Storage Blobs ([#3569](https://github.com/kedacore/keda/issues/3569)) - **Azure Event Hub Scaler:** Support using connection strings for Event Hub namespace instead of the Event Hub itself. ([#3922](https://github.com/kedacore/keda/issues/3922)) diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index 916995645bc..8ce8a159694 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -17,21 +17,24 @@ import ( ) type kafkaScaler struct { - metricType v2.MetricTargetType - metadata kafkaMetadata - client sarama.Client - admin sarama.ClusterAdmin - logger logr.Logger + metricType v2.MetricTargetType + metadata kafkaMetadata + client sarama.Client + admin sarama.ClusterAdmin + logger logr.Logger + previousOffsets map[string]map[int32]int64 } type kafkaMetadata struct { bootstrapServers []string group string topic string + partitionLimitation []int32 lagThreshold int64 activationLagThreshold int64 offsetResetPolicy offsetResetPolicy allowIdleConsumers bool + excludePersistentLag bool version sarama.KafkaVersion // If an invalid offset is found, whether to scale to 1 (false - the default) so consumption can @@ -104,12 +107,15 @@ func NewKafkaScaler(config *ScalerConfig) (Scaler, error) { return nil, err } + previousOffsets := make(map[string]map[int32]int64) + return &kafkaScaler{ - client: client, - admin: admin, - metricType: metricType, - metadata: kafkaMetadata, - logger: logger, + client: client, + admin: admin, + metricType: metricType, + metadata: kafkaMetadata, + logger: logger, + previousOffsets: previousOffsets, }, nil } @@ -205,6 +211,21 @@ func parseKafkaMetadata(config *ScalerConfig, logger logr.Logger) (kafkaMetadata "will use all topics subscribed by the consumer group for scaling", meta.group)) } + meta.partitionLimitation = nil + if config.TriggerMetadata["partitionLimitation"] != "" { + if meta.topic == "" { + logger.V(1).Info("no specific topic set, ignoring partitionLimitation setting") + } else { + pattern := config.TriggerMetadata["partitionLimitation"] + parsed, err := kedautil.ParseInt32List(pattern) + if err != nil { + return meta, fmt.Errorf("error parsing in partitionLimitation '%s': %s", pattern, err) + } + meta.partitionLimitation = parsed + logger.V(0).Info(fmt.Sprintf("partition limit active '%s'", pattern)) + } + } + meta.offsetResetPolicy = defaultOffsetResetPolicy if config.TriggerMetadata["offsetResetPolicy"] != "" { @@ -254,6 +275,15 @@ 
func parseKafkaMetadata(config *ScalerConfig, logger logr.Logger) (kafkaMetadata meta.allowIdleConsumers = t } + meta.excludePersistentLag = false + if val, ok := config.TriggerMetadata["excludePersistentLag"]; ok { + t, err := strconv.ParseBool(val) + if err != nil { + return meta, fmt.Errorf("error parsing excludePersistentLag: %s", err) + } + meta.excludePersistentLag = t + } + meta.scaleToZeroOnInvalidOffset = false if val, ok := config.TriggerMetadata["scaleToZeroOnInvalidOffset"]; ok { t, err := strconv.ParseBool(val) @@ -277,13 +307,14 @@ func parseKafkaMetadata(config *ScalerConfig, logger logr.Logger) (kafkaMetadata } // IsActive determines if we need to scale from zero +// When replicas is zero, all lag will be deemed as persistent, hence use totalLagWithPersistent to determine scaling. func (s *kafkaScaler) IsActive(ctx context.Context) (bool, error) { - totalLag, err := s.getTotalLag() + _, totalLagWithPersistent, err := s.getTotalLag() if err != nil { return false, err } - return totalLag > s.metadata.activationLagThreshold, nil + return totalLagWithPersistent > s.metadata.activationLagThreshold, nil } func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin, error) { @@ -378,15 +409,33 @@ func (s *kafkaScaler) getTopicPartitions() (map[string][]int32, error) { s.logger.Error(errMsg, "") } partitionMetadata := topicMetadata.Partitions - partitions := make([]int32, len(partitionMetadata)) - for i, p := range partitionMetadata { - partitions[i] = p.ID + var partitions []int32 + for _, p := range partitionMetadata { + if s.isActivePartition(p.ID) { + partitions = append(partitions, p.ID) + } } + if len(partitions) == 0 { + return nil, fmt.Errorf("expected at least one active partition within the topic '%s'", topicMetadata.Name) + } + topicPartitions[topicMetadata.Name] = partitions } return topicPartitions, nil } +func (s *kafkaScaler) isActivePartition(pID int32) bool { + if s.metadata.partitionLimitation == nil { + return true + } + for _, _pID := range s.metadata.partitionLimitation { + if pID == _pID { + return true + } + } + return false +} + func (s *kafkaScaler) getConsumerOffsets(topicPartitions map[string][]int32) (*sarama.OffsetFetchResponse, error) { offsets, err := s.admin.ListConsumerGroupOffsets(s.metadata.group, topicPartitions) if err != nil { @@ -399,12 +448,16 @@ func (s *kafkaScaler) getConsumerOffsets(topicPartitions map[string][]int32) (*s return offsets, nil } -func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offsets *sarama.OffsetFetchResponse, topicPartitionOffsets map[string]map[int32]int64) (int64, error) { +// getLagForPartition returns (lag, lagWithPersistent, error) +// When excludePersistentLag is set to `false` (default), lag will always be equal to lagWithPersistent +// When excludePersistentLag is set to `true`, if partition is deemed to have persistent lag, lag will be set to 0 and lagWithPersistent will be latestOffset - consumerOffset +// These return values will allow proper scaling from 0 -> 1 replicas by the IsActive func. 
+func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offsets *sarama.OffsetFetchResponse, topicPartitionOffsets map[string]map[int32]int64) (int64, int64, error) { block := offsets.GetBlock(topic, partitionID) if block == nil { errMsg := fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partitionID) s.logger.Error(errMsg, "") - return 0, errMsg + return 0, 0, errMsg } if block.Err > 0 { errMsg := fmt.Errorf("error finding offset block for topic %s and partition %d: %s", topic, partitionID, offsets.Err.Error()) @@ -421,17 +474,39 @@ func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offset "invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet. Returning with lag of %d", topic, s.metadata.group, partitionID, retVal) s.logger.V(1).Info(msg) - return retVal, nil + return retVal, retVal, nil } if _, found := topicPartitionOffsets[topic]; !found { - return 0, fmt.Errorf("error finding partition offset for topic %s", topic) + return 0, 0, fmt.Errorf("error finding partition offset for topic %s", topic) } latestOffset := topicPartitionOffsets[topic][partitionID] if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest { - return latestOffset, nil + return latestOffset, latestOffset, nil } - return latestOffset - consumerOffset, nil + + // This code block tries to prevent KEDA Kafka trigger from scaling the scale target based on erroneous events + if s.metadata.excludePersistentLag { + switch previousOffset, found := s.previousOffsets[topic][partitionID]; { + case !found: + // No record of previous offset, so store current consumer offset + // Allow this consumer lag to be considered in scaling + if _, topicFound := s.previousOffsets[topic]; !topicFound { + s.previousOffsets[topic] = map[int32]int64{partitionID: consumerOffset} + } else { + s.previousOffsets[topic][partitionID] = consumerOffset + } + case previousOffset == consumerOffset: + // Indicates consumer is still on the same offset as the previous polling cycle, there may be some issue with consuming this offset. + // return 0, so this consumer lag is not considered for scaling + return 0, latestOffset - consumerOffset, nil + default: + // Successfully Consumed some messages, proceed to change the previous offset + s.previousOffsets[topic][partitionID] = consumerOffset + } + } + + return latestOffset - consumerOffset, latestOffset - consumerOffset, nil } // Close closes the kafka admin and client @@ -501,7 +576,7 @@ func (s *kafkaScaler) getConsumerAndProducerOffsets(topicPartitions map[string][ // GetMetrics returns value for a supported metric and an error if there is a problem getting the metric func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, error) { - totalLag, err := s.getTotalLag() + totalLag, _, err := s.getTotalLag() if err != nil { return []external_metrics.ExternalMetricValue{}, err } @@ -510,24 +585,29 @@ func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string) ([]exte return append([]external_metrics.ExternalMetricValue{}, metric), nil } -func (s *kafkaScaler) getTotalLag() (int64, error) { +// getTotalLag returns totalLag, totalLagWithPersistent, error +// totalLag and totalLagWithPersistent are the summations of lag and lagWithPersistent returned by getLagForPartition function respectively. 
+// totalLag maybe less than totalLagWithPersistent when excludePersistentLag is set to `true` due to some partitions deemed as having persistent lag +func (s *kafkaScaler) getTotalLag() (int64, int64, error) { topicPartitions, err := s.getTopicPartitions() if err != nil { - return 0, err + return 0, 0, err } consumerOffsets, producerOffsets, err := s.getConsumerAndProducerOffsets(topicPartitions) if err != nil { - return 0, err + return 0, 0, err } totalLag := int64(0) + totalLagWithPersistent := int64(0) totalTopicPartitions := int64(0) for topic, partitionsOffsets := range producerOffsets { for partition := range partitionsOffsets { - lag, _ := s.getLagForPartition(topic, partition, consumerOffsets, producerOffsets) + lag, lagWithPersistent, _ := s.getLagForPartition(topic, partition, consumerOffsets, producerOffsets) totalLag += lag + totalLagWithPersistent += lagWithPersistent } totalTopicPartitions += (int64)(len(partitionsOffsets)) } @@ -539,7 +619,7 @@ func (s *kafkaScaler) getTotalLag() (int64, error) { totalLag = totalTopicPartitions * s.metadata.lagThreshold } } - return totalLag, nil + return totalLag, totalLagWithPersistent, nil } type brokerOffsetResult struct { diff --git a/pkg/scalers/kafka_scaler_test.go b/pkg/scalers/kafka_scaler_test.go index 054beda9d0c..071dcebc45b 100644 --- a/pkg/scalers/kafka_scaler_test.go +++ b/pkg/scalers/kafka_scaler_test.go @@ -6,18 +6,21 @@ import ( "strings" "testing" + "github.com/Shopify/sarama" "github.com/go-logr/logr" ) type parseKafkaMetadataTestData struct { - metadata map[string]string - isError bool - numBrokers int - brokers []string - group string - topic string - offsetResetPolicy offsetResetPolicy - allowIdleConsumers bool + metadata map[string]string + isError bool + numBrokers int + brokers []string + group string + topic string + partitionLimitation []int32 + offsetResetPolicy offsetResetPolicy + allowIdleConsumers bool + excludePersistentLag bool } type parseKafkaAuthParamsTestData struct { @@ -52,35 +55,49 @@ var validWithoutAuthParams = map[string]string{} var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{ // failure, no bootstrapServers - {map[string]string{}, true, 0, nil, "", "", "", false}, + {map[string]string{}, true, 0, nil, "", "", nil, "", false, false}, // failure, no consumer group - {map[string]string{"bootstrapServers": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", "", "latest", false}, + {map[string]string{"bootstrapServers": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", "", nil, "latest", false, false}, // success, no topic - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, false, 1, []string{"foobar:9092"}, "my-group", "", offsetResetPolicy("latest"), false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, false, 1, []string{"foobar:9092"}, "my-group", "", nil, offsetResetPolicy("latest"), false, false}, + // success, ignore partitionLimitation if no topic + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": "1,2,3,4,5,6"}, false, 1, []string{"foobar:9092"}, "my-group", "", nil, offsetResetPolicy("latest"), false, false}, // failure, version not supported - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "version": "1.2.3.4"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": 
"my-group", "topic": "my-topic", "version": "1.2.3.4"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, // failure, lagThreshold is negative value - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "-1"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "-1"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, // failure, lagThreshold is 0 - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "0"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "0"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, // failure, activationLagThreshold is not int - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "10", "activationLagThreshold": "AA"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "10", "activationLagThreshold": "AA"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, // success - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, + // success, partitionLimitation as list + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "1,2,3,4"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", []int32{1, 2, 3, 4}, offsetResetPolicy("latest"), false, false}, + // success, partitionLimitation as range + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "1-4"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", []int32{1, 2, 3, 4}, offsetResetPolicy("latest"), false, false}, + // success, partitionLimitation mixed list + ranges + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "1-4,8,10-12"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", []int32{1, 2, 3, 4, 8, 10, 11, 12}, offsetResetPolicy("latest"), false, false}, + // failure, partitionLimitation wrong data type + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "a,b,c,d"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, // success, more brokers - {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 2, 
[]string{"foo:9092", "bar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false}, + {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, // success, offsetResetPolicy policy latest - {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "latest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false}, + {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "latest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, // failure, offsetResetPolicy policy wrong - {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "foo"}, true, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", "", false}, + {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "foo"}, true, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", nil, "", false, false}, // success, offsetResetPolicy policy earliest - {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "earliest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", offsetResetPolicy("earliest"), false}, + {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "earliest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("earliest"), false, false}, // failure, allowIdleConsumers malformed - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "notvalid"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "notvalid"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, // success, allowIdleConsumers is true - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), true}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), true, false}, + // failure, excludePersistentLag is malformed + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "excludePersistentLag": "notvalid"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, + // success, excludePersistentLag is true + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "excludePersistentLag": "true"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), 
false, true}, // success, version supported - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), true}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), true, false}, } var parseKafkaAuthParamsTestDataset = []parseKafkaAuthParamsTestData{ @@ -150,8 +167,8 @@ var parseKafkaOAuthbreakerAuthParamsTestDataset = []parseKafkaAuthParamsTestData } var kafkaMetricIdentifiers = []kafkaMetricIdentifier{ - {&parseKafkaMetadataTestDataset[7], 0, "s0-kafka-my-topic"}, - {&parseKafkaMetadataTestDataset[7], 1, "s1-kafka-my-topic"}, + {&parseKafkaMetadataTestDataset[8], 0, "s0-kafka-my-topic"}, + {&parseKafkaMetadataTestDataset[8], 1, "s1-kafka-my-topic"}, {&parseKafkaMetadataTestDataset[2], 1, "s1-kafka-my-group-topics"}, } @@ -177,6 +194,9 @@ func TestGetBrokers(t *testing.T) { if meta.topic != testData.topic { t.Errorf("Expected topic %s but got %s\n", testData.topic, meta.topic) } + if !reflect.DeepEqual(testData.partitionLimitation, meta.partitionLimitation) { + t.Errorf("Expected %v but got %v\n", testData.partitionLimitation, meta.partitionLimitation) + } if err == nil && meta.offsetResetPolicy != testData.offsetResetPolicy { t.Errorf("Expected offsetResetPolicy %s but got %s\n", testData.offsetResetPolicy, meta.offsetResetPolicy) } @@ -201,12 +221,18 @@ func TestGetBrokers(t *testing.T) { if meta.topic != testData.topic { t.Errorf("Expected topic %s but got %s\n", testData.topic, meta.topic) } + if !reflect.DeepEqual(testData.partitionLimitation, meta.partitionLimitation) { + t.Errorf("Expected %v but got %v\n", testData.partitionLimitation, meta.partitionLimitation) + } if err == nil && meta.offsetResetPolicy != testData.offsetResetPolicy { t.Errorf("Expected offsetResetPolicy %s but got %s\n", testData.offsetResetPolicy, meta.offsetResetPolicy) } if err == nil && meta.allowIdleConsumers != testData.allowIdleConsumers { t.Errorf("Expected allowIdleConsumers %t but got %t\n", testData.allowIdleConsumers, meta.allowIdleConsumers) } + if err == nil && meta.excludePersistentLag != testData.excludePersistentLag { + t.Errorf("Expected excludePersistentLag %t but got %t\n", testData.excludePersistentLag, meta.excludePersistentLag) + } } } @@ -264,7 +290,7 @@ func TestKafkaGetMetricSpecForScaling(t *testing.T) { if err != nil { t.Fatal("Could not parse metadata:", err) } - mockKafkaScaler := kafkaScaler{"", meta, nil, nil, logr.Discard()} + mockKafkaScaler := kafkaScaler{"", meta, nil, nil, logr.Discard(), make(map[string]map[int32]int64)} metricSpec := mockKafkaScaler.GetMetricSpecForScaling(context.Background()) metricName := metricSpec[0].External.Metric.Name @@ -273,3 +299,168 @@ func TestKafkaGetMetricSpecForScaling(t *testing.T) { } } } + +func TestGetTopicPartitions(t *testing.T) { + testData := []struct { + name string + metadata map[string]string + partitionIds []int32 + exp map[string][]int32 + }{ + {"success_all_partitions", map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "1,2"}, []int32{1, 2}, map[string][]int32{"my-topic": {1, 2}}}, + {"success_partial_partitions", map[string]string{"bootstrapServers": "foobar:9092", 
"consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "1,2,3"}, []int32{1, 2, 3, 4, 5, 6}, map[string][]int32{"my-topic": {1, 2, 3}}}, + } + + for _, tt := range testData { + t.Run(tt.name, func(t *testing.T) { + meta, err := parseKafkaMetadata(&ScalerConfig{TriggerMetadata: tt.metadata, AuthParams: validWithAuthParams}, logr.Discard()) + if err != nil { + t.Fatal("Could not parse metadata:", err) + } + mockKafkaScaler := kafkaScaler{"", meta, nil, &MockClusterAdmin{partitionIds: tt.partitionIds}, logr.Discard(), make(map[string]map[int32]int64)} + + patitions, err := mockKafkaScaler.getTopicPartitions() + + if !reflect.DeepEqual(tt.exp, patitions) { + t.Errorf("Expected %v but got %v\n", tt.exp, patitions) + } + + if err != nil { + t.Error("Expected success but got error", err) + } + }) + } +} + +type MockClusterAdmin struct { + partitionIds []int32 +} + +func (m *MockClusterAdmin) CreateTopic(topic string, detail *sarama.TopicDetail, validateOnly bool) error { + return nil +} +func (m *MockClusterAdmin) ListTopics() (map[string]sarama.TopicDetail, error) { + return nil, nil +} + +func (m *MockClusterAdmin) DescribeTopics(topics []string) (metadata []*sarama.TopicMetadata, err error) { + metadatas := make([]*sarama.TopicMetadata, len(topics)) + + partitionMetadata := make([]*sarama.PartitionMetadata, len(m.partitionIds)) + for i, id := range m.partitionIds { + partitionMetadata[i] = &sarama.PartitionMetadata{ID: id} + } + + for i, name := range topics { + metadatas[i] = &sarama.TopicMetadata{Name: name, Partitions: partitionMetadata} + } + return metadatas, nil +} + +func (m *MockClusterAdmin) DeleteTopic(topic string) error { + return nil +} + +func (m *MockClusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error { + return nil +} + +func (m *MockClusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error { + return nil +} + +func (m *MockClusterAdmin) ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*sarama.PartitionReplicaReassignmentsStatus, err error) { + return nil, nil +} + +func (m *MockClusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error { + return nil +} + +func (m *MockClusterAdmin) DescribeConfig(resource sarama.ConfigResource) ([]sarama.ConfigEntry, error) { + return nil, nil +} + +func (m *MockClusterAdmin) AlterConfig(resourceType sarama.ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error { + return nil +} + +func (m *MockClusterAdmin) IncrementalAlterConfig(resourceType sarama.ConfigResourceType, name string, entries map[string]sarama.IncrementalAlterConfigsEntry, validateOnly bool) error { + return nil +} + +func (m *MockClusterAdmin) CreateACL(resource sarama.Resource, acl sarama.Acl) error { + return nil +} + +func (m *MockClusterAdmin) CreateACLs([]*sarama.ResourceAcls) error { + return nil +} + +func (m *MockClusterAdmin) ListAcls(filter sarama.AclFilter) ([]sarama.ResourceAcls, error) { + return nil, nil +} + +func (m *MockClusterAdmin) DeleteACL(filter sarama.AclFilter, validateOnly bool) ([]sarama.MatchingAcl, error) { + return nil, nil +} + +func (m *MockClusterAdmin) ListConsumerGroups() (map[string]string, error) { + return nil, nil +} + +func (m *MockClusterAdmin) DescribeConsumerGroups(groups []string) ([]*sarama.GroupDescription, error) { + return nil, nil +} + +func (m *MockClusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) 
(*sarama.OffsetFetchResponse, error) { + return nil, nil +} + +func (m *MockClusterAdmin) DeleteConsumerGroupOffset(group string, topic string, partition int32) error { + return nil +} + +func (m *MockClusterAdmin) DeleteConsumerGroup(group string) error { + return nil +} + +func (m *MockClusterAdmin) DescribeCluster() (brokers []*sarama.Broker, controllerID int32, err error) { + return nil, 0, nil +} + +func (m *MockClusterAdmin) DescribeLogDirs(brokers []int32) (map[int32][]sarama.DescribeLogDirsResponseDirMetadata, error) { + return nil, nil +} + +func (m *MockClusterAdmin) DescribeUserScramCredentials(users []string) ([]*sarama.DescribeUserScramCredentialsResult, error) { + return nil, nil +} + +func (m *MockClusterAdmin) DeleteUserScramCredentials(delete []sarama.AlterUserScramCredentialsDelete) ([]*sarama.AlterUserScramCredentialsResult, error) { + return nil, nil +} + +func (m *MockClusterAdmin) UpsertUserScramCredentials(upsert []sarama.AlterUserScramCredentialsUpsert) ([]*sarama.AlterUserScramCredentialsResult, error) { + return nil, nil +} + +func (m *MockClusterAdmin) DescribeClientQuotas(components []sarama.QuotaFilterComponent, strict bool) ([]sarama.DescribeClientQuotasEntry, error) { + return nil, nil +} + +func (m *MockClusterAdmin) AlterClientQuotas(entity []sarama.QuotaEntityComponent, op sarama.ClientQuotasOp, validateOnly bool) error { + return nil +} + +func (m *MockClusterAdmin) Controller() (*sarama.Broker, error) { + return nil, nil +} + +func (m *MockClusterAdmin) RemoveMemberFromConsumerGroup(groupID string, groupInstanceIds []string) (*sarama.LeaveGroupResponse, error) { + return nil, nil +} + +func (m *MockClusterAdmin) Close() error { + return nil +} diff --git a/pkg/util/parse_string.go b/pkg/util/parse_string.go new file mode 100644 index 00000000000..f61a1c5ad76 --- /dev/null +++ b/pkg/util/parse_string.go @@ -0,0 +1,65 @@ +/* +Copyright 2021 The KEDA Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "fmt" + "strconv" + "strings" +) + +func ParseRange(from, to string) ([]int32, error) { + f, err := strconv.ParseInt(from, 10, 32) + if err != nil { + return nil, fmt.Errorf("parse error for '%s': %s", from, err) + } + t, err := strconv.ParseInt(to, 10, 32) + if err != nil { + return nil, fmt.Errorf("parse error for '%s': %s", to, err) + } + var parsed []int32 + for i := int32(f); i <= int32(t); i++ { + parsed = append(parsed, i) + } + return parsed, nil +} + +func ParseInt32List(pattern string) ([]int32, error) { + var parsed []int32 + terms := strings.Split(pattern, ",") + for _, term := range terms { + literals := strings.Split(term, "-") + switch { + case len(literals) == 1: + i, err := strconv.ParseInt(literals[0], 10, 32) + if err != nil { + return nil, fmt.Errorf("parse error: %s", err) + } + parsed = append(parsed, int32(i)) + case len(literals) == 2: + r, err := ParseRange(literals[0], literals[1]) + if err != nil { + return nil, fmt.Errorf("error in range: %s", err) + } + parsed = append(parsed, r...) 
+ + default: + return nil, fmt.Errorf("error in range syntax, got '%s'", term) + } + } + return parsed, nil +} diff --git a/pkg/util/parse_string_test.go b/pkg/util/parse_string_test.go new file mode 100644 index 00000000000..647d92c5a09 --- /dev/null +++ b/pkg/util/parse_string_test.go @@ -0,0 +1,87 @@ +/* +Copyright 2021 The KEDA Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "reflect" + "testing" +) + +func TestParseRange(t *testing.T) { + testData := []struct { + name string + from string + to string + exp []int32 + isError bool + }{ + {"success", "3", "10", []int32{3, 4, 5, 6, 7, 8, 9, 10}, false}, + {"failure, from not an int", "a", "10", nil, true}, + {"failure, to not an int", "3", "a", nil, true}, + } + + for _, tt := range testData { + got, err := ParseRange(tt.from, tt.to) + + if err != nil && !tt.isError { + t.Errorf("Expected no error but got %s\n", err) + } + + if err == nil && tt.isError { + t.Errorf("Expected error but got %s\n", err) + } + + if !reflect.DeepEqual(tt.exp, got) { + t.Errorf("Expected %v but got %v\n", tt.exp, got) + } + } +} + +func TestParseint32List(t *testing.T) { + testData := []struct { + name string + pattern string + exp []int32 + isError bool + }{ + {"success_single", "100", []int32{100}, false}, + {"success_list", "1,2,3,4,5,6,10", []int32{1, 2, 3, 4, 5, 6, 10}, false}, + {"success_list, range, list", "1,2,4-10", []int32{1, 2, 4, 5, 6, 7, 8, 9, 10}, false}, + {"success_range", "1-10", []int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, false}, + {"failure_list", "a,2,3", nil, true}, + {"failure_range", "a-3", nil, true}, + {"failure_not_a_range", "a-3-", nil, true}, + } + + for _, tt := range testData { + t.Run(tt.name, func(t *testing.T) { + got, err := ParseInt32List(tt.pattern) + + if err != nil && !tt.isError { + t.Errorf("Expected no error but got %s\n", err) + } + + if err == nil && tt.isError { + t.Errorf("Expected error but got %s\n", err) + } + + if !reflect.DeepEqual(tt.exp, got) { + t.Errorf("Expected %v but got %v\n", tt.exp, got) + } + }) + } +} diff --git a/tests/helper/helper.go b/tests/helper/helper.go index 6b2079fbd23..1b25db557bf 100644 --- a/tests/helper/helper.go +++ b/tests/helper/helper.go @@ -550,3 +550,21 @@ func DeletePodsInNamespaceBySelector(t *testing.T, kc *kubernetes.Clientset, sel }) assert.NoErrorf(t, err, "cannot delete pods - %s", err) } + +// Wait for Pods identified by selector to complete termination +func WaitForPodsTerminated(t *testing.T, kc *kubernetes.Clientset, selector, namespace string, + iterations, intervalSeconds int) bool { + for i := 0; i < iterations; i++ { + pods, err := kc.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{LabelSelector: selector}) + if (err != nil && errors.IsNotFound(err)) || len(pods.Items) == 0 { + t.Logf("No pods with label %s", selector) + return true + } + + t.Logf("Waiting for pods with label %s to terminate", selector) + + time.Sleep(time.Duration(intervalSeconds) * time.Second) + } + + return false +} diff --git 
a/tests/scalers/kafka/kafka_test.go b/tests/scalers/kafka/kafka_test.go index d16598429b0..c361780b576 100644 --- a/tests/scalers/kafka/kafka_test.go +++ b/tests/scalers/kafka/kafka_test.go @@ -22,19 +22,22 @@ const ( ) var ( - testNamespace = fmt.Sprintf("%s-ns", testName) - deploymentName = fmt.Sprintf("%s-deployment", testName) - kafkaName = fmt.Sprintf("%s-kafka", testName) - kafkaClientName = fmt.Sprintf("%s-client", testName) - scaledObjectName = fmt.Sprintf("%s-so", testName) - bootstrapServer = fmt.Sprintf("%s-kafka-bootstrap.%s:9092", kafkaName, testNamespace) - strimziOperatorVersion = "0.30.0" - topic1 = "kafka-topic" - topic2 = "kafka-topic2" - zeroInvalidOffsetTopic = "kafka-topic-zero-invalid-offset" - oneInvalidOffsetTopic = "kafka-topic-one-invalid-offset" - invalidOffsetGroup = "invalidOffset" - topicPartitions = 3 + testNamespace = fmt.Sprintf("%s-ns", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + kafkaName = fmt.Sprintf("%s-kafka", testName) + kafkaClientName = fmt.Sprintf("%s-client", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + bootstrapServer = fmt.Sprintf("%s-kafka-bootstrap.%s:9092", kafkaName, testNamespace) + strimziOperatorVersion = "0.30.0" + topic1 = "kafka-topic" + topic2 = "kafka-topic2" + zeroInvalidOffsetTopic = "kafka-topic-zero-invalid-offset" + oneInvalidOffsetTopic = "kafka-topic-one-invalid-offset" + invalidOffsetGroup = "invalidOffset" + persistentLagTopic = "kafka-topic-persistent-lag" + persistentLagGroup = "persistentLag" + persistentLagDeploymentGroup = "persistentLagDeploymentGroup" + topicPartitions = 3 ) type templateData struct { @@ -53,6 +56,7 @@ type templateData struct { Params string Commit string ScaleToZeroOnInvalid string + ExcludePersistentLag string } const ( @@ -181,6 +185,42 @@ spec: consumerGroup: {{.ResetPolicy}} lagThreshold: '1' scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}' + offsetResetPolicy: 'latest'` + + persistentLagScaledObjectTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + advanced: + horizontalPodAutoscalerConfig: + behavior: + scaleUp: + stabilizationWindowSeconds: 60 + policies: + - type: Percent + value: 100 + periodSeconds: 15 + scaleDown: + stabilizationWindowSeconds: 60 + policies: + - type: Percent + value: 100 + periodSeconds: 15 + triggers: + - type: kafka + metadata: + topic: {{.TopicName}} + bootstrapServers: {{.BootstrapServer}} + consumerGroup: {{.ResetPolicy}} + lagThreshold: '1' + excludePersistentLag: '{{.ExcludePersistentLag}}' offsetResetPolicy: 'latest'` kafkaClusterTemplate = `apiVersion: kafka.strimzi.io/v1beta2 @@ -261,6 +301,7 @@ func TestScaler(t *testing.T) { addTopic(t, data, topic2, topicPartitions) addTopic(t, data, zeroInvalidOffsetTopic, 1) addTopic(t, data, oneInvalidOffsetTopic, 1) + addTopic(t, data, persistentLagTopic, topicPartitions) // test scaling testEarliestPolicy(t, kc, data) @@ -268,6 +309,7 @@ func TestScaler(t *testing.T) { testMultiTopic(t, kc, data) testZeroOnInvalidOffset(t, kc, data) testOneOnInvalidOffset(t, kc, data) + testPersistentLag(t, kc, data) // cleanup uninstallKafkaOperator(t) @@ -426,6 +468,49 @@ func commitPartition(t *testing.T, topic string, group string) { assert.NoErrorf(t, err, "cannot execute command - %s", err) } +func testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing 
persistentLag: no scale out ---")
+
+	// Simulate consumption from the topic by the consumer group
+	// This avoids an edge case where scaling could be effectively disabled (the consumer never makes a commit)
+	data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", persistentLagTopic, persistentLagGroup)
+	data.Commit = StringTrue
+	data.TopicName = persistentLagTopic
+	data.ResetPolicy = persistentLagGroup
+	data.ExcludePersistentLag = StringTrue
+	KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
+	KubectlApplyWithTemplate(t, data, "persistentLagScaledObjectTemplate", persistentLagScaledObjectTemplate)
+
+	// Scale application with kafka messages in persistentLagTopic
+	publishMessage(t, persistentLagTopic)
+	assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2),
+		"replica count should be %d after 2 minutes", 1)
+	// Recreate the Deployment to deliberately assign a different consumer group to the deployment than to the ScaledObject
+	// This simulates an inability to consume from the topic
+	// The ScaledObject remains unchanged
+	KubernetesScaleDeployment(t, kc, deploymentName, 0, testNamespace)
+	assert.True(t, WaitForPodsTerminated(t, kc, "app=kafka-consumer", testNamespace, 60, 2),
+		"pods should be terminated after %d minutes", 2)
+
+	data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", persistentLagTopic, persistentLagDeploymentGroup)
+	KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
+
+	messages := 5
+	for i := 0; i < messages; i++ {
+		publishMessage(t, persistentLagTopic)
+	}
+
+	// Persistent lag should not scale the workload above the minimum replica count after 2 reconciliation cycles
+	assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2),
+		"replica count should be %d after 2 minutes", 1)
+
+	// Shouldn't scale pods
+	AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 1, 180)
+
+	KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
+	KubectlDeleteWithTemplate(t, data, "persistentLagScaledObjectTemplate", persistentLagScaledObjectTemplate)
+}
+
 func installKafkaOperator(t *testing.T) {
 	_, err := ExecuteCommand("helm repo add strimzi https://strimzi.io/charts/")
 	assert.NoErrorf(t, err, "cannot execute command - %s", err)
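For reference, a minimal sketch of a ScaledObject trigger that combines the two settings introduced in this patch (partitionLimitation and excludePersistentLag), based on the metadata keys parsed in kafka_scaler.go and the templates used in the e2e test above. The deployment name, topic, consumer group, and bootstrap address are placeholders; the partition pattern mirrors the values exercised in the unit tests.

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: kafka-scaledobject
spec:
  scaleTargetRef:
    name: my-consumer-deployment   # placeholder workload name
  triggers:
    - type: kafka
      metadata:
        bootstrapServers: my-cluster-kafka-bootstrap:9092   # placeholder broker address
        consumerGroup: my-group
        topic: my-topic
        lagThreshold: '10'
        offsetResetPolicy: 'latest'
        # Only partitions 1-4, 8 and 10-12 are considered when computing lag; an explicit topic is required,
        # otherwise the setting is ignored (see parseKafkaMetadata above)
        partitionLimitation: '1-4,8,10-12'
        # Ignore lag on partitions whose consumer offset has not moved since the previous polling cycle
        excludePersistentLag: 'true'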