From 7fedc092cf24b3db20ae31be63e242132f72706f Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Mon, 27 Feb 2023 13:58:31 +0200 Subject: [PATCH] rpc: fixed and expanded unit tests using fixtures Use the versioned path to the fixture files, and add a copy of tests for v0_37 which was previously not covered. --- rpc/src/client/transport/mock.rs | 65 ++++++-- rpc/src/client/transport/router.rs | 145 ++++++++++++----- rpc/src/client/transport/websocket.rs | 223 ++++++++++++++++++-------- 3 files changed, 314 insertions(+), 119 deletions(-) diff --git a/rpc/src/client/transport/mock.rs b/rpc/src/client/transport/mock.rs index 52edebd2f..987885779 100644 --- a/rpc/src/client/transport/mock.rs +++ b/rpc/src/client/transport/mock.rs @@ -251,9 +251,12 @@ mod test { use super::*; use crate::query::EventType; - async fn read_json_fixture(name: &str) -> String { + async fn read_json_fixture(version: &str, name: &str) -> String { fs::read_to_string( - PathBuf::from("./tests/kvstore_fixtures/incoming/").join(name.to_owned() + ".json"), + PathBuf::from("./tests/kvstore_fixtures") + .join(version) + .join("incoming") + .join(name.to_owned() + ".json"), ) .await .unwrap() @@ -265,14 +268,15 @@ mod test { use crate::event::DialectEvent; async fn read_event(name: &str) -> Event { - let msg = DialectEvent::::from_string(read_json_fixture(name).await).unwrap(); + let msg = DialectEvent::::from_string(read_json_fixture("v0_34", name).await) + .unwrap(); msg.into() } #[tokio::test] async fn mock_client() { - let abci_info_fixture = read_json_fixture("abci_info").await; - let block_fixture = read_json_fixture("block_at_height_10").await; + let abci_info_fixture = read_json_fixture("v0_34", "abci_info").await; + let block_fixture = read_json_fixture("v0_34", "block_at_height_10").await; let matcher = MockRequestMethodMatcher::default() .map(Method::AbciInfo, Ok(abci_info_fixture)) .map(Method::Block, Ok(block_fixture)); @@ -330,11 +334,19 @@ mod test { mod v0_37 { use super::*; + use crate::dialect::v0_37::Event as RpcEvent; + use crate::event::DialectEvent; + + async fn read_event(name: &str) -> Event { + let msg = DialectEvent::::from_string(read_json_fixture("v0_37", name).await) + .unwrap(); + msg.into() + } #[tokio::test] async fn mock_client() { - let abci_info_fixture = read_json_fixture("abci_info").await; - let block_fixture = read_json_fixture("block_at_height_10").await; + let abci_info_fixture = read_json_fixture("v0_37", "abci_info").await; + let block_fixture = read_json_fixture("v0_37", "block_at_height_10").await; let matcher = MockRequestMethodMatcher::default() .map(Method::AbciInfo, Ok(abci_info_fixture)) .map(Method::Block, Ok(block_fixture)); @@ -342,7 +354,7 @@ mod test { let driver_hdl = tokio::spawn(async move { driver.run().await }); let abci_info = client.abci_info().await.unwrap(); - assert_eq!("{\"size\":0}".to_string(), abci_info.data); + assert_eq!("{\"size\":9}".to_string(), abci_info.data); let block = client.block(Height::from(10_u32)).await.unwrap().block; assert_eq!(Height::from(10_u32), block.header.height); @@ -352,6 +364,41 @@ mod test { driver_hdl.await.unwrap().unwrap(); } - // TODO: add mock_subscription_client test for v0_37 + #[tokio::test] + async fn mock_subscription_client() { + let (client, driver) = MockClient::new(MockRequestMethodMatcher::default()); + let driver_hdl = tokio::spawn(async move { driver.run().await }); + + let event1 = read_event("subscribe_newblock_0").await; + let event2 = read_event("subscribe_newblock_1").await; + let event3 = read_event("subscribe_newblock_2").await; + let events = vec![event1, event2, event3]; + + let subs1 = client.subscribe(EventType::NewBlock.into()).await.unwrap(); + let subs2 = client.subscribe(EventType::NewBlock.into()).await.unwrap(); + assert_ne!(subs1.id().to_string(), subs2.id().to_string()); + + // We can do this because the underlying channels can buffer the + // messages as we publish them. + let subs1_events = subs1.take(3); + let subs2_events = subs2.take(3); + for ev in &events { + client.publish(ev); + } + + // Here each subscription's channel is drained. + let subs1_events = subs1_events.collect::>>().await; + let subs2_events = subs2_events.collect::>>().await; + + assert_eq!(3, subs1_events.len()); + assert_eq!(3, subs2_events.len()); + + for i in 0..3 { + assert!(events[i].eq(subs1_events[i].as_ref().unwrap())); + } + + client.close(); + driver_hdl.await.unwrap().unwrap(); + } } } diff --git a/rpc/src/client/transport/router.rs b/rpc/src/client/transport/router.rs index 40542c61b..985492069 100644 --- a/rpc/src/client/transport/router.rs +++ b/rpc/src/client/transport/router.rs @@ -147,25 +147,18 @@ mod test { event::{Event, WrappedEvent}, utils::uuid_str, }; - // TODO: add fixtures for dialect::v0_37 - use crate::dialect::v0_34::Event as RpcEvent; - async fn read_json_fixture(name: &str) -> String { + async fn read_json_fixture(version: &str, name: &str) -> String { fs::read_to_string( - PathBuf::from("./tests/kvstore_fixtures/incoming/").join(name.to_owned() + ".json"), + PathBuf::from("./tests/kvstore_fixtures") + .join(version) + .join("incoming") + .join(name.to_owned() + ".json"), ) .await .unwrap() } - async fn read_event(name: &str) -> Event { - serde_json::from_str::>(read_json_fixture(name).await.as_str()) - .unwrap() - .into_result() - .unwrap() - .into() - } - async fn must_recv(ch: &mut ChannelRx, timeout_ms: u64) -> T { let delay = time::sleep(Duration::from_millis(timeout_ms)); tokio::select! { @@ -185,37 +178,101 @@ mod test { } } - #[tokio::test] - async fn router_basic_pub_sub() { - let mut router = SubscriptionRouter::default(); - - let (subs1_id, subs2_id, subs3_id) = (uuid_str(), uuid_str(), uuid_str()); - let (subs1_event_tx, mut subs1_event_rx) = unbounded(); - let (subs2_event_tx, mut subs2_event_rx) = unbounded(); - let (subs3_event_tx, mut subs3_event_rx) = unbounded(); - - // Two subscriptions with the same query - router.add(subs1_id, "query1", subs1_event_tx); - router.add(subs2_id, "query1", subs2_event_tx); - // Another subscription with a different query - router.add(subs3_id, "query2", subs3_event_tx); - - let mut ev = read_event("subscribe_newblock_0").await; - ev.query = "query1".into(); - router.publish_event(ev.clone()); - - let subs1_ev = must_recv(&mut subs1_event_rx, 500).await.unwrap(); - let subs2_ev = must_recv(&mut subs2_event_rx, 500).await.unwrap(); - must_not_recv(&mut subs3_event_rx, 50).await; - assert_eq!(ev, subs1_ev); - assert_eq!(ev, subs2_ev); - - ev.query = "query2".into(); - router.publish_event(ev.clone()); - - must_not_recv(&mut subs1_event_rx, 50).await; - must_not_recv(&mut subs2_event_rx, 50).await; - let subs3_ev = must_recv(&mut subs3_event_rx, 500).await.unwrap(); - assert_eq!(ev, subs3_ev); + mod v0_34 { + use super::*; + use crate::dialect::v0_34::Event as RpcEvent; + + async fn read_event(name: &str) -> Event { + serde_json::from_str::>( + read_json_fixture("v0_34", name).await.as_str(), + ) + .unwrap() + .into_result() + .unwrap() + .into() + } + + #[tokio::test] + async fn router_basic_pub_sub() { + let mut router = SubscriptionRouter::default(); + + let (subs1_id, subs2_id, subs3_id) = (uuid_str(), uuid_str(), uuid_str()); + let (subs1_event_tx, mut subs1_event_rx) = unbounded(); + let (subs2_event_tx, mut subs2_event_rx) = unbounded(); + let (subs3_event_tx, mut subs3_event_rx) = unbounded(); + + // Two subscriptions with the same query + router.add(subs1_id, "query1", subs1_event_tx); + router.add(subs2_id, "query1", subs2_event_tx); + // Another subscription with a different query + router.add(subs3_id, "query2", subs3_event_tx); + + let mut ev = read_event("subscribe_newblock_0").await; + ev.query = "query1".into(); + router.publish_event(ev.clone()); + + let subs1_ev = must_recv(&mut subs1_event_rx, 500).await.unwrap(); + let subs2_ev = must_recv(&mut subs2_event_rx, 500).await.unwrap(); + must_not_recv(&mut subs3_event_rx, 50).await; + assert_eq!(ev, subs1_ev); + assert_eq!(ev, subs2_ev); + + ev.query = "query2".into(); + router.publish_event(ev.clone()); + + must_not_recv(&mut subs1_event_rx, 50).await; + must_not_recv(&mut subs2_event_rx, 50).await; + let subs3_ev = must_recv(&mut subs3_event_rx, 500).await.unwrap(); + assert_eq!(ev, subs3_ev); + } + } + + mod v0_37 { + use super::*; + use crate::dialect::v0_37::Event as RpcEvent; + + async fn read_event(name: &str) -> Event { + serde_json::from_str::>( + read_json_fixture("v0_37", name).await.as_str(), + ) + .unwrap() + .into_result() + .unwrap() + .into() + } + + #[tokio::test] + async fn router_basic_pub_sub() { + let mut router = SubscriptionRouter::default(); + + let (subs1_id, subs2_id, subs3_id) = (uuid_str(), uuid_str(), uuid_str()); + let (subs1_event_tx, mut subs1_event_rx) = unbounded(); + let (subs2_event_tx, mut subs2_event_rx) = unbounded(); + let (subs3_event_tx, mut subs3_event_rx) = unbounded(); + + // Two subscriptions with the same query + router.add(subs1_id, "query1", subs1_event_tx); + router.add(subs2_id, "query1", subs2_event_tx); + // Another subscription with a different query + router.add(subs3_id, "query2", subs3_event_tx); + + let mut ev = read_event("subscribe_newblock_0").await; + ev.query = "query1".into(); + router.publish_event(ev.clone()); + + let subs1_ev = must_recv(&mut subs1_event_rx, 500).await.unwrap(); + let subs2_ev = must_recv(&mut subs2_event_rx, 500).await.unwrap(); + must_not_recv(&mut subs3_event_rx, 50).await; + assert_eq!(ev, subs1_ev); + assert_eq!(ev, subs2_ev); + + ev.query = "query2".into(); + router.publish_event(ev.clone()); + + must_not_recv(&mut subs1_event_rx, 50).await; + must_not_recv(&mut subs2_event_rx, 50).await; + let subs3_ev = must_recv(&mut subs3_event_rx, 500).await.unwrap(); + assert_eq!(ev, subs3_ev); + } } } diff --git a/rpc/src/client/transport/websocket.rs b/rpc/src/client/transport/websocket.rs index 8c3665c7b..25731d88e 100644 --- a/rpc/src/client/transport/websocket.rs +++ b/rpc/src/client/transport/websocket.rs @@ -974,9 +974,7 @@ mod test { }; use super::*; - use crate::{client::sync::unbounded, query::EventType, request, Id, Method}; - // TODO: test with dialect::v0_37 as well - use crate::dialect::v0_34::Event as RpcEvent; + use crate::{client::sync::unbounded, dialect, query::EventType, request, Id, Method}; // Interface to a driver that manages all incoming WebSocket connections. struct TestServer { @@ -987,7 +985,7 @@ mod test { } impl TestServer { - async fn new(addr: &str) -> Self { + async fn new(addr: &str, compat: CompatMode) -> Self { let listener = TcpListener::bind(addr).await.unwrap(); let local_addr = listener.local_addr().unwrap(); let node_addr = net::Address::Tcp { @@ -997,7 +995,7 @@ mod test { }; let (terminate_tx, terminate_rx) = unbounded(); let (event_tx, event_rx) = unbounded(); - let driver = TestServerDriver::new(listener, event_rx, terminate_rx); + let driver = TestServerDriver::new(listener, compat, event_rx, terminate_rx); let driver_hdl = tokio::spawn(async move { driver.run().await }); Self { node_addr, @@ -1020,6 +1018,7 @@ mod test { // Manages all incoming WebSocket connections. struct TestServerDriver { listener: TcpListener, + compat: CompatMode, event_rx: ChannelRx, terminate_rx: ChannelRx>, handlers: Vec, @@ -1028,11 +1027,13 @@ mod test { impl TestServerDriver { fn new( listener: TcpListener, + compat: CompatMode, event_rx: ChannelRx, terminate_rx: ChannelRx>, ) -> Self { Self { listener, + compat, event_rx, terminate_rx, handlers: Vec::new(), @@ -1064,7 +1065,8 @@ mod test { } async fn handle_incoming(&mut self, stream: TcpStream) { - self.handlers.push(TestServerHandler::new(stream).await); + self.handlers + .push(TestServerHandler::new(stream, self.compat).await); } async fn terminate(&mut self) { @@ -1087,12 +1089,12 @@ mod test { } impl TestServerHandler { - async fn new(stream: TcpStream) -> Self { + async fn new(stream: TcpStream, compat: CompatMode) -> Self { let conn: WebSocketStream> = accept_async(stream).await.unwrap(); let (terminate_tx, terminate_rx) = unbounded(); let (event_tx, event_rx) = unbounded(); - let driver = TestServerHandlerDriver::new(conn, event_rx, terminate_rx); + let driver = TestServerHandlerDriver::new(conn, compat, event_rx, terminate_rx); let driver_hdl = tokio::spawn(async move { driver.run().await }); Self { driver_hdl, @@ -1114,6 +1116,7 @@ mod test { // Manages interaction with a single incoming WebSocket connection. struct TestServerHandlerDriver { conn: WebSocketStream>, + compat: CompatMode, event_rx: ChannelRx, terminate_rx: ChannelRx>, // A mapping of subscription queries to subscription IDs for this @@ -1124,11 +1127,13 @@ mod test { impl TestServerHandlerDriver { fn new( conn: WebSocketStream>, + compat: CompatMode, event_rx: ChannelRx, terminate_rx: ChannelRx>, ) -> Self { Self { conn, + compat, event_rx, terminate_rx, subscriptions: HashMap::new(), @@ -1157,8 +1162,16 @@ mod test { Some(id) => id.clone(), None => return, }; - let ev: DialectEvent = ev.into(); - self.send(Id::Str(subs_id), ev).await; + match self.compat { + CompatMode::Latest => { + let ev: DialectEvent = ev.into(); + self.send(Id::Str(subs_id), ev).await; + }, + CompatMode::V0_34 => { + let ev: DialectEvent = ev.into(); + self.send(Id::Str(subs_id), ev).await; + }, + } } async fn handle_incoming_msg(&mut self, msg: Message) -> Option> { @@ -1257,76 +1270,154 @@ mod test { } } - async fn read_json_fixture(name: &str) -> String { + async fn read_json_fixture(version: &str, name: &str) -> String { fs::read_to_string( - PathBuf::from("./tests/kvstore_fixtures/incoming/").join(name.to_owned() + ".json"), + PathBuf::from("./tests/kvstore_fixtures") + .join(version) + .join("incoming") + .join(name.to_owned() + ".json"), ) .await .unwrap() } - async fn read_event(name: &str) -> Event { - DialectEvent::::from_string(read_json_fixture(name).await) - .unwrap() - .into() - } + mod v0_34 { + use super::*; + use crate::dialect::v0_34::Event as RpcEvent; - #[tokio::test] - async fn websocket_client_happy_path() { - let event1 = read_event("subscribe_newblock_0").await; - let event2 = read_event("subscribe_newblock_1").await; - let event3 = read_event("subscribe_newblock_2").await; - let test_events = vec![event1, event2, event3]; - - println!("Starting WebSocket server..."); - let mut server = TestServer::new("127.0.0.1:0").await; - println!("Creating client RPC WebSocket connection..."); - let (client, driver) = WebSocketClient::new_with_config( - server.node_addr.clone(), - WebSocketConfig { - compat: CompatMode::V0_34, - ..Default::default() - }, - ) - .await - .unwrap(); - let driver_handle = tokio::spawn(async move { driver.run().await }); - - println!("Initiating subscription for new blocks..."); - let mut subs = client.subscribe(EventType::NewBlock.into()).await.unwrap(); - - // Collect all the events from the subscription. - let subs_collector_hdl = tokio::spawn(async move { - let mut results = Vec::new(); - while let Some(res) = subs.next().await { - results.push(res); - if results.len() == 3 { - break; + async fn read_event(name: &str) -> Event { + DialectEvent::::from_string(read_json_fixture("v0_34", name).await) + .unwrap() + .into() + } + + #[tokio::test] + async fn websocket_client_happy_path() { + let event1 = read_event("subscribe_newblock_0").await; + let event2 = read_event("subscribe_newblock_1").await; + let event3 = read_event("subscribe_newblock_2").await; + let test_events = vec![event1, event2, event3]; + + println!("Starting WebSocket server..."); + let mut server = TestServer::new("127.0.0.1:0", CompatMode::V0_34).await; + println!("Creating client RPC WebSocket connection..."); + let (client, driver) = WebSocketClient::new_with_config( + server.node_addr.clone(), + WebSocketConfig { + compat: CompatMode::V0_34, + ..Default::default() + }, + ) + .await + .unwrap(); + let driver_handle = tokio::spawn(async move { driver.run().await }); + + println!("Initiating subscription for new blocks..."); + let mut subs = client.subscribe(EventType::NewBlock.into()).await.unwrap(); + + // Collect all the events from the subscription. + let subs_collector_hdl = tokio::spawn(async move { + let mut results = Vec::new(); + while let Some(res) = subs.next().await { + results.push(res); + if results.len() == 3 { + break; + } } + results + }); + + println!("Publishing events"); + // Publish the events from this context + for ev in &test_events { + server.publish_event(ev.clone()).unwrap(); } - results - }); - println!("Publishing events"); - // Publish the events from this context - for ev in &test_events { - server.publish_event(ev.clone()).unwrap(); + println!("Collecting results from subscription..."); + let collected_results = subs_collector_hdl.await.unwrap(); + + client.close().unwrap(); + server.terminate().await.unwrap(); + let _ = driver_handle.await.unwrap(); + println!("Closed client and terminated server"); + + assert_eq!(3, collected_results.len()); + for i in 0..3 { + assert_eq!( + test_events[i], + collected_results[i].as_ref().unwrap().clone() + ); + } } + } - println!("Collecting results from subscription..."); - let collected_results = subs_collector_hdl.await.unwrap(); + mod v0_37 { + use super::*; + use crate::dialect::v0_37::Event as RpcEvent; - client.close().unwrap(); - server.terminate().await.unwrap(); - let _ = driver_handle.await.unwrap(); - println!("Closed client and terminated server"); + async fn read_event(name: &str) -> Event { + DialectEvent::::from_string(read_json_fixture("v0_37", name).await) + .unwrap() + .into() + } - assert_eq!(3, collected_results.len()); - for i in 0..3 { - assert_eq!( - test_events[i], - collected_results[i].as_ref().unwrap().clone() - ); + #[tokio::test] + async fn websocket_client_happy_path() { + let event1 = read_event("subscribe_newblock_0").await; + let event2 = read_event("subscribe_newblock_1").await; + let event3 = read_event("subscribe_newblock_2").await; + let test_events = vec![event1, event2, event3]; + + println!("Starting WebSocket server..."); + let mut server = TestServer::new("127.0.0.1:0", CompatMode::Latest).await; + println!("Creating client RPC WebSocket connection..."); + let (client, driver) = WebSocketClient::new_with_config( + server.node_addr.clone(), + WebSocketConfig { + compat: CompatMode::Latest, + ..Default::default() + }, + ) + .await + .unwrap(); + let driver_handle = tokio::spawn(async move { driver.run().await }); + + println!("Initiating subscription for new blocks..."); + let mut subs = client.subscribe(EventType::NewBlock.into()).await.unwrap(); + + // Collect all the events from the subscription. + let subs_collector_hdl = tokio::spawn(async move { + let mut results = Vec::new(); + while let Some(res) = subs.next().await { + results.push(res); + if results.len() == 3 { + break; + } + } + results + }); + + println!("Publishing events"); + // Publish the events from this context + for ev in &test_events { + server.publish_event(ev.clone()).unwrap(); + } + + println!("Collecting results from subscription..."); + let collected_results = subs_collector_hdl.await.unwrap(); + + client.close().unwrap(); + server.terminate().await.unwrap(); + let _ = driver_handle.await.unwrap(); + println!("Closed client and terminated server"); + + assert_eq!(3, collected_results.len()); + for i in 0..3 { + assert_eq!( + test_events[i], + collected_results[i].as_ref().unwrap().clone() + ); + } } }