Skip to content

Commit 9a3749f

Browse files
author
Daniel Giribet Farre
committed
Merge commit '8a62a8f500a94d074ab300765104335bcaa1b124' into danielfarre/eng-386-implement-worker-queue-backed-by-surrealdb
# Conflicts: # src/brokers/surrealdb/utils.rs
2 parents 5775e01 + 8a62a8f commit 9a3749f

File tree

8 files changed

+155
-163
lines changed

8 files changed

+155
-163
lines changed

src/brokers/connect.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,9 @@ pub async fn connect_to_broker(
8484
RabbitMQBroker::new_with_config(config)
8585
})),
8686
#[cfg(feature = "surrealdb")]
87-
BrokerType::SurrealDB => Box::new(match config {
88-
Some(config) => SurrealDBBroker::new_with_config(config),
89-
None => SurrealDBBroker::new(),
90-
}),
87+
BrokerType::SurrealDB => Box::new(config.map_or_else(SurrealDBBroker::new, |config| {
88+
SurrealDBBroker::new_with_config(config)
89+
})),
9190
};
9291

9392
broker.connect(broker_url).await?;

src/brokers/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,6 @@ pub mod rabbitmq;
88
/// Contains the Redis broker implementation
99
#[cfg(feature = "redis")]
1010
pub mod redis;
11-
/// Contains the SurrealDB broker implementation
11+
/// Contains the `SurrealDB` broker implementation
1212
#[cfg(feature = "surrealdb")]
1313
pub mod surrealdb;

src/brokers/surrealdb/broker.rs

+16-21
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use time::Duration;
1313

1414
use super::utils;
1515

