diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 5b913fde72..199f7d7abd 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -6,8 +6,8 @@ use crate::cluster_info::ClusterInfo; use crate::entry::Entry; use crate::leader_confirmation_service::LeaderConfirmationService; use crate::leader_schedule_utils; -use crate::packet::Packets; use crate::packet::SharedPackets; +use crate::packet::{Blob, Packets}; use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntries}; use crate::poh_service::{PohService, PohServiceConfig}; use crate::result::{Error, Result}; @@ -76,15 +76,29 @@ impl BankingStage { fn forward_unprocessed_packets( socket: &std::net::UdpSocket, - tpu: &std::net::SocketAddr, + forwarder: &std::net::SocketAddr, unprocessed_packets: &[(SharedPackets, usize)], ) -> std::io::Result<()> { + let mut blob = Blob::default(); for (packets, start_index) in unprocessed_packets { let packets = packets.read().unwrap(); - for packet in packets.packets.iter().skip(*start_index) { - socket.send_to(&packet.data[..packet.meta.size], tpu)?; + let mut current_index = *start_index; + while current_index < packets.packets.len() { + current_index += blob.store_packets(&packets.packets[current_index..]) as usize; + if current_index < packets.packets.len() { + // Blob is full, send it + socket.send_to(&blob.data[..blob.meta.size], forwarder)?; + blob = Blob::default(); + } else { + break; + } } } + + if blob.size() > 0 { + socket.send_to(&blob.data[..blob.meta.size], forwarder)?; + } + Ok(()) } @@ -110,8 +124,11 @@ impl BankingStage { if bank.is_none() { if let Some(leader) = leader.clone() { if my_id == leader.id { - let _ = - Self::forward_unprocessed_packets(&socket, &leader.tpu, &buffered_packets); + let _ = Self::forward_unprocessed_packets( + &socket, + &leader.forwarder, + &buffered_packets, + ); return true; } } @@ -119,8 +136,11 @@ impl BankingStage { // If there's a bank, and leader is available, forward the packets if bank.is_some() && leader.is_some() { - let _ = - Self::forward_unprocessed_packets(&socket, &leader.unwrap().tpu, &buffered_packets); + let _ = Self::forward_unprocessed_packets( + &socket, + &leader.unwrap().forwarder, + &buffered_packets, + ); return true; } @@ -179,7 +199,7 @@ impl BankingStage { if let Some(leader) = cluster_info.read().unwrap().leader_data() { let _ = Self::forward_unprocessed_packets( &socket, - &leader.tpu, + &leader.forwarder, &unprocessed_packets, ); } diff --git a/core/src/packet.rs b/core/src/packet.rs index 032f15f5f7..01d062b937 100644 --- a/core/src/packet.rs +++ b/core/src/packet.rs @@ -419,17 +419,25 @@ impl Blob { self.set_data_size(new_size as u64); } - pub fn store_packets(&mut self, packets: &[Packet]) -> Result<()> { - let mut cursor = Cursor::new(&mut self.data[..]); - cursor.set_position(BLOB_HEADER_SIZE as u64); + pub fn store_packets(&mut self, packets: &[Packet]) -> u64 { + let size = self.size(); + let mut cursor = Cursor::new(&mut self.data_mut()[size..]); + let mut written = 0; + let mut last_index = 0; for packet in packets { - serialize_into(&mut cursor, &packet.meta)?; - cursor.write(&packet.data[..])?; - } - let size = cursor.position(); - self.set_size(size as usize); + if serialize_into(&mut cursor, &packet.meta).is_err() { + break; + } + if cursor.write_all(&packet.data[..]).is_err() { + break; + } - Ok(()) + written = cursor.position() as usize; + last_index += 1; + } + + self.set_size(size + written); + last_index } pub fn recv_blob(socket: &UdpSocket, r: &SharedBlob) -> io::Result<()> { @@ -514,6 +522,8 @@ mod tests { to_packets, Blob, Meta, Packet, Packets, SharedBlob, SharedPackets, NUM_PACKETS, PACKET_DATA_SIZE, }; + use crate::packet::{BLOB_HEADER_SIZE, BLOB_SIZE}; + use bincode::serialized_size; use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction::SystemTransaction; @@ -629,4 +639,29 @@ mod tests { assert!(b.should_forward()); } + #[test] + fn test_store_blobs_max() { + let meta = Meta::default(); + let serialized_meta_size = serialized_size(&meta).unwrap() as usize; + let serialized_packet_size = serialized_meta_size + PACKET_DATA_SIZE; + let num_packets = (BLOB_SIZE - BLOB_HEADER_SIZE) / serialized_packet_size + 1; + let mut blob = Blob::default(); + let packets: Vec<_> = (0..num_packets).map(|_| Packet::default()).collect(); + + // Everything except the last packet should have been written + assert_eq!(blob.store_packets(&packets[..]), (num_packets - 1) as u64); + + blob = Blob::default(); + // Store packets such that blob only has room for one more + assert_eq!( + blob.store_packets(&packets[..num_packets - 2]), + (num_packets - 2) as u64 + ); + + // Fill the last packet in the blob + assert_eq!(blob.store_packets(&packets[..num_packets - 2]), 1); + + // Blob is now full + assert_eq!(blob.store_packets(&packets), 0); + } } diff --git a/core/src/streamer.rs b/core/src/streamer.rs index 3f161a141a..7eef5b2983 100644 --- a/core/src/streamer.rs +++ b/core/src/streamer.rs @@ -308,7 +308,7 @@ mod test { .collect(); let mut blob = Blob::default(); - blob.store_packets(&packets[..]).unwrap(); + assert_eq!(blob.store_packets(&packets[..]), num_packets); let result = deserialize_packets_in_blob( &blob.data()[..blob.size()], serialized_packet_size, diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 8517e298a8..49e4494c68 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -26,6 +26,7 @@ pub struct Tpu { } impl Tpu { + #[allow(clippy::too_many_arguments)] pub fn new( id: &Pubkey, cluster_info: &Arc>,