txview: run status and age checks on incoming transactions #4506

Merged · 9 commits · Feb 13, 2025
76 changes: 69 additions & 7 deletions core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs
@@ -1,9 +1,11 @@
use {
super::{
scheduler_metrics::{SchedulerCountMetrics, SchedulerTimingMetrics},
transaction_priority_id::TransactionPriorityId,
transaction_state::TransactionState,
transaction_state_container::{
SharedBytes, StateContainer, TransactionViewState, TransactionViewStateContainer,
EXTRA_CAPACITY,
},
},
crate::banking_stage::{
@@ -406,8 +408,57 @@ impl TransactionViewReceiveAndBuffer {

let mut num_received = 0usize;
let mut num_buffered = 0usize;
let mut num_dropped_on_status_age_checks = 0usize;
let mut num_dropped_on_capacity = 0usize;
let mut num_dropped_on_receive = 0usize;

// Create temporary batches of transactions to be age-checked.
let mut transaction_ids = ArrayVec::<_, EXTRA_CAPACITY>::new();
let lock_results: [_; EXTRA_CAPACITY] = core::array::from_fn(|_| Ok(()));
let mut error_counters = TransactionErrorMetrics::default();

let mut run_status_age_checks =
|container: &mut TransactionViewStateContainer,
transaction_ids: &mut ArrayVec<usize, 64>| {
Review comment (Author): The complexity here can go away once we have Bytes-backed transactions coming from upstream, since we will no longer need the weird "insert to map only" pattern.

                    // Temporary scope so that transaction references are immediately
                    // dropped and transactions not passing checks can be removed.
let check_results = {
let mut transactions = ArrayVec::<_, EXTRA_CAPACITY>::new();
transactions.extend(transaction_ids.iter().map(|id| {
&container
.get_transaction_ttl(*id)
.expect("transaction must exist")
.transaction
}));
                        working_bank.check_transactions::<RuntimeTransaction<_>>(
                            &transactions,
                            &lock_results,
                            MAX_PROCESSING_AGE,
                            &mut error_counters,
                        )
                    };

Review comment: It's currently fine to call check_transactions with a lock_results slice that is longer than the transactions slice, but I think it would be better to pass a slice that is exactly the same length as transactions. What do you think?
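A minimal sketch of that suggestion, reusing the names from the diff above; the only change is trimming the fixed-size lock_results array to the batch actually collected:

    // Sketch only: same call as above, but lock_results is sliced so its
    // length always matches the transactions slice being checked.
    working_bank.check_transactions::<RuntimeTransaction<_>>(
        &transactions,
        &lock_results[..transactions.len()],
        MAX_PROCESSING_AGE,
        &mut error_counters,
    )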

// Remove all invalid transactions from the map; insert passing
// ids into the priority queue.
for (transaction_id, check_result) in transaction_ids.drain(..).zip(check_results) {
if check_result.is_ok() {
let priority = container
.get_mut_transaction_state(transaction_id)
.expect("transaction must exist")
.priority();
if container.push_id_into_queue(TransactionPriorityId::new(
priority,
transaction_id,
)) {
num_dropped_on_capacity += 1;
}
} else {
num_dropped_on_status_age_checks += 1;
container.remove_by_id(transaction_id);
}
}
};

for packet_batch in packet_batch_message.iter() {
for packet in packet_batch.iter() {
let Some(packet_data) = packet.data(..) else {
@@ -417,9 +468,8 @@ impl TransactionViewReceiveAndBuffer {
num_received += 1;

// Reserve free-space to copy packet into, run sanitization checks, and insert.
-                if container.try_insert_with_data(
-                    packet_data,
-                    |bytes| match Self::try_handle_packet(
+                if let Some(transaction_id) = container.try_insert_with_data(packet_data, |bytes| {
+                    match Self::try_handle_packet(
bytes,
root_bank,
working_bank,
@@ -435,20 +485,32 @@ impl TransactionViewReceiveAndBuffer {
num_dropped_on_receive += 1;
Err(())
}
-                    },
-                ) {
-                    num_dropped_on_capacity += 1;
-                };
+                    }
+                }) {
transaction_ids.push(transaction_id);

// If at capacity, run checks and remove invalid transactions.
if transaction_ids.len() == EXTRA_CAPACITY {

                        run_status_age_checks(container, &mut transaction_ids);
                    }

Review comment: EXTRA_CAPACITY is effectively a "batch size" now, right?

Review comment (Author): Yeah, it's a "safe" batch size. Nothing breaks if you use more, but with this batch size you will not do any allocations when pushing into the container.
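To make the no-allocation claim concrete, here is a standalone illustration with invented numbers (not code from the PR). with_capacity in transaction_state_container.rs below sizes the slab at capacity + EXTRA_CAPACITY, so a full queue plus one in-flight batch of EXTRA_CAPACITY map-only entries fits without reallocating:

    use slab::Slab;

    const EXTRA_CAPACITY: usize = 64;

    fn main() {
        let queue_capacity = 100; // invented number for the illustration
        let mut slab: Slab<u8> = Slab::with_capacity(queue_capacity + EXTRA_CAPACITY);
        let initial_capacity = slab.capacity();

        // Worst case: a full queue (100 live entries) plus one full batch of
        // EXTRA_CAPACITY map-only entries awaiting status/age checks.
        for _ in 0..(queue_capacity + EXTRA_CAPACITY) {
            slab.insert(0u8);
        }

        // The batch was absorbed without growing the allocation.
        assert_eq!(slab.capacity(), initial_capacity);
    }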
}
}
}

// Any remaining packets undergo status/age checks
run_status_age_checks(container, &mut transaction_ids);
Review comment: Let's check if transaction_ids is empty first.
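A minimal sketch of the suggested guard, using the names from the diff above:

    // Skip the final batch check when nothing was buffered in this pass.
    if !transaction_ids.is_empty() {
        run_status_age_checks(container, &mut transaction_ids);
    }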


let buffer_time_us = start.elapsed().as_micros() as u64;
timing_metrics.update(|timing_metrics| {
saturating_add_assign!(timing_metrics.buffer_time_us, buffer_time_us);
});
count_metrics.update(|count_metrics| {
saturating_add_assign!(count_metrics.num_received, num_received);
saturating_add_assign!(count_metrics.num_buffered, num_buffered);
saturating_add_assign!(
count_metrics.num_dropped_on_age_and_status,
num_dropped_on_status_age_checks
);
saturating_add_assign!(
count_metrics.num_dropped_on_capacity,
num_dropped_on_capacity
core/src/banking_stage/transaction_scheduler/transaction_state_container.rs

@@ -95,11 +95,12 @@ pub(crate) trait StateContainer<Tx: TransactionWithMeta> {
fn get_min_max_priority(&self) -> MinMaxResult<u64>;
}

+// Extra capacity is added because some additional space is needed when
+// pushing a new transaction into the container to avoid reallocation.
+pub(crate) const EXTRA_CAPACITY: usize = 64;

impl<Tx: TransactionWithMeta> StateContainer<Tx> for TransactionStateContainer<Tx> {
    fn with_capacity(capacity: usize) -> Self {
-        // Extra capacity is added because some additional space is needed when
-        // pushing a new transaction into the container to avoid reallocation.
-        const EXTRA_CAPACITY: usize = 64;
Self {
priority_queue: MinMaxHeap::with_capacity(capacity),
id_to_transaction_state: Slab::with_capacity(capacity + EXTRA_CAPACITY),
@@ -214,15 +215,13 @@ pub struct TransactionViewStateContainer {
}

impl TransactionViewStateContainer {
-    /// Returns true if packet was dropped due to capacity limits.
+    // Insert into the map, but NOT into the priority queue.
+    // Returns the id of the transaction if it was inserted.
    pub(crate) fn try_insert_with_data(
        &mut self,
        data: &[u8],
        f: impl FnOnce(SharedBytes) -> Result<TransactionState<RuntimeTransactionView>, ()>,
-    ) -> bool {
-        // Get remaining capacity before inserting.
-        let remaining_capacity = self.remaining_capacity();
-
+    ) -> Option<usize> {
// Get a vacant entry in the slab.
let vacant_entry = self.inner.get_vacant_map_entry();
let transaction_id = vacant_entry.key();
@@ -248,16 +247,11 @@ impl TransactionViewStateContainer {
}

// Attempt to insert the transaction.
-        match f(Arc::clone(bytes_entry)) {
-            Ok(state) => {
-                let priority_id = TransactionPriorityId::new(state.priority(), transaction_id);
-                vacant_entry.insert(state);
-
-                // Push the transaction into the queue.
-                self.inner
-                    .push_id_into_queue_with_remaining_capacity(priority_id, remaining_capacity)
-            }
-            Err(_) => false,
+        if let Ok(state) = f(Arc::clone(bytes_entry)) {
+            vacant_entry.insert(state);
+            Some(transaction_id)
+        } else {
+            None
        }
    }
}

Review comment (Author): We no longer push into the queue here. We just let up to 64 (see the EXTRA_CAPACITY const) additional transactions live in the map. Once we reach the end of the incoming tx stream, or hit 64 packets, we run age checks, and only THEN do we insert into the priority queue. This means that txs that are already processed or too old will not push out transactions that can be processed.

Review comment: I think fn remaining_capacity needs to be reworked, because with this change we will often have 0 remaining capacity and will always be popping from the priority queue even if the priority queue's length was actually less than its capacity.

If the priority capacity was 100 and we only had 36 elements in the transaction slab, then a new batch of 64 transactions would get added to the slab and remaining_capacity would return 0 when inserting into the priority queue. This means we would always pop an element and end up with only 36 elements in the queue when we should have 100.

If I'm reading this change correctly, we used to get the remaining capacity before inserting but no longer do that, causing this new issue to emerge.

Review comment (Author): Yeah, 100% right; I need to add some tests for this and fix the issue with remaining capacity.

Review comment (Author): I don't think we'd end up with 36 packets though? We'd end up with 99 (obviously still not ideal).

  • Initial capacity = 100. Extra capacity = 64 => slab capacity = 164.
  • 36 packets in queue/slab => remaining_capacity of 64 at the start.
  • Receive 64 packets, all entered into the slab. Slab length is now 100.
  • We run checks.
  • We go to insert into the queue in a loop. On the first tx insert, remaining_capacity = 0, so we drop a packet. Slab length is now 99.
  • For the rest of the loop the slab length stays at 99, and the remaining packets are inserted.

But we've also possibly dropped the wrong packet here: everything already in the queue plus the first tx in the received batch could be high priority, with the remaining 63 being low priority.

Review comment (Author): I think we could solve this with the following:

  • Store the desired capacity separately (or derive it by subtracting EXTRA_CAPACITY).
  • Insert all into the slab.
  • Insert all into the queue.
  • Pop the min from the queue until we are back at the desired size.

That way we are always popping the lowest known item(s) instead of strictly keeping at capacity, because even with the off-by-one issue fixed, we'd still potentially be dropping non-optimally for the received batch.

Review comment: > I don't think we'd end up with 36 packets though? We'd end up with 99 (obviously still not ideal).

Yeah, this looks correct for the example I gave, my mistake. I should have given this example:

If the priority capacity was 100 and we had 68 elements in the transaction slab, then after inserting a new batch of 64 pending transactions into the slab, remaining_capacity would return 0 when inserting into the priority queue. This is an issue because we actually have capacity to add 32 new transactions to the priority queue. But for the first 32 pending transactions that we try to insert into the priority queue, we will pop the min of (next pending tx, set of priority-queue txs). That's not great, because the first 32 transactions in the batch of 64 pending transactions could all be high priority, and we would end up dropping 31 of them.

That said, I think your proposed solution makes sense. We could have a new method on TransactionStateContainer for pushing a batch of transactions into the priority queue and then draining from the container until remaining_capacity is within the max capacity.
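A rough sketch of the batch-push idea discussed above, with hypothetical method and field names (push_ids_into_queue_batched and desired_capacity are not part of this PR); it assumes the MinMaxHeap priority queue and Slab of transaction states shown in with_capacity:

    // Hypothetical sketch: push the whole checked batch, then evict the
    // globally lowest-priority entries until the queue is back at its
    // desired size, so drops are optimal across old and new transactions.
    pub(crate) fn push_ids_into_queue_batched(
        &mut self,
        priority_ids: impl Iterator<Item = TransactionPriorityId>,
    ) -> usize {
        // Insert the entire batch first, temporarily exceeding capacity.
        for priority_id in priority_ids {
            self.priority_queue.push(priority_id);
        }

        // Drain the minimum until we are back within the desired capacity.
        let mut num_dropped = 0usize;
        while self.priority_queue.len() > self.desired_capacity {
            let popped = self.priority_queue.pop_min().expect("queue is not empty");
            self.id_to_transaction_state.remove(popped.id);
            num_dropped += 1;
        }
        num_dropped
    }

Draining after the batch insert, rather than consulting remaining_capacity on every push, avoids dropping a high-priority transaction just because it arrived first in its batch.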