created config enum #758

Open · wants to merge 1 commit into master
src/config.rs (137 additions & 3 deletions)
@@ -32,6 +32,7 @@ use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;

use crate::client::ClientContext;
use crate::config_map::{Acks, AutoOffsetReset, BatchSize, Compression, Config, Consumer, Producer};
use crate::error::{IsError, KafkaError, KafkaResult};
use crate::log::{log_enabled, DEBUG, INFO, WARN};
use crate::util::{ErrBuf, KafkaDrop, NativePtr};
@@ -226,13 +227,146 @@ impl ClientConfig {
///
/// If the configuration already contains a value for the parameter named
/// by `map`, it is overridden with the new value.
    pub fn set(&mut self, map: Config) -> &mut ClientConfig {
        let (key, value) = match map {
            Config::Producer(producer) => match producer {
                Producer::Acks(ack) => (
                    "acks",
                    match ack {
                        Acks::All => "all",
                        Acks::Balanced => "1",
                        Acks::FastRisky => "0",
                    }
                    .to_string(),
                ),
                Producer::BatchSize(size) => (
                    "batch.size",
                    match size {
                        // 16384 bytes is the conventional Kafka client default.
                        BatchSize::Default => "16384".to_string(),
                        BatchSize::Custom(bytes) => bytes.to_string(),
                    },
                ),
                Producer::Bootstrap(servers) => ("bootstrap.servers", servers),
                Producer::BufferMaxMemory(size) => ("buffer.memory", size.to_string()),
                Producer::Compression(compression) => (
                    "compression.type",
                    match compression {
                        Compression::Gzip => "gzip",
                        Compression::Lz4 => "lz4",
                        Compression::None => "none",
                        Compression::Snappy => "snappy",
                        Compression::Zstd => "zstd",
                    }
                    .to_string(),
                ),
                Producer::Delay(ms) => ("linger.ms", ms.to_string()),
                Producer::Idempotence(enable) => ("enable.idempotence", enable.to_string()),
                // librdkafka spells this property `message.max.bytes`.
                Producer::MaxMessageSize(bytes) => ("message.max.bytes", bytes.to_string()),
                Producer::Retries(retries) => ("retries", retries.to_string()),
                Producer::RetryBackoff(ms) => ("retry.backoff.ms", ms.to_string()),
                Producer::TransactionId(id) => ("transactional.id", id),
            },
            Config::Consumer(consumer) => match consumer {
                Consumer::AutoCommit(enable) => ("enable.auto.commit", enable.to_string()),
                Consumer::AutoOffsetReset(position) => (
                    "auto.offset.reset",
                    match position {
                        AutoOffsetReset::Earliest => "earliest",
                        AutoOffsetReset::Latest => "latest",
                        AutoOffsetReset::None => "none",
                    }
                    .to_string(),
                ),
                Consumer::Bootstrap(servers) => ("bootstrap.servers", servers),
                Consumer::CommitInterval(ms) => ("auto.commit.interval.ms", ms.to_string()),
                Consumer::GroupId(id) => ("group.id", id),
                Consumer::Heartbeat(ms) => ("heartbeat.interval.ms", ms.to_string()),
                Consumer::MaxMessagePerPoll(count) => ("max.poll.records", count.to_string()),
                Consumer::MaxTimeWait(ms) => ("fetch.max.wait.ms", ms.to_string()),
                Consumer::MinFetch(bytes) => ("fetch.min.bytes", bytes.to_string()),
                Consumer::SessionTimeout(ms) => ("session.timeout.ms", ms.to_string()),
            },
        };
        self.conf_map.insert(key.into(), value);
        self
    }

/// Removes a parameter from the configuration.
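With this change, `set` takes a typed `Config` value instead of a string key/value pair. A minimal producer-side usage sketch, assuming the enums compile as defined in `config_map.rs` below; the broker address and retry count are placeholder values:

    use rdkafka::config::ClientConfig;
    use rdkafka::config_map::{Acks, Config, Producer};

    let mut config = ClientConfig::new();
    config
        .set(Config::Producer(Producer::Bootstrap("localhost:9092".to_string())))
        .set(Config::Producer(Producer::Acks(Acks::All)))
        .set(Config::Producer(Producer::Retries(3)));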
src/config_map.rs (78 additions & 0 deletions)
@@ -0,0 +1,78 @@
pub enum Config {
Producer(Producer),
Consumer(Consumer),
}

/// Config options for the producer.
pub enum Producer {
/// Kafka brokers to connect to (comma-separated)
Bootstrap(String),
/// Acknowledgment level (0, 1, all)
Acks(Acks),
/// Message compression (none, gzip, snappy, lz4, zstd)
Compression(Compression),
/// Max batch size in bytes before sending
BatchSize(BatchSize),
/// Delay before sending batch (reduce for low latency)
Delay(u64),
/// Max memory for buffering messages
BufferMaxMemory(u64),
/// Max message size allowed
MaxMessageSize(u64),
/// Number of retries for failed sends
Retries(u64),
/// Time between retry attempts
RetryBackoff(u64),
/// Ensure exactly-once semantics (true, false)
Idempotence(bool),
/// Required for transactional messaging
TransactionId(String),
}

/// Config options for the consumer.
pub enum Consumer {
/// Kafka brokers to connect to
Bootstrap(String),
/// Consumer group identifier
GroupId(String),
/// Start position (earliest, latest, none)
AutoOffsetReset(AutoOffsetReset),
/// Auto commit offsets (true, false)
AutoCommit(bool),
/// Time between auto commits
CommitInterval(u64),
/// Max messages returned per poll
MaxMessagePerPoll(u64),
/// Min data to fetch per request
MinFetch(u64),
/// Max time to wait for data
MaxTimeWait(u64),
/// Heartbeat interval to the broker
Heartbeat(u64),
/// Consumer session timeout
SessionTimeout(u64),
}

pub enum AutoOffsetReset {
Earliest,
Latest,
None,
}
pub enum Acks {
FastRisky,
Balanced,
All,
}

pub enum Compression {
None,
Gzip,
Snappy,
Lz4,
Zstd,
}

pub enum BatchSize {
Default,
Custom(u64),
}
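
A matching consumer-side sketch, again assuming the types above; the broker address and group id are placeholders:

    use rdkafka::config::ClientConfig;
    use rdkafka::config_map::{AutoOffsetReset, Config, Consumer};

    let mut config = ClientConfig::new();
    config
        .set(Config::Consumer(Consumer::Bootstrap("localhost:9092".to_string())))
        .set(Config::Consumer(Consumer::GroupId("example-group".to_string())))
        .set(Config::Consumer(Consumer::AutoOffsetReset(AutoOffsetReset::Earliest)))
        .set(Config::Consumer(Consumer::AutoCommit(true)));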
src/lib.rs (1 addition & 0 deletions)
@@ -277,6 +277,7 @@ pub use rdkafka_sys::{bindings, helpers, types};
pub mod admin;
pub mod client;
pub mod config;
pub mod config_map;
pub mod consumer;
pub mod error;
pub mod groups;