Skip to content

Commit 906e493

Browse files
committed
feature: make delays a config level feature rather than compile time
1 parent 42f8729 commit 906e493

File tree

13 files changed

+88
-101
lines changed

13 files changed

+88
-101
lines changed

.github/workflows/integ_tests.yaml

-10
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,6 @@ jobs:
1919
image: redis
2020
ports:
2121
- 6379:6379
22-
rabbitmq:
23-
image: rabbitmq:management
24-
ports:
25-
- 5672:5672
26-
- 15672:15672
2722

2823
steps:
2924
- uses: actions/checkout@v4
@@ -52,11 +47,6 @@ jobs:
5247
env:
5348
BROCCOLI_QUEUE_URL: redis://localhost:6379
5449

55-
- name: Run RabbitMQ tests
56-
run: cargo test --features rabbitmq
57-
env:
58-
BROCCOLI_QUEUE_URL: amqp://localhost:5672
59-
6050
- name: Run RabbitMQ tests with delay plugin
6151
run: |
6252
docker stop ${{ job.services.rabbitmq.id }}

Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ criterion = { version = "0.5", features = ["async_tokio"] }
4141
default = []
4242
redis = ["dep:bb8-redis", "dep:redis"]
4343
rabbitmq = ["dep:lapin", "dep:deadpool", "dep:deadpool-lapin"]
44-
rabbitmq-delay = []
4544

4645
[[bench]]
4746
name = "queue_benchmark"

examples/consumer.rs

+1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
4848
let queue = BroccoliQueue::builder(queue_url)
4949
.pool_connections(5)
5050
.failed_message_retry_strategy(Default::default())
51+
.enable_scheduling(true)
5152
.build()
5253
.await?;
5354

examples/publisher.rs

+1-8
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ async fn main() -> Result<(), BroccoliError> {
3030
let queue = BroccoliQueue::builder(queue_url)
3131
.failed_message_retry_strategy(Default::default())
3232
.pool_connections(5)
33+
.enable_scheduling(true)
3334
.build()
3435
.await?;
3536

@@ -64,15 +65,7 @@ async fn main() -> Result<(), BroccoliError> {
6465
"jobs",
6566
scheduled_jobs,
6667
Some(PublishOptions {
67-
#[cfg(any(
68-
feature = "redis",
69-
all(feature = "rabbitmq", feature = "rabbitmq-delay")
70-
))]
7168
delay: Some(Duration::seconds(10)),
72-
#[cfg(any(
73-
feature = "redis",
74-
all(feature = "rabbitmq", feature = "rabbitmq-delay")
75-
))]
7669
scheduled_at: None,
7770
ttl: None,
7871
priority: None,

run-tests.sh

+2-11
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,14 @@ run_redis_test() {
1515
BROCCOLI_QUEUE_URL=redis://localhost:6380 cargo test --features redis
1616
}
1717

18-
run_rabbitmq_delay_test() {
18+
run_rabbitmq_test() {
1919
echo "Starting RabbitMQ test with delay plugin..."
2020
docker build -f Dockerfile.rabbitmq -t rabbitmq-with-delays .
2121
docker run --name test-rabbit-mq -d -p 5672:5672 -p 15672:15672 rabbitmq-with-delays >/dev/null
2222
sleep 5
23-
BROCCOLI_QUEUE_URL=amqp://localhost:5672 cargo test --features rabbitmq,rabbitmq-delay
23+
BROCCOLI_QUEUE_URL=amqp://localhost:5672 cargo test --features rabbitmq
2424
}
2525

26-
run_rabbitmq_test() {
27-
echo "Starting RabbitMQ test..."
28-
docker run --name test-rabbit-mq -d -p 5672:5672 -p 15672:15672 rabbitmq:management >/dev/null
29-
sleep 5
30-
BROCCOLI_QUEUE_URL=amqp://localhost:5672 cargo test --features rabbitmq
31-
}
3226

3327
run_redis_bench() {
3428
echo "Starting Redis benchmark test..."
@@ -40,13 +34,10 @@ run_redis_bench() {
4034
case "$1" in
4135
"redis") run_redis_test ;;
4236
"rabbitmq") run_rabbitmq_test ;;
43-
"rabbitmq-delay") run_rabbitmq_delay_test ;;
4437
"redis-bench") run_redis_bench ;;
4538
*)
4639
run_redis_test
4740
cleanup
48-
run_rabbitmq_delay_test
49-
cleanup
5041
run_rabbitmq_test
5142
;;
5243
esac

