diff --git a/src/config.rs b/src/config.rs
index 1f5e34247..b7cde8ed5 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -32,6 +32,7 @@ use rdkafka_sys as rdsys;
 use rdkafka_sys::types::*;
 
 use crate::client::ClientContext;
+use crate::config_map::Config;
 use crate::error::{IsError, KafkaError, KafkaResult};
 use crate::log::{log_enabled, DEBUG, INFO, WARN};
 use crate::util::{ErrBuf, KafkaDrop, NativePtr};
@@ -226,13 +227,136 @@ impl ClientConfig {
     ///
-    /// If there is an existing value for `key` in the configuration, it is
-    /// overridden with the new `value`.
-    pub fn set<K, V>(&mut self, key: K, value: V) -> &mut ClientConfig
-    where
-        K: Into<String>,
-        V: Into<String>,
-    {
-        self.conf_map.insert(key.into(), value.into());
-        self
+    /// If there is an existing value for the corresponding key in the
+    /// configuration, it is overridden with the new value.
+    pub fn set(&mut self, map: Config) -> &mut ClientConfig {
+        match map {
+            Config::Producer(producer) => match producer {
+                crate::config_map::Producer::Acks(ack) => {
+                    let acks = match ack {
+                        crate::config_map::Acks::All => "all",
+                        crate::config_map::Acks::Balanced => "1",
+                        crate::config_map::Acks::FastRisky => "0",
+                    };
+                    self.conf_map.insert("acks".into(), acks.into());
+                    self
+                }
+                crate::config_map::Producer::BatchSize(size) => {
+                    // `BatchSize::Default` keeps librdkafka's own default.
+                    if let crate::config_map::BatchSize::Custom(bytes) = size {
+                        self.conf_map.insert("batch.size".into(), bytes.to_string());
+                    }
+                    self
+                }
+                crate::config_map::Producer::Bootstrap(servers) => {
+                    self.conf_map
+                        .insert("bootstrap.servers".into(), servers.into());
+                    self
+                }
+                crate::config_map::Producer::BufferMaxMemory(kbytes) => {
+                    // librdkafka expresses this buffer limit in kilobytes.
+                    self.conf_map
+                        .insert("queue.buffering.max.kbytes".into(), kbytes.to_string());
+                    self
+                }
+                crate::config_map::Producer::Compression(compression) => {
+                    let codec = match compression {
+                        crate::config_map::Compression::Gzip => "gzip",
+                        crate::config_map::Compression::Lz4 => "lz4",
+                        crate::config_map::Compression::None => "none",
+                        crate::config_map::Compression::Snappy => "snappy",
+                        crate::config_map::Compression::Zstd => "zstd",
+                    };
+                    self.conf_map.insert("compression.type".into(), codec.into());
+                    self
+                }
+                crate::config_map::Producer::Delay(ms) => {
+                    self.conf_map.insert("linger.ms".into(), ms.to_string());
+                    self
+                }
+                crate::config_map::Producer::Idempotence(enable) => {
+                    self.conf_map
+                        .insert("enable.idempotence".into(), enable.to_string());
+                    self
+                }
+                crate::config_map::Producer::MaxMessageSize(size) => {
+                    self.conf_map
+                        .insert("message.max.bytes".into(), size.to_string());
+                    self
+                }
+                crate::config_map::Producer::MultipleBootstrap(servers) => {
+                    self.conf_map
+                        .insert("bootstrap.servers".into(), servers.join(","));
+                    self
+                }
+                crate::config_map::Producer::Retries(retries) => {
+                    self.conf_map.insert("retries".into(), retries.to_string());
+                    self
+                }
+                crate::config_map::Producer::RetryBackoff(ms) => {
+                    self.conf_map
+                        .insert("retry.backoff.ms".into(), ms.to_string());
+                    self
+                }
+                crate::config_map::Producer::TransactionId(id) => {
+                    self.conf_map.insert("transactional.id".into(), id.into());
+                    self
+                }
+            },
+            Config::Consumer(consumer) => match consumer {
+                crate::config_map::Consumer::AutoCommit(enable) => {
+                    self.conf_map
+                        .insert("enable.auto.commit".into(), enable.to_string());
+                    self
+                }
+                crate::config_map::Consumer::AutoOffsetReset(position) => {
+                    let position = match position {
+                        crate::config_map::AutoOffsetReset::Earliest => "earliest",
+                        crate::config_map::AutoOffsetReset::Latest => "latest",
+                        crate::config_map::AutoOffsetReset::None => "none",
+                    };
+                    self.conf_map.insert("auto.offset.reset".into(), position.into());
+                    self
+                }
+                crate::config_map::Consumer::Bootstrap(servers) => {
+                    self.conf_map
+                        .insert("bootstrap.servers".into(), servers.into());
+                    self
+                }
+                crate::config_map::Consumer::CommitInterval(ms) => {
+                    self.conf_map
+                        .insert("auto.commit.interval.ms".into(), ms.to_string());
+                    self
+                }
+                crate::config_map::Consumer::GroupId(id) => {
+                    self.conf_map.insert("group.id".into(), id.into());
+                    self
+                }
+                crate::config_map::Consumer::Heartbeat(beat) => {
+                    self.conf_map
+                        .insert("heartbeat.interval.ms".into(), beat.to_string());
+                    self
+                }
+                crate::config_map::Consumer::MaxMessagePerPoll(count) => {
+                    self.conf_map
+                        .insert("max.poll.records".into(), count.to_string());
+                    self
+                }
+                crate::config_map::Consumer::MaxTimeWait(ms) => {
+                    self.conf_map
+                        .insert("fetch.wait.max.ms".into(), ms.to_string());
+                    self
+                }
+                crate::config_map::Consumer::MinFetch(bytes) => {
+                    self.conf_map
+                        .insert("fetch.min.bytes".into(), bytes.to_string());
+                    self
+                }
+                crate::config_map::Consumer::SessionTimeout(ms) => {
+                    self.conf_map
+                        .insert("session.timeout.ms".into(), ms.to_string());
+                    self
+                }
+            },
+        }
     }
 
     /// Removes a parameter from the configuration.
diff --git a/src/config_map.rs b/src/config_map.rs
new file mode 100644
index 000000000..1a37eab65
--- /dev/null
+++ b/src/config_map.rs
@@ -0,0 +1,81 @@
+pub enum Config {
+    Producer(Producer),
+    Consumer(Consumer),
+}
+
+/// Config for a producer.
+pub enum Producer {
+    /// Kafka brokers to connect to (comma-separated)
+    Bootstrap(String),
+    /// Multiple Kafka brokers to connect to
+    MultipleBootstrap(Vec<String>),
+    /// Acknowledgment level (0, 1, all)
+    Acks(Acks),
+    /// Message compression (none, gzip, snappy, lz4, zstd)
+    Compression(Compression),
+    /// Max batch size in bytes before sending
+    BatchSize(BatchSize),
+    /// Delay in milliseconds before sending a batch (reduce for low latency)
+    Delay(u64),
+    /// Max memory for buffering messages, in kilobytes
+    BufferMaxMemory(u64),
+    /// Max message size allowed, in bytes
+    MaxMessageSize(u64),
+    /// Number of retries for failed sends
+    Retries(u64),
+    /// Time between retry attempts, in milliseconds
+    RetryBackoff(u64),
+    /// Ensure exactly-once semantics (true, false)
+    Idempotence(bool),
+    /// Required for transactional messaging
+    TransactionId(String),
+}
+
+/// Config for a consumer.
+pub enum Consumer {
+    /// Kafka brokers to connect to (comma-separated)
+    Bootstrap(String),
+    /// Consumer group identifier
+    GroupId(String),
+    /// Start position (earliest, latest, none)
+    AutoOffsetReset(AutoOffsetReset),
+    /// Auto commit offsets (true, false)
+    AutoCommit(bool),
+    /// Time between auto commits, in milliseconds
+    CommitInterval(u64),
+    /// Max messages returned per poll
+    MaxMessagePerPoll(u64),
+    /// Min data to fetch per request, in bytes
+    MinFetch(u64),
+    /// Max time to wait for data, in milliseconds
+    MaxTimeWait(u64),
+    /// Heartbeat interval to the broker, in milliseconds
+    Heartbeat(u64),
+    /// Consumer session timeout, in milliseconds
+    SessionTimeout(u64),
+}
+
+pub enum AutoOffsetReset {
+    Earliest,
+    Latest,
+    None,
+}
+
+pub enum Acks {
+    FastRisky,
+    Balanced,
+    All,
+}
+
+pub enum Compression {
+    None,
+    Gzip,
+    Snappy,
+    Lz4,
+    Zstd,
+}
+
+pub enum BatchSize {
+    Default,
+    Custom(u64),
+}
diff --git a/src/lib.rs b/src/lib.rs
index 46709c5a7..66d9b30b3 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -277,6 +277,7 @@ pub use rdkafka_sys::{bindings, helpers, types};
 pub mod admin;
 pub mod client;
 pub mod config;
+pub mod config_map;
 pub mod consumer;
 pub mod error;
 pub mod groups;
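
A minimal usage sketch of the proposed typed `set` API, assuming the enum shapes in the diff above (`Bootstrap(String)`, `Retries(u64)`, etc.); the broker address `localhost:9092` and the chosen producer settings are illustrative placeholders, not part of this change:

```rust
use rdkafka::config::ClientConfig;
use rdkafka::config_map::{Acks, Compression, Config, Producer};

fn main() {
    // Build a producer configuration from typed variants instead of raw
    // string keys; each `set` call maps one variant onto a librdkafka
    // property ("bootstrap.servers", "acks", "compression.type", ...).
    let mut config = ClientConfig::new();
    config
        .set(Config::Producer(Producer::Bootstrap("localhost:9092".into())))
        .set(Config::Producer(Producer::Acks(Acks::All)))
        .set(Config::Producer(Producer::Compression(Compression::Zstd)))
        .set(Config::Producer(Producer::Retries(3)));
    // The existing `config.create::<T>()` path is unchanged and would be
    // used afterwards to build the actual producer or consumer client.
}
```

The trade-off shown here is that typed variants give compile-time checking of keys and values at the cost of only exposing the properties the enums enumerate, whereas the previous string-based `set(key, value)` accepted any librdkafka property.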