
Commit 39b57ec

feat(kafka): Replay kafka from last commit before allowing ingesters to become ready (#14330)
1 parent: 6de6420

File tree: 7 files changed (+386 -18 lines)

docs/sources/shared/configuration.md (+14)
@@ -839,6 +839,20 @@ kafka_config:
   # CLI flag: -kafka.producer-max-buffered-bytes
   [producer_max_buffered_bytes: <int> | default = 1073741824]
 
+  # The best-effort maximum lag a consumer tries to achieve at startup. Set both
+  # -kafka.target-consumer-lag-at-startup and -kafka.max-consumer-lag-at-startup
+  # to 0 to disable waiting for maximum consumer lag being honored at startup.
+  # CLI flag: -kafka.target-consumer-lag-at-startup
+  [target_consumer_lag_at_startup: <duration> | default = 2s]
+
+  # The guaranteed maximum lag before a consumer is considered to have caught up
+  # reading from a partition at startup, becomes ACTIVE in the hash ring and
+  # passes the readiness check. Set both -kafka.target-consumer-lag-at-startup
+  # and -kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum
+  # consumer lag being honored at startup.
+  # CLI flag: -kafka.max-consumer-lag-at-startup
+  [max_consumer_lag_at_startup: <duration> | default = 15s]
+
 # Configuration for 'runtime config' module, responsible for reloading runtime
 # configuration file.
 [runtime_config: <runtime_config>]
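
Read together, the two new settings form a pair: the target is the best-effort lag the startup replay loop aims for, while the max is the guaranteed bound that must be met before the ingester becomes ACTIVE in the partition ring and passes its readiness check. A minimal sketch of that relationship follows; mayBecomeReady is a hypothetical helper used only for illustration, not the actual partition reader code.

package main

import (
	"fmt"
	"time"
)

// mayBecomeReady illustrates how the two thresholds are read: readiness only
// requires the guaranteed maximum, while the target steers the replay loop.
// Hypothetical helper, not Loki's implementation.
func mayBecomeReady(lag, targetLag, maxLag time.Duration) bool {
	if targetLag == 0 && maxLag == 0 {
		return true // both set to 0: the startup catch-up gate is disabled
	}
	return lag <= maxLag
}

func main() {
	fmt.Println(mayBecomeReady(30*time.Second, 2*time.Second, 15*time.Second)) // false: keep replaying
	fmt.Println(mayBecomeReady(10*time.Second, 2*time.Second, 15*time.Second)) // true: within the guaranteed max
}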

pkg/ingester/ingester.go (+1 -1)
@@ -300,7 +300,7 @@ type Ingester struct {
 }
 
 // New makes a new Ingester.
-func New(cfg Config, clientConfig client.Config, store Store, limits Limits, configs *runtime.TenantConfigs, registerer prometheus.Registerer, writeFailuresCfg writefailures.Cfg, metricsNamespace string, logger log.Logger, customStreamsTracker push.UsageTracker, readRing ring.ReadRing, partitionRingWatcher *ring.PartitionRingWatcher) (*Ingester, error) {
+func New(cfg Config, clientConfig client.Config, store Store, limits Limits, configs *runtime.TenantConfigs, registerer prometheus.Registerer, writeFailuresCfg writefailures.Cfg, metricsNamespace string, logger log.Logger, customStreamsTracker push.UsageTracker, readRing ring.ReadRing, partitionRingWatcher ring.PartitionRingReader) (*Ingester, error) {
 	if cfg.ingesterClientFactory == nil {
 		cfg.ingesterClientFactory = client.New
 	}
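
New now takes dskit's ring.PartitionRingReader interface rather than the concrete *ring.PartitionRingWatcher, so any value that can return a partition ring snapshot can be passed in; the existing watcher still satisfies the interface, and tests can supply a lightweight stub. A sketch of such a stub, assuming the interface exposes a single PartitionRing() method (staticPartitionRingReader is hypothetical, not a helper from this commit):

package ingester_test

import (
	"github.com/grafana/dskit/ring"
)

// staticPartitionRingReader is a hypothetical test double that always returns
// the same partition ring; it assumes ring.PartitionRingReader requires only
// a PartitionRing() method.
type staticPartitionRingReader struct {
	partitionRing *ring.PartitionRing
}

func (r staticPartitionRingReader) PartitionRing() *ring.PartitionRing {
	return r.partitionRing
}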

pkg/ingester/kafka_consumer.go (+1 -1)
@@ -3,7 +3,7 @@ package ingester
 import (
 	"context"
 	"errors"
-	math "math"
+	"math"
 	"sync"
 	"time"
 

pkg/kafka/config.go (+15 -6)
@@ -36,18 +36,14 @@ const (
 	// in the worst case scenario, which is expected to be way above the actual one.
 	maxProducerRecordDataBytesLimit = producerBatchMaxBytes - 16384
 	minProducerRecordDataBytesLimit = 1024 * 1024
-
-	kafkaConfigFlagPrefix          = "ingest-storage.kafka"
-	targetConsumerLagAtStartupFlag = kafkaConfigFlagPrefix + ".target-consumer-lag-at-startup"
-	maxConsumerLagAtStartupFlag    = kafkaConfigFlagPrefix + ".max-consumer-lag-at-startup"
 )
 
 var (
 	ErrMissingKafkaAddress               = errors.New("the Kafka address has not been configured")
 	ErrMissingKafkaTopic                 = errors.New("the Kafka topic has not been configured")
+	ErrInconsistentConsumerLagAtStartup  = errors.New("the target and max consumer lag at startup must be either both set to 0 or to a value greater than 0")
+	ErrInvalidMaxConsumerLagAtStartup    = errors.New("the configured max consumer lag at startup must greater or equal than the configured target consumer lag")
 	ErrInvalidProducerMaxRecordSizeBytes = fmt.Errorf("the configured producer max record size bytes must be a value between %d and %d", minProducerRecordDataBytesLimit, maxProducerRecordDataBytesLimit)
-
-	consumeFromPositionOptions = []string{consumeFromLastOffset, consumeFromStart, consumeFromEnd, consumeFromTimestamp}
 )
 
 // Config holds the generic config for the Kafka backend.

@@ -68,6 +64,9 @@ type Config struct {
 
 	ProducerMaxRecordSizeBytes int   `yaml:"producer_max_record_size_bytes"`
 	ProducerMaxBufferedBytes   int64 `yaml:"producer_max_buffered_bytes"`
+
+	TargetConsumerLagAtStartup time.Duration `yaml:"target_consumer_lag_at_startup"`
+	MaxConsumerLagAtStartup    time.Duration `yaml:"max_consumer_lag_at_startup"`
 }
 
 func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

@@ -91,6 +90,10 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
 
 	f.IntVar(&cfg.ProducerMaxRecordSizeBytes, prefix+".producer-max-record-size-bytes", maxProducerRecordDataBytesLimit, "The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes.")
 	f.Int64Var(&cfg.ProducerMaxBufferedBytes, prefix+".producer-max-buffered-bytes", 1024*1024*1024, "The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit.")
+
+	consumerLagUsage := fmt.Sprintf("Set both -%s and -%s to 0 to disable waiting for maximum consumer lag being honored at startup.", prefix+".target-consumer-lag-at-startup", prefix+".max-consumer-lag-at-startup")
+	f.DurationVar(&cfg.TargetConsumerLagAtStartup, prefix+".target-consumer-lag-at-startup", 2*time.Second, "The best-effort maximum lag a consumer tries to achieve at startup. "+consumerLagUsage)
+	f.DurationVar(&cfg.MaxConsumerLagAtStartup, prefix+".max-consumer-lag-at-startup", 15*time.Second, "The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. "+consumerLagUsage)
 }
 
 func (cfg *Config) Validate() error {

@@ -103,6 +106,12 @@ func (cfg *Config) Validate() error {
 	if cfg.ProducerMaxRecordSizeBytes < minProducerRecordDataBytesLimit || cfg.ProducerMaxRecordSizeBytes > maxProducerRecordDataBytesLimit {
 		return ErrInvalidProducerMaxRecordSizeBytes
 	}
+	if (cfg.TargetConsumerLagAtStartup == 0 && cfg.MaxConsumerLagAtStartup != 0) || (cfg.TargetConsumerLagAtStartup != 0 && cfg.MaxConsumerLagAtStartup == 0) {
+		return ErrInconsistentConsumerLagAtStartup
+	}
+	if cfg.MaxConsumerLagAtStartup < cfg.TargetConsumerLagAtStartup {
+		return ErrInvalidMaxConsumerLagAtStartup
+	}
 
 	return nil
 }
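
A quick way to see the new Validate rules is to fill a Config from its flag defaults and vary only the two lag settings. This is an illustrative sketch: the github.com/grafana/loki/v3/pkg/kafka import path, the Address and Topic fields, and their placeholder values are assumed here rather than shown in this diff.

package main

import (
	"flag"
	"fmt"
	"time"

	"github.com/grafana/loki/v3/pkg/kafka" // assumed import path
)

func main() {
	var cfg kafka.Config
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	cfg.RegisterFlags(fs) // picks up the defaults, including 2s/15s for the lag settings
	_ = fs.Parse(nil)
	cfg.Address, cfg.Topic = "localhost:9092", "loki" // placeholder values, fields assumed

	// Defaults: both non-zero and max >= target, so the new checks pass.
	fmt.Println(cfg.Validate())

	// Only one of the pair set to 0 -> ErrInconsistentConsumerLagAtStartup.
	cfg.TargetConsumerLagAtStartup, cfg.MaxConsumerLagAtStartup = 0, 15*time.Second
	fmt.Println(cfg.Validate())

	// Max below target -> ErrInvalidMaxConsumerLagAtStartup.
	cfg.TargetConsumerLagAtStartup, cfg.MaxConsumerLagAtStartup = 20*time.Second, 15*time.Second
	fmt.Println(cfg.Validate())
}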
