diff --git a/streamer/src/packet.rs b/streamer/src/packet.rs index 018fae4531..34404143f1 100644 --- a/streamer/src/packet.rs +++ b/streamer/src/packet.rs @@ -112,6 +112,10 @@ mod tests { } send_to(&batch, &send_socket, &SocketAddrSpace::Unspecified).unwrap(); + batch + .packets + .iter_mut() + .for_each(|pkt| pkt.meta = Meta::default()); let recvd = recv_from(&mut batch, &recv_socket, 1).unwrap(); assert_eq!(recvd, batch.packets.len()); diff --git a/streamer/src/recvmmsg.rs b/streamer/src/recvmmsg.rs index ec734f1d0a..b9713a2b72 100644 --- a/streamer/src/recvmmsg.rs +++ b/streamer/src/recvmmsg.rs @@ -2,7 +2,7 @@ pub use solana_perf::packet::NUM_RCVMMSGS; use { - crate::packet::Packet, + crate::packet::{Meta, Packet}, std::{cmp, io, net::UdpSocket}, }; #[cfg(target_os = "linux")] @@ -15,6 +15,7 @@ use { #[cfg(not(target_os = "linux"))] pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result { + debug_assert!(packets.iter().all(|pkt| pkt.meta == Meta::default())); let mut i = 0; let count = cmp::min(NUM_RCVMMSGS, packets.len()); for p in packets.iter_mut().take(count) { @@ -66,6 +67,8 @@ fn cast_socket_addr(addr: &sockaddr_storage, hdr: &mmsghdr) -> Option #[cfg(target_os = "linux")] #[allow(clippy::uninit_assumed_init)] pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result { + // Assert that there are no leftovers in packets. + debug_assert!(packets.iter().all(|pkt| pkt.meta == Meta::default())); const SOCKADDR_STORAGE_SIZE: usize = mem::size_of::(); let mut hdrs: [mmsghdr; NUM_RCVMMSGS] = unsafe { mem::zeroed() }; @@ -93,23 +96,18 @@ pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result= TEST_BATCH_SIZE { break; } + packets + .iter_mut() + .for_each(|pkt| pkt.meta = Meta::default()); } elapsed_in_small_batch += now.elapsed().as_nanos(); assert_eq!(TEST_BATCH_SIZE, recv);