Commit 9f05dc5

Merge pull request nervosnetwork#20 from contrun/sync-after-restart

Add integration test to check that nodes sync correctly after restart

eval-exec authored Jan 18, 2024
2 parents b5d5529 + 73be8e1
Showing 9 changed files with 217 additions and 41 deletions.
2 changes: 2 additions & 0 deletions test/src/main.rs
@@ -398,6 +398,7 @@ fn all_specs() -> Vec<Box<dyn Spec>> {
Box::new(BlockSyncNonAncestorBestBlocks),
Box::new(RequestUnverifiedBlocks),
Box::new(SyncTimeout),
Box::new(SyncChurn),
Box::new(GetBlockFilterCheckPoints),
Box::new(GetBlockFilterHashes),
Box::new(GetBlockFilters),
@@ -580,6 +581,7 @@ fn all_specs() -> Vec<Box<dyn Spec>> {
Box::new(CheckVmVersion1),
Box::new(CheckVmVersion2),
Box::new(CheckVmBExtension),
Box::new(RandomlyKill),
];
specs.shuffle(&mut thread_rng());
specs
4 changes: 2 additions & 2 deletions test/src/net.rs
@@ -140,7 +140,7 @@ impl Net {
let protocol_id = protocol.protocol_id();
let peer_index = self
.receivers
.get(node_id)
.get(&node_id)
.map(|(peer_index, _)| *peer_index)
.unwrap_or_else(|| panic!("not connected peer {}", node.p2p_address()));
self.controller()
@@ -156,7 +156,7 @@ impl Net {
let node_id = node.node_id();
let (peer_index, receiver) = self
.receivers
.get(node_id)
.get(&node_id)
.unwrap_or_else(|| panic!("not connected peer {}", node.p2p_address()));
let net_message = receiver.recv_timeout(timeout)?;
info!(
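A side note on the `.get(&node_id)` changes above: they follow from `node_id()` now returning an owned `String` instead of `&str` (see the node.rs changes below), so the key must be passed by reference at the call site. A minimal sketch of the borrow pattern, with hypothetical map contents:

```rust
use std::collections::HashMap;

fn main() {
    // Mirrors Net::receivers: node id -> peer index (simplified).
    let mut receivers: HashMap<String, usize> = HashMap::new();
    receivers.insert("QmExampleNodeId".to_string(), 0);

    // node_id() now returns an owned String...
    let node_id: String = "QmExampleNodeId".to_string();

    // ...so the lookup passes `&node_id`: HashMap::get accepts any `&Q`
    // where `String: Borrow<Q>`, and `&String` satisfies that directly.
    let peer_index = receivers.get(&node_id).copied();
    assert_eq!(peer_index, Some(0));
}
```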
128 changes: 97 additions & 31 deletions test/src/node.rs
@@ -8,7 +8,8 @@ use ckb_chain_spec::ChainSpec;
use ckb_error::AnyError;
use ckb_jsonrpc_types::{BlockFilter, BlockTemplate, TxPoolInfo};
use ckb_jsonrpc_types::{PoolTxDetailInfo, TxStatus};
use ckb_logger::{debug, error};
use ckb_logger::{debug, error, info};
use ckb_network::multiaddr::Multiaddr;
use ckb_resource::Resource;
use ckb_types::{
bytes,
@@ -19,16 +20,17 @@ use ckb_types::{
packed::{Block, Byte32, CellDep, CellInput, CellOutput, CellOutputBuilder, OutPoint, Script},
prelude::*,
};
use std::borrow::Borrow;
use std::collections::HashSet;
use std::borrow::{Borrow, BorrowMut};
use std::collections::{HashMap, HashSet};
use std::convert::Into;
use std::fs;
use std::path::PathBuf;
use std::process::{Child, Command, Stdio};
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::time::{Duration, Instant};

struct ProcessGuard {
pub(crate) struct ProcessGuard {
pub name: String,
pub child: Child,
pub killed: bool,
@@ -46,16 +48,21 @@ impl Drop for ProcessGuard {
}
}

#[derive(Clone)]
pub struct Node {
inner: Arc<InnerNode>,
}

pub struct InnerNode {
spec_node_name: String,
working_dir: PathBuf,
consensus: Consensus,
p2p_listen: String,
rpc_client: RpcClient,
rpc_listen: String,

node_id: Option<String>, // initialized when the node starts
guard: Option<ProcessGuard>, // initialized when the node starts
node_id: RwLock<Option<String>>, // initialized when the node starts
guard: RwLock<Option<ProcessGuard>>, // initialized when the node starts
}

impl Node {
@@ -105,7 +112,7 @@ impl Node {
modifier(&mut app_config);
fs::write(&app_config_path, toml::to_string(&app_config).unwrap()).unwrap();

*self = Self::init(self.working_dir(), self.spec_node_name.clone());
*self = Self::init(self.working_dir(), self.inner.spec_node_name.clone());
}

pub fn modify_chain_spec<M>(&mut self, modifier: M)
@@ -118,7 +125,7 @@
modifier(&mut chain_spec);
fs::write(&chain_spec_path, toml::to_string(&chain_spec).unwrap()).unwrap();

*self = Self::init(self.working_dir(), self.spec_node_name.clone());
*self = Self::init(self.working_dir(), self.inner.spec_node_name.clone());
}

// Initialize Node instance based on working directory
@@ -150,44 +157,51 @@
chain_spec.build_consensus().unwrap()
};
Self {
spec_node_name,
working_dir,
consensus,
p2p_listen,
rpc_client,
rpc_listen,
node_id: None,
guard: None,
inner: Arc::new(InnerNode {
spec_node_name,
working_dir,
consensus,
p2p_listen,
rpc_client,
rpc_listen,
node_id: RwLock::new(None),
guard: RwLock::new(None),
}),
}
}

pub fn rpc_client(&self) -> &RpcClient {
&self.rpc_client
&self.inner.rpc_client
}

pub fn working_dir(&self) -> PathBuf {
self.working_dir.clone()
self.inner.working_dir.clone()
}

pub fn log_path(&self) -> PathBuf {
self.working_dir().join("data/logs/run.log")
}

pub fn node_id(&self) -> &str {
pub fn node_id(&self) -> String {
// peer_id.to_base58()
self.node_id.as_ref().expect("uninitialized node_id")
self.inner
.node_id
.read()
.expect("read locked node_id")
.clone()
.expect("uninitialized node_id")
}

pub fn consensus(&self) -> &Consensus {
&self.consensus
&self.inner.consensus
}

pub fn p2p_listen(&self) -> String {
self.p2p_listen.clone()
self.inner.p2p_listen.clone()
}

pub fn rpc_listen(&self) -> String {
self.rpc_listen.clone()
self.inner.rpc_listen.clone()
}

pub fn p2p_address(&self) -> String {
@@ -678,21 +692,37 @@ impl Node {

self.wait_tx_pool_ready();

self.guard = Some(ProcessGuard {
name: self.spec_node_name.clone(),
self.set_process_guard(ProcessGuard {
name: self.inner.spec_node_name.clone(),
child: child_process,
killed: false,
});
self.node_id = Some(node_info.node_id);
self.set_node_id(node_info.node_id.as_str());
}

pub(crate) fn set_process_guard(&mut self, guard: ProcessGuard) {
let mut g = self.inner.guard.write().unwrap();
*g = Some(guard);
}

pub(crate) fn set_node_id(&mut self, node_id: &str) {
let mut n = self.inner.node_id.write().unwrap();
*n = Some(node_id.to_owned());
}

pub(crate) fn take_guard(&mut self) -> Option<ProcessGuard> {
let mut g = self.inner.guard.write().unwrap();
g.take()
}

pub fn stop(&mut self) {
drop(self.guard.take())
drop(self.take_guard());
}

#[cfg(not(target_os = "windows"))]
pub fn stop_gracefully(&mut self) {
if let Some(mut guard) = self.guard.take() {
let guard = self.take_guard();
if let Some(mut guard) = guard {
if !guard.killed {
// send SIGINT to the child
nix::sys::signal::kill(
@@ -749,11 +779,11 @@ pub fn connect_all(nodes: &[Node]) {
}

// TODO: this will be removed later, in another PR
pub fn disconnect_all(nodes: &[Node]) {
pub fn disconnect_all<N: Borrow<Node>>(nodes: &[N]) {
for node_a in nodes.iter() {
for node_b in nodes.iter() {
if node_a.p2p_address() != node_b.p2p_address() {
node_a.disconnect(node_b);
if node_a.borrow().p2p_address() != node_b.borrow().p2p_address() {
node_a.borrow().disconnect(node_b.borrow());
}
}
}
@@ -779,3 +809,39 @@ pub fn waiting_for_sync<N: Borrow<Node>>(nodes: &[N]) {
node.borrow().wait_for_tx_pool();
}
}

pub fn make_bootnodes_for_all<N: BorrowMut<Node>>(nodes: &mut [N]) {
let node_multiaddrs: HashMap<String, Multiaddr> = nodes
.iter()
.map(|n| {
(
n.borrow().node_id().to_owned(),
n.borrow().p2p_address().try_into().unwrap(),
)
})
.collect();
let other_node_addrs: Vec<Vec<Multiaddr>> = node_multiaddrs
.keys()
.map(|id| {
let addrs = node_multiaddrs
.iter()
.filter(|(other_id, _)| other_id.as_str() != id.as_str())
.map(|(_, addr)| addr.to_owned())
.collect::<Vec<_>>();
addrs
})
.collect();
for (i, node) in nodes.iter_mut().enumerate() {
node.borrow_mut()
.modify_app_config(|config: &mut CKBAppConfig| {
info!("Setting bootnodes to {:?}", other_node_addrs[i]);
config.network.bootnodes = other_node_addrs[i].clone();
})
}
// Restart nodes to make bootnodes work
for node in nodes.iter_mut() {
node.borrow_mut().stop();
node.borrow_mut().start();
info!("Restarted node {:?}", node.borrow_mut().node_id());
}
}
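For context on the refactor above: wrapping the node state in `Arc<InnerNode>`, with `node_id` and `guard` behind `RwLock`s, is what makes `Node` cloneable, so tests that restart nodes can hold several handles to the same underlying process. A stripped-down sketch of the pattern (simplified field set, not the full struct):

```rust
use std::sync::{Arc, RwLock};

// A cut-down analogue of Node/InnerNode: the Arc makes clones cheap and
// shared, the RwLock lets any handle fill in state discovered at runtime.
#[derive(Clone)]
struct Node {
    inner: Arc<InnerNode>,
}

struct InnerNode {
    node_id: RwLock<Option<String>>, // set once the process is up
}

impl Node {
    fn node_id(&self) -> String {
        self.inner
            .node_id
            .read()
            .expect("read locked node_id")
            .clone()
            .expect("uninitialized node_id")
    }

    fn set_node_id(&self, id: &str) {
        *self.inner.node_id.write().unwrap() = Some(id.to_owned());
    }
}

fn main() {
    let node = Node {
        inner: Arc::new(InnerNode {
            node_id: RwLock::new(None),
        }),
    };
    let handle = node.clone(); // shares the same InnerNode

    node.set_node_id("QmPeerId"); // e.g. learned after the process starts
    assert_eq!(handle.node_id(), "QmPeerId"); // visible through every clone
}
```

One consequence, visible in the diff, is that mutators such as `set_node_id` and `set_process_guard` now go through interior mutability on the shared inner struct rather than assigning to fields directly.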
3 changes: 3 additions & 0 deletions test/src/specs/fault_injection/mod.rs
@@ -0,0 +1,3 @@
mod randomly_kill;

pub use randomly_kill::*;
31 changes: 31 additions & 0 deletions test/src/specs/fault_injection/randomly_kill.rs
@@ -0,0 +1,31 @@
use crate::{Node, Spec};

use ckb_logger::info;
use rand::{thread_rng, Rng};

pub struct RandomlyKill;

impl Spec for RandomlyKill {
crate::setup!(num_nodes: 1);

fn run(&self, nodes: &mut Vec<Node>) {
let mut rng = thread_rng();
let node = &mut nodes[0];
for _ in 0..rng.gen_range(10, 20) {
let n = rng.gen_range(0, 10);
// TODO: killing the child process and mining are actually sequential here.
// We need to find some way to do these two things in parallel.
// It would be great if we could kill and start the node externally (instead
// of writing Rust code to manage all the nodes, because in that case we have
// to fight ownership rules and monitor the nodes ourselves).
if n != 0 {
info!("Mining {} blocks", n);
node.mine(n);
}
info!("Stop the node");
node.stop();
info!("Start the node");
node.start();
}
}
}
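One detail worth flagging in randomly_kill.rs: `gen_range(10, 20)` is the two-argument form from rand 0.7 and earlier; rand 0.8 replaced it with a single range argument. If the crate were ever upgraded, the equivalent calls would look like this sketch:

```rust
use rand::{thread_rng, Rng};

fn main() {
    let mut rng = thread_rng();
    // rand 0.8+: a half-open range replaces the old (low, high) pair,
    // with identical semantics (low inclusive, high exclusive).
    let restarts = rng.gen_range(10..20);
    let blocks = rng.gen_range(0..10);
    println!("restarts={restarts} blocks={blocks}");
}
```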
2 changes: 2 additions & 0 deletions test/src/specs/mod.rs
@@ -1,6 +1,7 @@
mod alert;
mod consensus;
mod dao;
mod fault_injection;
mod hardfork;
mod mining;
mod p2p;
@@ -12,6 +13,7 @@ mod tx_pool;
pub use alert::*;
pub use consensus::*;
pub use dao::*;
pub use fault_injection::*;
pub use hardfork::*;
pub use mining::*;
pub use p2p::*;
10 changes: 2 additions & 8 deletions test/src/specs/p2p/whitelist.rs
@@ -46,10 +46,7 @@ impl Spec for WhitelistOnSessionLimit {
let rpc_client0 = node0.rpc_client();
let is_connect_peer_num_eq_2 = wait_until(10, || {
let peers = rpc_client0.get_peers();
peers.len() == 2
&& peers
.into_iter()
.all(|node| id_set.contains(&node.node_id.as_str()))
peers.len() == 2 && peers.into_iter().all(|node| id_set.contains(&node.node_id))
});

if !is_connect_peer_num_eq_2 {
@@ -78,10 +75,7 @@ impl Spec for WhitelistOnSessionLimit {
let rpc_client0 = node0.rpc_client();
let is_connect_peer_num_eq_3 = wait_until(10, || {
let peers = rpc_client0.get_peers();
peers.len() == 3
&& peers
.into_iter()
.all(|node| id_set.contains(&node.node_id.as_str()))
peers.len() == 3 && peers.into_iter().all(|node| id_set.contains(&node.node_id))
});

if !is_connect_peer_num_eq_3 {
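The two simplifications in this file fall out of the same `node_id()` signature change: with `id_set` holding owned `String`s, `contains(&node.node_id)` type-checks directly and the `.as_str()` adaptor disappears. A minimal sketch of the borrow rule at work, with hypothetical ids:

```rust
use std::collections::HashSet;

fn main() {
    // id_set now holds owned Strings, matching what node_id() returns.
    let id_set: HashSet<String> = ["QmA", "QmB"].iter().map(|s| s.to_string()).collect();

    // A peer's node_id field, as returned by get_peers().
    let peer_node_id = "QmA".to_string();

    // &String works wherever &Q with `String: Borrow<Q>` is accepted,
    // so no .as_str() is needed.
    assert!(id_set.contains(&peer_node_id));
}
```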
2 changes: 2 additions & 0 deletions test/src/specs/sync/mod.rs
@@ -7,6 +7,7 @@ mod invalid_block;
mod invalid_locator_size;
mod last_common_header;
mod sync_and_mine;
mod sync_churn;
mod sync_timeout;

pub use block_filter::*;
@@ -18,4 +19,5 @@ pub use invalid_block::*;
pub use invalid_locator_size::*;
pub use last_common_header::*;
pub use sync_and_mine::*;
pub use sync_churn::*;
pub use sync_timeout::*;
test/src/specs/sync/sync_churn.rs — the ninth changed file; its diff did not load in this capture.
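Based on the helpers this PR introduces (`make_bootnodes_for_all`, the cloneable, restartable `Node`, and `waiting_for_sync`), a churn test of this shape plausibly mines on one node while repeatedly restarting another, then asserts convergence. The following is a hypothetical reconstruction for orientation only, not the actual contents of sync_churn.rs:

```rust
use crate::node::{make_bootnodes_for_all, waiting_for_sync};
use crate::{Node, Spec};
use ckb_logger::info;

pub struct SyncChurn;

impl Spec for SyncChurn {
    crate::setup!(num_nodes: 3);

    fn run(&self, nodes: &mut Vec<Node>) {
        // Point every node at all the others and restart them so the
        // bootnode config takes effect.
        make_bootnodes_for_all(nodes);

        // Mine on one node while churning (stop/start) another.
        for _ in 0..10 {
            nodes[0].mine(5);
            let churn_node = &mut nodes[1];
            info!("Restarting node {}", churn_node.node_id());
            churn_node.stop();
            churn_node.start();
        }

        // All nodes should still converge on the same chain tip.
        waiting_for_sync(nodes);
    }
}
```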
