diff --git a/core/src/packet.rs b/core/src/packet.rs index 2824195878..032f15f5f7 100644 --- a/core/src/packet.rs +++ b/core/src/packet.rs @@ -10,6 +10,8 @@ use solana_sdk::pubkey::Pubkey; use std::cmp; use std::fmt; use std::io; +use std::io::Cursor; +use std::io::Write; use std::mem::size_of; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; use std::sync::{Arc, RwLock}; @@ -23,7 +25,7 @@ pub const BLOB_SIZE: usize = (64 * 1024 - 128); // wikipedia says there should b pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - (BLOB_HEADER_SIZE * 2); pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE; -#[derive(Clone, Default, Debug, PartialEq)] +#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] #[repr(C)] pub struct Meta { pub size: usize, @@ -40,6 +42,12 @@ pub struct Packet { pub meta: Meta, } +impl Packet { + pub fn new(data: [u8; PACKET_DATA_SIZE], meta: Meta) -> Self { + Packet { data, meta } + } +} + impl fmt::Debug for Packet { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( @@ -60,6 +68,12 @@ impl Default for Packet { } } +impl PartialEq for Packet { + fn eq(&self, other: &Packet) -> bool { + self.data.iter().zip(other.data.iter()).all(|(a, b)| a == b) && self.meta == other.meta + } +} + impl Meta { pub fn addr(&self) -> SocketAddr { if !self.v6 { @@ -405,6 +419,19 @@ impl Blob { self.set_data_size(new_size as u64); } + pub fn store_packets(&mut self, packets: &[Packet]) -> Result<()> { + let mut cursor = Cursor::new(&mut self.data[..]); + cursor.set_position(BLOB_HEADER_SIZE as u64); + for packet in packets { + serialize_into(&mut cursor, &packet.meta)?; + cursor.write(&packet.data[..])?; + } + let size = cursor.position(); + self.set_size(size as usize); + + Ok(()) + } + pub fn recv_blob(socket: &UdpSocket, r: &SharedBlob) -> io::Result<()> { let mut p = r.write().unwrap(); trace!("receiving on {}", socket.local_addr().unwrap()); diff --git a/core/src/streamer.rs b/core/src/streamer.rs index fed499f7fa..3f161a141a 100644 --- a/core/src/streamer.rs +++ b/core/src/streamer.rs @@ -1,9 +1,9 @@ //! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets. //! -use crate::packet::{Blob, Packet, Packets, SharedBlobs, SharedPackets}; +use crate::packet::{Blob, Meta, Packet, Packets, SharedBlobs, SharedPackets, PACKET_DATA_SIZE}; use crate::result::{Error, Result}; -use bincode::deserialize; +use bincode::{deserialize, serialized_size}; use solana_metrics::{influxdb, submit}; use solana_sdk::timing::duration_as_ms; use std::net::UdpSocket; @@ -140,19 +140,58 @@ pub fn blob_receiver( .unwrap() } +fn deserialize_single_packet_in_blob(data: &[u8], serialized_meta_size: usize) -> Result { + let meta = deserialize(&data[..serialized_meta_size])?; + let mut packet_data = [0; PACKET_DATA_SIZE]; + packet_data + .copy_from_slice(&data[serialized_meta_size..serialized_meta_size + PACKET_DATA_SIZE]); + Ok(Packet::new(packet_data, meta)) +} + +fn deserialize_packets_in_blob( + data: &[u8], + serialized_packet_size: usize, + serialized_meta_size: usize, +) -> Result> { + let mut packets: Vec = Vec::with_capacity(data.len() / serialized_packet_size); + let mut pos = 0; + while pos + serialized_packet_size <= data.len() { + let packet = deserialize_single_packet_in_blob( + &data[pos..pos + serialized_packet_size], + serialized_meta_size, + )?; + pos += serialized_packet_size; + packets.push(packet); + } + Ok(packets) +} + fn recv_blob_packets(sock: &UdpSocket, s: &PacketSender) -> Result<()> { trace!( "recv_blob_packets: receiving on {}", sock.local_addr().unwrap() ); + + let meta = Meta::default(); + let serialized_meta_size = serialized_size(&meta)? as usize; + let serialized_packet_size = serialized_meta_size + PACKET_DATA_SIZE; let blobs = Blob::recv_from(sock)?; for blob in blobs { - let msgs: Vec = { - let r_blob = blob.read().unwrap(); + let r_blob = blob.read().unwrap(); + let data = { let msg_size = r_blob.size(); - deserialize(&r_blob.data()[..msg_size])? + &r_blob.data()[..msg_size] }; - s.send(Arc::new(RwLock::new(Packets::new(msgs))))?; + + let packets = + deserialize_packets_in_blob(data, serialized_packet_size, serialized_meta_size); + + if packets.is_err() { + continue; + } + + let packets = packets?; + s.send(Arc::new(RwLock::new(Packets::new(packets))))?; } Ok(()) @@ -182,9 +221,10 @@ pub fn blob_packet_receiver( #[cfg(test)] mod test { - use crate::packet::{Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE}; - use crate::streamer::PacketReceiver; + use super::*; + use crate::packet::{Blob, Meta, Packet, Packets, SharedBlob, PACKET_DATA_SIZE}; use crate::streamer::{receiver, responder}; + use rand::Rng; use std::io; use std::io::Write; use std::net::UdpSocket; @@ -246,4 +286,36 @@ mod test { t_receiver.join().expect("join"); t_responder.join().expect("join"); } + + #[test] + pub fn streamer_test_deserialize_packets_in_blob() { + let meta = Meta::default(); + let serialized_meta_size = serialized_size(&meta).unwrap() as usize; + let serialized_packet_size = serialized_meta_size + PACKET_DATA_SIZE; + let num_packets = 10; + let mut rng = rand::thread_rng(); + let packets: Vec<_> = (0..num_packets) + .map(|_| { + let mut packet = Packet::default(); + for i in 0..packet.meta.addr.len() { + packet.meta.addr[i] = rng.gen_range(1, std::u16::MAX); + } + for i in 0..packet.data.len() { + packet.data[i] = rng.gen_range(1, std::u8::MAX); + } + packet + }) + .collect(); + + let mut blob = Blob::default(); + blob.store_packets(&packets[..]).unwrap(); + let result = deserialize_packets_in_blob( + &blob.data()[..blob.size()], + serialized_packet_size, + serialized_meta_size, + ) + .unwrap(); + + assert_eq!(result, packets); + } }