diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 44e4ba8f20..cf89d5380c 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -19,7 +19,7 @@ use solana_metrics::counter::Counter; use solana_runtime::bank::Bank; use solana_runtime::locked_accounts_results::LockedAccountsResults; use solana_sdk::pubkey::Pubkey; -use solana_sdk::timing::{self, duration_as_us, MAX_RECENT_BLOCKHASHES}; +use solana_sdk::timing::{self, duration_as_us, DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES}; use solana_sdk::transaction::{self, Transaction, TransactionError}; use std::cmp; use std::net::UdpSocket; @@ -186,6 +186,7 @@ impl BankingStage { fn process_or_forward_packets( leader_data: Option<&ContactInfo>, bank_is_available: bool, + would_be_leader: bool, my_id: &Pubkey, ) -> BufferedPacketsDecision { leader_data.map_or( @@ -196,6 +197,9 @@ impl BankingStage { if bank_is_available { // If the bank is available, this node is the leader BufferedPacketsDecision::Consume + } else if would_be_leader { + // If the node will be the leader soon, hold the packets for now + BufferedPacketsDecision::Hold } else if x.id != *my_id { // If the current node is not the leader, forward the buffered packets BufferedPacketsDecision::Forward @@ -216,11 +220,15 @@ impl BankingStage { ) -> Result { let rcluster_info = cluster_info.read().unwrap(); - let decision = Self::process_or_forward_packets( - rcluster_info.leader_data(), - poh_recorder.lock().unwrap().bank().is_some(), - &rcluster_info.id(), - ); + let decision = { + let poh = poh_recorder.lock().unwrap(); + Self::process_or_forward_packets( + rcluster_info.leader_data(), + poh.bank().is_some(), + poh.would_be_leader(DEFAULT_TICKS_PER_SLOT), + &rcluster_info.id(), + ) + }; match decision { BufferedPacketsDecision::Consume => { @@ -250,13 +258,20 @@ impl BankingStage { // Buffer the packets if I am the next leader // or, if it was getting sent to me // or, the next leader is unknown - let leader_id = match poh_recorder.lock().unwrap().bank() { + let poh = poh_recorder.lock().unwrap(); + let leader_id = match poh.bank() { Some(bank) => leader_schedule_cache .slot_leader_at_else_compute(bank.slot() + 1, &bank) .unwrap_or_default(), - None => rcluster_info - .leader_data() - .map_or(rcluster_info.id(), |x| x.id), + None => { + if poh.would_be_leader(DEFAULT_TICKS_PER_SLOT) { + rcluster_info.id() + } else { + rcluster_info + .leader_data() + .map_or(rcluster_info.id(), |x| x.id) + } + } }; leader_id == rcluster_info.id() @@ -997,34 +1012,38 @@ mod tests { let my_id1 = Pubkey::new_rand(); assert_eq!( - BankingStage::process_or_forward_packets(None, true, &my_id), + BankingStage::process_or_forward_packets(None, true, false, &my_id), BufferedPacketsDecision::Hold ); assert_eq!( - BankingStage::process_or_forward_packets(None, false, &my_id), + BankingStage::process_or_forward_packets(None, false, false, &my_id), BufferedPacketsDecision::Hold ); assert_eq!( - BankingStage::process_or_forward_packets(None, false, &my_id1), + BankingStage::process_or_forward_packets(None, false, false, &my_id1), BufferedPacketsDecision::Hold ); let mut contact_info = ContactInfo::default(); contact_info.id = my_id1; assert_eq!( - BankingStage::process_or_forward_packets(Some(&contact_info), false, &my_id), + BankingStage::process_or_forward_packets(Some(&contact_info), false, false, &my_id), BufferedPacketsDecision::Forward ); assert_eq!( - BankingStage::process_or_forward_packets(Some(&contact_info), true, &my_id), - BufferedPacketsDecision::Consume - ); - assert_eq!( - BankingStage::process_or_forward_packets(Some(&contact_info), false, &my_id1), + BankingStage::process_or_forward_packets(Some(&contact_info), false, true, &my_id), BufferedPacketsDecision::Hold ); assert_eq!( - BankingStage::process_or_forward_packets(Some(&contact_info), true, &my_id1), + BankingStage::process_or_forward_packets(Some(&contact_info), true, false, &my_id), + BufferedPacketsDecision::Consume + ); + assert_eq!( + BankingStage::process_or_forward_packets(Some(&contact_info), false, false, &my_id1), + BufferedPacketsDecision::Hold + ); + assert_eq!( + BankingStage::process_or_forward_packets(Some(&contact_info), true, false, &my_id1), BufferedPacketsDecision::Consume ); } diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index ef9158314f..3c0c221994 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -64,7 +64,7 @@ impl FetchStage { if poh_recorder .lock() .unwrap() - .would_be_leader(1, DEFAULT_TICKS_PER_SLOT) + .would_be_leader(DEFAULT_TICKS_PER_SLOT) { inc_new_counter_info!("fetch_stage-honor_forwards", len); for packets in batch { diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs index d359079b78..2b8155d3d7 100644 --- a/core/src/poh_recorder.rs +++ b/core/src/poh_recorder.rs @@ -79,14 +79,13 @@ impl PohRecorder { } } - pub fn would_be_leader(&self, within_next_n_slots: u64, ticks_per_slot: u64) -> bool { + pub fn would_be_leader(&self, within_next_n_ticks: u64) -> bool { let close_to_leader_tick = self.start_leader_at_tick.map_or(false, |leader_tick| { let leader_ideal_start_tick = leader_tick.saturating_sub(self.max_last_leader_grace_ticks); self.tick_height() <= self.last_leader_tick.unwrap_or(0) - && self.tick_height() - >= leader_ideal_start_tick.saturating_sub(within_next_n_slots * ticks_per_slot) + && self.tick_height() >= leader_ideal_start_tick.saturating_sub(within_next_n_ticks) }); self.working_bank.is_some() || close_to_leader_tick @@ -1191,7 +1190,7 @@ mod tests { // Test that with no leader slot, we don't reach the leader tick assert_eq!( - poh_recorder.would_be_leader(2, bank.ticks_per_slot()), + poh_recorder.would_be_leader(2 * bank.ticks_per_slot()), false ); @@ -1201,7 +1200,7 @@ mod tests { // Test that with no leader slot, we don't reach the leader tick after sending some ticks assert_eq!( - poh_recorder.would_be_leader(2, bank.ticks_per_slot()), + poh_recorder.would_be_leader(2 * bank.ticks_per_slot()), false ); @@ -1214,7 +1213,7 @@ mod tests { ); assert_eq!( - poh_recorder.would_be_leader(2, bank.ticks_per_slot()), + poh_recorder.would_be_leader(2 * bank.ticks_per_slot()), false ); @@ -1229,21 +1228,27 @@ mod tests { // Test that the node won't be leader in next 2 slots assert_eq!( - poh_recorder.would_be_leader(2, bank.ticks_per_slot()), + poh_recorder.would_be_leader(2 * bank.ticks_per_slot()), false ); // Test that the node will be leader in next 3 slots - assert_eq!(poh_recorder.would_be_leader(3, bank.ticks_per_slot()), true); + assert_eq!( + poh_recorder.would_be_leader(3 * bank.ticks_per_slot()), + true + ); assert_eq!( - poh_recorder.would_be_leader(2, bank.ticks_per_slot()), + poh_recorder.would_be_leader(2 * bank.ticks_per_slot()), false ); // If we set the working bank, the node should be leader within next 2 slots poh_recorder.set_bank(&bank); - assert_eq!(poh_recorder.would_be_leader(2, bank.ticks_per_slot()), true); + assert_eq!( + poh_recorder.would_be_leader(2 * bank.ticks_per_slot()), + true + ); } } }