Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
1. make cost model a private member of banking stage, shared between
Browse files Browse the repository at this point in the history
   working threads;
2. banking stage utilizes CostModel when loading/executing transactions; if a transaction exceeds the limit,
it is put into the retry list; the Cost Model is reset when a
new slot starts;
  • Loading branch information
tao-stones committed May 10, 2021
1 parent 4b60b28 commit 8a3dae3
Show file tree
Hide file tree
Showing 5 changed files with 741 additions and 2 deletions.
4 changes: 3 additions & 1 deletion core/benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use rayon::prelude::*;
use solana_core::banking_stage::{create_test_recorder, BankingStage, BankingStageStats};
use solana_core::cluster_info::ClusterInfo;
use solana_core::cluster_info::Node;
use solana_core::cost_model::CostModel;
use solana_core::poh_recorder::WorkingBankEntry;
use solana_ledger::blockstore_processor::process_entries;
use solana_ledger::entry::{next_hash, Entry};
Expand All @@ -32,7 +33,7 @@ use solana_sdk::transaction::Transaction;
use std::collections::VecDeque;
use std::sync::atomic::Ordering;
use std::sync::mpsc::Receiver;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use test::Bencher;

Expand Down Expand Up @@ -91,6 +92,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
None::<Box<dyn Fn()>>,
&BankingStageStats::default(),
&recorder,
&Arc::new(Mutex::new(CostModel::new())),
);
});

