From ceb9770fb8e3b5f693d0085e9df5de0f9e872346 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Tue, 20 Nov 2018 15:02:57 +0000 Subject: [PATCH 1/9] Refactor sync to add priority tasks. --- ethcore/sync/src/api.rs | 50 ++++-- ethcore/sync/src/chain/handler.rs | 2 +- ethcore/sync/src/chain/mod.rs | 251 +++++++++++++++++---------- ethcore/sync/src/chain/propagator.rs | 2 +- ethcore/sync/src/chain/supplier.rs | 4 +- ethcore/sync/src/tests/helpers.rs | 4 +- 6 files changed, 197 insertions(+), 116 deletions(-) diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index 8063ebecb1e..df2e6c83efd 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use std::sync::Arc; +use std::sync::{Arc, mpsc}; use std::collections::{HashMap, BTreeMap}; use std::io; use std::ops::Range; @@ -33,7 +33,7 @@ use ethcore::client::{BlockChainClient, ChainNotify, ChainRoute, ChainMessageTyp use ethcore::snapshot::SnapshotService; use ethcore::header::BlockNumber; use sync_io::NetSyncIo; -use chain::{ChainSync, SyncStatus as EthSyncStatus}; +use chain::{ChainSyncApi, SyncStatus as EthSyncStatus}; use std::net::{SocketAddr, AddrParseError}; use std::str::FromStr; use parking_lot::RwLock; @@ -228,6 +228,16 @@ impl AttachedProtocol { } } +/// A prioritized tasks run in a specialised timer. +/// Every task should be completed within a hard deadline, +/// if it's not it's either cancelled or split into multiple tasks. +/// NOTE These tasks might not complete at all, so anything +/// that happens here should work even if the task is cancelled. +#[derive(Debug)] +pub enum PriorityTask { + +} + /// EthSync initialization parameters. pub struct Params { /// Configuration. @@ -244,6 +254,8 @@ pub struct Params { pub network_config: NetworkConfiguration, /// Other protocols to attach. 
pub attached_protos: Vec, + /// Priority tasks channel + pub priority_tasks: mpsc::Receiver } /// Ethereum network protocol handler @@ -315,13 +327,18 @@ impl EthSync { }) }; - let chain_sync = ChainSync::new(params.config, &*params.chain, params.private_tx_handler.clone()); + let sync = ChainSyncApi::new( + params.config, + &*params.chain, + params.private_tx_handler.clone(), + params.priority_tasks, + ); let service = NetworkService::new(params.network_config.clone().into_basic()?, connection_filter)?; let sync = Arc::new(EthSync { network: service, eth_handler: Arc::new(SyncProtocolHandler { - sync: RwLock::new(chain_sync), + sync, chain: params.chain, snapshot_service: params.snapshot_service, overlay: RwLock::new(HashMap::new()), @@ -339,17 +356,17 @@ impl EthSync { impl SyncProvider for EthSync { /// Get sync status fn status(&self) -> EthSyncStatus { - self.eth_handler.sync.read().status() + self.eth_handler.sync.status() } /// Get sync peers fn peers(&self) -> Vec { self.network.with_context_eval(self.subprotocol_name, |ctx| { let peer_ids = self.network.connected_peers(); - let eth_sync = self.eth_handler.sync.read(); let light_proto = self.light_proto.as_ref(); - peer_ids.into_iter().filter_map(|peer_id| { + let peer_info = self.eth_handler.sync.peer_info(&peer_ids); + peer_ids.into_iter().zip(peer_info).filter_map(|(peer_id, peer_info)| { let session_info = match ctx.session_info(peer_id) { None => return None, Some(info) => info, @@ -361,7 +378,7 @@ impl SyncProvider for EthSync { capabilities: session_info.peer_capabilities.into_iter().map(|c| c.to_string()).collect(), remote_address: session_info.remote_address, local_address: session_info.local_address, - eth_info: eth_sync.peer_info(&peer_id), + eth_info: peer_info, pip_info: light_proto.as_ref().and_then(|lp| lp.peer_status(peer_id)).map(Into::into), }) }).collect() @@ -373,17 +390,14 @@ impl SyncProvider for EthSync { } fn transactions_stats(&self) -> BTreeMap { - let sync = self.eth_handler.sync.read(); - sync.transactions_stats() - .iter() - .map(|(hash, stats)| (*hash, stats.into())) - .collect() + self.eth_handler.sync.transactions_stats() } } const PEERS_TIMER: TimerToken = 0; const SYNC_TIMER: TimerToken = 1; const TX_TIMER: TimerToken = 2; +const PRIORITY_TIMER: TimerToken = 3; struct SyncProtocolHandler { /// Shared blockchain client. @@ -391,7 +405,7 @@ struct SyncProtocolHandler { /// Shared snapshot service. snapshot_service: Arc, /// Sync strategy - sync: RwLock, + sync: ChainSyncApi, /// Chain overlay used to cache data such as fork block. 
overlay: RwLock>, } @@ -402,11 +416,12 @@ impl NetworkProtocolHandler for SyncProtocolHandler { io.register_timer(PEERS_TIMER, Duration::from_millis(700)).expect("Error registering peers timer"); io.register_timer(SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer"); io.register_timer(TX_TIMER, Duration::from_millis(1300)).expect("Error registering transactions timer"); + io.register_timer(PRIORITY_TIMER, Duration::from_millis(250)).expect("Error registering peers timer"); } } fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { - ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer, packet_id, data); + self.sync.dispatch_packet(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer, packet_id, data); } fn connected(&self, io: &NetworkContext, peer: &PeerId) { @@ -432,9 +447,8 @@ impl NetworkProtocolHandler for SyncProtocolHandler { match timer { PEERS_TIMER => self.sync.write().maintain_peers(&mut io), SYNC_TIMER => self.sync.write().maintain_sync(&mut io), - TX_TIMER => { - self.sync.write().propagate_new_transactions(&mut io); - }, + TX_TIMER => self.sync.write().propagate_new_transactions(&mut io), + PRIORITY_TIMER => self.sync.process_priority_queue(&mut io), _ => warn!("Unknown timer {} triggered.", timer), } } diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs index e6702974349..582cab8ae64 100644 --- a/ethcore/sync/src/chain/handler.rs +++ b/ethcore/sync/src/chain/handler.rs @@ -635,7 +635,7 @@ impl SyncHandler { } /// Called when peer sends us new transactions - fn on_peer_transactions(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> { + fn on_peer_transactions(sync: &ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> { // Accept transactions only when fully synced if !io.is_chain_queue_empty() || (sync.state != SyncState::Idle && sync.state != SyncState::NewBlocks) { trace!(target: "sync", "{} Ignoring transactions while syncing", peer_id); diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index e0fc8ecddb6..b805e68a043 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -92,15 +92,15 @@ mod propagator; mod requester; mod supplier; -use std::sync::Arc; -use std::collections::{HashSet, HashMap}; +use std::sync::{Arc, mpsc}; +use std::collections::{HashSet, HashMap, BTreeMap}; use std::cmp; use std::time::{Duration, Instant}; use hash::keccak; use heapsize::HeapSizeOf; use ethereum_types::{H256, U256}; use fastmap::H256FastMap; -use parking_lot::RwLock; +use parking_lot::{Mutex, RwLock, RwLockWriteGuard}; use bytes::Bytes; use rlp::{Rlp, RlpStream, DecoderError}; use network::{self, PeerId, PacketId}; @@ -112,7 +112,7 @@ use super::{WarpSync, SyncConfig}; use block_sync::{BlockDownloader, DownloadAction}; use rand::Rng; use snapshot::{Snapshot}; -use api::{EthProtocolInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID}; +use api::{EthProtocolInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID, PriorityTask}; use private_tx::PrivateTxHandler; use transactions_stats::{TransactionsStats, Stats as TransactionStats}; use transaction::UnverifiedTransaction; @@ -120,7 +120,7 @@ use transaction::UnverifiedTransaction; use self::handler::SyncHandler; use self::propagator::SyncPropagator; use self::requester::SyncRequester; -use self::supplier::SyncSupplier; +pub(crate) use 
self::supplier::SyncSupplier; known_heap_size!(0, PeerInfo); @@ -375,6 +375,151 @@ pub mod random { pub type RlpResponseResult = Result, PacketDecodeError>; pub type Peers = HashMap; +/// Thread-safe wrapper for `ChainSync`. +pub struct ChainSyncApi { + /// The rest of sync data + sync: RwLock, + /// Priority tasks queue + priority_tasks: Mutex>, +} + +impl ChainSyncApi { + /// Creates new `ChainSyncApi` + pub fn new( + config: SyncConfig, + chain: &BlockChainClient, + private_tx_handler: Arc, + priority_tasks: mpsc::Receiver, + ) -> Self { + ChainSyncApi { + sync: RwLock::new(ChainSync::new(config, chain, private_tx_handler)), + priority_tasks: Mutex::new(priority_tasks), + } + } + + /// Gives `write` access to underlying `ChainSync` + pub fn write(&self) -> RwLockWriteGuard { + self.sync.write() + } + + /// Returns info about given list of peers + pub fn peer_info(&self, ids: &[PeerId]) -> Vec> { + let sync = self.sync.read(); + ids.iter().map(|id| sync.peer_info(id)).collect() + } + + /// Returns synchonization status + pub fn status(&self) -> SyncStatus { + self.sync.read().status() + } + + /// Returns transactions propagation statistics + pub fn transactions_stats(&self) -> BTreeMap { + self.sync.read().transactions_stats() + .iter() + .map(|(hash, stats)| (*hash, stats.into())) + .collect() + + } + + /// Dispatch incoming requests and responses + pub fn dispatch_packet(&self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { + SyncSupplier::dispatch_packet(&self.sync, io, peer, packet_id, data) + } + + /// Process a priority propagation queue. + /// This task is run from a timer and should be time constrained. + /// Hence we set up a deadline for the execution and cancel the task if the deadline is exceeded. + /// + /// NOTE This method should only handle stuff that can be canceled and would reach other peers + /// by other means. + pub fn process_priority_queue(&self, io: &mut SyncIo) { + let deadline = Instant::now() + Duration::from_millis(250); + + unimplemented!() + } +} + +// Static methods +impl ChainSync { + /// Called when peer sends us new consensus packet + pub fn on_consensus_packet(io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> { + SyncHandler::on_consensus_packet(io, peer_id, r) + } + + /// creates rlp to send for the tree defined by 'from' and 'to' hashes + fn create_new_hashes_rlp(chain: &BlockChainClient, from: &H256, to: &H256) -> Option { + match chain.tree_route(from, to) { + Some(route) => { + let uncles = chain.find_uncles(from).unwrap_or_else(Vec::new); + match route.blocks.len() { + 0 => None, + _ => { + let mut blocks = route.blocks; + blocks.extend(uncles); + let mut rlp_stream = RlpStream::new_list(blocks.len()); + for block_hash in blocks { + let mut hash_rlp = RlpStream::new_list(2); + let number = chain.block_header(BlockId::Hash(block_hash.clone())) + .expect("chain.tree_route and chain.find_uncles only return hahses of blocks that are in the blockchain. 
qed.").number(); + hash_rlp.append(&block_hash); + hash_rlp.append(&number); + rlp_stream.append_raw(hash_rlp.as_raw(), 1); + } + Some(rlp_stream.out()) + } + } + }, + None => None + } + } + + /// creates rlp from block bytes and total difficulty + fn create_block_rlp(bytes: &Bytes, total_difficulty: U256) -> Bytes { + let mut rlp_stream = RlpStream::new_list(2); + rlp_stream.append_raw(bytes, 1); + rlp_stream.append(&total_difficulty); + rlp_stream.out() + } + + /// creates latest block rlp for the given client + fn create_latest_block_rlp(chain: &BlockChainClient) -> Bytes { + Self::create_block_rlp( + &chain.block(BlockId::Hash(chain.chain_info().best_block_hash)) + .expect("Best block always exists").into_inner(), + chain.chain_info().total_difficulty + ) + } + + /// creates given hash block rlp for the given client + fn create_new_block_rlp(chain: &BlockChainClient, hash: &H256) -> Bytes { + Self::create_block_rlp( + &chain.block(BlockId::Hash(hash.clone())).expect("Block has just been sealed; qed").into_inner(), + chain.block_total_difficulty(BlockId::Hash(hash.clone())).expect("Block has just been sealed; qed.") + ) + } + + fn select_random_peers(peers: &[PeerId]) -> Vec { + // take sqrt(x) peers + let mut peers = peers.to_vec(); + let mut count = (peers.len() as f64).powf(0.5).round() as usize; + count = cmp::min(count, MAX_PEERS_PROPAGATION); + count = cmp::max(count, MIN_PEERS_PROPAGATION); + random::new().shuffle(&mut peers); + peers.truncate(count); + peers + } + + fn get_init_state(warp_sync: WarpSync, chain: &BlockChainClient) -> SyncState { + let best_block = chain.chain_info().best_block_number; + match warp_sync { + WarpSync::Enabled => SyncState::WaitingPeers, + WarpSync::OnlyAndAfter(block) if block > best_block => SyncState::WaitingPeers, + _ => SyncState::Idle, + } + } +} + /// Blockchain sync handler. /// See module documentation for more details. pub struct ChainSync { @@ -417,10 +562,14 @@ pub struct ChainSync { impl ChainSync { /// Create a new instance of syncing strategy. - pub fn new(config: SyncConfig, chain: &BlockChainClient, private_tx_handler: Arc) -> ChainSync { + pub fn new( + config: SyncConfig, + chain: &BlockChainClient, + private_tx_handler: Arc, + ) -> Self { let chain_info = chain.chain_info(); let best_block = chain.chain_info().best_block_number; - let state = ChainSync::get_init_state(config.warp_sync, chain); + let state = Self::get_init_state(config.warp_sync, chain); let mut sync = ChainSync { state, @@ -445,15 +594,6 @@ impl ChainSync { sync } - fn get_init_state(warp_sync: WarpSync, chain: &BlockChainClient) -> SyncState { - let best_block = chain.chain_info().best_block_number; - match warp_sync { - WarpSync::Enabled => SyncState::WaitingPeers, - WarpSync::OnlyAndAfter(block) if block > best_block => SyncState::WaitingPeers, - _ => SyncState::Idle, - } - } - /// Returns synchonization status pub fn status(&self) -> SyncStatus { let last_imported_number = self.new_blocks.last_imported_block_number(); @@ -521,7 +661,7 @@ impl ChainSync { } } } - self.state = state.unwrap_or_else(|| ChainSync::get_init_state(self.warp_sync, io.chain())); + self.state = state.unwrap_or_else(|| Self::get_init_state(self.warp_sync, io.chain())); // Reactivate peers only if some progress has been made // since the last sync round of if starting fresh. 
self.active_peers = self.peers.keys().cloned().collect(); @@ -1004,58 +1144,6 @@ impl ChainSync { } } - /// creates rlp to send for the tree defined by 'from' and 'to' hashes - fn create_new_hashes_rlp(chain: &BlockChainClient, from: &H256, to: &H256) -> Option { - match chain.tree_route(from, to) { - Some(route) => { - let uncles = chain.find_uncles(from).unwrap_or_else(Vec::new); - match route.blocks.len() { - 0 => None, - _ => { - let mut blocks = route.blocks; - blocks.extend(uncles); - let mut rlp_stream = RlpStream::new_list(blocks.len()); - for block_hash in blocks { - let mut hash_rlp = RlpStream::new_list(2); - let number = chain.block_header(BlockId::Hash(block_hash.clone())) - .expect("chain.tree_route and chain.find_uncles only return hahses of blocks that are in the blockchain. qed.").number(); - hash_rlp.append(&block_hash); - hash_rlp.append(&number); - rlp_stream.append_raw(hash_rlp.as_raw(), 1); - } - Some(rlp_stream.out()) - } - } - }, - None => None - } - } - - /// creates rlp from block bytes and total difficulty - fn create_block_rlp(bytes: &Bytes, total_difficulty: U256) -> Bytes { - let mut rlp_stream = RlpStream::new_list(2); - rlp_stream.append_raw(bytes, 1); - rlp_stream.append(&total_difficulty); - rlp_stream.out() - } - - /// creates latest block rlp for the given client - fn create_latest_block_rlp(chain: &BlockChainClient) -> Bytes { - ChainSync::create_block_rlp( - &chain.block(BlockId::Hash(chain.chain_info().best_block_hash)) - .expect("Best block always exists").into_inner(), - chain.chain_info().total_difficulty - ) - } - - /// creates given hash block rlp for the given client - fn create_new_block_rlp(chain: &BlockChainClient, hash: &H256) -> Bytes { - ChainSync::create_block_rlp( - &chain.block(BlockId::Hash(hash.clone())).expect("Block has just been sealed; qed").into_inner(), - chain.block_total_difficulty(BlockId::Hash(hash.clone())).expect("Block has just been sealed; qed.") - ) - } - /// returns peer ids that have different blocks than our chain fn get_lagging_peers(&mut self, chain_info: &BlockChainInfo) -> Vec { let latest_hash = chain_info.best_block_hash; @@ -1073,17 +1161,6 @@ impl ChainSync { .collect::>() } - fn select_random_peers(peers: &[PeerId]) -> Vec { - // take sqrt(x) peers - let mut peers = peers.to_vec(); - let mut count = (peers.len() as f64).powf(0.5).round() as usize; - count = cmp::min(count, MAX_PEERS_PROPAGATION); - count = cmp::max(count, MIN_PEERS_PROPAGATION); - random::new().shuffle(&mut peers); - peers.truncate(count); - peers - } - fn get_consensus_peers(&self) -> Vec { self.peers.iter().filter_map(|(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_2.0 { Some(*id) } else { None }).collect() } @@ -1132,21 +1209,11 @@ impl ChainSync { } } - /// Dispatch incoming requests and responses - pub fn dispatch_packet(sync: &RwLock, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { - SyncSupplier::dispatch_packet(sync, io, peer, packet_id, data) - } - pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { debug!(target: "sync", "{} -> Dispatching packet: {}", peer, packet_id); SyncHandler::on_packet(self, io, peer, packet_id, data); } - /// Called when peer sends us new consensus packet - pub fn on_consensus_packet(io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> { - SyncHandler::on_consensus_packet(io, peer_id, r) - } - /// Called by peer when it is disconnecting pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: PeerId) { 
SyncHandler::on_peer_aborting(self, io, peer); @@ -1158,8 +1225,8 @@ impl ChainSync { } /// propagates new transactions to all peers - pub fn propagate_new_transactions(&mut self, io: &mut SyncIo) -> usize { - SyncPropagator::propagate_new_transactions(self, io) + pub fn propagate_new_transactions(&mut self, io: &mut SyncIo) { + SyncPropagator::propagate_new_transactions(self, io); } /// Broadcast consensus message to peers. diff --git a/ethcore/sync/src/chain/propagator.rs b/ethcore/sync/src/chain/propagator.rs index 82b592c4ba0..833081ba605 100644 --- a/ethcore/sync/src/chain/propagator.rs +++ b/ethcore/sync/src/chain/propagator.rs @@ -157,9 +157,9 @@ impl SyncPropagator { // Clear old transactions from stats sync.transactions_stats.retain(&all_transactions_hashes); - // sqrt(x)/x scaled to max u32 let block_number = io.chain().chain_info().best_block_number; + // sqrt(x)/x scaled to max u32 let lucky_peers = { peers.into_iter() .filter_map(|peer_id| { diff --git a/ethcore/sync/src/chain/supplier.rs b/ethcore/sync/src/chain/supplier.rs index 4bce0ef9850..1f8b8347394 100644 --- a/ethcore/sync/src/chain/supplier.rs +++ b/ethcore/sync/src/chain/supplier.rs @@ -404,7 +404,7 @@ mod test { io.sender = Some(2usize); - ChainSync::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GET_NODE_DATA_PACKET, &node_request); + SyncSupplier::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GET_NODE_DATA_PACKET, &node_request); assert_eq!(1, io.packets.len()); } @@ -446,7 +446,7 @@ mod test { assert_eq!(603, rlp_result.unwrap().1.out().len()); io.sender = Some(2usize); - ChainSync::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GET_RECEIPTS_PACKET, &receipts_request); + SyncSupplier::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GET_RECEIPTS_PACKET, &receipts_request); assert_eq!(1, io.packets.len()); } } diff --git a/ethcore/sync/src/tests/helpers.rs b/ethcore/sync/src/tests/helpers.rs index d75d71ea90a..d83597e5239 100644 --- a/ethcore/sync/src/tests/helpers.rs +++ b/ethcore/sync/src/tests/helpers.rs @@ -33,7 +33,7 @@ use ethcore::test_helpers; use sync_io::SyncIo; use io::{IoChannel, IoContext, IoHandler}; use api::WARP_SYNC_PROTOCOL_ID; -use chain::{ChainSync, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3, PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET}; +use chain::{ChainSync, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3, PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET, SyncSupplier}; use SyncConfig; use private_tx::SimplePrivateTxHandler; @@ -271,7 +271,7 @@ impl Peer for EthPeer { fn receive_message(&self, from: PeerId, msg: TestPacket) -> HashSet { let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, Some(from)); - ChainSync::dispatch_packet(&self.sync, &mut io, from, msg.packet_id, &msg.data); + SyncSupplier::dispatch_packet(&self.sync, &mut io, from, msg.packet_id, &msg.data); self.chain.flush(); io.to_disconnect.clone() } From 4a2e2b9387169f7bbe06899adeb22c37acd60ee3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Tue, 20 Nov 2018 19:25:39 +0000 Subject: [PATCH 2/9] Send priority tasks notifications. 
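The client now notifies sync about blocks that are about to be imported, and a dedicated
priority timer drains those notifications under a hard deadline, so a slow or contended
task never holds the IO worker for long. The sketch below shows only the general shape of
that consumer loop using standard-library types; `Task` and `process_with_deadline` are
placeholder names for this example and are not part of the patch:

use std::sync::mpsc;
use std::time::{Duration, Instant};

enum Task {
    PropagateBlock(Vec<u8>),
    PropagateTransactions(Vec<u8>),
}

fn process_with_deadline(rx: &mpsc::Receiver<Task>, deadline: Instant) {
    loop {
        let now = Instant::now();
        if now >= deadline {
            // Out of budget; whatever is left is handled on the next timer tick
            // or reaches peers through the regular propagation timers.
            break;
        }
        // Wait for a task, but never past the deadline.
        match rx.recv_timeout(deadline - now) {
            Ok(Task::PropagateBlock(_raw_rlp)) => { /* stub: send NEW_BLOCK_PACKET to peers */ }
            Ok(Task::PropagateTransactions(_hashes)) => { /* stub: send TRANSACTIONS_PACKET */ }
            Err(_) => break, // queue empty (timeout) or all senders dropped
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send(Task::PropagateBlock(vec![0u8; 8])).unwrap();
    process_with_deadline(&rx, Instant::now() + Duration::from_millis(250));
}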
--- ethcore/service/src/service.rs | 8 ++++- ethcore/src/client/chain_notify.rs | 10 ++++++- ethcore/src/client/client.rs | 18 +++++++++-- ethcore/src/client/traits.rs | 5 ++++ ethcore/src/miner/miner.rs | 2 +- ethcore/src/verification/queue/mod.rs | 7 +++++ ethcore/sync/src/api.rs | 30 +++++++++++++++---- ethcore/sync/src/chain/handler.rs | 5 ++-- ethcore/sync/src/chain/mod.rs | 41 +++++++++++++++++++++----- ethcore/sync/src/chain/propagator.rs | 3 +- ethcore/sync/src/sync_io.rs | 2 +- ethcore/sync/src/transactions_stats.rs | 3 +- parity/modules.rs | 34 +++++++++++++-------- parity/run.rs | 14 +++++++-- util/fastmap/src/lib.rs | 6 ++-- 15 files changed, 146 insertions(+), 42 deletions(-) diff --git a/ethcore/service/src/service.rs b/ethcore/service/src/service.rs index 1763b8fd5ed..77429a4e56e 100644 --- a/ethcore/service/src/service.rs +++ b/ethcore/service/src/service.rs @@ -106,7 +106,13 @@ impl ClientService { info!("Configured for {} using {} engine", Colour::White.bold().paint(spec.name.clone()), Colour::Yellow.bold().paint(spec.engine.name())); let pruning = config.pruning; - let client = Client::new(config, &spec, blockchain_db.clone(), miner.clone(), io_service.channel())?; + let client = Client::new( + config, + &spec, + blockchain_db.clone(), + miner.clone(), + io_service.channel(), + )?; miner.set_io_channel(io_service.channel()); miner.set_in_chain_checker(&client.clone()); diff --git a/ethcore/src/client/chain_notify.rs b/ethcore/src/client/chain_notify.rs index ebfe7bdef5c..48fcaac174c 100644 --- a/ethcore/src/client/chain_notify.rs +++ b/ethcore/src/client/chain_notify.rs @@ -141,7 +141,15 @@ pub trait ChainNotify : Send + Sync { } /// fires when chain broadcasts a message - fn broadcast(&self, _message_type: ChainMessageType) {} + fn broadcast(&self, _message_type: ChainMessageType) { + // does nothing by default + } + + /// fires when new block is about to be imported + /// implementations should be light + fn block_pre_import(&self, _bytes: &Bytes) { + // does nothing by default + } /// fires when new transactions are received from a peer fn transactions_received(&self, diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 8412881a22e..ca0c3edc39f 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -881,7 +881,7 @@ impl Client { /// Flush the block import queue. 
pub fn flush_queue(&self) { self.importer.block_queue.flush(); - while !self.importer.block_queue.queue_info().is_empty() { + while !self.importer.block_queue.is_empty() { self.import_verified_blocks(); } } @@ -1423,8 +1423,15 @@ impl ImportBlock for Client { bail!(EthcoreErrorKind::Block(BlockError::UnknownParent(unverified.parent_hash()))); } + let raw = if self.importer.block_queue.is_empty() { Some(unverified.bytes.clone()) } else { None }; + match self.importer.block_queue.import(unverified) { - Ok(res) => Ok(res), + Ok(hash) => { + if let Some(raw) = raw { + self.notify(move |n| n.block_pre_import(&raw)); + } + Ok(hash) + }, // we only care about block errors (not import errors) Err((block, EthcoreError(EthcoreErrorKind::Block(err), _))) => { self.importer.bad_blocks.report(block.bytes, format!("{:?}", err)); @@ -1878,6 +1885,10 @@ impl BlockChainClient for Client { self.importer.block_queue.queue_info() } + fn is_queue_empty(&self) -> bool { + self.importer.block_queue.is_empty() + } + fn clear_queue(&self) { self.importer.block_queue.clear(); } @@ -2287,6 +2298,9 @@ impl ScheduleInfo for Client { impl ImportSealedBlock for Client { fn import_sealed_block(&self, block: SealedBlock) -> EthcoreResult { + let raw = block.rlp_bytes(); + self.notify(|n| n.block_pre_import(&raw)); + let h = block.header().hash(); let start = Instant::now(); let route = { diff --git a/ethcore/src/client/traits.rs b/ethcore/src/client/traits.rs index 5b78a54b34e..55d527013ec 100644 --- a/ethcore/src/client/traits.rs +++ b/ethcore/src/client/traits.rs @@ -300,6 +300,11 @@ pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContra /// Get block queue information. fn queue_info(&self) -> BlockQueueInfo; + /// Returns true if block queue is empty. + fn is_queue_empty(&self) -> bool { + self.queue_info().is_empty() + } + /// Clear block queue and abort all import activity. fn clear_queue(&self); diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index 39dac1f2b21..c73887800c8 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -576,7 +576,7 @@ impl Miner { trace!(target: "miner", "requires_reseal: sealing enabled"); // Disable sealing if there were no requests for SEALING_TIMEOUT_IN_BLOCKS - let had_requests = sealing.last_request.map(|last_request| + let had_requests = sealing.last_request.map(|last_request| best_block.saturating_sub(last_request) <= SEALING_TIMEOUT_IN_BLOCKS ).unwrap_or(false); diff --git a/ethcore/src/verification/queue/mod.rs b/ethcore/src/verification/queue/mod.rs index 9b1597439b0..b9242f47b06 100644 --- a/ethcore/src/verification/queue/mod.rs +++ b/ethcore/src/verification/queue/mod.rs @@ -583,6 +583,13 @@ impl VerificationQueue { result } + /// Returns true if there is nothing currently in the queue. + /// TODO [ToDr] Optimize to avoid locking + pub fn is_empty(&self) -> bool { + let v = &self.verification; + v.unverified.lock().is_empty() && v.verifying.lock().is_empty() && v.verified.lock().is_empty() + } + /// Get queue status. 
pub fn queue_info(&self) -> QueueInfo { use std::mem::size_of; diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index df2e6c83efd..ba968bba62b 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -36,7 +36,7 @@ use sync_io::NetSyncIo; use chain::{ChainSyncApi, SyncStatus as EthSyncStatus}; use std::net::{SocketAddr, AddrParseError}; use std::str::FromStr; -use parking_lot::RwLock; +use parking_lot::{RwLock, Mutex}; use chain::{ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62, PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3, PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET}; @@ -235,7 +235,10 @@ impl AttachedProtocol { /// that happens here should work even if the task is cancelled. #[derive(Debug)] pub enum PriorityTask { - + /// Propagate given block + PropagateBlock(Bytes), + /// Propagate a list of transactions + PropagateTransactions(Vec), } /// EthSync initialization parameters. @@ -254,8 +257,6 @@ pub struct Params { pub network_config: NetworkConfiguration, /// Other protocols to attach. pub attached_protos: Vec, - /// Priority tasks channel - pub priority_tasks: mpsc::Receiver } /// Ethereum network protocol handler @@ -272,6 +273,8 @@ pub struct EthSync { subprotocol_name: [u8; 3], /// Light subprotocol name. light_subprotocol_name: [u8; 3], + /// Priority tasks notification channel + priority_tasks: Mutex>, } fn light_params( @@ -327,11 +330,12 @@ impl EthSync { }) }; + let (priority_tasks_tx, priority_tasks_rx) = mpsc::channel(); let sync = ChainSyncApi::new( params.config, &*params.chain, params.private_tx_handler.clone(), - params.priority_tasks, + priority_tasks_rx, ); let service = NetworkService::new(params.network_config.clone().into_basic()?, connection_filter)?; @@ -347,10 +351,16 @@ impl EthSync { subprotocol_name: params.config.subprotocol_name, light_subprotocol_name: params.config.light_subprotocol_name, attached_protos: params.attached_protos, + priority_tasks: Mutex::new(priority_tasks_tx), }); Ok(sync) } + + /// Priority tasks producer + pub fn priority_tasks(&self) -> mpsc::Sender { + self.priority_tasks.lock().clone() + } } impl SyncProvider for EthSync { @@ -416,7 +426,8 @@ impl NetworkProtocolHandler for SyncProtocolHandler { io.register_timer(PEERS_TIMER, Duration::from_millis(700)).expect("Error registering peers timer"); io.register_timer(SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer"); io.register_timer(TX_TIMER, Duration::from_millis(1300)).expect("Error registering transactions timer"); - io.register_timer(PRIORITY_TIMER, Duration::from_millis(250)).expect("Error registering peers timer"); + + io.register_timer(PRIORITY_TIMER, Duration::from_millis(50)).expect("Error registering peers timer"); } } @@ -455,6 +466,13 @@ impl NetworkProtocolHandler for SyncProtocolHandler { } impl ChainNotify for EthSync { + fn block_pre_import(&self, bytes: &Bytes) { + let bytes = bytes.clone(); + if let Err(e) = self.priority_tasks.lock().send(PriorityTask::PropagateBlock(bytes)) { + warn!(target: "sync", "Unexpected error during priority block propagation: {:?}", e); + } + } + fn new_blocks(&self, imported: Vec, invalid: Vec, diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs index 582cab8ae64..b750628cd94 100644 --- a/ethcore/sync/src/chain/handler.rs +++ b/ethcore/sync/src/chain/handler.rs @@ -29,7 +29,6 @@ use rlp::Rlp; use snapshot::ChunkType; use std::cmp; use std::mem; -use std::collections::HashSet; use std::time::Instant; use 
sync_io::SyncIo; @@ -578,8 +577,8 @@ impl SyncHandler { asking_blocks: Vec::new(), asking_hash: None, ask_time: Instant::now(), - last_sent_transactions: HashSet::new(), - last_sent_private_transactions: HashSet::new(), + last_sent_transactions: Default::default(), + last_sent_private_transactions: Default::default(), expired: false, confirmation: if sync.fork_block.is_none() { ForkConfirmation::Confirmed } else { ForkConfirmation::Unconfirmed }, asking_snapshot_data: None, diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index b805e68a043..7dcab7698a2 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -99,7 +99,7 @@ use std::time::{Duration, Instant}; use hash::keccak; use heapsize::HeapSizeOf; use ethereum_types::{H256, U256}; -use fastmap::H256FastMap; +use fastmap::{H256FastMap, H256FastSet}; use parking_lot::{Mutex, RwLock, RwLockWriteGuard}; use bytes::Bytes; use rlp::{Rlp, RlpStream, DecoderError}; @@ -323,9 +323,9 @@ pub struct PeerInfo { /// Request timestamp ask_time: Instant, /// Holds a set of transactions recently sent to this peer to avoid spamming. - last_sent_transactions: HashSet, + last_sent_transactions: H256FastSet, /// Holds a set of private transactions and their signatures recently sent to this peer to avoid spamming. - last_sent_private_transactions: HashSet, + last_sent_private_transactions: H256FastSet, /// Pending request is expired and result should be ignored expired: bool, /// Peer fork confirmation status @@ -376,11 +376,13 @@ pub type RlpResponseResult = Result, PacketDecodeE pub type Peers = HashMap; /// Thread-safe wrapper for `ChainSync`. +/// +/// NOTE always lock in order of fields declaration pub struct ChainSyncApi { - /// The rest of sync data - sync: RwLock, /// Priority tasks queue priority_tasks: Mutex>, + /// The rest of sync data + sync: RwLock, } impl ChainSyncApi { @@ -419,7 +421,6 @@ impl ChainSyncApi { .iter() .map(|(hash, stats)| (*hash, stats.into())) .collect() - } /// Dispatch incoming requests and responses @@ -433,10 +434,34 @@ impl ChainSyncApi { /// /// NOTE This method should only handle stuff that can be canceled and would reach other peers /// by other means. 
- pub fn process_priority_queue(&self, io: &mut SyncIo) { + pub fn process_priority_queue(&self, _io: &mut SyncIo) { let deadline = Instant::now() + Duration::from_millis(250); + let check_deadline = || { + let now = Instant::now(); + if now > deadline { + None + } else { + Some(deadline - now) + } + }; + + let work = || { + let tasks = self.priority_tasks.try_lock_until(deadline)?; + let mut _sync = self.sync.try_write_until(deadline)?; + let left = check_deadline()?; + let task = tasks.recv_timeout(left).ok()?; + + match task { + PriorityTask::PropagateBlock(_) => info!("Propagating block"), + PriorityTask::PropagateTransactions(_) => info!("Propagating transactions"), + } - unimplemented!() + Some(()) + }; + + if work().is_none() { + debug!(target: "sync", "Unable to complete priority task within deadline."); + } } } diff --git a/ethcore/sync/src/chain/propagator.rs b/ethcore/sync/src/chain/propagator.rs index 833081ba605..d4a7cca71ff 100644 --- a/ethcore/sync/src/chain/propagator.rs +++ b/ethcore/sync/src/chain/propagator.rs @@ -18,6 +18,7 @@ use bytes::Bytes; use ethereum_types::H256; use ethcore::client::BlockChainInfo; use ethcore::header::BlockNumber; +use fastmap::H256FastSet; use network::{PeerId, PacketId}; use rand::Rng; use rlp::{Encodable, RlpStream}; @@ -147,7 +148,7 @@ impl SyncPropagator { fn propagate_transactions_to_peers(sync: &mut ChainSync, io: &mut SyncIo, peers: Vec, transactions: Vec<&SignedTransaction>) -> HashSet { let all_transactions_hashes = transactions.iter() .map(|tx| tx.hash()) - .collect::>(); + .collect::(); let all_transactions_rlp = { let mut packet = RlpStream::new_list(transactions.len()); for tx in &transactions { packet.append(&**tx); } diff --git a/ethcore/sync/src/sync_io.rs b/ethcore/sync/src/sync_io.rs index c7704724c66..a5e9f7b2f44 100644 --- a/ethcore/sync/src/sync_io.rs +++ b/ethcore/sync/src/sync_io.rs @@ -52,7 +52,7 @@ pub trait SyncIo { fn protocol_version(&self, protocol: &ProtocolId, peer_id: PeerId) -> u8; /// Returns if the chain block queue empty fn is_chain_queue_empty(&self) -> bool { - self.chain().queue_info().is_empty() + self.chain().is_queue_empty() } /// Check if the session is expired fn is_expired(&self) -> bool; diff --git a/ethcore/sync/src/transactions_stats.rs b/ethcore/sync/src/transactions_stats.rs index 4d11dcf6809..7d5e2ca4a86 100644 --- a/ethcore/sync/src/transactions_stats.rs +++ b/ethcore/sync/src/transactions_stats.rs @@ -15,6 +15,7 @@ // along with Parity. If not, see . use api::TransactionStats; +use std::hash::BuildHasher; use std::collections::{HashSet, HashMap}; use ethereum_types::{H256, H512}; use fastmap::H256FastMap; @@ -74,7 +75,7 @@ impl TransactionsStats { } /// Retains only transactions present in given `HashSet`. - pub fn retain(&mut self, hashes: &HashSet) { + pub fn retain(&mut self, hashes: &HashSet) { let to_remove = self.pending_transactions.keys() .filter(|hash| !hashes.contains(hash)) .cloned() diff --git a/parity/modules.rs b/parity/modules.rs index e12e8ee4583..ac84aea5f21 100644 --- a/parity/modules.rs +++ b/parity/modules.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . 
-use std::sync::Arc; +use std::sync::{Arc, mpsc}; use ethcore::client::BlockChainClient; use sync::{self, AttachedProtocol, SyncConfig, NetworkConfiguration, Params, ConnectionFilter}; @@ -25,12 +25,17 @@ pub use sync::{EthSync, SyncProvider, ManageNetwork, PrivateTxHandler}; pub use ethcore::client::ChainNotify; use ethcore_logger::Config as LogConfig; -pub type SyncModules = (Arc, Arc, Arc); +pub type SyncModules = ( + Arc, + Arc, + Arc, + mpsc::Sender, +); pub fn sync( - sync_cfg: SyncConfig, - net_cfg: NetworkConfiguration, - client: Arc, + config: SyncConfig, + network_config: NetworkConfiguration, + chain: Arc, snapshot_service: Arc, private_tx_handler: Arc, provider: Arc, @@ -39,15 +44,20 @@ pub fn sync( connection_filter: Option>, ) -> Result { let eth_sync = EthSync::new(Params { - config: sync_cfg, - chain: client, - provider: provider, - snapshot_service: snapshot_service, + config, + chain, + provider, + snapshot_service, private_tx_handler, - network_config: net_cfg, - attached_protos: attached_protos, + network_config, + attached_protos, }, connection_filter)?; - Ok((eth_sync.clone() as Arc, eth_sync.clone() as Arc, eth_sync.clone() as Arc)) + Ok(( + eth_sync.clone() as Arc, + eth_sync.clone() as Arc, + eth_sync.clone() as Arc, + eth_sync.priority_tasks() + )) } diff --git a/parity/run.rs b/parity/run.rs index 03dbcaffb0b..c037b02766e 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -484,7 +484,6 @@ fn execute_impl(cmd: RunCmd, logger: Arc, on_client_rq: cmd.gas_pricer_conf.to_gas_pricer(fetch.clone(), runtime.executor()), &spec, Some(account_provider.clone()), - )); miner.set_author(cmd.miner_extras.author, None).expect("Fails only if password is Some; password is None; qed"); miner.set_gas_range_target(cmd.miner_extras.gas_range_target); @@ -641,7 +640,7 @@ fn execute_impl(cmd: RunCmd, logger: Arc, on_client_rq: }; // create sync object - let (sync_provider, manage_network, chain_notify) = modules::sync( + let (sync_provider, manage_network, chain_notify, priority_tasks) = modules::sync( sync_config, net_conf.clone().into(), client.clone(), @@ -655,6 +654,15 @@ fn execute_impl(cmd: RunCmd, logger: Arc, on_client_rq: service.add_notify(chain_notify.clone()); + // Propagate transactions as soon as they are imported. + let tx = ::parking_lot::Mutex::new(priority_tasks); + miner.add_transactions_listener(Box::new(move |hashes| { + let res = tx.lock().send(::sync::PriorityTask::PropagateTransactions(hashes.iter().cloned().collect())); + if let Err(err) = res { + warn!("Unexpected error when sending priority task: {:?}", err); + } + })); + // provider not added to a notification center is effectively disabled // TODO [debris] refactor it later on if cmd.private_tx_enabled { @@ -740,7 +748,7 @@ fn execute_impl(cmd: RunCmd, logger: Arc, on_client_rq: let secretstore_deps = secretstore::Dependencies { client: client.clone(), sync: sync_provider.clone(), - miner: miner, + miner: miner.clone(), account_provider: account_provider, accounts_passwords: &passwords, }; diff --git a/util/fastmap/src/lib.rs b/util/fastmap/src/lib.rs index 135ce54babe..65dd9dfb4a0 100644 --- a/util/fastmap/src/lib.rs +++ b/util/fastmap/src/lib.rs @@ -21,11 +21,13 @@ extern crate plain_hasher; use ethereum_types::H256; use std::hash; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use plain_hasher::PlainHasher; /// Specialized version of `HashMap` with H256 keys and fast hashing function. 
pub type H256FastMap = HashMap>; +/// Specialized version of HashSet with H256 values and fast hashing function. +pub type H256FastSet = HashSet>; #[cfg(test)] mod tests { @@ -36,4 +38,4 @@ mod tests { let mut h = H256FastMap::default(); h.insert(H256::from(123), "abc"); } -} \ No newline at end of file +} From 79bbf075aa7ebe0fe5555596353960e232aa1e8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Wed, 21 Nov 2018 18:18:04 +0000 Subject: [PATCH 3/9] Propagate blocks, optimize transactions. --- ethcore/src/client/chain_notify.rs | 4 +- ethcore/src/client/client.rs | 11 ++-- ethcore/sync/src/api.rs | 23 +++++-- ethcore/sync/src/chain/handler.rs | 11 +--- ethcore/sync/src/chain/mod.rs | 90 +++++++++++++++++++++------- ethcore/sync/src/chain/propagator.rs | 56 ++++++++--------- ethcore/sync/src/chain/supplier.rs | 38 +++++++++++- parity/run.rs | 3 +- 8 files changed, 163 insertions(+), 73 deletions(-) diff --git a/ethcore/src/client/chain_notify.rs b/ethcore/src/client/chain_notify.rs index 48fcaac174c..334e54a241e 100644 --- a/ethcore/src/client/chain_notify.rs +++ b/ethcore/src/client/chain_notify.rs @@ -15,7 +15,7 @@ // along with Parity. If not, see . use bytes::Bytes; -use ethereum_types::H256; +use ethereum_types::{H256, U256}; use transaction::UnverifiedTransaction; use blockchain::ImportRoute; use std::time::Duration; @@ -147,7 +147,7 @@ pub trait ChainNotify : Send + Sync { /// fires when new block is about to be imported /// implementations should be light - fn block_pre_import(&self, _bytes: &Bytes) { + fn block_pre_import(&self, _bytes: &Bytes, _difficulty: &U256) { // does nothing by default } diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index ca0c3edc39f..8a393a1e0ea 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -1423,12 +1423,14 @@ impl ImportBlock for Client { bail!(EthcoreErrorKind::Block(BlockError::UnknownParent(unverified.parent_hash()))); } - let raw = if self.importer.block_queue.is_empty() { Some(unverified.bytes.clone()) } else { None }; + let raw = if self.importer.block_queue.is_empty() { + Some((unverified.bytes.clone(), *unverified.header.difficulty())) + } else { None }; match self.importer.block_queue.import(unverified) { Ok(hash) => { - if let Some(raw) = raw { - self.notify(move |n| n.block_pre_import(&raw)); + if let Some((raw, difficulty)) = raw { + self.notify(move |n| n.block_pre_import(&raw, &difficulty)); } Ok(hash) }, @@ -2299,7 +2301,8 @@ impl ScheduleInfo for Client { impl ImportSealedBlock for Client { fn import_sealed_block(&self, block: SealedBlock) -> EthcoreResult { let raw = block.rlp_bytes(); - self.notify(|n| n.block_pre_import(&raw)); + let difficulty = *block.header().difficulty(); + self.notify(|n| n.block_pre_import(&raw, &difficulty)); let h = block.header().hash(); let start = Instant::now(); diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index ba968bba62b..85eae67643e 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -236,9 +236,16 @@ impl AttachedProtocol { #[derive(Debug)] pub enum PriorityTask { /// Propagate given block - PropagateBlock(Bytes), + PropagateBlock { + /// When the task was initiated + started: ::std::time::Instant, + /// Raw block RLP to propagate + block: Bytes, + /// Blocks difficulty + difficulty: U256, + }, /// Propagate a list of transactions - PropagateTransactions(Vec), + PropagateTransactions(::std::time::Instant, Vec), } /// EthSync initialization parameters. 
@@ -427,7 +434,7 @@ impl NetworkProtocolHandler for SyncProtocolHandler { io.register_timer(SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer"); io.register_timer(TX_TIMER, Duration::from_millis(1300)).expect("Error registering transactions timer"); - io.register_timer(PRIORITY_TIMER, Duration::from_millis(50)).expect("Error registering peers timer"); + io.register_timer(PRIORITY_TIMER, Duration::from_millis(100)).expect("Error registering peers timer"); } } @@ -466,9 +473,13 @@ impl NetworkProtocolHandler for SyncProtocolHandler { } impl ChainNotify for EthSync { - fn block_pre_import(&self, bytes: &Bytes) { - let bytes = bytes.clone(); - if let Err(e) = self.priority_tasks.lock().send(PriorityTask::PropagateBlock(bytes)) { + fn block_pre_import(&self, bytes: &Bytes, difficulty: &U256) { + let task = PriorityTask::PropagateBlock { + started: ::std::time::Instant::now(), + block: bytes.clone(), + difficulty: *difficulty, + }; + if let Err(e) = self.priority_tasks.lock().send(task) { warn!(target: "sync", "Unexpected error during priority block propagation: {:?}", e); } } diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs index b750628cd94..104a80320e9 100644 --- a/ethcore/sync/src/chain/handler.rs +++ b/ethcore/sync/src/chain/handler.rs @@ -57,7 +57,6 @@ use super::{ SNAPSHOT_DATA_PACKET, SNAPSHOT_MANIFEST_PACKET, STATUS_PACKET, - TRANSACTIONS_PACKET, }; /// The Chain Sync Handler: handles responses from peers @@ -66,14 +65,9 @@ pub struct SyncHandler; impl SyncHandler { /// Handle incoming packet from peer pub fn on_packet(sync: &mut ChainSync, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { - if packet_id != STATUS_PACKET && !sync.peers.contains_key(&peer) { - debug!(target:"sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_info(peer)); - return; - } let rlp = Rlp::new(data); let result = match packet_id { STATUS_PACKET => SyncHandler::on_peer_status(sync, io, peer, &rlp), - TRANSACTIONS_PACKET => SyncHandler::on_peer_transactions(sync, io, peer, &rlp), BLOCK_HEADERS_PACKET => SyncHandler::on_peer_block_headers(sync, io, peer, &rlp), BLOCK_BODIES_PACKET => SyncHandler::on_peer_block_bodies(sync, io, peer, &rlp), RECEIPTS_PACKET => SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp), @@ -108,10 +102,9 @@ impl SyncHandler { } /// Called when peer sends us new consensus packet - pub fn on_consensus_packet(io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> { + pub fn on_consensus_packet(io: &mut SyncIo, peer_id: PeerId, r: &Rlp) { trace!(target: "sync", "Received consensus packet from {:?}", peer_id); io.chain().queue_consensus_message(r.as_raw().to_vec()); - Ok(()) } /// Called by peer when it is disconnecting @@ -634,7 +627,7 @@ impl SyncHandler { } /// Called when peer sends us new transactions - fn on_peer_transactions(sync: &ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> { + pub fn on_peer_transactions(sync: &ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> { // Accept transactions only when fully synced if !io.is_chain_queue_empty() || (sync.state != SyncState::Idle && sync.state != SyncState::NewBlocks) { trace!(target: "sync", "{} Ignoring transactions while syncing", peer_id); diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 7dcab7698a2..c70705f91f2 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ 
-102,7 +102,7 @@ use ethereum_types::{H256, U256}; use fastmap::{H256FastMap, H256FastSet}; use parking_lot::{Mutex, RwLock, RwLockWriteGuard}; use bytes::Bytes; -use rlp::{Rlp, RlpStream, DecoderError}; +use rlp::{RlpStream, DecoderError}; use network::{self, PeerId, PacketId}; use ethcore::header::{BlockNumber}; use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo, BlockQueueInfo}; @@ -434,26 +434,61 @@ impl ChainSyncApi { /// /// NOTE This method should only handle stuff that can be canceled and would reach other peers /// by other means. - pub fn process_priority_queue(&self, _io: &mut SyncIo) { - let deadline = Instant::now() + Duration::from_millis(250); - let check_deadline = || { + pub fn process_priority_queue(&self, io: &mut SyncIo) { + fn check_deadline(deadline: Instant) -> Option { let now = Instant::now(); if now > deadline { None } else { Some(deadline - now) } - }; + } - let work = || { - let tasks = self.priority_tasks.try_lock_until(deadline)?; - let mut _sync = self.sync.try_write_until(deadline)?; - let left = check_deadline()?; - let task = tasks.recv_timeout(left).ok()?; + // deadline to get the task from the queue + let deadline = Instant::now() + Duration::from_millis(150); + let mut work = || { + let task = { + let tasks = self.priority_tasks.try_lock_until(deadline)?; + let left = check_deadline(deadline)?; + tasks.recv_timeout(left).ok()? + }; + // wait for the sync lock indefinitely, + // since we already have the item. + // Subsequent timers will just try to process + // other tasks. + let mut sync = self.sync.write(); + // since `sync` might take a while to acquire we have a new deadline + // to do the rest of the job now. + let deadline = Instant::now() + Duration::from_millis(100); + + let as_us = move |prev| { + let dur: Duration = Instant::now() - prev; + dur.as_secs() * 1_000_000 + dur.subsec_micros() as u64 + }; match task { - PriorityTask::PropagateBlock(_) => info!("Propagating block"), - PriorityTask::PropagateTransactions(_) => info!("Propagating transactions"), + // NOTE We can't simply use existing methods, + // cause the block is not in the DB yet. + PriorityTask::PropagateBlock { started, block, difficulty } => { + // try to send to peers that are on the same block as us + // (they will most likely accept the new block). + info!("Starting block propagation, took: {}µs", as_us(started)); + let chain_info = io.chain().chain_info(); + let total_difficulty = chain_info.total_difficulty + difficulty; + let rlp = ChainSync::create_block_rlp(&block, total_difficulty); + for peer in sync.get_peers(&chain_info, PeerState::SameBlock) { + check_deadline(deadline)?; + SyncPropagator::send_packet(io, peer, NEW_BLOCK_PACKET, rlp.clone()); + // TODO [ToDr] Update peer latest block? 
+ } + + info!("Finished block propagation, took: {}µs", as_us(started)); + }, + PriorityTask::PropagateTransactions(time, txs) => info!( + "Propagating transactions {}, took {}µs", + txs.len(), + as_us(time) + ), } Some(()) @@ -467,11 +502,6 @@ impl ChainSyncApi { // Static methods impl ChainSync { - /// Called when peer sends us new consensus packet - pub fn on_consensus_packet(io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> { - SyncHandler::on_consensus_packet(io, peer_id, r) - } - /// creates rlp to send for the tree defined by 'from' and 'to' hashes fn create_new_hashes_rlp(chain: &BlockChainClient, from: &H256, to: &H256) -> Option { match chain.tree_route(from, to) { @@ -545,6 +575,14 @@ impl ChainSync { } } +/// A peer query method for getting a list of peers +enum PeerState { + /// Peer is on different hash than us + Lagging, + /// Peer is on the same block as us + SameBlock +} + /// Blockchain sync handler. /// See module documentation for more details. pub struct ChainSync { @@ -1169,15 +1207,24 @@ impl ChainSync { } } - /// returns peer ids that have different blocks than our chain - fn get_lagging_peers(&mut self, chain_info: &BlockChainInfo) -> Vec { + /// returns peer ids that have different block than our chain + fn get_lagging_peers(&self, chain_info: &BlockChainInfo) -> Vec { + self.get_peers(chain_info, PeerState::Lagging) + } + + /// returns peer ids that have different or the same blocks than our chain + fn get_peers(&self, chain_info: &BlockChainInfo, peers: PeerState) -> Vec { let latest_hash = chain_info.best_block_hash; self .peers - .iter_mut() + .iter() .filter_map(|(&id, ref mut peer_info)| { trace!(target: "sync", "Checking peer our best {} their best {}", latest_hash, peer_info.latest_hash); - if peer_info.latest_hash != latest_hash { + let matches = match peers { + PeerState::Lagging => peer_info.latest_hash != latest_hash, + PeerState::SameBlock => peer_info.latest_hash == latest_hash, + }; + if matches { Some(id) } else { None @@ -1235,7 +1282,6 @@ impl ChainSync { } pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { - debug!(target: "sync", "{} -> Dispatching packet: {}", peer, packet_id); SyncHandler::on_packet(self, io, peer, packet_id, data); } diff --git a/ethcore/sync/src/chain/propagator.rs b/ethcore/sync/src/chain/propagator.rs index d4a7cca71ff..3cdde8ada8c 100644 --- a/ethcore/sync/src/chain/propagator.rs +++ b/ethcore/sync/src/chain/propagator.rs @@ -70,43 +70,45 @@ impl SyncPropagator { /// propagates latest block to a set of peers pub fn propagate_blocks(sync: &mut ChainSync, chain_info: &BlockChainInfo, io: &mut SyncIo, blocks: &[H256], peers: &[PeerId]) -> usize { trace!(target: "sync", "Sending NewBlocks to {:?}", peers); - let mut sent = 0; - for peer_id in peers { - if blocks.is_empty() { - let rlp = ChainSync::create_latest_block_rlp(io.chain()); - SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp); - } else { - for h in blocks { - let rlp = ChainSync::create_new_block_rlp(io.chain(), h); - SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp); + let sent = peers.len(); + let mut send_packet = |io: &mut SyncIo, rlp: Bytes| { + for peer_id in peers { + SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp.clone()); + if let Some(ref mut peer) = sync.peers.get_mut(peer_id) { + peer.latest_hash = chain_info.best_block_hash.clone(); } } - if let Some(ref mut peer) = sync.peers.get_mut(peer_id) { - peer.latest_hash = 
chain_info.best_block_hash.clone(); + }; + + if blocks.is_empty() { + let rlp = ChainSync::create_latest_block_rlp(io.chain()); + send_packet(io, rlp); + } else { + for h in blocks { + let rlp = ChainSync::create_new_block_rlp(io.chain(), h); + send_packet(io, rlp); } - sent += 1; } + sent } /// propagates new known hashes to all peers pub fn propagate_new_hashes(sync: &mut ChainSync, chain_info: &BlockChainInfo, io: &mut SyncIo, peers: &[PeerId]) -> usize { trace!(target: "sync", "Sending NewHashes to {:?}", peers); - let mut sent = 0; let last_parent = *io.chain().best_block_header().parent_hash(); + let best_block_hash = chain_info.best_block_hash; + let rlp = match ChainSync::create_new_hashes_rlp(io.chain(), &last_parent, &best_block_hash) { + Some(rlp) => rlp, + None => return 0 + }; + + let sent = peers.len(); for peer_id in peers { - sent += match ChainSync::create_new_hashes_rlp(io.chain(), &last_parent, &chain_info.best_block_hash) { - Some(rlp) => { - { - if let Some(ref mut peer) = sync.peers.get_mut(peer_id) { - peer.latest_hash = chain_info.best_block_hash.clone(); - } - } - SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_HASHES_PACKET, rlp); - 1 - }, - None => 0 + if let Some(ref mut peer) = sync.peers.get_mut(peer_id) { + peer.latest_hash = best_block_hash; } + SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_HASHES_PACKET, rlp.clone()); } sent } @@ -250,10 +252,10 @@ impl SyncPropagator { pub fn propagate_latest_blocks(sync: &mut ChainSync, io: &mut SyncIo, sealed: &[H256]) { let chain_info = io.chain().chain_info(); if (((chain_info.best_block_number as i64) - (sync.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { - let mut peers = sync.get_lagging_peers(&chain_info); + let peers = sync.get_lagging_peers(&chain_info); if sealed.is_empty() { let hashes = SyncPropagator::propagate_new_hashes(sync, &chain_info, io, &peers); - peers = ChainSync::select_random_peers(&peers); + let peers = ChainSync::select_random_peers(&peers); let blocks = SyncPropagator::propagate_blocks(sync, &chain_info, io, sealed, &peers); if blocks != 0 || hashes != 0 { trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes); @@ -319,7 +321,7 @@ impl SyncPropagator { } /// Generic packet sender - fn send_packet(sync: &mut SyncIo, peer_id: PeerId, packet_id: PacketId, packet: Bytes) { + pub fn send_packet(sync: &mut SyncIo, peer_id: PeerId, packet_id: PacketId, packet: Bytes) { if let Err(e) = sync.send(peer_id, packet_id, packet) { debug!(target:"sync", "Error sending packet: {:?}", e); sync.disconnect_peer(peer_id); diff --git a/ethcore/sync/src/chain/supplier.rs b/ethcore/sync/src/chain/supplier.rs index 1f8b8347394..eaee584cad0 100644 --- a/ethcore/sync/src/chain/supplier.rs +++ b/ethcore/sync/src/chain/supplier.rs @@ -27,6 +27,7 @@ use sync_io::SyncIo; use super::{ ChainSync, + SyncHandler, RlpResponseResult, PacketDecodeError, BLOCK_BODIES_PACKET, @@ -47,6 +48,8 @@ use super::{ RECEIPTS_PACKET, SNAPSHOT_DATA_PACKET, SNAPSHOT_MANIFEST_PACKET, + STATUS_PACKET, + TRANSACTIONS_PACKET, }; /// The Chain Sync Supplier: answers requests from peers with available data @@ -56,6 +59,7 @@ impl SyncSupplier { /// Dispatch incoming requests and responses pub fn dispatch_packet(sync: &RwLock, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { let rlp = Rlp::new(data); + let result = match packet_id { GET_BLOCK_BODIES_PACKET => SyncSupplier::return_rlp(io, &rlp, peer, SyncSupplier::return_block_bodies, @@ -80,9 +84,39 @@ impl 
SyncSupplier { GET_SNAPSHOT_DATA_PACKET => SyncSupplier::return_rlp(io, &rlp, peer, SyncSupplier::return_snapshot_data, |e| format!("Error sending snapshot data: {:?}", e)), - CONSENSUS_DATA_PACKET => ChainSync::on_consensus_packet(io, peer, &rlp), - _ => { + + STATUS_PACKET => { sync.write().on_packet(io, peer, packet_id, data); + Ok(()) + }, + // Packets that require the peer to be confirmed + _ => { + if !sync.read().peers.contains_key(&peer) { + debug!(target:"sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_info(peer)); + return; + } + debug!(target: "sync", "{} -> Dispatching packet: {}", peer, packet_id); + + match packet_id { + CONSENSUS_DATA_PACKET => { + SyncHandler::on_consensus_packet(io, peer, &rlp) + }, + TRANSACTIONS_PACKET => { + let res = { + let sync_ro = sync.read(); + SyncHandler::on_peer_transactions(&*sync_ro, io, peer, &rlp) + }; + if res.is_err() { + // peer sent invalid data, disconnect. + io.disable_peer(peer); + sync.write().deactivate_peer(io, peer); + } + }, + _ => { + sync.write().on_packet(io, peer, packet_id, data); + } + } + Ok(()) } }; diff --git a/parity/run.rs b/parity/run.rs index c037b02766e..9797ea7dda4 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -657,7 +657,8 @@ fn execute_impl(cmd: RunCmd, logger: Arc, on_client_rq: // Propagate transactions as soon as they are imported. let tx = ::parking_lot::Mutex::new(priority_tasks); miner.add_transactions_listener(Box::new(move |hashes| { - let res = tx.lock().send(::sync::PriorityTask::PropagateTransactions(hashes.iter().cloned().collect())); + let task = ::sync::PriorityTask::PropagateTransactions(Instant::now(), hashes.iter().cloned().collect()); + let res = tx.lock().send(task); if let Err(err) = res { warn!("Unexpected error when sending priority task: {:?}", err); } From 14877dace1b84892a69c8e4d7b67f7465c875570 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Thu, 22 Nov 2018 12:49:28 +0000 Subject: [PATCH 4/9] Implement transaction propagation. Use sync_channel. 
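Switching from `mpsc::channel` to `mpsc::sync_channel(2)` bounds the number of queued
priority tasks: an unbounded channel would let notifications pile up behind a slow
consumer and be propagated long after they stopped being useful. A small standalone
illustration of the bounded-channel behaviour (the `u32` payload is a stand-in, not the
actual `PriorityTask` type):

use std::sync::mpsc;

fn main() {
    // Capacity 2: a third task cannot be queued until the consumer drains one.
    let (tx, rx) = mpsc::sync_channel::<u32>(2);
    assert!(tx.try_send(1).is_ok());
    assert!(tx.try_send(2).is_ok());
    assert!(tx.try_send(3).is_err()); // buffer full: back-pressure instead of unbounded growth

    assert_eq!(rx.recv().unwrap(), 1);
    assert!(tx.try_send(3).is_ok()); // room again once something was consumed
}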
--- ethcore/src/client/chain_notify.rs | 2 +- ethcore/src/client/client.rs | 14 +- ethcore/sync/src/api.rs | 16 ++- ethcore/sync/src/chain/mod.rs | 42 ++++-- ethcore/sync/src/chain/propagator.rs | 202 +++++++++++++++------------ miner/src/pool/listener.rs | 4 + parity/modules.rs | 2 +- parity/run.rs | 11 +- 8 files changed, 171 insertions(+), 122 deletions(-) diff --git a/ethcore/src/client/chain_notify.rs b/ethcore/src/client/chain_notify.rs index 334e54a241e..3d576ae12aa 100644 --- a/ethcore/src/client/chain_notify.rs +++ b/ethcore/src/client/chain_notify.rs @@ -147,7 +147,7 @@ pub trait ChainNotify : Send + Sync { /// fires when new block is about to be imported /// implementations should be light - fn block_pre_import(&self, _bytes: &Bytes, _difficulty: &U256) { + fn block_pre_import(&self, _bytes: &Bytes, _hash: &H256, _difficulty: &U256) { // does nothing by default } diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 8a393a1e0ea..06f9557589c 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -1424,13 +1424,17 @@ impl ImportBlock for Client { } let raw = if self.importer.block_queue.is_empty() { - Some((unverified.bytes.clone(), *unverified.header.difficulty())) + Some(( + unverified.bytes.clone(), + unverified.header.hash(), + *unverified.header.difficulty(), + )) } else { None }; match self.importer.block_queue.import(unverified) { Ok(hash) => { - if let Some((raw, difficulty)) = raw { - self.notify(move |n| n.block_pre_import(&raw, &difficulty)); + if let Some((raw, hash, difficulty)) = raw { + self.notify(move |n| n.block_pre_import(&raw, &hash, &difficulty)); } Ok(hash) }, @@ -2302,9 +2306,9 @@ impl ImportSealedBlock for Client { fn import_sealed_block(&self, block: SealedBlock) -> EthcoreResult { let raw = block.rlp_bytes(); let difficulty = *block.header().difficulty(); - self.notify(|n| n.block_pre_import(&raw, &difficulty)); - let h = block.header().hash(); + self.notify(|n| n.block_pre_import(&raw, &h, &difficulty)); + let start = Instant::now(); let route = { // scope for self.import_lock diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index 85eae67643e..7722ae6e14f 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -241,11 +241,13 @@ pub enum PriorityTask { started: ::std::time::Instant, /// Raw block RLP to propagate block: Bytes, + /// Block hash + hash: H256, /// Blocks difficulty difficulty: U256, }, /// Propagate a list of transactions - PropagateTransactions(::std::time::Instant, Vec), + PropagateTransactions(::std::time::Instant), } /// EthSync initialization parameters. @@ -281,7 +283,7 @@ pub struct EthSync { /// Light subprotocol name. light_subprotocol_name: [u8; 3], /// Priority tasks notification channel - priority_tasks: Mutex>, + priority_tasks: Mutex>, } fn light_params( @@ -337,7 +339,8 @@ impl EthSync { }) }; - let (priority_tasks_tx, priority_tasks_rx) = mpsc::channel(); + // We create a sync channel with low buffer to prevent excesive queueing. 
+ let (priority_tasks_tx, priority_tasks_rx) = mpsc::sync_channel(2); let sync = ChainSyncApi::new( params.config, &*params.chain, @@ -365,7 +368,7 @@ impl EthSync { } /// Priority tasks producer - pub fn priority_tasks(&self) -> mpsc::Sender { + pub fn priority_tasks(&self) -> mpsc::SyncSender { self.priority_tasks.lock().clone() } } @@ -434,7 +437,7 @@ impl NetworkProtocolHandler for SyncProtocolHandler { io.register_timer(SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer"); io.register_timer(TX_TIMER, Duration::from_millis(1300)).expect("Error registering transactions timer"); - io.register_timer(PRIORITY_TIMER, Duration::from_millis(100)).expect("Error registering peers timer"); + io.register_timer(PRIORITY_TIMER, Duration::from_millis(250)).expect("Error registering peers timer"); } } @@ -473,10 +476,11 @@ impl NetworkProtocolHandler for SyncProtocolHandler { } impl ChainNotify for EthSync { - fn block_pre_import(&self, bytes: &Bytes, difficulty: &U256) { + fn block_pre_import(&self, bytes: &Bytes, hash: &H256, difficulty: &U256) { let task = PriorityTask::PropagateBlock { started: ::std::time::Instant::now(), block: bytes.clone(), + hash: *hash, difficulty: *difficulty, }; if let Err(e) = self.priority_tasks.lock().send(task) { diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index c70705f91f2..e574eece963 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -469,26 +469,33 @@ impl ChainSyncApi { match task { // NOTE We can't simply use existing methods, // cause the block is not in the DB yet. - PriorityTask::PropagateBlock { started, block, difficulty } => { + PriorityTask::PropagateBlock { started, block, hash, difficulty } => { // try to send to peers that are on the same block as us // (they will most likely accept the new block). info!("Starting block propagation, took: {}µs", as_us(started)); let chain_info = io.chain().chain_info(); let total_difficulty = chain_info.total_difficulty + difficulty; let rlp = ChainSync::create_block_rlp(&block, total_difficulty); - for peer in sync.get_peers(&chain_info, PeerState::SameBlock) { + for peers in sync.get_peers(&chain_info, PeerState::SameBlock).chunks(10) { check_deadline(deadline)?; - SyncPropagator::send_packet(io, peer, NEW_BLOCK_PACKET, rlp.clone()); - // TODO [ToDr] Update peer latest block? + for peer in peers { + SyncPropagator::send_packet(io, *peer, NEW_BLOCK_PACKET, rlp.clone()); + // TODO [ToDr] Update peer latest block? + if let Some(ref mut peer) = sync.peers.get_mut(peer) { + peer.latest_hash = hash; + } + } } info!("Finished block propagation, took: {}µs", as_us(started)); }, - PriorityTask::PropagateTransactions(time, txs) => info!( - "Propagating transactions {}, took {}µs", - txs.len(), - as_us(time) - ), + PriorityTask::PropagateTransactions(time) => { + info!("Starting transaction propagation, took {}µs", as_us(time)); + SyncPropagator::propagate_new_transactions(&mut sync, io, || { + check_deadline(deadline).is_some() + }); + info!("Finished transaction propagation, took {}µs", as_us(time)); + }, } Some(()) @@ -497,6 +504,13 @@ impl ChainSyncApi { if work().is_none() { debug!(target: "sync", "Unable to complete priority task within deadline."); } + + // attempt to do more stuff, but we don't care if we don't fit the deadline. 
+ loop { + if work().is_none() { + return; + } + } } } @@ -1297,7 +1311,15 @@ impl ChainSync { /// propagates new transactions to all peers pub fn propagate_new_transactions(&mut self, io: &mut SyncIo) { - SyncPropagator::propagate_new_transactions(self, io); + let deadline = Instant::now() + Duration::from_millis(500); + SyncPropagator::propagate_new_transactions(self, io, || { + if deadline > Instant::now() { + true + } else { + warn!("Wasn't able to finish transaction propagation within a deadline."); + false + } + }); } /// Broadcast consensus message to peers. diff --git a/ethcore/sync/src/chain/propagator.rs b/ethcore/sync/src/chain/propagator.rs index 3cdde8ada8c..76a2c12fcf0 100644 --- a/ethcore/sync/src/chain/propagator.rs +++ b/ethcore/sync/src/chain/propagator.rs @@ -114,7 +114,7 @@ impl SyncPropagator { } /// propagates new transactions to all peers - pub fn propagate_new_transactions(sync: &mut ChainSync, io: &mut SyncIo) -> usize { + pub fn propagate_new_transactions bool>(sync: &mut ChainSync, io: &mut SyncIo, mut should_continue: F) -> usize { // Early out if nobody to send to. if sync.peers.is_empty() { return 0; @@ -125,6 +125,10 @@ impl SyncPropagator { return 0; } + if !should_continue() { + return 0; + } + let (transactions, service_transactions): (Vec<_>, Vec<_>) = transactions.iter() .map(|tx| tx.signed()) .partition(|tx| !tx.gas_price.is_zero()); @@ -133,21 +137,31 @@ impl SyncPropagator { let mut affected_peers = HashSet::new(); if !transactions.is_empty() { let peers = SyncPropagator::select_peers_for_transactions(sync, |_| true); - affected_peers = SyncPropagator::propagate_transactions_to_peers(sync, io, peers, transactions); + affected_peers = SyncPropagator::propagate_transactions_to_peers( + sync, io, peers, transactions, &mut should_continue, + ); } // most of times service_transactions will be empty // => there's no need to merge packets if !service_transactions.is_empty() { let service_transactions_peers = SyncPropagator::select_peers_for_transactions(sync, |peer_id| accepts_service_transaction(&io.peer_info(*peer_id))); - let service_transactions_affected_peers = SyncPropagator::propagate_transactions_to_peers(sync, io, service_transactions_peers, service_transactions); + let service_transactions_affected_peers = SyncPropagator::propagate_transactions_to_peers( + sync, io, service_transactions_peers, service_transactions, &mut should_continue + ); affected_peers.extend(&service_transactions_affected_peers); } affected_peers.len() } - fn propagate_transactions_to_peers(sync: &mut ChainSync, io: &mut SyncIo, peers: Vec, transactions: Vec<&SignedTransaction>) -> HashSet { + fn propagate_transactions_to_peers bool>( + sync: &mut ChainSync, + io: &mut SyncIo, + peers: Vec, + transactions: Vec<&SignedTransaction>, + mut should_continue: F, + ) -> HashSet { let all_transactions_hashes = transactions.iter() .map(|tx| tx.hash()) .collect::(); @@ -160,93 +174,95 @@ impl SyncPropagator { // Clear old transactions from stats sync.transactions_stats.retain(&all_transactions_hashes); - let block_number = io.chain().chain_info().best_block_number; + let send_packet = |io: &mut SyncIo, peer_id: PeerId, sent: usize, rlp: Bytes| { + let size = rlp.len(); + SyncPropagator::send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp); + trace!(target: "sync", "{:02} <- Transactions ({} entries; {} bytes)", peer_id, sent, size); + }; - // sqrt(x)/x scaled to max u32 - let lucky_peers = { - peers.into_iter() - .filter_map(|peer_id| { - let stats = &mut sync.transactions_stats; - let 
peer_info = sync.peers.get_mut(&peer_id) - .expect("peer_id is form peers; peers is result of select_peers_for_transactions; select_peers_for_transactions selects peers from self.peers; qed"); - - // Send all transactions - if peer_info.last_sent_transactions.is_empty() { - // update stats - for hash in &all_transactions_hashes { - let id = io.peer_session_info(peer_id).and_then(|info| info.id); - stats.propagated(hash, id, block_number); - } - peer_info.last_sent_transactions = all_transactions_hashes.clone(); - return Some((peer_id, all_transactions_hashes.len(), all_transactions_rlp.clone())); - } + let block_number = io.chain().chain_info().best_block_number; + let mut sent_to_peers = HashSet::new(); + let mut max_sent = 0; - // Get hashes of all transactions to send to this peer - let to_send = all_transactions_hashes.difference(&peer_info.last_sent_transactions) - .cloned() - .collect::>(); - if to_send.is_empty() { - return None; - } + // for every peer construct and send transactions packet + for peer_id in peers { + if !should_continue() { + debug!(target: "sync", "Sent up to {} transactions to {} peers.", max_sent, sent_to_peers.len()); + return sent_to_peers; + } - // Construct RLP - let (packet, to_send) = { - let mut to_send = to_send; - let mut packet = RlpStream::new(); - packet.begin_unbounded_list(); - let mut pushed = 0; - for tx in &transactions { - let hash = tx.hash(); - if to_send.contains(&hash) { - let mut transaction = RlpStream::new(); - tx.rlp_append(&mut transaction); - let appended = packet.append_raw_checked(&transaction.drain(), 1, MAX_TRANSACTION_PACKET_SIZE); - if !appended { - // Maximal packet size reached just proceed with sending - debug!(target: "sync", "Transaction packet size limit reached. Sending incomplete set of {}/{} transactions.", pushed, to_send.len()); - to_send = to_send.into_iter().take(pushed).collect(); - break; - } - pushed += 1; - } - } - packet.complete_unbounded_list(); - (packet, to_send) - }; + let stats = &mut sync.transactions_stats; + let peer_info = sync.peers.get_mut(&peer_id) + .expect("peer_id is form peers; peers is result of select_peers_for_transactions; select_peers_for_transactions selects peers from self.peers; qed"); - // Update stats + // Send all transactions, if the peer doesn't know about anything + if peer_info.last_sent_transactions.is_empty() { + // update stats + for hash in &all_transactions_hashes { let id = io.peer_session_info(peer_id).and_then(|info| info.id); - for hash in &to_send { - // update stats - stats.propagated(hash, id, block_number); - } + stats.propagated(hash, id, block_number); + } + peer_info.last_sent_transactions = all_transactions_hashes.clone(); - peer_info.last_sent_transactions = all_transactions_hashes - .intersection(&peer_info.last_sent_transactions) - .chain(&to_send) - .cloned() - .collect(); - Some((peer_id, to_send.len(), packet.out())) - }) - .collect::>() - }; + send_packet(io, peer_id, all_transactions_hashes.len(), all_transactions_rlp.clone()); + sent_to_peers.insert(peer_id); + max_sent = cmp::max(max_sent, all_transactions_hashes.len()); + continue; + } + + // Get hashes of all transactions to send to this peer + let to_send = all_transactions_hashes.difference(&peer_info.last_sent_transactions) + .cloned() + .collect::>(); + if to_send.is_empty() { + continue; + } - // Send RLPs - let mut peers = HashSet::new(); - if lucky_peers.len() > 0 { - let mut max_sent = 0; - let lucky_peers_len = lucky_peers.len(); - for (peer_id, sent, rlp) in lucky_peers { - 
peers.insert(peer_id); - let size = rlp.len(); - SyncPropagator::send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp); - trace!(target: "sync", "{:02} <- Transactions ({} entries; {} bytes)", peer_id, sent, size); - max_sent = cmp::max(max_sent, sent); + // Construct RLP + let (packet, to_send) = { + let mut to_send = to_send; + let mut packet = RlpStream::new(); + packet.begin_unbounded_list(); + let mut pushed = 0; + for tx in &transactions { + let hash = tx.hash(); + if to_send.contains(&hash) { + let mut transaction = RlpStream::new(); + tx.rlp_append(&mut transaction); + let appended = packet.append_raw_checked(&transaction.drain(), 1, MAX_TRANSACTION_PACKET_SIZE); + if !appended { + // Maximal packet size reached just proceed with sending + debug!(target: "sync", "Transaction packet size limit reached. Sending incomplete set of {}/{} transactions.", pushed, to_send.len()); + to_send = to_send.into_iter().take(pushed).collect(); + break; + } + pushed += 1; + } + } + packet.complete_unbounded_list(); + (packet, to_send) + }; + + // Update stats + let id = io.peer_session_info(peer_id).and_then(|info| info.id); + for hash in &to_send { + // update stats + stats.propagated(hash, id, block_number); } - debug!(target: "sync", "Sent up to {} transactions to {} peers.", max_sent, lucky_peers_len); + + peer_info.last_sent_transactions = all_transactions_hashes + .intersection(&peer_info.last_sent_transactions) + .chain(&to_send) + .cloned() + .collect(); + send_packet(io, peer_id, to_send.len(), packet.out()); + sent_to_peers.insert(peer_id); + max_sent = cmp::max(max_sent, to_send.len()); + } - peers + debug!(target: "sync", "Sent up to {} transactions to {} peers.", max_sent, sent_to_peers.len()); + sent_to_peers } pub fn propagate_latest_blocks(sync: &mut ChainSync, io: &mut SyncIo, sealed: &[H256]) { @@ -450,13 +466,13 @@ mod tests { let queue = RwLock::new(VecDeque::new()); let ss = TestSnapshotService::new(); let mut io = TestIo::new(&mut client, &ss, &queue, None); - let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io); + let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); // Try to propagate same transactions for the second time - let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io); + let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); // Even after new block transactions should not be propagated twice sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[], &[]); // Try to propagate same transactions for the third time - let peer_count3 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io); + let peer_count3 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); // 1 message should be send assert_eq!(1, io.packets.len()); @@ -477,7 +493,7 @@ mod tests { let queue = RwLock::new(VecDeque::new()); let ss = TestSnapshotService::new(); let mut io = TestIo::new(&mut client, &ss, &queue, None); - let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io); + let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); io.chain.insert_transaction_to_queue(); // New block import should not trigger propagation. 
// (we only propagate on timeout) @@ -501,10 +517,10 @@ mod tests { let queue = RwLock::new(VecDeque::new()); let ss = TestSnapshotService::new(); let mut io = TestIo::new(&mut client, &ss, &queue, None); - let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io); + let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[], &[]); // Try to propagate same transactions for the second time - let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io); + let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); assert_eq!(0, io.packets.len()); assert_eq!(0, peer_count); @@ -522,7 +538,7 @@ mod tests { // should sent some { let mut io = TestIo::new(&mut client, &ss, &queue, None); - let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io); + let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); assert_eq!(1, io.packets.len()); assert_eq!(1, peer_count); } @@ -531,9 +547,9 @@ mod tests { let (peer_count2, peer_count3) = { let mut io = TestIo::new(&mut client, &ss, &queue, None); // Propagate new transactions - let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io); + let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); // And now the peer should have all transactions - let peer_count3 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io); + let peer_count3 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); (peer_count2, peer_count3) }; @@ -556,7 +572,7 @@ mod tests { let queue = RwLock::new(VecDeque::new()); let ss = TestSnapshotService::new(); let mut io = TestIo::new(&mut client, &ss, &queue, None); - SyncPropagator::propagate_new_transactions(&mut sync, &mut io); + SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); let stats = sync.transactions_stats(); assert_eq!(stats.len(), 1, "Should maintain stats for single transaction.") @@ -586,7 +602,7 @@ mod tests { io.peers_info.insert(4, "Parity-Ethereum/v2.7.3-ABCDEFGH".to_owned()); // and new service transaction is propagated to peers - SyncPropagator::propagate_new_transactions(&mut sync, &mut io); + SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); // peer#2 && peer#4 are receiving service transaction assert!(io.packets.iter().any(|p| p.packet_id == 0x02 && p.recipient == 2)); // TRANSACTIONS_PACKET @@ -610,7 +626,7 @@ mod tests { io.peers_info.insert(1, "Parity-Ethereum/v2.6".to_owned()); // and service + non-service transactions are propagated to peers - SyncPropagator::propagate_new_transactions(&mut sync, &mut io); + SyncPropagator::propagate_new_transactions(&mut sync, &mut io, || true); // two separate packets for peer are queued: // 1) with non-service-transaction diff --git a/miner/src/pool/listener.rs b/miner/src/pool/listener.rs index 5776ba84532..f5e76e15d52 100644 --- a/miner/src/pool/listener.rs +++ b/miner/src/pool/listener.rs @@ -50,6 +50,10 @@ impl Notifier { /// Notify listeners about all currently pending transactions. 
pub fn notify(&mut self) { + if self.pending.is_empty() { + return; + } + for l in &self.listeners { (l)(&self.pending); } diff --git a/parity/modules.rs b/parity/modules.rs index ac84aea5f21..b1d7d15b464 100644 --- a/parity/modules.rs +++ b/parity/modules.rs @@ -29,7 +29,7 @@ pub type SyncModules = ( Arc, Arc, Arc, - mpsc::Sender, + mpsc::SyncSender, ); pub fn sync( diff --git a/parity/run.rs b/parity/run.rs index 9797ea7dda4..28e40d3f5fc 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -656,12 +656,11 @@ fn execute_impl(cmd: RunCmd, logger: Arc, on_client_rq: // Propagate transactions as soon as they are imported. let tx = ::parking_lot::Mutex::new(priority_tasks); - miner.add_transactions_listener(Box::new(move |hashes| { - let task = ::sync::PriorityTask::PropagateTransactions(Instant::now(), hashes.iter().cloned().collect()); - let res = tx.lock().send(task); - if let Err(err) = res { - warn!("Unexpected error when sending priority task: {:?}", err); - } + miner.add_transactions_listener(Box::new(move |_hashes| { + let task = ::sync::PriorityTask::PropagateTransactions(Instant::now()); + info!("New transactions imported."); + // we ignore both full queue and disconnected error + let _ = tx.lock().try_send(task); })); // provider not added to a notification center is effectively disabled From 96e0c6b4520130b24c03087dc9951ea747bf50ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Thu, 22 Nov 2018 13:27:52 +0000 Subject: [PATCH 5/9] Tone down info. --- ethcore/sync/src/api.rs | 4 +++- ethcore/sync/src/chain/mod.rs | 27 +++++++++++++-------------- parity/run.rs | 7 +++++-- 3 files changed, 21 insertions(+), 17 deletions(-) diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index 2005cb25727..f8b18b35bd4 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -416,6 +416,8 @@ const SYNC_TIMER: TimerToken = 1; const TX_TIMER: TimerToken = 2; const PRIORITY_TIMER: TimerToken = 3; +pub(crate) const PRIORITY_TIMER_INTERVAL: Duration = Duration::from_millis(250); + struct SyncProtocolHandler { /// Shared blockchain client. chain: Arc, @@ -434,7 +436,7 @@ impl NetworkProtocolHandler for SyncProtocolHandler { io.register_timer(SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer"); io.register_timer(TX_TIMER, Duration::from_millis(1300)).expect("Error registering transactions timer"); - io.register_timer(PRIORITY_TIMER, Duration::from_millis(250)).expect("Error registering peers timer"); + io.register_timer(PRIORITY_TIMER, PRIORITY_TIMER_INTERVAL).expect("Error registering peers timer"); } } diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 259290ad3f5..c74f240e2ed 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -187,6 +187,11 @@ const FORK_HEADER_TIMEOUT: Duration = Duration::from_secs(3); const SNAPSHOT_MANIFEST_TIMEOUT: Duration = Duration::from_secs(5); const SNAPSHOT_DATA_TIMEOUT: Duration = Duration::from_secs(120); +/// Defines how much time we have to complete priority transaction or block propagation. 
+/// after the deadline is reached the task is considered finished +/// (so we might sent only to some part of the peers we originally intended to send to) +const PRIORITY_TASK_DEADLINE: Duration = Duration::from_millis(100); + #[derive(Copy, Clone, Eq, PartialEq, Debug)] /// Sync state pub enum SyncState { @@ -445,7 +450,7 @@ impl ChainSyncApi { } // deadline to get the task from the queue - let deadline = Instant::now() + Duration::from_millis(150); + let deadline = Instant::now() + ::api::PRIORITY_TIMER_INTERVAL; let mut work = || { let task = { let tasks = self.priority_tasks.try_lock_until(deadline)?; @@ -460,11 +465,11 @@ impl ChainSyncApi { let mut sync = self.sync.write(); // since `sync` might take a while to acquire we have a new deadline // to do the rest of the job now. - let deadline = Instant::now() + Duration::from_millis(100); + let deadline = Instant::now() + PRIORITY_TASK_DEADLINE; - let as_us = move |prev| { + let as_ms = move |prev| { let dur: Duration = Instant::now() - prev; - dur.as_secs() * 1_000_000 + dur.subsec_micros() as u64 + dur.as_secs() * 1_000 + dur.subsec_millis() as u64 }; match task { // NOTE We can't simply use existing methods, @@ -472,7 +477,6 @@ impl ChainSyncApi { PriorityTask::PropagateBlock { started, block, hash, difficulty } => { // try to send to peers that are on the same block as us // (they will most likely accept the new block). - info!("Starting block propagation, took: {}µs", as_us(started)); let chain_info = io.chain().chain_info(); let total_difficulty = chain_info.total_difficulty + difficulty; let rlp = ChainSync::create_block_rlp(&block, total_difficulty); @@ -485,25 +489,20 @@ impl ChainSyncApi { } } } - info!("Finished block propagation, took: {}µs", as_us(started)); + debug!(target: "sync", "Finished block propagation, took {}ms", as_ms(started)); }, PriorityTask::PropagateTransactions(time) => { - info!("Starting transaction propagation, took {}µs", as_us(time)); SyncPropagator::propagate_new_transactions(&mut sync, io, || { check_deadline(deadline).is_some() }); - info!("Finished transaction propagation, took {}µs", as_us(time)); + debug!(target: "sync", "Finished transaction propagation, took {}ms", as_ms(time)); }, } Some(()) }; - if work().is_none() { - debug!(target: "sync", "Unable to complete priority task within deadline."); - } - - // attempt to do more stuff, but we don't care if we don't fit the deadline. + // Process as many items as we can until the deadline is reached. loop { if work().is_none() { return; @@ -1314,7 +1313,7 @@ impl ChainSync { if deadline > Instant::now() { true } else { - warn!("Wasn't able to finish transaction propagation within a deadline."); + debug!(target: "sync", "Wasn't able to finish transaction propagation within a deadline."); false } }); diff --git a/parity/run.rs b/parity/run.rs index e082d055875..55a9f7f413c 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -653,9 +653,12 @@ fn execute_impl(cmd: RunCmd, logger: Arc, on_client_rq: let tx = ::parking_lot::Mutex::new(priority_tasks); miner.add_transactions_listener(Box::new(move |_hashes| { let task = ::sync::PriorityTask::PropagateTransactions(Instant::now()); - info!("New transactions imported."); // we ignore both full queue and disconnected error - let _ = tx.lock().try_send(task); + // If the queue is full it means that propagate task is already there + // if the queue is disconnected it means we are closing. 
+ if let Some(send) = tx.try_lock() { + let _ = send.try_send(task); + } })); // provider not added to a notification center is effectively disabled From e98d5b5ea2cbb0edb93e6eb6f2c8801083fdf8b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Thu, 22 Nov 2018 14:26:36 +0000 Subject: [PATCH 6/9] Prevent deadlock by not waiting forever for sync lock. --- ethcore/sync/src/chain/mod.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index c74f240e2ed..6f1e4e4c9c5 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -458,15 +458,12 @@ impl ChainSyncApi { tasks.recv_timeout(left).ok()? }; - // wait for the sync lock indefinitely, - // since we already have the item. - // Subsequent timers will just try to process - // other tasks. - let mut sync = self.sync.write(); - // since `sync` might take a while to acquire we have a new deadline - // to do the rest of the job now. + // wait for the sync lock until deadline, + // note we might drop the task here if we won't manage to acquire the lock. + let mut sync = self.sync.try_write_until(deadline)?; + // since we already have everything let's use a different deadline + // to do the rest of the job now, so that previous work is not wasted. let deadline = Instant::now() + PRIORITY_TASK_DEADLINE; - let as_ms = move |prev| { let dur: Duration = Instant::now() - prev; dur.as_secs() * 1_000 + dur.subsec_millis() as u64 From 580939582e4a0293bc4a76d9f2b3b4f64592a3b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Thu, 22 Nov 2018 16:24:15 +0000 Subject: [PATCH 7/9] Fix lock order. --- ethcore/src/blockchain/blockchain.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethcore/src/blockchain/blockchain.rs b/ethcore/src/blockchain/blockchain.rs index e5af6420861..4286dc41482 100644 --- a/ethcore/src/blockchain/blockchain.rs +++ b/ethcore/src/blockchain/blockchain.rs @@ -1187,8 +1187,8 @@ impl BlockChain { let mut pending_block_details = self.pending_block_details.write(); let mut pending_write_txs = self.pending_transaction_addresses.write(); - let mut best_ancient_block = self.best_ancient_block.write(); let mut best_block = self.best_block.write(); + let mut best_ancient_block = self.best_ancient_block.write(); let mut write_block_details = self.block_details.write(); let mut write_hashes = self.block_hashes.write(); let mut write_txs = self.transaction_addresses.write(); From abe3b8f33a2a2a3ab6e177bda4960f5d67140d80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Thu, 22 Nov 2018 18:26:18 +0000 Subject: [PATCH 8/9] Don't use sync_channel to prevent deadlocks. --- ethcore/sync/src/api.rs | 20 ++++++++++++++------ ethcore/sync/src/chain/mod.rs | 5 +++-- parity/modules.rs | 2 +- parity/run.rs | 14 +++++++------- 4 files changed, 25 insertions(+), 16 deletions(-) diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index f8b18b35bd4..6fef887a0fe 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . 
-use std::sync::{Arc, mpsc}; +use std::sync::{Arc, mpsc, atomic}; use std::collections::{HashMap, BTreeMap}; use std::io; use std::ops::Range; @@ -247,7 +247,16 @@ pub enum PriorityTask { difficulty: U256, }, /// Propagate a list of transactions - PropagateTransactions(::std::time::Instant), + PropagateTransactions(::std::time::Instant, Arc), +} +impl PriorityTask { + /// Mark the task as being processed, right after it's retrieved from the queue. + pub fn starting(&self) { + match *self { + PriorityTask::PropagateTransactions(_, ref is_ready) => is_ready.store(true, atomic::Ordering::SeqCst), + _ => {}, + } + } } /// EthSync initialization parameters. @@ -283,7 +292,7 @@ pub struct EthSync { /// Light subprotocol name. light_subprotocol_name: [u8; 3], /// Priority tasks notification channel - priority_tasks: Mutex>, + priority_tasks: Mutex>, } fn light_params( @@ -336,8 +345,7 @@ impl EthSync { }) }; - // We create a sync channel with low buffer to prevent excesive queueing. - let (priority_tasks_tx, priority_tasks_rx) = mpsc::sync_channel(2); + let (priority_tasks_tx, priority_tasks_rx) = mpsc::channel(); let sync = ChainSyncApi::new( params.config, &*params.chain, @@ -365,7 +373,7 @@ impl EthSync { } /// Priority tasks producer - pub fn priority_tasks(&self) -> mpsc::SyncSender { + pub fn priority_tasks(&self) -> mpsc::Sender { self.priority_tasks.lock().clone() } } diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 6f1e4e4c9c5..bab799935e0 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -457,7 +457,7 @@ impl ChainSyncApi { let left = check_deadline(deadline)?; tasks.recv_timeout(left).ok()? }; - + task.starting(); // wait for the sync lock until deadline, // note we might drop the task here if we won't manage to acquire the lock. let mut sync = self.sync.try_write_until(deadline)?; @@ -488,7 +488,7 @@ impl ChainSyncApi { } debug!(target: "sync", "Finished block propagation, took {}ms", as_ms(started)); }, - PriorityTask::PropagateTransactions(time) => { + PriorityTask::PropagateTransactions(time, _) => { SyncPropagator::propagate_new_transactions(&mut sync, io, || { check_deadline(deadline).is_some() }); @@ -1601,3 +1601,4 @@ pub mod tests { assert_eq!(status.status.transaction_count, 0); } } + diff --git a/parity/modules.rs b/parity/modules.rs index b1d7d15b464..ac84aea5f21 100644 --- a/parity/modules.rs +++ b/parity/modules.rs @@ -29,7 +29,7 @@ pub type SyncModules = ( Arc, Arc, Arc, - mpsc::SyncSender, + mpsc::Sender, ); pub fn sync( diff --git a/parity/run.rs b/parity/run.rs index 55a9f7f413c..14fd58dd107 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -15,7 +15,7 @@ // along with Parity. If not, see . use std::any::Any; -use std::sync::{Arc, Weak}; +use std::sync::{Arc, Weak, atomic}; use std::time::{Duration, Instant}; use std::thread; @@ -651,13 +651,13 @@ fn execute_impl(cmd: RunCmd, logger: Arc, on_client_rq: // Propagate transactions as soon as they are imported. let tx = ::parking_lot::Mutex::new(priority_tasks); + let is_ready = Arc::new(atomic::AtomicBool::new(true)); miner.add_transactions_listener(Box::new(move |_hashes| { - let task = ::sync::PriorityTask::PropagateTransactions(Instant::now()); - // we ignore both full queue and disconnected error - // If the queue is full it means that propagate task is already there - // if the queue is disconnected it means we are closing. 
- if let Some(send) = tx.try_lock() { - let _ = send.try_send(task); + // we want to have only one PendingTransactions task in the queue. + if is_ready.compare_and_swap(true, false, atomic::Ordering::SeqCst) { + let task = ::sync::PriorityTask::PropagateTransactions(Instant::now(), is_ready.clone()); + // we ignore error cause it means that we are closing + let _ = tx.lock().send(task); } })); From 3040e82d2ce4a554669e20c398a041389ed81af0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Thu, 22 Nov 2018 18:52:27 +0000 Subject: [PATCH 9/9] Fix tests. --- ethcore/sync/src/chain/mod.rs | 8 ++++---- ethcore/sync/src/chain/propagator.rs | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index bab799935e0..cdedd56303c 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -1329,7 +1329,7 @@ impl ChainSync { #[cfg(test)] pub mod tests { - use std::collections::{HashSet, VecDeque}; + use std::collections::{VecDeque}; use ethkey; use network::PeerId; use tests::helpers::{TestIo}; @@ -1445,8 +1445,8 @@ pub mod tests { asking_blocks: Vec::new(), asking_hash: None, ask_time: Instant::now(), - last_sent_transactions: HashSet::new(), - last_sent_private_transactions: HashSet::new(), + last_sent_transactions: Default::default(), + last_sent_private_transactions: Default::default(), expired: false, confirmation: super::ForkConfirmation::Confirmed, snapshot_number: None, @@ -1461,7 +1461,7 @@ pub mod tests { fn finds_lagging_peers() { let mut client = TestBlockChainClient::new(); client.add_blocks(100, EachBlockWith::Uncle); - let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10), &client); + let sync = dummy_sync_with_peer(client.block_hash_delta_minus(10), &client); let chain_info = client.chain_info(); let lagging_peers = sync.get_lagging_peers(&chain_info); diff --git a/ethcore/sync/src/chain/propagator.rs b/ethcore/sync/src/chain/propagator.rs index 76a2c12fcf0..689ccfc02b1 100644 --- a/ethcore/sync/src/chain/propagator.rs +++ b/ethcore/sync/src/chain/propagator.rs @@ -438,8 +438,8 @@ mod tests { asking_blocks: Vec::new(), asking_hash: None, ask_time: Instant::now(), - last_sent_transactions: HashSet::new(), - last_sent_private_transactions: HashSet::new(), + last_sent_transactions: Default::default(), + last_sent_private_transactions: Default::default(), expired: false, confirmation: ForkConfirmation::Confirmed, snapshot_number: None,