Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fatxpool: do not use individual transaction listeners #7316

Merged
merged 21 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
1985b2a
some ground work
michalkucharczyk Jan 23, 2025
bcb2876
MultiViewListener meat
michalkucharczyk Jan 23, 2025
2b18e08
integrated
michalkucharczyk Jan 23, 2025
588bc43
multi_view_listener: enum cleanup
michalkucharczyk Jan 24, 2025
fa06458
listener: improvements
michalkucharczyk Jan 24, 2025
44caaa0
Merge remote-tracking branch 'origin/master' into mku-txpool-register…
michalkucharczyk Jan 27, 2025
573fb47
todos removed (5495)
michalkucharczyk Jan 27, 2025
d872d3e
listener: doc
michalkucharczyk Jan 27, 2025
e103f48
flaky tests fixed
michalkucharczyk Jan 27, 2025
e4f3d36
log removed
michalkucharczyk Jan 27, 2025
6f75df0
removed redundant method
michalkucharczyk Jan 27, 2025
06d54d2
some watched-flag related cleanup here and there
michalkucharczyk Jan 28, 2025
88595de
trace -> debug: per transaction logs
michalkucharczyk Jan 29, 2025
4dfe0f5
Merge branch 'master' into mku-txpool-register-listeners
michalkucharczyk Jan 29, 2025
cb7efe1
Update substrate/client/transaction-pool/src/fork_aware_txpool/multi_…
michalkucharczyk Jan 30, 2025
a39450c
review comments applied
michalkucharczyk Jan 30, 2025
849acc3
Merge remote-tracking branch 'origin/master' into mku-txpool-register…
michalkucharczyk Jan 31, 2025
b961638
fix
michalkucharczyk Jan 31, 2025
bd5fab7
review comments applied
michalkucharczyk Feb 3, 2025
e907960
txpool maintain log improved
michalkucharczyk Feb 4, 2025
c15f33a
Merge remote-tracking branch 'origin/master' into mku-txpool-register…
michalkucharczyk Feb 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions substrate/client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use std::{
use codec::{Decode, Encode};
use futures::{pin_mut, FutureExt, StreamExt};
use jsonrpsee::RpcModule;
use log::{debug, error, warn};
use log::{debug, error, trace, warn};
use sc_client_api::{blockchain::HeaderBackend, BlockBackend, BlockchainEvents, ProofProvider};
use sc_network::{
config::MultiaddrWithPeerId, service::traits::NetworkService, NetworkBackend, NetworkBlock,
Expand Down Expand Up @@ -538,7 +538,7 @@ where
{
Ok(_) => {
let elapsed = start.elapsed();
debug!(target: sc_transaction_pool::LOG_TARGET, "import transaction: {elapsed:?}");
trace!(target: sc_transaction_pool::LOG_TARGET, "import transaction: {elapsed:?}");
TransactionImport::NewGood
},
Err(e) => match e.into_pool_error() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub enum DroppedReason<Hash> {
}

/// Dropped-logic related event from the single view.
pub type ViewStreamEvent<C> = crate::graph::DroppedByLimitsEvent<ExtrinsicHash<C>, BlockHash<C>>;
pub type ViewStreamEvent<C> = crate::graph::TransactionStatusEvent<ExtrinsicHash<C>, BlockHash<C>>;

/// Dropped-logic stream of events coming from the single view.
type ViewStream<C> = Pin<Box<dyn futures::Stream<Item = ViewStreamEvent<C>> + Send>>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use super::{
import_notification_sink::MultiViewImportNotificationSink,
metrics::MetricsLink as PrometheusMetrics,
multi_view_listener::MultiViewListener,
tx_mem_pool::{InsertionInfo, TxInMemPool, TxMemPool, TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER},
tx_mem_pool::{InsertionInfo, TxMemPool, TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER},
view::View,
view_store::ViewStore,
};
Expand Down Expand Up @@ -193,7 +193,9 @@ where
future_limits: crate::PoolLimit,
mempool_max_transactions_count: usize,
) -> (Self, ForkAwareTxPoolTask) {
let listener = Arc::from(MultiViewListener::new());
let (listener, listener_task) = MultiViewListener::new_with_worker();
let listener = Arc::new(listener);

let (import_notification_sink, import_notification_sink_task) =
MultiViewImportNotificationSink::new_with_worker();

Expand All @@ -220,6 +222,7 @@ where

let combined_tasks = async move {
tokio::select! {
_ = listener_task => {},
_ = import_notification_sink_task => {},
_ = dropped_monitor_task => {}
}
Expand Down Expand Up @@ -279,14 +282,7 @@ where
match dropped.reason {
DroppedReason::Usurped(new_tx_hash) => {
if let Some(new_tx) = mempool.get_by_hash(new_tx_hash) {
view_store
.replace_transaction(
new_tx.source(),
new_tx.tx(),
tx_hash,
new_tx.is_watched(),
)
.await;
view_store.replace_transaction(new_tx.source(), new_tx.tx(), tx_hash).await;
} else {
trace!(
target: LOG_TARGET,
Expand Down Expand Up @@ -318,7 +314,10 @@ where
finalized_hash: Block::Hash,
) -> Self {
let metrics = PrometheusMetrics::new(prometheus);
let listener = Arc::from(MultiViewListener::new());

let (listener, listener_task) = MultiViewListener::new_with_worker();
let listener = Arc::new(listener);

let (revalidation_queue, revalidation_task) =
revalidation_worker::RevalidationQueue::new_with_worker();

Expand Down Expand Up @@ -347,6 +346,7 @@ where

let combined_tasks = async move {
tokio::select! {
_ = listener_task => {}
_ = revalidation_task => {},
_ = import_notification_sink_task => {},
_ = dropped_monitor_task => {}
Expand Down Expand Up @@ -1077,6 +1077,7 @@ where
)
};

let start = Instant::now();
// 1. Capture all import notifications from the very beginning, so first register all
// the listeners.
self.import_notification_sink.add_view(
Expand All @@ -1089,39 +1090,38 @@ where
view.pool.validated_pool().create_dropped_by_limits_stream().boxed(),
);

let start = Instant::now();
let watched_xts = self.register_listeners(&mut view).await;
let duration = start.elapsed();
self.view_store.listener.add_view_aggregated_stream(
view.at.hash,
view.pool.validated_pool().create_aggregated_stream().boxed(),
);
// sync the transaction statuses and referencing views in all the listeners with the
// newly cloned view.
view.pool.validated_pool().retrigger_notifications();
debug!(
target: LOG_TARGET,
?at,
?duration,
duration = ?start.elapsed(),
"register_listeners"
);

// 2. Handle transactions from the tree route. Pruning transactions from the view first
// will make some space for mempool transactions in case we are at the view's limits.
let start = Instant::now();
self.update_view_with_fork(&view, tree_route, at.clone()).await;
let duration = start.elapsed();
debug!(
target: LOG_TARGET,
?at,
?duration,
duration = ?start.elapsed(),
"update_view_with_fork"
);

// 3. Finally, submit transactions from the mempool.
let start = Instant::now();
self.update_view_with_mempool(&mut view, watched_xts).await;
let duration = start.elapsed();
self.update_view_with_mempool(&mut view).await;
debug!(
target: LOG_TARGET,
?at,
?duration,
duration= ?start.elapsed(),
"update_view_with_mempool"
);
let view = Arc::from(view);
Expand Down Expand Up @@ -1173,53 +1173,6 @@ where
all_extrinsics
}

/// For every watched transaction in the mempool registers a transaction listener in the view.
///
/// The transaction listener for a given view is also added to the multi-view listener. This
/// allows tracking the aggregated progress of the transaction within the transaction pool.
///
/// Function returns a list of currently watched transactions in the mempool.
async fn register_listeners(
&self,
view: &View<ChainApi>,
) -> Vec<(ExtrinsicHash<ChainApi>, Arc<TxInMemPool<ChainApi, Block>>)> {
debug!(
target: LOG_TARGET,
view_at = ?view.at,
xts_count = ?self.mempool.unwatched_and_watched_count(),
active_views_count = self.active_views_count(),
"register_listeners"
);

//todo [#5495]: maybe we don't need to register listener in view? We could use
// multi_view_listener.transaction_in_block
let results = self
.mempool
.clone_watched()
.into_iter()
.map(|(tx_hash, tx)| {
let watcher = view.create_watcher(tx_hash);
let at = view.at.clone();
async move {
trace!(
target: LOG_TARGET,
?tx_hash,
at = ?at.hash,
"adding watcher"
);
self.view_store.listener.add_view_watcher_for_tx(
tx_hash,
at.hash,
watcher.into_stream().boxed(),
);
(tx_hash, tx)
}
})
.collect::<Vec<_>>();

future::join_all(results).await
}

/// Updates the given view with the transactions from the internal mempool.
///
/// All transactions from the mempool (excluding those which are either already imported or
Expand All @@ -1229,15 +1182,7 @@ where
/// If there are no views, and mempool transaction is reported as invalid for the given view,
/// the transaction is reported as invalid and removed from the mempool. This does not apply to
/// stale and temporarily banned transactions.
///
/// As the listeners for watched transactions were registered at the very beginning of maintain
/// procedure (`register_listeners`), this function accepts the list of watched transactions
/// from the mempool for which listener was actually registered to avoid submit/maintain races.
async fn update_view_with_mempool(
&self,
view: &View<ChainApi>,
watched_xts: Vec<(ExtrinsicHash<ChainApi>, Arc<TxInMemPool<ChainApi, Block>>)>,
) {
async fn update_view_with_mempool(&self, view: &View<ChainApi>) {
debug!(
target: LOG_TARGET,
view_at = ?view.at,
Expand All @@ -1247,15 +1192,16 @@ where
);
let included_xts = self.extrinsics_included_since_finalized(view.at.hash).await;

let (hashes, xts_filtered): (Vec<_>, Vec<_>) = watched_xts
let (hashes, xts_filtered): (Vec<_>, Vec<_>) = self
.mempool
.clone_transactions()
.into_iter()
.chain(self.mempool.clone_unwatched().into_iter())
.filter(|(hash, _)| !view.is_imported(hash))
.filter(|(hash, _)| !included_xts.contains(&hash))
.map(|(tx_hash, tx)| (tx_hash, (tx.source(), tx.tx())))
.unzip();

let watched_results = view
let results = view
.submit_many(xts_filtered)
.await
.into_iter()
Expand All @@ -1267,7 +1213,7 @@ where
})
.collect::<Vec<_>>();

let submitted_count = watched_results.len();
let submitted_count = results.len();

debug!(
target: LOG_TARGET,
Expand All @@ -1283,9 +1229,9 @@ where
// if there are no views yet, and a single newly created view is reporting error, just send
// out the invalid event, and remove transaction.
if self.view_store.is_empty() {
for result in watched_results {
for result in results {
if let Err(tx_hash) = result {
self.view_store.listener.invalidate_transactions(&[tx_hash]);
self.view_store.listener.transactions_invalidated(&[tx_hash]);
self.mempool.remove_transaction(&tx_hash);
}
}
Expand Down Expand Up @@ -1619,9 +1565,9 @@ where

info!(
target: LOG_TARGET,
mempool_len = format!("{:?}", self.mempool_len()),
txs = ?self.mempool_len(),
active_views_count = self.active_views_count(),
views_stats = ?self.views_stats(),
views = ?self.views_stats(),
?event,
?duration,
"maintain"
Expand Down
Loading
Loading