diff --git a/core/src/packet.rs b/core/src/packet.rs
index e0003e8f3a..255c485fbb 100644
--- a/core/src/packet.rs
+++ b/core/src/packet.rs
@@ -75,7 +75,7 @@ impl PartialEq for Packet {
     fn eq(&self, other: &Packet) -> bool {
         let self_data: &[u8] = self.data.as_ref();
         let other_data: &[u8] = other.data.as_ref();
-        self.meta == other.meta && self_data == other_data
+        self.meta == other.meta && self_data[..self.meta.size] == other_data[..other.meta.size]
     }
 }
 
@@ -322,32 +322,6 @@ pub fn packets_to_blobs<T: Borrow<Packet>>(packets: &[T]) -> Vec<Blob> {
     blobs
 }
 
-pub fn deserialize_packets_in_blob(
-    data: &[u8],
-    serialized_packet_size: usize,
-    serialized_meta_size: usize,
-) -> Result<Vec<Packet>> {
-    let mut packets: Vec<Packet> = Vec::with_capacity(data.len() / serialized_packet_size);
-    let mut pos = 0;
-    while pos + serialized_packet_size <= data.len() {
-        let packet = deserialize_single_packet_in_blob(
-            &data[pos..pos + serialized_packet_size],
-            serialized_meta_size,
-        )?;
-        pos += serialized_packet_size;
-        packets.push(packet);
-    }
-    Ok(packets)
-}
-
-fn deserialize_single_packet_in_blob(data: &[u8], serialized_meta_size: usize) -> Result<Packet> {
-    let meta = bincode::deserialize(&data[..serialized_meta_size])?;
-    let mut packet_data = [0; PACKET_DATA_SIZE];
-    packet_data
-        .copy_from_slice(&data[serialized_meta_size..serialized_meta_size + PACKET_DATA_SIZE]);
-    Ok(Packet::new(packet_data, meta))
-}
-
 macro_rules! range {
     ($prev:expr, $type:ident) => {
         $prev..$prev + size_of::<$type>()
@@ -510,10 +484,11 @@ impl Blob {
         let mut written = 0;
         let mut last_index = 0;
         for packet in packets {
-            if bincode::serialize_into(&mut cursor, &packet.borrow().meta).is_err() {
+            if bincode::serialize_into(&mut cursor, &packet.borrow().meta.size).is_err() {
                 break;
             }
 
-            if cursor.write_all(&packet.borrow().data[..]).is_err() {
+            let packet = packet.borrow();
+            if cursor.write_all(&packet.data[..packet.meta.size]).is_err() {
                 break;
             }
@@ -525,6 +500,32 @@
         last_index
     }
 
+    // other side of store_packets
+    pub fn load_packets(&self) -> Vec<Packet> {
+        // rough estimate
+        let mut packets: Vec<Packet> = Vec::with_capacity(self.size() / PACKET_DATA_SIZE);
+        let mut pos = 0;
+        let size_len = bincode::serialized_size(&0usize).unwrap() as usize;
+
+        while pos + size_len < self.size() {
+            let size: usize = bincode::deserialize_from(&self.data()[pos..]).unwrap();
+
+            pos += size_len;
+
+            if size > PACKET_DATA_SIZE || pos + size > self.size() {
+                break;
+            }
+
+            let mut packet = Packet::default();
+            packet.meta.size = size;
+            packet.data[..size].copy_from_slice(&self.data()[pos..pos + size]);
+
+            pos += size;
+            packets.push(packet);
+        }
+        packets
+    }
+
     pub fn recv_blob(socket: &UdpSocket, r: &SharedBlob) -> io::Result<()> {
         let mut p = r.write().unwrap();
         trace!("receiving on {}", socket.local_addr().unwrap());
@@ -741,12 +742,17 @@ mod tests {
 
     #[test]
    fn test_store_blobs_max() {
-        let meta = Meta::default();
-        let serialized_meta_size = bincode::serialized_size(&meta).unwrap() as usize;
-        let serialized_packet_size = serialized_meta_size + PACKET_DATA_SIZE;
+        let serialized_size_size = bincode::serialized_size(&0usize).unwrap() as usize;
+        let serialized_packet_size = serialized_size_size + PACKET_DATA_SIZE;
         let num_packets = (BLOB_SIZE - BLOB_HEADER_SIZE) / serialized_packet_size + 1;
         let mut blob = Blob::default();
-        let packets: Vec<_> = (0..num_packets).map(|_| Packet::default()).collect();
+        let packets: Vec<_> = (0..num_packets)
+            .map(|_| {
+                let mut packet = Packet::default();
+                packet.meta.size = PACKET_DATA_SIZE;
+                packet
+            })
+            .collect();
 
         // Everything except the last packet should have been written
         assert_eq!(blob.store_packets(&packets[..]), (num_packets - 1) as u64);
@@ -768,21 +774,12 @@
 
     #[test]
     fn test_packets_to_blobs() {
         let mut rng = rand::thread_rng();
-        let meta = Meta::default();
-        let serialized_meta_size = bincode::serialized_size(&meta).unwrap() as usize;
-        let serialized_packet_size = serialized_meta_size + PACKET_DATA_SIZE;
-        let packets_per_blob = (BLOB_SIZE - BLOB_HEADER_SIZE) / serialized_packet_size;
-        assert!(packets_per_blob > 1);
-        let num_packets = packets_per_blob * 10 + packets_per_blob - 1;
-
-        let packets: Vec<_> = (0..num_packets)
+        let packets: Vec<_> = (0..2)
             .map(|_| {
                 let mut packet = Packet::default();
-                for i in 0..packet.meta.addr.len() {
-                    packet.meta.addr[i] = rng.gen_range(1, std::u16::MAX);
-                }
-                for i in 0..packet.data.len() {
+                packet.meta.size = rng.gen_range(1, PACKET_DATA_SIZE);
+                for i in 0..packet.meta.size {
                     packet.data[i] = rng.gen_range(1, std::u8::MAX);
                 }
                 packet
@@ -790,55 +787,13 @@
             })
             .collect();
 
         let blobs = packets_to_blobs(&packets[..]);
-        assert_eq!(blobs.len(), 11);
-        let reconstructed_packets: Vec<Packet> = blobs
-            .iter()
-            .flat_map(|b| {
-                deserialize_packets_in_blob(
-                    &b.data()[..b.size()],
-                    serialized_packet_size,
-                    serialized_meta_size,
-                )
-                .unwrap()
-            })
-            .collect();
+        let reconstructed_packets: Vec<Packet> =
+            blobs.iter().flat_map(|b| b.load_packets()).collect();
 
         assert_eq!(reconstructed_packets, packets);
     }
 
-    #[test]
-    fn test_deserialize_packets_in_blob() {
-        let meta = Meta::default();
-        let serialized_meta_size = bincode::serialized_size(&meta).unwrap() as usize;
-        let serialized_packet_size = serialized_meta_size + PACKET_DATA_SIZE;
-        let num_packets = 10;
-        let mut rng = rand::thread_rng();
-        let packets: Vec<_> = (0..num_packets)
-            .map(|_| {
-                let mut packet = Packet::default();
-                for i in 0..packet.meta.addr.len() {
-                    packet.meta.addr[i] = rng.gen_range(1, std::u16::MAX);
-                }
-                for i in 0..packet.data.len() {
-                    packet.data[i] = rng.gen_range(1, std::u8::MAX);
-                }
-                packet
-            })
-            .collect();
-
-        let mut blob = Blob::default();
-        assert_eq!(blob.store_packets(&packets[..]), num_packets);
-        let result = deserialize_packets_in_blob(
-            &blob.data()[..blob.size()],
-            serialized_packet_size,
-            serialized_meta_size,
-        )
-        .unwrap();
-
-        assert_eq!(result, packets);
-    }
-
     #[test]
     fn test_blob_data_align() {
         assert_eq!(std::mem::align_of::<BlobData>(), BLOB_DATA_ALIGN);
@@ -846,13 +801,21 @@
 
     #[test]
     fn test_packet_partial_eq() {
-        let p1 = Packet::default();
+        let mut p1 = Packet::default();
         let mut p2 = Packet::default();
+        p1.meta.size = 1;
+        p1.data[0] = 0;
+
+        p2.meta.size = 1;
+        p2.data[0] = 0;
+
 
         assert!(p1 == p2);
-        p2.data[1] = 4;
+
+        p2.data[0] = 4;
         assert!(p1 != p2);
     }
+
     #[test]
     fn test_blob_partial_eq() {
         let p1 = Blob::default();
diff --git a/core/src/streamer.rs b/core/src/streamer.rs
index e74a7a38ae..6cc0a93a29 100644
--- a/core/src/streamer.rs
+++ b/core/src/streamer.rs
@@ -1,11 +1,8 @@
 //! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets.
 //!
 
-use crate::packet::{
-    deserialize_packets_in_blob, Blob, Meta, Packets, SharedBlobs, PACKET_DATA_SIZE,
-};
+use crate::packet::{Blob, Packets, SharedBlobs};
 use crate::result::{Error, Result};
-use bincode;
 use solana_sdk::timing::duration_as_ms;
 use std::net::UdpSocket;
 use std::sync::atomic::{AtomicBool, Ordering};
@@ -135,25 +132,9 @@ fn recv_blob_packets(sock: &UdpSocket, s: &PacketSender) -> Result<()> {
         sock.local_addr().unwrap()
     );
 
-    let meta = Meta::default();
-    let serialized_meta_size = bincode::serialized_size(&meta)? as usize;
-    let serialized_packet_size = serialized_meta_size + PACKET_DATA_SIZE;
     let blobs = Blob::recv_from(sock)?;
     for blob in blobs {
-        let r_blob = blob.read().unwrap();
-        let data = {
-            let msg_size = r_blob.size();
-            &r_blob.data()[..msg_size]
-        };
-
-        let packets =
-            deserialize_packets_in_blob(data, serialized_packet_size, serialized_meta_size);
-
-        if packets.is_err() {
-            continue;
-        }
-
-        let packets = packets?;
+        let packets = blob.read().unwrap().load_packets();
         s.send(Packets::new(packets))?;
     }
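
Note on the new layout (editorial addition, not part of the patch): store_packets now writes each packet as a bincode-encoded meta.size followed by exactly that many data bytes, and load_packets walks the blob back out of that framing. Under bincode's default fixed-width, little-endian integer encoding, serialized_size(&0usize) should come to 8, so each stored packet is effectively [len: u64 LE][payload]. The sketch below mirrors that framing with the standard library only; PACKET_DATA_SIZE here is a stand-in for the constant defined in packet.rs, and frame_packets/unframe_packets are illustrative names, not functions from the codebase.

    // Stand-in for the real constant in packet.rs; the value is only for this sketch.
    const PACKET_DATA_SIZE: usize = 512;

    // Mirror of what store_packets writes: an 8-byte little-endian length, then the payload.
    fn frame_packets(payloads: &[&[u8]]) -> Vec<u8> {
        let mut buf = Vec::new();
        for p in payloads {
            buf.extend_from_slice(&(p.len() as u64).to_le_bytes());
            buf.extend_from_slice(p);
        }
        buf
    }

    // Mirror of load_packets: read a length, bounds-check it, copy out the payload,
    // and stop as soon as the remaining bytes look truncated or malformed.
    fn unframe_packets(buf: &[u8]) -> Vec<Vec<u8>> {
        let mut out = Vec::new();
        let mut pos = 0;
        while pos + 8 < buf.len() {
            let mut len_bytes = [0u8; 8];
            len_bytes.copy_from_slice(&buf[pos..pos + 8]);
            let len = u64::from_le_bytes(len_bytes) as usize;
            pos += 8;
            if len > PACKET_DATA_SIZE || pos + len > buf.len() {
                break;
            }
            out.push(buf[pos..pos + len].to_vec());
            pos += len;
        }
        out
    }

    fn main() {
        let payloads: [&[u8]; 2] = [b"hello", b"solana"];
        let framed = frame_packets(&payloads);
        assert_eq!(unframe_packets(&framed), vec![b"hello".to_vec(), b"solana".to_vec()]);
    }

Round-tripping a couple of payloads through frame_packets and unframe_packets reproduces them, which is the same property the updated test_packets_to_blobs checks through store_packets and load_packets.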