Skip to content

Commit

Permalink
feat(router): upgrade to futures 0.3 and async/await
Browse files Browse the repository at this point in the history
  • Loading branch information
gakonst committed Jan 23, 2020
1 parent ced438b commit 08f45dc
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 63 deletions.
30 changes: 29 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/interledger-router/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ edition = "2018"
repository = "https://github.com/interledger-rs/interledger-rs"

[dependencies]
futures = { version = "0.1.29", default-features = false }
interledger-packet = { path = "../interledger-packet", version = "^0.4.0", default-features = false }
interledger-service = { path = "../interledger-service", version = "^0.4.0", default-features = false }
log = { version = "0.4.8", default-features = false }
parking_lot = { version = "0.9.0", default-features = false }
uuid = { version = "0.8.1", default-features = false, features = ["v4"]}
async-trait = "0.1.22"

[dev-dependencies]
lazy_static = { version = "1.4.0", default-features = false }
tokio = { version = "0.2.6", features = ["rt-core", "macros"]}
111 changes: 50 additions & 61 deletions crates/interledger-router/src/router.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::RouterStore;
use futures::{future::err, Future};
use async_trait::async_trait;
use interledger_packet::{ErrorCode, RejectBuilder};
use interledger_service::*;
use log::{error, trace};
Expand Down Expand Up @@ -38,19 +38,18 @@ where
}
}

