Skip to content

Commit

Permalink
Merge pull request #33 from Toumash/28-direct-exchange-binds-doesnt-w…
Browse files Browse the repository at this point in the history
…ork-correctly

[fix] direct exchange allows for multiple binds for the same queue
  • Loading branch information
Toumash authored Jul 25, 2024
2 parents 8468715 + 94cf758 commit 3e8b2fd
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 68 deletions.
15 changes: 8 additions & 7 deletions exchanges/src/exchanges.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ pub enum ExchangeKind {
pub enum ExchangeError {
BindFail,
EmptyPayloadFail,
DeadLetterQueueLockFail {},
DeadLetterQueueLockFail,
NoRouteKey,
NoMatchingQueue { route_key: String },
MessageWithoutMetadata
}

impl fmt::Display for ExchangeError {
Expand All @@ -41,16 +41,16 @@ impl fmt::Display for ExchangeError {
write!(f, "binding to exchange failed")
}
ExchangeError::EmptyPayloadFail => {
write!(f, "handling message failed")
write!(f, "handling message failed: empty payload")
}
ExchangeError::DeadLetterQueueLockFail {} => write!(
f,
"handling message failed: queue().lock() failed for dead letter queue"
),

ExchangeError::NoMatchingQueue { route_key } => {
write!(f, "No matching queue for route key {}", route_key)
}
ExchangeError::MessageWithoutMetadata => write!(
f,
"handling message failed: insufficient metadata in message"
),
ExchangeError::NoRouteKey => {
write!(f, "No routing key found")
}
Expand All @@ -65,6 +65,7 @@ pub trait Exchange {
metadata: Option<&protos::BindMetadata>,
) -> Result<(), ExchangeError>;
fn get_bound_queue_names(&self) -> HashSet<QueueName>;
fn get_bind_count(&self) -> u32;
fn handle_message(
&self,
message: Message,
Expand Down
140 changes: 85 additions & 55 deletions exchanges/src/types/direct.rs
Original file line number Diff line number Diff line change
@@ -1,62 +1,69 @@
use tracing::info;
use std::collections::hash_map::Entry;
use std::collections::HashSet;

use std::collections::{HashMap, HashSet};
use tracing::{debug, error, instrument};

use crate::*;

type RoutingKey = String;

#[derive(Default)]
pub struct DirectExchange {
bound_queues: HashSet<QueueName>,
routing_map: HashMap<String, HashSet<QueueName>>,
exchange_name: String,
routing_map: HashMap<QueueName, HashSet<RoutingKey>>, // allow multiple bindings exchange and queue
exchange_name: ExchangeName,
bind_count: u32
}

impl DirectExchange {
#[allow(dead_code)] // this is currently only used in tests
fn new(exchange_name: String) -> Self {
fn new(exchange_name: ExchangeName) -> Self {
DirectExchange {
bound_queues: HashSet::new(),
exchange_name,
routing_map: HashMap::new(),
bind_count: 0u32
}
}
}

impl PushToQueueStrategy for DirectExchange {}

impl Exchange for DirectExchange {
#[instrument(skip_all, fields(exchange_name=%self.exchange_name, queue_name=%queue_name))]
fn bind(
&mut self,
queue_name: &QueueName,
metadata: Option<&protos::BindMetadata>,
) -> Result<(), ExchangeError> {
let metadata = match metadata {
Some(data) => data,
None => return Err(ExchangeError::BindFail),
};

let route_key = match &metadata.routing_key {
Some(key) => key.value.clone(),
None => return Err(ExchangeError::BindFail),
};

match self.routing_map.entry(route_key) {
Entry::Occupied(o) => {
let value = o.into_mut();
match value.get(queue_name) {
Some(_) => return Err(ExchangeError::BindFail),
None => value.insert(queue_name.to_string()),
};
let metadata = metadata.ok_or_else(|| {
error!("metadata is required for DirectExchange binding");
ExchangeError::BindFail
})?;

let routing_key = metadata.routing_key.as_ref().ok_or_else(|| {
error!("routing_key is required for DirectExchange binding");
ExchangeError::BindFail
})?;

match self.routing_map.entry(queue_name.into()) {
Entry::Occupied(mut entry) => {
debug!("adding bind to an existing routing_map entry");
if !entry.get_mut().insert(routing_key.value.clone()) {
error!("this binding already exists");
return Err(ExchangeError::BindFail);
}
}
Entry::Vacant(v) => {
v.insert(HashSet::from([queue_name.to_string()]));
Entry::Vacant(_) => {
debug!("adding new entry to routing_map");
self.routing_map.insert(
queue_name.into(),
HashSet::from([routing_key.value.clone()]),
);
}
};

if !self.bound_queues.insert(queue_name.clone()) {
info!("Updating queue named: {}", queue_name);
}
self.bound_queues.insert(queue_name.into());
self.bind_count += 1;

Ok(())
}
Expand All @@ -70,45 +77,44 @@ impl Exchange for DirectExchange {
message: Message,
queues: Arc<RwLock<QueueContainer>>,
) -> Result<u32, ExchangeError> {
let metadata = match &message.metadata {
Some(data) => data,
None => return Err(ExchangeError::BindFail),
};
let metadata = message
.metadata
.as_ref()
.ok_or(ExchangeError::MessageWithoutMetadata)?;

let route_key = match &metadata.routing_key {
Some(key) => key.clone(),
None => return Err(ExchangeError::BindFail),
};
let routing_map = &self.routing_map;

let bound_queues = match self.routing_map.get(&route_key) {
Some(q) => q,
None => {
return Err(ExchangeError::NoMatchingQueue {
route_key,
})
}
};
let routing_key = metadata.routing_key.as_ref().ok_or(ExchangeError::NoRouteKey)?;

let queues_read = queues.read().unwrap();
let mut pushed_counter: u32 = 0;

for name in bound_queues {
if let Some(queue) = queues_read.get(name) {
if self.push_to_queue(
&self.exchange_name,
message.clone(),
queue,
name,
&queues_read,
)? == PushResult::Ok
{
pushed_counter += 1;
for (queue_name, keys) in routing_map {
if let Some(queue) = queues_read.get(queue_name) {
for key in keys {
if key != routing_key {
continue;
}
if self.push_to_queue(
&self.exchange_name,
message.clone(),
queue,
queue_name,
&queues_read,
)? == PushResult::Ok
{
pushed_counter += 1;
}
}
}
}

Ok(pushed_counter)
}

fn get_bind_count(&self) -> u32 {
self.bind_count
}
}

#[cfg(test)]
Expand Down Expand Up @@ -146,6 +152,29 @@ mod tests {
assert_eq!(ex.get_bound_queue_names().len(), 3);
}

#[test]
fn bind_count_test() {
let mut ex: DirectExchange = DirectExchange::default();
let bind_metadata1 = BindMetadata {
routing_key: Some(RoutingKey {
value: "route_1".to_string(),
}),
};
let bind_metadata2 = BindMetadata {
routing_key: Some(RoutingKey {
value: "route_2".to_string(),
}),
};
assert_eq!(ex.bind(&"q1".to_string(), Some(&bind_metadata1)), Ok(()));
assert_eq!(ex.bind(&"q2".to_string(), Some(&bind_metadata1)), Ok(()));
assert_eq!(ex.bind(&"q3".to_string(), Some(&bind_metadata1)), Ok(()));
assert_eq!(ex.bind(&"q1".to_string(), Some(&bind_metadata2)), Ok(()));
assert_eq!(ex.bind(&"q2".to_string(), Some(&bind_metadata2)), Ok(()));
assert_eq!(ex.bind(&"q3".to_string(), Some(&bind_metadata2)), Ok(()));
assert_eq!(ex.get_bound_queue_names().len(), 3);
assert_eq!(ex.get_bind_count(), 6);
}

#[test]
fn duplicate_queue_different_routing_bind_test() {
let mut ex = DirectExchange::default();
Expand All @@ -170,6 +199,7 @@ mod tests {
)
.is_ok());
assert_eq!(ex.get_bound_queue_names().len(), 1);

}

#[test]
Expand Down
29 changes: 23 additions & 6 deletions exchanges/src/types/fanout.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::collections::HashSet;

use tracing::{info, instrument, Span};
use tracing::{info, error, instrument, Span};

use crate::*;

#[derive(Default)]
pub struct FanoutExchange {
bound_queues: HashSet<QueueName>,
exchange_name: String,
bind_count: u32
}

impl FanoutExchange {
Expand All @@ -16,6 +17,7 @@ impl FanoutExchange {
FanoutExchange {
bound_queues: HashSet::new(),
exchange_name,
bind_count: 0u32
}
}
}
Expand All @@ -29,8 +31,12 @@ impl Exchange for FanoutExchange {
_metadata: Option<&protos::BindMetadata>,
) -> Result<(), ExchangeError> {
if !self.bound_queues.insert(queue_name.clone()) {
error!("bind with this name already exists");
return Err(ExchangeError::BindFail);
}

self.bind_count += 1;

Ok(())
}

Expand Down Expand Up @@ -73,18 +79,17 @@ impl Exchange for FanoutExchange {

Ok(pushed_counter)
}

fn get_bind_count(&self) -> u32 {
self.bind_count
}
}

#[cfg(test)]
mod tests {
use uuid::Uuid;

use super::*;
// use lazy_static::lazy_static;
// lazy_static! {
// static ref DUMMY_METADATA: Metadata = Metadata{ routing_key: None };
// }

/**
* Creates 3 queues with names: "q1", "q2", "q3"
*/
Expand Down Expand Up @@ -230,4 +235,16 @@ mod tests {
// assert
assert_eq!(message_handled_by_queues_count, 0);
}

#[test]
fn bind_count_test() {
let _queues = setup_test_queues();
let mut ex = FanoutExchange::default();

assert_eq!(ex.bind(&"q1".to_string(), None), Ok(()));
assert_eq!(ex.bind(&"q2".to_string(), None), Ok(()));
assert_eq!(ex.bind(&"q3".to_string(), None), Ok(()));

assert_eq!(ex.get_bind_count(), 3u32);
}
}

0 comments on commit 3e8b2fd

Please sign in to comment.