
fatxpool: do not use individual transaction listeners #7316

Open · wants to merge 18 commits into master

Conversation

michalkucharczyk (Contributor) commented on Jan 23, 2025

Description

During the 2s-block investigation, it turned out that the ForkAwareTxPool::register_listeners call takes a significant amount of time.

register_listeners: at HashAndNumber { number: 12, hash: 0xe9a1...0b1d2 } took 200.041933ms
register_listeners: at HashAndNumber { number: 13, hash: 0x5eb8...a87c6 } took 264.487414ms
register_listeners: at HashAndNumber { number: 14, hash: 0x30cb...2e6ec } took 340.525566ms
register_listeners: at HashAndNumber { number: 15, hash: 0x0450...4f05c } took 405.686659ms
register_listeners: at HashAndNumber { number: 16, hash: 0xfa6f...16c20 } took 477.977836ms
register_listeners: at HashAndNumber { number: 17, hash: 0x5474...5d0c1 } took 483.046029ms
register_listeners: at HashAndNumber { number: 18, hash: 0x3ca5...37b78 } took 482.715468ms
register_listeners: at HashAndNumber { number: 19, hash: 0xbfcc...df254 } took 484.206999ms
register_listeners: at HashAndNumber { number: 20, hash: 0xd748...7f027 } took 414.635236ms
register_listeners: at HashAndNumber { number: 21, hash: 0x2baa...f66b5 } took 418.015897ms
register_listeners: at HashAndNumber { number: 22, hash: 0x5f1d...282b5 } took 423.342397ms
register_listeners: at HashAndNumber { number: 23, hash: 0x7a18...f2d03 } took 472.742939ms
register_listeners: at HashAndNumber { number: 24, hash: 0xc381...3fd07 } took 489.625557ms

This PR implements the idea outlined in #7071. Instead of having a separate listener for every transaction in each view, we now use a single stream of aggregated events per view, with each stream providing events for all transactions in that view. Each event is represented as a tuple: (transaction-hash, transaction-status). This significantly reduces the time required for the maintain process.
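
For illustration only, here is a minimal sketch of the event shape carried by each view's aggregated stream; the type names below are stand-ins for this example, not the crate's actual definitions:

// Stand-in aliases for illustration; the pool uses its own hash and status types.
type TxHash = sp_core::H256;
type BlockHash = sp_core::H256;

// Simplified stand-in for the pool's transaction status type.
enum TxStatus {
    Ready,
    Future,
    InBlock(BlockHash),
    Dropped,
}

// One event in a view's aggregated stream: (transaction-hash, transaction-status).
type AggregatedEvent = (TxHash, TxStatus);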

Review Notes

  • A single aggregated stream, provided by each individual view, delivers events in the form of (transaction-hash, transaction-status).
  • MultiViewListener now has a task (see the sketch after this list). This task is responsible for:
    • polling the stream map (which consists of the individual views' aggregated streams) and the controller_receiver, which provides side-channel commands (like AddView or FinalizeTransaction) sent from the transaction pool,
    • dispatching individual transaction statuses and control commands to the external listeners of individual transactions (created via API, e.g. over RPC),
  • the external listener remains responsible for the status-handling logic (e.g. deduplication of events, or ignoring some of them) and for reporting statuses to the external world (this was not changed),
  • the level of debug messages was adjusted (per-transaction messages are now trace).
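
A rough sketch of the task's main loop described above, assuming simplified stand-in types and tokio/tokio-stream channels rather than the pool's own primitives (and, for brevity, taking the external-watcher map by value, whereas in the PR it is shared with submit_and_watch):

// Sketch only: simplified stand-ins for the pool's actual types and channels.
use std::collections::HashMap;

use futures::StreamExt;
use tokio::sync::mpsc;
use tokio_stream::{wrappers::UnboundedReceiverStream, StreamMap};

type TxHash = u64;            // stand-in for the real transaction hash type
type BlockHash = u64;         // stand-in for the real block hash type
type TxStatus = &'static str; // stand-in for the real transaction status type

// Side-channel commands sent by the transaction pool (simplified subset).
enum ControllerCommand {
    AddView(BlockHash, UnboundedReceiverStream<(TxHash, TxStatus)>),
    RemoveView(BlockHash),
}

async fn multi_view_listener_task(
    mut commands: mpsc::UnboundedReceiver<ControllerCommand>,
    external_watchers: HashMap<TxHash, mpsc::UnboundedSender<TxStatus>>,
) {
    // One aggregated stream per view, keyed by the view's block hash.
    let mut views: StreamMap<BlockHash, UnboundedReceiverStream<(TxHash, TxStatus)>> =
        StreamMap::new();

    loop {
        tokio::select! {
            // An event from any view: (transaction-hash, transaction-status).
            Some((_view, (tx_hash, status))) = views.next(), if !views.is_empty() => {
                // Forward only to transactions with an external watcher; events for
                // unwatched (e.g. gossiped) transactions are simply dropped here.
                if let Some(sink) = external_watchers.get(&tx_hash) {
                    let _ = sink.send(status);
                }
            },
            // Side-channel control commands coming from the pool.
            Some(cmd) = commands.recv() => match cmd {
                ControllerCommand::AddView(at, stream) => { views.insert(at, stream); },
                ControllerCommand::RemoveView(at) => { views.remove(&at); },
            },
            else => break,
        }
    }
}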

Closes #7071

michalkucharczyk marked this pull request as draft on January 23, 2025, 16:15
michalkucharczyk added the R0-silent (Changes should not be mentioned in any release notes) and T0-node (This PR/Issue is related to the topic “node”) labels on Jan 23, 2025
michalkucharczyk marked this pull request as ready for review on January 29, 2025, 11:45
substrate/client/transaction-pool/src/graph/listener.rs (outdated)
    aggregated_stream
}

/// Notify the listeners about the extrinsic broadcast.
pub fn broadcasted(&mut self, hash: &H, peers: Vec<String>) {
    trace!(target: LOG_TARGET, "[{:?}] Broadcasted", hash);
    self.fire(hash, |watcher| watcher.broadcast(peers));

Contributor:
So the broadcasted event is a bit different, and we handle it directly in the upper layers without involving the validated_pool?

Contributor (author):
Yes, it is called directly on the pool by the networking layer. There is a dedicated method for this:

/// Notify the pool about transactions broadcast.
fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>);

For the new pool, we just trigger the event:

fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) {
    self.view_store.listener.transactions_broadcasted(propagations);
}

For the old pool, this went through validated_pool's listener:

fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) {
    self.pool.validated_pool().on_broadcasted(propagations)
}

pub fn on_broadcasted(&self, propagated: HashMap<ExtrinsicHash<B>, Vec<String>>) {
    let mut listener = self.listener.write();
    for (hash, peers) in propagated.into_iter() {
        listener.broadcasted(&hash, peers);
    }
}

>::default()));

let (tx, rx) = mpsc::tracing_unbounded("txpool-multi-view-listener-task-controller", 32);
let task = Self::task(external_controllers.clone(), rx);

Contributor:
I find that the flow is not so easy to follow. I thought about giving the task sole ownership of the external_controllers. This would get rid of the mutex, but it requires some additional messages to manage the list of external streams. What do you think?

Contributor (author):
I was also thinking about adding messages here, but in the end I decided that it would be more complex than having a mutex.

After all, the flow is not that complex. We need the mutex to add an external watcher controller (sink) into the map. This addition is made from the context of the submit_and_watch call.

I don't know. If you think messages will make the code more readable, I can give it a try (my small concern is the proper order of processing, but it should probably be fine).

Contributor:
I am thinking that adding message passing is more complex too (as in we'll need to add APIs for sending/receiving messages, and for handling them), and at the same time it would make the logic around external_controllers easier to read and comprehend. However, we can also add a comment mentioning that the only place where we write is submit_and_watch, and I would find that sufficient.

I am also thinking that there might be some benefits of message passing over taking and holding the mutex, which can create contention if submit_and_watch is called (very) frequently, although if rate-limiting is implemented at some upper level, this wouldn't be an issue.

All in all, I would transform this to message passing solely for the performance consideration, if my reasoning is correct. I would also add a comment specifying where external_controllers is written from.

Contributor (author):
I am considering using DashMap as a replacement for RwLock<HashMap>. I need to take a deeper look at this.
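
For reference, a rough sketch of what the DashMap variant could look like; the types are simplified stand-ins and this is not the PR's code:

// Sketch only: DashMap as a drop-in for a locked HashMap of watcher sinks.
use dashmap::DashMap;
use tokio::sync::mpsc;

type TxHash = u64;            // stand-in
type TxStatus = &'static str; // stand-in
type Controller = mpsc::UnboundedSender<TxStatus>; // stand-in for the watcher sink

#[derive(Default)]
struct ExternalControllers {
    inner: DashMap<TxHash, Controller>,
}

impl ExternalControllers {
    // Called from submit_and_watch: register the external watcher's sink.
    fn insert(&self, tx_hash: TxHash, sink: Controller) {
        self.inner.insert(tx_hash, sink);
    }

    // Called from the listener task: forward a status if the tx is watched.
    fn notify(&self, tx_hash: &TxHash, status: TxStatus) {
        if let Some(sink) = self.inner.get(tx_hash) {
            let _ = sink.send(status);
        }
    }
}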

self.listener.write().create_dropped_by_limits_stream()
}

/// Refer to [`Listener::create_aggregated_stream`]
pub fn create_aggregated_stream(

Contributor:
In general these changes mean that the submit_and_watch codepath is only used by the single state pool, so we can remove all of that once the fatxpool is the default.

Contributor (author):
Yes!

The only concern I have is that now we are also sending events for transactions that are not watched, so theoretically we are creating unnecessary traffic in the aggregated stream on collators for gossiped transactions. (All notifications are simply dropped in the task, because there are no external controllers in the map.)

I think it is not a significant overhead (I would even say negligible), but maybe 🤔 we should block it. This would involve populating the hashmap with the transactions that are allowed to be notified in the aggregated stream. But for now I would leave it as proposed.

iulianbarbu (Contributor) left a comment:
Left some stylistic comments and some questions/opinions. LGTM, but I will approve after the comments are clarified (if we don't need other changes beyond the ones I suggested).

Controller<ExternalWatcherCommand<ChainApi>>,
>::default()));

let (tx, rx) = mpsc::tracing_unbounded("txpool-multi-view-listener-task-controller", 512);

Contributor:
Can you please extract the 512 into a constant?
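
For example, something along these lines; the constant name is illustrative only, not taken from the PR:

/// Size parameter for the multi-view listener task's controller channel
/// (name chosen for illustration only).
const TASK_CONTROLLER_CHANNEL_SIZE: usize = 512;

let (tx, rx) = mpsc::tracing_unbounded(
    "txpool-multi-view-listener-task-controller",
    TASK_CONTROLLER_CHANNEL_SIZE,
);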

return None
}

trace!(target: LOG_TARGET, "[{:?}] create_external_watcher_for_tx", tx_hash);

let (tx, rx) = mpsc::tracing_unbounded("txpool-multi-view-listener", 32);
controllers.insert(tx_hash, tx);
let (tx, rx) = mpsc::tracing_unbounded("txpool-multi-view-listener", 128);

Contributor:
Would be nice to have 128 extracted in a dedicated const.

trace!(target: LOG_TARGET, "[{:?}] dropped_sink: send message failed: {:?}", tx, e);
}
}
self.send_to_dropped_stream_sink(tx, TransactionStatus::Dropped);

Contributor:
Why aren't we sending to the aggregated stream sink here as well?

trace!(target: LOG_TARGET, "[{:?}] dropped_sink: send message failed: {:?}", tx, e);
}
}
self.send_to_dropped_stream_sink(tx, TransactionStatus::Usurped(by.clone()));

Contributor:
Why aren't we sending to the aggregated stream sink here as well?

/// (external watcher) are not sent.
pub fn create_aggregated_stream(&mut self) -> AggregatedStream<H, BlockHash<C>> {
let (sender, aggregated_stream) =
tracing_unbounded("mpsc_txpool_aggregated_stream", 100_000);

Contributor:
Would be great to have a const variable with 100_000.

pub fn create_dropped_by_limits_stream(&mut self) -> DroppedByLimitsStream<H, BlockHash<C>> {
/// The stream can be used to subscribe to events related to dropping of all extrinsics in the
/// pool.
pub fn create_dropped_by_limits_stream(&mut self) -> AggregatedStream<H, BlockHash<C>> {
let (sender, single_stream) = tracing_unbounded("mpsc_txpool_watcher", 100_000);

Contributor:
Would be good to have 100_000 placed in a dedicated const here too.

Comment on lines +44 to +45
/// ready and future statuses are reported via this channel to allow consumer of the stream
/// tracking actual drops.

Contributor:
What does it mean to allow the consumer of the stream to track actual drops? How do ready & future events on the same stream improve the tracking of drops?

>,
mut command_receiver: CommandReceiver<ControllerCommand<ChainApi>>,
) {
let mut aggregated_streams_map: StreamMap<BlockHash<ChainApi>, ViewStatusStream<ChainApi>> =

iulianbarbu (Contributor) commented on Jan 31, 2025:
dq: Should we be concerned in practice about the size of this StreamMap? It increases when adding a new view, so in theory it can grow to arbitrary sizes, but in practice it might not reach extreme sizes due to back pressure in other parts of the pool (which I am having a hard time reasoning about). The docs mention that it works best with a smallish number of streams, as all entries are scanned on insert, remove, and polling.


let (tx, rx) = mpsc::tracing_unbounded("txpool-multi-view-listener", 32);
controllers.insert(tx_hash, tx);
let (tx, rx) = mpsc::tracing_unbounded("txpool-multi-view-listener", 128);
external_controllers.insert(tx_hash, tx);

Contributor:
I am thinking about dropping the lock right after this, to minimize how long the lock is held. WDYT?
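
A generic illustration of this suggestion (not the PR's code): scope the write guard so the lock is released immediately after the insertion.

use std::collections::HashMap;
use std::sync::RwLock;

// Generic pattern: hold the lock only for the insertion itself.
fn register_watcher(controllers: &RwLock<HashMap<u64, String>>, tx_hash: u64, sink: String) {
    {
        // The write guard lives only inside this block.
        let mut map = controllers.write().expect("lock not poisoned");
        map.insert(tx_hash, sink);
    } // guard dropped here; subsequent work runs without holding the lock

    // ... any follow-up work that does not need the map ...
}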

@paritytech-workflow-stopper commented:

All GitHub workflows were cancelled due to the failure of one of the required jobs.
Failed workflow url: https://github.com/paritytech/polkadot-sdk/actions/runs/13075004797
Failed job name: test-linux-stable

Labels
R0-silent: Changes should not be mentioned in any release notes
T0-node: This PR/Issue is related to the topic “node”.
Development

Successfully merging this pull request may close these issues.

fatxpool: optimize per-transaction listeners
3 participants