From 86656aa99b377359e358f277e432dcffc72eeb6c Mon Sep 17 00:00:00 2001 From: Mo Imran Date: Tue, 14 Jan 2025 19:25:05 +0000 Subject: [PATCH] resolve issue https://github.com/mimran1980/rusteron/issues/23 i.e. add AeronUriStringBuilder --- rusteron-archive/aeron | 2 +- rusteron-archive/src/lib.rs | 22 ++- rusteron-client/aeron | 2 +- rusteron-code-gen/src/aeron_custom.rs | 253 +++++++++++++++----------- rusteron-code-gen/src/common.rs | 218 ---------------------- rusteron-media-driver/aeron | 2 +- 6 files changed, 167 insertions(+), 332 deletions(-) diff --git a/rusteron-archive/aeron b/rusteron-archive/aeron index 2af201d..8a06415 160000 --- a/rusteron-archive/aeron +++ b/rusteron-archive/aeron @@ -1 +1 @@ -Subproject commit 2af201da4a69d66df1ffbdb0570a35d4fb6c7e02 +Subproject commit 8a06415a1522e8fbde7d0ea33522336ad8e82ea3 diff --git a/rusteron-archive/src/lib.rs b/rusteron-archive/src/lib.rs index 98dad56..c8d5d08 100644 --- a/rusteron-archive/src/lib.rs +++ b/rusteron-archive/src/lib.rs @@ -149,13 +149,27 @@ mod tests { "aeron:udp?control-mode=dynamic|control=localhost:8012"; #[test] - #[ignore] // TODO need to finish off fn test_uri_string_builder() -> Result<(), AeronCError> { let builder = AeronUriStringBuilder::default(); builder.init_new()?; - // builder.put("hello", "world")?; - builder.set_initial_position(0, 4, 1024)?; - // panic!("{}", builder.build(1024)?); + builder + .media(Media::Udp)? // very important to set media else set_initial_position will give an error of -1 + .mtu_length(1024 * 64)? + .set_initial_position(127424949617280, 1182294755, 65536)?; + let uri = builder.build(1024)?; + assert_eq!("aeron:udp?term-id=-1168322114|term-length=65536|mtu=65536|init-term-id=1182294755|term-offset=33408", uri); + + builder.init_new()?; + let uri = builder + .media(Media::Udp)? + .control_mode(ControlMode::Dynamic)? + .reliable(false)? + .ttl(2)? + .endpoint("localhost:1235")? + .control("localhost:1234")? + .build(1024)?; + assert_eq!("aeron:udp?ttl=2|control-mode=dynamic|endpoint=localhost:1235|control=localhost:1234|reliable=false", uri); + Ok(()) } diff --git a/rusteron-client/aeron b/rusteron-client/aeron index 2af201d..8a06415 160000 --- a/rusteron-client/aeron +++ b/rusteron-client/aeron @@ -1 +1 @@ -Subproject commit 2af201da4a69d66df1ffbdb0570a35d4fb6c7e02 +Subproject commit 8a06415a1522e8fbde7d0ea33522336ad8e82ea3 diff --git a/rusteron-code-gen/src/aeron_custom.rs b/rusteron-code-gen/src/aeron_custom.rs index c753686..72b0fe3 100644 --- a/rusteron-code-gen/src/aeron_custom.rs +++ b/rusteron-code-gen/src/aeron_custom.rs @@ -195,178 +195,217 @@ impl AeronUriStringBuilder { Ok(result) } - pub fn initial_term_id_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = unsafe { std::str::from_utf8_unchecked(AERON_URI_INITIAL_TERM_ID_KEY) }; - self.put(KEY, value)?; + pub fn media(&self, value: Media) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_STRING_BUILDER_MEDIA_KEY); + self.put(key, value.as_str())?; Ok(self) } - pub fn term_id_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = unsafe { std::str::from_utf8_unchecked(AERON_URI_TERM_ID_KEY) }; - self.put(KEY, value)?; + + pub fn control_mode(&self, value: ControlMode) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_UDP_CHANNEL_CONTROL_MODE_KEY); + self.put(key, value.as_str())?; + Ok(self) + } + + pub fn prefix(&self, value: &str) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_STRING_BUILDER_PREFIX_KEY); + self.put(key, value)?; + Ok(self) + } + + fn strip_null_terminator(bytes: &[u8]) -> &str { + let len = bytes.len() - 1; + unsafe { std::str::from_utf8_unchecked(&bytes[..len]) } + } + + pub fn initial_term_id(&self, value: i32) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_INITIAL_TERM_ID_KEY); + self.put_int32(key, value)?; + Ok(self) + } + pub fn term_id(&self, value: i32) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_TERM_ID_KEY); + self.put_int32(key, value)?; + Ok(self) + } + pub fn term_offset(&self, value: i32) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_TERM_OFFSET_KEY); + self.put_int32(key, value)?; + Ok(self) + } + pub fn alias(&self, value: &str) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_ALIAS_KEY); + self.put(key, value)?; + Ok(self) + } + pub fn term_length(&self, value: &str) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_TERM_LENGTH_KEY); + self.put(key, value)?; + Ok(self) + } + pub fn linger_timeout(&self, value: i64) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_LINGER_TIMEOUT_KEY); + self.put_int64(key, value)?; + Ok(self) + } + pub fn mtu_length(&self, value: i32) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_MTU_LENGTH_KEY); + self.put_int32(key, value)?; Ok(self) } - pub fn term_offset_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = unsafe { std::str::from_utf8_unchecked(AERON_URI_TERM_OFFSET_KEY) }; - self.put(KEY, value)?; + pub fn ttl(&self, value: i32) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_UDP_CHANNEL_TTL_KEY); + self.put_int32(key, value)?; Ok(self) } - pub fn alias_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = unsafe { std::str::from_utf8_unchecked(AERON_URI_ALIAS_KEY) }; - self.put(KEY, value)?; + pub fn sparse_term(&self, value: bool) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_SPARSE_TERM_KEY); + self.put(key, if value { "true" } else { "false" })?; Ok(self) } - pub fn term_length_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = unsafe { std::str::from_utf8_unchecked(AERON_URI_TERM_LENGTH_KEY) }; - self.put(KEY, value)?; + pub fn reliable(&self, value: bool) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_UDP_CHANNEL_RELIABLE_KEY); + self.put(key, if value { "true" } else { "false" })?; Ok(self) } - pub fn linger_timeout_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = unsafe { std::str::from_utf8_unchecked(AERON_URI_LINGER_TIMEOUT_KEY) }; - self.put(KEY, value)?; + pub fn eos(&self, value: bool) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_EOS_KEY); + self.put(key, if value { "true" } else { "false" })?; Ok(self) } - pub fn mtu_length_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = unsafe { std::str::from_utf8_unchecked(AERON_URI_MTU_LENGTH_KEY) }; - self.put(KEY, value)?; + pub fn tether(&self, value: bool) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_TETHER_KEY); + self.put(key, if value { "true" } else { "false" })?; Ok(self) } - pub fn sparse_term_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = unsafe { std::str::from_utf8_unchecked(AERON_URI_SPARSE_TERM_KEY) }; - self.put(KEY, value)?; + pub fn tags(&self, value: &str) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_TAGS_KEY); + self.put(key, value)?; Ok(self) } - pub fn eos_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = unsafe { std::str::from_utf8_unchecked(AERON_URI_EOS_KEY) }; - self.put(KEY, value)?; + pub fn endpoint(&self, value: &str) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_UDP_CHANNEL_ENDPOINT_KEY); + self.put(key, value)?; Ok(self) } - pub fn tether_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = unsafe { std::str::from_utf8_unchecked(AERON_URI_TETHER_KEY) }; - self.put(KEY, value)?; + pub fn interface(&self, value: &str) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_UDP_CHANNEL_INTERFACE_KEY); + self.put(key, value)?; Ok(self) } - pub fn tags_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = unsafe { std::str::from_utf8_unchecked(AERON_URI_TAGS_KEY) }; - self.put(KEY, value)?; + pub fn control(&self, value: &str) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_UDP_CHANNEL_CONTROL_KEY); + self.put(key, value)?; Ok(self) } - pub fn session_id_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = unsafe { std::str::from_utf8_unchecked(AERON_URI_SESSION_ID_KEY) }; - self.put(KEY, value)?; + pub fn session_id(&self, value: &str) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_SESSION_ID_KEY); + self.put(key, value)?; Ok(self) } - pub fn group_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = unsafe { std::str::from_utf8_unchecked(AERON_URI_GROUP_KEY) }; - self.put(KEY, value)?; + pub fn group(&self, value: bool) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_GROUP_KEY); + self.put(key, if value { "true" } else { "false" })?; Ok(self) } - pub fn rejoin_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = unsafe { std::str::from_utf8_unchecked(AERON_URI_REJOIN_KEY) }; - self.put(KEY, value)?; + pub fn rejoin(&self, value: bool) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_REJOIN_KEY); + self.put(key, if value { "true" } else { "false" })?; Ok(self) } - pub fn fc_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = unsafe { std::str::from_utf8_unchecked(AERON_URI_FC_KEY) }; - self.put(KEY, value)?; + pub fn fc(&self, value: &str) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_FC_KEY); + self.put(key, value)?; Ok(self) } - pub fn gtag_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = unsafe { std::str::from_utf8_unchecked(AERON_URI_GTAG_KEY) }; - self.put(KEY, value)?; + pub fn gtag(&self, value: &str) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_GTAG_KEY); + self.put(key, value)?; Ok(self) } - pub fn cc_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = unsafe { std::str::from_utf8_unchecked(AERON_URI_CC_KEY) }; - self.put(KEY, value)?; + pub fn cc(&self, value: &str) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_CC_KEY); + self.put(key, value)?; Ok(self) } - pub fn spies_simulate_connection_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = - unsafe { std::str::from_utf8_unchecked(AERON_URI_SPIES_SIMULATE_CONNECTION_KEY) }; - self.put(KEY, value)?; + pub fn spies_simulate_connection(&self, value: bool) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_SPIES_SIMULATE_CONNECTION_KEY); + self.put(key, if value { "true" } else { "false" })?; Ok(self) } - pub fn ats_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = unsafe { std::str::from_utf8_unchecked(AERON_URI_ATS_KEY) }; - self.put(KEY, value)?; + pub fn ats(&self, value: bool) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_ATS_KEY); + self.put(key, if value { "true" } else { "false" })?; Ok(self) } - pub fn socket_sndbuf_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = unsafe { std::str::from_utf8_unchecked(AERON_URI_SOCKET_SNDBUF_KEY) }; - self.put(KEY, value)?; + pub fn socket_sndbuf(&self, value: i32) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_SOCKET_SNDBUF_KEY); + self.put_int32(key, value)?; Ok(self) } - pub fn socket_rcvbuf_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = unsafe { std::str::from_utf8_unchecked(AERON_URI_SOCKET_RCVBUF_KEY) }; - self.put(KEY, value)?; + pub fn socket_rcvbuf(&self, value: i32) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_SOCKET_RCVBUF_KEY); + self.put_int32(key, value)?; Ok(self) } - pub fn receiver_window_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = unsafe { std::str::from_utf8_unchecked(AERON_URI_RECEIVER_WINDOW_KEY) }; - self.put(KEY, value)?; + pub fn receiver_window(&self, value: i32) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_RECEIVER_WINDOW_KEY); + self.put_int32(key, value)?; Ok(self) } - pub fn media_rcv_timestamp_offset_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = - unsafe { std::str::from_utf8_unchecked(AERON_URI_MEDIA_RCV_TIMESTAMP_OFFSET_KEY) }; - self.put(KEY, value)?; + pub fn media_rcv_timestamp_offset(&self, value: &str) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_MEDIA_RCV_TIMESTAMP_OFFSET_KEY); + self.put(key, value)?; Ok(self) } - pub fn channel_rcv_timestamp_offset_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = - unsafe { std::str::from_utf8_unchecked(AERON_URI_CHANNEL_RCV_TIMESTAMP_OFFSET_KEY) }; - self.put(KEY, value)?; + pub fn channel_rcv_timestamp_offset(&self, value: &str) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_CHANNEL_RCV_TIMESTAMP_OFFSET_KEY); + self.put(key, value)?; Ok(self) } - pub fn channel_snd_timestamp_offset_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = - unsafe { std::str::from_utf8_unchecked(AERON_URI_CHANNEL_SND_TIMESTAMP_OFFSET_KEY) }; - self.put(KEY, value)?; + pub fn channel_snd_timestamp_offset(&self, value: &str) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_CHANNEL_SND_TIMESTAMP_OFFSET_KEY); + self.put(key, value)?; Ok(self) } pub fn timestamp_offset_reserved(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = - unsafe { std::str::from_utf8_unchecked(AERON_URI_TIMESTAMP_OFFSET_RESERVED) }; - self.put(KEY, value)?; + let key: &str = Self::strip_null_terminator(AERON_URI_TIMESTAMP_OFFSET_RESERVED); + self.put(key, value)?; Ok(self) } - pub fn response_correlation_id_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = - unsafe { std::str::from_utf8_unchecked(AERON_URI_RESPONSE_CORRELATION_ID_KEY) }; - self.put(KEY, value)?; + pub fn response_correlation_id(&self, value: i64) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_RESPONSE_CORRELATION_ID_KEY); + self.put_int64(key, value)?; Ok(self) } - pub fn nak_delay_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = unsafe { std::str::from_utf8_unchecked(AERON_URI_NAK_DELAY_KEY) }; - self.put(KEY, value)?; + pub fn nak_delay(&self, value: i64) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_NAK_DELAY_KEY); + self.put_int64(key, value)?; Ok(self) } - pub fn untethered_window_limit_timeout_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = - unsafe { std::str::from_utf8_unchecked(AERON_URI_UNTETHERED_WINDOW_LIMIT_TIMEOUT_KEY) }; - self.put(KEY, value)?; + pub fn untethered_window_limit_timeout(&self, value: i64) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_UNTETHERED_WINDOW_LIMIT_TIMEOUT_KEY); + self.put_int64(key, value)?; Ok(self) } - pub fn untethered_resting_timeout_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = - unsafe { std::str::from_utf8_unchecked(AERON_URI_UNTETHERED_RESTING_TIMEOUT_KEY) }; - self.put(KEY, value)?; + pub fn untethered_resting_timeout(&self, value: i64) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_UNTETHERED_RESTING_TIMEOUT_KEY); + self.put_int64(key, value)?; Ok(self) } - pub fn max_resend_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = unsafe { std::str::from_utf8_unchecked(AERON_URI_MAX_RESEND_KEY) }; - self.put(KEY, value)?; + pub fn max_resend(&self, value: i32) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_MAX_RESEND_KEY); + self.put_int32(key, value)?; Ok(self) } - pub fn stream_id_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = unsafe { std::str::from_utf8_unchecked(AERON_URI_STREAM_ID_KEY) }; - self.put(KEY, value)?; + pub fn stream_id(&self, value: i32) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_STREAM_ID_KEY); + self.put_int32(key, value)?; Ok(self) } - pub fn publication_window_key(&self, value: &str) -> Result<&Self, AeronCError> { - const KEY: &str = - unsafe { std::str::from_utf8_unchecked(AERON_URI_PUBLICATION_WINDOW_KEY) }; - self.put(KEY, value)?; + pub fn publication_window(&self, value: i32) -> Result<&Self, AeronCError> { + let key: &str = Self::strip_null_terminator(AERON_URI_PUBLICATION_WINDOW_KEY); + self.put_int32(key, value)?; Ok(self) } diff --git a/rusteron-code-gen/src/common.rs b/rusteron-code-gen/src/common.rs index cbddeb3..8416463 100644 --- a/rusteron-code-gen/src/common.rs +++ b/rusteron-code-gen/src/common.rs @@ -1,7 +1,6 @@ use crate::AeronErrorType::Unknown; #[cfg(debug_assertions)] use std::backtrace::Backtrace; -use std::collections::BTreeMap; use std::fmt::{Debug, Formatter}; use std::ops::{Deref, DerefMut}; use std::{any, fmt, ptr}; @@ -402,220 +401,3 @@ impl ControlMode { } } } - -/// Builder for constructing Aeron URIs. -#[derive(Default, Debug)] -pub struct ChannelUriBuilder { - prefix: Option, - media: Option, - endpoint: Option, - network_interface: Option, - control_endpoint: Option, - control_mode: Option, - tags: Option, - reliable: Option, - ttl: Option, - mtu: Option, - term_length: Option, - initial_term_id: Option, - term_id: Option, - term_offset: Option, - session_id: Option, - linger: Option, - sparse: Option, - additional_params: BTreeMap, -} - -impl ChannelUriBuilder { - /// Create a new builder. - pub fn new() -> Self { - Self::default() - } - - /// Set the prefix (e.g., "aeron-spy"). - pub fn prefix(mut self, prefix: &str) -> Self { - self.prefix = Some(prefix.to_string()); - self - } - - /// Set the media type. - pub fn media(mut self, media: Media) -> Self { - self.media = Some(media); - self - } - - /// Set the endpoint (address:port). - pub fn endpoint(mut self, endpoint: &str) -> Self { - self.endpoint = Some(endpoint.to_string()); - self - } - - /// Set the network interface. - pub fn network_interface(mut self, network_interface: &str) -> Self { - self.network_interface = Some(network_interface.to_string()); - self - } - - /// Set the control endpoint (address:port). - pub fn control_endpoint(mut self, control_endpoint: &str) -> Self { - self.control_endpoint = Some(control_endpoint.to_string()); - self - } - - /// Set the control mode. - pub fn control_mode(mut self, control_mode: ControlMode) -> Self { - self.control_mode = Some(control_mode); - self - } - - /// Set tags for the channel. - pub fn tags(mut self, tags: &str) -> Self { - self.tags = Some(tags.to_string()); - self - } - - /// Set the reliable flag. - pub fn reliable(mut self, reliable: bool) -> Self { - self.reliable = Some(reliable); - self - } - - /// Set the Time To Live (TTL). - pub fn ttl(mut self, ttl: u8) -> Self { - self.ttl = Some(ttl); - self - } - - /// Set the Maximum Transmission Unit (MTU). - pub fn mtu(mut self, mtu: u32) -> Self { - self.mtu = Some(mtu); - self - } - - /// Set the term length. - pub fn term_length(mut self, term_length: u32) -> Self { - self.term_length = Some(term_length); - self - } - - /// Set the initial term ID. - pub fn initial_term_id(mut self, initial_term_id: i32) -> Self { - self.initial_term_id = Some(initial_term_id); - self - } - - /// Set the term ID. - pub fn term_id(mut self, term_id: i32) -> Self { - self.term_id = Some(term_id); - self - } - - /// Set the term offset. - pub fn term_offset(mut self, term_offset: u32) -> Self { - self.term_offset = Some(term_offset); - self - } - - /// Set the session ID. - pub fn session_id(mut self, session_id: i32) -> Self { - self.session_id = Some(session_id); - self - } - - /// Set the linger timeout. - pub fn linger(mut self, linger: u64) -> Self { - self.linger = Some(linger); - self - } - - /// Set the sparse flag. - pub fn sparse(mut self, sparse: bool) -> Self { - self.sparse = Some(sparse); - self - } - - /// Add a custom parameter to the URI. - pub fn add_param(mut self, key: &str, value: &str) -> Self { - self.additional_params - .insert(key.to_string(), value.to_string()); - self - } - - /// Build the Aeron URI as a string. - pub fn build(self) -> Result { - let media = self - .media - .map(|m| m.as_str()) - .ok_or_else(|| "Media must be specified".to_string())?; - let mut uri = String::new(); - - if let Some(prefix) = self.prefix { - uri.push_str(&format!("{}:", prefix)); - } - - uri.push_str(&format!("aeron:{}?", media)); - - if let Some(endpoint) = self.endpoint { - uri.push_str(&format!("endpoint={}|", endpoint)); - } - - if let Some(control_endpoint) = self.control_endpoint { - uri.push_str(&format!("control={}|", control_endpoint)); - } - - if let Some(control_mode) = self.control_mode { - uri.push_str(&format!("control-mode={}|", control_mode.as_str())); - } - - if let Some(tags) = self.tags { - uri.push_str(&format!("tags={}|", tags)); - } - - if let Some(reliable) = self.reliable { - uri.push_str(&format!("reliable={}|", reliable)); - } - - if let Some(ttl) = self.ttl { - uri.push_str(&format!("ttl={}|", ttl)); - } - - if let Some(mtu) = self.mtu { - uri.push_str(&format!("mtu={}|", mtu)); - } - - if let Some(term_length) = self.term_length { - uri.push_str(&format!("term-length={}|", term_length)); - } - - if let Some(initial_term_id) = self.initial_term_id { - uri.push_str(&format!("initial-term-id={}|", initial_term_id)); - } - - if let Some(term_id) = self.term_id { - uri.push_str(&format!("term-id={}|", term_id)); - } - - if let Some(term_offset) = self.term_offset { - uri.push_str(&format!("term-offset={}|", term_offset)); - } - - if let Some(session_id) = self.session_id { - uri.push_str(&format!("session-id={}|", session_id)); - } - - if let Some(linger) = self.linger { - uri.push_str(&format!("linger={}|", linger)); - } - - if let Some(sparse) = self.sparse { - uri.push_str(&format!("sparse={}|", sparse)); - } - - for (key, value) in self.additional_params { - uri.push_str(&format!("{}={}|", key, value)); - } - - uri.pop(); - Ok(uri) - } -} diff --git a/rusteron-media-driver/aeron b/rusteron-media-driver/aeron index 2af201d..8a06415 160000 --- a/rusteron-media-driver/aeron +++ b/rusteron-media-driver/aeron @@ -1 +1 @@ -Subproject commit 2af201da4a69d66df1ffbdb0570a35d4fb6c7e02 +Subproject commit 8a06415a1522e8fbde7d0ea33522336ad8e82ea3