diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index d87370b979..fc5d71a7e9 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -5,6 +5,7 @@ use crate::cluster_info::ClusterInfo; use crate::entry::Entry; use crate::leader_confirmation_service::LeaderConfirmationService; +use crate::leader_schedule_utils; use crate::packet::Packets; use crate::packet::SharedPackets; use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntries}; @@ -76,32 +77,110 @@ impl BankingStage { fn forward_unprocessed_packets( socket: &std::net::UdpSocket, tpu: &std::net::SocketAddr, - unprocessed_packets: UnprocessedPackets, + unprocessed_packets: &UnprocessedPackets, ) -> std::io::Result<()> { for (packets, start_index) in unprocessed_packets { let packets = packets.read().unwrap(); - for packet in packets.packets.iter().skip(start_index) { + for packet in packets.packets.iter().skip(*start_index) { socket.send_to(&packet.data[..packet.meta.size], tpu)?; } } Ok(()) } + fn forward_buffered_packets( + socket: &std::net::UdpSocket, + poh_recorder: &Arc>, + cluster_info: &Arc>, + buffered_packets: &UnprocessedPackets, + ) -> bool { + let (leader, my_id) = { + let rcluster_info = cluster_info.read().unwrap(); + let leader_id = if let Some(leader) = rcluster_info.leader_data() { + Some(leader.clone()) + } else { + None + }; + (leader_id, rcluster_info.id()) + }; + + let bank = poh_recorder.lock().unwrap().bank(); + + // if the current node is not the leader, forward the buffered packets + if bank.is_none() { + if let Some(leader) = leader.clone() { + if my_id == leader.id { + let _ = + Self::forward_unprocessed_packets(&socket, &leader.tpu, &buffered_packets); + return true; + } + } + } + + // If there's a bank, and leader is available, forward the packets + if bank.is_some() && leader.is_some() { + let _ = + Self::forward_unprocessed_packets(&socket, &leader.unwrap().tpu, &buffered_packets); + return true; + } + + return false; + } + + fn should_buffer_packets( + poh_recorder: &Arc>, + cluster_info: &Arc>, + ) -> bool { + let bank = poh_recorder.lock().unwrap().bank(); + let rcluster_info = cluster_info.read().unwrap(); + let my_id = rcluster_info.id(); + let leader = rcluster_info.leader_data(); + + // Buffer the packets if it was getting sent to me + if bank.is_none() && leader.is_some() && my_id == leader.unwrap().id { + return true; + } + + if let Some(bank) = bank { + // Buffer the packets if I am the next leader + if leader_schedule_utils::slot_leader_at(bank.slot() + 1, &bank).unwrap() == my_id { + return true; + } + } + + return false; + } + pub fn process_loop( verified_receiver: &Arc>>, poh_recorder: &Arc>, cluster_info: &Arc>, ) { let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let mut buffered_packets = vec![]; loop { + if Self::forward_buffered_packets( + &socket, + poh_recorder, + cluster_info, + &buffered_packets, + ) { + buffered_packets.clear(); + } + match Self::process_packets(&verified_receiver, &poh_recorder) { Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (), Ok(unprocessed_packets) => { + if Self::should_buffer_packets(poh_recorder, cluster_info) { + buffered_packets.extend_from_slice(&unprocessed_packets); + continue; + } + if let Some(leader) = cluster_info.read().unwrap().leader_data() { let _ = Self::forward_unprocessed_packets( &socket, &leader.tpu, - unprocessed_packets, + &unprocessed_packets, ); } }