Apache Kafka Scaler: Implementation for Excluding Persistent Lag #3905

Closed. Wants to merge 35 commits.

Commits (35):
c0a56b8  add partitionLimitation parameter to limit partitions to operate on (tobiaskrause, Nov 18, 2022)
9935f3c  fixed typo (tobiaskrause, Nov 18, 2022)
9691387  add issue to CHANGELOG (tobiaskrause, Nov 18, 2022)
aff2ec9  Merge branch 'main' into kafka-scaler-partition-limit (tobiaskrause, Nov 18, 2022)
59dcdad  Disable response compression for k8s restAPI in client-go (#3866) (ckuduvalli, Nov 21, 2022)
34b0e61  fix: Remove unnecessary TLS Config (#3857) (michaeljmarshall, Nov 22, 2022)
ab1b663  feat(scaler): support cloudID and apiKey in elasticsearch scaler (#3786) (Nov 22, 2022)
5f85176  Add simple Grafana dashboard (#3902) (darox, Nov 24, 2022)
dad43c6  Do not keep alive http connection (#3878) (penghuazhou, Nov 28, 2022)
5d4bedb  bump k8s deps to v0.25.4 & bump other deps (#3914) (zroubalik, Nov 28, 2022)
483d25f  chore: add GCP_RUN_IDENTITY_TESTS in e2e pr (#3915) (JorTurFer, Nov 28, 2022)
351cb37  Metrics Server: use vendored OpenAPI definitions (#3928) (olivierlemasle, Nov 29, 2022)
546e6db  feat: Support using connection strings for Event Hub namespace instea… (v-shenoy, Nov 29, 2022)
18f3e7f  Metrics Server: use gRPC connection to get metrics from Operator (#3861) (zroubalik, Nov 29, 2022)
bc5b782  chore: add `stale-bot-ignore` label to stale bot ignores (#3936) (JorTurFer, Nov 30, 2022)
2dd408f  grpc client: wait properly for establishing a connection (#3938) (zroubalik, Nov 30, 2022)
16fbbe5  feat: add chaos e2e test (#3935) (JorTurFer, Dec 1, 2022)
5901daa  chore: bump deps (#3944) (JorTurFer, Dec 1, 2022)
13c273b  NewRelic scaler crashes on logging (#3946) (lkishalmi, Dec 2, 2022)
9c87cff  Added panel regarding ammount of HPA (#3934) (yuvalweber, Dec 4, 2022)
aa2666c  Metrics Server: use correct k8s config in Manager (#3955) (zroubalik, Dec 5, 2022)
25901b4  Kafka: Increase logging V-level (#3953) (dkv, Dec 5, 2022)
c925806  docs(changelog): Provide off-the-shelf dashboard (#3951) (tomkerkhove, Dec 5, 2022)
249682d  Add support for JetStream scaler to query the stream consumer leader … (rayjanoka, Dec 5, 2022)
40a2be6  fix(azure event hub): wrong calc of the remaining items (#3912) (yodobrin, Dec 5, 2022)
a0e74a0  Fix stackdriver client returning 0 for metric types of double (#3788) (octothorped, Dec 5, 2022)
7b1e307  Split CodeQL into a specific static analysers workflow (#3957) (JorTurFer, Dec 5, 2022)
1528a3c  feat: add semgrep as CI tool (#3958) (JorTurFer, Dec 5, 2022)
e922460  feat: add e2e tests for GCP with workload identity (#3916) (JorTurFer, Dec 6, 2022)
370a523  fix: remove vendor folder from semgrep scan and fix workflow (#3959) (JorTurFer, Dec 6, 2022)
0e52a6a  fix: add missing env var for gcp e2e and missing Semgrep stuff (#3960) (JorTurFer, Dec 6, 2022)
7627da5  Eventhub Scaler: add new dapr checkpoint strategy to eventhub scaler … (christle, Dec 6, 2022)
f82217c  Merge branch 'kedacore:main' into main (josephangbc, Dec 6, 2022)
9d40a47  Add Implementation for ExcludePersistentLag (josephangbc, Dec 6, 2022)
1256929  Merge branch 'main' of github.com:JosephABC/keda (josephangbc, Dec 6, 2022)
CHANGELOG.md: 2 additions & 0 deletions
@@ -61,6 +61,8 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **General:** Support disable keep http connection alive ([#3874](https://github.com/kedacore/keda/issues/3874))
- **General:** Improve the function used to normalize metric names ([#3789](https://github.com/kedacore/keda/issues/3789))
- **Apache Kafka Scaler:** SASL/OAuthbearer Implementation ([#3681](https://github.com/kedacore/keda/issues/3681))
- **Apache Kafka Scaler:** Limit Kafka Partitions KEDA operates on ([#3830](https://github.com/kedacore/keda/issues/3830))
- **Apache Kafka Scaler:** Implementation for Excluding Persistent Lag ([#3904](https://github.com/kedacore/keda/issues/3904))
- **Azure AD Pod Identity Authentication:** Improve error messages to emphasize problems around the integration with aad-pod-identity itself ([#3610](https://github.com/kedacore/keda/issues/3610))
- **Azure Event Hub Scaler:** Support Azure Active Directory Pod & Workload Identity for Storage Blobs ([#3569](https://github.com/kedacore/keda/issues/3569))
- **Azure Event Hub Scaler:** Support using connection strings for Event Hub namespace instead of the Event Hub itself. ([#3922](https://github.com/kedacore/keda/issues/3922))
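To make the two new CHANGELOG entries above concrete, here is a minimal sketch of the trigger metadata the Kafka scaler would receive with both options enabled. Only the `partitionLimitation` and `excludePersistentLag` keys come from this PR's diff below; the remaining keys and every value are illustrative assumptions, not taken from this pull request.

```go
package main

import "fmt"

func main() {
	// Hypothetical trigger metadata for the Kafka scaler. The two keys added by
	// this PR are partitionLimitation and excludePersistentLag; everything else
	// is the usual Kafka trigger configuration with made-up values.
	triggerMetadata := map[string]string{
		"bootstrapServers":     "kafka:9092",
		"consumerGroup":        "my-group",
		"topic":                "my-topic",
		"lagThreshold":         "10",
		"partitionLimitation":  "1,2,3", // only these partition IDs are considered for scaling
		"excludePersistentLag": "true",  // ignore lag on partitions whose consumer offset never advances
	}
	for key, value := range triggerMetadata {
		fmt.Printf("%s = %s\n", key, value)
	}
}
```

With `excludePersistentLag` enabled, lag on a partition whose committed offset has not advanced between two polling cycles is excluded from the scaling metric; the getLagForPartition changes in the diff below implement that behaviour.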
pkg/scalers/kafka_scaler.go: 107 additions & 27 deletions
@@ -17,21 +17,24 @@ import (
)

type kafkaScaler struct {
metricType v2.MetricTargetType
metadata kafkaMetadata
client sarama.Client
admin sarama.ClusterAdmin
logger logr.Logger
metricType v2.MetricTargetType
metadata kafkaMetadata
client sarama.Client
admin sarama.ClusterAdmin
logger logr.Logger
previousOffsets map[string]map[int32]int64
}

type kafkaMetadata struct {
bootstrapServers []string
group string
topic string
partitionLimitation []int32
lagThreshold int64
activationLagThreshold int64
offsetResetPolicy offsetResetPolicy
allowIdleConsumers bool
excludePersistentLag bool
version sarama.KafkaVersion

// If an invalid offset is found, whether to scale to 1 (false - the default) so consumption can
@@ -104,12 +107,15 @@ func NewKafkaScaler(config *ScalerConfig) (Scaler, error) {
return nil, err
}

previousOffsets := make(map[string]map[int32]int64)

return &kafkaScaler{
client: client,
admin: admin,
metricType: metricType,
metadata: kafkaMetadata,
logger: logger,
client: client,
admin: admin,
metricType: metricType,
metadata: kafkaMetadata,
logger: logger,
previousOffsets: previousOffsets,
}, nil
}

@@ -205,6 +211,21 @@ func parseKafkaMetadata(config *ScalerConfig, logger logr.Logger) (kafkaMetadata
"will use all topics subscribed by the consumer group for scaling", meta.group))
}

meta.partitionLimitation = nil
if config.TriggerMetadata["partitionLimitation"] != "" {
if meta.topic == "" {
logger.V(1).Info("no specific topic set, ignoring partitionLimitation setting")
} else {
pattern := config.TriggerMetadata["partitionLimitation"]
parsed, err := kedautil.ParseInt32List(pattern)
if err != nil {
return meta, fmt.Errorf("error parsing in partitionLimitation '%s': %s", pattern, err)
}
meta.partitionLimitation = parsed
logger.V(0).Info(fmt.Sprintf("partition limit active '%s'", pattern))
}
}

meta.offsetResetPolicy = defaultOffsetResetPolicy

if config.TriggerMetadata["offsetResetPolicy"] != "" {
@@ -254,6 +275,15 @@ func parseKafkaMetadata(config *ScalerConfig, logger logr.Logger) (kafkaMetadata
meta.allowIdleConsumers = t
}

meta.excludePersistentLag = false
if val, ok := config.TriggerMetadata["excludePersistentLag"]; ok {
t, err := strconv.ParseBool(val)
if err != nil {
return meta, fmt.Errorf("error parsing excludePersistentLag: %s", err)
}
meta.excludePersistentLag = t
}

meta.scaleToZeroOnInvalidOffset = false
if val, ok := config.TriggerMetadata["scaleToZeroOnInvalidOffset"]; ok {
t, err := strconv.ParseBool(val)
@@ -277,13 +307,14 @@ func parseKafkaMetadata(config *ScalerConfig, logger logr.Logger) (kafkaMetadata
}

// IsActive determines if we need to scale from zero
// When replicas is zero, all lag will be deemed as persistent, hence use totalLagWithPersistent to determine scaling.
func (s *kafkaScaler) IsActive(ctx context.Context) (bool, error) {
totalLag, err := s.getTotalLag()
_, totalLagWithPersistent, err := s.getTotalLag()
if err != nil {
return false, err
}

return totalLag > s.metadata.activationLagThreshold, nil
return totalLagWithPersistent > s.metadata.activationLagThreshold, nil
}

func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin, error) {
@@ -378,15 +409,33 @@ func (s *kafkaScaler) getTopicPartitions() (map[string][]int32, error) {
s.logger.Error(errMsg, "")
}
partitionMetadata := topicMetadata.Partitions
partitions := make([]int32, len(partitionMetadata))
for i, p := range partitionMetadata {
partitions[i] = p.ID
var partitions []int32
for _, p := range partitionMetadata {
if s.isActivePartition(p.ID) {
partitions = append(partitions, p.ID)
}
}
if len(partitions) == 0 {
return nil, fmt.Errorf("expected at least one active partition within the topic '%s'", topicMetadata.Name)
}

topicPartitions[topicMetadata.Name] = partitions
}
return topicPartitions, nil
}

func (s *kafkaScaler) isActivePartition(pID int32) bool {
if s.metadata.partitionLimitation == nil {
return true
}
for _, _pID := range s.metadata.partitionLimitation {
if pID == _pID {
return true
}
}
return false
}

func (s *kafkaScaler) getConsumerOffsets(topicPartitions map[string][]int32) (*sarama.OffsetFetchResponse, error) {
offsets, err := s.admin.ListConsumerGroupOffsets(s.metadata.group, topicPartitions)
if err != nil {
@@ -399,12 +448,16 @@ func (s *kafkaScaler) getConsumerOffsets(topicPartitions map[string][]int32) (*s
return offsets, nil
}

func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offsets *sarama.OffsetFetchResponse, topicPartitionOffsets map[string]map[int32]int64) (int64, error) {
// getLagForPartition returns (lag, lagWithPersistent, error)
// When excludePersistentLag is set to `false` (default), lag will always be equal to lagWithPersistent
// When excludePersistentLag is set to `true`, if partition is deemed to have persistent lag, lag will be set to 0 and lagWithPersistent will be latestOffset - consumerOffset
// These return values will allow proper scaling from 0 -> 1 replicas by the IsActive func.
func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offsets *sarama.OffsetFetchResponse, topicPartitionOffsets map[string]map[int32]int64) (int64, int64, error) {
block := offsets.GetBlock(topic, partitionID)
if block == nil {
errMsg := fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partitionID)
s.logger.Error(errMsg, "")
return 0, errMsg
return 0, 0, errMsg
}
if block.Err > 0 {
errMsg := fmt.Errorf("error finding offset block for topic %s and partition %d: %s", topic, partitionID, offsets.Err.Error())
@@ -421,17 +474,39 @@ func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offset
"invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet. Returning with lag of %d",
topic, s.metadata.group, partitionID, retVal)
s.logger.V(1).Info(msg)
return retVal, nil
return retVal, retVal, nil
}

if _, found := topicPartitionOffsets[topic]; !found {
return 0, fmt.Errorf("error finding partition offset for topic %s", topic)
return 0, 0, fmt.Errorf("error finding partition offset for topic %s", topic)
}
latestOffset := topicPartitionOffsets[topic][partitionID]
if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest {
return latestOffset, nil
return latestOffset, latestOffset, nil
}
return latestOffset - consumerOffset, nil

// This code block tries to prevent KEDA Kafka trigger from scaling the scale target based on erroneous events
if s.metadata.excludePersistentLag {
switch previousOffset, found := s.previousOffsets[topic][partitionID]; {
case !found:
// No record of previous offset, so store current consumer offset
// Allow this consumer lag to be considered in scaling
if _, topicFound := s.previousOffsets[topic]; !topicFound {
s.previousOffsets[topic] = map[int32]int64{partitionID: consumerOffset}
} else {
s.previousOffsets[topic][partitionID] = consumerOffset
}
case previousOffset == consumerOffset:
// Indicates consumer is still on the same offset as the previous polling cycle, there may be some issue with consuming this offset.
// return 0, so this consumer lag is not considered for scaling
return 0, latestOffset - consumerOffset, nil
default:
// Successfully Consumed some messages, proceed to change the previous offset
s.previousOffsets[topic][partitionID] = consumerOffset
}
}

return latestOffset - consumerOffset, latestOffset - consumerOffset, nil
}

// Close closes the kafka admin and client
@@ -501,7 +576,7 @@ func (s *kafkaScaler) getConsumerAndProducerOffsets(topicPartitions map[string][

// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, error) {
totalLag, err := s.getTotalLag()
totalLag, _, err := s.getTotalLag()
if err != nil {
return []external_metrics.ExternalMetricValue{}, err
}
@@ -510,24 +585,29 @@ func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string) ([]exte
return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

func (s *kafkaScaler) getTotalLag() (int64, error) {
// getTotalLag returns totalLag, totalLagWithPersistent, error
// totalLag and totalLagWithPersistent are the summations of lag and lagWithPersistent returned by the getLagForPartition function, respectively.
// totalLag may be less than totalLagWithPersistent when excludePersistentLag is set to `true`, because some partitions may be deemed as having persistent lag
func (s *kafkaScaler) getTotalLag() (int64, int64, error) {
topicPartitions, err := s.getTopicPartitions()
if err != nil {
return 0, err
return 0, 0, err
}

consumerOffsets, producerOffsets, err := s.getConsumerAndProducerOffsets(topicPartitions)
if err != nil {
return 0, err
return 0, 0, err
}

totalLag := int64(0)
totalLagWithPersistent := int64(0)
totalTopicPartitions := int64(0)

for topic, partitionsOffsets := range producerOffsets {
for partition := range partitionsOffsets {
lag, _ := s.getLagForPartition(topic, partition, consumerOffsets, producerOffsets)
lag, lagWithPersistent, _ := s.getLagForPartition(topic, partition, consumerOffsets, producerOffsets)
totalLag += lag
totalLagWithPersistent += lagWithPersistent
}
totalTopicPartitions += (int64)(len(partitionsOffsets))
}
@@ -539,7 +619,7 @@ func (s *kafkaScaler) getTotalLag() (int64, error) {
totalLag = totalTopicPartitions * s.metadata.lagThreshold
}
}
return totalLag, nil
return totalLag, totalLagWithPersistent, nil
}

type brokerOffsetResult struct {
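As a simplified, standalone sketch of the persistent-lag bookkeeping described by the getLagForPartition and getTotalLag comments above: the tracker below remembers the consumer offset seen for each topic and partition on the previous polling cycle and zeroes out the lag of any partition whose offset has not moved. It is not code from this PR; lagTracker and lagFor are invented names, and the Sarama client plumbing is omitted so the offset comparison can run on its own.

```go
package main

import "fmt"

// lagTracker mimics the bookkeeping added in this PR: it remembers the consumer
// offset seen for each topic/partition on the previous polling cycle and treats
// lag on a partition whose offset has not moved as "persistent".
type lagTracker struct {
	previousOffsets map[string]map[int32]int64
}

func newLagTracker() *lagTracker {
	return &lagTracker{previousOffsets: make(map[string]map[int32]int64)}
}

// lagFor returns (lag, lagWithPersistent), mirroring getLagForPartition:
// lagWithPersistent is always latestOffset-consumerOffset, while lag drops to 0
// once the consumer offset stops advancing between polls.
func (t *lagTracker) lagFor(topic string, partition int32, consumerOffset, latestOffset int64) (int64, int64) {
	total := latestOffset - consumerOffset
	previous, found := t.previousOffsets[topic][partition]
	switch {
	case !found:
		// First observation: record the offset and count the lag normally.
		if _, ok := t.previousOffsets[topic]; !ok {
			t.previousOffsets[topic] = map[int32]int64{}
		}
		t.previousOffsets[topic][partition] = consumerOffset
	case previous == consumerOffset:
		// Offset unchanged since the last poll: deem the lag persistent and
		// exclude it from the value used for scaling.
		return 0, total
	default:
		// The consumer made progress: remember the new offset.
		t.previousOffsets[topic][partition] = consumerOffset
	}
	return total, total
}

func main() {
	t := newLagTracker()
	fmt.Println(t.lagFor("orders", 0, 100, 150)) // 50 50: first sight, lag counts
	fmt.Println(t.lagFor("orders", 0, 100, 180)) // 0 80: consumer stuck, lag excluded
	fmt.Println(t.lagFor("orders", 0, 160, 200)) // 40 40: consumer moved, lag counts again
}
```

The second return value mirrors lagWithPersistent: in the diff, IsActive compares that total against activationLagThreshold, so a deployment at zero replicas, where no offsets advance and all lag looks persistent, can still scale from 0 to 1, while GetMetrics reports only the first total.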
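The partitionLimitation parsing in parseKafkaMetadata delegates to kedautil.ParseInt32List, whose implementation is not part of this diff. Below is a minimal stand-in that assumes the helper accepts comma-separated partition IDs and, possibly, simple from-to ranges; treat both the name parseInt32List and the accepted syntax as assumptions for illustration.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseInt32List is a stand-in for the kedautil.ParseInt32List helper used by
// the partitionLimitation parsing above (the real helper is not shown in this
// diff). It accepts comma-separated partition IDs and, as an assumption about
// the upstream helper, simple "from-to" ranges.
func parseInt32List(pattern string) ([]int32, error) {
	var out []int32
	for _, part := range strings.Split(pattern, ",") {
		part = strings.TrimSpace(part)
		if part == "" {
			continue
		}
		if from, to, ok := strings.Cut(part, "-"); ok {
			start, err := strconv.ParseInt(strings.TrimSpace(from), 10, 32)
			if err != nil {
				return nil, fmt.Errorf("invalid range start %q: %w", from, err)
			}
			end, err := strconv.ParseInt(strings.TrimSpace(to), 10, 32)
			if err != nil {
				return nil, fmt.Errorf("invalid range end %q: %w", to, err)
			}
			for i := start; i <= end; i++ {
				out = append(out, int32(i))
			}
			continue
		}
		id, err := strconv.ParseInt(part, 10, 32)
		if err != nil {
			return nil, fmt.Errorf("invalid partition id %q: %w", part, err)
		}
		out = append(out, int32(id))
	}
	return out, nil
}

func main() {
	ids, err := parseInt32List("1,2,10-12")
	fmt.Println(ids, err) // [1 2 10 11 12] <nil>
}
```

Whatever syntax the real helper accepts, the resulting []int32 is only consulted by isActivePartition to filter the partitions returned by getTopicPartitions; a nil list means every partition is considered.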