16-
/// SurrealDB state struct
16+
/// `SurrealDB` state struct
1717
pub struct SurrealDBBroker {
1818
pub(crate) db: Option<Surreal<Any>>,
1919
pub(crate) connected: bool,
@@ -22,7 +22,7 @@ pub struct SurrealDBBroker {
2222

2323
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
2424
pub(crate) struct InternalSurrealDBBrokerMessage {
25-
/// Actual record id in surrealDB (topicname,task_id)
25+
/// Actual record id in surrealDB (`topicname,task_id`)
2626
pub id: RecordId,
2727
/// Unique identifier for the message (external version, without table name)
2828
pub task_id: String,
@@ -86,7 +86,7 @@ impl Broker for SurrealDBBroker {
8686
///
8787
///
8888
async fn connect(&mut self, broker_url: &str) -> Result<(), BroccoliError> {
89-
let db = SurrealDBBroker::client_from_url(broker_url).await?;
89+
let db = Self::client_from_url(broker_url).await?;
9090
self.db = db;
9191
self.connected = true;
9292
Ok(())
@@ -115,7 +115,7 @@ impl Broker for SurrealDBBroker {
115115
}
116116

117117
let config = self.config.clone().unwrap_or_default();
118-
let priority = publish_options.priority.unwrap_or(5) as i64;
118+
let priority = i64::from(publish_options.priority.unwrap_or(5));
119119
if !(1..=5).contains(&priority) {
120120
return Err(BroccoliError::Broker(
121121
"Priority must be between 1 and 5".to_string(),
@@ -136,7 +136,7 @@ impl Broker for SurrealDBBroker {
136136
for msg in messages {
137137
// 1: insert actual message //
138138
let inserted =
139-
utils::add_message(&db, queue_name, &msg, "Could not publish (add msg)").await?;
139+
utils::add_message(&db, queue_name, msg, "Could not publish (add msg)").await?;
140140
published.push(inserted);
141141

142142
// 2: add to queue //
@@ -160,7 +160,7 @@ impl Broker for SurrealDBBroker {
160160
when,
161161
"Could not publish scheduled (enqueue)",
162162
)
163-
.await?
163+
.await?;
164164
} else if let Some(when) = delay {
165165
utils::add_to_queue_delayed(
166166
&db,
@@ -170,7 +170,7 @@ impl Broker for SurrealDBBroker {
170170
when,
171171
"Could not publish delayed (enqueue)",
172172
)
173-
.await?
173+
.await?;
174174
}
175175
}
176176
}
@@ -254,7 +254,7 @@ impl Broker for SurrealDBBroker {
254254
options: Option<ConsumeOptions>,
255255
) -> Result<InternalBrokerMessage, BroccoliError> {
256256
// first of all, we try to consume without blocking, and return if we have messages
257-
let resp = Self::try_consume(&self, queue_name, options).await?;
257+
let resp = Self::try_consume(self, queue_name, options).await?;
258258
if let Some(message) = resp {
259259
return Ok(message);
260260
}
@@ -266,11 +266,11 @@ impl Broker for SurrealDBBroker {
266266
.select(queue_table)
267267
.range(
268268
vec![Value::default(), Value::default()] // note default is 'None'
269-
..vec![Value::from_str("time::now()").unwrap_or(Value::default()),Value::default()],
269+
..vec![Value::from_str("time::now()").unwrap_or_default(),Value::default()],
270270
) // should notify when future becomes present
271271
.live()
272272
.await
273-
.map_err(|err| BroccoliError::Broker(format!("Could not consume: {:?}", err)))?;
273+
.map_err(|err| BroccoliError::Broker(format!("Could not consume: {err:?}")))?;
274274
let mut queued_message: Result<InternalSurrealDBBrokerQueuedMessageRecord, BroccoliError> =
275275
Err(BroccoliError::NotImplemented);
276276
while let Some(notification) = futures::StreamExt::next(&mut stream).await {
@@ -287,8 +287,7 @@ impl Broker for SurrealDBBroker {
287287
}
288288
}
289289
Err(error) => Some(Err(BroccoliError::Broker(format!(
290-
"Could not consume::'{}' {}",
291-
queue_name, error
290+
"Could not consume::'{queue_name}' {error}"
292291
)))),
293292
};
294293
if let Some(message) = payload {
@@ -375,13 +374,11 @@ impl Broker for SurrealDBBroker {
375374
>= self
376375
.config
377376
.as_ref()
378-
.map(|config| config.retry_attempts.unwrap_or(3))
379-
.unwrap_or(3))
377+
.map_or(3, |config| config.retry_attempts.unwrap_or(3)))
380378
|| !self
381379
.config
382380
.as_ref()
383-
.map(|config| config.retry_failed.unwrap_or(true))
384-
.unwrap_or(true)
381+
.map_or(true, |config| config.retry_failed.unwrap_or(true))
385382
{
386383
let msg = utils::get_message(
387384
&db,
@@ -411,8 +408,7 @@ impl Broker for SurrealDBBroker {
411408
if self
412409
.config
413410
.as_ref()
414-
.map(|config| config.retry_failed.unwrap_or(true))
415-
.unwrap_or(true)
411+
.map_or(true, |config| config.retry_failed.unwrap_or(true))
416412
{
417413
//// 4: if retry is configured, we increase attempts ////
418414
let mut message = message;
@@ -429,7 +425,7 @@ impl Broker for SurrealDBBroker {
429425
priority,
430426
"Could not reject (reenqueue)",
431427
)
432-
.await?
428+
.await?;
433429
}
434430

435431
Ok(())
@@ -467,8 +463,7 @@ impl Broker for SurrealDBBroker {
467463
.await
468464
}
469465
None => Err(BroccoliError::Broker(format!(
470-
"Could not cancel (task_id not found):{}:{}",
471-
queue_name, task_id
466+
"Could not cancel (task_id not found):{queue_name}:{task_id}"
472467
))),
473468
}
474469
}

src/brokers/surrealdb/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
/// Contains the SurrealDB Broker implementation
1+
/// Contains the `SurrealDB` Broker implementation
22
pub mod broker;
33

4-
/// Utility functions for the SurrealDB Broker
4+
/// Utility functions for the `SurrealDB` Broker
55
pub(crate) mod utils;
66

77
pub use broker::SurrealDBBroker;

0 commit comments

Comments
 (0)