From 4144da0eaea5c5f6a8e9662becb1c3e7159dad11 Mon Sep 17 00:00:00 2001 From: Piotr Limanowski Date: Fri, 11 Oct 2024 12:46:55 +0200 Subject: [PATCH] Set `max.request.size` for Kafka Previously, we did not configure this parameter for Kafka producer. However, setting `buffer.memory` (size of Kafka producer buffer before flushing), `linger.ms` (windowing period for batching requests) is not enough. Now, we are adding `max.request.size` that will add another threshold for when messages are flushed based upon request size. --- .../sinks/KafkaSink.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala index 0917bbc4b..662b12f6c 100644 --- a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala +++ b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala @@ -77,6 +77,7 @@ object KafkaSink { "retries" -> kafkaConfig.retries.toString, "buffer.memory" -> bufferConfig.byteLimit.toString, "linger.ms" -> bufferConfig.timeLimit.toString, + "max.request.size" -> kafkaConfig.maxBytes.toString, "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer", "value.serializer" -> "org.apache.kafka.common.serialization.ByteArraySerializer", "sasl.login.callback.handler.class" -> authCallbackClass