diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go
index aab437ce3d3..ab95aefa915 100644
--- a/pkg/scalers/kafka_scaler.go
+++ b/pkg/scalers/kafka_scaler.go
@@ -109,11 +109,11 @@ func NewKafkaScaler(config *ScalerConfig) (Scaler, error) {
 	previousOffsets := make(map[string]map[int32]int64)
 
 	return &kafkaScaler{
-		client:          client,
-		admin:           admin,
-		metricType:      metricType,
-		metadata:        kafkaMetadata,
-		logger:          logger,
+		client:          client,
+		admin:           admin,
+		metricType:      metricType,
+		metadata:        kafkaMetadata,
+		logger:          logger,
 		previousOffsets: previousOffsets,
 	}, nil
 }
@@ -291,6 +291,7 @@ func parseKafkaMetadata(config *ScalerConfig, logger logr.Logger) (kafkaMetadata
 }
 
 // IsActive determines if we need to scale from zero
+// When the replica count is zero, all lag is deemed persistent, hence totalLagWithPersistent is used to determine scaling.
 func (s *kafkaScaler) IsActive(ctx context.Context) (bool, error) {
 	_, totalLagWithPersistent, err := s.getTotalLag()
 	if err != nil {
@@ -413,7 +414,11 @@ func (s *kafkaScaler) getConsumerOffsets(topicPartitions map[string][]int32) (*s
 	return offsets, nil
 }
 
-func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offsets *sarama.OffsetFetchResponse, topicPartitionOffsets map[string]map[int32]int64) (int64, int64, error) {
+// getLagForPartition returns (lag, lagWithPersistent, error)
+// When excludePersistentLag is set to `false` (default), lag will always be equal to lagWithPersistent
+// When excludePersistentLag is set to `true`, if the partition is deemed to have persistent lag, lag will be set to 0 and lagWithPersistent will be latestOffset - consumerOffset
+// These return values allow proper scaling from 0 -> 1 replicas by the IsActive func.
+func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offsets *sarama.OffsetFetchResponse, topicPartitionOffsets map[string]map[int32]int64) (int64, int64, error) {
 	block := offsets.GetBlock(topic, partitionID)
 	if block == nil {
 		errMsg := fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partitionID)
@@ -460,7 +465,7 @@ func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offset
 		case previousOffset == consumerOffset:
 			// Indicates consumer is still on the same offset as the previous polling cycle, there may be some issue with consuming this offset.
 			// return 0, so this consumer lag is not considered for scaling
-			return 0, latestOffset - consumerOffset,nil
+			return 0, latestOffset - consumerOffset, nil
 		default:
 			// Successfully Consumed some messages, proceed to change the previous offset
 			s.previousOffsets[topic][partitionID] = consumerOffset
@@ -537,7 +542,7 @@ func (s *kafkaScaler) getConsumerAndProducerOffsets(topicPartitions map[string][
 
 // GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
 func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, error) {
-	totalLag, _, err := s.getTotalLag()
+	totalLag, _, err := s.getTotalLag()
 	if err != nil {
 		return []external_metrics.ExternalMetricValue{}, err
 	}
@@ -546,6 +551,9 @@ func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string) ([]exte
 	return append([]external_metrics.ExternalMetricValue{}, metric), nil
 }
 
+// getTotalLag returns totalLag, totalLagWithPersistent, error
+// totalLag and totalLagWithPersistent are the summations of lag and lagWithPersistent returned by the getLagForPartition function, respectively.
+// totalLag may be less than totalLagWithPersistent when excludePersistentLag is set to `true`, because some partitions may be deemed to have persistent lag
 func (s *kafkaScaler) getTotalLag() (int64, int64, error) {
 	topicPartitions, err := s.getTopicPartitions()
 	if err != nil {
diff --git a/tests/helper/helper.go b/tests/helper/helper.go
index 0e65c54456d..cd14303b9eb 100644
--- a/tests/helper/helper.go
+++ b/tests/helper/helper.go
@@ -551,7 +551,7 @@ func DeletePodsInNamespaceBySelector(t *testing.T, kc *kubernetes.Clientset, sel
 func WaitForPodsTerminated(t *testing.T, kc *kubernetes.Clientset, selector, namespace string, iterations, intervalSeconds int) bool {
 	for i := 0; i < iterations; i++ {
-		pods, err := kc.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{LabelSelector: selector,})
+		pods, err := kc.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{LabelSelector: selector})
 		if (err != nil && errors.IsNotFound(err)) || len(pods.Items) == 0 {
 			t.Logf("No pods with label %s", selector)
 			return true
diff --git a/tests/scalers/kafka/kafka_test.go b/tests/scalers/kafka/kafka_test.go
index ddd47261f8a..c361780b576 100644
--- a/tests/scalers/kafka/kafka_test.go
+++ b/tests/scalers/kafka/kafka_test.go
@@ -186,7 +186,7 @@ spec:
       lagThreshold: '1'
       scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}'
       offsetResetPolicy: 'latest'`
-	
+
 	persistentLagScaledObjectTemplate = `
 apiVersion: keda.sh/v1alpha1
 kind: ScaledObject
@@ -494,16 +494,16 @@ func testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData
 	data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", persistentLagTopic, persistentLagDeploymentGroup)
 	KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
-	
+
 	messages := 5
 	for i := 0; i < messages; i++ {
 		publishMessage(t, persistentLagTopic)
 	}
 
-	// Persistent Lag should not scale pod above minimum replicas after 2 reconcilation cycles
+	// Persistent Lag should not scale pod above minimum replicas after 2 reconciliation cycles
 	assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2), "replica count should be %d after 2 minute", 1)
-	
+
 	// Shouldn't scale pods
 	AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 1, 180)