From 8b3db5580626a3ebc8c60699587b9e8fff9187e3 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 16 Jan 2025 14:49:12 -0600 Subject: [PATCH 1/9] run status and age checks on incoming transactions --- .../receive_and_buffer.rs | 76 +++++++++++++++++-- .../transaction_state_container.rs | 30 +++----- 2 files changed, 81 insertions(+), 25 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs index aa59c97b33f90b..5b8bff8806e0f1 100644 --- a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs +++ b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs @@ -1,9 +1,11 @@ use { super::{ scheduler_metrics::{SchedulerCountMetrics, SchedulerTimingMetrics}, + transaction_priority_id::TransactionPriorityId, transaction_state::TransactionState, transaction_state_container::{ SharedBytes, StateContainer, TransactionViewState, TransactionViewStateContainer, + EXTRA_CAPACITY, }, }, crate::banking_stage::{ @@ -406,8 +408,57 @@ impl TransactionViewReceiveAndBuffer { let mut num_received = 0usize; let mut num_buffered = 0usize; + let mut num_dropped_on_status_age_checks = 0usize; let mut num_dropped_on_capacity = 0usize; let mut num_dropped_on_receive = 0usize; + + // Create temporary batches of transactions to be age-checked. + let mut transaction_ids = ArrayVec::<_, EXTRA_CAPACITY>::new(); + let lock_results: [_; EXTRA_CAPACITY] = core::array::from_fn(|_| Ok(())); + let mut error_counters = TransactionErrorMetrics::default(); + + let mut run_status_age_checks = + |container: &mut TransactionViewStateContainer, + transaction_ids: &mut ArrayVec| { + // Temporary scope so that transaction references are immediately + // dropped and transactions not passing + let check_results = { + let mut transactions = ArrayVec::<_, EXTRA_CAPACITY>::new(); + transactions.extend(transaction_ids.iter().map(|id| { + &container + .get_transaction_ttl(*id) + .expect("transaction must exist") + .transaction + })); + working_bank.check_transactions::>( + &transactions, + &lock_results, + MAX_PROCESSING_AGE, + &mut error_counters, + ) + }; + + // Remove all invalid transactions from the map; insert passing + // ids into the priority queue. + for (transaction_id, check_result) in transaction_ids.drain(..).zip(check_results) { + if check_result.is_ok() { + let priority = container + .get_mut_transaction_state(transaction_id) + .expect("transaction must exist") + .priority(); + if container.push_id_into_queue(TransactionPriorityId::new( + priority, + transaction_id, + )) { + num_dropped_on_capacity += 1; + } + } else { + num_dropped_on_status_age_checks += 1; + container.remove_by_id(transaction_id); + } + } + }; + for packet_batch in packet_batch_message.iter() { for packet in packet_batch.iter() { let Some(packet_data) = packet.data(..) else { @@ -417,9 +468,8 @@ impl TransactionViewReceiveAndBuffer { num_received += 1; // Reserve free-space to copy packet into, run sanitization checks, and insert. - if container.try_insert_with_data( - packet_data, - |bytes| match Self::try_handle_packet( + if let Some(transaction_id) = container.try_insert_with_data(packet_data, |bytes| { + match Self::try_handle_packet( bytes, root_bank, working_bank, @@ -435,13 +485,21 @@ impl TransactionViewReceiveAndBuffer { num_dropped_on_receive += 1; Err(()) } - }, - ) { - num_dropped_on_capacity += 1; - }; + } + }) { + transaction_ids.push(transaction_id); + + // If at capacity, run checks and remove invalid transactions. + if transaction_ids.len() == EXTRA_CAPACITY { + run_status_age_checks(container, &mut transaction_ids); + } + } } } + // Any remaining packets undergo status/age checks + run_status_age_checks(container, &mut transaction_ids); + let buffer_time_us = start.elapsed().as_micros() as u64; timing_metrics.update(|timing_metrics| { saturating_add_assign!(timing_metrics.buffer_time_us, buffer_time_us); @@ -449,6 +507,10 @@ impl TransactionViewReceiveAndBuffer { count_metrics.update(|count_metrics| { saturating_add_assign!(count_metrics.num_received, num_received); saturating_add_assign!(count_metrics.num_buffered, num_buffered); + saturating_add_assign!( + count_metrics.num_dropped_on_age_and_status, + num_dropped_on_status_age_checks + ); saturating_add_assign!( count_metrics.num_dropped_on_capacity, num_dropped_on_capacity diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs index c9c8ddbde751e5..dc4a3afbeb8bd9 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs @@ -95,11 +95,12 @@ pub(crate) trait StateContainer { fn get_min_max_priority(&self) -> MinMaxResult; } +// Extra capacity is added because some additional space is needed when +// pushing a new transaction into the container to avoid reallocation. +pub(crate) const EXTRA_CAPACITY: usize = 64; + impl StateContainer for TransactionStateContainer { fn with_capacity(capacity: usize) -> Self { - // Extra capacity is added because some additional space is needed when - // pushing a new transaction into the container to avoid reallocation. - const EXTRA_CAPACITY: usize = 64; Self { priority_queue: MinMaxHeap::with_capacity(capacity), id_to_transaction_state: Slab::with_capacity(capacity + EXTRA_CAPACITY), @@ -214,15 +215,13 @@ pub struct TransactionViewStateContainer { } impl TransactionViewStateContainer { - /// Returns true if packet was dropped due to capacity limits. + // Insert into the map, but NOT into the priority queue. + // Returns the id of the transaction if it was inserted. pub(crate) fn try_insert_with_data( &mut self, data: &[u8], f: impl FnOnce(SharedBytes) -> Result, ()>, - ) -> bool { - // Get remaining capacity before inserting. - let remaining_capacity = self.remaining_capacity(); - + ) -> Option { // Get a vacant entry in the slab. let vacant_entry = self.inner.get_vacant_map_entry(); let transaction_id = vacant_entry.key(); @@ -248,16 +247,11 @@ impl TransactionViewStateContainer { } // Attempt to insert the transaction. - match f(Arc::clone(bytes_entry)) { - Ok(state) => { - let priority_id = TransactionPriorityId::new(state.priority(), transaction_id); - vacant_entry.insert(state); - - // Push the transaction into the queue. - self.inner - .push_id_into_queue_with_remaining_capacity(priority_id, remaining_capacity) - } - Err(_) => false, + if let Ok(state) = f(Arc::clone(bytes_entry)) { + vacant_entry.insert(state); + Some(transaction_id) + } else { + None } } } From 0245adafde557ca186539855f6f60924b6889864 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 27 Jan 2025 09:57:36 -0600 Subject: [PATCH 2/9] lock_results slice length limit --- .../banking_stage/transaction_scheduler/receive_and_buffer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs index 5b8bff8806e0f1..4b75c71ecbd2f3 100644 --- a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs +++ b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs @@ -432,7 +432,7 @@ impl TransactionViewReceiveAndBuffer { })); working_bank.check_transactions::>( &transactions, - &lock_results, + &lock_results[..transactions.len()], MAX_PROCESSING_AGE, &mut error_counters, ) From 1a35ec591ee7b7501a93d9547ba61914b6f3d939 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Fri, 31 Jan 2025 10:39:23 -0600 Subject: [PATCH 3/9] check_and_push_to_queue --- .../transaction_scheduler/receive_and_buffer.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs index 4b75c71ecbd2f3..04695ab2a86e38 100644 --- a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs +++ b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs @@ -417,7 +417,7 @@ impl TransactionViewReceiveAndBuffer { let lock_results: [_; EXTRA_CAPACITY] = core::array::from_fn(|_| Ok(())); let mut error_counters = TransactionErrorMetrics::default(); - let mut run_status_age_checks = + let mut check_and_push_to_queue = |container: &mut TransactionViewStateContainer, transaction_ids: &mut ArrayVec| { // Temporary scope so that transaction references are immediately @@ -491,14 +491,14 @@ impl TransactionViewReceiveAndBuffer { // If at capacity, run checks and remove invalid transactions. if transaction_ids.len() == EXTRA_CAPACITY { - run_status_age_checks(container, &mut transaction_ids); + check_and_push_to_queue(container, &mut transaction_ids); } } } } // Any remaining packets undergo status/age checks - run_status_age_checks(container, &mut transaction_ids); + check_and_push_to_queue(container, &mut transaction_ids); let buffer_time_us = start.elapsed().as_micros() as u64; timing_metrics.update(|timing_metrics| { From c90bebce0d430748bdb6ef298641d732dd235a7f Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Fri, 31 Jan 2025 10:40:14 -0600 Subject: [PATCH 4/9] try_insert_map_only_with_data --- .../receive_and_buffer.rs | 38 ++++++++++--------- .../transaction_state_container.rs | 6 +-- 2 files changed, 23 insertions(+), 21 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs index 04695ab2a86e38..37e195c858ccf9 100644 --- a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs +++ b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs @@ -468,25 +468,27 @@ impl TransactionViewReceiveAndBuffer { num_received += 1; // Reserve free-space to copy packet into, run sanitization checks, and insert. - if let Some(transaction_id) = container.try_insert_with_data(packet_data, |bytes| { - match Self::try_handle_packet( - bytes, - root_bank, - working_bank, - alt_resolved_slot, - sanitized_epoch, - transaction_account_lock_limit, - ) { - Ok(state) => { - num_buffered += 1; - Ok(state) + if let Some(transaction_id) = + container.try_insert_map_only_with_data(packet_data, |bytes| { + match Self::try_handle_packet( + bytes, + root_bank, + working_bank, + alt_resolved_slot, + sanitized_epoch, + transaction_account_lock_limit, + ) { + Ok(state) => { + num_buffered += 1; + Ok(state) + } + Err(()) => { + num_dropped_on_receive += 1; + Err(()) + } } - Err(()) => { - num_dropped_on_receive += 1; - Err(()) - } - } - }) { + }) + { transaction_ids.push(transaction_id); // If at capacity, run checks and remove invalid transactions. diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs index dc4a3afbeb8bd9..a51edb2bc3ae6e 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs @@ -215,9 +215,9 @@ pub struct TransactionViewStateContainer { } impl TransactionViewStateContainer { - // Insert into the map, but NOT into the priority queue. - // Returns the id of the transaction if it was inserted. - pub(crate) fn try_insert_with_data( + /// Insert into the map, but NOT into the priority queue. + /// Returns the id of the transaction if it was inserted. + pub(crate) fn try_insert_map_only_with_data( &mut self, data: &[u8], f: impl FnOnce(SharedBytes) -> Result, ()>, From bf9107b03b7c29eb41747be87307cd68dd031294 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 6 Feb 2025 15:53:49 -0600 Subject: [PATCH 5/9] push ids --- .../transaction_scheduler/greedy_scheduler.rs | 4 +- .../prio_graph_scheduler.rs | 11 +-- .../receive_and_buffer.rs | 91 ++++++++++--------- .../scheduler_controller.rs | 19 ++-- .../transaction_state_container.rs | 44 +++++++-- 5 files changed, 100 insertions(+), 69 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs b/core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs index 4484307ecf45e0..65b3ed36531048 100644 --- a/core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs @@ -202,9 +202,7 @@ impl Scheduler for GreedyScheduler { ); // Push unschedulables back into the queue - for id in self.unschedulables.drain(..) { - container.push_id_into_queue(id); - } + container.push_ids_into_queue(self.unschedulables.drain(..)); Ok(SchedulingSummary { num_scheduled, diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index 54078166ebd1c8..9d5b3a08d1189c 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -298,14 +298,13 @@ impl Scheduler for PrioGraphScheduler { saturating_add_assign!(num_sent, self.send_batches(&mut batches)?); // Push unschedulable ids back into the container - for id in unschedulable_ids { - container.push_id_into_queue(id); - } + container.push_ids_into_queue(unschedulable_ids.into_iter()); // Push remaining transactions back into the container - while let Some((id, _)) = self.prio_graph.pop_and_unblock() { - container.push_id_into_queue(id); - } + container.push_ids_into_queue(std::iter::from_fn(|| { + self.prio_graph.pop_and_unblock().map(|(id, _)| id) + })); + // No more remaining items in the queue. // Clear here to make sure the next scheduling pass starts fresh // without detecting any conflicts. diff --git a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs index 37e195c858ccf9..97977a3683fb5a 100644 --- a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs +++ b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs @@ -413,51 +413,49 @@ impl TransactionViewReceiveAndBuffer { let mut num_dropped_on_receive = 0usize; // Create temporary batches of transactions to be age-checked. - let mut transaction_ids = ArrayVec::<_, EXTRA_CAPACITY>::new(); + let mut transaction_priority_ids = ArrayVec::<_, EXTRA_CAPACITY>::new(); let lock_results: [_; EXTRA_CAPACITY] = core::array::from_fn(|_| Ok(())); let mut error_counters = TransactionErrorMetrics::default(); - let mut check_and_push_to_queue = - |container: &mut TransactionViewStateContainer, - transaction_ids: &mut ArrayVec| { - // Temporary scope so that transaction references are immediately - // dropped and transactions not passing - let check_results = { - let mut transactions = ArrayVec::<_, EXTRA_CAPACITY>::new(); - transactions.extend(transaction_ids.iter().map(|id| { - &container - .get_transaction_ttl(*id) - .expect("transaction must exist") - .transaction - })); - working_bank.check_transactions::>( - &transactions, - &lock_results[..transactions.len()], - MAX_PROCESSING_AGE, - &mut error_counters, - ) - }; + let mut check_and_push_to_queue = |container: &mut TransactionViewStateContainer, + transaction_priority_ids: &mut ArrayVec< + TransactionPriorityId, + 64, + >| { + // Temporary scope so that transaction references are immediately + // dropped and transactions not passing + let check_results = { + let mut transactions = ArrayVec::<_, EXTRA_CAPACITY>::new(); + transactions.extend(transaction_priority_ids.iter().map(|priority_id| { + &container + .get_transaction_ttl(priority_id.id) + .expect("transaction must exist") + .transaction + })); + working_bank.check_transactions::>( + &transactions, + &lock_results[..transactions.len()], + MAX_PROCESSING_AGE, + &mut error_counters, + ) + }; - // Remove all invalid transactions from the map; insert passing - // ids into the priority queue. - for (transaction_id, check_result) in transaction_ids.drain(..).zip(check_results) { - if check_result.is_ok() { - let priority = container - .get_mut_transaction_state(transaction_id) - .expect("transaction must exist") - .priority(); - if container.push_id_into_queue(TransactionPriorityId::new( - priority, - transaction_id, - )) { - num_dropped_on_capacity += 1; - } - } else { - num_dropped_on_status_age_checks += 1; - container.remove_by_id(transaction_id); - } + // Remove errored transactions + for (result, priority_id) in check_results.iter().zip(transaction_priority_ids.iter()) { + if result.is_err() { + num_dropped_on_status_age_checks += 1; + container.remove_by_id(priority_id.id); } - }; + } + // Push non-errored transaction into queue. + num_dropped_on_capacity += container.push_ids_into_queue( + check_results + .into_iter() + .zip(transaction_priority_ids.drain(..)) + .filter(|(r, _)| r.is_ok()) + .map(|(_, id)| id), + ); + }; for packet_batch in packet_batch_message.iter() { for packet in packet_batch.iter() { @@ -489,18 +487,23 @@ impl TransactionViewReceiveAndBuffer { } }) { - transaction_ids.push(transaction_id); + let priority = container + .get_mut_transaction_state(transaction_id) + .expect("transaction must exist") + .priority(); + transaction_priority_ids + .push(TransactionPriorityId::new(priority, transaction_id)); // If at capacity, run checks and remove invalid transactions. - if transaction_ids.len() == EXTRA_CAPACITY { - check_and_push_to_queue(container, &mut transaction_ids); + if transaction_priority_ids.len() == EXTRA_CAPACITY { + check_and_push_to_queue(container, &mut transaction_priority_ids); } } } } // Any remaining packets undergo status/age checks - check_and_push_to_queue(container, &mut transaction_ids); + check_and_push_to_queue(container, &mut transaction_priority_ids); let buffer_time_us = start.elapsed().as_micros() as u64; timing_metrics.update(|timing_metrics| { diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs index 3394a2f9cc71e2..d7e7061bb0c246 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs @@ -327,9 +327,8 @@ where } if hold { - for priority_id in ids_to_add_back { - self.container.push_id_into_queue(priority_id); - } + self.container + .push_ids_into_queue(ids_to_add_back.into_iter()); } else { for priority_id in ids_to_add_back { self.container.remove_by_id(priority_id.id); @@ -393,14 +392,22 @@ where &mut error_counters, ); - for (result, id) in check_results.into_iter().zip(chunk.iter()) { + // Remove errored transactions + for (result, id) in check_results.iter().zip(chunk.iter()) { if result.is_err() { saturating_add_assign!(num_dropped_on_age_and_status, 1); self.container.remove_by_id(id.id); - } else { - self.container.push_id_into_queue(*id); } } + + // Push non-errored transaction into queue. + self.container.push_ids_into_queue( + check_results + .into_iter() + .zip(chunk.iter()) + .filter(|(r, _)| r.is_ok()) + .map(|(_, id)| *id), + ); } self.count_metrics.update(|count_metrics| { diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs index a51edb2bc3ae6e..b916998041ec36 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs @@ -44,6 +44,7 @@ use { /// The container maintains a fixed capacity. If the queue is full when pushing /// a new transaction, the lowest priority transaction will be dropped. pub(crate) struct TransactionStateContainer { + capacity: usize, priority_queue: MinMaxHeap, id_to_transaction_state: Slab>, } @@ -81,13 +82,19 @@ pub(crate) trait StateContainer { .expect("transaction must exist"); let priority_id = TransactionPriorityId::new(transaction_state.priority(), transaction_id); transaction_state.transition_to_unprocessed(transaction_ttl); - self.push_id_into_queue(priority_id); + self.push_ids_into_queue(std::iter::once(priority_id)); } - /// Pushes a transaction id into the priority queue. If the queue is full, the lowest priority - /// transaction will be dropped (removed from the queue and map). - /// Returns `true` if a packet was dropped due to capacity limits. - fn push_id_into_queue(&mut self, priority_id: TransactionPriorityId) -> bool; + /// Pushes transaction ids into the priority queue. If the queue if full, + /// the lowest priority transactions will be dropped (removed from the + /// queue and map) **after** all ids have been pushed. + /// To avoid allocating, the caller should not push more than + /// [`EXTRA_CAPACITY`] ids in a call. + /// Returns the number of dropped transactions. + fn push_ids_into_queue( + &mut self, + priority_ids: impl Iterator, + ) -> usize; /// Remove transaction by id. fn remove_by_id(&mut self, id: TransactionId); @@ -102,7 +109,8 @@ pub(crate) const EXTRA_CAPACITY: usize = 64; impl StateContainer for TransactionStateContainer { fn with_capacity(capacity: usize) -> Self { Self { - priority_queue: MinMaxHeap::with_capacity(capacity), + capacity, + priority_queue: MinMaxHeap::with_capacity(capacity + EXTRA_CAPACITY), id_to_transaction_state: Slab::with_capacity(capacity + EXTRA_CAPACITY), } } @@ -134,8 +142,21 @@ impl StateContainer for TransactionStateContainer bool { - self.push_id_into_queue_with_remaining_capacity(priority_id, self.remaining_capacity()) + fn push_ids_into_queue( + &mut self, + priority_ids: impl Iterator, + ) -> usize { + for id in priority_ids { + self.priority_queue.push(id); + } + let num_dropped = self.priority_queue.len().saturating_sub(self.capacity); + + for _ in 0..num_dropped { + let priority_id = self.priority_queue.pop_min().expect("queue is not empty"); + self.id_to_transaction_state.remove(priority_id.id); + } + + num_dropped } fn remove_by_id(&mut self, id: TransactionId) { @@ -301,8 +322,11 @@ impl StateContainer for TransactionViewStateContainer { } #[inline] - fn push_id_into_queue(&mut self, priority_id: TransactionPriorityId) -> bool { - self.inner.push_id_into_queue(priority_id) + fn push_ids_into_queue( + &mut self, + priority_ids: impl Iterator, + ) -> usize { + self.inner.push_ids_into_queue(priority_ids) } #[inline] From c8c0f1c046255942f6080355485f9813584098c3 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 6 Feb 2025 16:25:10 -0600 Subject: [PATCH 6/9] test_view_push_ids_to_queue --- .../transaction_state_container.rs | 68 ++++++++++++++++++- 1 file changed, 67 insertions(+), 1 deletion(-) diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs index b916998041ec36..0a97ff5da1a27f 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs @@ -345,6 +345,7 @@ mod tests { use { super::*, crate::banking_stage::scheduler_messages::MaxAge, + agave_transaction_view::transaction_view::SanitizedTransactionView, solana_runtime_transaction::runtime_transaction::RuntimeTransaction, solana_sdk::{ compute_budget::ComputeBudgetInstruction, @@ -354,8 +355,9 @@ mod tests { signature::Keypair, signer::Signer, system_instruction, - transaction::{SanitizedTransaction, Transaction}, + transaction::{MessageHash, SanitizedTransaction, Transaction}, }, + std::collections::HashSet, }; /// Returns (transaction_ttl, priority, cost) @@ -442,4 +444,68 @@ mod tests { .get_mut_transaction_state(non_existing_id) .is_none()); } + + #[test] + fn test_view_push_ids_to_queue() { + let mut container = TransactionViewStateContainer::with_capacity(2); + + let reserved_addresses = HashSet::default(); + let packet_parser = |data, priority, cost| { + let view = SanitizedTransactionView::try_new_sanitized(data).unwrap(); + let view = RuntimeTransaction::>::try_from( + view, + MessageHash::Compute, + None, + ) + .unwrap(); + let view = RuntimeTransaction::>::try_from( + view, + None, + &reserved_addresses, + ) + .unwrap(); + + Ok(TransactionState::new( + SanitizedTransactionTTL { + transaction: view, + max_age: MaxAge::MAX, + }, + None, + priority, + cost, + )) + }; + + // Push 2 transactions into the queue so buffer is full. + for priority in [4, 5] { + let (_transaction_ttl, packet, priority, cost) = test_transaction(priority); + let id = container + .try_insert_map_only_with_data(packet.original_packet().data(..).unwrap(), |data| { + packet_parser(data, priority, cost) + }) + .unwrap(); + let priority_id = TransactionPriorityId::new(priority, id); + assert_eq!( + container.push_ids_into_queue(std::iter::once(priority_id)), + 0 + ); + } + + // Push 5 additional packets in. 5 should be dropped. + let mut priority_ids = Vec::with_capacity(5); + for priority in [10, 11, 12, 1, 2] { + let (_transaction_ttl, packet, priority, cost) = test_transaction(priority); + let id = container + .try_insert_map_only_with_data(packet.original_packet().data(..).unwrap(), |data| { + packet_parser(data, priority, cost) + }) + .unwrap(); + let priority_id = TransactionPriorityId::new(priority, id); + priority_ids.push(priority_id); + } + assert_eq!(container.push_ids_into_queue(priority_ids.into_iter()), 5); + assert_eq!(container.pop().unwrap().priority, 12); + assert_eq!(container.pop().unwrap().priority, 11); + assert!(container.pop().is_none()); + } } From 7f2addfb0df9af25f48cd36415e50e5a0096024e Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 6 Feb 2025 19:04:13 -0600 Subject: [PATCH 7/9] fix test, remove remaining_capacity --- .../transaction_state_container.rs | 34 +------------------ 1 file changed, 1 insertion(+), 33 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs index 0a97ff5da1a27f..f57b4a10350c60 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs @@ -56,9 +56,6 @@ pub(crate) trait StateContainer { /// Returns true if the queue is empty. fn is_empty(&self) -> bool; - /// Returns the remaining capacity of the container - fn remaining_capacity(&self) -> usize; - /// Get the top transaction id in the priority queue. fn pop(&mut self) -> Option; @@ -119,12 +116,6 @@ impl StateContainer for TransactionStateContainer usize { - self.priority_queue - .capacity() - .saturating_sub(self.id_to_transaction_state.len()) - } - fn pop(&mut self) -> Option { self.priority_queue.pop_max() } @@ -184,9 +175,6 @@ impl TransactionStateContainer { priority: u64, cost: u64, ) -> bool { - // cache the remaining capacity **before** we take ownership of - // the next vacant entry. i.e. get the size before we insert. - let remaining_capacity = self.remaining_capacity(); let priority_id = { let entry = self.get_vacant_map_entry(); let transaction_id = entry.key(); @@ -199,22 +187,7 @@ impl TransactionStateContainer { TransactionPriorityId::new(priority, transaction_id) }; - self.push_id_into_queue_with_remaining_capacity(priority_id, remaining_capacity) - } - - fn push_id_into_queue_with_remaining_capacity( - &mut self, - priority_id: TransactionPriorityId, - remaining_capacity: usize, - ) -> bool { - if remaining_capacity == 0 { - let popped_id = self.priority_queue.push_pop_min(priority_id); - self.remove_by_id(popped_id.id); - true - } else { - self.priority_queue.push(priority_id); - false - } + self.push_ids_into_queue(std::iter::once(priority_id)) > 0 } fn get_vacant_map_entry(&mut self) -> VacantEntry> { @@ -295,11 +268,6 @@ impl StateContainer for TransactionViewStateContainer { self.inner.is_empty() } - #[inline] - fn remaining_capacity(&self) -> usize { - self.inner.remaining_capacity() - } - #[inline] fn pop(&mut self) -> Option { self.inner.pop() From 9094202273d26d423966ed28f11d839305bda104 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 11 Feb 2025 16:14:47 -0600 Subject: [PATCH 8/9] bound by map len, add test --- .../transaction_state_container.rs | 27 ++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs index f57b4a10350c60..ceca4b5a7e26a8 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs @@ -140,7 +140,15 @@ impl StateContainer for TransactionStateContainer Date: Tue, 11 Feb 2025 16:22:15 -0600 Subject: [PATCH 9/9] earlier fee check --- .../receive_and_buffer.rs | 86 +++++++++++-------- 1 file changed, 50 insertions(+), 36 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs index 97977a3683fb5a..6aef38c473a5e0 100644 --- a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs +++ b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs @@ -417,45 +417,59 @@ impl TransactionViewReceiveAndBuffer { let lock_results: [_; EXTRA_CAPACITY] = core::array::from_fn(|_| Ok(())); let mut error_counters = TransactionErrorMetrics::default(); - let mut check_and_push_to_queue = |container: &mut TransactionViewStateContainer, - transaction_priority_ids: &mut ArrayVec< - TransactionPriorityId, - 64, - >| { - // Temporary scope so that transaction references are immediately - // dropped and transactions not passing - let check_results = { - let mut transactions = ArrayVec::<_, EXTRA_CAPACITY>::new(); - transactions.extend(transaction_priority_ids.iter().map(|priority_id| { - &container + let mut check_and_push_to_queue = + |container: &mut TransactionViewStateContainer, + transaction_priority_ids: &mut ArrayVec| { + // Temporary scope so that transaction references are immediately + // dropped and transactions not passing + let mut check_results = { + let mut transactions = ArrayVec::<_, EXTRA_CAPACITY>::new(); + transactions.extend(transaction_priority_ids.iter().map(|priority_id| { + &container + .get_transaction_ttl(priority_id.id) + .expect("transaction must exist") + .transaction + })); + working_bank.check_transactions::>( + &transactions, + &lock_results[..transactions.len()], + MAX_PROCESSING_AGE, + &mut error_counters, + ) + }; + + // Remove errored transactions + for (result, priority_id) in check_results + .iter_mut() + .zip(transaction_priority_ids.iter()) + { + if result.is_err() { + num_dropped_on_status_age_checks += 1; + container.remove_by_id(priority_id.id); + } + let transaction = &container .get_transaction_ttl(priority_id.id) .expect("transaction must exist") - .transaction - })); - working_bank.check_transactions::>( - &transactions, - &lock_results[..transactions.len()], - MAX_PROCESSING_AGE, - &mut error_counters, - ) - }; - - // Remove errored transactions - for (result, priority_id) in check_results.iter().zip(transaction_priority_ids.iter()) { - if result.is_err() { - num_dropped_on_status_age_checks += 1; - container.remove_by_id(priority_id.id); + .transaction; + if let Err(err) = Consumer::check_fee_payer_unlocked( + working_bank, + transaction, + &mut error_counters, + ) { + *result = Err(err); + num_dropped_on_status_age_checks += 1; + container.remove_by_id(priority_id.id); + } } - } - // Push non-errored transaction into queue. - num_dropped_on_capacity += container.push_ids_into_queue( - check_results - .into_iter() - .zip(transaction_priority_ids.drain(..)) - .filter(|(r, _)| r.is_ok()) - .map(|(_, id)| id), - ); - }; + // Push non-errored transaction into queue. + num_dropped_on_capacity += container.push_ids_into_queue( + check_results + .into_iter() + .zip(transaction_priority_ids.drain(..)) + .filter(|(r, _)| r.is_ok()) + .map(|(_, id)| id), + ); + }; for packet_batch in packet_batch_message.iter() { for packet in packet_batch.iter() {