From bf84fc61568dcec2f653dd7c51ad32954c3bd606 Mon Sep 17 00:00:00 2001 From: Saketh <43525626+sappusaketh@users.noreply.github.com> Date: Sat, 9 Mar 2024 13:06:11 -0700 Subject: [PATCH] [kafka-producer] Support setting max message size (#5263) ## Which problem is this PR solving? - Exposes MaxMessageBytes config mentioned in https://github.com/jaegertracing/jaeger/issues/1335 ## Description of the changes - Adds support to set MaxMessageBytes config on producer - Also setting a default which matches with [sarama](https://github.com/IBM/sarama/blob/main/config.go#L177) default unclear if we need to set that default explicitly from jaeger producer initialization ## How was this change tested? - Tested on our development jaeger cluster. After seeing below error we made following changes and built a new Image and verified that my changes fixed the issue ``` {"level":"error","ts":1550003610.8029132,"caller":"kafka/writer.go:59","msg":"kafka server: Message was too large, server rejected it to avoid allocation error.","stacktrace":"[github.com/jaegertracing/jaeger/plugin/storage/kafka.NewSpanWriter.func2](http://github.com/jaegertracing/jaeger/plugin/storage/kafka.NewSpanWriter.func2)\n\t/home/travis/gopath/src/github.com/jaegertracing/jaeger/plugin/storage/kafka/writer.go:59"} ``` ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `yarn lint` and `yarn test` --------- Signed-off-by: Saketh kappala <43525626+sappusaketh@users.noreply.github.com> Signed-off-by: Saketh <43525626+sappusaketh@users.noreply.github.com> Co-authored-by: Yuri Shkuro --- pkg/kafka/producer/config.go | 2 ++ plugin/storage/kafka/options.go | 8 ++++++++ plugin/storage/kafka/options_test.go | 4 ++++ 3 files changed, 14 insertions(+) diff --git a/pkg/kafka/producer/config.go b/pkg/kafka/producer/config.go index f12b09dc2f8..6e9cfa53cd1 100644 --- a/pkg/kafka/producer/config.go +++ b/pkg/kafka/producer/config.go @@ -39,6 +39,7 @@ type Configuration struct { BatchSize int `mapstructure:"batch_size"` BatchMinMessages int `mapstructure:"batch_min_messages"` BatchMaxMessages int `mapstructure:"batch_max_messages"` + MaxMessageBytes int `mapstructure:"max_message_bytes"` auth.AuthenticationConfig `mapstructure:"authentication"` } @@ -53,6 +54,7 @@ func (c *Configuration) NewProducer(logger *zap.Logger) (sarama.AsyncProducer, e saramaConfig.Producer.Flush.Frequency = c.BatchLinger saramaConfig.Producer.Flush.Messages = c.BatchMinMessages saramaConfig.Producer.Flush.MaxMessages = c.BatchMaxMessages + saramaConfig.Producer.MaxMessageBytes = c.MaxMessageBytes if len(c.ProtocolVersion) > 0 { ver, err := sarama.ParseKafkaVersion(c.ProtocolVersion) if err != nil { diff --git a/plugin/storage/kafka/options.go b/plugin/storage/kafka/options.go index 02a45c638b2..635e7526d63 100644 --- a/plugin/storage/kafka/options.go +++ b/plugin/storage/kafka/options.go @@ -47,6 +47,7 @@ const ( suffixBatchSize = ".batch-size" suffixBatchMinMessages = ".batch-min-messages" suffixBatchMaxMessages = ".batch-max-messages" + suffixMaxMessageBytes = ".max-message-bytes" defaultBroker = "127.0.0.1:9092" defaultTopic = "jaeger-spans" @@ -58,6 +59,7 @@ const ( defaultBatchSize = 0 defaultBatchMinMessages = 0 defaultBatchMaxMessages = 0 + defaultMaxMessageBytes = 1000000 // https://github.com/IBM/sarama/blob/main/config.go#L177 ) var ( @@ -152,6 +154,11 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) { defaultBatchMaxMessages, "(experimental) Maximum number of message to batch before sending records to Kafka", ) + flagSet.Int( + configPrefix+suffixMaxMessageBytes, + defaultMaxMessageBytes, + "(experimental) The maximum permitted size of a message. Should be set equal to or smaller than the broker's `message.max.bytes`.", + ) flagSet.String( configPrefix+suffixBrokers, defaultBroker, @@ -207,6 +214,7 @@ func (opt *Options) InitFromViper(v *viper.Viper) { BatchSize: v.GetInt(configPrefix + suffixBatchSize), BatchMinMessages: v.GetInt(configPrefix + suffixBatchMinMessages), BatchMaxMessages: v.GetInt(configPrefix + suffixBatchMaxMessages), + MaxMessageBytes: v.GetInt(configPrefix + suffixMaxMessageBytes), } opt.Topic = v.GetString(configPrefix + suffixTopic) opt.Encoding = v.GetString(configPrefix + suffixEncoding) diff --git a/plugin/storage/kafka/options_test.go b/plugin/storage/kafka/options_test.go index c34bd70a82e..790cbab35ee 100644 --- a/plugin/storage/kafka/options_test.go +++ b/plugin/storage/kafka/options_test.go @@ -42,6 +42,7 @@ func TestOptionsWithFlags(t *testing.T) { "--kafka.producer.batch-size=128000", "--kafka.producer.batch-min-messages=50", "--kafka.producer.batch-max-messages=100", + "--kafka.producer.max-message-bytes=10485760", }) opts.InitFromViper(v) @@ -55,6 +56,8 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, time.Duration(1*time.Second), opts.Config.BatchLinger) assert.Equal(t, 50, opts.Config.BatchMinMessages) assert.Equal(t, 100, opts.Config.BatchMaxMessages) + assert.Equal(t, 100, opts.Config.BatchMaxMessages) + assert.Equal(t, 10485760, opts.Config.MaxMessageBytes) } func TestFlagDefaults(t *testing.T) { @@ -73,6 +76,7 @@ func TestFlagDefaults(t *testing.T) { assert.Equal(t, time.Duration(0*time.Second), opts.Config.BatchLinger) assert.Equal(t, 0, opts.Config.BatchMinMessages) assert.Equal(t, 0, opts.Config.BatchMaxMessages) + assert.Equal(t, defaultMaxMessageBytes, opts.Config.MaxMessageBytes) } func TestCompressionLevelDefaults(t *testing.T) {