Skip to content

Commit

Permalink
Expose Kafka commit interval
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierCazade committed Jun 23, 2022
1 parent e1d3744 commit 87fe2c4
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/api/ingest_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ type IngestKafka struct {
BatchReadTimeout int64 `yaml:"batchReadTimeout,omitempty" json:"batchReadTimeout,omitempty" doc:"how often (in milliseconds) to process input"`
Decoder Decoder `yaml:"decoder,omitempty" json:"decoder" doc:"decoder to use (E.g. json or protobuf)"`
BatchMaxLen int `yaml:"batchMaxLen,omitempty" json:"batchMaxLen,omitempty" doc:"the number of accumulated flows before being forwarded for processing"`
CommitInterval int64 `yaml:"commitInterval,omitempty" json:"commitInterval,omitempty" doc:"the interval (in milliseconds) at which offsets are committed to the broker. If 0, commits will be handled synchronously."`
}
7 changes: 7 additions & 0 deletions pkg/pipeline/ingest/ingest_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type ingestKafka struct {
const channelSizeKafka = 1000
const defaultBatchReadTimeout = int64(1000)
const defaultKafkaBatchMaxLength = 500
const defaultKafkaCommitInterval = 500

// Ingest ingests entries from kafka topic
func (ingestK *ingestKafka) Ingest(out chan<- []config.GenericMap) {
Expand Down Expand Up @@ -162,12 +163,18 @@ func NewIngestKafka(params config.StageParam) (Ingester, error) {
}
log.Infof("BatchReadTimeout = %d", jsonIngestKafka.BatchReadTimeout)

commitInterval := int64(defaultKafkaCommitInterval)
if jsonIngestKafka.CommitInterval != 0 {
commitInterval = jsonIngestKafka.CommitInterval
}

kafkaReader := kafkago.NewReader(kafkago.ReaderConfig{
Brokers: jsonIngestKafka.Brokers,
Topic: jsonIngestKafka.Topic,
GroupID: jsonIngestKafka.GroupId,
GroupBalancers: groupBalancers,
StartOffset: startOffset,
CommitInterval: time.Duration(commitInterval) * time.Millisecond,
})
if kafkaReader == nil {
errMsg := "NewIngestKafka: failed to create kafka-go reader"
Expand Down

0 comments on commit 87fe2c4

Please sign in to comment.