From 41bbc11a469defe5ec70ab30420449cc1447c585 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 16 Feb 2022 18:08:03 +0000 Subject: [PATCH] flag end-of-slot when poh bank is gone (backport #23069) (#23174) * flag end-of-slot when poh bank is gone (cherry picked from commit 03bf66a51b31dfc74d84446064feafc7b00b74df) # Conflicts: # core/src/banking_stage.rs * merge fix Co-authored-by: Tao Zhu --- core/src/banking_stage.rs | 73 ++++++++++++++++++++++++--------------- 1 file changed, 46 insertions(+), 27 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 1837336cfc..47afe29823 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -316,6 +316,12 @@ pub struct BatchedTransactionCostDetails { pub batched_execute_cost: u64, } +#[derive(Debug, Default)] +struct EndOfSlot { + next_slot_leader: Option, + working_bank: Option>, +} + /// Stores the stage's thread handle and output receiver. pub struct BankingStage { bank_thread_hdls: Vec>, @@ -510,38 +516,44 @@ impl BankingStage { let mut consumed_buffered_packets_count = 0; let buffered_packet_batches_len = buffered_packet_batches.len(); let mut proc_start = Measure::start("consume_buffered_process"); - let mut reached_end_of_slot = None; + let mut reached_end_of_slot: Option = None; buffered_packet_batches.retain_mut(|buffered_packet_batch_and_offsets| { let (packet_batch, ref mut original_unprocessed_indexes, _forwarded) = buffered_packet_batch_and_offsets; - if let Some((next_leader, bank)) = &reached_end_of_slot { + if let Some(end_of_slot) = &reached_end_of_slot { // We've hit the end of this slot, no need to perform more processing, // just filter the remaining packets for the invalid (e.g. too old) ones - let new_unprocessed_indexes = Self::filter_unprocessed_packets_at_end_of_slot( - bank, - packet_batch, - original_unprocessed_indexes, - my_pubkey, - *next_leader, - banking_stage_stats, - ); - let end_of_slot_filtered_invalid_count = original_unprocessed_indexes - .len() - .saturating_sub(new_unprocessed_indexes.len()); + // if the working_bank is available + if let Some(bank) = &end_of_slot.working_bank { + let new_unprocessed_indexes = Self::filter_unprocessed_packets_at_end_of_slot( + bank, + packet_batch, + original_unprocessed_indexes, + my_pubkey, + end_of_slot.next_slot_leader, + banking_stage_stats, + ); - slot_metrics_tracker.increment_end_of_slot_filtered_invalid_count( - end_of_slot_filtered_invalid_count as u64, - ); + let end_of_slot_filtered_invalid_count = original_unprocessed_indexes + .len() + .saturating_sub(new_unprocessed_indexes.len()); - banking_stage_stats - .end_of_slot_filtered_invalid_count - .fetch_add(end_of_slot_filtered_invalid_count, Ordering::Relaxed); + slot_metrics_tracker.increment_end_of_slot_filtered_invalid_count( + end_of_slot_filtered_invalid_count as u64, + ); - Self::update_buffered_packets_with_new_unprocessed( - original_unprocessed_indexes, - new_unprocessed_indexes, - ) + banking_stage_stats + .end_of_slot_filtered_invalid_count + .fetch_add(end_of_slot_filtered_invalid_count, Ordering::Relaxed); + + Self::update_buffered_packets_with_new_unprocessed( + original_unprocessed_indexes, + new_unprocessed_indexes, + ) + } else { + true + } } else { let bank_start = poh_recorder.lock().unwrap().bank_start(); if let Some(BankStart { @@ -573,10 +585,10 @@ impl BankingStage { max_tx_ingestion_ns, ) { - reached_end_of_slot = Some(( - poh_recorder.lock().unwrap().next_slot_leader(), - working_bank, - )); + reached_end_of_slot = Some(EndOfSlot { + next_slot_leader: poh_recorder.lock().unwrap().next_slot_leader(), + working_bank: Some(working_bank), + }); } // The difference between all transactions passed to execution and the ones that @@ -603,6 +615,13 @@ impl BankingStage { } has_more_unprocessed_transactions } else { + // mark as end-of-slot to avoid aggressively lock poh for the remaining for + // packet batches in buffer + reached_end_of_slot = Some(EndOfSlot { + next_slot_leader: poh_recorder.lock().unwrap().next_slot_leader(), + working_bank: None, + }); + // `original_unprocessed_indexes` must have remaining packets to process // if not yet processed. assert!(Self::packet_has_more_unprocessed_transactions(