tighten up packets_to_blobs (#4464)
* tighten up packets_to_blobs * missed a test
This commit is contained in:
parent
c05c3e69ca
commit
6a1de33138
@ -75,7 +75,7 @@ impl PartialEq for Packet {
|
||||
fn eq(&self, other: &Packet) -> bool {
|
||||
let self_data: &[u8] = self.data.as_ref();
|
||||
let other_data: &[u8] = other.data.as_ref();
|
||||
self.meta == other.meta && self_data == other_data
|
||||
self.meta == other.meta && self_data[..self.meta.size] == other_data[..other.meta.size]
|
||||
}
|
||||
}
|
||||
|
||||
@ -322,32 +322,6 @@ pub fn packets_to_blobs<T: Borrow<Packet>>(packets: &[T]) -> Vec<Blob> {
|
||||
blobs
|
||||
}
|
||||
|
||||
pub fn deserialize_packets_in_blob(
|
||||
data: &[u8],
|
||||
serialized_packet_size: usize,
|
||||
serialized_meta_size: usize,
|
||||
) -> Result<Vec<Packet>> {
|
||||
let mut packets: Vec<Packet> = Vec::with_capacity(data.len() / serialized_packet_size);
|
||||
let mut pos = 0;
|
||||
while pos + serialized_packet_size <= data.len() {
|
||||
let packet = deserialize_single_packet_in_blob(
|
||||
&data[pos..pos + serialized_packet_size],
|
||||
serialized_meta_size,
|
||||
)?;
|
||||
pos += serialized_packet_size;
|
||||
packets.push(packet);
|
||||
}
|
||||
Ok(packets)
|
||||
}
|
||||
|
||||
fn deserialize_single_packet_in_blob(data: &[u8], serialized_meta_size: usize) -> Result<Packet> {
|
||||
let meta = bincode::deserialize(&data[..serialized_meta_size])?;
|
||||
let mut packet_data = [0; PACKET_DATA_SIZE];
|
||||
packet_data
|
||||
.copy_from_slice(&data[serialized_meta_size..serialized_meta_size + PACKET_DATA_SIZE]);
|
||||
Ok(Packet::new(packet_data, meta))
|
||||
}
|
||||
|
||||
macro_rules! range {
|
||||
($prev:expr, $type:ident) => {
|
||||
$prev..$prev + size_of::<$type>()
|
||||
@ -510,10 +484,11 @@ impl Blob {
|
||||
let mut written = 0;
|
||||
let mut last_index = 0;
|
||||
for packet in packets {
|
||||
if bincode::serialize_into(&mut cursor, &packet.borrow().meta).is_err() {
|
||||
if bincode::serialize_into(&mut cursor, &packet.borrow().meta.size).is_err() {
|
||||
break;
|
||||
}
|
||||
if cursor.write_all(&packet.borrow().data[..]).is_err() {
|
||||
let packet = packet.borrow();
|
||||
if cursor.write_all(&packet.data[..packet.meta.size]).is_err() {
|
||||
break;
|
||||
}
|
||||
|
||||
@ -525,6 +500,32 @@ impl Blob {
|
||||
last_index
|
||||
}
|
||||
|
||||
// other side of store_packets
|
||||
pub fn load_packets(&self) -> Vec<Packet> {
|
||||
// rough estimate
|
||||
let mut packets: Vec<Packet> = Vec::with_capacity(self.size() / PACKET_DATA_SIZE);
|
||||
let mut pos = 0;
|
||||
let size_len = bincode::serialized_size(&0usize).unwrap() as usize;
|
||||
|
||||
while pos + size_len < self.size() {
|
||||
let size: usize = bincode::deserialize_from(&self.data()[pos..]).unwrap();
|
||||
|
||||
pos += size_len;
|
||||
|
||||
if size > PACKET_DATA_SIZE || pos + size > self.size() {
|
||||
break;
|
||||
}
|
||||
|
||||
let mut packet = Packet::default();
|
||||
packet.meta.size = size;
|
||||
packet.data[..size].copy_from_slice(&self.data()[pos..pos + size]);
|
||||
|
||||
pos += size;
|
||||
packets.push(packet);
|
||||
}
|
||||
packets
|
||||
}
|
||||
|
||||
pub fn recv_blob(socket: &UdpSocket, r: &SharedBlob) -> io::Result<()> {
|
||||
let mut p = r.write().unwrap();
|
||||
trace!("receiving on {}", socket.local_addr().unwrap());
|
||||
@ -741,12 +742,17 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_store_blobs_max() {
|
||||
let meta = Meta::default();
|
||||
let serialized_meta_size = bincode::serialized_size(&meta).unwrap() as usize;
|
||||
let serialized_packet_size = serialized_meta_size + PACKET_DATA_SIZE;
|
||||
let serialized_size_size = bincode::serialized_size(&0usize).unwrap() as usize;
|
||||
let serialized_packet_size = serialized_size_size + PACKET_DATA_SIZE;
|
||||
let num_packets = (BLOB_SIZE - BLOB_HEADER_SIZE) / serialized_packet_size + 1;
|
||||
let mut blob = Blob::default();
|
||||
let packets: Vec<_> = (0..num_packets).map(|_| Packet::default()).collect();
|
||||
let packets: Vec<_> = (0..num_packets)
|
||||
.map(|_| {
|
||||
let mut packet = Packet::default();
|
||||
packet.meta.size = PACKET_DATA_SIZE;
|
||||
packet
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Everything except the last packet should have been written
|
||||
assert_eq!(blob.store_packets(&packets[..]), (num_packets - 1) as u64);
|
||||
@ -768,21 +774,12 @@ mod tests {
|
||||
#[test]
|
||||
fn test_packets_to_blobs() {
|
||||
let mut rng = rand::thread_rng();
|
||||
let meta = Meta::default();
|
||||
let serialized_meta_size = bincode::serialized_size(&meta).unwrap() as usize;
|
||||
let serialized_packet_size = serialized_meta_size + PACKET_DATA_SIZE;
|
||||
|
||||
let packets_per_blob = (BLOB_SIZE - BLOB_HEADER_SIZE) / serialized_packet_size;
|
||||
assert!(packets_per_blob > 1);
|
||||
let num_packets = packets_per_blob * 10 + packets_per_blob - 1;
|
||||
|
||||
let packets: Vec<_> = (0..num_packets)
|
||||
let packets: Vec<_> = (0..2)
|
||||
.map(|_| {
|
||||
let mut packet = Packet::default();
|
||||
for i in 0..packet.meta.addr.len() {
|
||||
packet.meta.addr[i] = rng.gen_range(1, std::u16::MAX);
|
||||
}
|
||||
for i in 0..packet.data.len() {
|
||||
packet.meta.size = rng.gen_range(1, PACKET_DATA_SIZE);
|
||||
for i in 0..packet.meta.size {
|
||||
packet.data[i] = rng.gen_range(1, std::u8::MAX);
|
||||
}
|
||||
packet
|
||||
@ -790,55 +787,13 @@ mod tests {
|
||||
.collect();
|
||||
|
||||
let blobs = packets_to_blobs(&packets[..]);
|
||||
assert_eq!(blobs.len(), 11);
|
||||
|
||||
let reconstructed_packets: Vec<Packet> = blobs
|
||||
.iter()
|
||||
.flat_map(|b| {
|
||||
deserialize_packets_in_blob(
|
||||
&b.data()[..b.size()],
|
||||
serialized_packet_size,
|
||||
serialized_meta_size,
|
||||
)
|
||||
.unwrap()
|
||||
})
|
||||
.collect();
|
||||
let reconstructed_packets: Vec<Packet> =
|
||||
blobs.iter().flat_map(|b| b.load_packets()).collect();
|
||||
|
||||
assert_eq!(reconstructed_packets, packets);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_deserialize_packets_in_blob() {
|
||||
let meta = Meta::default();
|
||||
let serialized_meta_size = bincode::serialized_size(&meta).unwrap() as usize;
|
||||
let serialized_packet_size = serialized_meta_size + PACKET_DATA_SIZE;
|
||||
let num_packets = 10;
|
||||
let mut rng = rand::thread_rng();
|
||||
let packets: Vec<_> = (0..num_packets)
|
||||
.map(|_| {
|
||||
let mut packet = Packet::default();
|
||||
for i in 0..packet.meta.addr.len() {
|
||||
packet.meta.addr[i] = rng.gen_range(1, std::u16::MAX);
|
||||
}
|
||||
for i in 0..packet.data.len() {
|
||||
packet.data[i] = rng.gen_range(1, std::u8::MAX);
|
||||
}
|
||||
packet
|
||||
})
|
||||
.collect();
|
||||
|
||||
let mut blob = Blob::default();
|
||||
assert_eq!(blob.store_packets(&packets[..]), num_packets);
|
||||
let result = deserialize_packets_in_blob(
|
||||
&blob.data()[..blob.size()],
|
||||
serialized_packet_size,
|
||||
serialized_meta_size,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(result, packets);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_blob_data_align() {
|
||||
assert_eq!(std::mem::align_of::<BlobData>(), BLOB_DATA_ALIGN);
|
||||
@ -846,13 +801,21 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_packet_partial_eq() {
|
||||
let p1 = Packet::default();
|
||||
let mut p1 = Packet::default();
|
||||
let mut p2 = Packet::default();
|
||||
|
||||
p1.meta.size = 1;
|
||||
p1.data[0] = 0;
|
||||
|
||||
p2.meta.size = 1;
|
||||
p2.data[0] = 0;
|
||||
|
||||
assert!(p1 == p2);
|
||||
p2.data[1] = 4;
|
||||
|
||||
p2.data[0] = 4;
|
||||
assert!(p1 != p2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_blob_partial_eq() {
|
||||
let p1 = Blob::default();
|
||||
|
@ -1,11 +1,8 @@
|
||||
//! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets.
|
||||
//!
|
||||
|
||||
use crate::packet::{
|
||||
deserialize_packets_in_blob, Blob, Meta, Packets, SharedBlobs, PACKET_DATA_SIZE,
|
||||
};
|
||||
use crate::packet::{Blob, Packets, SharedBlobs};
|
||||
use crate::result::{Error, Result};
|
||||
use bincode;
|
||||
use solana_sdk::timing::duration_as_ms;
|
||||
use std::net::UdpSocket;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
@ -135,25 +132,9 @@ fn recv_blob_packets(sock: &UdpSocket, s: &PacketSender) -> Result<()> {
|
||||
sock.local_addr().unwrap()
|
||||
);
|
||||
|
||||
let meta = Meta::default();
|
||||
let serialized_meta_size = bincode::serialized_size(&meta)? as usize;
|
||||
let serialized_packet_size = serialized_meta_size + PACKET_DATA_SIZE;
|
||||
let blobs = Blob::recv_from(sock)?;
|
||||
for blob in blobs {
|
||||
let r_blob = blob.read().unwrap();
|
||||
let data = {
|
||||
let msg_size = r_blob.size();
|
||||
&r_blob.data()[..msg_size]
|
||||
};
|
||||
|
||||
let packets =
|
||||
deserialize_packets_in_blob(data, serialized_packet_size, serialized_meta_size);
|
||||
|
||||
if packets.is_err() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let packets = packets?;
|
||||
let packets = blob.read().unwrap().load_packets();
|
||||
s.send(Packets::new(packets))?;
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user