diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 945f4b86d6..f79a6cf83c 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -85,7 +85,7 @@ impl BankingStage { // This thread talks to poh_service and broadcasts the entries once they have been recorded. // Once an entry has been recorded, its blockhash is registered with the bank. let exit = Arc::new(AtomicBool::new(false)); - + let my_pubkey = cluster_info.read().unwrap().id(); // Many banks that process transactions in parallel. let bank_thread_hdls: Vec> = (0..num_threads) .map(|i| { @@ -104,6 +104,7 @@ impl BankingStage { .name("solana-banking-stage-tx".to_string()) .spawn(move || { Self::process_loop( + my_pubkey, &verified_receiver, &poh_recorder, &cluster_info, @@ -241,14 +242,13 @@ impl BankingStage { } fn process_buffered_packets( + my_pubkey: &Pubkey, socket: &std::net::UdpSocket, poh_recorder: &Arc>, cluster_info: &Arc>, buffered_packets: &mut Vec, enable_forwarding: bool, ) -> Result<()> { - let rcluster_info = cluster_info.read().unwrap(); - let (decision, next_leader) = { let poh = poh_recorder.lock().unwrap(); let next_leader = poh.next_slot_leader(); @@ -257,7 +257,7 @@ impl BankingStage { next_leader, poh.bank().is_some(), poh.would_be_leader(DEFAULT_TICKS_PER_SLOT * 2), - &rcluster_info.id(), + my_pubkey, ), next_leader, ) @@ -265,28 +265,31 @@ impl BankingStage { match decision { BufferedPacketsDecision::Consume => { - let mut unprocessed = Self::consume_buffered_packets( - &rcluster_info.id(), - poh_recorder, - buffered_packets, - )?; + let mut unprocessed = + Self::consume_buffered_packets(my_pubkey, poh_recorder, buffered_packets)?; buffered_packets.append(&mut unprocessed); Ok(()) } BufferedPacketsDecision::Forward => { if enable_forwarding { next_leader.map_or(Ok(()), |leader_pubkey| { - rcluster_info - .lookup(&leader_pubkey) - .map_or(Ok(()), |leader| { - let _ = Self::forward_buffered_packets( - &socket, - &leader.tpu_via_blobs, - &buffered_packets, - ); - buffered_packets.clear(); - Ok(()) - }) + let leader_addr = { + cluster_info + .read() + .unwrap() + .lookup(&leader_pubkey) + .map(|leader| leader.tpu_via_blobs) + }; + + leader_addr.map_or(Ok(()), |leader_addr| { + let _ = Self::forward_buffered_packets( + &socket, + &leader_addr, + &buffered_packets, + ); + buffered_packets.clear(); + Ok(()) + }) }) } else { buffered_packets.clear(); @@ -298,6 +301,7 @@ impl BankingStage { } pub fn process_loop( + my_pubkey: Pubkey, verified_receiver: &Arc>>, poh_recorder: &Arc>, cluster_info: &Arc>, @@ -310,6 +314,7 @@ impl BankingStage { loop { if !buffered_packets.is_empty() { Self::process_buffered_packets( + &my_pubkey, &socket, poh_recorder, cluster_info, @@ -330,11 +335,11 @@ impl BankingStage { }; match Self::process_packets( + &my_pubkey, &verified_receiver, &poh_recorder, recv_start, recv_timeout, - cluster_info, id, ) { Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (), @@ -694,11 +699,11 @@ impl BankingStage { /// Process the incoming packets pub fn process_packets( + my_pubkey: &Pubkey, verified_receiver: &Arc>>, poh: &Arc>, recv_start: &mut Instant, recv_timeout: Duration, - cluster_info: &Arc>, id: u32, ) -> Result { let mms = verified_receiver @@ -740,7 +745,6 @@ impl BankingStage { if processed < verified_txs_len { let next_leader = poh.lock().unwrap().next_slot_leader(); - let my_pubkey = cluster_info.read().unwrap().id(); // Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones while let Some((msgs, vers)) = mms_iter.next() { let packet_indexes = Self::generate_packet_indexes(vers);