From 03bf66a51b31dfc74d84446064feafc7b00b74df Mon Sep 17 00:00:00 2001 From: Tao Zhu Date: Thu, 10 Feb 2022 20:00:36 -0600 Subject: [PATCH] flag end-of-slot when poh bank is gone --- core/src/banking_stage.rs | 73 ++++++++++++++++++++++++--------------- 1 file changed, 46 insertions(+), 27 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 6539d4aadf..816d57babb 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -325,6 +325,12 @@ pub struct BatchedTransactionCostDetails { pub batched_execute_cost: u64, } +#[derive(Debug, Default)] +struct EndOfSlot { + next_slot_leader: Option, + working_bank: Option>, +} + /// Stores the stage's thread handle and output receiver. pub struct BankingStage { bank_thread_hdls: Vec>, @@ -519,41 +525,47 @@ impl BankingStage { let mut consumed_buffered_packets_count = 0; let buffered_packet_batches_len = buffered_packet_batches.len(); let mut proc_start = Measure::start("consume_buffered_process"); - let mut reached_end_of_slot = None; + let mut reached_end_of_slot: Option = None; RetainMut::retain_mut( buffered_packet_batches, |buffered_packet_batch_and_offsets| { let (packet_batch, ref mut original_unprocessed_indexes, _forwarded) = buffered_packet_batch_and_offsets; - if let Some((next_leader, bank)) = &reached_end_of_slot { + if let Some(end_of_slot) = &reached_end_of_slot { // We've hit the end of this slot, no need to perform more processing, // just filter the remaining packets for the invalid (e.g. too old) ones - let new_unprocessed_indexes = Self::filter_unprocessed_packets_at_end_of_slot( - bank, - packet_batch, - original_unprocessed_indexes, - my_pubkey, - *next_leader, - banking_stage_stats, - ); + // if the working_bank is available + if let Some(bank) = &end_of_slot.working_bank { + let new_unprocessed_indexes = + Self::filter_unprocessed_packets_at_end_of_slot( + bank, + packet_batch, + original_unprocessed_indexes, + my_pubkey, + end_of_slot.next_slot_leader, + banking_stage_stats, + ); - let end_of_slot_filtered_invalid_count = original_unprocessed_indexes - .len() - .saturating_sub(new_unprocessed_indexes.len()); + let end_of_slot_filtered_invalid_count = original_unprocessed_indexes + .len() + .saturating_sub(new_unprocessed_indexes.len()); - slot_metrics_tracker.increment_end_of_slot_filtered_invalid_count( - end_of_slot_filtered_invalid_count as u64, - ); + slot_metrics_tracker.increment_end_of_slot_filtered_invalid_count( + end_of_slot_filtered_invalid_count as u64, + ); - banking_stage_stats - .end_of_slot_filtered_invalid_count - .fetch_add(end_of_slot_filtered_invalid_count, Ordering::Relaxed); + banking_stage_stats + .end_of_slot_filtered_invalid_count + .fetch_add(end_of_slot_filtered_invalid_count, Ordering::Relaxed); - Self::update_buffered_packets_with_new_unprocessed( - original_unprocessed_indexes, - new_unprocessed_indexes, - ) + Self::update_buffered_packets_with_new_unprocessed( + original_unprocessed_indexes, + new_unprocessed_indexes, + ) + } else { + true + } } else { let bank_start = poh_recorder.lock().unwrap().bank_start(); if let Some(BankStart { @@ -586,10 +598,10 @@ impl BankingStage { max_tx_ingestion_ns, ) { - reached_end_of_slot = Some(( - poh_recorder.lock().unwrap().next_slot_leader(), - working_bank, - )); + reached_end_of_slot = Some(EndOfSlot { + next_slot_leader: poh_recorder.lock().unwrap().next_slot_leader(), + working_bank: Some(working_bank), + }); } // The difference between all transactions passed to execution and the ones that @@ -616,6 +628,13 @@ impl BankingStage { } has_more_unprocessed_transactions } else { + // mark as end-of-slot to avoid aggressively lock poh for the remaining for + // packet batches in buffer + reached_end_of_slot = Some(EndOfSlot { + next_slot_leader: poh_recorder.lock().unwrap().next_slot_leader(), + working_bank: None, + }); + // `original_unprocessed_indexes` must have remaining packets to process // if not yet processed. assert!(Self::packet_has_more_unprocessed_transactions(