Skip to content

Commit 6d46784

Browse files
committed
feature: added management api for redis
1 parent 1b67424 commit 6d46784

File tree

10 files changed

+226
-180
lines changed

10 files changed

+226
-180
lines changed

broccoli-queue/Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ redis = { version = "0.27.5", features = [
3232
"aio",
3333
], optional = true }
3434
dashmap = "6.1.0"
35+
derive_more = { version = "2.0.1", features = ["display"], optional = true }
3536

3637
[dev-dependencies]
3738
chrono = { version = "0.4.39", features = ["serde"] }
@@ -52,7 +53,7 @@ rabbitmq = ["dep:lapin", "dep:deadpool", "dep:deadpool-lapin"]
5253
test-fairness = []
5354

5455
# Allows for access to the management API for the queue.
55-
management = []
56+
management = ["dep:derive_more"]
5657

5758
[[bench]]
5859
name = "queue_benchmark"

broccoli-queue/examples/consumer.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ async fn process_job(job: JobPayload) -> Result<(), BroccoliError> {
2323
// Simulate some work
2424
tokio::time::sleep(Duration::from_secs(1)).await;
2525

26-
Ok(())
26+
Err(BroccoliError::Consume("Failed to process job".into()))
2727
}
2828

2929
async fn success_handler(msg: JobPayload) -> Result<(), BroccoliError> {
+14-11
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,42 @@
1+
use derive_more::Display;
2+
13
use crate::error::BroccoliError;
24

35
use super::broker::Broker;
46

57
#[async_trait::async_trait]
68
/// Trait for managing queues.
79
pub trait QueueManagement {
8-
/// Retries all messages in the queue.
10+
/// Retries messages in the queue.
911
async fn retry_queue(
1012
&self,
11-
queue_name: &str,
12-
disambiguator: Option<String>,
13+
queue_name: String,
1314
source_type: QueueType,
1415
) -> Result<usize, BroccoliError>;
15-
/// Gets the size of the queue.
16-
async fn get_queue_size(
17-
&self,
18-
queue_name: &str,
19-
queue_type: QueueType,
20-
) -> Result<usize, BroccoliError>;
2116
/// Gets the status of specific or all queues. If `queue_name` is `None`, returns the status of all queues.
2217
async fn get_queue_status(
2318
&self,
24-
queue_name: Option<&str>,
19+
queue_name: Option<String>,
2520
) -> Result<Vec<QueueStatus>, BroccoliError>;
2621
}
2722

2823
pub(crate) trait BrokerWithManagement: Broker + QueueManagement {}
2924

30-
#[derive(Debug, Clone)]
25+
#[derive(Debug, Clone, PartialEq, Eq, Display)]
3126
/// Enum representing the type of queue.
3227
pub enum QueueType {
3328
/// Failed queue.
29+
#[display("failed")]
3430
Failed,
3531
/// Processing queue.
32+
#[display("processing")]
3633
Processing,
3734
/// Main queue.
35+
#[display("main")]
3836
Main,
37+
/// Fairness queue.
38+
#[display("fairness")]
39+
Fairness,
3940
}
4041

4142
#[derive(Debug, Clone)]
@@ -51,4 +52,6 @@ pub struct QueueStatus {
5152
pub processing: usize,
5253
/// Number of messages that failed to be processed.
5354
pub failed: usize,
55+
/// If the queue is a fairness queue, the number of disambiguators.
56+
pub disambiguator_count: Option<usize>,
5457
}

broccoli-queue/src/brokers/rabbitmq/management.rs

+12-34
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use lapin::{
2-
options::{BasicAckOptions, BasicGetOptions, BasicPublishOptions, QueueDeclareOptions},
3-
types::FieldTable,
2+
options::{BasicAckOptions, BasicGetOptions, BasicPublishOptions},
43
BasicProperties,
54
};
65

@@ -15,8 +14,7 @@ use super::RabbitMQBroker;
1514
impl QueueManagement for RabbitMQBroker {
1615
async fn retry_queue(
1716
&self,
18-
queue_name: &str,
19-
_disambiguator: Option<String>,
17+
queue_name: String,
2018
source_type: QueueType,
2119
) -> Result<usize, BroccoliError> {
2220
let pool = self.ensure_pool()?;
@@ -34,6 +32,11 @@ impl QueueManagement for RabbitMQBroker {
3432
"Cannot retry from ingestion queue".into(),
3533
))
3634
}
35+
QueueType::Fairness => {
36+
return Err(BroccoliError::InvalidOperation(
37+
"Cannot retry from fairness queue".into(),
38+
))
39+
}
3740
};
3841

3942
let mut count = 0;
@@ -44,7 +47,7 @@ impl QueueManagement for RabbitMQBroker {
4447
channel
4548
.basic_publish(
4649
"broccoli",
47-
queue_name,
50+
&queue_name,
4851
BasicPublishOptions::default(),
4952
&delivery.data,
5053
BasicProperties::default(),
@@ -60,35 +63,10 @@ impl QueueManagement for RabbitMQBroker {
6063
Ok(count)
6164
}
6265

63-
async fn get_queue_size(
66+
async fn get_queue_status(
6467
&self,
65-
queue_name: &str,
66-
queue_type: QueueType,
67-
) -> Result<usize, BroccoliError> {
68-
let pool = self.ensure_pool()?;
69-
let conn = pool
70-
.get()
71-
.await
72-
.map_err(|e| BroccoliError::Consume(format!("Failed to consume message: {e:?}")))?;
73-
let channel = conn.create_channel().await?;
74-
75-
let queue = match queue_type {
76-
QueueType::Failed => format!("{queue_name}_failed"),
77-
QueueType::Processing => format!("{queue_name}_processing"),
78-
QueueType::Main => queue_name.to_string(),
79-
};
80-
81-
let queue_info = channel
82-
.queue_declare(
83-
&queue,
84-
QueueDeclareOptions::default(),
85-
FieldTable::default(),
86-
)
87-
.await?;
88-
Ok(queue_info.message_count() as usize)
89-
}
90-
91-
async fn get_queue_status(&self) -> Result<Vec<QueueStatus>, BroccoliError> {
68+
queue_name: Option<String>,
69+
) -> Result<Vec<QueueStatus>, BroccoliError> {
9270
let pool = self.ensure_pool()?;
9371
let conn = pool
9472
.get()
@@ -98,7 +76,7 @@ impl QueueManagement for RabbitMQBroker {
9876

9977
// List queues through management API or channel operations
10078
// This is a simplified version - in practice you'd want to use the RabbitMQ Management API
101-
let mut statuses = Vec::new();
79+
let statuses = Vec::new();
10280

10381
// Implementation note: RabbitMQ doesn't provide memory usage through regular AMQP
10482
// You would need to use the HTTP Management API to get this information

broccoli-queue/src/brokers/redis/broker.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::{
1010

1111
use super::utils::OptionalInternalBrokerMessage;
1212

13-
pub(crate) type RedisPool = bb8_redis::bb8::Pool<bb8_redis::RedisConnectionManager>;
13+
pub type RedisPool = bb8_redis::bb8::Pool<bb8_redis::RedisConnectionManager>;
1414

1515
#[derive(Default)]
1616
/// A message broker implementation for Redis.

0 commit comments

Comments
 (0)