Skip to content

Commit

Permalink
Merge #357
Browse files Browse the repository at this point in the history
357: test: add more integration test for new p2p lib r=quake a=quake

- [x] add network related rpc
- [x] test discovery
- [x] test disconnection

Co-authored-by: quake wang <[email protected]>
  • Loading branch information
bors[bot] and quake committed Mar 29, 2019
2 parents a4144ec + 4a56b24 commit f80e74b
Show file tree
Hide file tree
Showing 16 changed files with 173 additions and 34 deletions.
4 changes: 3 additions & 1 deletion network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const IDENTIFY_PROTOCOL_ID: ProtocolId = 2;
pub const FEELER_PROTOCOL_ID: ProtocolId = 3;

pub type CKBProtocols = Vec<(CKBProtocol, Arc<dyn CKBProtocolHandler>)>;
pub type MultiaddrList = Vec<(Multiaddr, u8)>;

type NetworkResult = Result<
(
Arc<Network>,
Expand Down Expand Up @@ -134,7 +136,7 @@ impl Network {
&self.local_peer_id
}

pub(crate) fn listened_addresses(&self, count: usize) -> Vec<(Multiaddr, u8)> {
pub(crate) fn listened_addresses(&self, count: usize) -> MultiaddrList {
let listened_addresses = self.listened_addresses.read();
listened_addresses
.iter()
Expand Down
28 changes: 27 additions & 1 deletion network/src/network_service.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::network::MultiaddrList;
use crate::protocol::ckb_handler::{CKBProtocolContext, DefaultCKBProtocolContext};
use crate::{errors::Error, CKBEvent, NetworkConfig, ProtocolId};
use crate::{
multiaddr::Multiaddr,
network::{CKBProtocols, Network},
PeerId,
Peer, PeerId,
};
use ckb_util::Mutex;
use futures::future::Future;
Expand All @@ -13,6 +14,8 @@ use log::{debug, error, info};
use std::sync::Arc;
use tokio::runtime;

const ADDR_LIMIT: u32 = 3;

pub struct StopHandler {
signal: oneshot::Sender<()>,
network_runtime: runtime::Runtime,
Expand Down Expand Up @@ -125,4 +128,27 @@ impl NetworkService {
pub fn add_node(&self, peer_id: &PeerId, address: Multiaddr) {
self.network.add_node(peer_id, address)
}

pub fn connected_peers(&self) -> Vec<(PeerId, Peer, MultiaddrList)> {
let peer_store = self.network.peer_store().read();

self.network
.peers_registry
.read()
.peers_iter()
.map(|(peer_id, peer)| {
(
peer_id.clone(),
peer.clone(),
peer_store
.peer_addrs(peer_id, ADDR_LIMIT)
.unwrap_or_default()
.into_iter()
// FIXME how to return address score?
.map(|address| (address, 1))
.collect(),
)
})
.collect()
}
}
29 changes: 29 additions & 0 deletions rpc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,35 @@ curl -d '{"id": 2, "jsonrpc": "2.0", "method":"local_node_info","params": []}' -
}
```

# get_peers

Returns the connected peers information.

## Examples

```shell
curl -d '{"id": 2, "jsonrpc": "2.0", "method":"get_peers","params": []}' -H 'content-type:application/json' 'http://localhost:8114'
```

```json
{
"jsonrpc": "2.0",
"result": [
{
"addresses": [
{
"address": "/ip4/192.168.2.3/tcp/12344/p2p/QmdiJuQZj1dM4K4HKMxfMwcAqGFYvGKpbvVxTzyQeNGEcG",
"score": 1
}
],
"node_id": "QmdiJuQZj1dM4K4HKMxfMwcAqGFYvGKpbvVxTzyQeNGEcG",
"version": "0.5.0"
}
],
"id": 2
}
```

# send_transaction

Creates new transaction.
Expand Down
34 changes: 30 additions & 4 deletions rpc/src/module/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use build_info::{get_version, Version};
use ckb_network::NetworkService;
use jsonrpc_core::Result;
use jsonrpc_derive::rpc;
use jsonrpc_types::{LocalNode, NodeAddress};
use jsonrpc_types::{Node, NodeAddress};
use std::sync::Arc;

const MAX_ADDRS: usize = 50;
Expand All @@ -11,16 +11,20 @@ const MAX_ADDRS: usize = 50;
pub trait NetworkRpc {
// curl -d '{"id": 2, "jsonrpc": "2.0", "method":"local_node_info","params": []}' -H 'content-type:application/json' 'http://localhost:8114'
#[rpc(name = "local_node_info")]
fn local_node_info(&self) -> Result<LocalNode>;
fn local_node_info(&self) -> Result<Node>;

// curl -d '{"id": 2, "jsonrpc": "2.0", "method":"get_peers","params": []}' -H 'content-type:application/json' 'http://localhost:8114'
#[rpc(name = "get_peers")]
fn get_peers(&self) -> Result<Vec<Node>>;
}

pub(crate) struct NetworkRpcImpl {
pub network: Arc<NetworkService>,
}

impl NetworkRpc for NetworkRpcImpl {
fn local_node_info(&self) -> Result<LocalNode> {
Ok(LocalNode {
fn local_node_info(&self) -> Result<Node> {
Ok(Node {
version: get_version!().to_string(),
node_id: self.network.node_id(),
addresses: self
Expand All @@ -31,4 +35,26 @@ impl NetworkRpc for NetworkRpcImpl {
.collect(),
})
}

fn get_peers(&self) -> Result<Vec<Node>> {
let peers = self.network.connected_peers();
Ok(peers
.into_iter()
.map(|(peer_id, peer, addresses)| Node {
version: peer
.identify_info
.map(|info| info.client_version)
.unwrap_or_else(|| "unknown".to_string()),
node_id: peer_id.to_base58(),
// TODO how to get correct port and score?
addresses: addresses
.into_iter()
.map(|(address, score)| NodeAddress {
address: address.to_string(),
score,
})
.collect(),
})
.collect())
}
}
8 changes: 6 additions & 2 deletions test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ fn main() {
"pool_reconcile" => Box::new(PoolReconcile),
"pool_trace" => Box::new(PoolTrace),
"transaction_relay_basic" => Box::new(TransactionRelayBasic),
"discovery" => Box::new(Discovery),
"disconnect" => Box::new(Disconnect),
_ => panic!("invalid spec"),
};
let net = spec.setup_net(&binary, start_port);
spec.run(&net);
spec.run(net);
} else {
let specs: Vec<Box<Spec>> = vec![
Box::new(BlockRelayBasic),
Expand All @@ -38,11 +40,13 @@ fn main() {
Box::new(PoolReconcile),
Box::new(PoolTrace),
Box::new(TransactionRelayBasic),
Box::new(Discovery),
Box::new(Disconnect),
];

specs.iter().for_each(|spec| {
let net = spec.setup_net(&binary, start_port);
spec.run(&net);
spec.run(net);
})
}

Expand Down
26 changes: 13 additions & 13 deletions test/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub struct Node {
pub dir: String,
pub p2p_port: u16,
pub rpc_port: u16,
pub node_id: Option<String>,
guard: Option<ProcessGuard>,
}

Expand All @@ -43,6 +44,7 @@ impl Node {
dir: dir.to_string(),
p2p_port,
rpc_port,
node_id: None,
guard: None,
}
}
Expand All @@ -57,11 +59,21 @@ impl Node {
])
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.stderr(Stdio::inherit())
.spawn()
.expect("failed to run binary");
self.guard = Some(ProcessGuard(child_process));
info!("Started node with working dir: {}", self.dir);

let mut client = self.rpc_client();
loop {
if let Ok(result) = client.local_node_info().call() {
info!("RPC service ready, {:?}", result);
self.node_id = Some(result.node_id);
break;
}
sleep(1);
}
}

pub fn connect(&self, node: &Node) {
Expand Down Expand Up @@ -142,18 +154,6 @@ impl Node {
.expect("rpc call send_transaction failed")
}

pub fn wait_for_rpc_connection(&self) {
let mut client = self.rpc_client();

loop {
if let Ok(result) = client.local_node_info().call() {
info!("RPC service ready, {:?}", result);
break;
}
sleep(1);
}
}

pub fn new_block(&self) -> Block {
let template = self
.rpc_client()
Expand Down
5 changes: 3 additions & 2 deletions test/src/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use ckb_core::BlockNumber;
use ckb_shared::tx_pool::TxTrace;
use jsonrpc_client_core::{expand_params, jsonrpc_client};
use jsonrpc_types::{Block, BlockTemplate, Header, LocalNode, Transaction};
use jsonrpc_types::{Block, BlockTemplate, Header, Node, Transaction};
use numext_fixed_hash::H256;

jsonrpc_client!(pub struct RpcClient {
pub fn local_node_info(&mut self) -> RpcRequest<LocalNode>;
pub fn local_node_info(&mut self) -> RpcRequest<Node>;
pub fn get_peers(&mut self) -> RpcRequest<Vec<Node>>;

pub fn add_node(&mut self, peer_id: String, address: String) -> RpcRequest<()>;

Expand Down
2 changes: 1 addition & 1 deletion test/src/specs/block_relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use log::info;
pub struct BlockRelayBasic;

impl Spec for BlockRelayBasic {
fn run(&self, net: &Net) {
fn run(&self, net: Net) {
info!("Running BlockRelayBasic");
let node0 = &net.nodes[0];
let node1 = &net.nodes[1];
Expand Down
2 changes: 1 addition & 1 deletion test/src/specs/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use log::info;
pub struct BlockSyncBasic;

impl Spec for BlockSyncBasic {
fn run(&self, net: &Net) {
fn run(&self, net: Net) {
info!("Running BlockSyncBasic");
let node0 = &net.nodes[0];
let node1 = &net.nodes[1];
Expand Down
2 changes: 1 addition & 1 deletion test/src/specs/mining.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use log::info;
pub struct MiningBasic;

impl Spec for MiningBasic {
fn run(&self, net: &Net) {
fn run(&self, net: Net) {
info!("Running MiningBasic");
let node = &net.nodes[0];

Expand Down
5 changes: 3 additions & 2 deletions test/src/specs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod block_relay;
mod block_sync;
mod mining;
mod net;
mod pool;
mod transaction_relay;

Expand All @@ -9,11 +10,12 @@ pub use block_sync::BlockSyncBasic;
pub use mining::MiningBasic;
pub use pool::{PoolReconcile, PoolTrace};
pub use transaction_relay::TransactionRelayBasic;
pub use net::{Disconnect, Discovery};

use crate::{sleep, Net};

pub trait Spec {
fn run(&self, net: &Net);
fn run(&self, net: Net);

fn num_nodes(&self) -> usize {
3
Expand All @@ -29,7 +31,6 @@ pub trait Spec {
// start all nodes
net.nodes.iter_mut().for_each(|node| {
node.start();
node.wait_for_rpc_connection();
});

// connect the nodes as a linear chain: node0 <-> node1 <-> node2 <-> ...
Expand Down
49 changes: 49 additions & 0 deletions test/src/specs/net.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use crate::{sleep, Net, Spec};
use log::info;

pub struct Discovery;

impl Spec for Discovery {
fn run(&self, net: Net) {
info!("Running Discovery");
let node0_id = &net.nodes[0].node_id.clone().unwrap();
let node2 = &net.nodes[2];

info!("Waiting for discovering");
sleep(5);

info!("The address of node0 should be discovered by node2 and connected");
let peers = node2
.rpc_client()
.get_peers()
.call()
.expect("rpc call get_peers failed");
assert!(peers.iter().any(|peer| &peer.node_id == node0_id));
}
}

pub struct Disconnect;

impl Spec for Disconnect {
fn run(&self, mut net: Net) {
info!("Running Disconnect");

info!("Disconnect node1");
let node1 = net.nodes.pop().unwrap();
std::mem::drop(node1);
sleep(3);

info!("The address of node1 should be removed from node0's peers");
let peers = net.nodes[0]
.rpc_client()
.get_peers()
.call()
.expect("rpc call get_peers failed");

assert!(peers.is_empty());
}

fn num_nodes(&self) -> usize {
2
}
}
4 changes: 2 additions & 2 deletions test/src/specs/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use log::info;
pub struct PoolReconcile;

impl Spec for PoolReconcile {
fn run(&self, net: &Net) {
fn run(&self, net: Net) {
info!("Running PoolReconcile");
let node0 = &net.nodes[0];
let node1 = &net.nodes[1];
Expand Down Expand Up @@ -61,7 +61,7 @@ impl Spec for PoolReconcile {
pub struct PoolTrace;

impl Spec for PoolTrace {
fn run(&self, net: &Net) {
fn run(&self, net: Net) {
info!("Running PoolTrace");
let node0 = &net.nodes[0];

Expand Down
2 changes: 1 addition & 1 deletion test/src/specs/transaction_relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use log::info;
pub struct TransactionRelayBasic;

impl Spec for TransactionRelayBasic {
fn run(&self, net: &Net) {
fn run(&self, net: Net) {
info!("Running TransactionRelayBasic");
let node0 = &net.nodes[0];
let node1 = &net.nodes[1];
Expand Down
Loading

0 comments on commit f80e74b

Please sign in to comment.