diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index a23a3648f4..01b199a510 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -10,7 +10,6 @@ use solana::banking_stage::{create_test_recorder, BankingStage}; use solana::blocktree::{get_tmp_ledger_path, Blocktree}; use solana::cluster_info::ClusterInfo; use solana::cluster_info::Node; -use solana::leader_schedule_cache::LeaderScheduleCache; use solana::packet::to_packets_chunked; use solana::poh_recorder::WorkingBankEntries; use solana::service::Service; @@ -58,7 +57,6 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { let (verified_sender, verified_receiver) = channel(); let (vote_sender, vote_receiver) = channel(); let bank = Arc::new(Bank::new(&genesis_block)); - let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); let dummy = system_transaction::transfer( &mint_keypair, &mint_keypair.pubkey(), @@ -124,7 +122,6 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { &poh_recorder, verified_receiver, vote_receiver, - &leader_schedule_cache, ); poh_recorder.lock().unwrap().set_bank(&bank); @@ -167,7 +164,6 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) { let (verified_sender, verified_receiver) = channel(); let (vote_sender, vote_receiver) = channel(); let bank = Arc::new(Bank::new(&genesis_block)); - let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); let dummy = system_transaction::transfer( &mint_keypair, &mint_keypair.pubkey(), @@ -249,7 +245,6 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) { &poh_recorder, verified_receiver, vote_receiver, - &leader_schedule_cache, ); poh_recorder.lock().unwrap().set_bank(&bank); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 60603b6679..1ac846b32c 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -3,7 +3,6 @@ //! can do its processing in parallel with signature verification on the GPU. use crate::blocktree::Blocktree; use crate::cluster_info::ClusterInfo; -use crate::contact_info::ContactInfo; use crate::entry; use crate::entry::{hash_transactions, Entry}; use crate::leader_schedule_cache::LeaderScheduleCache; @@ -56,7 +55,6 @@ impl BankingStage { poh_recorder: &Arc>, verified_receiver: Receiver, verified_vote_receiver: Receiver, - leader_schedule_cache: &Arc, ) -> Self { Self::new_num_threads( cluster_info, @@ -64,17 +62,15 @@ impl BankingStage { verified_receiver, verified_vote_receiver, cmp::min(2, Self::num_threads()), - leader_schedule_cache, ) } - pub fn new_num_threads( + fn new_num_threads( cluster_info: &Arc>, poh_recorder: &Arc>, verified_receiver: Receiver, verified_vote_receiver: Receiver, num_threads: u32, - leader_schedule_cache: &Arc, ) -> Self { let verified_receiver = Arc::new(Mutex::new(verified_receiver)); let verified_vote_receiver = Arc::new(Mutex::new(verified_vote_receiver)); @@ -98,7 +94,6 @@ impl BankingStage { let cluster_info = cluster_info.clone(); let exit = exit.clone(); let mut recv_start = Instant::now(); - let leader_schedule_cache = leader_schedule_cache.clone(); Builder::new() .name("solana-banking-stage-tx".to_string()) .spawn(move || { @@ -108,7 +103,6 @@ impl BankingStage { &cluster_info, &mut recv_start, enable_forwarding, - leader_schedule_cache, ); exit.store(true, Ordering::Relaxed); }) @@ -118,7 +112,7 @@ impl BankingStage { Self { bank_thread_hdls } } - fn forward_unprocessed_packets( + fn forward_buffered_packets( socket: &std::net::UdpSocket, tpu_via_blobs: &std::net::SocketAddr, unprocessed_packets: &[(Packets, usize, Vec)], @@ -141,7 +135,7 @@ impl BankingStage { Ok(()) } - fn process_buffered_packets( + fn consume_buffered_packets( poh_recorder: &Arc>, buffered_packets: &[(Packets, usize, Vec)], ) -> Result { @@ -188,13 +182,13 @@ impl BankingStage { Ok(unprocessed_packets) } - fn process_or_forward_packets( - leader_data: Option<&ContactInfo>, + fn consume_or_forward_packets( + leader_id: Option, bank_is_available: bool, would_be_leader: bool, my_id: &Pubkey, ) -> BufferedPacketsDecision { - leader_data.map_or( + leader_id.map_or( // If leader is not known, return the buffered packets as is BufferedPacketsDecision::Hold, // else process the packets @@ -205,7 +199,7 @@ impl BankingStage { } else if would_be_leader { // If the node will be the leader soon, hold the packets for now BufferedPacketsDecision::Hold - } else if x.id != *my_id { + } else if x != *my_id { // If the current node is not the leader, forward the buffered packets BufferedPacketsDecision::Forward } else { @@ -216,7 +210,7 @@ impl BankingStage { ) } - fn handle_buffered_packets( + fn process_buffered_packets( socket: &std::net::UdpSocket, poh_recorder: &Arc>, cluster_info: &Arc>, @@ -225,84 +219,59 @@ impl BankingStage { ) -> Result { let rcluster_info = cluster_info.read().unwrap(); - let decision = { + let (decision, next_leader) = { let poh = poh_recorder.lock().unwrap(); - Self::process_or_forward_packets( - rcluster_info.leader_data(), - poh.bank().is_some(), - poh.would_be_leader(DEFAULT_TICKS_PER_SLOT), - &rcluster_info.id(), + let next_leader = poh.next_slot_leader(DEFAULT_TICKS_PER_SLOT, None); + ( + Self::consume_or_forward_packets( + next_leader, + poh.bank().is_some(), + poh.would_be_leader(DEFAULT_TICKS_PER_SLOT * 2), + &rcluster_info.id(), + ), + next_leader, ) }; match decision { BufferedPacketsDecision::Consume => { - Self::process_buffered_packets(poh_recorder, buffered_packets) + Self::consume_buffered_packets(poh_recorder, buffered_packets) } BufferedPacketsDecision::Forward => { if enable_forwarding { - if let Some(leader_id) = poh_recorder - .lock() - .unwrap() - .next_slot_leader(DEFAULT_TICKS_PER_SLOT, None) - { - if let Some(leader) = rcluster_info.lookup(&leader_id) { - let _ = Self::forward_unprocessed_packets( - &socket, - &leader.tpu_via_blobs, - &buffered_packets, - ); - } - } + next_leader.map_or(Ok(buffered_packets.to_vec()), |leader_id| { + rcluster_info.lookup(&leader_id).map_or( + Ok(buffered_packets.to_vec()), + |leader| { + let _ = Self::forward_buffered_packets( + &socket, + &leader.tpu_via_blobs, + &buffered_packets, + ); + Ok(vec![]) + }, + ) + }) + } else { + Ok(vec![]) } - Ok(vec![]) } _ => Ok(buffered_packets.to_vec()), } } - fn should_buffer_packets( - poh_recorder: &Arc>, - cluster_info: &Arc>, - leader_schedule_cache: &Arc, - ) -> bool { - let rcluster_info = cluster_info.read().unwrap(); - - // Buffer the packets if I am the next leader - // or, if it was getting sent to me - // or, the next leader is unknown - let poh = poh_recorder.lock().unwrap(); - let leader_id = match poh.bank() { - Some(bank) => leader_schedule_cache - .slot_leader_at(bank.slot() + 1, Some(&bank)) - .unwrap_or_default(), - None => { - if poh.would_be_leader(DEFAULT_TICKS_PER_SLOT) { - rcluster_info.id() - } else { - rcluster_info - .leader_data() - .map_or(rcluster_info.id(), |x| x.id) - } - } - }; - - leader_id == rcluster_info.id() - } - pub fn process_loop( verified_receiver: &Arc>>, poh_recorder: &Arc>, cluster_info: &Arc>, recv_start: &mut Instant, enable_forwarding: bool, - leader_schedule_cache: Arc, ) { let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut buffered_packets = vec![]; loop { if !buffered_packets.is_empty() { - Self::handle_buffered_packets( + Self::process_buffered_packets( &socket, poh_recorder, cluster_info, @@ -330,48 +299,18 @@ impl BankingStage { if unprocessed_packets.is_empty() { continue; } - if Self::should_buffer_packets( - poh_recorder, - cluster_info, - &leader_schedule_cache, - ) { - let num = unprocessed_packets - .iter() - .map(|(x, start, _)| x.packets.len().saturating_sub(*start)) - .sum(); - inc_new_counter_info!("banking_stage-buffered_packets", num); - buffered_packets.extend_from_slice(&unprocessed_packets); - continue; - } - - if enable_forwarding { - let rcluster_info = cluster_info.read().unwrap(); - if let Some(leader_id) = poh_recorder - .lock() - .unwrap() - .next_slot_leader(DEFAULT_TICKS_PER_SLOT, None) - { - if let Some(leader) = rcluster_info.lookup(&leader_id) { - let _ = Self::forward_unprocessed_packets( - &socket, - &leader.tpu_via_blobs, - &unprocessed_packets, - ); - } - } - } + let num = unprocessed_packets + .iter() + .map(|(x, start, _)| x.packets.len().saturating_sub(*start)) + .sum(); + inc_new_counter_info!("banking_stage-buffered_packets", num); + buffered_packets.extend_from_slice(&unprocessed_packets); } Err(err) => { debug!("solana-banking-stage-tx: exit due to {:?}", err); break; } } - - let num = buffered_packets - .iter() - .map(|(x, start, _)| x.packets.len().saturating_sub(*start)) - .sum(); - inc_new_counter_info!("banking_stage-total_buffered_packets", num); } } @@ -700,7 +639,6 @@ mod tests { fn test_banking_stage_shutdown1() { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); - let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); let (verified_sender, verified_receiver) = channel(); let (vote_sender, vote_receiver) = channel(); let ledger_path = get_tmp_ledger_path!(); @@ -717,7 +655,6 @@ mod tests { &poh_recorder, verified_receiver, vote_receiver, - &leader_schedule_cache, ); drop(verified_sender); drop(vote_sender); @@ -734,7 +671,6 @@ mod tests { let (mut genesis_block, _mint_keypair) = GenesisBlock::new(2); genesis_block.ticks_per_slot = 4; let bank = Arc::new(Bank::new(&genesis_block)); - let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); let start_hash = bank.last_blockhash(); let (verified_sender, verified_receiver) = channel(); let (vote_sender, vote_receiver) = channel(); @@ -752,7 +688,6 @@ mod tests { &poh_recorder, verified_receiver, vote_receiver, - &leader_schedule_cache, ); trace!("sending bank"); sleep(Duration::from_millis(600)); @@ -781,7 +716,6 @@ mod tests { solana_logger::setup(); let (genesis_block, mint_keypair) = GenesisBlock::new(10); let bank = Arc::new(Bank::new(&genesis_block)); - let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); let start_hash = bank.last_blockhash(); let (verified_sender, verified_receiver) = channel(); let (vote_sender, vote_receiver) = channel(); @@ -799,7 +733,6 @@ mod tests { &poh_recorder, verified_receiver, vote_receiver, - &leader_schedule_cache, ); // fund another account so we can send 2 good transactions in a single batch. @@ -921,7 +854,6 @@ mod tests { let entry_receiver = { // start a banking_stage to eat verified receiver let bank = Arc::new(Bank::new(&genesis_block)); - let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); let blocktree = Arc::new( Blocktree::open(&ledger_path) .expect("Expected to be able to open database ledger"), @@ -937,7 +869,6 @@ mod tests { verified_receiver, vote_receiver, 2, - &leader_schedule_cache, ); // wait for banking_stage to eat the packets @@ -1038,38 +969,36 @@ mod tests { let my_id1 = Pubkey::new_rand(); assert_eq!( - BankingStage::process_or_forward_packets(None, true, false, &my_id), + BankingStage::consume_or_forward_packets(None, true, false, &my_id), BufferedPacketsDecision::Hold ); assert_eq!( - BankingStage::process_or_forward_packets(None, false, false, &my_id), + BankingStage::consume_or_forward_packets(None, false, false, &my_id), BufferedPacketsDecision::Hold ); assert_eq!( - BankingStage::process_or_forward_packets(None, false, false, &my_id1), + BankingStage::consume_or_forward_packets(None, false, false, &my_id1), BufferedPacketsDecision::Hold ); - let mut contact_info = ContactInfo::default(); - contact_info.id = my_id1; assert_eq!( - BankingStage::process_or_forward_packets(Some(&contact_info), false, false, &my_id), + BankingStage::consume_or_forward_packets(Some(my_id1.clone()), false, false, &my_id), BufferedPacketsDecision::Forward ); assert_eq!( - BankingStage::process_or_forward_packets(Some(&contact_info), false, true, &my_id), + BankingStage::consume_or_forward_packets(Some(my_id1.clone()), false, true, &my_id), BufferedPacketsDecision::Hold ); assert_eq!( - BankingStage::process_or_forward_packets(Some(&contact_info), true, false, &my_id), + BankingStage::consume_or_forward_packets(Some(my_id1.clone()), true, false, &my_id), BufferedPacketsDecision::Consume ); assert_eq!( - BankingStage::process_or_forward_packets(Some(&contact_info), false, false, &my_id1), + BankingStage::consume_or_forward_packets(Some(my_id1.clone()), false, false, &my_id1), BufferedPacketsDecision::Hold ); assert_eq!( - BankingStage::process_or_forward_packets(Some(&contact_info), true, false, &my_id1), + BankingStage::consume_or_forward_packets(Some(my_id1.clone()), true, false, &my_id1), BufferedPacketsDecision::Consume ); } diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 3c0c221994..1d3d3bded2 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -64,7 +64,7 @@ impl FetchStage { if poh_recorder .lock() .unwrap() - .would_be_leader(DEFAULT_TICKS_PER_SLOT) + .would_be_leader(DEFAULT_TICKS_PER_SLOT * 2) { inc_new_counter_info!("fetch_stage-honor_forwards", len); for packets in batch { diff --git a/core/src/fullnode.rs b/core/src/fullnode.rs index f7321bbfc6..4e6363140a 100644 --- a/core/src/fullnode.rs +++ b/core/src/fullnode.rs @@ -255,7 +255,6 @@ impl Fullnode { config.sigverify_disabled, &blocktree, sender, - &leader_schedule_cache, &exit, &genesis_blockhash, ); diff --git a/core/src/tpu.rs b/core/src/tpu.rs index cb729b4fef..ea4259b5b7 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -8,7 +8,6 @@ use crate::cluster_info::ClusterInfo; use crate::cluster_info_vote_listener::ClusterInfoVoteListener; use crate::entry::EntrySender; use crate::fetch_stage::FetchStage; -use crate::leader_schedule_cache::LeaderScheduleCache; use crate::poh_recorder::{PohRecorder, WorkingBankEntries}; use crate::service::Service; use crate::sigverify_stage::SigVerifyStage; @@ -41,7 +40,6 @@ impl Tpu { sigverify_disabled: bool, blocktree: &Arc, storage_entry_sender: EntrySender, - leader_schedule_cache: &Arc, exit: &Arc, genesis_blockhash: &Hash, ) -> Self { @@ -74,7 +72,6 @@ impl Tpu { poh_recorder, verified_receiver, verified_vote_receiver, - leader_schedule_cache, ); let broadcast_stage = BroadcastStage::new(