Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.

Disconnect peers on a fork #1738

Merged
merged 1 commit into from
Jul 27, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ethcore/res/ethereum/classic.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
"accountStartNonce": "0x00",
"maximumExtraDataSize": "0x20",
"minGasLimit": "0x1388",
"networkID" : "0x1"
"networkID" : "0x1",
"forkBlock": "0x1d4c00",
"forkCanonHash": "0x94365e3a8c0b35089c1d1195081fe7489b528a84b22199c916180db8b28ade7f"
},
"genesis": {
"seal": {
Expand Down
4 changes: 3 additions & 1 deletion ethcore/res/ethereum/frontier.json
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@
"accountStartNonce": "0x00",
"maximumExtraDataSize": "0x20",
"minGasLimit": "0x1388",
"networkID" : "0x1"
"networkID" : "0x1",
"forkBlock": "0x1d4c00",
"forkCanonHash": "0x4985f5ca3d2afbec36529aa96f74de3cc10a2a4a6c44f2157a57d2c6059a11bb"
},
"genesis": {
"seal": {
Expand Down
6 changes: 6 additions & 0 deletions ethcore/src/spec/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ pub struct CommonParams {
pub network_id: U256,
/// Minimum gas limit.
pub min_gas_limit: U256,
/// Fork block to check.
pub fork_block: Option<(BlockNumber, H256)>,
}

impl From<ethjson::spec::Params> for CommonParams {
Expand All @@ -47,6 +49,7 @@ impl From<ethjson::spec::Params> for CommonParams {
maximum_extra_data_size: p.maximum_extra_data_size.into(),
network_id: p.network_id.into(),
min_gas_limit: p.min_gas_limit.into(),
fork_block: if let (Some(n), Some(h)) = (p.fork_block, p.fork_hash) { Some((n.into(), h.into())) } else { None },
}
}
}
Expand Down Expand Up @@ -151,6 +154,9 @@ impl Spec {
/// Get the configured Network ID.
pub fn network_id(&self) -> U256 { self.params.network_id }

/// Get the configured network fork block.
pub fn fork_block(&self) -> Option<(BlockNumber, H256)> { self.params.fork_block }

/// Get the header of the genesis block.
pub fn genesis_header(&self) -> Header {
Header {
Expand Down
7 changes: 7 additions & 0 deletions json/src/spec/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
//! Spec params deserialization.

use uint::Uint;
use hash::H256;

/// Spec params.
#[derive(Debug, PartialEq, Deserialize)]
Expand All @@ -33,6 +34,12 @@ pub struct Params {
/// Minimum gas limit.
#[serde(rename="minGasLimit")]
pub min_gas_limit: Uint,
/// Option fork block number to check.
#[serde(rename="forkBlock")]
pub fork_block: Option<Uint>,
/// Expected fork block hash.
#[serde(rename="forkCanonHash")]
pub fork_hash: Option<H256>,
}

#[cfg(test)]
Expand Down
2 changes: 2 additions & 0 deletions json/src/spec/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ mod tests {
"maximumExtraDataSize": "0x20",
"minGasLimit": "0x1388",
"networkID" : "0x2"
"forkBlock": "0xffffffffffffffff",
"forkCanonHash": "0x0000000000000000000000000000000000000000000000000000000000000000",
},
"genesis": {
"seal": {
Expand Down
1 change: 1 addition & 0 deletions parity/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> {
Some(id) => id,
None => spec.network_id(),
};
sync_config.fork_block = spec.fork_block().clone();

// prepare account provider
let account_provider = Arc::new(try!(prepare_account_provider(&cmd.dirs, cmd.acc_conf)));
Expand Down
4 changes: 4 additions & 0 deletions sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, Peer
NetworkConfiguration as BasicNetworkConfiguration, NonReservedPeerMode};
use util::{TimerToken, U256, H256, UtilError, Secret, Populatable};
use ethcore::client::{BlockChainClient, ChainNotify};
use ethcore::header::BlockNumber;
use io::NetSyncIo;
use chain::{ChainSync, SyncStatus};
use std::net::{SocketAddr, AddrParseError};
Expand All @@ -38,13 +39,16 @@ pub struct SyncConfig {
pub max_download_ahead_blocks: usize,
/// Network ID
pub network_id: U256,
/// Fork block to check
pub fork_block: Option<(BlockNumber, H256)>,
}

impl Default for SyncConfig {
fn default() -> SyncConfig {
SyncConfig {
max_download_ahead_blocks: 20000,
network_id: U256::from(1),
fork_block: None,
}
}
}
Expand Down
80 changes: 68 additions & 12 deletions sync/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ const RECEIPTS_PACKET: u8 = 0x10;

const HEADERS_TIMEOUT_SEC: f64 = 15f64;
const BODIES_TIMEOUT_SEC: f64 = 5f64;
const FORK_HEADER_TIMEOUT_SEC: f64 = 3f64;

#[derive(Copy, Clone, Eq, PartialEq, Debug)]
/// Sync state
Expand Down Expand Up @@ -191,6 +192,7 @@ impl SyncStatus {
/// Peer data type requested
enum PeerAsking {
Nothing,
ForkHeader,
BlockHeaders,
BlockBodies,
Heads,
Expand Down Expand Up @@ -221,6 +223,14 @@ struct PeerInfo {
ask_time: f64,
/// Pending request is expird and result should be ignored
expired: bool,
/// Peer fork confirmed
confirmed: bool,
}

impl PeerInfo {
fn is_available(&self) -> bool {
self.confirmed && !self.expired
}
}

/// Blockchain sync handler.
Expand Down Expand Up @@ -254,6 +264,8 @@ pub struct ChainSync {
round_parents: VecDeque<(H256, H256)>,
/// Network ID
network_id: U256,
/// Optional fork block to check
fork_block: Option<(BlockNumber, H256)>,
}

type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
Expand All @@ -277,6 +289,7 @@ impl ChainSync {
round_parents: VecDeque::new(),
_max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks),
network_id: config.network_id,
fork_block: config.fork_block,
};
sync.reset();
sync
Expand All @@ -293,8 +306,8 @@ impl ChainSync {
highest_block_number: self.highest_block.map(|n| max(n, self.last_imported_block)),
blocks_received: if self.last_imported_block > self.starting_block { self.last_imported_block - self.starting_block } else { 0 },
blocks_total: match self.highest_block { Some(x) if x > self.starting_block => x - self.starting_block, _ => 0 },
num_peers: self.peers.len(),
num_active_peers: self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count(),
num_peers: self.peers.values().filter(|p| p.confirmed).count(),
num_active_peers: self.peers.values().filter(|p| p.confirmed && p.asking != PeerAsking::Nothing).count(),
mem_used:
self.blocks.heap_size()
+ self.peers.heap_size_of_children()
Expand All @@ -316,7 +329,7 @@ impl ChainSync {
p.asking_blocks.clear();
p.asking_hash = None;
// mark any pending requests as expired
if p.asking != PeerAsking::Nothing {
if p.asking != PeerAsking::Nothing && p.confirmed {
p.expired = true;
}
}
Expand Down Expand Up @@ -370,6 +383,7 @@ impl ChainSync {
asking_hash: None,
ask_time: 0f64,
expired: false,
confirmed: self.fork_block.is_none(),
};

trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis);
Expand Down Expand Up @@ -397,16 +411,41 @@ impl ChainSync {
self.peers.insert(peer_id.clone(), peer);
self.active_peers.insert(peer_id.clone());
debug!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id));
self.sync_peer(io, peer_id, false);
if let Some((fork_block, _)) = self.fork_block {
self.request_headers_by_number(io, peer_id, fork_block, 1, 0, false, PeerAsking::ForkHeader);
} else {
self.sync_peer(io, peer_id, false);
}
Ok(())
}

#[cfg_attr(feature="dev", allow(cyclomatic_complexity))]
/// Called by peer once it has new block headers during sync
fn on_peer_block_headers(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
let confirmed = match self.peers.get_mut(&peer_id) {
Some(ref mut peer) if peer.asking == PeerAsking::ForkHeader => {
let item_count = r.item_count();
if item_count == 0 || (item_count == 1 && try!(r.at(0)).as_raw().sha3() == self.fork_block.unwrap().1) {
trace!(target: "sync", "{}: Confirmed peer", peer_id);
peer.asking = PeerAsking::Nothing;
peer.confirmed = true;
true
} else {
trace!(target: "sync", "{}: Fork mismatch", peer_id);
io.disconnect_peer(peer_id);
false
}
},
_ => false,
};
if confirmed {
self.sync_peer(io, peer_id, false);
return Ok(());
}

self.clear_peer_download(peer_id);
let expected_hash = self.peers.get(&peer_id).and_then(|p| p.asking_hash);
let expected_asking = if self.state == SyncState::ChainHead { PeerAsking::Heads } else { PeerAsking::BlockHeaders };
let expected_hash = self.peers.get(&peer_id).and_then(|p| p.asking_hash);
if !self.reset_peer_asking(peer_id, expected_asking) || expected_hash.is_none() {
trace!(target: "sync", "{}: Ignored unexpected headers", peer_id);
self.continue_sync(io);
Expand Down Expand Up @@ -474,14 +513,14 @@ impl ChainSync {

// Disable the peer for this syncing round if it gives invalid chain
if !valid_response {
trace!(target: "sync", "{} Deactivated for invalid headers response", peer_id);
self.deactivate_peer(io, peer_id);
trace!(target: "sync", "{} Disabled for invalid headers response", peer_id);
io.disable_peer(peer_id);
}

if headers.is_empty() {
// Peer does not have any new subchain heads, deactivate it nd try with another
trace!(target: "sync", "{} Deactivated for no data", peer_id);
self.deactivate_peer(io, peer_id);
trace!(target: "sync", "{} Disabled for no data", peer_id);
io.disable_peer(peer_id);
}
match self.state {
SyncState::ChainHead => {
Expand Down Expand Up @@ -692,15 +731,16 @@ impl ChainSync {

/// Resume downloading
fn continue_sync(&mut self, io: &mut SyncIo) {
let mut peers: Vec<(PeerId, U256)> = self.peers.iter().map(|(k, p)| (*k, p.difficulty.unwrap_or_else(U256::zero))).collect();
let mut peers: Vec<(PeerId, U256)> = self.peers.iter().filter_map(|(k, p)|
if p.is_available() { Some((*k, p.difficulty.unwrap_or_else(U256::zero))) } else { None }).collect();
thread_rng().shuffle(&mut peers); //TODO: sort by rating
trace!(target: "sync", "Syncing with {}/{} peers", self.active_peers.len(), peers.len());
for (p, _) in peers {
if self.active_peers.contains(&p) {
self.sync_peer(io, p, false);
}
}
if self.state != SyncState::Waiting && !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && !p.expired) {
if self.state != SyncState::Waiting && !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && p.is_available()) {
self.complete_sync();
}
}
Expand All @@ -726,7 +766,7 @@ impl ChainSync {
}
let (peer_latest, peer_difficulty) = {
let peer = self.peers.get_mut(&peer_id).unwrap();
if peer.asking != PeerAsking::Nothing {
if peer.asking != PeerAsking::Nothing || !peer.is_available() {
return;
}
if self.state == SyncState::Waiting {
Expand Down Expand Up @@ -924,6 +964,17 @@ impl ChainSync {
.asking_hash = Some(h.clone());
}

/// Request headers from a peer by block number
#[cfg_attr(feature="dev", allow(too_many_arguments))]
fn request_headers_by_number(&mut self, sync: &mut SyncIo, peer_id: PeerId, n: BlockNumber, count: usize, skip: usize, reverse: bool, asking: PeerAsking) {
trace!(target: "sync", "{} <- GetBlockHeaders: {} entries starting from {}", peer_id, count, n);
let mut rlp = RlpStream::new_list(4);
rlp.append(&n);
rlp.append(&count);
rlp.append(&skip);
rlp.append(&if reverse {1u32} else {0u32});
self.send_request(sync, peer_id, asking, GET_BLOCK_HEADERS_PACKET, rlp.out());
}
/// Request block bodies from a peer
fn request_bodies(&mut self, sync: &mut SyncIo, peer_id: PeerId, hashes: Vec<H256>) {
let mut rlp = RlpStream::new_list(hashes.len());
Expand Down Expand Up @@ -977,6 +1028,9 @@ impl ChainSync {
if !io.is_chain_queue_empty() {
return Ok(());
}
if self.peers.get(&peer_id).map_or(false, |p| p.confirmed) {
trace!(target: "sync", "{} Ignoring transactions from unconfirmed/unknown peer", peer_id);
}

let mut item_count = r.item_count();
trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count);
Expand Down Expand Up @@ -1212,6 +1266,7 @@ impl ChainSync {
PeerAsking::BlockHeaders | PeerAsking::Heads => (tick - peer.ask_time) > HEADERS_TIMEOUT_SEC,
PeerAsking::BlockBodies => (tick - peer.ask_time) > BODIES_TIMEOUT_SEC,
PeerAsking::Nothing => false,
PeerAsking::ForkHeader => (tick - peer.ask_time) > FORK_HEADER_TIMEOUT_SEC,
};
if timeout {
trace!(target:"sync", "Timeout {}", peer_id);
Expand Down Expand Up @@ -1629,6 +1684,7 @@ mod tests {
asking_hash: None,
ask_time: 0f64,
expired: false,
confirmed: false,
});
sync
}
Expand Down
21 changes: 20 additions & 1 deletion sync/src/tests/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use util::*;
use ethcore::client::{BlockChainClient, BlockID, EachBlockWith};
use ethcore::client::{TestBlockChainClient, BlockChainClient, BlockID, EachBlockWith};
use chain::{SyncState};
use super::helpers::*;

Expand Down Expand Up @@ -95,6 +95,25 @@ fn forked() {
assert_eq!(net.peer(2).chain.numbers.read().deref(), &peer1_chain);
}

#[test]
fn net_hard_fork() {
::env_logger::init().ok();
let ref_client = TestBlockChainClient::new();
ref_client.add_blocks(50, EachBlockWith::Uncle);
{
let mut net = TestNet::new_with_fork(2, Some((50, ref_client.block_hash(BlockID::Number(50)).unwrap())));
net.peer_mut(0).chain.add_blocks(100, EachBlockWith::Uncle);
net.sync();
assert_eq!(net.peer(1).chain.chain_info().best_block_number, 100);
}
{
let mut net = TestNet::new_with_fork(2, Some((50, ref_client.block_hash(BlockID::Number(50)).unwrap())));
net.peer_mut(0).chain.add_blocks(100, EachBlockWith::Nothing);
net.sync();
assert_eq!(net.peer(1).chain.chain_info().best_block_number, 0);
}
}

#[test]
fn restart() {
let mut net = TestNet::new(3);
Expand Down
9 changes: 8 additions & 1 deletion sync/src/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

use util::*;
use ethcore::client::{TestBlockChainClient, BlockChainClient};
use ethcore::header::BlockNumber;
use io::SyncIo;
use chain::ChainSync;
use ::SyncConfig;
Expand Down Expand Up @@ -89,13 +90,19 @@ pub struct TestNet {

impl TestNet {
pub fn new(n: usize) -> TestNet {
Self::new_with_fork(n, None)
}

pub fn new_with_fork(n: usize, fork: Option<(BlockNumber, H256)>) -> TestNet {
let mut net = TestNet {
peers: Vec::new(),
started: false,
};
for _ in 0..n {
let chain = TestBlockChainClient::new();
let sync = ChainSync::new(SyncConfig::default(), &chain);
let mut config = SyncConfig::default();
config.fork_block = fork;
let sync = ChainSync::new(config, &chain);
net.peers.push(TestPeer {
sync: RwLock::new(sync),
chain: chain,
Expand Down