Fix a batch limits bug in banking (#23327)
* add thread index in thread name for debugging * fix batch_limit * use NUM_VOTE_THREAD instead of hardcoded number (credit to carllin)
This commit is contained in:
@ -90,6 +90,7 @@ const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128;
|
|||||||
|
|
||||||
const NUM_VOTE_PROCESSING_THREADS: u32 = 2;
|
const NUM_VOTE_PROCESSING_THREADS: u32 = 2;
|
||||||
const MIN_THREADS_BANKING: u32 = 1;
|
const MIN_THREADS_BANKING: u32 = 1;
|
||||||
|
const MIN_TOTAL_THREADS: u32 = NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING;
|
||||||
|
|
||||||
pub struct ProcessTransactionBatchOutput {
|
pub struct ProcessTransactionBatchOutput {
|
||||||
// The number of transactions filtered out by the cost model
|
// The number of transactions filtered out by the cost model
|
||||||
@ -403,13 +404,14 @@ impl BankingStage {
|
|||||||
gossip_vote_sender: ReplayVoteSender,
|
gossip_vote_sender: ReplayVoteSender,
|
||||||
cost_model: Arc<RwLock<CostModel>>,
|
cost_model: Arc<RwLock<CostModel>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BATCH);
|
assert!(num_threads >= MIN_TOTAL_THREADS);
|
||||||
// Single thread to generate entries from many banks.
|
// Single thread to generate entries from many banks.
|
||||||
// This thread talks to poh_service and broadcasts the entries once they have been recorded.
|
// This thread talks to poh_service and broadcasts the entries once they have been recorded.
|
||||||
// Once an entry has been recorded, its blockhash is registered with the bank.
|
// Once an entry has been recorded, its blockhash is registered with the bank.
|
||||||
let data_budget = Arc::new(DataBudget::default());
|
let data_budget = Arc::new(DataBudget::default());
|
||||||
|
let batch_limit = TOTAL_BUFFERED_PACKETS
|
||||||
|
/ ((num_threads - NUM_VOTE_PROCESSING_THREADS) as usize * PACKETS_PER_BATCH);
|
||||||
// Many banks that process transactions in parallel.
|
// Many banks that process transactions in parallel.
|
||||||
assert!(num_threads >= NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING);
|
|
||||||
let bank_thread_hdls: Vec<JoinHandle<()>> = (0..num_threads)
|
let bank_thread_hdls: Vec<JoinHandle<()>> = (0..num_threads)
|
||||||
.map(|i| {
|
.map(|i| {
|
||||||
let (verified_receiver, forward_option) = match i {
|
let (verified_receiver, forward_option) = match i {
|
||||||
@ -433,7 +435,7 @@ impl BankingStage {
|
|||||||
let data_budget = data_budget.clone();
|
let data_budget = data_budget.clone();
|
||||||
let cost_model = cost_model.clone();
|
let cost_model = cost_model.clone();
|
||||||
Builder::new()
|
Builder::new()
|
||||||
.name("solana-banking-stage-tx".to_string())
|
.name(format!("solana-banking-stage-tx-{}", i))
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
Self::process_loop(
|
Self::process_loop(
|
||||||
&verified_receiver,
|
&verified_receiver,
|
||||||
@ -1064,7 +1066,7 @@ impl BankingStage {
|
|||||||
env::var("SOLANA_BANKING_THREADS")
|
env::var("SOLANA_BANKING_THREADS")
|
||||||
.map(|x| x.parse().unwrap_or(NUM_THREADS))
|
.map(|x| x.parse().unwrap_or(NUM_THREADS))
|
||||||
.unwrap_or(NUM_THREADS),
|
.unwrap_or(NUM_THREADS),
|
||||||
NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING,
|
MIN_TOTAL_THREADS,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user