Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Backport sealing fixes to beta #1583

Merged
merged 2 commits into from
Jul 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 6 additions & 17 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use util::*;
use util::panics::*;
use views::BlockView;
use error::{Error, ImportError, ExecutionError, BlockError, ImportResult};
use error::{ImportError, ExecutionError, BlockError, ImportResult};
use header::{BlockNumber, Header};
use state::State;
use spec::Spec;
Expand All @@ -38,7 +38,8 @@ use filter::Filter;
use log_entry::LocalizedLogEntry;
use block_queue::{BlockQueue, BlockQueueInfo};
use blockchain::{BlockChain, BlockProvider, TreeRoute, ImportRoute};
use client::{BlockID, TransactionID, UncleID, TraceId, ClientConfig, DatabaseCompactionProfile, BlockChainClient, MiningBlockChainClient, TraceFilter, CallAnalytics};
use client::{BlockID, TransactionID, UncleID, TraceId, ClientConfig, DatabaseCompactionProfile,
BlockChainClient, MiningBlockChainClient, TraceFilter, CallAnalytics };
use client::Error as ClientError;
use env_info::EnvInfo;
use executive::{Executive, Executed, TransactOptions, contract_address};
Expand All @@ -49,7 +50,7 @@ use trace;
pub use types::blockchain_info::BlockChainInfo;
pub use types::block_status::BlockStatus;
use evm::Factory as EvmFactory;
use miner::{Miner, MinerService, TransactionImportResult, AccountDetails};
use miner::{Miner, MinerService};

const MAX_TX_QUEUE_SIZE: usize = 4096;

