diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 21746450061..49580253d0e 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -344,7 +344,7 @@ impl Client { /// This is triggered by a message coming from a block queue when the block is ready for insertion pub fn import_verified_blocks(&self) -> usize { let max_blocks_to_import = 64; - let (imported_blocks, import_results, invalid_blocks, original_best, imported, duration) = { + let (imported_blocks, import_results, invalid_blocks, imported, duration) = { let mut imported_blocks = Vec::with_capacity(max_blocks_to_import); let mut invalid_blocks = HashSet::new(); let mut import_results = Vec::with_capacity(max_blocks_to_import); @@ -354,8 +354,6 @@ impl Client { let start = precise_time_ns(); let blocks = self.block_queue.drain(max_blocks_to_import); - let original_best = self.chain_info().best_block_hash; - for block in blocks { let header = &block.header; if invalid_blocks.contains(&header.parent_hash) { @@ -389,7 +387,7 @@ impl Client { } } let duration_ns = precise_time_ns() - start; - (imported_blocks, import_results, invalid_blocks, original_best, imported, duration_ns) + (imported_blocks, import_results, invalid_blocks, imported, duration_ns) }; { @@ -413,10 +411,6 @@ impl Client { } } - if self.chain_info().best_block_hash != original_best { - self.miner.update_sealing(self); - } - imported } diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index 79c8a95bf17..a6090f63692 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -15,7 +15,6 @@ // along with Parity. If not, see . use rayon::prelude::*; -use std::sync::atomic::{self, AtomicBool}; use std::time::{Instant, Duration}; use util::*; @@ -159,15 +158,20 @@ impl GasPricer { } } +struct SealingWork { + queue: UsingQueue, + enabled: bool, +} + /// Keeps track of transactions using priority queue and holds currently mined block. pub struct Miner { // NOTE [ToDr] When locking always lock in this order! transaction_queue: Arc>, - sealing_work: Mutex>, + sealing_work: Mutex, // for sealing... options: MinerOptions, - sealing_enabled: AtomicBool, + next_allowed_reseal: Mutex, sealing_block_last_request: Mutex, gas_range_target: RwLock<(U256, U256)>, @@ -186,10 +190,9 @@ impl Miner { Miner { transaction_queue: Arc::new(Mutex::new(TransactionQueue::new())), options: Default::default(), - sealing_enabled: AtomicBool::new(false), next_allowed_reseal: Mutex::new(Instant::now()), sealing_block_last_request: Mutex::new(0), - sealing_work: Mutex::new(UsingQueue::new(20)), + sealing_work: Mutex::new(SealingWork{queue: UsingQueue::new(20), enabled: false}), gas_range_target: RwLock::new((U256::zero(), U256::zero())), author: RwLock::new(Address::default()), extra_data: RwLock::new(Vec::new()), @@ -206,10 +209,9 @@ impl Miner { let txq = Arc::new(Mutex::new(TransactionQueue::with_limits(options.tx_queue_size, options.tx_gas_limit))); Arc::new(Miner { transaction_queue: txq, - sealing_enabled: AtomicBool::new(options.force_sealing || !options.new_work_notify.is_empty()), next_allowed_reseal: Mutex::new(Instant::now()), sealing_block_last_request: Mutex::new(0), - sealing_work: Mutex::new(UsingQueue::new(options.work_queue_size)), + sealing_work: Mutex::new(SealingWork{queue: UsingQueue::new(options.work_queue_size), enabled: options.force_sealing || !options.new_work_notify.is_empty()}), gas_range_target: RwLock::new((U256::zero(), U256::zero())), author: RwLock::new(Address::default()), extra_data: RwLock::new(Vec::new()), @@ -231,12 +233,12 @@ impl Miner { /// Get `Some` `clone()` of the current pending block's state or `None` if we're not sealing. pub fn pending_state(&self) -> Option { - self.sealing_work.lock().peek_last_ref().map(|b| b.block().fields().state.clone()) + self.sealing_work.lock().queue.peek_last_ref().map(|b| b.block().fields().state.clone()) } /// Get `Some` `clone()` of the current pending block's state or `None` if we're not sealing. pub fn pending_block(&self) -> Option { - self.sealing_work.lock().peek_last_ref().map(|b| b.base().clone()) + self.sealing_work.lock().queue.peek_last_ref().map(|b| b.base().clone()) } /// Prepares new block for sealing including top transactions from queue. @@ -258,7 +260,7 @@ impl Miner { let (transactions, mut open_block, original_work_hash) = { let transactions = {self.transaction_queue.lock().top_transactions()}; let mut sealing_work = self.sealing_work.lock(); - let last_work_hash = sealing_work.peek_last_ref().map(|pb| pb.block().fields().header.hash()); + let last_work_hash = sealing_work.queue.peek_last_ref().map(|pb| pb.block().fields().header.hash()); let best_hash = chain.best_block_header().sha3(); /* // check to see if last ClosedBlock in would_seals is actually same parent block. @@ -268,7 +270,7 @@ impl Miner { // otherwise, leave everything alone. // otherwise, author a fresh block. */ - let open_block = match sealing_work.pop_if(|b| b.block().fields().header.parent_hash() == &best_hash) { + let open_block = match sealing_work.queue.pop_if(|b| b.block().fields().header.parent_hash() == &best_hash) { Some(old_block) => { trace!(target: "miner", "Already have previous work; updating and returning"); // add transactions to old_block @@ -359,7 +361,7 @@ impl Miner { let (work, is_new) = { let mut sealing_work = self.sealing_work.lock(); - let last_work_hash = sealing_work.peek_last_ref().map(|pb| pb.block().fields().header.hash()); + let last_work_hash = sealing_work.queue.peek_last_ref().map(|pb| pb.block().fields().header.hash()); trace!(target: "miner", "Checking whether we need to reseal: orig={:?} last={:?}, this={:?}", original_work_hash, last_work_hash, block.block().fields().header.hash()); let (work, is_new) = if last_work_hash.map_or(true, |h| h != block.block().fields().header.hash()) { trace!(target: "miner", "Pushing a new, refreshed or borrowed pending {}...", block.block().fields().header.hash()); @@ -367,16 +369,16 @@ impl Miner { let number = block.block().fields().header.number(); let difficulty = *block.block().fields().header.difficulty(); let is_new = original_work_hash.map_or(true, |h| block.block().fields().header.hash() != h); - sealing_work.push(block); + sealing_work.queue.push(block); // If push notifications are enabled we assume all work items are used. if self.work_poster.is_some() && is_new { - sealing_work.use_last_ref(); + sealing_work.queue.use_last_ref(); } (Some((pow_hash, difficulty, number)), is_new) } else { (None, false) }; - trace!(target: "miner", "prepare_sealing: leaving (last={:?})", sealing_work.peek_last_ref().map(|b| b.block().fields().header.hash())); + trace!(target: "miner", "prepare_sealing: leaving (last={:?})", sealing_work.queue.peek_last_ref().map(|b| b.block().fields().header.hash())); (work, is_new) }; if is_new { @@ -393,14 +395,22 @@ impl Miner { /// Returns true if we had to prepare new pending block fn enable_and_prepare_sealing(&self, chain: &MiningBlockChainClient) -> bool { trace!(target: "miner", "enable_and_prepare_sealing: entering"); - let have_work = self.sealing_work.lock().peek_last_ref().is_some(); - trace!(target: "miner", "enable_and_prepare_sealing: have_work={}", have_work); - if !have_work { + let prepare_new = { + let mut sealing_work = self.sealing_work.lock(); + let have_work = sealing_work.queue.peek_last_ref().is_some(); + trace!(target: "miner", "enable_and_prepare_sealing: have_work={}", have_work); + if !have_work { + sealing_work.enabled = true; + true + } else { + false + } + }; + if prepare_new { // -------------------------------------------------------------------------- // | NOTE Code below requires transaction_queue and sealing_work locks. | // | Make sure to release the locks before calling that method. | // -------------------------------------------------------------------------- - self.sealing_enabled.store(true, atomic::Ordering::Relaxed); self.prepare_sealing(chain); } let mut sealing_block_last_request = self.sealing_block_last_request.lock(); @@ -410,8 +420,8 @@ impl Miner { *sealing_block_last_request = best_number; } - // Return if - !have_work + // Return if we restarted + prepare_new } fn add_transactions_to_queue(&self, chain: &MiningBlockChainClient, transactions: Vec, origin: TransactionOrigin, transaction_queue: &mut TransactionQueue) -> @@ -450,13 +460,13 @@ impl MinerService for Miner { MinerStatus { transactions_in_pending_queue: status.pending, transactions_in_future_queue: status.future, - transactions_in_pending_block: sealing_work.peek_last_ref().map_or(0, |b| b.transactions().len()), + transactions_in_pending_block: sealing_work.queue.peek_last_ref().map_or(0, |b| b.transactions().len()), } } fn call(&self, chain: &MiningBlockChainClient, t: &SignedTransaction, analytics: CallAnalytics) -> Result { let sealing_work = self.sealing_work.lock(); - match sealing_work.peek_last_ref() { + match sealing_work.queue.peek_last_ref() { Some(work) => { let block = work.block(); @@ -503,7 +513,7 @@ impl MinerService for Miner { fn balance(&self, chain: &MiningBlockChainClient, address: &Address) -> U256 { let sealing_work = self.sealing_work.lock(); - sealing_work.peek_last_ref().map_or_else( + sealing_work.queue.peek_last_ref().map_or_else( || chain.latest_balance(address), |b| b.block().fields().state.balance(address) ) @@ -511,7 +521,7 @@ impl MinerService for Miner { fn storage_at(&self, chain: &MiningBlockChainClient, address: &Address, position: &H256) -> H256 { let sealing_work = self.sealing_work.lock(); - sealing_work.peek_last_ref().map_or_else( + sealing_work.queue.peek_last_ref().map_or_else( || chain.latest_storage_at(address, position), |b| b.block().fields().state.storage_at(address, position) ) @@ -519,12 +529,12 @@ impl MinerService for Miner { fn nonce(&self, chain: &MiningBlockChainClient, address: &Address) -> U256 { let sealing_work = self.sealing_work.lock(); - sealing_work.peek_last_ref().map_or_else(|| chain.latest_nonce(address), |b| b.block().fields().state.nonce(address)) + sealing_work.queue.peek_last_ref().map_or_else(|| chain.latest_nonce(address), |b| b.block().fields().state.nonce(address)) } fn code(&self, chain: &MiningBlockChainClient, address: &Address) -> Option { let sealing_work = self.sealing_work.lock(); - sealing_work.peek_last_ref().map_or_else(|| chain.code(address), |b| b.block().fields().state.code(address)) + sealing_work.queue.peek_last_ref().map_or_else(|| chain.code(address), |b| b.block().fields().state.code(address)) } fn set_author(&self, author: Address) { @@ -673,8 +683,8 @@ impl MinerService for Miner { let queue = self.transaction_queue.lock(); let sw = self.sealing_work.lock(); // TODO: should only use the sealing_work when it's current (it could be an old block) - let sealing_set = match self.sealing_enabled.load(atomic::Ordering::Relaxed) { - true => sw.peek_last_ref(), + let sealing_set = match sw.enabled { + true => sw.queue.peek_last_ref(), false => None, }; match (&self.options.pending_set, sealing_set) { @@ -686,8 +696,8 @@ impl MinerService for Miner { fn pending_transactions_hashes(&self) -> Vec { let queue = self.transaction_queue.lock(); let sw = self.sealing_work.lock(); - let sealing_set = match self.sealing_enabled.load(atomic::Ordering::Relaxed) { - true => sw.peek_last_ref(), + let sealing_set = match sw.enabled { + true => sw.queue.peek_last_ref(), false => None, }; match (&self.options.pending_set, sealing_set) { @@ -699,8 +709,8 @@ impl MinerService for Miner { fn transaction(&self, hash: &H256) -> Option { let queue = self.transaction_queue.lock(); let sw = self.sealing_work.lock(); - let sealing_set = match self.sealing_enabled.load(atomic::Ordering::Relaxed) { - true => sw.peek_last_ref(), + let sealing_set = match sw.enabled { + true => sw.queue.peek_last_ref(), false => None, }; match (&self.options.pending_set, sealing_set) { @@ -710,7 +720,8 @@ impl MinerService for Miner { } fn pending_receipts(&self) -> BTreeMap { - match (self.sealing_enabled.load(atomic::Ordering::Relaxed), self.sealing_work.lock().peek_last_ref()) { + let sealing_work = self.sealing_work.lock(); + match (sealing_work.enabled, sealing_work.queue.peek_last_ref()) { (true, Some(pending)) => { let hashes = pending.transactions() .iter() @@ -729,27 +740,43 @@ impl MinerService for Miner { } fn update_sealing(&self, chain: &MiningBlockChainClient) { - if self.sealing_enabled.load(atomic::Ordering::Relaxed) { - let current_no = chain.chain_info().best_block_number; - let has_local_transactions = self.transaction_queue.lock().has_local_pending_transactions(); - let last_request = *self.sealing_block_last_request.lock(); - let should_disable_sealing = !self.forced_sealing() - && !has_local_transactions - && current_no > last_request - && current_no - last_request > SEALING_TIMEOUT_IN_BLOCKS; - - if should_disable_sealing { - trace!(target: "miner", "Miner sleeping (current {}, last {})", current_no, last_request); - self.sealing_enabled.store(false, atomic::Ordering::Relaxed); - self.sealing_work.lock().reset(); + trace!(target: "miner", "update_sealing"); + let requires_reseal = { + let mut sealing_work = self.sealing_work.lock(); + if sealing_work.enabled { + trace!(target: "miner", "update_sealing: sealing enabled"); + let current_no = chain.chain_info().best_block_number; + let has_local_transactions = self.transaction_queue.lock().has_local_pending_transactions(); + let last_request = *self.sealing_block_last_request.lock(); + let should_disable_sealing = !self.forced_sealing() + && !has_local_transactions + && current_no > last_request + && current_no - last_request > SEALING_TIMEOUT_IN_BLOCKS; + + trace!(target: "miner", "update_sealing: should_disable_sealing={}; current_no={}, last_request={}", should_disable_sealing, current_no, last_request); + + if should_disable_sealing { + trace!(target: "miner", "Miner sleeping (current {}, last {})", current_no, last_request); + sealing_work.enabled = false; + sealing_work.queue.reset(); + false + } else { + // sealing enabled and we don't want to sleep. + *self.next_allowed_reseal.lock() = Instant::now() + self.options.reseal_min_period; + true + } } else { - *self.next_allowed_reseal.lock() = Instant::now() + self.options.reseal_min_period; - // -------------------------------------------------------------------------- - // | NOTE Code below requires transaction_queue and sealing_work locks. | - // | Make sure to release the locks before calling that method. | - // -------------------------------------------------------------------------- - self.prepare_sealing(chain); + // sealing is disabled. + false } + }; + + if requires_reseal { + // -------------------------------------------------------------------------- + // | NOTE Code below requires transaction_queue and sealing_work locks. | + // | Make sure to release the locks before calling that method. | + // -------------------------------------------------------------------------- + self.prepare_sealing(chain); } } @@ -758,13 +785,13 @@ impl MinerService for Miner { self.enable_and_prepare_sealing(chain); trace!(target: "miner", "map_sealing_work: sealing prepared"); let mut sealing_work = self.sealing_work.lock(); - let ret = sealing_work.use_last_ref(); + let ret = sealing_work.queue.use_last_ref(); trace!(target: "miner", "map_sealing_work: leaving use_last_ref={:?}", ret.as_ref().map(|b| b.block().fields().header.hash())); ret.map(f) } fn submit_seal(&self, chain: &MiningBlockChainClient, pow_hash: H256, seal: Vec) -> Result<(), Error> { - let result = if let Some(b) = self.sealing_work.lock().get_used_if(if self.options.enable_resubmission { GetAction::Clone } else { GetAction::Take }, |b| &b.hash() == &pow_hash) { + let result = if let Some(b) = self.sealing_work.lock().queue.get_used_if(if self.options.enable_resubmission { GetAction::Clone } else { GetAction::Take }, |b| &b.hash() == &pow_hash) { b.lock().try_seal(self.engine(), seal).or_else(|_| { warn!(target: "miner", "Mined solution rejected: Invalid."); Err(Error::PowInvalid) @@ -783,6 +810,8 @@ impl MinerService for Miner { } fn chain_new_blocks(&self, chain: &MiningBlockChainClient, _imported: &[H256], _invalid: &[H256], enacted: &[H256], retracted: &[H256]) { + trace!(target: "miner", "chain_new_blocks"); + fn fetch_transactions(chain: &MiningBlockChainClient, hash: &H256) -> Vec { let block = chain .block(BlockID::Hash(*hash)) @@ -838,11 +867,13 @@ impl MinerService for Miner { }); } - // -------------------------------------------------------------------------- - // | NOTE Code below requires transaction_queue and sealing_work locks. | - // | Make sure to release the locks before calling that method. | - // -------------------------------------------------------------------------- - self.update_sealing(chain); + if enacted.len() > 0 { + // -------------------------------------------------------------------------- + // | NOTE Code below requires transaction_queue and sealing_work locks. | + // | Make sure to release the locks before calling that method. | + // -------------------------------------------------------------------------- + self.update_sealing(chain); + } } }