Skip to content

Commit c7995db

Browse files
committed
AMQP benchmarks
1 parent aa0f1ab commit c7995db

File tree

3 files changed

+131
-1
lines changed

3 files changed

+131
-1
lines changed

Cargo.toml

+5-1
Original file line numberDiff line numberDiff line change
@@ -43,5 +43,9 @@ rabbitmq = ["dep:lapin", "dep:deadpool", "dep:deadpool-lapin"]
4343
test-fairness = []
4444

4545
[[bench]]
46-
name = "queue_benchmark"
46+
name = "amqp_benchmark"
47+
harness = false
48+
49+
[[bench]]
50+
name = "redis_benchmark"
4751
harness = false

benches/amqp_benchmark.rs

+126
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
use broccoli_queue::queue::BroccoliQueue;
2+
use criterion::{criterion_group, criterion_main, Criterion};
3+
use lapin::{options::*, BasicProperties, Connection, ConnectionProperties, Result};
4+
use serde::{Deserialize, Serialize};
5+
use std::time::Instant;
6+
use tokio::runtime::Runtime;
7+
8+
#[derive(Debug, Clone, Serialize, Deserialize)]
9+
struct BenchmarkMessage {
10+
id: String,
11+
data: String,
12+
timestamp: i64,
13+
}
14+
const ADDR: &str = "amqp://localhost:5672/";
15+
16+
async fn setup_amqp() -> Result<Connection> {
17+
Connection::connect(&ADDR, ConnectionProperties::default()).await
18+
}
19+
20+
async fn setup_broccoli() -> BroccoliQueue {
21+
BroccoliQueue::builder(ADDR)
22+
.pool_connections(10)
23+
.build()
24+
.await
25+
.unwrap()
26+
}
27+
28+
async fn benchmark_raw_amqp_throughput(conn: &mut Connection, message_count: usize) -> (f64, f64) {
29+
let queue_name = "bench_raw_amqp";
30+
let now = Instant::now();
31+
32+
// Generate test messages
33+
let messages: Vec<BenchmarkMessage> = (0..message_count)
34+
.map(|i| BenchmarkMessage {
35+
id: i.to_string(),
36+
data: format!("test data {}", i),
37+
timestamp: time::OffsetDateTime::now_utc().unix_timestamp(),
38+
})
39+
.collect();
40+
let ch = conn.create_channel().await.unwrap();
41+
// Publish messages
42+
for msg in &messages {
43+
let payload = serde_json::to_vec(msg).unwrap();
44+
ch.basic_publish(
45+
"",
46+
queue_name,
47+
BasicPublishOptions::default(),
48+
&payload,
49+
BasicProperties::default(),
50+
)
51+
.await
52+
.expect("publish failed");
53+
}
54+
55+
let ch = conn.create_channel().await.unwrap();
56+
for _ in 0..message_count {
57+
let _ = ch.basic_get(queue_name, BasicGetOptions::default());
58+
}
59+
60+
let total_time = now.elapsed().as_secs_f64();
61+
let throughput = message_count as f64 / total_time;
62+
let avg_latency = total_time / message_count as f64;
63+
64+
(throughput, avg_latency)
65+
}
66+
67+
async fn benchmark_broccoli_throughput(queue: &BroccoliQueue, message_count: usize) -> (f64, f64) {
68+
let queue_name = "bench_broccoli";
69+
let now = Instant::now();
70+
71+
// Generate test messages
72+
let messages: Vec<BenchmarkMessage> = (0..message_count)
73+
.map(|i| BenchmarkMessage {
74+
id: i.to_string(),
75+
data: format!("test data {}", i),
76+
timestamp: time::OffsetDateTime::now_utc().unix_timestamp(),
77+
})
78+
.collect();
79+
80+
// Publish messages
81+
for msg in &messages {
82+
queue.publish(queue_name, None, msg, None).await.unwrap();
83+
}
84+
85+
// Consume messages
86+
for _ in 0..message_count {
87+
let msg = queue
88+
.consume::<BenchmarkMessage>(queue_name, None)
89+
.await
90+
.unwrap();
91+
queue.acknowledge(queue_name, msg).await.unwrap();
92+
}
93+
94+
let total_time = now.elapsed().as_secs_f64();
95+
let throughput = message_count as f64 / total_time;
96+
let avg_latency = total_time / message_count as f64;
97+
98+
(throughput, avg_latency)
99+
}
100+
101+
fn criterion_benchmark(c: &mut Criterion) {
102+
let rt = Runtime::new().unwrap();
103+
104+
let mut group = c.benchmark_group("Queue Performance");
105+
let message_counts = [100, 1000, 10000];
106+
107+
let mut conn = rt.block_on(setup_amqp()).unwrap();
108+
let broccoli_queue = rt.block_on(setup_broccoli());
109+
110+
for &count in &message_counts {
111+
group.bench_function(format!("Raw AMQP {}", count), |b| {
112+
b.iter(|| rt.block_on(async { benchmark_raw_amqp_throughput(&mut conn, count).await }))
113+
});
114+
115+
group.bench_function(format!("Broccoli AMQP {}", count), |b| {
116+
b.iter(|| {
117+
rt.block_on(async { benchmark_broccoli_throughput(&broccoli_queue, count).await })
118+
})
119+
});
120+
}
121+
122+
group.finish();
123+
}
124+
125+
criterion_group!(benches, criterion_benchmark);
126+
criterion_main!(benches);
File renamed without changes.

0 commit comments

Comments
 (0)