Skip to content

Commit

Permalink
rpc: fixed and expanded unit tests using fixtures
Browse files Browse the repository at this point in the history
Use the versioned path to the fixture files,
and add a copy of tests for v0_37 which was previously not covered.
  • Loading branch information
mzabaluev committed Feb 27, 2023
1 parent 295ef2a commit 7fedc09
Show file tree
Hide file tree
Showing 3 changed files with 314 additions and 119 deletions.
65 changes: 56 additions & 9 deletions rpc/src/client/transport/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,12 @@ mod test {
use super::*;
use crate::query::EventType;

async fn read_json_fixture(name: &str) -> String {
async fn read_json_fixture(version: &str, name: &str) -> String {
fs::read_to_string(
PathBuf::from("./tests/kvstore_fixtures/incoming/").join(name.to_owned() + ".json"),
PathBuf::from("./tests/kvstore_fixtures")
.join(version)
.join("incoming")
.join(name.to_owned() + ".json"),
)
.await
.unwrap()
Expand All @@ -265,14 +268,15 @@ mod test {
use crate::event::DialectEvent;

async fn read_event(name: &str) -> Event {
let msg = DialectEvent::<RpcEvent>::from_string(read_json_fixture(name).await).unwrap();
let msg = DialectEvent::<RpcEvent>::from_string(read_json_fixture("v0_34", name).await)
.unwrap();
msg.into()
}

#[tokio::test]
async fn mock_client() {
let abci_info_fixture = read_json_fixture("abci_info").await;
let block_fixture = read_json_fixture("block_at_height_10").await;
let abci_info_fixture = read_json_fixture("v0_34", "abci_info").await;
let block_fixture = read_json_fixture("v0_34", "block_at_height_10").await;
let matcher = MockRequestMethodMatcher::default()
.map(Method::AbciInfo, Ok(abci_info_fixture))
.map(Method::Block, Ok(block_fixture));
Expand Down Expand Up @@ -330,19 +334,27 @@ mod test {

mod v0_37 {
use super::*;
use crate::dialect::v0_37::Event as RpcEvent;
use crate::event::DialectEvent;

async fn read_event(name: &str) -> Event {
let msg = DialectEvent::<RpcEvent>::from_string(read_json_fixture("v0_37", name).await)
.unwrap();
msg.into()
}

#[tokio::test]
async fn mock_client() {
let abci_info_fixture = read_json_fixture("abci_info").await;
let block_fixture = read_json_fixture("block_at_height_10").await;
let abci_info_fixture = read_json_fixture("v0_37", "abci_info").await;
let block_fixture = read_json_fixture("v0_37", "block_at_height_10").await;
let matcher = MockRequestMethodMatcher::default()
.map(Method::AbciInfo, Ok(abci_info_fixture))
.map(Method::Block, Ok(block_fixture));
let (client, driver) = MockClient::new(matcher);
let driver_hdl = tokio::spawn(async move { driver.run().await });

let abci_info = client.abci_info().await.unwrap();
assert_eq!("{\"size\":0}".to_string(), abci_info.data);
assert_eq!("{\"size\":9}".to_string(), abci_info.data);

let block = client.block(Height::from(10_u32)).await.unwrap().block;
assert_eq!(Height::from(10_u32), block.header.height);
Expand All @@ -352,6 +364,41 @@ mod test {
driver_hdl.await.unwrap().unwrap();
}

// TODO: add mock_subscription_client test for v0_37
#[tokio::test]
async fn mock_subscription_client() {
let (client, driver) = MockClient::new(MockRequestMethodMatcher::default());
let driver_hdl = tokio::spawn(async move { driver.run().await });

let event1 = read_event("subscribe_newblock_0").await;
let event2 = read_event("subscribe_newblock_1").await;
let event3 = read_event("subscribe_newblock_2").await;
let events = vec![event1, event2, event3];

let subs1 = client.subscribe(EventType::NewBlock.into()).await.unwrap();
let subs2 = client.subscribe(EventType::NewBlock.into()).await.unwrap();
assert_ne!(subs1.id().to_string(), subs2.id().to_string());

// We can do this because the underlying channels can buffer the
// messages as we publish them.
let subs1_events = subs1.take(3);
let subs2_events = subs2.take(3);
for ev in &events {
client.publish(ev);
}

// Here each subscription's channel is drained.
let subs1_events = subs1_events.collect::<Vec<Result<Event, Error>>>().await;
let subs2_events = subs2_events.collect::<Vec<Result<Event, Error>>>().await;

assert_eq!(3, subs1_events.len());
assert_eq!(3, subs2_events.len());

for i in 0..3 {
assert!(events[i].eq(subs1_events[i].as_ref().unwrap()));
}

client.close();
driver_hdl.await.unwrap().unwrap();
}
}
}
145 changes: 101 additions & 44 deletions rpc/src/client/transport/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,25 +147,18 @@ mod test {
event::{Event, WrappedEvent},
utils::uuid_str,
};
// TODO: add fixtures for dialect::v0_37
use crate::dialect::v0_34::Event as RpcEvent;

async fn read_json_fixture(name: &str) -> String {
async fn read_json_fixture(version: &str, name: &str) -> String {
fs::read_to_string(
PathBuf::from("./tests/kvstore_fixtures/incoming/").join(name.to_owned() + ".json"),
PathBuf::from("./tests/kvstore_fixtures")
.join(version)
.join("incoming")
.join(name.to_owned() + ".json"),
)
.await
.unwrap()
}

async fn read_event(name: &str) -> Event {
serde_json::from_str::<WrappedEvent<RpcEvent>>(read_json_fixture(name).await.as_str())
.unwrap()
.into_result()
.unwrap()
.into()
}

async fn must_recv<T>(ch: &mut ChannelRx<T>, timeout_ms: u64) -> T {
let delay = time::sleep(Duration::from_millis(timeout_ms));
tokio::select! {
Expand All @@ -185,37 +178,101 @@ mod test {
}
}

#[tokio::test]
async fn router_basic_pub_sub() {
let mut router = SubscriptionRouter::default();

let (subs1_id, subs2_id, subs3_id) = (uuid_str(), uuid_str(), uuid_str());
let (subs1_event_tx, mut subs1_event_rx) = unbounded();
let (subs2_event_tx, mut subs2_event_rx) = unbounded();
let (subs3_event_tx, mut subs3_event_rx) = unbounded();

// Two subscriptions with the same query
router.add(subs1_id, "query1", subs1_event_tx);
router.add(subs2_id, "query1", subs2_event_tx);
// Another subscription with a different query
router.add(subs3_id, "query2", subs3_event_tx);

let mut ev = read_event("subscribe_newblock_0").await;
ev.query = "query1".into();
router.publish_event(ev.clone());

let subs1_ev = must_recv(&mut subs1_event_rx, 500).await.unwrap();
let subs2_ev = must_recv(&mut subs2_event_rx, 500).await.unwrap();
must_not_recv(&mut subs3_event_rx, 50).await;
assert_eq!(ev, subs1_ev);
assert_eq!(ev, subs2_ev);

ev.query = "query2".into();
router.publish_event(ev.clone());

must_not_recv(&mut subs1_event_rx, 50).await;
must_not_recv(&mut subs2_event_rx, 50).await;
let subs3_ev = must_recv(&mut subs3_event_rx, 500).await.unwrap();
assert_eq!(ev, subs3_ev);
mod v0_34 {
use super::*;
use crate::dialect::v0_34::Event as RpcEvent;

async fn read_event(name: &str) -> Event {
serde_json::from_str::<WrappedEvent<RpcEvent>>(
read_json_fixture("v0_34", name).await.as_str(),
)
.unwrap()
.into_result()
.unwrap()
.into()
}

#[tokio::test]
async fn router_basic_pub_sub() {
let mut router = SubscriptionRouter::default();

let (subs1_id, subs2_id, subs3_id) = (uuid_str(), uuid_str(), uuid_str());
let (subs1_event_tx, mut subs1_event_rx) = unbounded();
let (subs2_event_tx, mut subs2_event_rx) = unbounded();
let (subs3_event_tx, mut subs3_event_rx) = unbounded();

// Two subscriptions with the same query
router.add(subs1_id, "query1", subs1_event_tx);
router.add(subs2_id, "query1", subs2_event_tx);
// Another subscription with a different query
router.add(subs3_id, "query2", subs3_event_tx);

let mut ev = read_event("subscribe_newblock_0").await;
ev.query = "query1".into();
router.publish_event(ev.clone());

let subs1_ev = must_recv(&mut subs1_event_rx, 500).await.unwrap();
let subs2_ev = must_recv(&mut subs2_event_rx, 500).await.unwrap();
must_not_recv(&mut subs3_event_rx, 50).await;
assert_eq!(ev, subs1_ev);
assert_eq!(ev, subs2_ev);

ev.query = "query2".into();
router.publish_event(ev.clone());

must_not_recv(&mut subs1_event_rx, 50).await;
must_not_recv(&mut subs2_event_rx, 50).await;
let subs3_ev = must_recv(&mut subs3_event_rx, 500).await.unwrap();
assert_eq!(ev, subs3_ev);
}
}

mod v0_37 {
use super::*;
use crate::dialect::v0_37::Event as RpcEvent;

async fn read_event(name: &str) -> Event {
serde_json::from_str::<WrappedEvent<RpcEvent>>(
read_json_fixture("v0_37", name).await.as_str(),
)
.unwrap()
.into_result()
.unwrap()
.into()
}

#[tokio::test]
async fn router_basic_pub_sub() {
let mut router = SubscriptionRouter::default();

let (subs1_id, subs2_id, subs3_id) = (uuid_str(), uuid_str(), uuid_str());
let (subs1_event_tx, mut subs1_event_rx) = unbounded();
let (subs2_event_tx, mut subs2_event_rx) = unbounded();
let (subs3_event_tx, mut subs3_event_rx) = unbounded();

// Two subscriptions with the same query
router.add(subs1_id, "query1", subs1_event_tx);
router.add(subs2_id, "query1", subs2_event_tx);
// Another subscription with a different query
router.add(subs3_id, "query2", subs3_event_tx);

let mut ev = read_event("subscribe_newblock_0").await;
ev.query = "query1".into();
router.publish_event(ev.clone());

let subs1_ev = must_recv(&mut subs1_event_rx, 500).await.unwrap();
let subs2_ev = must_recv(&mut subs2_event_rx, 500).await.unwrap();
must_not_recv(&mut subs3_event_rx, 50).await;
assert_eq!(ev, subs1_ev);
assert_eq!(ev, subs2_ev);

ev.query = "query2".into();
router.publish_event(ev.clone());

must_not_recv(&mut subs1_event_rx, 50).await;
must_not_recv(&mut subs2_event_rx, 50).await;
let subs3_ev = must_recv(&mut subs3_event_rx, 500).await.unwrap();
assert_eq!(ev, subs3_ev);
}
}
}
Loading

0 comments on commit 7fedc09

Please sign in to comment.