src/brokers/broker.rs

+6
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,11 @@ pub struct BrokerConfig {
106106
pub retry_failed: Option<bool>,
107107
/// Number of connections to maintain in the connection pool
108108
pub pool_connections: Option<u8>,
109+
/// Whether to enable scheduling for messages
110+
///
111+
/// NOTE: If you enable this w/ rabbitmq, you will need to install the delayed-exchange plugin
112+
/// https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq
113+
pub enable_scheduling: Option<bool>,
109114
}
110115

111116
impl Default for BrokerConfig {
@@ -114,6 +119,7 @@ impl Default for BrokerConfig {
114119
retry_attempts: Some(3),
115120
retry_failed: Some(true),
116121
pool_connections: Some(10),
122+
enable_scheduling: Some(false),
117123
}
118124
}
119125
}

src/brokers/rabbitmq/broker.rs

+24-13
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ use lapin::{
1313
BasicProperties, Channel,
1414
};
1515

16-
#[cfg(feature = "rabbitmq-delay")]
1716
use time::OffsetDateTime;
1817

1918
use crate::{
@@ -108,22 +107,34 @@ impl Broker for RabbitMQBroker {
108107
properties = properties.with_expiration(ttl.whole_seconds().to_string().into());
109108
}
110109

111-
#[cfg(feature = "rabbitmq-delay")]
112110
if let Some(delay) = opts.delay {
113-
table.insert(
114-
"x-delay".to_string().into(),
115-
AMQPValue::LongLongInt(delay.whole_milliseconds() as i64),
116-
);
111+
if self
112+
.config
113+
.as_ref()
114+
.map(|c| c.enable_scheduling.unwrap_or(false))
115+
.unwrap_or(false)
116+
{
117+
table.insert(
118+
"x-delay".to_string().into(),
119+
AMQPValue::LongLongInt(delay.whole_milliseconds() as i64),
120+
);
121+
}
117122
}
118123

119-
#[cfg(feature = "rabbitmq-delay")]
120124
if let Some(schedule) = opts.scheduled_at {
121-
table.insert(
122-
"x-delay".to_string().into(),
123-
AMQPValue::LongLongInt(
124-
(schedule - OffsetDateTime::now_utc()).whole_milliseconds() as i64,
125-
),
126-
);
125+
if self
126+
.config
127+
.as_ref()
128+
.map(|c| c.enable_scheduling.unwrap_or(false))
129+
.unwrap_or(false)
130+
{
131+
table.insert(
132+
"x-delay".to_string().into(),
133+
AMQPValue::LongLongInt(
134+
(schedule - OffsetDateTime::now_utc()).whole_milliseconds() as i64,
135+
),
136+
);
137+
}
127138
}
128139
}
129140

src/brokers/rabbitmq/utils.rs