#[async_trait]
impl<S, O> IncomingService<S::Account> for Router<S, O>
where
S: AddressStore + RouterStore,
O: OutgoingService<S::Account> + Clone + Send + 'static,
{
type Future = BoxedIlpFuture;

/// Figures out the next node to pass the received Prepare packet to.
///
/// Firstly, it checks if there is a direct path for that account and uses that.
/// If not it scans through the routing table and checks if the route prefix matches
/// the prepare packet's destination or if it's a catch-all address (i.e. empty prefix)
fn handle_request(&mut self, request: IncomingRequest<S::Account>) -> Self::Future {
async fn handle_request(&mut self, request: IncomingRequest<S::Account>) -> IlpResult {
let destination = request.prepare.destination();
let mut next_hop = None;
let routing_table = self.store.routing_table();
Expand Down Expand Up @@ -92,24 +91,22 @@ where

if let Some(account_id) = next_hop {
let mut next = self.next.clone();
Box::new(
self.store
.get_accounts(vec![account_id])
.map_err(move |_| {
error!("No record found for account: {}", account_id);
RejectBuilder {
code: ErrorCode::F02_UNREACHABLE,
message: &[],
triggered_by: Some(&ilp_address),
data: &[],
}
.build()
})
.and_then(move |mut accounts| {
let request = request.into_outgoing(accounts.remove(0));
next.send_request(request)
}),
)
match self.store.get_accounts(vec![account_id]).await {
Ok(mut accounts) => {
let request = request.into_outgoing(accounts.remove(0));
next.send_request(request).await
}
Err(_) => {
error!("No record found for account: {}", account_id);
Err(RejectBuilder {
code: ErrorCode::F02_UNREACHABLE,
message: &[],
triggered_by: Some(&ilp_address),
data: &[],
}
.build())
}
}
} else {
error!(
"No route found for request {}: {:?}",
Expand All @@ -129,21 +126,20 @@ where
},
request
);
Box::new(err(RejectBuilder {
Err(RejectBuilder {
code: ErrorCode::F02_UNREACHABLE,
message: &[],
triggered_by: Some(&ilp_address),
data: &[],
}
.build()))
.build())
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use futures::future::ok;
use interledger_packet::{Address, FulfillBuilder, PrepareBuilder};
use interledger_service::outgoing_service_fn;
use lazy_static::lazy_static;
Expand Down Expand Up @@ -190,36 +186,29 @@ mod tests {
routes: HashMap<String, Uuid>,
}

#[async_trait]
impl AccountStore for TestStore {
type Account = TestAccount;

fn get_accounts(
&self,
account_ids: Vec<Uuid>,
) -> Box<dyn Future<Item = Vec<TestAccount>, Error = ()> + Send> {
Box::new(ok(account_ids.into_iter().map(TestAccount).collect()))
async fn get_accounts(&self, account_ids: Vec<Uuid>) -> Result<Vec<TestAccount>, ()> {
Ok(account_ids.into_iter().map(TestAccount).collect())
}

// stub implementation (not used in these tests)
fn get_account_id_from_username(
&self,
_username: &Username,
) -> Box<dyn Future<Item = Uuid, Error = ()> + Send> {
Box::new(ok(Uuid::new_v4()))
async fn get_account_id_from_username(&self, _username: &Username) -> Result<Uuid, ()> {
Ok(Uuid::new_v4())
}
}

#[async_trait]
impl AddressStore for TestStore {
/// Saves the ILP Address in the store's memory and database
fn set_ilp_address(
&self,
_ilp_address: Address,
) -> Box<dyn Future<Item = (), Error = ()> + Send> {
unimplemented!()
async fn set_ilp_address(&self, _ilp_address: Address) -> Result<(), ()> {
Ok(())
}

fn clear_ilp_address(&self) -> Box<dyn Future<Item = (), Error = ()> + Send> {
unimplemented!()
async fn clear_ilp_address(&self) -> Result<(), ()> {
Ok(())
}

/// Get's the store's ilp address from memory
Expand All @@ -234,8 +223,8 @@ mod tests {
}
}

#[test]
fn empty_routing_table() {
#[tokio::test]
async fn empty_routing_table() {
let mut router = Router::new(
TestStore {
routes: HashMap::new(),
Expand All @@ -261,12 +250,12 @@ mod tests {
}
.build(),
})
.wait();
.await;
assert!(result.is_err());
}

#[test]
fn no_route() {
#[tokio::test]
async fn no_route() {
let mut router = Router::new(
TestStore {
routes: HashMap::from_iter(
Expand Down Expand Up @@ -294,12 +283,12 @@ mod tests {
}
.build(),
})
.wait();
.await;
assert!(result.is_err());
}

#[test]
fn finds_exact_route() {
#[tokio::test]
async fn finds_exact_route() {
let mut router = Router::new(
TestStore {
routes: HashMap::from_iter(
Expand Down Expand Up @@ -327,12 +316,12 @@ mod tests {
}
.build(),
})
.wait();
.await;
assert!(result.is_ok());
}

#[test]
fn catch_all_route() {
#[tokio::test]
async fn catch_all_route() {
let mut router = Router::new(
TestStore {
routes: HashMap::from_iter(vec![(String::new(), Uuid::new_v4())].into_iter()),
Expand All @@ -358,12 +347,12 @@ mod tests {
}
.build(),
})
.wait();
.await;
assert!(result.is_ok());
}

#[test]
fn finds_matching_prefix() {
#[tokio::test]
async fn finds_matching_prefix() {
let mut router = Router::new(
TestStore {
routes: HashMap::from_iter(
Expand Down Expand Up @@ -391,12 +380,12 @@ mod tests {
}
.build(),
})
.wait();
.await;
assert!(result.is_ok());
}

#[test]
fn finds_longest_matching_prefix() {
#[tokio::test]
async fn finds_longest_matching_prefix() {
let id0 = Uuid::from_slice(&[0; 16]).unwrap();
let id1 = Uuid::from_slice(&[1; 16]).unwrap();
let id2 = Uuid::from_slice(&[2; 16]).unwrap();
Expand All @@ -414,7 +403,7 @@ mod tests {
),
},
outgoing_service_fn(move |request: OutgoingRequest<TestAccount>| {
*to_clone.lock() = Some(request.to.clone());
*to_clone.lock() = Some(request.to);

Ok(FulfillBuilder {
fulfillment: &[0; 32],
Expand All @@ -436,7 +425,7 @@ mod tests {
}
.build(),
})
.wait();
.await;
assert!(result.is_ok());
assert_eq!(to.lock().take().unwrap().0, id2);
}
Expand Down

0 comments on commit 08f45dc

Please sign in to comment.