From 38193983ac334158fdc641a0971a8f5ccadec50c Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Mon, 16 Sep 2024 17:08:55 +0200 Subject: [PATCH 1/6] Use topic partition queue splitting feature from rdkafka. This allows to split the main consumer queue into subqueues, such that we can spawn a subtask for each topic-partition tuple. This roughly gives us 8-10x improvement in throughput (on my machine). --- crates/ingress-kafka/src/consumer_task.rs | 96 ++++++++++++++++++----- 1 file changed, 78 insertions(+), 18 deletions(-) diff --git a/crates/ingress-kafka/src/consumer_task.rs b/crates/ingress-kafka/src/consumer_task.rs index 616922e4d7..d33be95318 100644 --- a/crates/ingress-kafka/src/consumer_task.rs +++ b/crates/ingress-kafka/src/consumer_task.rs @@ -8,19 +8,15 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::fmt; - use base64::Engine; use bytes::Bytes; use opentelemetry::trace::TraceContextExt; +use rdkafka::consumer::stream_consumer::StreamPartitionQueue; use rdkafka::consumer::{Consumer, DefaultConsumerContext, StreamConsumer}; use rdkafka::error::KafkaError; use rdkafka::message::BorrowedMessage; use rdkafka::{ClientConfig, Message}; -use tokio::sync::oneshot; -use tracing::{debug, info, info_span, Instrument}; -use tracing_opentelemetry::OpenTelemetrySpanExt; - +use restate_core::{cancellation_watcher, task_center, TaskId, TaskKind}; use restate_ingress_dispatcher::{ DeduplicationId, DispatchIngressRequest, IngressDispatcher, IngressDispatcherRequest, }; @@ -28,6 +24,13 @@ use restate_types::identifiers::SubscriptionId; use restate_types::invocation::{Header, SpanRelation}; use restate_types::message::MessageIndex; use restate_types::schema::subscriptions::{EventReceiverServiceType, Sink, Subscription}; +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::fmt; +use std::sync::Arc; +use tokio::sync::oneshot; +use tracing::{debug, info, info_span, Instrument}; +use tracing_opentelemetry::OpenTelemetrySpanExt; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -45,6 +48,8 @@ pub enum Error { }, #[error("ingress dispatcher channel is closed")] IngressDispatcherClosed, + #[error("topic {0} partition {1} queue split didn't succeed")] + TopicPartitionSplit(String, i32), } type MessageConsumer = StreamConsumer; @@ -92,11 +97,7 @@ impl MessageSender { } } - async fn send( - &mut self, - consumer_group_id: &str, - msg: &BorrowedMessage<'_>, - ) -> Result<(), Error> { + async fn send(&self, consumer_group_id: &str, msg: BorrowedMessage<'_>) -> Result<(), Error> { // Prepare ingress span let ingress_span = info_span!( "kafka_ingress_consume", @@ -119,14 +120,14 @@ impl MessageSender { } else { Bytes::default() }; - let headers = Self::generate_events_attributes(msg, self.subscription.id()); + let headers = Self::generate_events_attributes(&msg, self.subscription.id()); let req = IngressDispatcherRequest::event( &self.subscription, key, payload, SpanRelation::Parent(ingress_span_context), - Some(Self::generate_deduplication_id(consumer_group_id, msg)), + Some(Self::generate_deduplication_id(consumer_group_id, &msg)), headers, ) .map_err(|cause| Error::Event { @@ -201,7 +202,7 @@ impl ConsumerTask { } } - pub async fn run(mut self, mut rx: oneshot::Receiver<()>) -> Result<(), Error> { + pub async fn run(self, mut rx: oneshot::Receiver<()>) -> Result<(), Error> { // Create the consumer and subscribe to the topic let consumer_group_id = self .client_config @@ -213,25 +214,84 @@ impl 
ConsumerTask { self.topics, self.client_config ); - let consumer: MessageConsumer = self.client_config.create()?; + let consumer: Arc = Arc::new(self.client_config.create()?); let topics: Vec<&str> = self.topics.iter().map(|x| &**x).collect(); consumer.subscribe(&topics)?; + let mut topic_partition_tasks: HashMap<(String, i32), TaskId> = Default::default(); + let tc = task_center(); + loop { tokio::select! { res = consumer.recv() => { let msg = res?; - self.sender.send(&consumer_group_id, &msg).await?; + let topic = msg.topic().to_owned(); + let partition = msg.partition(); + let offset = msg.offset(); + + // If we didn't split the queue, let's do it and start the topic partition consumer + if let Entry::Vacant(e) = topic_partition_tasks.entry((topic.clone(), partition)) { + let topic_partition_consumer = consumer + .split_partition_queue(&topic, partition) + .ok_or_else(|| Error::TopicPartitionSplit(topic.clone(), partition))?; + + let task = topic_partition_queue_consumption_loop( + self.sender.clone(), + topic.clone(), partition, + topic_partition_consumer, + Arc::clone(&consumer), + consumer_group_id.clone() + ); + + if let Ok(task_id) = tc.spawn_child(TaskKind::Ingress, "partition-queue", None, task) { + e.insert(task_id); + } else { + break; + } + } + + // We got this message, let's send it through + self.sender.send(&consumer_group_id, msg).await?; + // This method tells rdkafka that we have processed this message, // so its offset can be safely committed. // rdkafka periodically commits these offsets asynchronously, with a period configurable // with auto.commit.interval.ms - consumer.store_offset_from_message(&msg)?; + consumer.store_offset(&topic, partition, offset)?; } _ = &mut rx => { - return Ok(()); + break; } } } + for task_id in topic_partition_tasks.into_values() { + tc.cancel_task(task_id); + } + Ok(()) + } +} + +async fn topic_partition_queue_consumption_loop( + sender: MessageSender, + topic: String, + partition: i32, + topic_partition_consumer: StreamPartitionQueue, + consumer: Arc, + consumer_group_id: String, +) -> Result<(), anyhow::Error> { + let mut shutdown = std::pin::pin!(cancellation_watcher()); + + loop { + tokio::select! { + res = topic_partition_consumer.recv() => { + let msg = res?; + let offset = msg.offset(); + sender.send(&consumer_group_id, msg).await?; + consumer.store_offset(&topic, partition, offset)?; + } + _ = &mut shutdown => { + return Ok(()) + } + } } } From e2664cb4365f85f78de5b93699f6d139c952a4bc Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Mon, 16 Sep 2024 17:33:41 +0200 Subject: [PATCH 2/6] Bump to rust-rdkafka 0.35, the latest release that seems to be ok. See https://github.com/fede1024/rust-rdkafka/issues/638#issuecomment-2045433321 for more details. No visible performance differences. 
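
Note for reviewers: the partition queue splitting introduced in PATCH 1/6 relies on rdkafka's
StreamConsumer::split_partition_queue, which requires the consumer to live inside an Arc and the
main queue to keep being polled. Below is a minimal, self-contained sketch of that pattern against
rdkafka 0.35 (tokio runtime assumed; broker address, group id and topic name are placeholders, and
the Restate-specific dispatcher/TaskCenter wiring is left out):

    use std::collections::HashSet;
    use std::sync::Arc;

    use rdkafka::consumer::{Consumer, StreamConsumer};
    use rdkafka::error::KafkaError;
    use rdkafka::{ClientConfig, Message};

    #[tokio::main]
    async fn main() -> Result<(), KafkaError> {
        // Broker, group id and topic are placeholders for illustration only.
        let consumer: Arc<StreamConsumer> = Arc::new(
            ClientConfig::new()
                .set("bootstrap.servers", "localhost:9092")
                .set("group.id", "example-group")
                .set("enable.auto.commit", "true")
                .set("enable.auto.offset.store", "false")
                .create()?,
        );
        consumer.subscribe(&["example-topic"])?;

        let mut split: HashSet<(String, i32)> = HashSet::new();

        loop {
            // The main queue must keep being polled even after splitting, so that
            // rebalances and other consumer events are still served.
            let msg = consumer.recv().await?;
            let (topic, partition) = (msg.topic().to_owned(), msg.partition());

            // First message seen for this partition: split its queue off the main
            // one and hand it to a dedicated task.
            if split.insert((topic.clone(), partition)) {
                if let Some(partition_queue) = consumer.split_partition_queue(&topic, partition) {
                    let consumer = Arc::clone(&consumer);
                    tokio::spawn(async move {
                        while let Ok(msg) = partition_queue.recv().await {
                            // ... handle the message, then mark its offset so the
                            // periodic async commit can pick it up.
                            let _ = consumer.store_offset(msg.topic(), msg.partition(), msg.offset());
                        }
                    });
                }
            }

            // The message that arrived on the main queue still needs to be handled here.
            consumer.store_offset(&topic, partition, msg.offset())?;
        }
    }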
--- Cargo.lock | 4 ++-- crates/ingress-kafka/Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fe0e66ec0d..2de8394b47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5217,9 +5217,9 @@ dependencies = [ [[package]] name = "rdkafka" -version = "0.34.0" +version = "0.35.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "053adfa02fab06e86c01d586cc68aa47ee0ff4489a59469081dc12cbcde578bf" +checksum = "f16c17f411935214a5870e40aff9291f8b40a73e97bf8de29e5959c473d5ef33" dependencies = [ "futures-channel", "futures-util", diff --git a/crates/ingress-kafka/Cargo.toml b/crates/ingress-kafka/Cargo.toml index a2337a92a6..8dd3e8b007 100644 --- a/crates/ingress-kafka/Cargo.toml +++ b/crates/ingress-kafka/Cargo.toml @@ -23,7 +23,7 @@ base64 = { workspace = true } bytes = { workspace = true } derive_builder = { workspace = true } opentelemetry = { workspace = true } -rdkafka = { version = "0.34", features = ["libz-static", "cmake-build"] } +rdkafka = { version = "0.35", features = ["libz-static", "cmake-build"] } schemars = { workspace = true, optional = true } serde = { workspace = true } thiserror = { workspace = true } From bbdb7cd411c62bac97f6509150b3f0433baa7c45 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Mon, 16 Sep 2024 18:12:08 +0200 Subject: [PATCH 3/6] Add restate.kafka_ingress.requests.total metric. This is useful to graph the consumption rate. --- Cargo.lock | 1 + crates/ingress-kafka/Cargo.toml | 1 + crates/ingress-kafka/src/consumer_task.rs | 20 +++++++++++------- crates/ingress-kafka/src/lib.rs | 1 + .../ingress-kafka/src/metric_definitions.rs | 21 +++++++++++++++++++ .../src/subscription_controller.rs | 1 + 6 files changed, 38 insertions(+), 7 deletions(-) create mode 100644 crates/ingress-kafka/src/metric_definitions.rs diff --git a/Cargo.lock b/Cargo.lock index 2de8394b47..178cbb9dc7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5775,6 +5775,7 @@ dependencies = [ "base64 0.22.0", "bytes", "derive_builder", + "metrics", "opentelemetry", "rdkafka", "restate-core", diff --git a/crates/ingress-kafka/Cargo.toml b/crates/ingress-kafka/Cargo.toml index 8dd3e8b007..4b94d9db3c 100644 --- a/crates/ingress-kafka/Cargo.toml +++ b/crates/ingress-kafka/Cargo.toml @@ -22,6 +22,7 @@ anyhow = { workspace = true } base64 = { workspace = true } bytes = { workspace = true } derive_builder = { workspace = true } +metrics = { workspace = true } opentelemetry = { workspace = true } rdkafka = { version = "0.35", features = ["libz-static", "cmake-build"] } schemars = { workspace = true, optional = true } diff --git a/crates/ingress-kafka/src/consumer_task.rs b/crates/ingress-kafka/src/consumer_task.rs index d33be95318..ba51a77167 100644 --- a/crates/ingress-kafka/src/consumer_task.rs +++ b/crates/ingress-kafka/src/consumer_task.rs @@ -8,8 +8,10 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
+use crate::metric_definitions::KAFKA_INGRESS_REQUESTS; use base64::Engine; use bytes::Bytes; +use metrics::counter; use opentelemetry::trace::TraceContextExt; use rdkafka::consumer::stream_consumer::StreamPartitionQueue; use rdkafka::consumer::{Consumer, DefaultConsumerContext, StreamConsumer}; @@ -20,7 +22,6 @@ use restate_core::{cancellation_watcher, task_center, TaskId, TaskKind}; use restate_ingress_dispatcher::{ DeduplicationId, DispatchIngressRequest, IngressDispatcher, IngressDispatcherRequest, }; -use restate_types::identifiers::SubscriptionId; use restate_types::invocation::{Header, SpanRelation}; use restate_types::message::MessageIndex; use restate_types::schema::subscriptions::{EventReceiverServiceType, Sink, Subscription}; @@ -85,6 +86,7 @@ impl DeduplicationId for KafkaDeduplicationId { #[derive(Clone)] pub struct MessageSender { + subscription_id: String, subscription: Subscription, dispatcher: IngressDispatcher, } @@ -92,6 +94,7 @@ pub struct MessageSender { impl MessageSender { pub fn new(subscription: Subscription, dispatcher: IngressDispatcher) -> Self { Self { + subscription_id: subscription.id().to_string(), subscription, dispatcher, } @@ -120,7 +123,7 @@ impl MessageSender { } else { Bytes::default() }; - let headers = Self::generate_events_attributes(&msg, self.subscription.id()); + let headers = Self::generate_events_attributes(&msg, &self.subscription_id); let req = IngressDispatcherRequest::event( &self.subscription, @@ -137,6 +140,12 @@ impl MessageSender { cause, })?; + counter!( + KAFKA_INGRESS_REQUESTS, + "subscription" => self.subscription_id.clone() + ) + .increment(1); + self.dispatcher .dispatch_ingress_request(req) .instrument(ingress_span) @@ -145,10 +154,7 @@ impl MessageSender { Ok(()) } - fn generate_events_attributes( - msg: &impl Message, - subscription_id: SubscriptionId, - ) -> Vec
<Header> {
+    fn generate_events_attributes(msg: &impl Message, subscription_id: &str) -> Vec<Header> {
{ let mut headers = Vec::with_capacity(6); headers.push(Header::new("kafka.offset", msg.offset().to_string())); headers.push(Header::new("kafka.topic", msg.topic())); @@ -158,7 +164,7 @@ impl MessageSender { } headers.push(Header::new( "restate.subscription.id".to_string(), - subscription_id.to_string(), + subscription_id, )); if let Some(key) = msg.key() { diff --git a/crates/ingress-kafka/src/lib.rs b/crates/ingress-kafka/src/lib.rs index 77203984af..a517e72157 100644 --- a/crates/ingress-kafka/src/lib.rs +++ b/crates/ingress-kafka/src/lib.rs @@ -9,6 +9,7 @@ // by the Apache License, Version 2.0. mod consumer_task; +mod metric_definitions; mod subscription_controller; use tokio::sync::mpsc; diff --git a/crates/ingress-kafka/src/metric_definitions.rs b/crates/ingress-kafka/src/metric_definitions.rs new file mode 100644 index 0000000000..713793859f --- /dev/null +++ b/crates/ingress-kafka/src/metric_definitions.rs @@ -0,0 +1,21 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use metrics::{describe_counter, Unit}; + +pub const KAFKA_INGRESS_REQUESTS: &str = "restate.kafka_ingress.requests.total"; + +pub(crate) fn describe_metrics() { + describe_counter!( + KAFKA_INGRESS_REQUESTS, + Unit::Count, + "Number of Kafka ingress requests" + ); +} diff --git a/crates/ingress-kafka/src/subscription_controller.rs b/crates/ingress-kafka/src/subscription_controller.rs index a4394e5f6f..259cca0068 100644 --- a/crates/ingress-kafka/src/subscription_controller.rs +++ b/crates/ingress-kafka/src/subscription_controller.rs @@ -48,6 +48,7 @@ pub struct Service { impl Service { pub fn new(dispatcher: IngressDispatcher) -> Service { + metric_definitions::describe_metrics(); let (commands_tx, commands_rx) = mpsc::channel(10); Service { From 8762e9da1d946315ac07346953586ba09088d66a Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 17 Sep 2024 13:47:07 +0200 Subject: [PATCH 4/6] Feedback --- crates/ingress-kafka/src/consumer_task.rs | 33 +++++++++++++---------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/crates/ingress-kafka/src/consumer_task.rs b/crates/ingress-kafka/src/consumer_task.rs index ba51a77167..f12ff48a75 100644 --- a/crates/ingress-kafka/src/consumer_task.rs +++ b/crates/ingress-kafka/src/consumer_task.rs @@ -8,7 +8,11 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
-use crate::metric_definitions::KAFKA_INGRESS_REQUESTS; +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::fmt; +use std::sync::Arc; + use base64::Engine; use bytes::Bytes; use metrics::counter; @@ -18,6 +22,10 @@ use rdkafka::consumer::{Consumer, DefaultConsumerContext, StreamConsumer}; use rdkafka::error::KafkaError; use rdkafka::message::BorrowedMessage; use rdkafka::{ClientConfig, Message}; +use tokio::sync::oneshot; +use tracing::{debug, info, info_span, Instrument}; +use tracing_opentelemetry::OpenTelemetrySpanExt; + use restate_core::{cancellation_watcher, task_center, TaskId, TaskKind}; use restate_ingress_dispatcher::{ DeduplicationId, DispatchIngressRequest, IngressDispatcher, IngressDispatcherRequest, @@ -25,13 +33,8 @@ use restate_ingress_dispatcher::{ use restate_types::invocation::{Header, SpanRelation}; use restate_types::message::MessageIndex; use restate_types::schema::subscriptions::{EventReceiverServiceType, Sink, Subscription}; -use std::collections::hash_map::Entry; -use std::collections::HashMap; -use std::fmt; -use std::sync::Arc; -use tokio::sync::oneshot; -use tracing::{debug, info, info_span, Instrument}; -use tracing_opentelemetry::OpenTelemetrySpanExt; + +use crate::metric_definitions::KAFKA_INGRESS_REQUESTS; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -86,15 +89,21 @@ impl DeduplicationId for KafkaDeduplicationId { #[derive(Clone)] pub struct MessageSender { - subscription_id: String, subscription: Subscription, dispatcher: IngressDispatcher, + + subscription_id: String, + ingress_request_counter: metrics::Counter, } impl MessageSender { pub fn new(subscription: Subscription, dispatcher: IngressDispatcher) -> Self { Self { subscription_id: subscription.id().to_string(), + ingress_request_counter: counter!( + KAFKA_INGRESS_REQUESTS, + "subscription" => subscription.id().to_string() + ), subscription, dispatcher, } @@ -140,11 +149,7 @@ impl MessageSender { cause, })?; - counter!( - KAFKA_INGRESS_REQUESTS, - "subscription" => self.subscription_id.clone() - ) - .increment(1); + self.ingress_request_counter.increment(1); self.dispatcher .dispatch_ingress_request(req) From 805a43b9da10899bbe063ccc7d956fcf0ae02626 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 17 Sep 2024 16:24:52 +0200 Subject: [PATCH 5/6] Plumb task center from constructor --- crates/ingress-kafka/src/consumer_task.rs | 16 +++++++++++----- .../ingress-kafka/src/subscription_controller.rs | 3 ++- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/crates/ingress-kafka/src/consumer_task.rs b/crates/ingress-kafka/src/consumer_task.rs index f12ff48a75..6a2762d1dc 100644 --- a/crates/ingress-kafka/src/consumer_task.rs +++ b/crates/ingress-kafka/src/consumer_task.rs @@ -26,7 +26,7 @@ use tokio::sync::oneshot; use tracing::{debug, info, info_span, Instrument}; use tracing_opentelemetry::OpenTelemetrySpanExt; -use restate_core::{cancellation_watcher, task_center, TaskId, TaskKind}; +use restate_core::{cancellation_watcher, TaskCenter, TaskId, TaskKind}; use restate_ingress_dispatcher::{ DeduplicationId, DispatchIngressRequest, IngressDispatcher, IngressDispatcherRequest, }; @@ -199,14 +199,21 @@ impl MessageSender { #[derive(Clone)] pub struct ConsumerTask { + task_center: TaskCenter, client_config: ClientConfig, topics: Vec, sender: MessageSender, } impl ConsumerTask { - pub fn new(client_config: ClientConfig, topics: Vec, sender: MessageSender) -> Self { + pub fn new( + task_center: TaskCenter, + client_config: ClientConfig, + topics: Vec, + 
sender: MessageSender, + ) -> Self { Self { + task_center, client_config, topics, sender, @@ -230,7 +237,6 @@ impl ConsumerTask { consumer.subscribe(&topics)?; let mut topic_partition_tasks: HashMap<(String, i32), TaskId> = Default::default(); - let tc = task_center(); loop { tokio::select! { @@ -254,7 +260,7 @@ impl ConsumerTask { consumer_group_id.clone() ); - if let Ok(task_id) = tc.spawn_child(TaskKind::Ingress, "partition-queue", None, task) { + if let Ok(task_id) = self.task_center.spawn_child(TaskKind::Ingress, "partition-queue", None, task) { e.insert(task_id); } else { break; @@ -276,7 +282,7 @@ impl ConsumerTask { } } for task_id in topic_partition_tasks.into_values() { - tc.cancel_task(task_id); + self.task_center.cancel_task(task_id); } Ok(()) } diff --git a/crates/ingress-kafka/src/subscription_controller.rs b/crates/ingress-kafka/src/subscription_controller.rs index 259cca0068..77f04a8283 100644 --- a/crates/ingress-kafka/src/subscription_controller.rs +++ b/crates/ingress-kafka/src/subscription_controller.rs @@ -14,7 +14,7 @@ use std::collections::HashSet; use crate::subscription_controller::task_orchestrator::TaskOrchestrator; use rdkafka::error::KafkaError; -use restate_core::cancellation_watcher; +use restate_core::{cancellation_watcher, task_center}; use restate_ingress_dispatcher::IngressDispatcher; use restate_types::config::IngressOptions; use restate_types::identifiers::SubscriptionId; @@ -133,6 +133,7 @@ impl Service { // Create the consumer task let consumer_task = consumer_task::ConsumerTask::new( + task_center(), client_config, vec![topic.to_string()], MessageSender::new(subscription, self.dispatcher.clone()), From 778dd088b852a3aae74063113e1d68b704c0eaf5 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 17 Sep 2024 16:49:55 +0200 Subject: [PATCH 6/6] Make sure we cancel child tasks when we're shutting down --- crates/ingress-kafka/src/consumer_task.rs | 31 +++++++++++++++-------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/crates/ingress-kafka/src/consumer_task.rs b/crates/ingress-kafka/src/consumer_task.rs index 6a2762d1dc..0a55c9fe4f 100644 --- a/crates/ingress-kafka/src/consumer_task.rs +++ b/crates/ingress-kafka/src/consumer_task.rs @@ -238,19 +238,24 @@ impl ConsumerTask { let mut topic_partition_tasks: HashMap<(String, i32), TaskId> = Default::default(); - loop { + let result = loop { tokio::select! 
{ res = consumer.recv() => { - let msg = res?; + let msg = match res { + Ok(msg) => msg, + Err(e) => break Err(e.into()) + }; let topic = msg.topic().to_owned(); let partition = msg.partition(); let offset = msg.offset(); // If we didn't split the queue, let's do it and start the topic partition consumer if let Entry::Vacant(e) = topic_partition_tasks.entry((topic.clone(), partition)) { - let topic_partition_consumer = consumer - .split_partition_queue(&topic, partition) - .ok_or_else(|| Error::TopicPartitionSplit(topic.clone(), partition))?; + let topic_partition_consumer = match consumer + .split_partition_queue(&topic, partition) { + Some(q) => q, + None => break Err(Error::TopicPartitionSplit(topic.clone(), partition)) + }; let task = topic_partition_queue_consumption_loop( self.sender.clone(), @@ -263,28 +268,32 @@ impl ConsumerTask { if let Ok(task_id) = self.task_center.spawn_child(TaskKind::Ingress, "partition-queue", None, task) { e.insert(task_id); } else { - break; + break Ok(()); } } // We got this message, let's send it through - self.sender.send(&consumer_group_id, msg).await?; + if let Err(e) = self.sender.send(&consumer_group_id, msg).await { + break Err(e) + } // This method tells rdkafka that we have processed this message, // so its offset can be safely committed. // rdkafka periodically commits these offsets asynchronously, with a period configurable // with auto.commit.interval.ms - consumer.store_offset(&topic, partition, offset)?; + if let Err(e) = consumer.store_offset(&topic, partition, offset) { + break Err(e.into()) + } } _ = &mut rx => { - break; + break Ok(()); } } - } + }; for task_id in topic_partition_tasks.into_values() { self.task_center.cancel_task(task_id); } - Ok(()) + result } }
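
A note on the metrics added in PATCH 3/6 and refined in PATCH 4/6: the point of the follow-up is
that counter! is resolved once per subscription and the returned Counter handle is cached, so the
per-message hot path only does an atomic increment. A minimal standalone sketch of that pattern,
assuming the metrics crate (0.22+, where counter! returns a handle); the subscription id is a
made-up placeholder, and an exporter such as metrics-exporter-prometheus would have to be
installed for the values to show up anywhere:

    use metrics::{counter, describe_counter, Counter, Unit};

    const KAFKA_INGRESS_REQUESTS: &str = "restate.kafka_ingress.requests.total";

    // Called once at startup so exporters can attach a unit and description.
    fn describe_metrics() {
        describe_counter!(
            KAFKA_INGRESS_REQUESTS,
            Unit::Count,
            "Number of Kafka ingress requests"
        );
    }

    struct MessageSender {
        // Resolving the (name, label set) pair once and keeping the handle avoids
        // re-registering the counter on every consumed message.
        ingress_request_counter: Counter,
    }

    impl MessageSender {
        fn new(subscription_id: String) -> Self {
            Self {
                ingress_request_counter: counter!(
                    KAFKA_INGRESS_REQUESTS,
                    "subscription" => subscription_id
                ),
            }
        }

        fn on_message(&self) {
            // Hot path: just bump the pre-resolved handle.
            self.ingress_request_counter.increment(1);
        }
    }

    fn main() {
        describe_metrics();
        let sender = MessageSender::new("subscription-id-placeholder".to_owned());
        sender.on_message();
    }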