Expand Down
32 changes: 31 additions & 1 deletion core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//! can do its processing in parallel with signature verification on the GPU.
use crate::{
cluster_info::ClusterInfo,
cost_model::CostModel,
packet_hasher::PacketHasher,
poh_recorder::{PohRecorder, PohRecorderError, TransactionRecorder, WorkingBankEntry},
poh_service::{self, PohService},
Expand Down Expand Up @@ -260,6 +261,7 @@ impl BankingStage {
LruCache::new(DEFAULT_LRU_SIZE),
PacketHasher::default(),
)));
let cost_model = Arc::new(Mutex::new(CostModel::new()));
// Many banks that process transactions in parallel.
let bank_thread_hdls: Vec<JoinHandle<()>> = (0..num_threads)
.map(|i| {
Expand All @@ -276,6 +278,7 @@ impl BankingStage {
let transaction_status_sender = transaction_status_sender.clone();
let gossip_vote_sender = gossip_vote_sender.clone();
let duplicates = duplicates.clone();
let cost_model = cost_model.clone();
Builder::new()
.name("solana-banking-stage-tx".to_string())
.spawn(move || {
Expand All @@ -291,6 +294,7 @@ impl BankingStage {
transaction_status_sender,
gossip_vote_sender,
&duplicates,
&cost_model,
);
})
.unwrap()
Expand Down Expand Up @@ -338,6 +342,7 @@ impl BankingStage {
has_more_unprocessed_transactions
}

#[allow(clippy::too_many_arguments)]
pub fn consume_buffered_packets(
my_pubkey: &Pubkey,
max_tx_ingestion_ns: u128,
Expand All @@ -348,6 +353,7 @@ impl BankingStage {
test_fn: Option<impl Fn()>,
banking_stage_stats: &BankingStageStats,
recorder: &TransactionRecorder,
cost_model: &Arc<Mutex<CostModel>>,
) {
let mut rebuffered_packets_len = 0;
let mut new_tx_count = 0;
Expand All @@ -365,6 +371,7 @@ impl BankingStage {
&original_unprocessed_indexes,
my_pubkey,
*next_leader,
cost_model,
);
Self::update_buffered_packets_with_new_unprocessed(
original_unprocessed_indexes,
Expand All @@ -373,6 +380,7 @@ impl BankingStage {
} else {
let bank_start = poh_recorder.lock().unwrap().bank_start();
if let Some((bank, bank_creation_time)) = bank_start {
cost_model.lock().unwrap().reset_if_new_bank(bank.slot());
let (processed, verified_txs_len, new_unprocessed_indexes) =
Self::process_packets_transactions(
&bank,
Expand All @@ -383,6 +391,7 @@ impl BankingStage {
transaction_status_sender.clone(),
gossip_vote_sender,
banking_stage_stats,
cost_model,
);
if processed < verified_txs_len
|| !Bank::should_bank_still_be_processing_txs(
Expand Down Expand Up @@ -485,6 +494,7 @@ impl BankingStage {
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
recorder: &TransactionRecorder,
cost_model: &Arc<Mutex<CostModel>>,
) -> BufferedPacketsDecision {
let bank_start;
let (
Expand All @@ -495,6 +505,9 @@ impl BankingStage {
) = {
let poh = poh_recorder.lock().unwrap();
bank_start = poh.bank_start();
if let Some((ref bank, _)) = bank_start {
cost_model.lock().unwrap().reset_if_new_bank(bank.slot());
};
(
poh.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET),
PohRecorder::get_bank_still_processing_txs(&bank_start),
Expand Down Expand Up @@ -525,6 +538,7 @@ impl BankingStage {
None::<Box<dyn Fn()>>,
banking_stage_stats,
recorder,
cost_model,
);
}
BufferedPacketsDecision::Forward => {
Expand Down Expand Up @@ -595,6 +609,7 @@ impl BankingStage {
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
cost_model: &Arc<Mutex<CostModel>>,
) {
let recorder = poh_recorder.lock().unwrap().recorder();
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
Expand All @@ -613,6 +628,7 @@ impl BankingStage {
&gossip_vote_sender,
&banking_stage_stats,
&recorder,
cost_model,
);
if matches!(decision, BufferedPacketsDecision::Hold)
|| matches!(decision, BufferedPacketsDecision::ForwardAndHold)
Expand Down Expand Up @@ -647,6 +663,7 @@ impl BankingStage {
&banking_stage_stats,
duplicates,
&recorder,
cost_model,
) {
Ok(()) | Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => break,
Expand Down Expand Up @@ -892,12 +909,12 @@ impl BankingStage {
) -> (usize, Vec<usize>) {
let mut chunk_start = 0;
let mut unprocessed_txs = vec![];

while chunk_start != transactions.len() {
let chunk_end = std::cmp::min(
transactions.len(),
chunk_start + MAX_NUM_TRANSACTIONS_PER_BATCH,
);

let (result, retryable_txs_in_chunk) = Self::process_and_record_transactions(
bank,
&transactions[chunk_start..chunk_end],
Expand Down Expand Up @@ -984,6 +1001,7 @@ impl BankingStage {
msgs: &Packets,
transaction_indexes: &[usize],
secp256k1_program_enabled: bool,
cost_model: &Arc<Mutex<CostModel>>,
) -> (Vec<HashedTransaction<'static>>, Vec<usize>) {
transaction_indexes
.iter()
Expand All @@ -993,6 +1011,7 @@ impl BankingStage {
if secp256k1_program_enabled {
tx.verify_precompiles().ok()?;
}
cost_model.lock().unwrap().try_to_add_transaction(&tx)?;
let message_bytes = Self::packet_message(p)?;
let message_hash = Message::hash_raw_message(message_bytes);
Some((
Expand Down Expand Up @@ -1051,12 +1070,14 @@ impl BankingStage {
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
cost_model: &Arc<Mutex<CostModel>>,
) -> (usize, usize, Vec<usize>) {
let mut packet_conversion_time = Measure::start("packet_conversion");
let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets(
msgs,
&packet_indexes,
bank.secp256k1_program_enabled(),
cost_model,
);
packet_conversion_time.stop();

Expand Down Expand Up @@ -1114,6 +1135,7 @@ impl BankingStage {
transaction_indexes: &[usize],
my_pubkey: &Pubkey,
next_leader: Option<Pubkey>,
cost_model: &Arc<Mutex<CostModel>>,
) -> Vec<usize> {
// Check if we are the next leader. If so, let's not filter the packets
// as we'll filter it again while processing the packets.
Expand All @@ -1128,6 +1150,7 @@ impl BankingStage {
msgs,
&transaction_indexes,
bank.secp256k1_program_enabled(),
cost_model,
);

let tx_count = transaction_to_packet_indexes.len();
Expand Down Expand Up @@ -1179,6 +1202,7 @@ impl BankingStage {
banking_stage_stats: &BankingStageStats,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
recorder: &TransactionRecorder,
cost_model: &Arc<Mutex<CostModel>>,
) -> Result<(), RecvTimeoutError> {
let mut recv_time = Measure::start("process_packets_recv");
let mms = verified_receiver.recv_timeout(recv_timeout)?;
Expand Down Expand Up @@ -1217,6 +1241,7 @@ impl BankingStage {
continue;
}
let (bank, bank_creation_time) = bank_start.unwrap();
cost_model.lock().unwrap().reset_if_new_bank(bank.slot());

let (processed, verified_txs_len, unprocessed_indexes) =
Self::process_packets_transactions(
Expand All @@ -1228,6 +1253,7 @@ impl BankingStage {
transaction_status_sender.clone(),
gossip_vote_sender,
banking_stage_stats,
cost_model,
);

new_tx_count += processed;
Expand Down Expand Up @@ -1258,6 +1284,7 @@ impl BankingStage {
&packet_indexes,
&my_pubkey,
next_leader,
cost_model,
);
Self::push_unprocessed(
buffered_packets,
Expand Down Expand Up @@ -2569,6 +2596,7 @@ mod tests {
None::<Box<dyn Fn()>>,
&BankingStageStats::default(),
&recorder,
&Arc::new(Mutex::new(CostModel::new())),
);
assert_eq!(buffered_packets[0].1.len(), num_conflicting_transactions);
// When the poh recorder has a bank, should process all non conflicting buffered packets.
Expand All @@ -2585,6 +2613,7 @@ mod tests {
None::<Box<dyn Fn()>>,
&BankingStageStats::default(),
&recorder,
&Arc::new(Mutex::new(CostModel::new())),
);
if num_expected_unprocessed == 0 {
assert!(buffered_packets.is_empty())
Expand Down Expand Up @@ -2650,6 +2679,7 @@ mod tests {
test_fn,
&BankingStageStats::default(),
&recorder,
&Arc::new(Mutex::new(CostModel::new())),
);

// Check everything is correct. All indexes after `interrupted_iteration`
Expand Down
Loading

0 comments on commit 8a3dae3

Please sign in to comment.