From 379feecae5e94d53c3731b2de64c7cbbf677710e Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Tue, 4 Jan 2022 09:37:44 -0500 Subject: [PATCH] patches bug in recv_mmsg when npkts != nrecv If recv_mmsg receives 2 packets where the first one is filtered out, then it returns npkts == 1: https://github.com/solana-labs/solana/blob/01a096adc/streamer/src/recvmmsg.rs#L104-L115 But then streamer::packet::recv_from will erroneously keep the 1st packet and drop the 2nd one: https://github.com/solana-labs/solana/blob/01a096adc/streamer/src/packet.rs#L34-L49 To avoid this bug, this commit updates recv_mmsg to always return total number of received packets. If socket address cannot be correctly obtained, it is left as the default value which is UNSPECIFIED: https://github.com/solana-labs/solana/blob/01a096adc/sdk/src/packet.rs#L145 --- streamer/src/packet.rs | 4 ++++ streamer/src/recvmmsg.rs | 39 ++++++++++++++++++++++---------------- streamer/tests/recvmmsg.rs | 5 ++++- 3 files changed, 31 insertions(+), 17 deletions(-) diff --git a/streamer/src/packet.rs b/streamer/src/packet.rs index 018fae4531..34404143f1 100644 --- a/streamer/src/packet.rs +++ b/streamer/src/packet.rs @@ -112,6 +112,10 @@ mod tests { } send_to(&batch, &send_socket, &SocketAddrSpace::Unspecified).unwrap(); + batch + .packets + .iter_mut() + .for_each(|pkt| pkt.meta = Meta::default()); let recvd = recv_from(&mut batch, &recv_socket, 1).unwrap(); assert_eq!(recvd, batch.packets.len()); diff --git a/streamer/src/recvmmsg.rs b/streamer/src/recvmmsg.rs index ec734f1d0a..b9713a2b72 100644 --- a/streamer/src/recvmmsg.rs +++ b/streamer/src/recvmmsg.rs @@ -2,7 +2,7 @@ pub use solana_perf::packet::NUM_RCVMMSGS; use { - crate::packet::Packet, + crate::packet::{Meta, Packet}, std::{cmp, io, net::UdpSocket}, }; #[cfg(target_os = "linux")] @@ -15,6 +15,7 @@ use { #[cfg(not(target_os = "linux"))] pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result { + debug_assert!(packets.iter().all(|pkt| pkt.meta == Meta::default())); let mut i = 0; let count = cmp::min(NUM_RCVMMSGS, packets.len()); for p in packets.iter_mut().take(count) { @@ -66,6 +67,8 @@ fn cast_socket_addr(addr: &sockaddr_storage, hdr: &mmsghdr) -> Option #[cfg(target_os = "linux")] #[allow(clippy::uninit_assumed_init)] pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result { + // Assert that there are no leftovers in packets. + debug_assert!(packets.iter().all(|pkt| pkt.meta == Meta::default())); const SOCKADDR_STORAGE_SIZE: usize = mem::size_of::(); let mut hdrs: [mmsghdr; NUM_RCVMMSGS] = unsafe { mem::zeroed() }; @@ -93,23 +96,18 @@ pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result= TEST_BATCH_SIZE { break; } + packets + .iter_mut() + .for_each(|pkt| pkt.meta = Meta::default()); } elapsed_in_small_batch += now.elapsed().as_nanos(); assert_eq!(TEST_BATCH_SIZE, recv);