diff --git a/core/src/packet.rs b/core/src/packet.rs index 255c485fbb..62bd8aba8c 100644 --- a/core/src/packet.rs +++ b/core/src/packet.rs @@ -18,6 +18,7 @@ use std::mem::size_of; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; use std::ops::{Deref, DerefMut}; use std::sync::{Arc, RwLock}; +use std::time::Instant; pub type SharedBlob = Arc>; pub type SharedBlobs = Vec; @@ -222,23 +223,30 @@ impl Packets { // * set it back to blocking before returning socket.set_nonblocking(false)?; trace!("receiving on {}", socket.local_addr().unwrap()); + let start = Instant::now(); + let mut total_size = 0; loop { self.packets.resize(i + NUM_RCVMMSGS, Packet::default()); match recv_mmsg(socket, &mut self.packets[i..]) { Err(_) if i > 0 => { - break; + if start.elapsed().as_millis() > 1 { + break; + } } Err(e) => { trace!("recv_from err {:?}", e); return Err(Error::IO(e)); } - Ok(npkts) => { + Ok((size, npkts)) => { if i == 0 { socket.set_nonblocking(true)?; } trace!("got {} packets", npkts); i += npkts; - if npkts != NUM_RCVMMSGS || i >= 1024 { + total_size += size; + // Try to batch into blob-sized buffers + // will cause less re-shuffling later on. + if start.elapsed().as_millis() > 1 || total_size >= (BLOB_DATA_SIZE - 4096) { break; } } diff --git a/core/src/recvmmsg.rs b/core/src/recvmmsg.rs index 6a57da66a5..99d5dc83ee 100644 --- a/core/src/recvmmsg.rs +++ b/core/src/recvmmsg.rs @@ -8,9 +8,10 @@ use std::net::UdpSocket; pub const NUM_RCVMMSGS: usize = 16; #[cfg(not(target_os = "linux"))] -pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result { +pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usize, usize)> { let mut i = 0; let count = cmp::min(NUM_RCVMMSGS, packets.len()); + let mut total_size = 0; for p in packets.iter_mut().take(count) { p.meta.size = 0; match socket.recv_from(&mut p.data) { @@ -21,6 +22,7 @@ pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result { + total_size += nrecv; p.meta.size = nrecv; p.meta.set_addr(&from); if i == 0 { @@ -30,11 +32,11 @@ pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result io::Result { +pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usize, usize)> { use libc::{ c_void, iovec, mmsghdr, recvmmsg, sockaddr_in, socklen_t, time_t, timespec, MSG_WAITFORONE, }; @@ -65,6 +67,7 @@ pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result tv_nsec: 0, }; + let mut total_size = 0; let npkts = match unsafe { recvmmsg(sock_fd, &mut hdrs[0], count as u32, MSG_WAITFORONE, &mut ts) } { -1 => return Err(io::Error::last_os_error()), @@ -72,6 +75,7 @@ pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result for i in 0..n as usize { let mut p = &mut packets[i]; p.meta.size = hdrs[i].msg_len as usize; + total_size += p.meta.size; let inet_addr = InetAddr::V4(addr[i]); p.meta.set_addr(&inet_addr.to_std()); } @@ -79,7 +83,7 @@ pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result } }; - Ok(npkts) + Ok((total_size, npkts)) } #[cfg(test)] @@ -101,7 +105,7 @@ mod tests { } let mut packets = vec![Packet::default(); NUM_RCVMMSGS]; - let recv = recv_mmsg(&reader, &mut packets[..]).unwrap(); + let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; assert_eq!(sent, recv); for i in 0..recv { assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE); @@ -122,14 +126,14 @@ mod tests { } let mut packets = vec![Packet::default(); NUM_RCVMMSGS * 2]; - let recv = recv_mmsg(&reader, &mut packets[..]).unwrap(); + let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; assert_eq!(NUM_RCVMMSGS, recv); for i in 0..recv { assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE); assert_eq!(packets[i].meta.addr(), saddr); } - let recv = recv_mmsg(&reader, &mut packets[..]).unwrap(); + let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; assert_eq!(sent - NUM_RCVMMSGS, recv); for i in 0..recv { assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE); @@ -153,7 +157,7 @@ mod tests { let start = Instant::now(); let mut packets = vec![Packet::default(); NUM_RCVMMSGS * 2]; - let recv = recv_mmsg(&reader, &mut packets[..]).unwrap(); + let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; assert_eq!(NUM_RCVMMSGS, recv); for i in 0..recv { assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE); @@ -190,7 +194,7 @@ mod tests { let mut packets = vec![Packet::default(); NUM_RCVMMSGS * 2]; - let recv = recv_mmsg(&reader, &mut packets[..]).unwrap(); + let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; assert_eq!(NUM_RCVMMSGS, recv); for i in 0..sent1 { assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE); @@ -202,7 +206,7 @@ mod tests { assert_eq!(packets[i].meta.addr(), saddr2); } - let recv = recv_mmsg(&reader, &mut packets[..]).unwrap(); + let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; assert_eq!(sent1 + sent2 - NUM_RCVMMSGS, recv); for i in 0..recv { assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE); diff --git a/core/src/streamer.rs b/core/src/streamer.rs index 6cc0a93a29..b565534004 100644 --- a/core/src/streamer.rs +++ b/core/src/streamer.rs @@ -178,9 +178,12 @@ mod test { fn get_msgs(r: PacketReceiver, num: &mut usize) -> Result<()> { for _ in 0..10 { - let m = r.recv_timeout(Duration::new(1, 0))?; + let m = r.recv_timeout(Duration::new(1, 0)); + if m.is_err() { + continue; + } - *num -= m.packets.len(); + *num -= m.unwrap().packets.len(); if *num == 0 { break;