Add bank to banking stage regardless of if there is a working bank (#19855)

This commit is contained in:
carllin
2021-09-17 16:55:53 -07:00
committed by GitHub
parent 4dc2f08198
commit e6b4dd3866
2 changed files with 91 additions and 28 deletions

View File

@ -16,7 +16,7 @@ use solana_perf::{
packet::{limited_deserialize, Packet, Packets, PACKETS_PER_BATCH}, packet::{limited_deserialize, Packet, Packets, PACKETS_PER_BATCH},
perf_libs, perf_libs,
}; };
use solana_poh::poh_recorder::{PohRecorder, PohRecorderError, TransactionRecorder}; use solana_poh::poh_recorder::{BankStart, PohRecorder, PohRecorderError, TransactionRecorder};
use solana_runtime::{ use solana_runtime::{
accounts_db::ErrorCounters, accounts_db::ErrorCounters,
bank::{ bank::{
@ -433,15 +433,19 @@ impl BankingStage {
) )
} else { } else {
let bank_start = poh_recorder.lock().unwrap().bank_start(); let bank_start = poh_recorder.lock().unwrap().bank_start();
if let Some((bank, bank_creation_time)) = bank_start { if let Some(BankStart {
working_bank,
bank_creation_time,
}) = bank_start
{
Self::reset_cost_tracker_if_new_bank( Self::reset_cost_tracker_if_new_bank(
cost_tracker, cost_tracker,
bank.slot(), working_bank.slot(),
banking_stage_stats, banking_stage_stats,
); );
let (processed, verified_txs_len, new_unprocessed_indexes) = let (processed, verified_txs_len, new_unprocessed_indexes) =
Self::process_packets_transactions( Self::process_packets_transactions(
&bank, &working_bank,
&bank_creation_time, &bank_creation_time,
recorder, recorder,
msgs, msgs,
@ -457,8 +461,10 @@ impl BankingStage {
max_tx_ingestion_ns, max_tx_ingestion_ns,
) )
{ {
reached_end_of_slot = reached_end_of_slot = Some((
Some((poh_recorder.lock().unwrap().next_slot_leader(), bank)); poh_recorder.lock().unwrap().next_slot_leader(),
working_bank,
));
} }
new_tx_count += processed; new_tx_count += processed;
// Out of the buffered packets just retried, collect any still unprocessed // Out of the buffered packets just retried, collect any still unprocessed
@ -563,16 +569,17 @@ impl BankingStage {
) = { ) = {
let poh = poh_recorder.lock().unwrap(); let poh = poh_recorder.lock().unwrap();
bank_start = poh.bank_start(); bank_start = poh.bank_start();
if let Some((ref bank, _)) = bank_start { if let Some(ref bank_start) = bank_start {
Self::reset_cost_tracker_if_new_bank( Self::reset_cost_tracker_if_new_bank(
cost_tracker, cost_tracker,
bank.slot(), bank_start.working_bank.slot(),
banking_stage_stats, banking_stage_stats,
); );
}; };
( (
poh.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET), poh.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET),
PohRecorder::get_bank_still_processing_txs(&bank_start), PohRecorder::get_working_bank_if_not_expired(&bank_start.as_ref()),
poh.would_be_leader(HOLD_TRANSACTIONS_SLOT_OFFSET * DEFAULT_TICKS_PER_SLOT), poh.would_be_leader(HOLD_TRANSACTIONS_SLOT_OFFSET * DEFAULT_TICKS_PER_SLOT),
poh.would_be_leader( poh.would_be_leader(
(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET - 1) * DEFAULT_TICKS_PER_SLOT, (FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET - 1) * DEFAULT_TICKS_PER_SLOT,
@ -1375,8 +1382,9 @@ impl BankingStage {
let mut newly_buffered_packets_count = 0; let mut newly_buffered_packets_count = 0;
while let Some(msgs) = mms_iter.next() { while let Some(msgs) = mms_iter.next() {
let packet_indexes = Self::generate_packet_indexes(&msgs.packets); let packet_indexes = Self::generate_packet_indexes(&msgs.packets);
let bank_start = poh.lock().unwrap().bank_start(); let poh_recorder_bank = poh.lock().unwrap().get_poh_recorder_bank();
if PohRecorder::get_bank_still_processing_txs(&bank_start).is_none() { let working_bank_start = poh_recorder_bank.working_bank_start();
if PohRecorder::get_working_bank_if_not_expired(&working_bank_start).is_none() {
Self::push_unprocessed( Self::push_unprocessed(
buffered_packets, buffered_packets,
msgs, msgs,
@ -1390,13 +1398,22 @@ impl BankingStage {
); );
continue; continue;
} }
let (bank, bank_creation_time) = bank_start.unwrap();
Self::reset_cost_tracker_if_new_bank(cost_tracker, bank.slot(), banking_stage_stats); // Destructure the `BankStart` behind an Arc
let BankStart {
working_bank,
bank_creation_time,
} = &*working_bank_start.unwrap();
Self::reset_cost_tracker_if_new_bank(
cost_tracker,
working_bank.slot(),
banking_stage_stats,
);
let (processed, verified_txs_len, unprocessed_indexes) = let (processed, verified_txs_len, unprocessed_indexes) =
Self::process_packets_transactions( Self::process_packets_transactions(
&bank, working_bank,
&bank_creation_time, bank_creation_time,
recorder, recorder,
&msgs, &msgs,
packet_indexes, packet_indexes,
@ -1430,7 +1447,7 @@ impl BankingStage {
while let Some(msgs) = mms_iter.next() { while let Some(msgs) = mms_iter.next() {
let packet_indexes = Self::generate_packet_indexes(&msgs.packets); let packet_indexes = Self::generate_packet_indexes(&msgs.packets);
let unprocessed_indexes = Self::filter_unprocessed_packets( let unprocessed_indexes = Self::filter_unprocessed_packets(
&bank, working_bank,
&msgs, &msgs,
&packet_indexes, &packet_indexes,
my_pubkey, my_pubkey,

View File

@ -61,7 +61,24 @@ pub enum PohRecorderError {
type Result<T> = std::result::Result<T, PohRecorderError>; type Result<T> = std::result::Result<T, PohRecorderError>;
pub type WorkingBankEntry = (Arc<Bank>, (Entry, u64)); pub type WorkingBankEntry = (Arc<Bank>, (Entry, u64));
pub type BankStart = (Arc<Bank>, Arc<Instant>);
pub struct BankStart {
pub working_bank: Arc<Bank>,
pub bank_creation_time: Arc<Instant>,
}
impl BankStart {
fn get_working_bank_if_not_expired(&self) -> Option<&Arc<Bank>> {
if Bank::should_bank_still_be_processing_txs(
&self.bank_creation_time,
self.working_bank.ns_per_slot,
) {
Some(&self.working_bank)
} else {
None
}
}
}
pub struct Record { pub struct Record {
pub mixin: Hash, pub mixin: Hash,
@ -147,6 +164,27 @@ impl TransactionRecorder {
} }
} }
pub enum PohRecorderBank {
WorkingBank(BankStart),
LastResetBank(Arc<Bank>),
}
impl PohRecorderBank {
pub fn bank(&self) -> &Arc<Bank> {
match self {
PohRecorderBank::WorkingBank(bank_start) => &bank_start.working_bank,
PohRecorderBank::LastResetBank(last_reset_bank) => last_reset_bank,
}
}
pub fn working_bank_start(&self) -> Option<&BankStart> {
match self {
PohRecorderBank::WorkingBank(bank_start) => Some(bank_start),
PohRecorderBank::LastResetBank(_last_reset_bank) => None,
}
}
}
#[derive(Clone)] #[derive(Clone)]
pub struct WorkingBank { pub struct WorkingBank {
pub bank: Arc<Bank>, pub bank: Arc<Bank>,
@ -245,9 +283,10 @@ impl PohRecorder {
} }
pub fn bank_start(&self) -> Option<BankStart> { pub fn bank_start(&self) -> Option<BankStart> {
self.working_bank self.working_bank.as_ref().map(|w| BankStart {
.as_ref() working_bank: w.bank.clone(),
.map(|w| (w.bank.clone(), w.start.clone())) bank_creation_time: w.start.clone(),
})
} }
pub fn has_bank(&self) -> bool { pub fn has_bank(&self) -> bool {
@ -714,16 +753,23 @@ impl PohRecorder {
) )
} }
pub fn get_poh_recorder_bank(&self) -> PohRecorderBank {
let bank_start = self.bank_start();
if let Some(bank_start) = bank_start {
PohRecorderBank::WorkingBank(bank_start)
} else {
PohRecorderBank::LastResetBank(self.start_bank.clone())
}
}
// Filters the return result of PohRecorder::bank_start(), returns the bank // Filters the return result of PohRecorder::bank_start(), returns the bank
// if it's still processing transactions // if it's still processing transactions
pub fn get_bank_still_processing_txs(bank_start: &Option<BankStart>) -> Option<&Arc<Bank>> { pub fn get_working_bank_if_not_expired<'a, 'b>(
bank_start.as_ref().and_then(|(bank, bank_creation_time)| { bank_start: &'b Option<&'a BankStart>,
if Bank::should_bank_still_be_processing_txs(bank_creation_time, bank.ns_per_slot) { ) -> Option<&'a Arc<Bank>> {
Some(bank) bank_start
} else { .as_ref()
None .and_then(|bank_start| bank_start.get_working_bank_if_not_expired())
}
})
} }
// Used in tests // Used in tests