Expand Down Expand Up @@ -384,12 +385,8 @@ impl Client {
pub fn import_queued_transactions(&self, transactions: &[Bytes]) -> usize {
let _timer = PerfTimer::new("import_queued_transactions");
self.queue_transactions.fetch_sub(transactions.len(), AtomicOrdering::SeqCst);
let fetch_account = |a: &Address| AccountDetails {
nonce: self.latest_nonce(a),
balance: self.latest_balance(a),
};
let tx = transactions.iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect();
let results = self.miner.import_transactions(self, tx, fetch_account);
let txs = transactions.iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect();
let results = self.miner.import_external_transactions(self, txs);
results.len()
}

Expand Down Expand Up @@ -772,14 +769,6 @@ impl BlockChainClient for Client {
self.build_last_hashes(self.chain.best_block_hash())
}

fn import_transactions(&self, transactions: Vec<SignedTransaction>) -> Vec<Result<TransactionImportResult, Error>> {
let fetch_account = |a: &Address| AccountDetails {
nonce: self.latest_nonce(a),
balance: self.latest_balance(a),
};
self.miner.import_transactions(self, transactions, fetch_account)
}

fn queue_transactions(&self, transactions: Vec<Bytes>) {
if self.queue_transactions.load(AtomicOrdering::Relaxed) > MAX_TX_QUEUE_SIZE {
debug!("Ignoring {} transactions: queue is full", transactions.len());
Expand Down
5 changes: 0 additions & 5 deletions ethcore/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ use error::{ImportResult, ExecutionError};
use receipt::LocalizedReceipt;
use trace::LocalizedTrace;
use evm::Factory as EvmFactory;
use miner::{TransactionImportResult};
use error::Error as EthError;

/// Options concerning what analytics we run on the call.
#[derive(Eq, PartialEq, Default, Clone, Copy, Debug)]
Expand Down Expand Up @@ -187,9 +185,6 @@ pub trait BlockChainClient : Sync + Send {
/// Get last hashes starting from best block.
fn last_hashes(&self) -> LastHashes;

/// import transactions from network/other 3rd party
fn import_transactions(&self, transactions: Vec<SignedTransaction>) -> Vec<Result<TransactionImportResult, EthError>>;

/// Queue transactions for importing.
fn queue_transactions(&self, transactions: Vec<Bytes>);

Expand Down
29 changes: 12 additions & 17 deletions ethcore/src/client/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrder};
use util::*;
use transaction::{Transaction, LocalizedTransaction, SignedTransaction, Action};
use blockchain::TreeRoute;
use client::{BlockChainClient, MiningBlockChainClient, BlockChainInfo, BlockStatus, BlockID, TransactionID, UncleID, TraceId, TraceFilter, LastHashes, CallAnalytics};
use client::{BlockChainClient, MiningBlockChainClient, BlockChainInfo, BlockStatus, BlockID,
TransactionID, UncleID, TraceId, TraceFilter, LastHashes, CallAnalytics };
use header::{Header as BlockHeader, BlockNumber};
use filter::Filter;
use log_entry::LocalizedLogEntry;
Expand All @@ -37,9 +38,6 @@ use executive::Executed;
use error::{ExecutionError};
use trace::LocalizedTrace;

use miner::{TransactionImportResult, AccountDetails};
use error::Error as EthError;

/// Test client.
pub struct TestBlockChainClient {
/// Blocks.
Expand Down Expand Up @@ -274,6 +272,10 @@ impl BlockChainClient for TestBlockChainClient {
}
}

fn latest_nonce(&self, address: &Address) -> U256 {
self.nonce(address, BlockID::Latest).unwrap()
}

fn code(&self, address: &Address) -> Option<Bytes> {
self.code.read().unwrap().get(address).cloned()
}
Expand All @@ -286,6 +288,10 @@ impl BlockChainClient for TestBlockChainClient {
}
}

fn latest_balance(&self, address: &Address) -> U256 {
self.balance(address, BlockID::Latest).unwrap()
}

fn storage_at(&self, address: &Address, position: &H256, id: BlockID) -> Option<H256> {
if let BlockID::Latest = id {
Some(self.storage.read().unwrap().get(&(address.clone(), position.clone())).cloned().unwrap_or_else(H256::new))
Expand Down Expand Up @@ -487,21 +493,10 @@ impl BlockChainClient for TestBlockChainClient {
unimplemented!();
}

fn import_transactions(&self, transactions: Vec<SignedTransaction>) -> Vec<Result<TransactionImportResult, EthError>> {
let nonces = self.nonces.read().unwrap();
let balances = self.balances.read().unwrap();
let fetch_account = |a: &Address| AccountDetails {
nonce: nonces[a],
balance: balances[a],
};

self.miner.import_transactions(self, transactions, &fetch_account)
}

fn queue_transactions(&self, transactions: Vec<Bytes>) {
// import right here
let tx = transactions.into_iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect();
self.import_transactions(tx);
let txs = transactions.into_iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect();
self.miner.import_external_transactions(self, txs);
}

fn pending_transactions(&self) -> Vec<SignedTransaction> {
Expand Down
79 changes: 58 additions & 21 deletions ethcore/src/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,10 @@ impl Miner {
let have_work = self.sealing_work.lock().unwrap().peek_last_ref().is_some();
trace!(target: "miner", "enable_and_prepare_sealing: have_work={}", have_work);
if !have_work {
// --------------------------------------------------------------------------
// | NOTE Code below requires transaction_queue and sealing_work locks. |
// | Make sure to release the locks before calling that method. |
// --------------------------------------------------------------------------
self.sealing_enabled.store(true, atomic::Ordering::Relaxed);
self.prepare_sealing(chain);
}
Expand All @@ -313,6 +317,19 @@ impl Miner {
!have_work
}

fn add_transactions_to_queue(&self, chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>, origin: TransactionOrigin, transaction_queue: &mut TransactionQueue) ->
Vec<Result<TransactionImportResult, Error>> {

let fetch_account = |a: &Address| AccountDetails {
nonce: chain.latest_nonce(a),
balance: chain.latest_balance(a),
};

transactions.into_iter()
.map(|tx| transaction_queue.add(tx, &fetch_account, origin))
.collect()
}

/// Are we allowed to do a non-mandatory reseal?
fn tx_reseal_allowed(&self) -> bool { Instant::now() > *self.next_allowed_reseal.lock().unwrap() }
}
Expand All @@ -323,6 +340,10 @@ impl MinerService for Miner {

fn clear_and_reset(&self, chain: &MiningBlockChainClient) {
self.transaction_queue.lock().unwrap().clear();
// --------------------------------------------------------------------------
// | NOTE Code below requires transaction_queue and sealing_work locks. |
// | Make sure to release the locks before calling that method. |
// --------------------------------------------------------------------------
self.update_sealing(chain);
}

Expand Down Expand Up @@ -476,35 +497,39 @@ impl MinerService for Miner {
self.gas_range_target.read().unwrap().1
}

fn import_transactions<T>(&self, chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>, fetch_account: T) ->
Vec<Result<TransactionImportResult, Error>>
where T: Fn(&Address) -> AccountDetails {
let results: Vec<Result<TransactionImportResult, Error>> = {
fn import_external_transactions(&self, chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>) ->
Vec<Result<TransactionImportResult, Error>> {

let results = {
let mut transaction_queue = self.transaction_queue.lock().unwrap();
transactions.into_iter()
.map(|tx| transaction_queue.add(tx, &fetch_account, TransactionOrigin::External))
.collect()
self.add_transactions_to_queue(
chain, transactions, TransactionOrigin::External, &mut transaction_queue
)
};
if !results.is_empty() && self.options.reseal_on_external_tx && self.tx_reseal_allowed() {

if !results.is_empty() && self.options.reseal_on_external_tx && self.tx_reseal_allowed() {
// --------------------------------------------------------------------------
// | NOTE Code below requires transaction_queue and sealing_work locks. |
// | Make sure to release the locks before calling that method. |
// --------------------------------------------------------------------------
self.update_sealing(chain);
}
results
}

fn import_own_transaction<T>(
fn import_own_transaction(
&self,
chain: &MiningBlockChainClient,
transaction: SignedTransaction,
fetch_account: T
) -> Result<TransactionImportResult, Error> where T: Fn(&Address) -> AccountDetails {
) -> Result<TransactionImportResult, Error> {

let hash = transaction.hash();
trace!(target: "own_tx", "Importing transaction: {:?}", transaction);

let imported = {
// Be sure to release the lock before we call enable_and_prepare_sealing
let mut transaction_queue = self.transaction_queue.lock().unwrap();
let import = transaction_queue.add(transaction, &fetch_account, TransactionOrigin::Local);
let import = self.add_transactions_to_queue(chain, vec![transaction], TransactionOrigin::Local, &mut transaction_queue).pop().unwrap();

match import {
Ok(ref res) => {
Expand All @@ -520,6 +545,10 @@ impl MinerService for Miner {
import
};

// --------------------------------------------------------------------------
// | NOTE Code below requires transaction_queue and sealing_work locks. |
// | Make sure to release the locks before calling that method. |
// --------------------------------------------------------------------------
if imported.is_ok() && self.options.reseal_on_own_tx && self.tx_reseal_allowed() {
// Make sure to do it after transaction is imported and lock is droped.
// We need to create pending block and enable sealing
Expand Down Expand Up @@ -614,6 +643,10 @@ impl MinerService for Miner {
self.sealing_work.lock().unwrap().reset();
} else {
*self.next_allowed_reseal.lock().unwrap() = Instant::now() + self.options.reseal_min_period;
// --------------------------------------------------------------------------
// | NOTE Code below requires transaction_queue and sealing_work locks. |
// | Make sure to release the locks before calling that method. |
// --------------------------------------------------------------------------
self.prepare_sealing(chain);
}
}
Expand Down Expand Up @@ -655,7 +688,12 @@ impl MinerService for Miner {
// Client should send message after commit to db and inserting to chain.
.expect("Expected in-chain blocks.");
let block = BlockView::new(&block);
block.transactions()
let txs = block.transactions();
// populate sender
for tx in &txs {
let _sender = tx.sender();
}
txs
}

// 1. We ignore blocks that were `imported` (because it means that they are not in canon-chain, and transactions
Expand All @@ -672,14 +710,9 @@ impl MinerService for Miner {
.par_iter()
.map(|h| fetch_transactions(chain, h));
out_of_chain.for_each(|txs| {
// populate sender
for tx in &txs {
let _sender = tx.sender();
}
let _ = self.import_transactions(chain, txs, |a| AccountDetails {
nonce: chain.latest_nonce(a),
balance: chain.latest_balance(a),
});
let mut transaction_queue = self.transaction_queue.lock().unwrap();
let _ = self.add_transactions_to_queue(chain, txs, TransactionOrigin::External,
&mut transaction_queue);
});
}

Expand All @@ -703,6 +736,10 @@ impl MinerService for Miner {
});
}

// --------------------------------------------------------------------------
// | NOTE Code below requires transaction_queue and sealing_work locks. |
// | Make sure to release the locks before calling that method. |
// --------------------------------------------------------------------------
self.update_sealing(chain);
}
}
Expand Down
10 changes: 4 additions & 6 deletions ethcore/src/miner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,12 @@ pub trait MinerService : Send + Sync {
fn set_tx_gas_limit(&self, limit: U256);

/// Imports transactions to transaction queue.
fn import_transactions<T>(&self, chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>, fetch_account: T) ->
Vec<Result<TransactionImportResult, Error>>
where T: Fn(&Address) -> AccountDetails, Self: Sized;
fn import_external_transactions(&self, chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>) ->
Vec<Result<TransactionImportResult, Error>>;

/// Imports own (node owner) transaction to queue.
fn import_own_transaction<T>(&self, chain: &MiningBlockChainClient, transaction: SignedTransaction, fetch_account: T) ->
Result<TransactionImportResult, Error>
where T: Fn(&Address) -> AccountDetails, Self: Sized;
fn import_own_transaction(&self, chain: &MiningBlockChainClient, transaction: SignedTransaction) ->
Result<TransactionImportResult, Error>;

/// Returns hashes of transactions currently in pending
fn pending_transactions_hashes(&self) -> Vec<H256>;
Expand Down
9 changes: 2 additions & 7 deletions rpc/src/v1/impls/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub use self::rpc::RpcClient;

use v1::types::TransactionRequest;
use ethcore::error::Error as EthcoreError;
use ethcore::miner::{AccountDetails, MinerService};
use ethcore::miner::MinerService;
use ethcore::client::MiningBlockChainClient;
use ethcore::transaction::{Action, SignedTransaction, Transaction};
use ethcore::account_provider::{AccountProvider, Error as AccountError};
Expand All @@ -79,12 +79,7 @@ fn dispatch_transaction<C, M>(client: &C, miner: &M, signed_transaction: SignedT
where C: MiningBlockChainClient, M: MinerService {
let hash = signed_transaction.hash();

let import = miner.import_own_transaction(client, signed_transaction, |a: &Address| {
AccountDetails {
nonce: client.latest_nonce(&a),
balance: client.latest_balance(&a),
}
});
let import = miner.import_own_transaction(client, signed_transaction);

import
.map_err(transaction_error)
Expand Down
14 changes: 6 additions & 8 deletions rpc/src/v1/tests/helpers/miner_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use ethcore::client::{MiningBlockChainClient, Executed, CallAnalytics};
use ethcore::block::{ClosedBlock, IsBlock};
use ethcore::transaction::SignedTransaction;
use ethcore::receipt::Receipt;
use ethcore::miner::{MinerService, MinerStatus, AccountDetails, TransactionImportResult};
use ethcore::miner::{MinerService, MinerStatus, TransactionImportResult};

/// Test miner service.
pub struct TestMinerService {
Expand Down Expand Up @@ -130,14 +130,13 @@ impl MinerService for TestMinerService {
}

/// Imports transactions to transaction queue.
fn import_transactions<T>(&self, _chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>, fetch_account: T) ->
Vec<Result<TransactionImportResult, Error>>
where T: Fn(&Address) -> AccountDetails {
fn import_external_transactions(&self, _chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>) ->
Vec<Result<TransactionImportResult, Error>> {
// lets assume that all txs are valid
self.imported_transactions.lock().unwrap().extend_from_slice(&transactions);

for sender in transactions.iter().filter_map(|t| t.sender().ok()) {
let nonce = self.last_nonce(&sender).unwrap_or(fetch_account(&sender).nonce);
let nonce = self.last_nonce(&sender).expect("last_nonce must be populated in tests");
self.last_nonces.write().unwrap().insert(sender, nonce + U256::from(1));
}
transactions
Expand All @@ -147,9 +146,8 @@ impl MinerService for TestMinerService {
}

/// Imports transactions to transaction queue.
fn import_own_transaction<T>(&self, chain: &MiningBlockChainClient, transaction: SignedTransaction, _fetch_account: T) ->
Result<TransactionImportResult, Error>
where T: Fn(&Address) -> AccountDetails {
fn import_own_transaction(&self, chain: &MiningBlockChainClient, transaction: SignedTransaction) ->
Result<TransactionImportResult, Error> {

// keep the pending nonces up to date
if let Ok(ref sender) = transaction.sender() {
Expand Down