+15-9
Original file line numberDiff line numberDiff line change
@@ -40,19 +40,25 @@ impl RabbitMQBroker {
4040
) -> Result<(), BroccoliError> {
4141
#[allow(unused_mut)]
4242
let mut args = FieldTable::default();
43-
#[cfg(feature = "rabbitmq-delay")]
44-
args.insert(
45-
"x-delayed-type".into(),
46-
AMQPValue::LongString("direct".into()),
47-
);
43+
let exchange_kind = if self
44+
.config
45+
.as_ref()
46+
.map(|c| c.enable_scheduling.unwrap_or(false))
47+
.unwrap_or(false)
48+
{
49+
args.insert(
50+
"x-delayed-type".into(),
51+
AMQPValue::LongString("direct".into()),
52+
);
53+
lapin::ExchangeKind::Custom("x-delayed-message".into())
54+
} else {
55+
lapin::ExchangeKind::Direct
56+
};
4857

4958
channel
5059
.exchange_declare(
5160
exchange_name,
52-
#[cfg(feature = "rabbitmq-delay")]
53-
lapin::ExchangeKind::Custom("x-delayed-message".into()),
54-
#[cfg(not(feature = "rabbitmq-delay"))]
55-
lapin::ExchangeKind::Direct,
61+
exchange_kind,
5662
lapin::options::ExchangeDeclareOptions::default(),
5763
args.clone(),
5864
)

src/brokers/redis/broker.rs

+16-2
Original file line numberDiff line numberDiff line change
@@ -141,11 +141,25 @@ impl Broker for RedisBroker {
141141

142142
if let Some(ref publish_options) = publish_options {
143143
if let Some(delay) = publish_options.delay {
144-
score += (delay.as_seconds_f32() * 1_000_000_000.0) as i64;
144+
if self
145+
.config
146+
.as_ref()
147+
.map(|c| c.enable_scheduling.unwrap_or(false))
148+
.unwrap_or(false)
149+
{
150+
score += (delay.as_seconds_f32() * 1_000_000_000.0) as i64;
151+
}
145152
}
146153

147154
if let Some(timestamp) = publish_options.scheduled_at {
148-
score = timestamp.unix_timestamp_nanos() as i64;
155+
if self
156+
.config
157+
.as_ref()
158+
.map(|c| c.enable_scheduling.unwrap_or(false))
159+
.unwrap_or(false)
160+
{
161+
score = timestamp.unix_timestamp_nanos() as i64;
162+
}
149163
}
150164

151165
if let Some(ttl) = publish_options.ttl {

src/queue.rs

+22-44
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,6 @@ use std::{future::Future, sync::Arc, time::Instant};
33
use futures::stream::FuturesUnordered;
44
use time::Duration;
55

6-
#[cfg(any(
7-
feature = "redis",
8-
all(feature = "rabbitmq", feature = "rabbitmq-delay")
9-
))]
106
use time::OffsetDateTime;
117

128
use crate::{
@@ -89,6 +85,11 @@ pub struct BroccoliQueueBuilder {
8985
retry_failed: Option<bool>,
9086
/// Number of connections to maintain in the connection pool
9187
pool_connections: Option<u8>,
88+
/// Whether to enable message scheduling
89+
///
90+
/// NOTE: If you enable this w/ rabbitmq, you will need to install the delayed-exchange plugin
91+
/// https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq
92+
enable_scheduling: Option<bool>,
9293
}
9394

9495
impl BroccoliQueueBuilder {
@@ -105,6 +106,7 @@ impl BroccoliQueueBuilder {
105106
retry_attempts: None,
106107
retry_failed: None,
107108
pool_connections: None,
109+
enable_scheduling: None,
108110
}
109111
}
110112

@@ -133,6 +135,21 @@ impl BroccoliQueueBuilder {
133135
self
134136
}
135137

138+
/// Enables or disables message scheduling.
139+
///
140+
/// NOTE: If you enable this w/ rabbitmq, you will need to install the delayed-exchange plugin
141+
/// https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq
142+
///
143+
/// # Arguments
144+
/// * `enable_scheduling` - If true, message scheduling will be enabled.
145+
///
146+
/// # Returns
147+
/// The updated `BroccoliQueueBuilder` instance.
148+
pub fn enable_scheduling(mut self, enable_scheduling: bool) -> Self {
149+
self.enable_scheduling = Some(enable_scheduling);
150+
self
151+
}
152+
136153
/// Builds the `BroccoliQueue` with the specified configuration.
137154
///
138155
/// # Returns
@@ -142,6 +159,7 @@ impl BroccoliQueueBuilder {
142159
retry_attempts: self.retry_attempts,
143160
retry_failed: self.retry_failed,
144161
pool_connections: self.pool_connections,
162+
enable_scheduling: self.enable_scheduling,
145163
};
146164

147165
let broker = connect_to_broker(&self.broker_url, Some(config))
@@ -209,16 +227,8 @@ pub struct PublishOptions {
209227
pub ttl: Option<Duration>,
210228
/// Message priority level. This is a number between 1 and 5, where 5 is the lowest priority and 1 is the highest.
211229
pub priority: Option<u8>,
212-
#[cfg(any(
213-
feature = "redis",
214-
all(feature = "rabbitmq", feature = "rabbitmq-delay")
215-
))]
216230
/// Delay before the message is published
217231
pub delay: Option<Duration>,
218-
#[cfg(any(
219-
feature = "redis",
220-
all(feature = "rabbitmq", feature = "rabbitmq-delay")
221-
))]
222232
/// Scheduled time for the message to be published
223233
pub scheduled_at: Option<OffsetDateTime>,
224234
}
@@ -235,15 +245,7 @@ impl PublishOptions {
235245
pub struct PublishOptionsBuilder {
236246
ttl: Option<Duration>,
237247
priority: Option<u8>,
238-
#[cfg(any(
239-
feature = "redis",
240-
all(feature = "rabbitmq", feature = "rabbitmq-delay")
241-
))]
242248
delay: Option<Duration>,
243-
#[cfg(any(
244-
feature = "redis",
245-
all(feature = "rabbitmq", feature = "rabbitmq-delay")
246-
))]
247249
scheduled_at: Option<OffsetDateTime>,
248250
}
249251

