Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.

Transaction Pool improvements #8470

Merged
merged 5 commits into from
May 2, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions miner/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ impl VerifiedTransaction {
}

impl txpool::VerifiedTransaction for VerifiedTransaction {
type Hash = H256;
type Sender = Address;

fn hash(&self) -> &H256 {
&self.hash
}
Expand Down
4 changes: 2 additions & 2 deletions miner/src/pool/ready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ impl<C: NonceClient> txpool::Ready<VerifiedTransaction> for State<C> {
match tx.transaction.nonce.cmp(nonce) {
// Before marking as future check for stale ids
cmp::Ordering::Greater => match self.stale_id {
Some(id) if tx.insertion_id() < id => txpool::Readiness::Stalled,
Some(id) if tx.insertion_id() < id => txpool::Readiness::Stale,
_ => txpool::Readiness::Future,
},
cmp::Ordering::Less => txpool::Readiness::Stalled,
cmp::Ordering::Less => txpool::Readiness::Stale,
cmp::Ordering::Equal => {
*nonce = *nonce + 1.into();
txpool::Readiness::Ready
Expand Down
2 changes: 2 additions & 0 deletions transaction-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ error-chain = "0.11"
log = "0.3"
smallvec = "0.4"
trace-time = { path = "../util/trace-time" }

[dev-dependencies]
ethereum-types = "0.3"
16 changes: 9 additions & 7 deletions transaction-pool/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,26 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use ethereum_types::H256;
/// Error chain doesn't let us have generic types.
/// So the hashes are converted to debug strings for easy display.
type Hash = String;

error_chain! {
errors {
/// Transaction is already imported
AlreadyImported(hash: H256) {
AlreadyImported(hash: Hash) {
description("transaction is already in the pool"),
display("[{:?}] already imported", hash)
display("[{}] already imported", hash)
}
/// Transaction is too cheap to enter the queue
TooCheapToEnter(hash: H256, min_score: String) {
TooCheapToEnter(hash: Hash, min_score: String) {
description("the pool is full and transaction is too cheap to replace any transaction"),
display("[{:?}] too cheap to enter the pool. Min score: {}", hash, min_score)
display("[{}] too cheap to enter the pool. Min score: {}", hash, min_score)
}
/// Transaction is too cheap to replace existing transaction that occupies the same slot.
TooCheapToReplace(old_hash: H256, hash: H256) {
TooCheapToReplace(old_hash: Hash, hash: Hash) {
description("transaction is too cheap to replace existing transaction in the pool"),
display("[{:?}] too cheap to replace: {:?}", hash, old_hash)
display("[{}] too cheap to replace: {}", hash, old_hash)
}
}
}
Expand Down
18 changes: 12 additions & 6 deletions transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,15 @@
#![warn(missing_docs)]

extern crate smallvec;
extern crate ethereum_types;
extern crate trace_time;

#[macro_use]
extern crate error_chain;
#[macro_use]
extern crate log;

extern crate trace_time;
#[cfg(test)]
extern crate ethereum_types;

#[cfg(test)]
mod tests;
Expand All @@ -102,19 +103,24 @@ pub use self::status::{LightStatus, Status};
pub use self::verifier::Verifier;

use std::fmt;

use ethereum_types::{H256, Address};
use std::hash::Hash;

/// Already verified transaction that can be safely queued.
pub trait VerifiedTransaction: fmt::Debug {
/// Transaction hash type.
type Hash: fmt::Debug + Eq + Clone + Hash;

/// Transaction sender type.
type Sender: fmt::Debug + Eq + Clone + Hash;

/// Transaction hash
fn hash(&self) -> &H256;
fn hash(&self) -> &Self::Hash;

/// Memory usage
fn mem_usage(&self) -> usize;

/// Transaction sender
fn sender(&self) -> &Address;
fn sender(&self) -> &Self::Sender;

/// Unique index of insertion (lower = older).
fn insertion_id(&self) -> u64;
Expand Down
42 changes: 19 additions & 23 deletions transaction-pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
use std::sync::Arc;
use std::collections::{HashMap, BTreeSet};

use ethereum_types::{H160, H256};

use error;
use listener::{Listener, NoopListener};
use options::Options;
Expand All @@ -29,18 +27,16 @@ use transactions::{AddResult, Transactions};

use {VerifiedTransaction};

type Sender = H160;

/// A transaction pool.
#[derive(Debug)]
pub struct Pool<T, S: Scoring<T>, L = NoopListener> {
pub struct Pool<T: VerifiedTransaction, S: Scoring<T>, L = NoopListener> {
listener: L,
scoring: S,
options: Options,
mem_usage: usize,

transactions: HashMap<Sender, Transactions<T, S>>,
by_hash: HashMap<H256, Arc<T>>,
transactions: HashMap<T::Sender, Transactions<T, S>>,
by_hash: HashMap<T::Hash, Arc<T>>,

best_transactions: BTreeSet<ScoreWithRef<T, S::Score>>,
worst_transactions: BTreeSet<ScoreWithRef<T, S::Score>>,
Expand Down Expand Up @@ -107,7 +103,7 @@ impl<T, S, L> Pool<T, S, L> where
pub fn import(&mut self, mut transaction: T) -> error::Result<Arc<T>> {
let mem_usage = transaction.mem_usage();

ensure!(!self.by_hash.contains_key(transaction.hash()), error::ErrorKind::AlreadyImported(*transaction.hash()));
ensure!(!self.by_hash.contains_key(transaction.hash()), error::ErrorKind::AlreadyImported(format!("{:?}", transaction.hash())));

// TODO [ToDr] Most likely move this after the transaction is inserted.
// Avoid using should_replace, but rather use scoring for that.
Expand Down Expand Up @@ -138,7 +134,7 @@ impl<T, S, L> Pool<T, S, L> where
}

let (result, prev_state, current_state) = {
let transactions = self.transactions.entry(*transaction.sender()).or_insert_with(Transactions::default);
let transactions = self.transactions.entry(transaction.sender().clone()).or_insert_with(Transactions::default);
// get worst and best transactions for comparison
let prev = transactions.worst_and_best();
let result = transactions.add(transaction, &self.scoring, self.options.max_per_sender);
Expand All @@ -162,12 +158,12 @@ impl<T, S, L> Pool<T, S, L> where
Ok(new)
},
AddResult::TooCheap { new, old } => {
let error = error::ErrorKind::TooCheapToReplace(*old.hash(), *new.hash());
let error = error::ErrorKind::TooCheapToReplace(format!("{:?}", old.hash()), format!("{:?}", new.hash()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the debug formatting for hashes? Should we require AsHex and use {:x} instead? (Same comment applies in lines below).

self.listener.rejected(&Arc::new(new), &error);
bail!(error)
},
AddResult::TooCheapToEnter(new, score) => {
let error = error::ErrorKind::TooCheapToEnter(*new.hash(), format!("{:?}", score));
let error = error::ErrorKind::TooCheapToEnter(format!("{:?}", new.hash()), format!("{:?}", score));
self.listener.rejected(&Arc::new(new), &error);
bail!(error)
}
Expand All @@ -177,15 +173,15 @@ impl<T, S, L> Pool<T, S, L> where
/// Updates state of the pool statistics if the transaction was added to a set.
fn finalize_insert(&mut self, new: &Arc<T>, old: Option<&Arc<T>>) {
self.mem_usage += new.mem_usage();
self.by_hash.insert(*new.hash(), new.clone());
self.by_hash.insert(new.hash().clone(), new.clone());

if let Some(old) = old {
self.finalize_remove(old.hash());
}
}

/// Updates the pool statistics if transaction was removed.
fn finalize_remove(&mut self, hash: &H256) -> Option<Arc<T>> {
fn finalize_remove(&mut self, hash: &T::Hash) -> Option<Arc<T>> {
self.by_hash.remove(hash).map(|old| {
self.mem_usage -= old.mem_usage();
old
Expand Down Expand Up @@ -243,14 +239,14 @@ impl<T, S, L> Pool<T, S, L> where
// No elements to remove? and the pool is still full?
None => {
warn!("The pool is full but there are no transactions to remove.");
return Err(error::ErrorKind::TooCheapToEnter(*transaction.hash(), "unknown".into()).into());
return Err(error::ErrorKind::TooCheapToEnter(format!("{:?}", transaction.hash()), "unknown".into()).into());
},
Some(old) => if self.scoring.should_replace(&old.transaction, transaction) {
// New transaction is better than the worst one so we can replace it.
old.clone()
} else {
// otherwise fail
return Err(error::ErrorKind::TooCheapToEnter(*transaction.hash(), format!("{:?}", old.score)).into())
return Err(error::ErrorKind::TooCheapToEnter(format!("{:?}", transaction.hash()), format!("{:?}", old.score)).into())
},
};

Expand All @@ -263,7 +259,7 @@ impl<T, S, L> Pool<T, S, L> where
}

/// Removes transaction from sender's transaction `HashMap`.
fn remove_from_set<R, F: FnOnce(&mut Transactions<T, S>, &S) -> R>(&mut self, sender: &Sender, f: F) -> Option<R> {
fn remove_from_set<R, F: FnOnce(&mut Transactions<T, S>, &S) -> R>(&mut self, sender: &T::Sender, f: F) -> Option<R> {
let (prev, next, result) = if let Some(set) = self.transactions.get_mut(sender) {
let prev = set.worst_and_best();
let result = f(set, &self.scoring);
Expand Down Expand Up @@ -293,7 +289,7 @@ impl<T, S, L> Pool<T, S, L> where
/// Removes single transaction from the pool.
/// Depending on the `is_invalid` flag the listener
/// will either get a `cancelled` or `invalid` notification.
pub fn remove(&mut self, hash: &H256, is_invalid: bool) -> Option<Arc<T>> {
pub fn remove(&mut self, hash: &T::Hash, is_invalid: bool) -> Option<Arc<T>> {
if let Some(tx) = self.finalize_remove(hash) {
self.remove_from_set(tx.sender(), |set, scoring| {
set.remove(&tx, scoring)
Expand All @@ -310,7 +306,7 @@ impl<T, S, L> Pool<T, S, L> where
}

/// Removes all stalled transactions from given sender.
fn remove_stalled<R: Ready<T>>(&mut self, sender: &Sender, ready: &mut R) -> usize {
fn remove_stalled<R: Ready<T>>(&mut self, sender: &T::Sender, ready: &mut R) -> usize {
let removed_from_set = self.remove_from_set(sender, |transactions, scoring| {
transactions.cull(ready, scoring)
});
Expand All @@ -329,7 +325,7 @@ impl<T, S, L> Pool<T, S, L> where
}

/// Removes all stalled transactions from given sender list (or from all senders).
pub fn cull<R: Ready<T>>(&mut self, senders: Option<&[Sender]>, mut ready: R) -> usize {
pub fn cull<R: Ready<T>>(&mut self, senders: Option<&[T::Sender]>, mut ready: R) -> usize {
let mut removed = 0;
match senders {
Some(senders) => {
Expand All @@ -349,7 +345,7 @@ impl<T, S, L> Pool<T, S, L> where
}

/// Returns a transaction if it's part of the pool or `None` otherwise.
pub fn find(&self, hash: &H256) -> Option<Arc<T>> {
pub fn find(&self, hash: &T::Hash) -> Option<Arc<T>> {
self.by_hash.get(hash).cloned()
}

Expand All @@ -368,7 +364,7 @@ impl<T, S, L> Pool<T, S, L> where
}

/// Returns pending (ready) transactions from given sender.
pub fn pending_from_sender<R: Ready<T>>(&self, ready: R, sender: &Sender) -> PendingIterator<T, R, S, L> {
pub fn pending_from_sender<R: Ready<T>>(&self, ready: R, sender: &T::Sender) -> PendingIterator<T, R, S, L> {
let best_transactions = self.transactions.get(sender)
.and_then(|transactions| transactions.worst_and_best())
.map(|(_, best)| ScoreWithRef::new(best.0, best.1))
Expand All @@ -387,7 +383,7 @@ impl<T, S, L> Pool<T, S, L> where
}

/// Update score of transactions of a particular sender.
pub fn update_scores(&mut self, sender: &Sender, event: S::Event) {
pub fn update_scores(&mut self, sender: &T::Sender, event: S::Event) {
let res = if let Some(set) = self.transactions.get_mut(sender) {
let prev = set.worst_and_best();
set.update_scores(&self.scoring, event);
Expand All @@ -410,7 +406,7 @@ impl<T, S, L> Pool<T, S, L> where
let len = transactions.len();
for (idx, tx) in transactions.iter().enumerate() {
match ready.is_ready(tx) {
Readiness::Stalled => status.stalled += 1,
Readiness::Stale => status.stalled += 1,
Readiness::Ready => status.pending += 1,
Readiness::Future => {
status.future += len - idx;
Expand Down
4 changes: 2 additions & 2 deletions transaction-pool/src/ready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
/// Transaction readiness.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Readiness {
/// The transaction is stalled (and should/will be removed from the pool).
Stalled,
/// The transaction is stalle (and should/will be removed from the pool).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/stalle/stale

Stale,
/// The transaction is ready to be included in pending set.
Ready,
/// The transaction is not yet ready.
Expand Down
6 changes: 3 additions & 3 deletions transaction-pool/src/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
use std::cmp;
use std::collections::HashMap;

use ethereum_types::U256;
use {scoring, Scoring, Ready, Readiness, Address as Sender};
use ethereum_types::{H160 as Sender, U256};
use {scoring, Scoring, Ready, Readiness};
use super::{Transaction, SharedTransaction};

#[derive(Debug, Default)]
Expand Down Expand Up @@ -84,7 +84,7 @@ impl Ready<Transaction> for NonceReady {
*nonce = *nonce + 1.into();
Readiness::Ready
},
cmp::Ordering::Less => Readiness::Stalled,
cmp::Ordering::Less => Readiness::Stale,
}
}
}
11 changes: 7 additions & 4 deletions transaction-pool/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ pub struct Transaction {
}

impl VerifiedTransaction for Transaction {
type Hash = H256;
type Sender = Address;

fn hash(&self) -> &H256 { &self.hash }
fn mem_usage(&self) -> usize { self.mem_usage }
fn sender(&self) -> &Address { &self.sender }
Expand Down Expand Up @@ -123,7 +126,7 @@ fn should_reject_if_above_count() {
// Reject second
let tx1 = b.tx().nonce(0).new();
let tx2 = b.tx().nonce(1).new();
let hash = *tx2.hash();
let hash = format!("{:?}", tx2.hash());
txq.import(tx1).unwrap();
assert_eq!(txq.import(tx2).unwrap_err().kind(), &error::ErrorKind::TooCheapToEnter(hash, "0x0".into()));
assert_eq!(txq.light_status().transaction_count, 1);
Expand All @@ -149,7 +152,7 @@ fn should_reject_if_above_mem_usage() {
// Reject second
let tx1 = b.tx().nonce(1).mem_usage(1).new();
let tx2 = b.tx().nonce(2).mem_usage(2).new();
let hash = *tx2.hash();
let hash = format!("{:?}", tx2.hash());
txq.import(tx1).unwrap();
assert_eq!(txq.import(tx2).unwrap_err().kind(), &error::ErrorKind::TooCheapToEnter(hash, "0x0".into()));
assert_eq!(txq.light_status().transaction_count, 1);
Expand All @@ -175,7 +178,7 @@ fn should_reject_if_above_sender_count() {
// Reject second
let tx1 = b.tx().nonce(1).new();
let tx2 = b.tx().nonce(2).new();
let hash = *tx2.hash();
let hash = format!("{:?}", tx2.hash());
txq.import(tx1).unwrap();
assert_eq!(txq.import(tx2).unwrap_err().kind(), &error::ErrorKind::TooCheapToEnter(hash, "0x0".into()));
assert_eq!(txq.light_status().transaction_count, 1);
Expand All @@ -185,7 +188,7 @@ fn should_reject_if_above_sender_count() {
// Replace first
let tx1 = b.tx().nonce(1).new();
let tx2 = b.tx().nonce(2).gas_price(2).new();
let hash = *tx2.hash();
let hash = format!("{:?}", tx2.hash());
txq.import(tx1).unwrap();
// This results in error because we also compare nonces
assert_eq!(txq.import(tx2).unwrap_err().kind(), &error::ErrorKind::TooCheapToEnter(hash, "0x0".into()));
Expand Down
2 changes: 1 addition & 1 deletion transaction-pool/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ impl<T: fmt::Debug, S: Scoring<T>> Transactions<T, S> {
let mut first_non_stalled = 0;
for tx in &self.transactions {
match ready.is_ready(tx) {
Readiness::Stalled => {
Readiness::Stale => {
first_non_stalled += 1;
},
Readiness::Ready | Readiness::Future => break,
Expand Down