From e6b4dd3866cc0ee5efe6d306f49d09db7e6134f8 Mon Sep 17 00:00:00 2001 From: carllin Date: Fri, 17 Sep 2021 16:55:53 -0700 Subject: [PATCH] Add bank to banking stage regardless of if there is a working bank (#19855) --- core/src/banking_stage.rs | 49 ++++++++++++++++++--------- poh/src/poh_recorder.rs | 70 ++++++++++++++++++++++++++++++++------- 2 files changed, 91 insertions(+), 28 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 6ea21db992..31a8cb0a43 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -16,7 +16,7 @@ use solana_perf::{ packet::{limited_deserialize, Packet, Packets, PACKETS_PER_BATCH}, perf_libs, }; -use solana_poh::poh_recorder::{PohRecorder, PohRecorderError, TransactionRecorder}; +use solana_poh::poh_recorder::{BankStart, PohRecorder, PohRecorderError, TransactionRecorder}; use solana_runtime::{ accounts_db::ErrorCounters, bank::{ @@ -433,15 +433,19 @@ impl BankingStage { ) } else { let bank_start = poh_recorder.lock().unwrap().bank_start(); - if let Some((bank, bank_creation_time)) = bank_start { + if let Some(BankStart { + working_bank, + bank_creation_time, + }) = bank_start + { Self::reset_cost_tracker_if_new_bank( cost_tracker, - bank.slot(), + working_bank.slot(), banking_stage_stats, ); let (processed, verified_txs_len, new_unprocessed_indexes) = Self::process_packets_transactions( - &bank, + &working_bank, &bank_creation_time, recorder, msgs, @@ -457,8 +461,10 @@ impl BankingStage { max_tx_ingestion_ns, ) { - reached_end_of_slot = - Some((poh_recorder.lock().unwrap().next_slot_leader(), bank)); + reached_end_of_slot = Some(( + poh_recorder.lock().unwrap().next_slot_leader(), + working_bank, + )); } new_tx_count += processed; // Out of the buffered packets just retried, collect any still unprocessed @@ -563,16 +569,17 @@ impl BankingStage { ) = { let poh = poh_recorder.lock().unwrap(); bank_start = poh.bank_start(); - if let Some((ref bank, _)) = bank_start { + if let Some(ref bank_start) = bank_start { Self::reset_cost_tracker_if_new_bank( cost_tracker, - bank.slot(), + bank_start.working_bank.slot(), banking_stage_stats, ); }; + ( poh.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET), - PohRecorder::get_bank_still_processing_txs(&bank_start), + PohRecorder::get_working_bank_if_not_expired(&bank_start.as_ref()), poh.would_be_leader(HOLD_TRANSACTIONS_SLOT_OFFSET * DEFAULT_TICKS_PER_SLOT), poh.would_be_leader( (FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET - 1) * DEFAULT_TICKS_PER_SLOT, @@ -1375,8 +1382,9 @@ impl BankingStage { let mut newly_buffered_packets_count = 0; while let Some(msgs) = mms_iter.next() { let packet_indexes = Self::generate_packet_indexes(&msgs.packets); - let bank_start = poh.lock().unwrap().bank_start(); - if PohRecorder::get_bank_still_processing_txs(&bank_start).is_none() { + let poh_recorder_bank = poh.lock().unwrap().get_poh_recorder_bank(); + let working_bank_start = poh_recorder_bank.working_bank_start(); + if PohRecorder::get_working_bank_if_not_expired(&working_bank_start).is_none() { Self::push_unprocessed( buffered_packets, msgs, @@ -1390,13 +1398,22 @@ impl BankingStage { ); continue; } - let (bank, bank_creation_time) = bank_start.unwrap(); - Self::reset_cost_tracker_if_new_bank(cost_tracker, bank.slot(), banking_stage_stats); + + // Destructure the `BankStart` behind an Arc + let BankStart { + working_bank, + bank_creation_time, + } = &*working_bank_start.unwrap(); + Self::reset_cost_tracker_if_new_bank( + cost_tracker, + working_bank.slot(), + banking_stage_stats, + ); let (processed, verified_txs_len, unprocessed_indexes) = Self::process_packets_transactions( - &bank, - &bank_creation_time, + working_bank, + bank_creation_time, recorder, &msgs, packet_indexes, @@ -1430,7 +1447,7 @@ impl BankingStage { while let Some(msgs) = mms_iter.next() { let packet_indexes = Self::generate_packet_indexes(&msgs.packets); let unprocessed_indexes = Self::filter_unprocessed_packets( - &bank, + working_bank, &msgs, &packet_indexes, my_pubkey, diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs index d6387130b3..581bebc112 100644 --- a/poh/src/poh_recorder.rs +++ b/poh/src/poh_recorder.rs @@ -61,7 +61,24 @@ pub enum PohRecorderError { type Result = std::result::Result; pub type WorkingBankEntry = (Arc, (Entry, u64)); -pub type BankStart = (Arc, Arc); + +pub struct BankStart { + pub working_bank: Arc, + pub bank_creation_time: Arc, +} + +impl BankStart { + fn get_working_bank_if_not_expired(&self) -> Option<&Arc> { + if Bank::should_bank_still_be_processing_txs( + &self.bank_creation_time, + self.working_bank.ns_per_slot, + ) { + Some(&self.working_bank) + } else { + None + } + } +} pub struct Record { pub mixin: Hash, @@ -147,6 +164,27 @@ impl TransactionRecorder { } } +pub enum PohRecorderBank { + WorkingBank(BankStart), + LastResetBank(Arc), +} + +impl PohRecorderBank { + pub fn bank(&self) -> &Arc { + match self { + PohRecorderBank::WorkingBank(bank_start) => &bank_start.working_bank, + PohRecorderBank::LastResetBank(last_reset_bank) => last_reset_bank, + } + } + + pub fn working_bank_start(&self) -> Option<&BankStart> { + match self { + PohRecorderBank::WorkingBank(bank_start) => Some(bank_start), + PohRecorderBank::LastResetBank(_last_reset_bank) => None, + } + } +} + #[derive(Clone)] pub struct WorkingBank { pub bank: Arc, @@ -245,9 +283,10 @@ impl PohRecorder { } pub fn bank_start(&self) -> Option { - self.working_bank - .as_ref() - .map(|w| (w.bank.clone(), w.start.clone())) + self.working_bank.as_ref().map(|w| BankStart { + working_bank: w.bank.clone(), + bank_creation_time: w.start.clone(), + }) } pub fn has_bank(&self) -> bool { @@ -714,16 +753,23 @@ impl PohRecorder { ) } + pub fn get_poh_recorder_bank(&self) -> PohRecorderBank { + let bank_start = self.bank_start(); + if let Some(bank_start) = bank_start { + PohRecorderBank::WorkingBank(bank_start) + } else { + PohRecorderBank::LastResetBank(self.start_bank.clone()) + } + } + // Filters the return result of PohRecorder::bank_start(), returns the bank // if it's still processing transactions - pub fn get_bank_still_processing_txs(bank_start: &Option) -> Option<&Arc> { - bank_start.as_ref().and_then(|(bank, bank_creation_time)| { - if Bank::should_bank_still_be_processing_txs(bank_creation_time, bank.ns_per_slot) { - Some(bank) - } else { - None - } - }) + pub fn get_working_bank_if_not_expired<'a, 'b>( + bank_start: &'b Option<&'a BankStart>, + ) -> Option<&'a Arc> { + bank_start + .as_ref() + .and_then(|bank_start| bank_start.get_working_bank_if_not_expired()) } // Used in tests