Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.

More sync and propagation fixes #420

Merged
merged 10 commits into from
Feb 15, 2016
99 changes: 67 additions & 32 deletions sync/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ struct PeerInfo {
asking: PeerAsking,
/// A set of block numbers being requested
asking_blocks: Vec<BlockNumber>,
/// Holds requested header hash if currently requesting block header by hash
asking_hash: Option<H256>,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally headers and bodies are downloaded by number, unless downloading a header known from NewBlockHashes packet. Currently, if two NewBlockHash packets with the same hash arrive at the same time this may result in same header being requested twice. This was added to keep track of the requested hash.

/// Request timestamp
ask_time: f64,
}
Expand All @@ -179,6 +181,8 @@ pub struct ChainSync {
downloading_headers: HashSet<BlockNumber>,
/// Set of block body numbers being downloaded
downloading_bodies: HashSet<BlockNumber>,
/// Set of block headers being downloaded by hash
downloading_hashes: HashSet<H256>,
/// Downloaded headers.
headers: Vec<(BlockNumber, Vec<Header>)>, //TODO: use BTreeMap once range API is sable. For now it is a vector sorted in descending order
/// Downloaded bodies
Expand All @@ -195,6 +199,8 @@ pub struct ChainSync {
syncing_difficulty: U256,
/// True if common block for our and remote chain has been found
have_common_block: bool,
/// Last propagated block number
last_send_block_number: BlockNumber,
}

type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
Expand All @@ -208,6 +214,7 @@ impl ChainSync {
highest_block: None,
downloading_headers: HashSet::new(),
downloading_bodies: HashSet::new(),
downloading_hashes: HashSet::new(),
headers: Vec::new(),
bodies: Vec::new(),
peers: HashMap::new(),
Expand All @@ -216,6 +223,7 @@ impl ChainSync {
last_imported_hash: None,
syncing_difficulty: U256::from(0u64),
have_common_block: false,
last_send_block_number: 0,
}
}

Expand Down Expand Up @@ -248,6 +256,7 @@ impl ChainSync {
self.bodies.clear();
for (_, ref mut p) in &mut self.peers {
p.asking_blocks.clear();
p.asking_hash = None;
}
self.header_ids.clear();
self.syncing_difficulty = From::from(0u64);
Expand Down Expand Up @@ -277,11 +286,16 @@ impl ChainSync {
genesis: try!(r.val_at(4)),
asking: PeerAsking::Nothing,
asking_blocks: Vec::new(),
asking_hash: None,
ask_time: 0f64,
};

trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest, peer.genesis);

if self.peers.contains_key(&peer_id) {
warn!("Unexpected status packet from {}:{}", peer_id, io.peer_info(peer_id));
return Ok(());
}
let chain_info = io.chain().chain_info();
if peer.genesis != chain_info.genesis_hash {
io.disable_peer(peer_id);
Expand All @@ -294,10 +308,7 @@ impl ChainSync {
return Ok(());
}

let old = self.peers.insert(peer_id.clone(), peer);
if old.is_some() {
panic!("ChainSync: new peer already exists");
}
self.peers.insert(peer_id.clone(), peer);
info!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id));
self.sync_peer(io, peer_id, false);
Ok(())
Expand Down Expand Up @@ -437,6 +448,10 @@ impl ChainSync {
trace!(target: "sync", "{} -> NewBlock ({})", peer_id, h);
let header: BlockHeader = try!(header_rlp.as_val());
let mut unknown = false;
{
let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this expect doesn't look like it's a fundamental logic error. if it is, add a comment explaining why it can never ever happen and other pre-condition assert!s in the code to make the proof clear.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i notice that there are others in the code, so happy for the whole lot to be cleared up in a later PR if preferable.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better fix that now

peer.latest = header.hash();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update peer's latest hash regardless if the block is known or not, this prevents sending out blocks peer already knows

}
// TODO: Decompose block and add to self.headers and self.bodies instead
if header.number == From::from(self.current_base_block() + 1) {
match io.chain().import_block(block_rlp.as_raw().to_vec()) {
Expand Down Expand Up @@ -469,10 +484,6 @@ impl ChainSync {
let peer_difficulty = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").difficulty;
if difficulty > peer_difficulty {
trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h);
{
let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer");
peer.latest = header.hash();
}
self.sync_peer(io, peer_id, true);
}
}
Expand All @@ -486,11 +497,14 @@ impl ChainSync {
return Ok(());
}
trace!(target: "sync", "{} -> NewHashes ({} entries)", peer_id, r.item_count());
let hashes = r.iter().map(|item| (item.val_at::<H256>(0), item.val_at::<U256>(1)));
let mut max_height: U256 = From::from(0);
let hashes = r.iter().map(|item| (item.val_at::<H256>(0), item.val_at::<BlockNumber>(1)));
let mut max_height: BlockNumber = 0;
for (rh, rd) in hashes {
let h = try!(rh);
let d = try!(rd);
if self.downloading_hashes.contains(&h) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check if this is already being request

continue;
}
match io.chain().block_status(BlockId::Hash(h.clone())) {
BlockStatus::InChain => {
trace!(target: "sync", "New block hash already in chain {:?}", h);
Expand All @@ -499,8 +513,8 @@ impl ChainSync {
trace!(target: "sync", "New hash block already queued {:?}", h);
},
BlockStatus::Unknown => {
trace!(target: "sync", "New unknown block hash {:?}", h);
if d > max_height {
trace!(target: "sync", "New unknown block hash {:?}", h);
let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer");
peer.latest = h.clone();
max_height = d;
Expand All @@ -513,7 +527,7 @@ impl ChainSync {
}
}
};
if max_height != x!(0) {
if max_height != 0 {
self.sync_peer(io, peer_id, true);
}
Ok(())
Expand All @@ -523,7 +537,7 @@ impl ChainSync {
pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: PeerId) {
trace!(target: "sync", "== Disconnecting {}", peer);
if self.peers.contains_key(&peer) {
info!(target: "sync", "Disconnected {}:{}", peer, io.peer_info(peer));
info!(target: "sync", "Disconnected {}", peer);
self.clear_peer_download(peer);
self.peers.remove(&peer);
self.continue_sync(io);
Expand Down Expand Up @@ -581,6 +595,8 @@ impl ChainSync {
self.state = SyncState::Blocks;
}
trace!(target: "sync", "Starting sync with better chain");
self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").asking_hash = Some(peer_latest.clone());
self.downloading_hashes.insert(peer_latest.clone());
self.request_headers_by_hash(io, peer_id, &peer_latest, 1, 0, false);
}
else if self.state == SyncState::Blocks && io.chain().block_status(BlockId::Hash(peer_latest)) == BlockStatus::Unknown {
Expand Down Expand Up @@ -673,6 +689,8 @@ impl ChainSync {
}
}
else {
// continue search for common block
self.downloading_headers.insert(start as BlockNumber);
self.request_headers_by_number(io, peer_id, start as BlockNumber, 1, 0, false);
}
}
Expand All @@ -681,6 +699,9 @@ impl ChainSync {
/// Clear all blocks/headers marked as being downloaded by a peer.
fn clear_peer_download(&mut self, peer_id: PeerId) {
let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer");
if let Some(hash) = peer.asking_hash.take() {
self.downloading_hashes.remove(&hash);
}
for b in &peer.asking_blocks {
self.downloading_headers.remove(&b);
self.downloading_bodies.remove(&b);
Expand Down Expand Up @@ -827,7 +848,7 @@ impl ChainSync {
{
let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer");
if peer.asking != PeerAsking::Nothing {
warn!(target:"sync", "Asking {:?} while requesting {:?}", asking, peer.asking);
warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking);
}
}
match sync.send(peer_id, packet_id, packet) {
Expand All @@ -844,6 +865,14 @@ impl ChainSync {
}
}

/// Generic packet sender
fn send_packet(&mut self, sync: &mut SyncIo, peer_id: PeerId, packet_id: PacketId, packet: Bytes) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used to send out blocks/hashes/transactions. Does not change peer asking status

if let Err(e) = sync.send(peer_id, packet_id, packet) {
warn!(target:"sync", "Error sending packet: {:?}", e);
sync.disable_peer(peer_id);
self.on_peer_aborting(sync, peer_id);
}
}
/// Called when peer sends us new transactions
fn on_peer_transactions(&mut self, _io: &mut SyncIo, _peer_id: PeerId, _r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
Ok(())
Expand Down Expand Up @@ -1089,11 +1118,10 @@ impl ChainSync {
let latest_hash = chain_info.best_block_hash;
let latest_number = chain_info.best_block_number;
self.peers.iter().filter(|&(_, peer_info)|
match io.chain().block_status(BlockId::Hash(peer_info.latest.clone()))
{
match io.chain().block_status(BlockId::Hash(peer_info.latest.clone())) {
BlockStatus::InChain => {
let peer_number = HeaderView::new(&io.chain().block_header(BlockId::Hash(peer_info.latest.clone())).unwrap()).number();
peer_info.latest != latest_hash && latest_number > peer_number && latest_number - peer_number < MAX_PEER_LAG_PROPAGATION
peer_info.latest != latest_hash && latest_number > peer_number
},
_ => false
})
Expand All @@ -1102,7 +1130,7 @@ impl ChainSync {
}

/// propagades latest block to lagging peers
fn propagade_blocks(&mut self, io: &mut SyncIo) -> usize {
fn propagade_blocks(&mut self, local_best: &H256, io: &mut SyncIo) -> usize {
let updated_peers = {
let lagging_peers = self.get_lagging_peers(io);

Expand All @@ -1118,29 +1146,27 @@ impl ChainSync {
};

let mut sent = 0;
let local_best = io.chain().chain_info().best_block_hash;
for peer_id in updated_peers {
let rlp = ChainSync::create_latest_block_rlp(io.chain());
self.send_request(io, peer_id, PeerAsking::Nothing, NEW_BLOCK_PACKET, rlp);
self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp);
self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").latest = local_best.clone();
sent = sent + 1;
}
sent
}

/// propagades new known hashes to all peers
fn propagade_new_hashes(&mut self, io: &mut SyncIo) -> usize {
fn propagade_new_hashes(&mut self, local_best: &H256, io: &mut SyncIo) -> usize {
let updated_peers = self.get_lagging_peers(io);
let mut sent = 0;
let local_best = io.chain().chain_info().best_block_hash;
for peer_id in updated_peers {
sent = sent + match ChainSync::create_new_hashes_rlp(io.chain(), &self.peers.get(&peer_id).expect("ChainSync: unknown peer").latest, &local_best) {
Some(rlp) => {
{
let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer");
peer.latest = local_best.clone();
}
self.send_request(io, peer_id, PeerAsking::Nothing, NEW_BLOCK_HASHES_PACKET, rlp);
self.send_packet(io, peer_id, NEW_BLOCK_HASHES_PACKET, rlp);
1
},
None => 0
Expand All @@ -1152,15 +1178,19 @@ impl ChainSync {
/// Maintain other peers. Send out any new blocks and transactions
pub fn maintain_sync(&mut self, io: &mut SyncIo) {
self.check_resume(io);

let peers = self.propagade_new_hashes(io);
trace!(target: "sync", "Sent new hashes to peers: {:?}", peers);
}

/// should be called once chain has new block, triggers the latest block propagation
pub fn chain_blocks_verified(&mut self, io: &mut SyncIo) {
let peers = self.propagade_blocks(io);
trace!(target: "sync", "Sent latest block to peers: {:?}", peers);
let chain = io.chain().chain_info();
if (((chain.best_block_number as i64) - (self.last_send_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See top comment. This check is in cpp client also

let blocks = self.propagade_blocks(&chain.best_block_hash, io);
let hashes = self.propagade_new_hashes(&chain.best_block_hash, io);
if blocks != 0 || hashes != 0 {
trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes);
}
}
self.last_send_block_number = chain.best_block_number;
}
}

Expand Down Expand Up @@ -1291,6 +1321,7 @@ mod tests {
difficulty: U256::zero(),
asking: PeerAsking::Nothing,
asking_blocks: Vec::<BlockNumber>::new(),
asking_hash: None,
ask_time: 0f64,
});
sync
Expand Down Expand Up @@ -1332,9 +1363,10 @@ mod tests {
client.add_blocks(100, false);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let best_hash = client.chain_info().best_block_hash.clone();
let mut io = TestIo::new(&mut client, &mut queue, None);

let peer_count = sync.propagade_new_hashes(&mut io);
let peer_count = sync.propagade_new_hashes(&best_hash, &mut io);

// 1 message should be send
assert_eq!(1, io.queue.len());
Expand All @@ -1350,9 +1382,10 @@ mod tests {
client.add_blocks(100, false);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let best_hash = client.chain_info().best_block_hash.clone();
let mut io = TestIo::new(&mut client, &mut queue, None);

let peer_count = sync.propagade_blocks(&mut io);
let peer_count = sync.propagade_blocks(&best_hash, &mut io);

// 1 message should be send
assert_eq!(1, io.queue.len());
Expand Down Expand Up @@ -1454,9 +1487,10 @@ mod tests {
client.add_blocks(100, false);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let best_hash = client.chain_info().best_block_hash.clone();
let mut io = TestIo::new(&mut client, &mut queue, None);

sync.propagade_new_hashes(&mut io);
sync.propagade_new_hashes(&best_hash, &mut io);

let data = &io.queue[0].data.clone();
let result = sync.on_peer_new_hashes(&mut io, 0, &UntrustedRlp::new(&data));
Expand All @@ -1471,9 +1505,10 @@ mod tests {
client.add_blocks(100, false);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let best_hash = client.chain_info().best_block_hash.clone();
let mut io = TestIo::new(&mut client, &mut queue, None);

sync.propagade_blocks(&mut io);
sync.propagade_blocks(&best_hash, &mut io);

let data = &io.queue[0].data.clone();
let result = sync.on_peer_new_block(&mut io, 0, &UntrustedRlp::new(&data));
Expand Down
30 changes: 21 additions & 9 deletions sync/src/tests/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ fn status_empty() {
#[test]
fn status_packet() {
let mut net = TestNet::new(2);
net.peer_mut(0).chain.add_blocks(1000, false);
net.peer_mut(0).chain.add_blocks(100, false);
net.peer_mut(1).chain.add_blocks(1, false);

net.start();
Expand All @@ -122,18 +122,29 @@ fn status_packet() {

#[test]
fn propagade_hashes() {
let mut net = TestNet::new(3);
net.peer_mut(1).chain.add_blocks(1000, false);
net.peer_mut(2).chain.add_blocks(1000, false);
let mut net = TestNet::new(6);
net.peer_mut(1).chain.add_blocks(10, false);
net.sync();

net.peer_mut(0).chain.add_blocks(10, false);
net.sync_step_peer(0);
net.sync();
net.trigger_block_verified(0); //first event just sets the marker
net.trigger_block_verified(0);

// 2 peers to sync
assert_eq!(2, net.peer(0).queue.len());
// NEW_BLOCK_HASHES_PACKET
assert_eq!(0x01, net.peer(0).queue[0].packet_id);
// 5 peers to sync
assert_eq!(5, net.peer(0).queue.len());
let mut hashes = 0;
let mut blocks = 0;
for i in 0..5 {
if net.peer(0).queue[i].packet_id == 0x1 {
hashes += 1;
}
if net.peer(0).queue[i].packet_id == 0x7 {
blocks += 1;
}
}
assert!(blocks > 0);
assert!(hashes > 0);
}

#[test]
Expand All @@ -143,6 +154,7 @@ fn propagade_blocks() {
net.sync();

net.peer_mut(0).chain.add_blocks(10, false);
net.trigger_block_verified(0); //first event just sets the marker
net.trigger_block_verified(0);

assert!(!net.peer(0).queue.is_empty());
Expand Down
1 change: 1 addition & 0 deletions util/src/network/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
s.send_packet(self.protocol, packet_id as u8, &data).unwrap_or_else(|e| {
warn!(target: "net", "Send error: {:?}", e);
}); //TODO: don't copy vector data
try!(self.io.update_registration(peer));
},
_ => warn!(target: "net", "Send: Peer is not connected yet")
}
Expand Down