@@ -253,15 +255,7 @@ impl PublishOptionsBuilder {
253255
Self {
254256
ttl: None,
255257
priority: None,
256-
#[cfg(any(
257-
feature = "redis",
258-
all(feature = "rabbitmq", feature = "rabbitmq-delay")
259-
))]
260258
delay: None,
261-
#[cfg(any(
262-
feature = "redis",
263-
all(feature = "rabbitmq", feature = "rabbitmq-delay")
264-
))]
265259
scheduled_at: None,
266260
}
267261
}
@@ -273,20 +267,12 @@ impl PublishOptionsBuilder {
273267
}
274268

275269
/// Sets a delay before the message is published.
276-
#[cfg(any(
277-
feature = "redis",
278-
all(feature = "rabbitmq", feature = "rabbitmq-delay")
279-
))]
280270
pub fn delay(mut self, duration: Duration) -> Self {
281271
self.delay = Some(duration);
282272
self
283273
}
284274

285275
/// Sets a specific time for the message to be published.
286-
#[cfg(any(
287-
feature = "redis",
288-
all(feature = "rabbitmq", feature = "rabbitmq-delay")
289-
))]
290276
pub fn schedule_at(mut self, time: OffsetDateTime) -> Self {
291277
self.scheduled_at = Some(time);
292278
self
@@ -307,15 +293,7 @@ impl PublishOptionsBuilder {
307293
PublishOptions {
308294
ttl: self.ttl,
309295
priority: self.priority,
310-
#[cfg(any(
311-
feature = "redis",
312-
all(feature = "rabbitmq", feature = "rabbitmq-delay")
313-
))]
314296
delay: self.delay,
315-
#[cfg(any(
316-
feature = "redis",
317-
all(feature = "rabbitmq", feature = "rabbitmq-delay")
318-
))]
319297
scheduled_at: self.scheduled_at,
320298
}
321299
}

tests/common/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ pub async fn setup_queue() -> BroccoliQueue {
44
let queue_url = std::env::var("BROCCOLI_QUEUE_URL").unwrap();
55
BroccoliQueue::builder(queue_url)
66
.pool_connections(5)
7+
.enable_scheduling(true)
78
.build()
89
.await
910
.expect("Queue setup failed. Are you sure Redis/RabbitMQ is running?")

0 commit comments

Comments
 (0)