Add Descriptions for kafka scaler
Signed-off-by: JosephABC <[email protected]>
josephangbc committed Dec 5, 2022
1 parent 5a4cccb commit 881cb86
Showing 3 changed files with 21 additions and 13 deletions.
24 changes: 16 additions & 8 deletions pkg/scalers/kafka_scaler.go
@@ -109,11 +109,11 @@ func NewKafkaScaler(config *ScalerConfig) (Scaler, error) {
previousOffsets := make(map[string]map[int32]int64)

return &kafkaScaler{
client: client,
admin: admin,
metricType: metricType,
metadata: kafkaMetadata,
logger: logger,
client: client,
admin: admin,
metricType: metricType,
metadata: kafkaMetadata,
logger: logger,
previousOffsets: previousOffsets,
}, nil
}
@@ -291,6 +291,7 @@ func parseKafkaMetadata(config *ScalerConfig, logger logr.Logger) (kafkaMetadata
}

// IsActive determines if we need to scale from zero
// When replicas are zero, all lag is deemed persistent, hence totalLagWithPersistent is used to determine scaling.
func (s *kafkaScaler) IsActive(ctx context.Context) (bool, error) {
_, totalLagWithPersistent, err := s.getTotalLag()
if err != nil {
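
The activation rule the new comment describes can be summarised in a minimal sketch. This is not the scaler's actual code, and activationThreshold is a hypothetical stand-in parameter; it only illustrates why the second total matters when scaling from zero:

// isActiveSketch mirrors the rule in the comment above: activation (0 -> 1)
// looks at the lag total that still includes persistent lag.
// Not the actual scaler code; activationThreshold is a hypothetical stand-in.
func isActiveSketch(totalLag, totalLagWithPersistent, activationThreshold int64) bool {
	// totalLag (persistent lag excluded) drives scaling above zero replicas;
	// it is intentionally ignored for the activation decision.
	_ = totalLag
	return totalLagWithPersistent > activationThreshold
}
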
@@ -413,7 +414,11 @@ func (s *kafkaScaler) getConsumerOffsets(topicPartitions map[string][]int32) (*s
return offsets, nil
}

func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offsets *sarama.OffsetFetchResponse, topicPartitionOffsets map[string]map[int32]int64) (int64, int64, error) {
// getLagForPartition returns (lag, lagWithPersistent, error)
// When excludePersistentLag is set to `false` (default), lag is always equal to lagWithPersistent
// When excludePersistentLag is set to `true` and the partition is deemed to have persistent lag, lag is set to 0 and lagWithPersistent is latestOffset - consumerOffset
// These return values allow proper scaling from 0 -> 1 replicas by the IsActive func.
func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offsets *sarama.OffsetFetchResponse, topicPartitionOffsets map[string]map[int32]int64) (int64, int64, error) {
block := offsets.GetBlock(topic, partitionID)
if block == nil {
errMsg := fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partitionID)
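
A condensed sketch of the (lag, lagWithPersistent) contract the comment above describes. The offsets are plain int64 parameters here rather than the sarama response types, and the function name and parameters are illustrative, not the scaler's internals:

// lagForPartitionSketch illustrates the two return values: lag feeds normal
// scaling, lagWithPersistent feeds activation (0 -> 1).
func lagForPartitionSketch(latestOffset, consumerOffset, previousOffset int64, excludePersistentLag bool) (lag, lagWithPersistent int64) {
	rawLag := latestOffset - consumerOffset
	if !excludePersistentLag {
		// Default behaviour: both values are identical.
		return rawLag, rawLag
	}
	if previousOffset == consumerOffset {
		// The consumer has not advanced since the previous polling cycle,
		// so the lag is treated as persistent. Report 0 for scaling, but
		// keep the real lag so IsActive can still scale from zero.
		return 0, rawLag
	}
	// Progress was made, so the lag is not persistent.
	return rawLag, rawLag
}

With excludePersistentLag enabled and a stuck partition, the pair is (0, latestOffset - consumerOffset): the stuck partition drops out of the scaling metric but still counts toward activation.
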
@@ -460,7 +465,7 @@ func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offset
case previousOffset == consumerOffset:
// Indicates consumer is still on the same offset as the previous polling cycle, there may be some issue with consuming this offset.
// return 0, so this consumer lag is not considered for scaling
return 0, latestOffset - consumerOffset,nil
return 0, latestOffset - consumerOffset, nil
default:
// Successfully Consumed some messages, proceed to change the previous offset
s.previousOffsets[topic][partitionID] = consumerOffset
@@ -537,7 +542,7 @@ func (s *kafkaScaler) getConsumerAndProducerOffsets(topicPartitions map[string][

// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, error) {
totalLag, _, err := s.getTotalLag()
totalLag, _, err := s.getTotalLag()
if err != nil {
return []external_metrics.ExternalMetricValue{}, err
}
@@ -546,6 +551,9 @@ func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string) ([]exte
return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

// getTotalLag returns totalLag, totalLagWithPersistent, error
// totalLag and totalLagWithPersistent are the sums of the lag and lagWithPersistent values returned by getLagForPartition, respectively.
// totalLag may be less than totalLagWithPersistent when excludePersistentLag is set to `true`, because some partitions may be deemed to have persistent lag.
func (s *kafkaScaler) getTotalLag() (int64, int64, error) {
topicPartitions, err := s.getTopicPartitions()
if err != nil {
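
A rough illustration of the summation the new getTotalLag comment describes, using a hypothetical helper over per-partition (lag, lagWithPersistent) pairs; the inline comments note which caller consumes which total:

// totalLagSketch sums hypothetical per-partition (lag, lagWithPersistent)
// pairs; totalLag <= totalLagWithPersistent always holds.
func totalLagSketch(partitionLags [][2]int64) (totalLag, totalLagWithPersistent int64) {
	for _, p := range partitionLags {
		totalLag += p[0]               // consumed by GetMetrics for the external metric
		totalLagWithPersistent += p[1] // consumed by IsActive for 0 -> 1 activation
	}
	return totalLag, totalLagWithPersistent
}
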
2 changes: 1 addition & 1 deletion tests/helper/helper.go
@@ -551,7 +551,7 @@ func DeletePodsInNamespaceBySelector(t *testing.T, kc *kubernetes.Clientset, sel
func WaitForPodsTerminated(t *testing.T, kc *kubernetes.Clientset, selector, namespace string,
iterations, intervalSeconds int) bool {
for i := 0; i < iterations; i++ {
pods, err := kc.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{LabelSelector: selector,})
pods, err := kc.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{LabelSelector: selector})
if (err != nil && errors.IsNotFound(err)) || len(pods.Items) == 0 {
t.Logf("No pods with label %s", selector)
return true
8 changes: 4 additions & 4 deletions tests/scalers/kafka/kafka_test.go
@@ -186,7 +186,7 @@ spec:
lagThreshold: '1'
scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}'
offsetResetPolicy: 'latest'`

persistentLagScaledObjectTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
@@ -494,16 +494,16 @@ func testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData

data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", persistentLagTopic, persistentLagDeploymentGroup)
KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)

messages := 5
for i := 0; i < messages; i++ {
publishMessage(t, persistentLagTopic)
}

// Persistent Lag should not scale pod above minimum replicas after 2 reconcilation cycles
// Persistent Lag should not scale pod above minimum replicas after 2 reconciliation cycles
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2),
"replica count should be %d after 2 minute", 1)

// Shouldn't scale pods
AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 1, 180)

