Skip to content

Commit

Permalink
Add Client Workaround With a Load Balancer (#220)
Browse files Browse the repository at this point in the history
* add the load_malance_made attribute to the Client Options.
  • Loading branch information
JiaYingZhang authored Jul 30, 2024
1 parent b26e379 commit 3aa3c8a
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 15 deletions.
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,19 @@ let environment = Environment::builder()
.build()
```

##### Building the environment with a load balancer

```rust,no_run
use rabbitmq_stream_client::Environment;
let environment = Environment::builder()
.load_balancer_mode(true)
.build()
```



##### Publishing messages

```rust,no_run
Expand Down
9 changes: 9 additions & 0 deletions src/client/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub struct ClientOptions {
pub(crate) v_host: String,
pub(crate) heartbeat: u32,
pub(crate) max_frame_size: u32,
pub(crate) load_balancer_mode: bool,
pub(crate) tls: TlsConfiguration,
pub(crate) collector: Arc<dyn MetricsCollector>,
}
Expand Down Expand Up @@ -39,6 +40,7 @@ impl Default for ClientOptions {
v_host: "/".to_owned(),
heartbeat: 60,
max_frame_size: 1048576,
load_balancer_mode: false,
collector: Arc::new(NopMetricsCollector {}),
tls: TlsConfiguration {
enabled: false,
Expand Down Expand Up @@ -117,6 +119,11 @@ impl ClientOptionsBuilder {
self
}

pub fn load_balancer_mode(mut self, load_balancer_mode: bool) -> Self {
self.0.load_balancer_mode = load_balancer_mode;
self
}

pub fn build(self) -> ClientOptions {
self.0
}
Expand Down Expand Up @@ -145,6 +152,7 @@ mod tests {
client_keys_path: String::from(""),
})
.collector(Arc::new(NopMetricsCollector {}))
.load_balancer_mode(true)
.build();
assert_eq!(options.host, "test");
assert_eq!(options.port, 8888);
Expand All @@ -154,5 +162,6 @@ mod tests {
assert_eq!(options.heartbeat, 10000);
assert_eq!(options.max_frame_size, 1);
assert_eq!(options.tls.enabled, true);
assert_eq!(options.load_balancer_mode, true);
}
}
28 changes: 21 additions & 7 deletions src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,27 @@ impl ConsumerBuilder {
metadata.replicas,
stream
);
client = Client::connect(ClientOptions {
host: replica.host.clone(),
port: replica.port as u16,
..self.environment.options.client_options
})
.await?;
let load_balancer_mode = self.environment.options.client_options.load_balancer_mode;
if load_balancer_mode {
let options = self.environment.options.client_options.clone();
loop {
let temp_client = Client::connect(options.clone()).await?;
let mapping = temp_client.connection_properties().await;
if let Some(advertised_host) = mapping.get("advertised_host") {
if *advertised_host == replica.host.clone() {
client = temp_client;
break;
}
}
}
} else {
client = Client::connect(ClientOptions {
host: replica.host.clone(),
port: replica.port as u16,
..self.environment.options.client_options
})
.await?;
}
}
} else {
return Err(ConsumerCreateError::StreamDoesNotExist {
Expand All @@ -100,7 +115,6 @@ impl ConsumerBuilder {
waker: AtomicWaker::new(),
metrics_collector: collector,
});

let msg_handler = ConsumerMessageHandler(consumer.clone());
client.set_handler(msg_handler).await;

Expand Down
7 changes: 6 additions & 1 deletion src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,16 @@ impl EnvironmentBuilder {
}
pub fn metrics_collector(
mut self,
collector: impl MetricsCollector + Send + Sync + 'static,
collector: impl MetricsCollector + 'static,
) -> EnvironmentBuilder {
self.0.client_options.collector = Arc::new(collector);
self
}

pub fn load_balancer_mode(mut self, load_balancer_mode: bool) -> EnvironmentBuilder {
self.0.client_options.load_balancer_mode = load_balancer_mode;
self
}
}
#[derive(Clone, Default)]
pub struct EnvironmentOptions {
Expand Down
30 changes: 23 additions & 7 deletions src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,29 @@ impl<T> ProducerBuilder<T> {
metadata.leader,
stream
);
client.close().await?;
client = Client::connect(ClientOptions {
host: metadata.leader.host.clone(),
port: metadata.leader.port as u16,
..self.environment.options.client_options
})
.await?;
let load_balancer_mode = self.environment.options.client_options.load_balancer_mode;
if load_balancer_mode {
// Producer must connect to leader node
let options: ClientOptions = self.environment.options.client_options.clone();
loop {
let temp_client = Client::connect(options.clone()).await?;
let mapping = temp_client.connection_properties().await;
if let Some(advertised_host) = mapping.get("advertised_host") {
if *advertised_host == metadata.leader.host.clone() {
client = temp_client;
break;
}
}
}
} else {
client.close().await?;
client = Client::connect(ClientOptions {
host: metadata.leader.host.clone(),
port: metadata.leader.port as u16,
..self.environment.options.client_options
})
.await?
};
} else {
return Err(ProducerCreateError::StreamDoesNotExist {
stream: stream.into(),
Expand Down

0 comments on commit 3aa3c8a

Please sign in to comment.