diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs index 322726a1eff4..52a19da220c0 100644 --- a/substrate/client/service/src/lib.rs +++ b/substrate/client/service/src/lib.rs @@ -40,7 +40,7 @@ use std::{ use codec::{Decode, Encode}; use futures::{pin_mut, FutureExt, StreamExt}; use jsonrpsee::RpcModule; -use log::{debug, error, warn}; +use log::{debug, error, trace, warn}; use sc_client_api::{blockchain::HeaderBackend, BlockBackend, BlockchainEvents, ProofProvider}; use sc_network::{ config::MultiaddrWithPeerId, service::traits::NetworkService, NetworkBackend, NetworkBlock, @@ -538,7 +538,7 @@ where { Ok(_) => { let elapsed = start.elapsed(); - debug!(target: sc_transaction_pool::LOG_TARGET, "import transaction: {elapsed:?}"); + trace!(target: sc_transaction_pool::LOG_TARGET, "import transaction: {elapsed:?}"); TransactionImport::NewGood }, Err(e) => match e.into_pool_error() { diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs index 3588645344ba..be20a1608961 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs @@ -74,7 +74,7 @@ pub enum DroppedReason { } /// Dropped-logic related event from the single view. -pub type ViewStreamEvent = crate::graph::DroppedByLimitsEvent, BlockHash>; +pub type ViewStreamEvent = crate::graph::TransactionStatusEvent, BlockHash>; /// Dropped-logic stream of events coming from the single view. type ViewStream = Pin> + Send>>; diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs index c609ee2da22e..ffe6c20d92b7 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs @@ -23,7 +23,7 @@ use super::{ import_notification_sink::MultiViewImportNotificationSink, metrics::MetricsLink as PrometheusMetrics, multi_view_listener::MultiViewListener, - tx_mem_pool::{InsertionInfo, TxInMemPool, TxMemPool, TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER}, + tx_mem_pool::{InsertionInfo, TxMemPool, TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER}, view::View, view_store::ViewStore, }; @@ -193,7 +193,9 @@ where future_limits: crate::PoolLimit, mempool_max_transactions_count: usize, ) -> (Self, ForkAwareTxPoolTask) { - let listener = Arc::from(MultiViewListener::new()); + let (listener, listener_task) = MultiViewListener::new_with_worker(); + let listener = Arc::new(listener); + let (import_notification_sink, import_notification_sink_task) = MultiViewImportNotificationSink::new_with_worker(); @@ -220,6 +222,7 @@ where let combined_tasks = async move { tokio::select! { + _ = listener_task => {}, _ = import_notification_sink_task => {}, _ = dropped_monitor_task => {} } @@ -279,14 +282,7 @@ where match dropped.reason { DroppedReason::Usurped(new_tx_hash) => { if let Some(new_tx) = mempool.get_by_hash(new_tx_hash) { - view_store - .replace_transaction( - new_tx.source(), - new_tx.tx(), - tx_hash, - new_tx.is_watched(), - ) - .await; + view_store.replace_transaction(new_tx.source(), new_tx.tx(), tx_hash).await; } else { trace!( target: LOG_TARGET, @@ -318,7 +314,10 @@ where finalized_hash: Block::Hash, ) -> Self { let metrics = PrometheusMetrics::new(prometheus); - let listener = Arc::from(MultiViewListener::new()); + + let (listener, listener_task) = MultiViewListener::new_with_worker(); + let listener = Arc::new(listener); + let (revalidation_queue, revalidation_task) = revalidation_worker::RevalidationQueue::new_with_worker(); @@ -347,6 +346,7 @@ where let combined_tasks = async move { tokio::select! { + _ = listener_task => {} _ = revalidation_task => {}, _ = import_notification_sink_task => {}, _ = dropped_monitor_task => {} @@ -1077,6 +1077,7 @@ where ) }; + let start = Instant::now(); // 1. Capture all import notification from the very beginning, so first register all //the listeners. self.import_notification_sink.add_view( @@ -1089,16 +1090,17 @@ where view.pool.validated_pool().create_dropped_by_limits_stream().boxed(), ); - let start = Instant::now(); - let watched_xts = self.register_listeners(&mut view).await; - let duration = start.elapsed(); + self.view_store.listener.add_view_aggregated_stream( + view.at.hash, + view.pool.validated_pool().create_aggregated_stream().boxed(), + ); // sync the transactions statuses and referencing views in all the listeners with newly // cloned view. view.pool.validated_pool().retrigger_notifications(); debug!( target: LOG_TARGET, ?at, - ?duration, + duration = ?start.elapsed(), "register_listeners" ); @@ -1106,22 +1108,20 @@ where // will make some space for mempool transactions in case we are at the view's limits. let start = Instant::now(); self.update_view_with_fork(&view, tree_route, at.clone()).await; - let duration = start.elapsed(); debug!( target: LOG_TARGET, ?at, - ?duration, + duration = ?start.elapsed(), "update_view_with_fork" ); // 3. Finally, submit transactions from the mempool. let start = Instant::now(); - self.update_view_with_mempool(&mut view, watched_xts).await; - let duration = start.elapsed(); + self.update_view_with_mempool(&mut view).await; debug!( target: LOG_TARGET, ?at, - ?duration, + duration= ?start.elapsed(), "update_view_with_mempool" ); let view = Arc::from(view); @@ -1173,53 +1173,6 @@ where all_extrinsics } - /// For every watched transaction in the mempool registers a transaction listener in the view. - /// - /// The transaction listener for a given view is also added to multi-view listener. This allows - /// to track aggreagated progress of the transaction within the transaction pool. - /// - /// Function returns a list of currently watched transactions in the mempool. - async fn register_listeners( - &self, - view: &View, - ) -> Vec<(ExtrinsicHash, Arc>)> { - debug!( - target: LOG_TARGET, - view_at = ?view.at, - xts_count = ?self.mempool.unwatched_and_watched_count(), - active_views_count = self.active_views_count(), - "register_listeners" - ); - - //todo [#5495]: maybe we don't need to register listener in view? We could use - // multi_view_listener.transaction_in_block - let results = self - .mempool - .clone_watched() - .into_iter() - .map(|(tx_hash, tx)| { - let watcher = view.create_watcher(tx_hash); - let at = view.at.clone(); - async move { - trace!( - target: LOG_TARGET, - ?tx_hash, - at = ?at.hash, - "adding watcher" - ); - self.view_store.listener.add_view_watcher_for_tx( - tx_hash, - at.hash, - watcher.into_stream().boxed(), - ); - (tx_hash, tx) - } - }) - .collect::>(); - - future::join_all(results).await - } - /// Updates the given view with the transactions from the internal mempol. /// /// All transactions from the mempool (excluding those which are either already imported or @@ -1229,15 +1182,7 @@ where /// If there are no views, and mempool transaction is reported as invalid for the given view, /// the transaction is reported as invalid and removed from the mempool. This does not apply to /// stale and temporarily banned transactions. - /// - /// As the listeners for watched transactions were registered at the very beginning of maintain - /// procedure (`register_listeners`), this function accepts the list of watched transactions - /// from the mempool for which listener was actually registered to avoid submit/maintain races. - async fn update_view_with_mempool( - &self, - view: &View, - watched_xts: Vec<(ExtrinsicHash, Arc>)>, - ) { + async fn update_view_with_mempool(&self, view: &View) { debug!( target: LOG_TARGET, view_at = ?view.at, @@ -1247,15 +1192,16 @@ where ); let included_xts = self.extrinsics_included_since_finalized(view.at.hash).await; - let (hashes, xts_filtered): (Vec<_>, Vec<_>) = watched_xts + let (hashes, xts_filtered): (Vec<_>, Vec<_>) = self + .mempool + .clone_transactions() .into_iter() - .chain(self.mempool.clone_unwatched().into_iter()) .filter(|(hash, _)| !view.is_imported(hash)) .filter(|(hash, _)| !included_xts.contains(&hash)) .map(|(tx_hash, tx)| (tx_hash, (tx.source(), tx.tx()))) .unzip(); - let watched_results = view + let results = view .submit_many(xts_filtered) .await .into_iter() @@ -1267,7 +1213,7 @@ where }) .collect::>(); - let submitted_count = watched_results.len(); + let submitted_count = results.len(); debug!( target: LOG_TARGET, @@ -1283,9 +1229,9 @@ where // if there are no views yet, and a single newly created view is reporting error, just send // out the invalid event, and remove transaction. if self.view_store.is_empty() { - for result in watched_results { + for result in results { if let Err(tx_hash) = result { - self.view_store.listener.invalidate_transactions(&[tx_hash]); + self.view_store.listener.transactions_invalidated(&[tx_hash]); self.mempool.remove_transaction(&tx_hash); } } @@ -1619,9 +1565,9 @@ where info!( target: LOG_TARGET, - mempool_len = format!("{:?}", self.mempool_len()), + txs = ?self.mempool_len(), active_views_count = self.active_views_count(), - views_stats = ?self.views_stats(), + views = ?self.views_stats(), ?event, ?duration, "maintain" diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/mod.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/mod.rs index 5f7294a24fd7..2c4da0182a25 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/mod.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/mod.rs @@ -84,7 +84,8 @@ //! //! ### Multi-view listeners //! There is a number of event streams that are provided by individual views: -//! - [transaction status][`Watcher`], +//! - aggregated stream of [transactions statuses][`AggregatedStream`] for all the transactions +//! within the view in the form of `(transaction-hash, status)` tuple, //! - [ready notification][`vp::import_notification_stream`] (see [networking //! section](#networking)), //! - [dropped notification][`create_dropped_by_limits_stream`]. @@ -93,10 +94,9 @@ //! internally). Those aggregators are often referred as multi-view listeners and they implement //! stream-specific or event-specific logic. //! -//! The most important is [`MultiViewListener`] which is owned by view store. -//! More information about it is provided in [transaction -//! route](#transaction-route-submit_and_watch) section. -//! +//! The most important is [`MultiViewListener`] which is owned by view store. Some internal details +//! on events' flow is provided in [transaction status](#monitoring-the-status-of-a-transaction) +//! section. //! //! ### Intermediate transactions buffer: [`TxMemPool`] //! The main purpose of an internal [`TxMemPool`] (referred to as *mempool*) is to prevent a @@ -106,10 +106,11 @@ //! procedure. Additionally, it allows the pool to accept transactions when no blocks have been //! reported yet. //! -//! Since watched and non-watched transactions require a different treatment, the *mempool* keeps a -//! track on how the transaction was submitted. The [transaction source][`TransactionSource`] used -//! to submit transactions also needs to be kept in the *mempool*. The *mempool* transaction is a -//! simple [wrapper][`TxInMemPool`] around the [`Arc`] reference to the actual extrinsic body. +//! The *mempool* keeps a track on how the transaction was submitted - keeping number of watched and +//! non-watched transactions is useful for testing and metrics. The [transaction +//! source][`TransactionSource`] used to submit transactions also needs to be kept in the *mempool*. +//! The *mempool* transaction is a simple [wrapper][`TxInMemPool`] around the [`Arc`] reference to +//! the actual extrinsic body. //! //! Once the view is created, all transactions from *mempool* are submitted to and validated at this //! view. @@ -138,20 +139,37 @@ //! ### Transaction route: [`submit_and_watch`][`api_submit_and_watch`] //! //! The [`submit_and_watch`] function allows to submit the transaction and track its -//! [status][`TransactionStatus`] within the pool. Every view is providing an independent -//! [stream][`View::submit_and_watch`] of events, which needs to be merged into the single stream -//! exposed to the [external listener][`TransactionStatusStreamFor`]. For majority of events simple -//! forwarding of events would not work (e.g. we could get multiple [`Ready`] events, or [`Ready`] / -//! [`Future`] mix). Some additional stateful logic is required to filter and process the views' -//! events. It is also easier to trigger some events (e.g. [`Finalized`], [`Invalid`], and -//! [`Broadcast`]) using some side-channel and simply ignoring these events from the view. All the -//! before mentioned functionality is provided by the [`MultiViewListener`]. -//! -//! When watched transaction is submitted to the pool it is added the *mempool* with watched -//! flag. The external stream for the transaction is created in a [`MultiViewListener`]. Then -//! transaction is submitted to every active [`View`] (using -//! [`submit_and_watch`][`View::submit_and_watch`]) and the resulting -//! views' stream is connected to the [`MultiViewListener`]. +//! [status][`TransactionStatus`] within the pool. +//! +//! When a watched transaction is submitted to the pool it is added to the *mempool* with the +//! watched flag. The external stream for the transaction is created in a [`MultiViewListener`]. +//! Then a transaction is submitted to every active [`View`] (using +//! [`submit_many`][`View::submit_many`]). The view's [aggregated +//! stream][`create_aggregated_stream`] was already connected to the [`MultiViewListener`] when new +//! view was created, so no additional action is required upon the submission. The view will provide +//! the required updates for all the transactions over this single stream. +//! +//! +//! #### Monitoring the status of a transaction +//! +//! Transaction status monitoring and triggering events to [external +//! listener][`TransactionStatusStreamFor`] (e.g. to RPC client) is responsibility of the +//! [`MultiViewListener`]. +//! +//! Every view is providing an independent aggreagated [stream][`create_aggregated_stream`] of +//! events for all transactions in this view, which needs to be merged into the single stream +//! exposed to the [external listener][`TransactionStatusStreamFor`] (e.g. to RPC client). For +//! majority of events simple forwarding would not work (e.g. we could get multiple [`Ready`] +//! events, or [`Ready`] / [`Future`] mix). Some additional stateful logic (implemented by +//! [`MultiViewListener`]) is required to filter and process the views' events. +//! +//! It is not possible to trigger some external events (e.g., [`Dropped`], [`Finalized`], +//! [`Invalid`], and [`Broadcast`]) using only the view-aggregated streams. These events require a +//! pool-wide understanding of the transaction state. For example, dropping a transaction from a +//! single view does not mean it was dropped from other views. Broadcast and finalized notifications +//! are sent to the transaction pool API, not at the view level. These events are simply ignored +//! when they originate in the view. The pool uses a dedicated side channel exposed by +//! [`MultiViewListener`] to trigger the beforementioned events. //! //! ### Maintain //! The transaction pool exposes the [task][`notification_future`] that listens to the @@ -169,8 +187,8 @@ //! *mempool* //! - all transactions from the *mempool* (with some obvious filtering applied) are submitted to //! the view, -//! - for all watched transactions from the *mempool* the watcher is registered in the new view, -//! and it is connected to the multi-view-listener, +//! - the new [aggregated stream][`create_aggregated_stream`] of all transactions statuses is +//! created for the new view and it is connected to the multi-view-listener, //! - [update the view][ForkAwareTxPool::update_view_with_fork] with the transactions from the [tree //! route][`TreeRoute`] (which is computed from the recent best block to newly notified one by //! [enactment state][`EnactmentState`] helper): @@ -292,7 +310,7 @@ //! [`View`]: crate::fork_aware_txpool::view::View //! [`view::revalidate`]: crate::fork_aware_txpool::view::View::revalidate //! [`start_background_revalidation`]: crate::fork_aware_txpool::view::View::start_background_revalidation -//! [`View::submit_and_watch`]: crate::fork_aware_txpool::view::View::submit_and_watch +//! [`View::submit_many`]: crate::fork_aware_txpool::view::View::submit_many //! [`ViewStore`]: crate::fork_aware_txpool::view_store::ViewStore //! [`finish_background_revalidations`]: crate::fork_aware_txpool::view_store::ViewStore::finish_background_revalidations //! [find_best_view]: crate::fork_aware_txpool::view_store::ViewStore::find_best_view @@ -305,10 +323,12 @@ //! [`MultiViewListener`]: crate::fork_aware_txpool::multi_view_listener::MultiViewListener //! [`Pool`]: crate::graph::Pool //! [`Watcher`]: crate::graph::watcher::Watcher +//! [`AggregatedStream`]: crate::graph::AggregatedStream //! [`Options`]: crate::graph::Options //! [`vp::import_notification_stream`]: ../graph/validated_pool/struct.ValidatedPool.html#method.import_notification_stream //! [`vp::enforce_limits`]: ../graph/validated_pool/struct.ValidatedPool.html#method.enforce_limits //! [`create_dropped_by_limits_stream`]: ../graph/validated_pool/struct.ValidatedPool.html#method.create_dropped_by_limits_stream +//! [`create_aggregated_stream`]: ../graph/validated_pool/struct.ValidatedPool.html#method.create_aggregated_stream //! [`ChainEvent`]: sc_transaction_pool_api::ChainEvent //! [`TransactionStatusStreamFor`]: sc_transaction_pool_api::TransactionStatusStreamFor //! [`api_submit`]: sc_transaction_pool_api::TransactionPool::submit_at @@ -323,6 +343,7 @@ //! [`Invalid`]:sc_transaction_pool_api::TransactionStatus::Invalid //! [`InBlock`]:sc_transaction_pool_api::TransactionStatus::InBlock //! [`Finalized`]:sc_transaction_pool_api::TransactionStatus::Finalized +//! [`Dropped`]:sc_transaction_pool_api::TransactionStatus::Dropped //! [`ReadyTransactions`]:sc_transaction_pool_api::ReadyTransactions //! [`dropped_monitor_task`]: ForkAwareTxPool::dropped_monitor_task //! [`ready_poll`]: ForkAwareTxPool::ready_poll diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs index a513559a7cd5..107c2941ec18 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs @@ -21,20 +21,23 @@ //! aggregated streams of transaction events. use crate::{ + common::tracing_log_xt::log_xt_trace, fork_aware_txpool::stream_map_util::next_event, - graph::{self, BlockHash, ExtrinsicHash}, + graph::{self, BlockHash, ExtrinsicHash, TransactionStatusEvent}, LOG_TARGET, }; -use futures::StreamExt; +use futures::{Future, FutureExt, Stream, StreamExt}; +use parking_lot::RwLock; use sc_transaction_pool_api::{TransactionStatus, TransactionStatusStream, TxIndex}; use sc_utils::mpsc; use sp_runtime::traits::Block as BlockT; use std::{ collections::{hash_map::Entry, HashMap, HashSet}, pin::Pin, + sync::Arc, }; use tokio_stream::StreamMap; -use tracing::{debug, trace}; +use tracing::trace; use super::dropped_watcher::{DroppedReason, DroppedTransaction}; @@ -54,99 +57,202 @@ type CommandReceiver = mpsc::TracingUnboundedReceiver; /// It can represent both a single view's stream and an external watcher stream. pub type TxStatusStream = Pin, BlockHash>>>; -/// Commands to control the single external stream living within the multi view listener. -enum ControllerCommand { - /// Adds a new stream of transaction statuses originating in the view associated with a - /// specific block hash. - AddViewStream(BlockHash, TxStatusStream), +/// An aggregated stream providing events for all transactions from the view. +/// +/// This stream delivers updates for all transactions in the view, rather than for individual +/// transactions. +pub type ViewStatusStream = + Pin, BlockHash>> + Send>>; +/// Commands to control / drive the task of the multi view listener. +enum ControllerCommand { + /// Requests transaction status updated. Sent by transaction pool implementation. + TransactionStatusRequest(TransactionStatusUpdate), + /// Adds a new (aggregated) stream of transactions statuses originating in the view associated + /// with a specific block hash. + AddViewStream(BlockHash, ViewStatusStream), /// Removes an existing view's stream associated with a specific block hash. RemoveViewStream(BlockHash), +} +/// Represents the transaction status update performed by transaction pool state machine. The +/// corresponding statuses coming from the view would typically be ignored in the external watcher. +enum TransactionStatusUpdate { /// Marks a transaction as invalidated. /// /// If all pre-conditions are met, an external invalid event will be sent out. - TransactionInvalidated, + TransactionInvalidated(ExtrinsicHash), /// Notifies that a transaction was finalized in a specific block hash and transaction index. /// /// Send out an external finalized event. - FinalizeTransaction(BlockHash, TxIndex), + TransactionFinalized(ExtrinsicHash, BlockHash, TxIndex), /// Notifies that a transaction was broadcasted with a list of peer addresses. /// /// Sends out an external broadcasted event. - TransactionBroadcasted(Vec), + TransactionBroadcasted(ExtrinsicHash, Vec), /// Notifies that a transaction was dropped from the pool. /// /// If all preconditions are met, an external dropped event will be sent out. - TransactionDropped(DroppedReason>), + TransactionDropped(ExtrinsicHash, DroppedReason>), } -impl std::fmt::Debug for ControllerCommand +impl TransactionStatusUpdate +where + ChainApi: graph::ChainApi, +{ + fn hash(&self) -> ExtrinsicHash { + match self { + Self::TransactionInvalidated(hash) | + Self::TransactionFinalized(hash, _, _) | + Self::TransactionBroadcasted(hash, _) | + Self::TransactionDropped(hash, _) => *hash, + } + } +} + +impl std::fmt::Debug for TransactionStatusUpdate where ChainApi: graph::ChainApi, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - ControllerCommand::AddViewStream(h, _) => write!(f, "ListenerAction::AddView({h})"), - ControllerCommand::RemoveViewStream(h) => write!(f, "ListenerAction::RemoveView({h})"), - ControllerCommand::TransactionInvalidated => { - write!(f, "ListenerAction::TransactionInvalidated") + Self::TransactionInvalidated(h) => { + write!(f, "TransactionInvalidated({h})") }, - ControllerCommand::FinalizeTransaction(h, i) => { - write!(f, "ListenerAction::FinalizeTransaction({h},{i})") + Self::TransactionFinalized(h, b, i) => { + write!(f, "FinalizeTransaction({h},{b},{i})") }, - ControllerCommand::TransactionBroadcasted(_) => { - write!(f, "ListenerAction::TransactionBroadcasted(...)") + Self::TransactionBroadcasted(h, _) => { + write!(f, "TransactionBroadcasted({h})") }, - ControllerCommand::TransactionDropped(r) => { - write!(f, "ListenerAction::TransactionDropped {r:?}") + Self::TransactionDropped(h, r) => { + write!(f, "TransactionDropped({h},{r:?})") + }, + } + } +} + +impl std::fmt::Debug for ControllerCommand +where + ChainApi: graph::ChainApi, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ControllerCommand::AddViewStream(h, _) => write!(f, "AddView({h})"), + ControllerCommand::RemoveViewStream(h) => write!(f, "RemoveView({h})"), + ControllerCommand::TransactionStatusRequest(c) => { + write!(f, "TransactionStatusRequest({c:?})") }, } } } +impl ControllerCommand +where + ChainApi: graph::ChainApi, +{ + /// Creates new instance of a command requesting [`TransactionStatus::Invalid`] transaction + /// status. + fn new_transaction_invalidated(tx_hash: ExtrinsicHash) -> Self { + ControllerCommand::TransactionStatusRequest( + TransactionStatusUpdate::TransactionInvalidated(tx_hash), + ) + } + /// Creates new instance of a command requesting [`TransactionStatus::Broadcast`] transaction + /// status. + fn new_transaction_broadcasted(tx_hash: ExtrinsicHash, peers: Vec) -> Self { + ControllerCommand::TransactionStatusRequest( + TransactionStatusUpdate::TransactionBroadcasted(tx_hash, peers), + ) + } + /// Creates new instance of a command requesting [`TransactionStatus::Finalized`] transaction + /// status. + fn new_transaction_finalized( + tx_hash: ExtrinsicHash, + block_hash: BlockHash, + index: TxIndex, + ) -> Self { + ControllerCommand::TransactionStatusRequest(TransactionStatusUpdate::TransactionFinalized( + tx_hash, block_hash, index, + )) + } + /// Creates new instance of a command requesting [`TransactionStatus::Dropped`] transaction + /// status. + fn new_transaction_dropped( + tx_hash: ExtrinsicHash, + reason: DroppedReason>, + ) -> Self { + ControllerCommand::TransactionStatusRequest(TransactionStatusUpdate::TransactionDropped( + tx_hash, reason, + )) + } +} /// This struct allows to create and control listener for multiple transactions. /// -/// For every transaction the view's stream generating its own events can be added. The events are -/// flattened and sent out to the external listener. (The *external* term here means that it can be -/// exposed to [`sc_transaction_pool_api::TransactionPool`] API client e.g. over RPC.) +/// For every view, an aggregated stream of transactions events can be added. The events are +/// flattened and sent out to the external listener for individual transactions. (The *external* +/// term here means that it can be exposed to [`sc_transaction_pool_api::TransactionPool`] API +/// client e.g. over RPC.) /// -/// The listener allows to add and remove view's stream (per transaction). +/// The listener allows to add and remove view's stream. /// /// The listener provides a side channel that allows triggering specific events (finalized, dropped, -/// invalid) independently of the view's stream. +/// invalid, broadcast) independently of the view's stream. pub struct MultiViewListener { - /// Provides the set of controllers for the events streams corresponding to individual - /// transactions identified by transaction hashes. - controllers: parking_lot::RwLock< - HashMap, Controller>>, - >, + /// Provides the controller for sending control commands to the listener's task. + controller: Controller>, + + /// The map containing the sinks of the streams representing the external listeners of + /// the individual transactions. Hash of the transaction is used as a map's key. A map is + /// shared with listener's task. + external_controllers: + Arc, Controller>>>>, } +/// A type representing a `MultiViewListener` task. For more details refer to +/// [`MultiViewListener::task`]. +pub type MultiViewListenerTask = Pin + Send>>; + /// The external stream unfolding context. /// -/// This context is used to unfold the external events stream for a single transaction, it -/// facilitates the logic of converting single view's events to the external events stream. +/// This context is used to unfold the external events stream for a individual transaction, it +/// facilitates the logic of converting events incoming from numerous views into the external events +/// stream. struct ExternalWatcherContext { /// The hash of the transaction being monitored within this context. tx_hash: ExtrinsicHash, - /// A stream map of transaction status streams coming from individual views, keyed by - /// block hash associated with view. - status_stream_map: StreamMap, TxStatusStream>, - /// A receiver for controller commands. - command_receiver: CommandReceiver>, + /// A receiver for controller commands sent by [`MultiViewListener`]'s task. + command_receiver: CommandReceiver>, /// A flag indicating whether the context should terminate. terminate: bool, /// A flag indicating if a `Future` status has been encountered. future_seen: bool, /// A flag indicating if a `Ready` status has been encountered. ready_seen: bool, - /// A hash set of block hashes from views that consider the transaction valid. views_keeping_tx_valid: HashSet>, + /// The set of views (represented by block hashes) currently maintained by the transaction + /// pool. + known_views: HashSet>, +} + +/// Commands to control the single external stream living within the multi view listener. These +/// commands are sent from listener's task to [`ExternalWatcherContext`]. +enum ExternalWatcherCommand { + /// Command for triggering some of the transaction states, that are decided by the pool logic. + PoolTransactionStatus(TransactionStatusUpdate), + /// Transaction status updates coming from the individual views. + ViewTransactionStatus( + BlockHash, + TransactionStatus, BlockHash>, + ), + /// Notification about new view being added. + AddView(BlockHash), + /// Notification about view being removed. + RemoveView(BlockHash), } impl ExternalWatcherContext @@ -155,44 +261,85 @@ where { /// Creates new `ExternalWatcherContext` for particular transaction identified by `tx_hash` /// - /// The `command_receiver` is a side channel for receiving controller's commands. + /// The `command_receiver` is a side channel for receiving controller's + /// [commands][`ExternalWatcherCommand`]. fn new( tx_hash: ExtrinsicHash, - command_receiver: CommandReceiver>, + command_receiver: CommandReceiver>, ) -> Self { Self { tx_hash, - status_stream_map: StreamMap::new(), command_receiver, terminate: false, future_seen: false, ready_seen: false, views_keeping_tx_valid: Default::default(), + known_views: Default::default(), } } - /// Handles various transaction status updates and manages internal states based on the status. + /// Handles transaction status updates from the pool and manages internal states based on the + /// input value. + /// + /// Function may set the context termination flag, which will close the stream. + /// + /// Returns `Some` with the `event` to be sent out or `None`. + fn handle_pool_transaction_status( + &mut self, + request: TransactionStatusUpdate, + ) -> Option, BlockHash>> { + match request { + TransactionStatusUpdate::TransactionInvalidated(..) => + if self.handle_invalidate_transaction() { + log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Invalid", self.tx_hash); + return Some(TransactionStatus::Invalid) + }, + TransactionStatusUpdate::TransactionFinalized(_, block, index) => { + log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Finalized", self.tx_hash); + self.terminate = true; + return Some(TransactionStatus::Finalized((block, index))) + }, + TransactionStatusUpdate::TransactionBroadcasted(_, peers) => { + log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Broadcasted", self.tx_hash); + return Some(TransactionStatus::Broadcast(peers)) + }, + TransactionStatusUpdate::TransactionDropped(_, DroppedReason::LimitsEnforced) => { + log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Dropped", self.tx_hash); + self.terminate = true; + return Some(TransactionStatus::Dropped) + }, + TransactionStatusUpdate::TransactionDropped(_, DroppedReason::Usurped(by)) => { + log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Usurped({:?})", self.tx_hash, by); + self.terminate = true; + return Some(TransactionStatus::Usurped(by)) + }, + }; + None + } + + /// Handles various transaction status updates from individual views and manages internal states + /// based on the input value. /// /// Function may set the context termination flag, which will close the stream. /// - /// Returns `Some` with the `event` to forward or `None`. - fn handle( + /// Returns `Some` with the `event` to be sent out or `None`. + fn handle_view_transaction_status( &mut self, + block_hash: BlockHash, status: TransactionStatus, BlockHash>, - hash: BlockHash, ) -> Option, BlockHash>> { trace!( target: LOG_TARGET, tx_hash = ?self.tx_hash, - ?hash, + ?block_hash, ?status, - views = ?self.status_stream_map.keys().collect::>(), + views = ?self.known_views.iter().collect::>(), "mvl handle event" ); match status { TransactionStatus::Future => { - self.views_keeping_tx_valid.insert(hash); + self.views_keeping_tx_valid.insert(block_hash); if self.ready_seen || self.future_seen { None } else { @@ -201,7 +348,7 @@ where } }, TransactionStatus::Ready => { - self.views_keeping_tx_valid.insert(hash); + self.views_keeping_tx_valid.insert(block_hash); if self.ready_seen { None } else { @@ -209,9 +356,8 @@ where Some(status) } }, - TransactionStatus::Broadcast(_) => None, TransactionStatus::InBlock((..)) => { - self.views_keeping_tx_valid.insert(hash); + self.views_keeping_tx_valid.insert(block_hash); if !(self.ready_seen || self.future_seen) { self.ready_seen = true; Some(status) @@ -219,12 +365,13 @@ where Some(status) } }, - TransactionStatus::Retracted(_) => None, TransactionStatus::FinalityTimeout(_) => Some(status), TransactionStatus::Finalized(_) => { self.terminate = true; Some(status) }, + TransactionStatus::Retracted(_) | + TransactionStatus::Broadcast(_) | TransactionStatus::Usurped(_) | TransactionStatus::Dropped | TransactionStatus::Invalid => None, @@ -238,13 +385,11 @@ where /// Returns true if the event should be sent out, and false if the invalidation request should /// be skipped. fn handle_invalidate_transaction(&mut self) -> bool { - let keys = HashSet::>::from_iter( - self.status_stream_map.keys().map(Clone::clone), - ); + let keys = self.known_views.clone(); trace!( target: LOG_TARGET, tx_hash = ?self.tx_hash, - views = ?self.status_stream_map.keys().collect::>(), + views = ?self.known_views.iter().collect::>(), "got invalidate_transaction" ); if self.views_keeping_tx_valid.is_disjoint(&keys) { @@ -261,33 +406,33 @@ where } } - /// Adds a new transaction status stream. + /// Adds a new aggragted transaction status stream. /// - /// Inserts a new view's transaction status stream associated with a specific block hash into - /// the stream map. - fn add_stream(&mut self, block_hash: BlockHash, stream: TxStatusStream) { - self.status_stream_map.insert(block_hash, stream); + /// Inserts a new view's transaction status stream into the stream map. The view is represented + /// by `block_hash`. + fn add_view(&mut self, block_hash: BlockHash) { trace!( target: LOG_TARGET, tx_hash = ?self.tx_hash, ?block_hash, - views = ?self.status_stream_map.keys().collect::>(), + views = ?self.known_views.iter().collect::>(), "AddView view" ); + self.known_views.insert(block_hash); } - /// Removes an existing transaction status stream. + /// Removes an existing aggreagated transaction status stream. /// - /// Removes a transaction status stream associated with a specific block hash from the - /// stream map. + /// Removes an aggregated transaction status stream associated with a specific block hash from + /// the stream map. fn remove_view(&mut self, block_hash: BlockHash) { - self.status_stream_map.remove(&block_hash); + self.known_views.remove(&block_hash); self.views_keeping_tx_valid.remove(&block_hash); trace!( target: LOG_TARGET, tx_hash = ?self.tx_hash, ?block_hash, - views = ?self.status_stream_map.keys().collect::>(), + views = ?self.known_views.iter().collect::>(), "RemoveView view" ); } @@ -298,125 +443,180 @@ where ChainApi: graph::ChainApi + 'static, <::Block as BlockT>::Hash: Unpin, { - /// Creates new instance of `MultiViewListener`. - pub fn new() -> Self { - Self { controllers: Default::default() } + /// A worker task associated with `MultiViewListener` instance. + /// + /// An asynchronous listener's task responsible for dispatching: + /// - stream_map containing aggregated transaction status streams from multiple views, + /// - view add/remove requests, + /// - transaction commands, + /// to multiple individual per-transaction external watcher contexts. + /// + /// The future shall be polled by instantiator of `MultiViewListener`. + async fn task( + external_watchers_tx_hash_map: Arc< + RwLock, Controller>>>, + >, + mut command_receiver: CommandReceiver>, + ) { + let mut aggregated_streams_map: StreamMap, ViewStatusStream> = + Default::default(); + + loop { + tokio::select! { + biased; + Some((view_hash, (tx_hash, status))) = next_event(&mut aggregated_streams_map) => { + if let Entry::Occupied(mut ctrl) = external_watchers_tx_hash_map.write().entry(tx_hash) { + log::trace!( + target: LOG_TARGET, + "[{:?}] aggregated_stream_map event: view:{} status:{:?}", + tx_hash, + view_hash, + status + ); + if let Err(e) = ctrl + .get_mut() + .unbounded_send(ExternalWatcherCommand::ViewTransactionStatus(view_hash, status)) + { + trace!(target: LOG_TARGET, "[{:?}] send status failed: {:?}", tx_hash, e); + ctrl.remove(); + } + } + }, + cmd = command_receiver.next() => { + log::trace!(target: LOG_TARGET, "cmd {:?}", cmd); + match cmd { + Some(ControllerCommand::AddViewStream(h,stream)) => { + aggregated_streams_map.insert(h,stream); + // //todo: aysnc and join all? + external_watchers_tx_hash_map.write().retain(|tx_hash, ctrl| { + ctrl.unbounded_send(ExternalWatcherCommand::AddView(h)) + .inspect_err(|e| { + trace!(target: LOG_TARGET, "[{:?}] invalidate_transaction: send message failed: {:?}", tx_hash, e); + }) + .is_ok() + }) + }, + Some(ControllerCommand::RemoveViewStream(h)) => { + aggregated_streams_map.remove(&h); + //todo: aysnc and join all? + external_watchers_tx_hash_map.write().retain(|tx_hash, ctrl| { + ctrl.unbounded_send(ExternalWatcherCommand::RemoveView(h)) + .inspect_err(|e| { + trace!(target: LOG_TARGET, "[{:?}] invalidate_transaction: send message failed: {:?}", tx_hash, e); + }) + .is_ok() + }) + }, + + Some(ControllerCommand::TransactionStatusRequest(request)) => { + let tx_hash = request.hash(); + if let Entry::Occupied(mut ctrl) = external_watchers_tx_hash_map.write().entry(tx_hash) { + if let Err(e) = ctrl + .get_mut() + .unbounded_send(ExternalWatcherCommand::PoolTransactionStatus(request)) + { + trace!(target: LOG_TARGET, "[{:?}] send message failed: {:?}", tx_hash, e); + ctrl.remove(); + } + } + }, + None => {} + } + }, + }; + } } - /// Returns `true` if the listener contains a stream controller for the specified hash. - pub fn contains_tx(&self, tx_hash: &ExtrinsicHash) -> bool { - self.controllers.read().contains_key(tx_hash) + /// Creates a new [`MultiViewListener`] instance along with its associated worker task. + /// + /// This function instansiates the new `MultiViewListener` and provides the worker task that + /// relays messages to the external transactions listeners. The task shall be polled by caller. + /// + /// Returns a tuple containing the [`MultiViewListener`] and the + /// [`MultiViewListenerTask`]. + pub fn new_with_worker() -> (Self, MultiViewListenerTask) { + let external_controllers = Arc::from(RwLock::from(HashMap::< + ExtrinsicHash, + Controller>, + >::default())); + + const CONTROLLER_QUEUE_WARN_SIZE: usize = 100_000; + let (tx, rx) = mpsc::tracing_unbounded( + "txpool-multi-view-listener-task-controller", + CONTROLLER_QUEUE_WARN_SIZE, + ); + let task = Self::task(external_controllers.clone(), rx); + + (Self { external_controllers, controller: tx }, task.boxed()) } - /// Creates an external aggregated stream of events for given transaction. + /// Creates an external tstream of events for given transaction. /// /// This method initializes an `ExternalWatcherContext` for the provided transaction hash, sets - /// up the necessary communication channels, and unfolds an external (meaning that it can be - /// exposed to [`sc_transaction_pool_api::TransactionPool`] API client e.g. rpc) stream of - /// transaction status events. If an external watcher is already present for the given - /// transaction, it returns `None`. + /// up the necessary communication channel with listener's task, and unfolds an external + /// (meaning that it can be exposed to [`sc_transaction_pool_api::TransactionPool`] API client + /// e.g. rpc) stream of transaction status events. If an external watcher is already present for + /// the given transaction, it returns `None`. pub(crate) fn create_external_watcher_for_tx( &self, tx_hash: ExtrinsicHash, ) -> Option> { - let mut controllers = self.controllers.write(); - if controllers.contains_key(&tx_hash) { - return None - } + let external_ctx = match self.external_controllers.write().entry(tx_hash) { + Entry::Occupied(_) => return None, + Entry::Vacant(entry) => { + const EXT_CONTROLLER_QUEUE_WARN_THRESHOLD: usize = 128; + let (tx, rx) = mpsc::tracing_unbounded( + "txpool-multi-view-listener", + EXT_CONTROLLER_QUEUE_WARN_THRESHOLD, + ); + entry.insert(tx); + ExternalWatcherContext::new(tx_hash, rx) + }, + }; trace!( target: LOG_TARGET, ?tx_hash, "create_external_watcher_for_tx" ); - let (tx, rx) = mpsc::tracing_unbounded("txpool-multi-view-listener", 32); - controllers.insert(tx_hash, tx); - - let ctx = ExternalWatcherContext::new(tx_hash, rx); Some( - futures::stream::unfold(ctx, |mut ctx| async move { + futures::stream::unfold(external_ctx, |mut ctx| async move { if ctx.terminate { + log::trace!(target: LOG_TARGET, "[{:?}] terminate", ctx.tx_hash); return None } loop { tokio::select! { - biased; - Some((view_hash, status)) = next_event(&mut ctx.status_stream_map) => { - if let Some(new_status) = ctx.handle(status, view_hash) { - trace!( - target: LOG_TARGET, - tx_hash = ?ctx.tx_hash, - ?new_status, - "mvl sending out" - ); - return Some((new_status, ctx)) - } - }, cmd = ctx.command_receiver.next() => { - trace!( - target: LOG_TARGET, - tx_hash = ?ctx.tx_hash, - views = ?ctx.status_stream_map.keys().collect::>(), - "select::rx" - ); match cmd? { - ControllerCommand::AddViewStream(h,stream) => { - ctx.add_stream(h, stream); - }, - ControllerCommand::RemoveViewStream(h) => { - ctx.remove_view(h); - }, - ControllerCommand::TransactionInvalidated => { - if ctx.handle_invalidate_transaction() { + ExternalWatcherCommand::ViewTransactionStatus(view_hash, status) => { + if let Some(new_status) = ctx.handle_view_transaction_status(view_hash, status) { trace!( target: LOG_TARGET, tx_hash = ?ctx.tx_hash, - status = "Invalid", + ?new_status, "mvl sending out" ); - return Some((TransactionStatus::Invalid, ctx)) + return Some((new_status, ctx)) } }, - ControllerCommand::FinalizeTransaction(block, index) => { - trace!( - target: LOG_TARGET, - tx_hash = ?ctx.tx_hash, - status = "Finalized", - "mvl sending out" - ); - ctx.terminate = true; - return Some((TransactionStatus::Finalized((block, index)), ctx)) - }, - ControllerCommand::TransactionBroadcasted(peers) => { - trace!( - target: LOG_TARGET, - tx_hash = ?ctx.tx_hash, - status = "Broadcasted", - "mvl sending out" - ); - return Some((TransactionStatus::Broadcast(peers), ctx)) - }, - ControllerCommand::TransactionDropped(DroppedReason::LimitsEnforced) => { - trace!( - target: LOG_TARGET, - tx_hash = ?ctx.tx_hash, - status = "Dropped", - "mvl sending out" - ); - ctx.terminate = true; - return Some((TransactionStatus::Dropped, ctx)) + ExternalWatcherCommand::PoolTransactionStatus(request) => { + if let Some(new_status) = ctx.handle_pool_transaction_status(request) { + trace!( + target: LOG_TARGET, + tx_hash = ?ctx.tx_hash, + ?new_status, + "mvl sending out" + ); + return Some((new_status, ctx)) + } + } + ExternalWatcherCommand::AddView(h) => { + ctx.add_view(h); }, - ControllerCommand::TransactionDropped(DroppedReason::Usurped(by)) => { - trace!( - target: LOG_TARGET, - tx_hash = ?ctx.tx_hash, - status = "Usurped", - ?by, - "mvl sending out" - ); - ctx.terminate = true; - return Some((TransactionStatus::Usurped(by), ctx)) + ExternalWatcherCommand::RemoveView(h) => { + ctx.remove_view(h); }, } }, @@ -427,178 +627,142 @@ where ) } - /// Adds a view's transaction status stream for particular transaction. + /// Adds an aggregated view's transaction status stream. + /// + /// This method sends a `AddViewStream` command to the task, from where it is further dispatched + /// to the external watcher context for every watched transaction. /// - /// This method sends a `AddViewStream` command to the controller of each transaction to - /// remove the view's stream corresponding to the given block hash. - pub(crate) fn add_view_watcher_for_tx( + /// The stream is associated with a view represented by `block_hash`. + pub(crate) fn add_view_aggregated_stream( &self, - tx_hash: ExtrinsicHash, block_hash: BlockHash, - stream: TxStatusStream, + stream: ViewStatusStream, ) { - let mut controllers = self.controllers.write(); - - if let Entry::Occupied(mut tx) = controllers.entry(tx_hash) { - if let Err(error) = tx - .get_mut() - .unbounded_send(ControllerCommand::AddViewStream(block_hash, stream)) - { - trace!( - target: LOG_TARGET, - ?tx_hash, - %error, - "add_view_watcher_for_tx: send message failed" - ); - tx.remove(); - } + trace!(target: LOG_TARGET, ?block_hash, "mvl::add_view_aggregated_stream"); + if let Err(error) = self + .controller + .unbounded_send(ControllerCommand::AddViewStream(block_hash, stream)) + { + trace!( + target: LOG_TARGET, + ?block_hash, + %error, + "add_view_aggregated_stream: send message failed" + ); } } - /// Removes a view's stream associated with a specific view hash across all transactions. + /// Removes a view's stream associated with a specific view hash. /// - /// This method sends a `RemoveViewStream` command to the controller of each transaction to - /// remove the view's stream corresponding to the given block hash. + /// This method sends a `RemoveViewStream` command to the listener's task, from where is further + /// dispatched to the external watcher context for every watched transaction. pub(crate) fn remove_view(&self, block_hash: BlockHash) { - self.controllers.write().retain(|tx_hash, sender| { - sender - .unbounded_send(ControllerCommand::RemoveViewStream(block_hash)) - .map_err(|error| { - trace!( - target: LOG_TARGET, - ?tx_hash, - %error, - "remove_view: send message failed" - ); - error - }) - .is_ok() - }); + trace!(target: LOG_TARGET, ?block_hash, "mvl::remove_view"); + if let Err(error) = + self.controller.unbounded_send(ControllerCommand::RemoveViewStream(block_hash)) + { + trace!( + target: LOG_TARGET, + ?block_hash, + %error, + "remove_view: send message failed" + ); + } } /// Invalidate given transaction. /// - /// This method sends a `TransactionInvalidated` command to the controller of each transaction - /// provided to process the invalidation request. + /// This method sends a `TransactionInvalidated` command to the task's controller of each + /// transaction provided to process the invalidation request. /// /// The external event will be sent if no view is referencing the transaction as `Ready` or /// `Future`. - pub(crate) fn invalidate_transactions(&self, invalid_hashes: &[ExtrinsicHash]) { - let mut controllers = self.controllers.write(); - invalid_hashes.iter().for_each(|tx_hash| { - if let Entry::Occupied(mut tx) = controllers.entry(*tx_hash) { + pub(crate) fn transactions_invalidated(&self, invalid_hashes: &[ExtrinsicHash]) { + log_xt_trace!(target: LOG_TARGET, invalid_hashes, "transactions_invalidated"); + for tx_hash in invalid_hashes { + if let Err(error) = self + .controller + .unbounded_send(ControllerCommand::new_transaction_invalidated(*tx_hash)) + { trace!( target: LOG_TARGET, ?tx_hash, - "invalidate_transaction" + %error, + "transactions_invalidated: send message failed" ); - if let Err(error) = - tx.get_mut().unbounded_send(ControllerCommand::TransactionInvalidated) - { - trace!( - target: LOG_TARGET, - ?tx_hash, - %error, - "invalidate_transaction: send message failed" - ); - tx.remove(); - } } - }); + } } /// Send `Broadcasted` event to listeners of all transactions. /// - /// This method sends a `TransactionBroadcasted` command to the controller of each transaction - /// provided prompting the external `Broadcasted` event. + /// This method sends a `TransactionBroadcasted` command to the task's controller for each + /// transaction provided. It will prompt the external `Broadcasted` event. pub(crate) fn transactions_broadcasted( &self, propagated: HashMap, Vec>, ) { - let mut controllers = self.controllers.write(); - propagated.into_iter().for_each(|(tx_hash, peers)| { - if let Entry::Occupied(mut tx) = controllers.entry(tx_hash) { + for (tx_hash, peers) in propagated { + if let Err(error) = self + .controller + .unbounded_send(ControllerCommand::new_transaction_broadcasted(tx_hash, peers)) + { trace!( target: LOG_TARGET, ?tx_hash, - "transaction_broadcasted" + %error, + "transactions_broadcasted: send message failed" ); - if let Err(error) = - tx.get_mut().unbounded_send(ControllerCommand::TransactionBroadcasted(peers)) - { - trace!( - target: LOG_TARGET, - ?tx_hash, - %error, - "transactions_broadcasted: send message failed" - ); - tx.remove(); - } } - }); + } } /// Send `Dropped` event to listeners of transactions. /// - /// This method sends a `TransactionDropped` command to the controller of each requested - /// transaction prompting and external `Broadcasted` event. + /// This method sends a `TransactionDropped` command to the task's controller. It will prompt + /// the external `Broadcasted` event. pub(crate) fn transaction_dropped(&self, dropped: DroppedTransaction>) { - let mut controllers = self.controllers.write(); - debug!( - target: LOG_TARGET, - ?dropped, - "mvl::transaction_dropped" - ); - if let Some(tx) = controllers.remove(&dropped.tx_hash) { - let DroppedTransaction { tx_hash, reason } = dropped; - debug!( + let DroppedTransaction { tx_hash, reason } = dropped; + trace!(target: LOG_TARGET, ?tx_hash, ?reason, "transaction_dropped"); + if let Err(error) = self + .controller + .unbounded_send(ControllerCommand::new_transaction_dropped(tx_hash, reason)) + { + trace!( target: LOG_TARGET, ?tx_hash, - "transaction_dropped" + %error, + "transaction_dropped: send message failed" ); - if let Err(error) = tx.unbounded_send(ControllerCommand::TransactionDropped(reason)) { - trace!( - target: LOG_TARGET, - ?tx_hash, - %error, - "transaction_dropped: send message failed" - ); - }; } } /// Send `Finalized` event for given transaction at given block. /// - /// This will send `Finalized` event to the external watcher. - pub(crate) fn finalize_transaction( + /// This will trigger `Finalized` event to the external watcher. + pub(crate) fn transaction_finalized( &self, tx_hash: ExtrinsicHash, block: BlockHash, idx: TxIndex, ) { - let mut controllers = self.controllers.write(); - if let Some(tx) = controllers.remove(&tx_hash) { + trace!(target: LOG_TARGET, ?tx_hash, "transaction_finalized"); + if let Err(error) = self + .controller + .unbounded_send(ControllerCommand::new_transaction_finalized(tx_hash, block, idx)) + { trace!( target: LOG_TARGET, ?tx_hash, - "finalize_transaction" + %error, + "transaction_finalized: send message failed" ); - if let Err(error) = - tx.unbounded_send(ControllerCommand::FinalizeTransaction(block, idx)) - { - trace!( - target: LOG_TARGET, - ?tx_hash, - %error, - "finalize_transaction: send message failed" - ); - } }; } /// Removes stale controllers. pub(crate) fn remove_stale_controllers(&self) { - self.controllers.write().retain(|_, c| !c.is_closed()); + self.external_controllers.write().retain(|_, c| !c.is_closed()); } } @@ -608,38 +772,60 @@ mod tests { use crate::common::tests::TestApi; use futures::{stream, StreamExt}; use sp_core::H256; + use tokio::{select, task::JoinHandle}; + use tracing::debug; type MultiViewListener = super::MultiViewListener; + fn create_multi_view_listener( + ) -> (MultiViewListener, tokio::sync::oneshot::Sender<()>, JoinHandle<()>) { + let (listener, listener_task) = MultiViewListener::new_with_worker(); + + let (tx, rx) = tokio::sync::oneshot::channel(); + + let listener_handle = tokio::spawn(async move { + select! { + _ = listener_task => {}, + _ = rx => { return; } + } + }); + + (listener, tx, listener_handle) + } + #[tokio::test] async fn test01() { sp_tracing::try_init_simple(); - let listener = MultiViewListener::new(); + let (listener, terminate_listener, listener_task) = create_multi_view_listener(); let block_hash = H256::repeat_byte(0x01); + let tx_hash = H256::repeat_byte(0x0a); let events = vec![ TransactionStatus::Ready, TransactionStatus::InBlock((block_hash, 0)), TransactionStatus::Finalized((block_hash, 0)), ]; - let tx_hash = H256::repeat_byte(0x0a); let external_watcher = listener.create_external_watcher_for_tx(tx_hash).unwrap(); let handle = tokio::spawn(async move { external_watcher.collect::>().await }); - let view_stream = futures::stream::iter(events.clone()); + let view_stream = + futures::stream::iter(std::iter::repeat(tx_hash).zip(events.clone().into_iter())); - listener.add_view_watcher_for_tx(tx_hash, block_hash, view_stream.boxed()); + listener.add_view_aggregated_stream(block_hash, view_stream.boxed()); let out = handle.await.unwrap(); assert_eq!(out, events); debug!("out: {:#?}", out); + + let _ = terminate_listener.send(()); + let _ = listener_task.await.unwrap(); } #[tokio::test] async fn test02() { sp_tracing::try_init_simple(); - let listener = MultiViewListener::new(); + let (listener, terminate_listener, listener_task) = create_multi_view_listener(); let block_hash0 = H256::repeat_byte(0x01); let events0 = vec![ @@ -658,13 +844,15 @@ mod tests { let tx_hash = H256::repeat_byte(0x0a); let external_watcher = listener.create_external_watcher_for_tx(tx_hash).unwrap(); - let view_stream0 = futures::stream::iter(events0.clone()); - let view_stream1 = futures::stream::iter(events1.clone()); + let view_stream0 = + futures::stream::iter(std::iter::repeat(tx_hash).zip(events0.clone().into_iter())); + let view_stream1 = + futures::stream::iter(std::iter::repeat(tx_hash).zip(events1.clone().into_iter())); let handle = tokio::spawn(async move { external_watcher.collect::>().await }); - listener.add_view_watcher_for_tx(tx_hash, block_hash0, view_stream0.boxed()); - listener.add_view_watcher_for_tx(tx_hash, block_hash1, view_stream1.boxed()); + listener.add_view_aggregated_stream(block_hash0, view_stream0.boxed()); + listener.add_view_aggregated_stream(block_hash1, view_stream1.boxed()); let out = handle.await.unwrap(); @@ -678,12 +866,15 @@ mod tests { ] .contains(v))); assert_eq!(out.len(), 5); + + let _ = terminate_listener.send(()); + let _ = listener_task.await.unwrap(); } #[tokio::test] async fn test03() { sp_tracing::try_init_simple(); - let listener = MultiViewListener::new(); + let (listener, terminate_listener, listener_task) = create_multi_view_listener(); let block_hash0 = H256::repeat_byte(0x01); let events0 = vec![ @@ -699,13 +890,18 @@ mod tests { let external_watcher = listener.create_external_watcher_for_tx(tx_hash).unwrap(); let handle = tokio::spawn(async move { external_watcher.collect::>().await }); - let view_stream0 = futures::stream::iter(events0.clone()); - let view_stream1 = futures::stream::iter(events1.clone()); + let view_stream0 = + futures::stream::iter(std::iter::repeat(tx_hash).zip(events0.clone().into_iter())); + let view_stream1 = + futures::stream::iter(std::iter::repeat(tx_hash).zip(events1.clone().into_iter())); - listener.add_view_watcher_for_tx(tx_hash, block_hash0, view_stream0.boxed()); - listener.add_view_watcher_for_tx(tx_hash, block_hash1, view_stream1.boxed()); + listener.add_view_aggregated_stream(block_hash0, view_stream0.boxed()); + listener.add_view_aggregated_stream(block_hash1, view_stream1.boxed()); - listener.invalidate_transactions(&[tx_hash]); + listener.remove_view(block_hash0); + listener.remove_view(block_hash1); + + listener.transactions_invalidated(&[tx_hash]); let out = handle.await.unwrap(); debug!("out: {:#?}", out); @@ -717,12 +913,15 @@ mod tests { ] .contains(v))); assert_eq!(out.len(), 4); - } + let _ = terminate_listener.send(()); + let _ = listener_task.await.unwrap(); + } + // #[tokio::test] async fn test032() { sp_tracing::try_init_simple(); - let listener = MultiViewListener::new(); + let (listener, terminate_listener, listener_task) = create_multi_view_listener(); let block_hash0 = H256::repeat_byte(0x01); let events0_tx0 = vec![TransactionStatus::Future]; @@ -745,19 +944,26 @@ mod tests { let handle0 = tokio::spawn(async move { external_watcher_tx0.collect::>().await }); let handle1 = tokio::spawn(async move { external_watcher_tx1.collect::>().await }); - let view0_tx0_stream = futures::stream::iter(events0_tx0.clone()); - let view0_tx1_stream = futures::stream::iter(events0_tx1.clone()); + let view0_tx0_stream = + futures::stream::iter(std::iter::repeat(tx0_hash).zip(events0_tx0.clone())); + let view0_tx1_stream = + futures::stream::iter(std::iter::repeat(tx1_hash).zip(events0_tx1.clone())); + + let view1_tx0_stream = + futures::stream::iter(std::iter::repeat(tx0_hash).zip(events1_tx0.clone())); + let view1_tx1_stream = + futures::stream::iter(std::iter::repeat(tx1_hash).zip(events1_tx1.clone())); - let view1_tx0_stream = futures::stream::iter(events1_tx0.clone()); - let view1_tx1_stream = futures::stream::iter(events1_tx1.clone()); + listener.add_view_aggregated_stream(block_hash0, view0_tx0_stream.boxed()); + listener.add_view_aggregated_stream(block_hash1, view1_tx0_stream.boxed()); + listener.add_view_aggregated_stream(block_hash0, view0_tx1_stream.boxed()); + listener.add_view_aggregated_stream(block_hash1, view1_tx1_stream.boxed()); - listener.add_view_watcher_for_tx(tx0_hash, block_hash0, view0_tx0_stream.boxed()); - listener.add_view_watcher_for_tx(tx0_hash, block_hash1, view1_tx0_stream.boxed()); - listener.add_view_watcher_for_tx(tx1_hash, block_hash0, view0_tx1_stream.boxed()); - listener.add_view_watcher_for_tx(tx1_hash, block_hash1, view1_tx1_stream.boxed()); + listener.remove_view(block_hash0); + listener.remove_view(block_hash1); - listener.invalidate_transactions(&[tx0_hash]); - listener.invalidate_transactions(&[tx1_hash]); + listener.transactions_invalidated(&[tx0_hash]); + listener.transactions_invalidated(&[tx1_hash]); let out_tx0 = handle0.await.unwrap(); let out_tx1 = handle1.await.unwrap(); @@ -780,12 +986,15 @@ mod tests { .contains(v))); assert_eq!(out_tx0.len(), 4); assert_eq!(out_tx1.len(), 3); + + let _ = terminate_listener.send(()); + let _ = listener_task.await.unwrap(); } #[tokio::test] async fn test04() { sp_tracing::try_init_simple(); - let listener = MultiViewListener::new(); + let (listener, terminate_listener, listener_task) = create_multi_view_listener(); let block_hash0 = H256::repeat_byte(0x01); let events0 = vec![ @@ -801,18 +1010,20 @@ mod tests { let external_watcher = listener.create_external_watcher_for_tx(tx_hash).unwrap(); //views will keep transaction valid, invalidation shall not happen - let view_stream0 = futures::stream::iter(events0.clone()).chain(stream::pending().boxed()); - let view_stream1 = futures::stream::iter(events1.clone()).chain(stream::pending().boxed()); + let view_stream0 = futures::stream::iter(std::iter::repeat(tx_hash).zip(events0.clone())) + .chain(stream::pending().boxed()); + let view_stream1 = futures::stream::iter(std::iter::repeat(tx_hash).zip(events1.clone())) + .chain(stream::pending().boxed()); let handle = tokio::spawn(async move { // views are still there, we need to fetch 3 events external_watcher.take(3).collect::>().await }); - listener.add_view_watcher_for_tx(tx_hash, block_hash0, view_stream0.boxed()); - listener.add_view_watcher_for_tx(tx_hash, block_hash1, view_stream1.boxed()); + listener.add_view_aggregated_stream(block_hash0, view_stream0.boxed()); + listener.add_view_aggregated_stream(block_hash1, view_stream1.boxed()); - listener.invalidate_transactions(&[tx_hash]); + listener.transactions_invalidated(&[tx_hash]); let out = handle.await.unwrap(); debug!("out: {:#?}", out); @@ -825,12 +1036,14 @@ mod tests { ] .contains(v))); assert_eq!(out.len(), 3); + let _ = terminate_listener.send(()); + let _ = listener_task.await.unwrap(); } #[tokio::test] async fn test05() { sp_tracing::try_init_simple(); - let listener = MultiViewListener::new(); + let (listener, terminate_listener, listener_task) = create_multi_view_listener(); let block_hash0 = H256::repeat_byte(0x01); let events0 = vec![TransactionStatus::Invalid]; @@ -839,18 +1052,24 @@ mod tests { let external_watcher = listener.create_external_watcher_for_tx(tx_hash).unwrap(); let handle = tokio::spawn(async move { external_watcher.collect::>().await }); - let view_stream0 = futures::stream::iter(events0.clone()).chain(stream::pending().boxed()); + let view_stream0 = futures::stream::iter(std::iter::repeat(tx_hash).zip(events0.clone())) + .chain(stream::pending().boxed()); // Note: this generates actual Invalid event. - // Invalid event from View's stream is intentionally ignored. - listener.invalidate_transactions(&[tx_hash]); + // Invalid event from View's stream is intentionally ignored . + // we need to explicitely remove the view + listener.remove_view(block_hash0); + listener.transactions_invalidated(&[tx_hash]); - listener.add_view_watcher_for_tx(tx_hash, block_hash0, view_stream0.boxed()); + listener.add_view_aggregated_stream(block_hash0, view_stream0.boxed()); let out = handle.await.unwrap(); debug!("out: {:#?}", out); assert!(out.iter().all(|v| vec![TransactionStatus::Invalid].contains(v))); assert_eq!(out.len(), 1); + + let _ = terminate_listener.send(()); + let _ = listener_task.await.unwrap(); } } diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs index 440e77313d3e..e141016ccb28 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs @@ -77,10 +77,10 @@ where Block: BlockT, ChainApi: graph::ChainApi + 'static, { - //todo: add listener for updating listeners with events [#5495] /// Is the progress of transaction watched. /// - /// Was transaction sent with `submit_and_watch`. + /// Indicates if transaction was sent with `submit_and_watch`. Serves only stats/testing + /// purposes. watched: bool, /// Extrinsic actual body. tx: ExtrinsicFor, @@ -93,14 +93,6 @@ where /// Priority of transaction at some block. It is assumed it will not be changed often. None if /// not known. priority: RwLock>, - //todo: we need to add future / ready status at finalized block. - //If future transactions are stuck in tx_mem_pool (due to limits being hit), we need a means - // to replace them somehow with newly coming transactions. - // For sure priority is one of them, but some additional criteria maybe required. - // - // The other maybe simple solution for this could be just obeying 10% limit for future in - // tx_mem_pool. Oldest future transaction could be just dropped. *(Status at finalized would - // also be needed). Probably is_future_at_finalized:Option flag will be enought } impl TxInMemPool @@ -215,7 +207,6 @@ where /// A shared instance of the `MultiViewListener`. /// /// Provides a side-channel allowing to send per-transaction state changes notification. - //todo: could be removed after removing watched field (and adding listener into tx) [#5495] listener: Arc>, /// A map that stores the transactions currently in the memory pool. @@ -277,7 +268,7 @@ where } /// Creates a new `TxMemPool` instance for testing purposes. - #[allow(dead_code)] + #[cfg(test)] fn new_test( api: Arc, max_transactions_count: usize, @@ -285,7 +276,7 @@ where ) -> Self { Self { api, - listener: Arc::from(MultiViewListener::new()), + listener: Arc::from(MultiViewListener::new_with_worker().0), transactions: Default::default(), metrics: Default::default(), max_transactions_count, @@ -469,27 +460,11 @@ where self.try_insert(hash, TxInMemPool::new_watched(source, xt.clone(), length)) } - /// Clones and returns a `HashMap` of references to all unwatched transactions in the memory - /// pool. - pub(super) fn clone_unwatched( - &self, - ) -> HashMap, Arc>> { - self.transactions - .read() - .iter() - .filter_map(|(hash, tx)| (!tx.is_watched()).then(|| (*hash, tx.clone()))) - .collect::>() - } - - /// Clones and returns a `HashMap` of references to all watched transactions in the memory pool. - pub(super) fn clone_watched( + /// Clones and returns a `HashMap` of references to all transactions in the memory pool. + pub(super) fn clone_transactions( &self, ) -> HashMap, Arc>> { - self.transactions - .read() - .iter() - .filter_map(|(hash, tx)| (tx.is_watched()).then(|| (*hash, tx.clone()))) - .collect::>() + self.transactions.clone_map() } /// Removes a transaction with given hash from the memory pool. @@ -611,7 +586,7 @@ where invalid_hashes.iter().for_each(|i| { transactions.remove(i); }); - self.listener.invalidate_transactions(&invalid_hashes); + self.listener.transactions_invalidated(&invalid_hashes); } /// Updates the priority of transaction stored in mempool using provided view_store submission diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs index 6324997da67b..555444956122 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs @@ -27,8 +27,8 @@ use super::metrics::MetricsLink as PrometheusMetrics; use crate::{ common::tracing_log_xt::log_xt_trace, graph::{ - self, base_pool::TimedTransactionSource, watcher::Watcher, ExtrinsicFor, ExtrinsicHash, - IsValidator, ValidatedPoolSubmitOutcome, ValidatedTransaction, ValidatedTransactionFor, + self, base_pool::TimedTransactionSource, ExtrinsicFor, ExtrinsicHash, IsValidator, + ValidatedPoolSubmitOutcome, ValidatedTransaction, ValidatedTransactionFor, }, LOG_TARGET, }; @@ -155,6 +155,18 @@ where } } + /// Imports single unvalidated extrinsic into the view. + pub(super) async fn submit_one( + &self, + source: TimedTransactionSource, + xt: ExtrinsicFor, + ) -> Result, ChainApi::Error> { + self.submit_many(std::iter::once((source, xt))) + .await + .pop() + .expect("There is exactly one result, qed.") + } + /// Imports many unvalidated extrinsics into the view. pub(super) async fn submit_many( &self, @@ -162,28 +174,17 @@ where ) -> Vec, ChainApi::Error>> { if tracing::enabled!(target: LOG_TARGET, tracing::Level::TRACE) { let xts = xts.into_iter().collect::>(); - log_xt_trace!(target: LOG_TARGET, xts.iter().map(|(_,xt)| self.pool.validated_pool().api().hash_and_length(xt).0), "view::submit_many at:{}", self.at.hash); + log_xt_trace!( + target: LOG_TARGET, + xts.iter().map(|(_,xt)| self.pool.validated_pool().api().hash_and_length(xt).0), + "view::submit_many at:{}", + self.at.hash); self.pool.submit_at(&self.at, xts).await } else { self.pool.submit_at(&self.at, xts).await } } - /// Import a single extrinsic and starts to watch its progress in the view. - pub(super) async fn submit_and_watch( - &self, - source: TimedTransactionSource, - xt: ExtrinsicFor, - ) -> Result, ChainApi::Error> { - trace!( - target: LOG_TARGET, - tx_hash = ?self.pool.validated_pool().api().hash_and_length(&xt).0, - view_at_hash = ?self.at.hash, - "view::submit_and_watch" - ); - self.pool.submit_and_watch(&self.at, source, xt).await - } - /// Synchronously imports single unvalidated extrinsics into the view. pub(super) fn submit_local( &self, @@ -237,18 +238,6 @@ where self.pool.validated_pool().status() } - /// Creates a watcher for given transaction. - /// - /// Intended to be called for the transaction that already exists in the pool - pub(super) fn create_watcher( - &self, - tx_hash: ExtrinsicHash, - ) -> Watcher, ExtrinsicHash> { - //todo(minor): some assert could be added here - to make sure that transaction actually - // exists in the view. - self.pool.validated_pool().create_watcher(tx_hash) - } - /// Revalidates some part of transaction from the internal pool. /// /// Intended to be called from the revalidation worker. The revalidation process can be diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs index c4209a7d7f41..e534decf9b1a 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs @@ -31,7 +31,6 @@ use crate::{ }, ReadyIteratorFor, LOG_TARGET, }; -use futures::prelude::*; use itertools::Itertools; use parking_lot::RwLock; use sc_transaction_pool_api::{error::Error as PoolError, PoolStatus}; @@ -55,8 +54,6 @@ where xt: ExtrinsicFor, /// Source of the transaction. source: TimedTransactionSource, - /// Inidicates if transaction is watched. - watched: bool, } /// Helper type representing the callback allowing to trigger per-transaction events on @@ -108,14 +105,10 @@ where ChainApi: graph::ChainApi, { /// Creates new unprocessed instance of pending transaction submission. - fn new_submission_action( - xt: ExtrinsicFor, - source: TimedTransactionSource, - watched: bool, - ) -> Self { + fn new_submission_action(xt: ExtrinsicFor, source: TimedTransactionSource) -> Self { Self { processed: false, - action: PreInsertAction::SubmitTx(PendingTxSubmission { xt, source, watched }), + action: PreInsertAction::SubmitTx(PendingTxSubmission { xt, source }), } } @@ -290,7 +283,7 @@ where let Some(external_watcher) = self.listener.create_external_watcher_for_tx(tx_hash) else { return Err(PoolError::AlreadyImported(Box::new(tx_hash)).into()) }; - let submit_and_watch_futures = { + let submit_futures = { let active_views = self.active_views.read(); active_views .iter() @@ -298,23 +291,11 @@ where let view = view.clone(); let xt = xt.clone(); let source = source.clone(); - async move { - match view.submit_and_watch(source, xt).await { - Ok(mut result) => { - self.listener.add_view_watcher_for_tx( - tx_hash, - view.at.hash, - result.expect_watcher().into_stream().boxed(), - ); - Ok(result) - }, - Err(e) => Err(e), - } - } + async move { view.submit_one(source, xt).await } }) .collect::>() }; - let result = futures::future::join_all(submit_and_watch_futures) + let result = futures::future::join_all(submit_futures) .await .into_iter() .find_or_first(Result::is_ok); @@ -462,7 +443,7 @@ where extrinsics .iter() .enumerate() - .for_each(|(i, tx_hash)| self.listener.finalize_transaction(*tx_hash, *block, i)); + .for_each(|(i, tx_hash)| self.listener.transaction_finalized(*tx_hash, *block, i)); finalized_transactions.extend(extrinsics); } @@ -705,14 +686,9 @@ where source: TimedTransactionSource, xt: ExtrinsicFor, replaced: ExtrinsicHash, - watched: bool, ) { if let Entry::Vacant(entry) = self.pending_txs_tasks.write().entry(replaced) { - entry.insert(PendingPreInsertTask::new_submission_action( - xt.clone(), - source.clone(), - watched, - )); + entry.insert(PendingPreInsertTask::new_submission_action(xt.clone(), source.clone())); } else { return }; @@ -722,11 +698,9 @@ where target: LOG_TARGET, ?replaced, ?tx_hash, - watched, "replace_transaction" ); - - self.replace_transaction_in_views(source, xt, tx_hash, replaced, watched).await; + self.replace_transaction_in_views(source, xt, tx_hash, replaced).await; if let Some(replacement) = self.pending_txs_tasks.write().get_mut(&replaced) { replacement.mark_processed(); @@ -747,7 +721,6 @@ where submission.source.clone(), submission.xt.clone(), xt_hash, - submission.watched, )); }, PreInsertAction::RemoveSubtree(ref removal) => { @@ -768,37 +741,15 @@ where source: TimedTransactionSource, xt: ExtrinsicFor, tx_hash: ExtrinsicHash, - watched: bool, ) { - if watched { - match view.submit_and_watch(source, xt).await { - Ok(mut result) => { - self.listener.add_view_watcher_for_tx( - tx_hash, - view.at.hash, - result.expect_watcher().into_stream().boxed(), - ); - }, - Err(error) => { - trace!( - target: LOG_TARGET, - ?tx_hash, - at_hash = ?view.at.hash, - %error, - "replace_transaction: submit_and_watch failed" - ); - }, - } - } else { - if let Some(Err(error)) = view.submit_many(std::iter::once((source, xt))).await.pop() { - trace!( - target: LOG_TARGET, - ?tx_hash, - at_hash = ?view.at.hash, - %error, - "replace_transaction: submit failed" - ); - } + if let Err(error) = view.submit_one(source, xt).await { + trace!( + target: LOG_TARGET, + ?tx_hash, + at_hash = ?view.at.hash, + %error, + "replace_transaction: submit failed" + ); } } @@ -812,17 +763,7 @@ where xt: ExtrinsicFor, tx_hash: ExtrinsicHash, replaced: ExtrinsicHash, - watched: bool, ) { - if watched && !self.listener.contains_tx(&tx_hash) { - trace!( - target: LOG_TARGET, - ?tx_hash, - "error: replace_transaction_in_views: no listener for watched transaction" - ); - return; - } - let submit_futures = { let active_views = self.active_views.read(); let inactive_views = self.inactive_views.read(); @@ -836,7 +777,6 @@ where source.clone(), xt.clone(), tx_hash, - watched, ) }) .collect::>() diff --git a/substrate/client/transaction-pool/src/graph/listener.rs b/substrate/client/transaction-pool/src/graph/listener.rs index 7b09ee4c6409..0e70334ea0e2 100644 --- a/substrate/client/transaction-pool/src/graph/listener.rs +++ b/substrate/client/transaction-pool/src/graph/listener.rs @@ -29,10 +29,13 @@ use super::{watcher, BlockHash, ChainApi, ExtrinsicHash}; static LOG_TARGET: &str = "txpool::watcher"; -/// Single event used in dropped by limits stream. It is one of Ready/Future/Dropped. -pub type DroppedByLimitsEvent = (H, TransactionStatus); -/// Stream of events used to determine if a transaction was dropped. -pub type DroppedByLimitsStream = TracingUnboundedReceiver>; +/// Single event used in aggregated stream. Tuple containing hash of transactions and its status. +pub type TransactionStatusEvent = (H, TransactionStatus); +/// Stream of events providing statuses of all the transactions within the pool. +pub type AggregatedStream = TracingUnboundedReceiver>; + +/// Warning threshold for (unbounded) channel used in aggregated stream. +const AGGREGATED_STREAM_WARN_THRESHOLD: usize = 100_000; /// Extrinsic pool default listener. pub struct Listener { @@ -40,10 +43,15 @@ pub struct Listener { watchers: HashMap>>, finality_watchers: LinkedHashMap, Vec>, - /// The sink used to notify dropped-by-enforcing-limits transactions. Also ready and future - /// statuses are reported via this channel to allow consumer of the stream tracking actual - /// drops. - dropped_by_limits_sink: Option>>>, + /// The sink used to notify dropped by enforcing limits or by being usurped transactions. + /// + /// Note: Ready and future statuses are alse communicated through this channel, enabling the + /// stream consumer to track views that reference the transaction. + dropped_stream_sink: Option>>>, + + /// The sink of the single, merged stream providing updates for all the transactions in the + /// associated pool. + aggregated_stream_sink: Option>>>, } /// Maximum number of blocks awaiting finality at any time. @@ -54,7 +62,8 @@ impl Default for Listener { Self { watchers: Default::default(), finality_watchers: Default::default(), - dropped_by_limits_sink: None, + dropped_stream_sink: None, + aggregated_stream_sink: None, } } } @@ -84,21 +93,60 @@ impl Listener DroppedByLimitsStream> { - let (sender, single_stream) = tracing_unbounded("mpsc_txpool_watcher", 100_000); - self.dropped_by_limits_sink = Some(sender); + /// The stream can be used to subscribe to events related to dropping of all extrinsics in the + /// pool. + pub fn create_dropped_by_limits_stream(&mut self) -> AggregatedStream> { + let (sender, single_stream) = + tracing_unbounded("mpsc_txpool_watcher", AGGREGATED_STREAM_WARN_THRESHOLD); + self.dropped_stream_sink = Some(sender); single_stream } - /// Notify the listeners about extrinsic broadcast. + /// Creates a new single merged stream for all extrinsics in the associated pool. + /// + /// The stream can be used to subscribe to life-cycle events of all extrinsics in the pool. For + /// some implementations (e.g. fork-aware pool) this approach may be more efficient than using + /// individual streams for every transaction. + /// + /// Note: some of the events which are currently ignored on the other side of this channel + /// (external watcher) are not sent. + pub fn create_aggregated_stream(&mut self) -> AggregatedStream> { + let (sender, aggregated_stream) = + tracing_unbounded("mpsc_txpool_aggregated_stream", AGGREGATED_STREAM_WARN_THRESHOLD); + self.aggregated_stream_sink = Some(sender); + aggregated_stream + } + + /// Notify the listeners about the extrinsic broadcast. pub fn broadcasted(&mut self, hash: &H, peers: Vec) { trace!(target: LOG_TARGET, "[{:?}] Broadcasted", hash); self.fire(hash, |watcher| watcher.broadcast(peers)); } + /// Sends given event to the `dropped_stream_sink`. + fn send_to_dropped_stream_sink(&mut self, tx: &H, status: TransactionStatus>) { + if let Some(ref sink) = self.dropped_stream_sink { + if let Err(e) = sink.unbounded_send((tx.clone(), status.clone())) { + trace!(target: LOG_TARGET, "[{:?}] dropped_sink: {:?} send message failed: {:?}", tx, status, e); + } + } + } + + /// Sends given event to the `aggregated_stream_sink`. + fn send_to_aggregated_stream_sink( + &mut self, + tx: &H, + status: TransactionStatus>, + ) { + if let Some(ref sink) = self.aggregated_stream_sink { + if let Err(e) = sink.unbounded_send((tx.clone(), status.clone())) { + trace!(target: LOG_TARGET, "[{:?}] aggregated_stream {:?} send message failed: {:?}", tx, status, e); + } + } + } + /// New transaction was added to the ready pool or promoted from the future pool. pub fn ready(&mut self, tx: &H, old: Option<&H>) { trace!(target: LOG_TARGET, "[{:?}] Ready (replaced with {:?})", tx, old); @@ -107,22 +155,17 @@ impl Listener Listener Listener Listener MAX_FINALITY_WATCHERS { if let Some((hash, txs)) = self.finality_watchers.pop_front() { for tx in txs { self.fire(&tx, |watcher| watcher.finality_timeout(hash)); + //todo: do we need this? [related issue: #5482] + self.send_to_aggregated_stream_sink( + &tx, + TransactionStatus::FinalityTimeout(hash), + ); } } } @@ -188,7 +227,8 @@ impl Listener) { if let Some(hashes) = self.finality_watchers.remove(&block_hash) { for hash in hashes { - self.fire(&hash, |watcher| watcher.retracted(block_hash)) + self.fire(&hash, |watcher| watcher.retracted(block_hash)); + // note: [#5479], we do not send to aggregated stream. } } } diff --git a/substrate/client/transaction-pool/src/graph/mod.rs b/substrate/client/transaction-pool/src/graph/mod.rs index 2114577f4dee..c3161799785a 100644 --- a/substrate/client/transaction-pool/src/graph/mod.rs +++ b/substrate/client/transaction-pool/src/graph/mod.rs @@ -46,7 +46,9 @@ pub use validated_pool::{ }; pub(crate) use self::pool::CheckBannedBeforeVerify; -pub(crate) use listener::DroppedByLimitsEvent; +pub(crate) use listener::TransactionStatusEvent; +#[cfg(doc)] +pub(crate) use listener::AggregatedStream; #[cfg(doc)] pub(crate) use validated_pool::ValidatedPool; diff --git a/substrate/client/transaction-pool/src/graph/tracked_map.rs b/substrate/client/transaction-pool/src/graph/tracked_map.rs index fe15c6eca308..ca1ee035cf37 100644 --- a/substrate/client/transaction-pool/src/graph/tracked_map.rs +++ b/substrate/client/transaction-pool/src/graph/tracked_map.rs @@ -120,11 +120,6 @@ where pub fn len(&self) -> usize { self.inner_guard.len() } - - /// Returns an iterator over all key-value pairs. - pub fn iter(&self) -> Iter<'_, K, V> { - self.inner_guard.iter() - } } pub struct TrackedMapWriteAccess<'a, K, V> { diff --git a/substrate/client/transaction-pool/src/graph/validated_pool.rs b/substrate/client/transaction-pool/src/graph/validated_pool.rs index bbfcb9b40aca..9631a27ead93 100644 --- a/substrate/client/transaction-pool/src/graph/validated_pool.rs +++ b/substrate/client/transaction-pool/src/graph/validated_pool.rs @@ -747,12 +747,20 @@ impl ValidatedPool { self.listener.write().retracted(block_hash) } + /// Refer to [`Listener::create_dropped_by_limits_stream`] for details. pub fn create_dropped_by_limits_stream( &self, - ) -> super::listener::DroppedByLimitsStream, BlockHash> { + ) -> super::listener::AggregatedStream, BlockHash> { self.listener.write().create_dropped_by_limits_stream() } + /// Refer to [`Listener::create_aggregated_stream`] + pub fn create_aggregated_stream( + &self, + ) -> super::listener::AggregatedStream, BlockHash> { + self.listener.write().create_aggregated_stream() + } + /// Resends ready and future events for all the ready and future transactions that are already /// in the pool. /// diff --git a/substrate/client/transaction-pool/tests/fatp_common/mod.rs b/substrate/client/transaction-pool/tests/fatp_common/mod.rs index 530c25caf88e..20178fdc7c4e 100644 --- a/substrate/client/transaction-pool/tests/fatp_common/mod.rs +++ b/substrate/client/transaction-pool/tests/fatp_common/mod.rs @@ -203,8 +203,8 @@ macro_rules! assert_future_iterator { ($hash:expr, $pool:expr, [$( $xt:expr ),*]) => {{ let futures = $pool.futures_at($hash).unwrap(); let expected = vec![ $($pool.api().hash_and_length(&$xt).0),*]; - log::debug!(target:LOG_TARGET, "expected: {:#?}", futures); - log::debug!(target:LOG_TARGET, "output: {:#?}", expected); + log::debug!(target:LOG_TARGET, "expected: {:#?}", expected); + log::debug!(target:LOG_TARGET, "output: {:#?}", futures); assert_eq!(expected.len(), futures.len()); let hsf = futures.iter().map(|a| a.hash).collect::>(); let hse = expected.into_iter().collect::>(); diff --git a/substrate/client/transaction-pool/tests/fatp_limits.rs b/substrate/client/transaction-pool/tests/fatp_limits.rs index fb02b21ebc2b..50e75e1e28e7 100644 --- a/substrate/client/transaction-pool/tests/fatp_limits.rs +++ b/substrate/client/transaction-pool/tests/fatp_limits.rs @@ -377,12 +377,11 @@ fn fatp_limits_watcher_view_can_drop_transcation() { assert_eq!(xt0_status, vec![TransactionStatus::Ready, TransactionStatus::Dropped,]); assert_ready_iterator!(header01.hash(), pool, [xt1, xt2]); + let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap(); let header02 = api.push_block_with_parent(header01.hash(), vec![], true); block_on(pool.maintain(finalized_block_event(&pool, api.genesis_hash(), header02.hash()))); - let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap(); - let xt1_status = futures::executor::block_on_stream(xt1_watcher).take(2).collect::>(); assert_eq!(xt1_status, vec![TransactionStatus::Ready, TransactionStatus::Dropped]);