Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Combine mining queue and enabled into single locked datum #1749

Merged
merged 4 commits into from
Jul 28, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ impl Client {
/// This is triggered by a message coming from a block queue when the block is ready for insertion
pub fn import_verified_blocks(&self) -> usize {
let max_blocks_to_import = 64;
let (imported_blocks, import_results, invalid_blocks, original_best, imported, duration) = {
let (imported_blocks, import_results, invalid_blocks, imported, duration) = {
let mut imported_blocks = Vec::with_capacity(max_blocks_to_import);
let mut invalid_blocks = HashSet::new();
let mut import_results = Vec::with_capacity(max_blocks_to_import);
Expand All @@ -354,8 +354,6 @@ impl Client {
let start = precise_time_ns();
let blocks = self.block_queue.drain(max_blocks_to_import);

let original_best = self.chain_info().best_block_hash;

for block in blocks {
let header = &block.header;
if invalid_blocks.contains(&header.parent_hash) {
Expand Down Expand Up @@ -389,7 +387,7 @@ impl Client {
}
}
let duration_ns = precise_time_ns() - start;
(imported_blocks, import_results, invalid_blocks, original_best, imported, duration_ns)
(imported_blocks, import_results, invalid_blocks, imported, duration_ns)
};

{
Expand All @@ -413,10 +411,6 @@ impl Client {
}
}

if self.chain_info().best_block_hash != original_best {
self.miner.update_sealing(self);
}

imported
}

Expand Down
151 changes: 91 additions & 60 deletions ethcore/src/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use rayon::prelude::*;
use std::sync::atomic::{self, AtomicBool};
use std::time::{Instant, Duration};

use util::*;
Expand Down Expand Up @@ -159,15 +158,20 @@ impl GasPricer {
}
}

struct SealingWork {
queue: UsingQueue<ClosedBlock>,
enabled: bool,
}

/// Keeps track of transactions using priority queue and holds currently mined block.
pub struct Miner {
// NOTE [ToDr] When locking always lock in this order!
transaction_queue: Arc<Mutex<TransactionQueue>>,
sealing_work: Mutex<UsingQueue<ClosedBlock>>,
sealing_work: Mutex<SealingWork>,

// for sealing...
options: MinerOptions,
sealing_enabled: AtomicBool,

next_allowed_reseal: Mutex<Instant>,
sealing_block_last_request: Mutex<u64>,
gas_range_target: RwLock<(U256, U256)>,
Expand All @@ -186,10 +190,9 @@ impl Miner {
Miner {
transaction_queue: Arc::new(Mutex::new(TransactionQueue::new())),
options: Default::default(),
sealing_enabled: AtomicBool::new(false),
next_allowed_reseal: Mutex::new(Instant::now()),
sealing_block_last_request: Mutex::new(0),
sealing_work: Mutex::new(UsingQueue::new(20)),
sealing_work: Mutex::new(SealingWork{queue: UsingQueue::new(20), enabled: false}),
gas_range_target: RwLock::new((U256::zero(), U256::zero())),
author: RwLock::new(Address::default()),
extra_data: RwLock::new(Vec::new()),
Expand All @@ -206,10 +209,9 @@ impl Miner {
let txq = Arc::new(Mutex::new(TransactionQueue::with_limits(options.tx_queue_size, options.tx_gas_limit)));
Arc::new(Miner {
transaction_queue: txq,
sealing_enabled: AtomicBool::new(options.force_sealing || !options.new_work_notify.is_empty()),
next_allowed_reseal: Mutex::new(Instant::now()),
sealing_block_last_request: Mutex::new(0),
sealing_work: Mutex::new(UsingQueue::new(options.work_queue_size)),
sealing_work: Mutex::new(SealingWork{queue: UsingQueue::new(options.work_queue_size), enabled: options.force_sealing || !options.new_work_notify.is_empty()}),
gas_range_target: RwLock::new((U256::zero(), U256::zero())),
author: RwLock::new(Address::default()),
extra_data: RwLock::new(Vec::new()),
Expand All @@ -231,12 +233,12 @@ impl Miner {

/// Get `Some` `clone()` of the current pending block's state or `None` if we're not sealing.
pub fn pending_state(&self) -> Option<State> {
self.sealing_work.lock().peek_last_ref().map(|b| b.block().fields().state.clone())
self.sealing_work.lock().queue.peek_last_ref().map(|b| b.block().fields().state.clone())
}

/// Get `Some` `clone()` of the current pending block's state or `None` if we're not sealing.
pub fn pending_block(&self) -> Option<Block> {
self.sealing_work.lock().peek_last_ref().map(|b| b.base().clone())
self.sealing_work.lock().queue.peek_last_ref().map(|b| b.base().clone())
}

/// Prepares new block for sealing including top transactions from queue.
Expand All @@ -258,7 +260,7 @@ impl Miner {
let (transactions, mut open_block, original_work_hash) = {
let transactions = {self.transaction_queue.lock().top_transactions()};
let mut sealing_work = self.sealing_work.lock();
let last_work_hash = sealing_work.peek_last_ref().map(|pb| pb.block().fields().header.hash());
let last_work_hash = sealing_work.queue.peek_last_ref().map(|pb| pb.block().fields().header.hash());
let best_hash = chain.best_block_header().sha3();
/*
// check to see if last ClosedBlock in would_seals is actually same parent block.
Expand All @@ -268,7 +270,7 @@ impl Miner {
// otherwise, leave everything alone.
// otherwise, author a fresh block.
*/
let open_block = match sealing_work.pop_if(|b| b.block().fields().header.parent_hash() == &best_hash) {
let open_block = match sealing_work.queue.pop_if(|b| b.block().fields().header.parent_hash() == &best_hash) {
Some(old_block) => {
trace!(target: "miner", "Already have previous work; updating and returning");
// add transactions to old_block
Expand Down Expand Up @@ -359,24 +361,24 @@ impl Miner {

let (work, is_new) = {
let mut sealing_work = self.sealing_work.lock();
let last_work_hash = sealing_work.peek_last_ref().map(|pb| pb.block().fields().header.hash());
let last_work_hash = sealing_work.queue.peek_last_ref().map(|pb| pb.block().fields().header.hash());
trace!(target: "miner", "Checking whether we need to reseal: orig={:?} last={:?}, this={:?}", original_work_hash, last_work_hash, block.block().fields().header.hash());
let (work, is_new) = if last_work_hash.map_or(true, |h| h != block.block().fields().header.hash()) {
trace!(target: "miner", "Pushing a new, refreshed or borrowed pending {}...", block.block().fields().header.hash());
let pow_hash = block.block().fields().header.hash();
let number = block.block().fields().header.number();
let difficulty = *block.block().fields().header.difficulty();
let is_new = original_work_hash.map_or(true, |h| block.block().fields().header.hash() != h);
sealing_work.push(block);
sealing_work.queue.push(block);
// If push notifications are enabled we assume all work items are used.
if self.work_poster.is_some() && is_new {
sealing_work.use_last_ref();
sealing_work.queue.use_last_ref();
}
(Some((pow_hash, difficulty, number)), is_new)
} else {
(None, false)
};
trace!(target: "miner", "prepare_sealing: leaving (last={:?})", sealing_work.peek_last_ref().map(|b| b.block().fields().header.hash()));
trace!(target: "miner", "prepare_sealing: leaving (last={:?})", sealing_work.queue.peek_last_ref().map(|b| b.block().fields().header.hash()));
(work, is_new)
};
if is_new {
Expand All @@ -393,14 +395,22 @@ impl Miner {
/// Returns true if we had to prepare new pending block
fn enable_and_prepare_sealing(&self, chain: &MiningBlockChainClient) -> bool {
trace!(target: "miner", "enable_and_prepare_sealing: entering");
let have_work = self.sealing_work.lock().peek_last_ref().is_some();
trace!(target: "miner", "enable_and_prepare_sealing: have_work={}", have_work);
if !have_work {
let prepare_new = {
let mut sealing_work = self.sealing_work.lock();
let have_work = sealing_work.queue.peek_last_ref().is_some();
trace!(target: "miner", "enable_and_prepare_sealing: have_work={}", have_work);
if !have_work {
sealing_work.enabled = true;
true
} else {
false
}
};
if prepare_new {
// --------------------------------------------------------------------------
// | NOTE Code below requires transaction_queue and sealing_work locks. |
// | Make sure to release the locks before calling that method. |
// --------------------------------------------------------------------------
self.sealing_enabled.store(true, atomic::Ordering::Relaxed);
self.prepare_sealing(chain);
}
let mut sealing_block_last_request = self.sealing_block_last_request.lock();
Expand All @@ -410,8 +420,8 @@ impl Miner {
*sealing_block_last_request = best_number;
}

// Return if
!have_work
// Return if we restarted
prepare_new
}

fn add_transactions_to_queue(&self, chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>, origin: TransactionOrigin, transaction_queue: &mut TransactionQueue) ->
Expand Down Expand Up @@ -450,13 +460,13 @@ impl MinerService for Miner {
MinerStatus {
transactions_in_pending_queue: status.pending,
transactions_in_future_queue: status.future,
transactions_in_pending_block: sealing_work.peek_last_ref().map_or(0, |b| b.transactions().len()),
transactions_in_pending_block: sealing_work.queue.peek_last_ref().map_or(0, |b| b.transactions().len()),
}
}

fn call(&self, chain: &MiningBlockChainClient, t: &SignedTransaction, analytics: CallAnalytics) -> Result<Executed, ExecutionError> {
let sealing_work = self.sealing_work.lock();
match sealing_work.peek_last_ref() {
match sealing_work.queue.peek_last_ref() {
Some(work) => {
let block = work.block();

Expand Down Expand Up @@ -503,28 +513,28 @@ impl MinerService for Miner {

fn balance(&self, chain: &MiningBlockChainClient, address: &Address) -> U256 {
let sealing_work = self.sealing_work.lock();
sealing_work.peek_last_ref().map_or_else(
sealing_work.queue.peek_last_ref().map_or_else(
|| chain.latest_balance(address),
|b| b.block().fields().state.balance(address)
)
}

fn storage_at(&self, chain: &MiningBlockChainClient, address: &Address, position: &H256) -> H256 {
let sealing_work = self.sealing_work.lock();
sealing_work.peek_last_ref().map_or_else(
sealing_work.queue.peek_last_ref().map_or_else(
|| chain.latest_storage_at(address, position),
|b| b.block().fields().state.storage_at(address, position)
)
}

fn nonce(&self, chain: &MiningBlockChainClient, address: &Address) -> U256 {
let sealing_work = self.sealing_work.lock();
sealing_work.peek_last_ref().map_or_else(|| chain.latest_nonce(address), |b| b.block().fields().state.nonce(address))
sealing_work.queue.peek_last_ref().map_or_else(|| chain.latest_nonce(address), |b| b.block().fields().state.nonce(address))
}

fn code(&self, chain: &MiningBlockChainClient, address: &Address) -> Option<Bytes> {
let sealing_work = self.sealing_work.lock();
sealing_work.peek_last_ref().map_or_else(|| chain.code(address), |b| b.block().fields().state.code(address))
sealing_work.queue.peek_last_ref().map_or_else(|| chain.code(address), |b| b.block().fields().state.code(address))
}

fn set_author(&self, author: Address) {
Expand Down Expand Up @@ -673,8 +683,8 @@ impl MinerService for Miner {
let queue = self.transaction_queue.lock();
let sw = self.sealing_work.lock();
// TODO: should only use the sealing_work when it's current (it could be an old block)
let sealing_set = match self.sealing_enabled.load(atomic::Ordering::Relaxed) {
true => sw.peek_last_ref(),
let sealing_set = match sw.enabled {
true => sw.queue.peek_last_ref(),
false => None,
};
match (&self.options.pending_set, sealing_set) {
Expand All @@ -686,8 +696,8 @@ impl MinerService for Miner {
fn pending_transactions_hashes(&self) -> Vec<H256> {
let queue = self.transaction_queue.lock();
let sw = self.sealing_work.lock();
let sealing_set = match self.sealing_enabled.load(atomic::Ordering::Relaxed) {
true => sw.peek_last_ref(),
let sealing_set = match sw.enabled {
true => sw.queue.peek_last_ref(),
false => None,
};
match (&self.options.pending_set, sealing_set) {
Expand All @@ -699,8 +709,8 @@ impl MinerService for Miner {
fn transaction(&self, hash: &H256) -> Option<SignedTransaction> {
let queue = self.transaction_queue.lock();
let sw = self.sealing_work.lock();
let sealing_set = match self.sealing_enabled.load(atomic::Ordering::Relaxed) {
true => sw.peek_last_ref(),
let sealing_set = match sw.enabled {
true => sw.queue.peek_last_ref(),
false => None,
};
match (&self.options.pending_set, sealing_set) {
Expand All @@ -710,7 +720,8 @@ impl MinerService for Miner {
}

fn pending_receipts(&self) -> BTreeMap<H256, Receipt> {
match (self.sealing_enabled.load(atomic::Ordering::Relaxed), self.sealing_work.lock().peek_last_ref()) {
let sealing_work = self.sealing_work.lock();
match (sealing_work.enabled, sealing_work.queue.peek_last_ref()) {
(true, Some(pending)) => {
let hashes = pending.transactions()
.iter()
Expand All @@ -729,27 +740,43 @@ impl MinerService for Miner {
}

fn update_sealing(&self, chain: &MiningBlockChainClient) {
if self.sealing_enabled.load(atomic::Ordering::Relaxed) {
let current_no = chain.chain_info().best_block_number;
let has_local_transactions = self.transaction_queue.lock().has_local_pending_transactions();
let last_request = *self.sealing_block_last_request.lock();
let should_disable_sealing = !self.forced_sealing()
&& !has_local_transactions
&& current_no > last_request
&& current_no - last_request > SEALING_TIMEOUT_IN_BLOCKS;

if should_disable_sealing {
trace!(target: "miner", "Miner sleeping (current {}, last {})", current_no, last_request);
self.sealing_enabled.store(false, atomic::Ordering::Relaxed);
self.sealing_work.lock().reset();
trace!(target: "miner", "update_sealing");
let requires_reseal = {
let mut sealing_work = self.sealing_work.lock();
if sealing_work.enabled {
trace!(target: "miner", "update_sealing: sealing enabled");
let current_no = chain.chain_info().best_block_number;
let has_local_transactions = self.transaction_queue.lock().has_local_pending_transactions();
let last_request = *self.sealing_block_last_request.lock();
let should_disable_sealing = !self.forced_sealing()
&& !has_local_transactions
&& current_no > last_request
&& current_no - last_request > SEALING_TIMEOUT_IN_BLOCKS;

trace!(target: "miner", "update_sealing: should_disable_sealing={}; current_no={}, last_request={}", should_disable_sealing, current_no, last_request);

if should_disable_sealing {
trace!(target: "miner", "Miner sleeping (current {}, last {})", current_no, last_request);
sealing_work.enabled = false;
sealing_work.queue.reset();
false
} else {
// sealing enabled and we don't want to sleep.
*self.next_allowed_reseal.lock() = Instant::now() + self.options.reseal_min_period;
true
}
} else {
*self.next_allowed_reseal.lock() = Instant::now() + self.options.reseal_min_period;
// --------------------------------------------------------------------------
// | NOTE Code below requires transaction_queue and sealing_work locks. |
// | Make sure to release the locks before calling that method. |
// --------------------------------------------------------------------------
self.prepare_sealing(chain);
// sealing is disabled.
false
}
};

if requires_reseal {
// --------------------------------------------------------------------------
// | NOTE Code below requires transaction_queue and sealing_work locks. |
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But sealing_work is not locked, is it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no - that's why it doesn't deadlock.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment needs updating then?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could perhaps do to be made clear that the function doesn't actually need it to be locked, but rather needs it to be unlocked, since it needs to lock it.

// | Make sure to release the locks before calling that method. |
// --------------------------------------------------------------------------
self.prepare_sealing(chain);
}
}

Expand All @@ -758,13 +785,13 @@ impl MinerService for Miner {
self.enable_and_prepare_sealing(chain);
trace!(target: "miner", "map_sealing_work: sealing prepared");
let mut sealing_work = self.sealing_work.lock();
let ret = sealing_work.use_last_ref();
let ret = sealing_work.queue.use_last_ref();
trace!(target: "miner", "map_sealing_work: leaving use_last_ref={:?}", ret.as_ref().map(|b| b.block().fields().header.hash()));
ret.map(f)
}

fn submit_seal(&self, chain: &MiningBlockChainClient, pow_hash: H256, seal: Vec<Bytes>) -> Result<(), Error> {
let result = if let Some(b) = self.sealing_work.lock().get_used_if(if self.options.enable_resubmission { GetAction::Clone } else { GetAction::Take }, |b| &b.hash() == &pow_hash) {
let result = if let Some(b) = self.sealing_work.lock().queue.get_used_if(if self.options.enable_resubmission { GetAction::Clone } else { GetAction::Take }, |b| &b.hash() == &pow_hash) {
b.lock().try_seal(self.engine(), seal).or_else(|_| {
warn!(target: "miner", "Mined solution rejected: Invalid.");
Err(Error::PowInvalid)
Expand All @@ -783,6 +810,8 @@ impl MinerService for Miner {
}

fn chain_new_blocks(&self, chain: &MiningBlockChainClient, _imported: &[H256], _invalid: &[H256], enacted: &[H256], retracted: &[H256]) {
trace!(target: "miner", "chain_new_blocks");

fn fetch_transactions(chain: &MiningBlockChainClient, hash: &H256) -> Vec<SignedTransaction> {
let block = chain
.block(BlockID::Hash(*hash))
Expand Down Expand Up @@ -838,11 +867,13 @@ impl MinerService for Miner {
});
}

// --------------------------------------------------------------------------
// | NOTE Code below requires transaction_queue and sealing_work locks. |
// | Make sure to release the locks before calling that method. |
// --------------------------------------------------------------------------
self.update_sealing(chain);
if enacted.len() > 0 {
// --------------------------------------------------------------------------
// | NOTE Code below requires transaction_queue and sealing_work locks. |
// | Make sure to release the locks before calling that method. |
// --------------------------------------------------------------------------
self.update_sealing(chain);
}
}
}

Expand Down