Skip to content

Commit

Permalink
feat: remove template blocking call (#6220)
Browse files Browse the repository at this point in the history
Description
---
Removed blocking call from mempool retrieve template request.

Motivation and Context
---
See #6219 

How Has This Been Tested?
---
System-level testing with a big mempool

With a fairly large mempool going 30 blocks deep, these are the results,
nice and fast responses from the mempool:
```rust
2024-03-19 11:58:23.819942200 [c::mp::mempool] DEBUG Retrieved 4 highest priority transaction(s) from the mempool in 116µs ms
2024-03-19 11:58:43.196219300 [c::mp::mempool] DEBUG Retrieved 4 highest priority transaction(s) from the mempool in 342ms ms
2024-03-19 12:00:25.633493600 [c::mp::mempool] DEBUG Retrieved 4 highest priority transaction(s) from the mempool in 155µs ms
2024-03-19 12:00:41.847437000 [c::mp::mempool] DEBUG Retrieved 4 highest priority transaction(s) from the mempool in 39ms ms
2024-03-19 12:04:41.302614400 [c::mp::mempool] DEBUG Retrieved 4 highest priority transaction(s) from the mempool in 172ms ms
2024-03-19 12:04:57.957985300 [c::mp::mempool] DEBUG Retrieved 4 highest priority transaction(s) from the mempool in 168µs ms
2024-03-19 12:05:49.060040100 [c::mp::mempool] DEBUG Retrieved 4 highest priority transaction(s) from the mempool in 114µs ms
2024-03-19 12:05:58.040083300 [c::mp::mempool] DEBUG Retrieved 4 highest priority transaction(s) from the mempool in 198µs ms
2024-03-19 12:08:47.903942500 [c::mp::mempool] DEBUG Retrieved 4 highest priority transaction(s) from the mempool in 169µs ms
```
The mempool stats for the period:
```rust
11:58 v1.0.0-pre.11a esmeralda State: Listening Tip: 2427 (Tue, 19 Mar 2024 09:58:14 +0000) Mempool: 80tx (2281440g, +/- 18blks) Connections: 12|10 Banned: 0 Messages (last 60s): 64 Rpc: 55/10000 RandomX: #2 with flags FLAG_HARD_AES | FLAG_JIT | FLAG_ARGON2_SSSE3 | FLAG_ARGON2_AVX2 | FLAG_ARGON2
11:59 v1.0.0-pre.11a esmeralda State: Listening Tip: 2427 (Tue, 19 Mar 2024 09:58:14 +0000) Mempool: 83tx (2366994g, +/- 19blks) Connections: 12|10 Banned: 0 Messages (last 60s): 68 Rpc: 57/10000
12:00 v1.0.0-pre.11a esmeralda State: Listening Tip: 2427 (Tue, 19 Mar 2024 09:58:14 +0000) Mempool: 91tx (2595138g, +/- 21blks) Connections: 13|10 Banned: 0 Messages (last 60s): 56 Rpc: 55/10000
12:01 v1.0.0-pre.11a esmeralda State: Listening Tip: 2429 (Tue, 19 Mar 2024 10:00:27 +0000) Mempool: 90tx (2566620g, +/- 21blks) Connections: 12|10 Banned: 0 Messages (last 60s): 63 Rpc: 56/10000
12:02 v1.0.0-pre.11a esmeralda State: Listening Tip: 2429 (Tue, 19 Mar 2024 10:00:27 +0000) Mempool: 97tx (2766246g, +/- 22blks) Connections: 11|10 Banned: 0 Messages (last 60s): 45 Rpc: 55/10000
12:03 v1.0.0-pre.11a esmeralda State: Listening Tip: 2429 (Tue, 19 Mar 2024 10:00:27 +0000) Mempool: 104tx (2965872g, +/- 24blks) Connections: 12|10 Banned: 0 Messages (last 60s): 50 Rpc: 55/10000
12:04 v1.0.0-pre.11a esmeralda State: Listening Tip: 2429 (Tue, 19 Mar 2024 10:00:27 +0000) Mempool: 114tx (3251052g, +/- 26blks) Connections: 13|10 Banned: 0 Messages (last 60s): 55 Rpc: 55/10000
12:05 v1.0.0-pre.11a esmeralda State: Listening Tip: 2431 (Tue, 19 Mar 2024 10:04:41 +0000) Mempool: 117tx (3336606g, +/- 27blks) Connections: 13|10 Banned: 0 Messages (last 60s): 68 Rpc: 90/10000
12:06 v1.0.0-pre.11a esmeralda State: Listening Tip: 2433 (Tue, 19 Mar 2024 10:05:49 +0000) Mempool: 118tx (3365124g, +/- 27blks) Connections: 12|10 Banned: 0 Messages (last 60s): 67 Rpc: 91/10000
12:07 v1.0.0-pre.11a esmeralda State: Listening Tip: 2433 (Tue, 19 Mar 2024 10:05:49 +0000) Mempool: 128tx (3650304g, +/- 29blks) Connections: 10|10 Banned: 0 Messages (last 60s): 40 Rpc: 90/10000
12:08 v1.0.0-pre.11a esmeralda State: Listening Tip: 2433 (Tue, 19 Mar 2024 10:05:49 +0000) Mempool: 134tx (3821412g, +/- 30blks) Connections: 12|10 Banned: 0 Messages (last 60s): 45 Rpc: 90/10000
```

What process can a PR reviewer use to test or verify this change?
---
Review code changes

<!-- Checklist -->
<!-- 1. Is the title of your PR in the form that would make nice release
notes? The title, excluding the conventional commit
tag, will be included exactly as is in the CHANGELOG, so please think
about it carefully. -->


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

<!-- Does this include a breaking change? If so, include this line as a
footer -->
<!-- BREAKING CHANGE: Description what the user should do, e.g. delete a
database, resync the chain -->
  • Loading branch information
hansieodendaal authored Mar 20, 2024
1 parent 019a909 commit 01d79e0
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 29 deletions.
32 changes: 30 additions & 2 deletions base_layer/core/src/mempool/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

use std::sync::{Arc, RwLock};

use log::debug;
use tari_common_types::types::{PrivateKey, Signature};
use tokio::task;

Expand All @@ -41,6 +42,8 @@ use crate::{
validation::TransactionValidator,
};

pub const LOG_TARGET: &str = "c::mp::mempool";

/// The Mempool consists of an Unconfirmed Transaction Pool, Pending Pool, Orphan Pool and Reorg Pool and is responsible
/// for managing and maintaining all unconfirmed transactions that have not yet been included in a block, and
/// transactions that have recently been included in a block.
Expand Down Expand Up @@ -117,8 +120,33 @@ impl Mempool {
/// Returns a list of transaction ranked by transaction priority up to a given weight.
/// Only transactions that fit into a block will be returned
pub async fn retrieve(&self, total_weight: u64) -> Result<Vec<Arc<Transaction>>, MempoolError> {
self.with_write_access(move |storage| storage.retrieve_and_revalidate(total_weight))
.await
let start = std::time::Instant::now();
let retrieved = self
.with_read_access(move |storage| storage.retrieve(total_weight))
.await?;
debug!(
target: LOG_TARGET,
"Retrieved {} highest priority transaction(s) from the mempool in {:.0?} ms",
retrieved.retrieved_transactions.len(),
start.elapsed()
);

if !retrieved.transactions_to_remove_and_insert.is_empty() {
// we need to remove all transactions that need to be rechecked.
debug!(
target: LOG_TARGET,
"Removing {} transaction(s) from unconfirmed pool because they need re-evaluation",
retrieved.transactions_to_remove_and_insert.len()
);

let transactions_to_remove_and_insert = retrieved.transactions_to_remove_and_insert.clone();
self.with_write_access(move |storage| {
storage.remove_and_reinsert_transactions(transactions_to_remove_and_insert)
})
.await?;
}

Ok(retrieved.retrieved_transactions)
}

pub async fn retrieve_by_excess_sigs(
Expand Down
30 changes: 23 additions & 7 deletions base_layer/core/src/mempool/mempool_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::{
mempool::{
error::MempoolError,
reorg_pool::ReorgPool,
unconfirmed_pool::{UnconfirmedPool, UnconfirmedPoolError},
unconfirmed_pool::{RetrieveResults, TransactionKey, UnconfirmedPool, UnconfirmedPoolError},
FeePerGramStat,
MempoolConfig,
StateResponse,
Expand All @@ -52,7 +52,7 @@ pub const LOG_TARGET: &str = "c::mp::mempool_storage";
/// for managing and maintaining all unconfirmed transactions have not yet been included in a block, and transactions
/// that have recently been included in a block.
pub struct MempoolStorage {
unconfirmed_pool: UnconfirmedPool,
pub(crate) unconfirmed_pool: UnconfirmedPool,
reorg_pool: ReorgPool,
validator: Box<dyn TransactionValidator>,
rules: ConsensusManager,
Expand Down Expand Up @@ -156,6 +156,23 @@ impl MempoolStorage {
.transaction_weight_params()
}

/// Ensures that all transactions are safely deleted in order and from all storage and then
/// re-inserted
pub(crate) fn remove_and_reinsert_transactions(
&mut self,
transactions: Vec<(TransactionKey, Arc<Transaction>)>,
) -> Result<(), MempoolError> {
for (tx_key, _) in &transactions {
self.unconfirmed_pool
.remove_transaction(*tx_key)
.map_err(|e| MempoolError::InternalError(e.to_string()))?;
}
self.insert_txs(transactions.iter().map(|(_, tx)| tx.clone()).collect())
.map_err(|e| MempoolError::InternalError(e.to_string()))?;

Ok(())
}

// Insert a set of new transactions into the UTxPool.
fn insert_txs(&mut self, txs: Vec<Arc<Transaction>>) -> Result<(), UnconfirmedPoolError> {
for tx in txs {
Expand Down Expand Up @@ -285,11 +302,10 @@ impl MempoolStorage {

/// Returns a list of transaction ranked by transaction priority up to a given weight.
/// Will only return transactions that will fit into the given weight
pub fn retrieve_and_revalidate(&mut self, total_weight: u64) -> Result<Vec<Arc<Transaction>>, MempoolError> {
let results = self.unconfirmed_pool.fetch_highest_priority_txs(total_weight)?;
self.insert_txs(results.transactions_to_insert)
.map_err(|e| MempoolError::InternalError(e.to_string()))?;
Ok(results.retrieved_transactions)
pub fn retrieve(&self, total_weight: u64) -> Result<RetrieveResults, MempoolError> {
self.unconfirmed_pool
.fetch_highest_priority_txs(total_weight)
.map_err(|e| MempoolError::InternalError(e.to_string()))
}

pub fn retrieve_by_excess_sigs(
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/mempool/unconfirmed_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ mod unconfirmed_pool;
// Public re-exports
pub use error::UnconfirmedPoolError;
use tari_crypto::hash_domain;
pub use unconfirmed_pool::{UnconfirmedPool, UnconfirmedPoolConfig};
pub use unconfirmed_pool::{RetrieveResults, TransactionKey, UnconfirmedPool, UnconfirmedPoolConfig};

hash_domain!(
UnconfirmedPoolOutputTokenIdHashDomain,
Expand Down
28 changes: 9 additions & 19 deletions base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use crate::{

pub const LOG_TARGET: &str = "c::mp::unconfirmed_pool::unconfirmed_pool_storage";

type TransactionKey = usize;
pub type TransactionKey = usize;

/// Configuration for the UnconfirmedPool
#[derive(Clone, Copy, Serialize, Deserialize, Debug)]
Expand Down Expand Up @@ -92,9 +92,10 @@ pub struct UnconfirmedPool {
}

// helper class to reduce type complexity
#[derive(Debug, Clone)]
pub struct RetrieveResults {
pub retrieved_transactions: Vec<Arc<Transaction>>,
pub transactions_to_insert: Vec<Arc<Transaction>>,
pub transactions_to_remove_and_insert: Vec<(TransactionKey, Arc<Transaction>)>,
}

pub type CompleteTransactionBranch = HashMap<TransactionKey, (HashMap<TransactionKey, Arc<Transaction>>, u64, u64)>;
Expand Down Expand Up @@ -183,7 +184,7 @@ impl UnconfirmedPool {

/// Returns a set of the highest priority unconfirmed transactions, that can be included in a block
#[allow(clippy::too_many_lines)]
pub fn fetch_highest_priority_txs(&mut self, total_weight: u64) -> Result<RetrieveResults, UnconfirmedPoolError> {
pub fn fetch_highest_priority_txs(&self, total_weight: u64) -> Result<RetrieveResults, UnconfirmedPoolError> {
// The process of selection is as follows:
// Assume that all transaction have the same weight for simplicity. A(20)->B(2) means A depends on B and A has
// fee 20 and B has fee 2. A(20)->B(2)->C(14), D(12)
Expand Down Expand Up @@ -297,24 +298,10 @@ impl UnconfirmedPool {
0,
)?;
}
if !transactions_to_remove_and_recheck.is_empty() {
// we need to remove all transactions that need to be rechecked.
debug!(
target: LOG_TARGET,
"Removing {} transaction(s) from unconfirmed pool because they need re-evaluation",
transactions_to_remove_and_recheck.len()
);
}
for (tx_key, _) in &transactions_to_remove_and_recheck {
self.remove_transaction(*tx_key)?;
}

let results = RetrieveResults {
retrieved_transactions: selected_txs.into_values().collect(),
transactions_to_insert: transactions_to_remove_and_recheck
.into_iter()
.map(|(_, tx)| tx)
.collect(),
transactions_to_remove_and_insert: transactions_to_remove_and_recheck,
};
Ok(results)
}
Expand Down Expand Up @@ -703,7 +690,10 @@ impl UnconfirmedPool {
}

/// Ensures that all transactions are safely deleted in order and from all storage
fn remove_transaction(&mut self, tx_key: TransactionKey) -> Result<Option<Arc<Transaction>>, UnconfirmedPoolError> {
pub(crate) fn remove_transaction(
&mut self,
tx_key: TransactionKey,
) -> Result<Option<Arc<Transaction>>, UnconfirmedPoolError> {
let prioritized_transaction = match self.tx_by_key.remove(&tx_key) {
Some(tx) => tx,
None => return Ok(None),
Expand Down

0 comments on commit 01d79e0

Please sign in to comment.