diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index ee90150284..bca2dad305 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -239,10 +239,10 @@ impl BankingStage { } fn consume_or_forward_packets( + my_pubkey: &Pubkey, leader_pubkey: Option, bank_is_available: bool, would_be_leader: bool, - my_pubkey: &Pubkey, ) -> BufferedPacketsDecision { leader_pubkey.map_or( // If leader is not known, return the buffered packets as is @@ -275,18 +275,24 @@ impl BankingStage { enable_forwarding: bool, batch_limit: usize, ) -> Result<()> { - let decision = { + let (poh_next_slot_leader, poh_has_bank, would_be_leader) = { let poh = poh_recorder.lock().unwrap(); - Self::consume_or_forward_packets( + ( poh.next_slot_leader(), - poh.bank().is_some(), + poh.has_bank(), poh.would_be_leader( (FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET - 1) * DEFAULT_TICKS_PER_SLOT, ), - my_pubkey, ) }; + let decision = Self::consume_or_forward_packets( + my_pubkey, + poh_next_slot_leader, + poh_has_bank, + would_be_leader, + ); + match decision { BufferedPacketsDecision::Consume => { let mut unprocessed = Self::consume_buffered_packets( @@ -300,9 +306,10 @@ impl BankingStage { } BufferedPacketsDecision::Forward => { if enable_forwarding { - let poh = poh_recorder.lock().unwrap(); - let next_leader = - poh.leader_after_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET); + let next_leader = poh_recorder + .lock() + .unwrap() + .leader_after_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET); next_leader.map_or(Ok(()), |leader_pubkey| { let leader_addr = { cluster_info @@ -1507,60 +1514,60 @@ mod tests { let my_pubkey1 = Pubkey::new_rand(); assert_eq!( - BankingStage::consume_or_forward_packets(None, true, false, &my_pubkey), + BankingStage::consume_or_forward_packets(&my_pubkey, None, true, false,), BufferedPacketsDecision::Hold ); assert_eq!( - BankingStage::consume_or_forward_packets(None, false, false, &my_pubkey), + BankingStage::consume_or_forward_packets(&my_pubkey, None, false, false), BufferedPacketsDecision::Hold ); assert_eq!( - BankingStage::consume_or_forward_packets(None, false, false, &my_pubkey1), + BankingStage::consume_or_forward_packets(&my_pubkey1, None, false, false), BufferedPacketsDecision::Hold ); assert_eq!( BankingStage::consume_or_forward_packets( + &my_pubkey, Some(my_pubkey1.clone()), false, false, - &my_pubkey ), BufferedPacketsDecision::Forward ); assert_eq!( BankingStage::consume_or_forward_packets( + &my_pubkey, Some(my_pubkey1.clone()), false, true, - &my_pubkey ), BufferedPacketsDecision::Hold ); assert_eq!( BankingStage::consume_or_forward_packets( + &my_pubkey, Some(my_pubkey1.clone()), true, false, - &my_pubkey ), BufferedPacketsDecision::Consume ); assert_eq!( BankingStage::consume_or_forward_packets( + &my_pubkey1, Some(my_pubkey1.clone()), false, false, - &my_pubkey1 ), BufferedPacketsDecision::Hold ); assert_eq!( BankingStage::consume_or_forward_packets( + &my_pubkey1, Some(my_pubkey1.clone()), true, false, - &my_pubkey1 ), BufferedPacketsDecision::Consume ); diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 3f0b992d1a..e5b124aec6 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -54,7 +54,7 @@ impl ClusterInfoVoteListener { return Ok(()); } let (votes, new_ts) = cluster_info.read().unwrap().get_votes(last_ts); - if poh_recorder.lock().unwrap().bank().is_some() { + if poh_recorder.lock().unwrap().has_bank() { last_ts = new_ts; inc_new_counter_debug!("cluster_info_vote_listener-recv_count", votes.len()); let msgs = packet::to_packets(&votes); diff --git a/core/src/leader_schedule_utils.rs b/core/src/leader_schedule_utils.rs index f42407041a..449e58abde 100644 --- a/core/src/leader_schedule_utils.rs +++ b/core/src/leader_schedule_utils.rs @@ -33,10 +33,6 @@ pub fn num_ticks_left_in_slot(bank: &Bank, tick_height: u64) -> u64 { bank.ticks_per_slot() - tick_height % bank.ticks_per_slot() - 1 } -pub fn tick_height_to_slot(ticks_per_slot: u64, tick_height: u64) -> u64 { - tick_height / ticks_per_slot -} - fn sort_stakes(stakes: &mut Vec<(Pubkey, u64)>) { // Sort first by stake. If stakes are the same, sort by pubkey to ensure a // deterministic result. diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs index f589531ee3..0b21e0e0a6 100644 --- a/core/src/poh_recorder.rs +++ b/core/src/poh_recorder.rs @@ -101,6 +101,7 @@ impl PohRecorder { self.leader_schedule_cache .slot_leader_at(self.tick_height / self.ticks_per_slot + slots, None) } + pub fn next_slot_leader(&self) -> Option { self.leader_after_slots(1) } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 1ad07ecb90..d6add9e471 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -105,11 +105,13 @@ impl ReplayStage { let leader_schedule_cache = leader_schedule_cache.clone(); let vote_account = *vote_account; let voting_keypair = voting_keypair.cloned(); + let t_replay = Builder::new() .name("solana-replay-stage".to_string()) .spawn(move || { let _exit = Finalizer::new(exit_.clone()); let mut progress = HashMap::new(); + loop { let now = Instant::now(); // Stop getting entries if we get exit signal @@ -123,6 +125,8 @@ impl ReplayStage { &leader_schedule_cache, ); + let mut tpu_has_bank = poh_recorder.lock().unwrap().has_bank(); + let did_complete_bank = Self::replay_active_banks( &blocktree, &bank_forks, @@ -156,15 +160,17 @@ impl ReplayStage { &poh_recorder, &leader_schedule_cache, ); - assert!(!poh_recorder.lock().unwrap().has_bank()); + tpu_has_bank = false; } - Self::maybe_start_leader( - &my_pubkey, - &bank_forks, - &poh_recorder, - &leader_schedule_cache, - ); + if !tpu_has_bank { + Self::maybe_start_leader( + &my_pubkey, + &bank_forks, + &poh_recorder, + &leader_schedule_cache, + ); + } inc_new_counter_info!( "replicate_stage-duration", @@ -194,37 +200,19 @@ impl ReplayStage { poh_recorder: &Arc>, leader_schedule_cache: &Arc, ) { - let (grace_ticks, poh_slot, parent_slot) = { - let poh_recorder = poh_recorder.lock().unwrap(); + // all the individual calls to poh_recorder.lock() are designed to + // increase granularity, decrease contention - // we're done - if poh_recorder.has_bank() { - trace!("{} poh_recorder already has a bank", my_pubkey); - return; - } + assert!(!poh_recorder.lock().unwrap().has_bank()); - let (reached_leader_tick, grace_ticks, poh_slot, parent_slot) = - poh_recorder.reached_leader_tick(); + let (reached_leader_tick, grace_ticks, poh_slot, parent_slot) = + poh_recorder.lock().unwrap().reached_leader_tick(); - if !reached_leader_tick { - trace!("{} poh_recorder hasn't reached_leader_tick", my_pubkey); - return; - } - - (grace_ticks, poh_slot, parent_slot) - }; - - trace!( - "{} reached_leader_tick, poh_slot: {} parent_slot: {}", - my_pubkey, - poh_slot, - parent_slot, - ); - - if bank_forks.read().unwrap().get(poh_slot).is_some() { - warn!("{} already have bank in forks at {}", my_pubkey, poh_slot); + if !reached_leader_tick { + trace!("{} poh_recorder hasn't reached_leader_tick", my_pubkey); return; } + trace!("{} reached_leader_tick", my_pubkey,); let parent = bank_forks .read() @@ -233,16 +221,18 @@ impl ReplayStage { .expect("parent_slot doesn't exist in bank forks") .clone(); - // the parent was still in poh_recorder last time we looked for votable banks - // break out and re-run the consensus loop above - if !parent.is_frozen() { - trace!( - "{} parent {} isn't frozen, must be re-considered", - my_pubkey, - parent.slot() - ); + assert!(parent.is_frozen()); + + if bank_forks.read().unwrap().get(poh_slot).is_some() { + warn!("{} already have bank in forks at {}?", my_pubkey, poh_slot); return; } + trace!( + "{} poh_slot {} parent_slot {}", + my_pubkey, + poh_slot, + parent_slot + ); if let Some(next_leader) = leader_schedule_cache.slot_leader_at(poh_slot, Some(&parent)) { trace!(