fatxpool: do not use individual transaction listeners #7316

Merged 21 commits, Feb 4, 2025. The diff below shows changes from 14 of the 21 commits.

Commits:
1985b2a - some ground work (michalkucharczyk, Jan 23, 2025)
bcb2876 - MultiViewListener meat (michalkucharczyk, Jan 23, 2025)
2b18e08 - integrated (michalkucharczyk, Jan 23, 2025)
588bc43 - multi_view_listener: enum cleanup (michalkucharczyk, Jan 24, 2025)
fa06458 - listener: improvements (michalkucharczyk, Jan 24, 2025)
44caaa0 - Merge remote-tracking branch 'origin/master' into mku-txpool-register… (michalkucharczyk, Jan 27, 2025)
573fb47 - todos removed (5495) (michalkucharczyk, Jan 27, 2025)
d872d3e - listener: doc (michalkucharczyk, Jan 27, 2025)
e103f48 - flaky tests fixed (michalkucharczyk, Jan 27, 2025)
e4f3d36 - log removed (michalkucharczyk, Jan 27, 2025)
6f75df0 - removed redundant method (michalkucharczyk, Jan 27, 2025)
06d54d2 - some watched-flag related cleanup here and there (michalkucharczyk, Jan 28, 2025)
88595de - trace -> debug: per transaction logs (michalkucharczyk, Jan 29, 2025)
4dfe0f5 - Merge branch 'master' into mku-txpool-register-listeners (michalkucharczyk, Jan 29, 2025)
cb7efe1 - Update substrate/client/transaction-pool/src/fork_aware_txpool/multi_… (michalkucharczyk, Jan 30, 2025)
a39450c - review comments applied (michalkucharczyk, Jan 30, 2025)
849acc3 - Merge remote-tracking branch 'origin/master' into mku-txpool-register… (michalkucharczyk, Jan 31, 2025)
b961638 - fix (michalkucharczyk, Jan 31, 2025)
bd5fab7 - review comments applied (michalkucharczyk, Feb 3, 2025)
e907960 - txpool maintain log improved (michalkucharczyk, Feb 4, 2025)
c15f33a - Merge remote-tracking branch 'origin/master' into mku-txpool-register… (michalkucharczyk, Feb 4, 2025)
4 changes: 2 additions & 2 deletions substrate/client/service/src/lib.rs
@@ -40,7 +40,7 @@ use std::{
 use codec::{Decode, Encode};
 use futures::{pin_mut, FutureExt, StreamExt};
 use jsonrpsee::RpcModule;
-use log::{debug, error, warn};
+use log::{debug, error, trace, warn};
 use sc_client_api::{blockchain::HeaderBackend, BlockBackend, BlockchainEvents, ProofProvider};
 use sc_network::{
 	config::MultiaddrWithPeerId, service::traits::NetworkService, NetworkBackend, NetworkBlock,
@@ -538,7 +538,7 @@ where
 	{
 		Ok(_) => {
 			let elapsed = start.elapsed();
-			debug!(target: sc_transaction_pool::LOG_TARGET, "import transaction: {elapsed:?}");
+			trace!(target: sc_transaction_pool::LOG_TARGET, "import transaction: {elapsed:?}");
 			TransactionImport::NewGood
 		},
 		Err(e) => match e.into_pool_error() {

substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs

@@ -74,7 +74,7 @@ pub enum DroppedReason<Hash> {
 }
 
 /// Dropped-logic related event from the single view.
-pub type ViewStreamEvent<C> = crate::graph::DroppedByLimitsEvent<ExtrinsicHash<C>, BlockHash<C>>;
+pub type ViewStreamEvent<C> = crate::graph::TransactionStatusEvent<ExtrinsicHash<C>, BlockHash<C>>;
 
 /// Dropped-logic stream of events coming from the single view.
 type ViewStream<C> = Pin<Box<dyn futures::Stream<Item = ViewStreamEvent<C>> + Send>>;
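
The renamed alias reflects the heart of this change: a view now emits generic `(hash, status)` events rather than drop-only notifications. A minimal sketch of that shape with simplified stand-in types (the real `TransactionStatus` lives in `sc-transaction-pool-api` and has more variants; the actual alias is defined in `crate::graph`, which is not part of this diff):

```rust
// Simplified stand-ins for the types referenced above; not the real
// definitions from `crate::graph` / `sc-transaction-pool-api`.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum TransactionStatus<BlockHash> {
    Ready,
    Future,
    InBlock(BlockHash),
    Dropped,
}

/// An aggregated per-view event: which transaction changed, and its new status.
type TransactionStatusEvent<Hash, BlockHash> = (Hash, TransactionStatus<BlockHash>);

fn main() {
    // One stream of such tuples per view can serve every transaction in it,
    // which is what makes the per-transaction watchers removable.
    let event: TransactionStatusEvent<u64, u32> = (0xab, TransactionStatus::Ready);
    println!("tx {:#x} is now {:?}", event.0, event.1);
}
```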

substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs

@@ -23,7 +23,7 @@ use super::{
 	import_notification_sink::MultiViewImportNotificationSink,
 	metrics::MetricsLink as PrometheusMetrics,
 	multi_view_listener::MultiViewListener,
-	tx_mem_pool::{InsertionInfo, TxInMemPool, TxMemPool, TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER},
+	tx_mem_pool::{InsertionInfo, TxMemPool, TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER},
 	view::View,
 	view_store::ViewStore,
 };
@@ -192,7 +192,9 @@ where
 		future_limits: crate::PoolLimit,
 		mempool_max_transactions_count: usize,
 	) -> (Self, ForkAwareTxPoolTask) {
-		let listener = Arc::from(MultiViewListener::new());
+		let (listener, listener_task) = MultiViewListener::new_with_worker();
+		let listener = Arc::new(listener);
+
 		let (import_notification_sink, import_notification_sink_task) =
 			MultiViewImportNotificationSink::new_with_worker();
 
@@ -219,6 +221,7 @@
 		let combined_tasks = async move {
 			tokio::select! {
+				_ = listener_task => {},
 				_ = import_notification_sink_task => {},
 				_ = dropped_monitor_task => {}
 			}
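
With `new_with_worker`, the listener now follows the same pattern already used by `MultiViewImportNotificationSink`: construction returns a handle plus a worker future, and the worker is driven inside `combined_tasks`. A minimal sketch of this pattern, assuming a tokio runtime and an illustrative command channel (not the actual `MultiViewListener` internals):

```rust
use tokio::sync::mpsc;

/// Illustrative command type; the real worker drives view streams instead.
enum Command {
    Notify(String),
}

struct Listener {
    tx: mpsc::UnboundedSender<Command>,
}

impl Listener {
    /// Returns the handle plus a worker future that must be polled
    /// (spawned or selected on) for the handle to make progress.
    fn new_with_worker() -> (Self, impl std::future::Future<Output = ()>) {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let worker = async move {
            while let Some(Command::Notify(msg)) = rx.recv().await {
                println!("worker observed: {msg}");
            }
        };
        (Listener { tx }, worker)
    }

    fn notify(&self, msg: &str) {
        let _ = self.tx.send(Command::Notify(msg.to_string()));
    }
}

#[tokio::main]
async fn main() {
    let (listener, listener_task) = Listener::new_with_worker();
    listener.notify("hello");
    drop(listener); // close the channel so the worker terminates

    // Mirrors the `combined_tasks` select above: if any worker
    // finishes, the combined task ends.
    tokio::select! {
        _ = listener_task => {},
    }
}
```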
@@ -274,12 +277,7 @@
 				DroppedReason::Usurped(new_tx_hash) => {
 					if let Some(new_tx) = mempool.get_by_hash(new_tx_hash) {
 						view_store
-							.replace_transaction(
-								new_tx.source(),
-								new_tx.tx(),
-								dropped_tx_hash,
-								new_tx.is_watched(),
-							)
+							.replace_transaction(new_tx.source(), new_tx.tx(), dropped_tx_hash)
 							.await;
 					} else {
 						log::trace!(
@@ -312,7 +310,10 @@
 		finalized_hash: Block::Hash,
 	) -> Self {
 		let metrics = PrometheusMetrics::new(prometheus);
-		let listener = Arc::from(MultiViewListener::new());
+
+		let (listener, listener_task) = MultiViewListener::new_with_worker();
+		let listener = Arc::new(listener);
+
 		let (revalidation_queue, revalidation_task) =
 			revalidation_worker::RevalidationQueue::new_with_worker();
@@ -341,6 +342,7 @@
 		let combined_tasks = async move {
 			tokio::select! {
+				_ = listener_task => {}
 				_ = revalidation_task => {},
 				_ = import_notification_sink_task => {},
 				_ = dropped_monitor_task => {}
@@ -649,7 +651,7 @@
 		xts: Vec<TransactionFor<Self>>,
 	) -> Result<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
 		let view_store = self.view_store.clone();
-		log::debug!(target: LOG_TARGET, "fatp::submit_at count:{} views:{}", xts.len(), self.active_views_count());
+		log::trace!(target: LOG_TARGET, "fatp::submit_at count:{} views:{}", xts.len(), self.active_views_count());
 		log_xt_trace!(target: LOG_TARGET, xts.iter().map(|xt| self.tx_hash(xt)), "[{:?}] fatp::submit_at");
 		let xts = xts.into_iter().map(Arc::from).collect::<Vec<_>>();
 		let mempool_results = self.mempool.extend_unwatched(source, &xts);
@@ -1019,6 +1021,7 @@
 			)
 		};
 
+		let start = Instant::now();
 		// 1. Capture all import notification from the very beginning, so first register all
 		// the listeners.
 		self.import_notification_sink.add_view(
@@ -1031,26 +1034,25 @@
 			view.pool.validated_pool().create_dropped_by_limits_stream().boxed(),
 		);
 
-		let start = Instant::now();
-		let watched_xts = self.register_listeners(&mut view).await;
-		let duration = start.elapsed();
+		self.view_store.listener.add_view_aggregated_stream(
+			view.at.hash,
+			view.pool.validated_pool().create_aggregated_stream().boxed(),
+		);
+		// sync the transactions statuses and referencing views in all the listeners with newly
+		// cloned view.
 		view.pool.validated_pool().retrigger_notifications();
-		log::debug!(target: LOG_TARGET, "register_listeners: at {at:?} took {duration:?}");
+		log::debug!(target: LOG_TARGET, "register listeners: at {at:?} took {:?}", start.elapsed());
 
 		// 2. Handle transactions from the tree route. Pruning transactions from the view first
 		// will make some space for mempool transactions in case we are at the view's limits.
 		let start = Instant::now();
 		self.update_view_with_fork(&view, tree_route, at.clone()).await;
-		let duration = start.elapsed();
-		log::debug!(target: LOG_TARGET, "update_view_with_fork: at {at:?} took {duration:?}");
+		log::debug!(target: LOG_TARGET, "update_view_with_fork: at {at:?} took {:?}", start.elapsed());
 
 		// 3. Finally, submit transactions from the mempool.
 		let start = Instant::now();
-		self.update_view_with_mempool(&mut view, watched_xts).await;
-		let duration = start.elapsed();
-		log::debug!(target: LOG_TARGET, "update_view_with_mempool: at {at:?} took {duration:?}");
+		self.update_view_with_mempool(&mut view).await;
+		log::debug!(target: LOG_TARGET, "update_view_with_mempool: at {at:?} took {:?}", start.elapsed());
 
 		let view = Arc::from(view);
 		self.view_store.insert_new_view(view.clone(), tree_route).await;
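
The ordering in this hunk is deliberate: all streams (import notifications, dropped stream, aggregated stream) are attached before transactions are pushed into the view, so nothing emitted during submission is lost. A small illustration of the subscribe-before-publish principle, using tokio's broadcast channel as a stand-in for the pool's actual channel types:

```rust
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, _) = broadcast::channel::<u32>(16);

    // Subscribe *before* events are produced, mirroring the ordering above:
    // the aggregated stream is attached to the multi-view listener before
    // transactions are (re)submitted, so no status event can be missed.
    let mut early = tx.subscribe();
    tx.send(1).unwrap();

    // A subscriber registered after the fact never sees event 1.
    let mut late = tx.subscribe();
    tx.send(2).unwrap();

    assert_eq!(early.recv().await.unwrap(), 1);
    assert_eq!(early.recv().await.unwrap(), 2);
    assert_eq!(late.recv().await.unwrap(), 2);
    println!("early subscriber saw both events; late saw only the second");
}
```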
@@ -1096,48 +1098,6 @@
 		all_extrinsics
 	}
-
-	/// For every watched transaction in the mempool registers a transaction listener in the view.
-	///
-	/// The transaction listener for a given view is also added to multi-view listener. This allows
-	/// to track aggreagated progress of the transaction within the transaction pool.
-	///
-	/// Function returns a list of currently watched transactions in the mempool.
-	async fn register_listeners(
-		&self,
-		view: &View<ChainApi>,
-	) -> Vec<(ExtrinsicHash<ChainApi>, Arc<TxInMemPool<ChainApi, Block>>)> {
-		log::debug!(
-			target: LOG_TARGET,
-			"register_listeners: {:?} xts:{:?} v:{}",
-			view.at,
-			self.mempool.unwatched_and_watched_count(),
-			self.active_views_count()
-		);
-
-		//todo [#5495]: maybe we don't need to register listener in view? We could use
-		// multi_view_listener.transaction_in_block
-		let results = self
-			.mempool
-			.clone_watched()
-			.into_iter()
-			.map(|(tx_hash, tx)| {
-				let watcher = view.create_watcher(tx_hash);
-				let at = view.at.clone();
-				async move {
-					log::trace!(target: LOG_TARGET, "[{:?}] adding watcher {:?}", tx_hash, at.hash);
-					self.view_store.listener.add_view_watcher_for_tx(
-						tx_hash,
-						at.hash,
-						watcher.into_stream().boxed(),
-					);
-					(tx_hash, tx)
-				}
-			})
-			.collect::<Vec<_>>();
-
-		future::join_all(results).await
-	}
 
 	/// Updates the given view with the transactions from the internal mempool.
 	///
 	/// All transactions from the mempool (excluding those which are either already imported or
@@ -1147,15 +1107,7 @@
 	/// If there are no views, and mempool transaction is reported as invalid for the given view,
 	/// the transaction is reported as invalid and removed from the mempool. This does not apply to
 	/// stale and temporarily banned transactions.
-	///
-	/// As the listeners for watched transactions were registered at the very beginning of maintain
-	/// procedure (`register_listeners`), this function accepts the list of watched transactions
-	/// from the mempool for which listener was actually registered to avoid submit/maintain races.
-	async fn update_view_with_mempool(
-		&self,
-		view: &View<ChainApi>,
-		watched_xts: Vec<(ExtrinsicHash<ChainApi>, Arc<TxInMemPool<ChainApi, Block>>)>,
-	) {
+	async fn update_view_with_mempool(&self, view: &View<ChainApi>) {
 		log::debug!(
 			target: LOG_TARGET,
 			"update_view_with_mempool: {:?} xts:{:?} v:{}",
@@ -1165,15 +1117,16 @@
 		);
 		let included_xts = self.extrinsics_included_since_finalized(view.at.hash).await;
 
-		let (hashes, xts_filtered): (Vec<_>, Vec<_>) = watched_xts
+		let (hashes, xts_filtered): (Vec<_>, Vec<_>) = self
+			.mempool
+			.clone_transactions()
 			.into_iter()
-			.chain(self.mempool.clone_unwatched().into_iter())
 			.filter(|(hash, _)| !view.is_imported(hash))
 			.filter(|(hash, _)| !included_xts.contains(&hash))
 			.map(|(tx_hash, tx)| (tx_hash, (tx.source(), tx.tx())))
 			.unzip();
 
-		let watched_results = view
+		let results = view
 			.submit_many(xts_filtered)
 			.await
 			.into_iter()
@@ -1185,7 +1138,7 @@
 			})
 			.collect::<Vec<_>>();
 
-		let submitted_count = watched_results.len();
+		let submitted_count = results.len();
 
 		log::debug!(
 			target: LOG_TARGET,
@@ -1201,9 +1154,9 @@
 		// if there are no views yet, and a single newly created view is reporting error, just send
 		// out the invalid event, and remove transaction.
 		if self.view_store.is_empty() {
-			for result in watched_results {
+			for result in results {
 				if let Err(tx_hash) = result {
-					self.view_store.listener.invalidate_transactions(&[tx_hash]);
+					self.view_store.listener.transactions_invalidated(&[tx_hash]);
 					self.mempool.remove_transaction(&tx_hash);
 				}
 			}
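
`update_view_with_mempool` now draws from a single `clone_transactions()` snapshot instead of stitching the watched and unwatched sets together, while the filter chain stays the same. A self-contained sketch of that filter-then-unzip shape, with hypothetical hash/payload stand-ins:

```rust
use std::collections::HashSet;

fn main() {
    // Hypothetical stand-ins for mempool entries: (hash, payload).
    let mempool: Vec<(u32, &str)> = vec![(1, "a"), (2, "b"), (3, "c"), (4, "d")];
    let already_imported: HashSet<u32> = [1].into();
    let included_since_finalized: HashSet<u32> = [3].into();

    // Mirrors the shape of the chain in `update_view_with_mempool`:
    // skip what the view already knows, skip what a block already included,
    // then split hashes from submittable payloads.
    let (hashes, xts): (Vec<_>, Vec<_>) = mempool
        .into_iter()
        .filter(|(hash, _)| !already_imported.contains(hash))
        .filter(|(hash, _)| !included_since_finalized.contains(hash))
        .unzip();

    assert_eq!(hashes, vec![2, 4]);
    assert_eq!(xts, vec!["b", "d"]);
    println!("submitting {hashes:?}");
}
```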
49 changes: 28 additions & 21 deletions substrate/client/transaction-pool/src/fork_aware_txpool/mod.rs
@@ -84,7 +84,8 @@
 //!
 //! ### Multi-view listeners
 //! There are a number of event streams provided by individual views:
-//! - [transaction status][`Watcher`],
+//! - an aggregated stream of [transaction statuses][`AggregatedStream`] for all the transactions
+//!   within the view, in the form of `(transaction-hash, status)` tuples,
 //! - [ready notification][`vp::import_notification_stream`] (see [networking
 //!   section](#networking)),
 //! - [dropped notification][`create_dropped_by_limits_stream`].
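
The first bullet describes the core change of this PR: a view exposes one aggregated stream of `(transaction-hash, status)` tuples instead of one watcher per transaction. A minimal sketch of how such a stream can be demultiplexed into per-transaction histories (simplified types; the real work happens in `MultiViewListener`):

```rust
use std::collections::HashMap;

// Hypothetical event shape mirroring the "(transaction-hash, status)" tuple
// described above; the real stream comes from the validated pool.
type TxHash = u32;

#[derive(Debug, Clone, PartialEq)]
enum Status {
    Ready,
    InBlock(u64),
}

fn main() {
    // One aggregated event sequence per view, instead of one watcher
    // per (transaction, view) pair.
    let view_events: Vec<(TxHash, Status)> =
        vec![(7, Status::Ready), (9, Status::Ready), (7, Status::InBlock(42))];

    // Demultiplex into per-transaction histories, which is what the
    // multi-view listener does before feeding external watchers.
    let mut per_tx: HashMap<TxHash, Vec<Status>> = HashMap::new();
    for (hash, status) in view_events {
        per_tx.entry(hash).or_default().push(status);
    }

    assert_eq!(per_tx[&7], vec![Status::Ready, Status::InBlock(42)]);
    println!("{per_tx:?}");
}
```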
@@ -106,10 +107,11 @@
 //! procedure. Additionally, it allows the pool to accept transactions when no blocks have been
 //! reported yet.
 //!
-//! Since watched and non-watched transactions require a different treatment, the *mempool* keeps a
-//! track on how the transaction was submitted. The [transaction source][`TransactionSource`] used
-//! to submit transactions also needs to be kept in the *mempool*. The *mempool* transaction is a
-//! simple [wrapper][`TxInMemPool`] around the [`Arc`] reference to the actual extrinsic body.
+//! The *mempool* keeps track of how each transaction was submitted - the number of watched and
+//! non-watched transactions is useful for testing and metrics. The [transaction
+//! source][`TransactionSource`] used to submit transactions also needs to be kept in the *mempool*.
+//! The *mempool* transaction is a simple [wrapper][`TxInMemPool`] around the [`Arc`] reference to
+//! the actual extrinsic body.
 //!
 //! Once the view is created, all transactions from *mempool* are submitted to and validated at this
 //! view.
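
A rough sketch of the shape described here - a mempool entry wrapping an `Arc` to the extrinsic body together with its source and watched flag. The field and type names are illustrative; the real type is `TxInMemPool`:

```rust
use std::sync::Arc;

// Illustrative stand-ins; the real types are `TxInMemPool` and
// `TransactionSource` from the transaction pool crates.
#[derive(Debug, Clone, Copy)]
enum Source {
    External,
    Local,
}

struct TxInMemPoolSketch {
    tx: Arc<Vec<u8>>, // shared reference to the extrinsic body
    source: Source,   // preserved so resubmission into new views keeps it
    watched: bool,    // recorded for metrics/testing, per the note above
}

fn main() {
    let pool = vec![
        TxInMemPoolSketch { tx: Arc::new(vec![1, 2]), source: Source::External, watched: true },
        TxInMemPoolSketch { tx: Arc::new(vec![3]), source: Source::Local, watched: false },
    ];
    let watched = pool.iter().filter(|t| t.watched).count();
    for t in &pool {
        println!("tx {:?} source={:?} watched={}", t.tx, t.source, t.watched);
    }
    println!("watched: {watched}, unwatched: {}", pool.len() - watched);
}
```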
@@ -139,19 +141,22 @@
 //!
 //! The [`submit_and_watch`] function allows to submit the transaction and track its
 //! [status][`TransactionStatus`] within the pool. Every view is providing an independent
-//! [stream][`View::submit_and_watch`] of events, which needs to be merged into the single stream
-//! exposed to the [external listener][`TransactionStatusStreamFor`]. For majority of events simple
-//! forwarding of events would not work (e.g. we could get multiple [`Ready`] events, or [`Ready`] /
-//! [`Future`] mix). Some additional stateful logic is required to filter and process the views'
-//! events. It is also easier to trigger some events (e.g. [`Finalized`], [`Invalid`], and
-//! [`Broadcast`]) using some side-channel and simply ignoring these events from the view. All the
-//! before mentioned functionality is provided by the [`MultiViewListener`].
-//!
-//! When watched transaction is submitted to the pool it is added the *mempool* with watched
-//! flag. The external stream for the transaction is created in a [`MultiViewListener`]. Then
-//! transaction is submitted to every active [`View`] (using
-//! [`submit_and_watch`][`View::submit_and_watch`]) and the resulting
-//! views' stream is connected to the [`MultiViewListener`].
+//! aggregated [stream][`create_aggregated_stream`] of events for all transactions in this view,
+//! which needs to be merged into the single stream exposed to the [external
+//! listener][`TransactionStatusStreamFor`]. For the majority of events, simple forwarding would
+//! not work (e.g. we could get multiple [`Ready`] events, or a [`Ready`] / [`Future`] mix). Some
+//! additional stateful logic is required to filter and process the views' events. It is also
+//! easier to trigger some events (e.g. [`Finalized`], [`Invalid`], and [`Broadcast`]) using a
+//! side-channel and simply ignoring these events from the view. All of the aforementioned
+//! functionality is provided by the [`MultiViewListener`].
+//!
+//! When a watched transaction is submitted to the pool it is added to the *mempool* with the
+//! watched flag. The external stream for the transaction is created in a [`MultiViewListener`].
+//! Then the transaction is submitted to every active [`View`] (using
+//! [`submit_many`][`View::submit_many`]). The view's [aggregated
+//! stream][`create_aggregated_stream`] was already connected to the [`MultiViewListener`] when the
+//! view was created, so no additional action is required upon submission. The view will provide
+//! the required updates for all the transactions over this single stream.
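
A minimal sketch of the stateful filtering described above, with simplified status variants: several views may each report `Ready` for the same transaction, but the external listener should observe a state change only once.

```rust
// Event names are simplified stand-ins for `TransactionStatus` variants.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Event {
    Ready,
    Future,
}

struct ExternalStream {
    last: Option<Event>,
}

impl ExternalStream {
    /// Forward a view event only if it changes the externally visible state.
    fn on_view_event(&mut self, e: Event) -> Option<Event> {
        if self.last == Some(e) {
            return None; // duplicate across views; swallow it
        }
        self.last = Some(e);
        Some(e)
    }
}

fn main() {
    let mut ext = ExternalStream { last: None };
    // Two views both report `Ready`; only the first is forwarded.
    let forwarded: Vec<_> = [Event::Ready, Event::Ready, Event::Future]
        .into_iter()
        .filter_map(|e| ext.on_view_event(e))
        .collect();
    assert_eq!(forwarded, vec![Event::Ready, Event::Future]);
    println!("{forwarded:?}");
}
```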
 //!
 //! ### Maintain
 //! The transaction pool exposes the [task][`notification_future`] that listens to the
@@ -169,8 +174,8 @@
 //!   *mempool*
 //! - all transactions from the *mempool* (with some obvious filtering applied) are submitted to
 //!   the view,
-//! - for all watched transactions from the *mempool* the watcher is registered in the new view,
-//!   and it is connected to the multi-view-listener,
+//! - the new [aggregated stream][`create_aggregated_stream`] of all transactions' statuses is
+//!   created for the new view and connected to the multi-view-listener,
 //! - [update the view][ForkAwareTxPool::update_view_with_fork] with the transactions from the [tree
 //!   route][`TreeRoute`] (which is computed from the recent best block to newly notified one by
 //!   [enactment state][`EnactmentState`] helper):
@@ -292,7 +297,7 @@
 //! [`View`]: crate::fork_aware_txpool::view::View
 //! [`view::revalidate`]: crate::fork_aware_txpool::view::View::revalidate
 //! [`start_background_revalidation`]: crate::fork_aware_txpool::view::View::start_background_revalidation
-//! [`View::submit_and_watch`]: crate::fork_aware_txpool::view::View::submit_and_watch
+//! [`View::submit_many`]: crate::fork_aware_txpool::view::View::submit_many
 //! [`ViewStore`]: crate::fork_aware_txpool::view_store::ViewStore
 //! [`finish_background_revalidations`]: crate::fork_aware_txpool::view_store::ViewStore::finish_background_revalidations
 //! [find_best_view]: crate::fork_aware_txpool::view_store::ViewStore::find_best_view
@@ -305,10 +310,12 @@
 //! [`MultiViewListener`]: crate::fork_aware_txpool::multi_view_listener::MultiViewListener
 //! [`Pool`]: crate::graph::Pool
 //! [`Watcher`]: crate::graph::watcher::Watcher
+//! [`AggregatedStream`]: crate::graph::AggregatedStream
 //! [`Options`]: crate::graph::Options
 //! [`vp::import_notification_stream`]: ../graph/validated_pool/struct.ValidatedPool.html#method.import_notification_stream
 //! [`vp::enforce_limits`]: ../graph/validated_pool/struct.ValidatedPool.html#method.enforce_limits
 //! [`create_dropped_by_limits_stream`]: ../graph/validated_pool/struct.ValidatedPool.html#method.create_dropped_by_limits_stream
+//! [`create_aggregated_stream`]: ../graph/validated_pool/struct.ValidatedPool.html#method.create_aggregated_stream
 //! [`ChainEvent`]: sc_transaction_pool_api::ChainEvent
 //! [`TransactionStatusStreamFor`]: sc_transaction_pool_api::TransactionStatusStreamFor
 //! [`api_submit`]: sc_transaction_pool_api::TransactionPool::submit_at