Coalesce packets better (#4456)

This commit is contained in:
sakridge 2019-05-29 12:17:50 -07:00 committed by GitHub
parent 6a1de33138
commit 4404634b14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 30 additions and 15 deletions

View File

@ -18,6 +18,7 @@ use std::mem::size_of;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::time::Instant;
pub type SharedBlob = Arc<RwLock<Blob>>; pub type SharedBlob = Arc<RwLock<Blob>>;
pub type SharedBlobs = Vec<SharedBlob>; pub type SharedBlobs = Vec<SharedBlob>;
@ -222,23 +223,30 @@ impl Packets {
// * set it back to blocking before returning // * set it back to blocking before returning
socket.set_nonblocking(false)?; socket.set_nonblocking(false)?;
trace!("receiving on {}", socket.local_addr().unwrap()); trace!("receiving on {}", socket.local_addr().unwrap());
let start = Instant::now();
let mut total_size = 0;
loop { loop {
self.packets.resize(i + NUM_RCVMMSGS, Packet::default()); self.packets.resize(i + NUM_RCVMMSGS, Packet::default());
match recv_mmsg(socket, &mut self.packets[i..]) { match recv_mmsg(socket, &mut self.packets[i..]) {
Err(_) if i > 0 => { Err(_) if i > 0 => {
break; if start.elapsed().as_millis() > 1 {
break;
}
} }
Err(e) => { Err(e) => {
trace!("recv_from err {:?}", e); trace!("recv_from err {:?}", e);
return Err(Error::IO(e)); return Err(Error::IO(e));
} }
Ok(npkts) => { Ok((size, npkts)) => {
if i == 0 { if i == 0 {
socket.set_nonblocking(true)?; socket.set_nonblocking(true)?;
} }
trace!("got {} packets", npkts); trace!("got {} packets", npkts);
i += npkts; i += npkts;
if npkts != NUM_RCVMMSGS || i >= 1024 { total_size += size;
// Try to batch into blob-sized buffers
// will cause less re-shuffling later on.
if start.elapsed().as_millis() > 1 || total_size >= (BLOB_DATA_SIZE - 4096) {
break; break;
} }
} }

View File

@ -8,9 +8,10 @@ use std::net::UdpSocket;
pub const NUM_RCVMMSGS: usize = 16; pub const NUM_RCVMMSGS: usize = 16;
#[cfg(not(target_os = "linux"))] #[cfg(not(target_os = "linux"))]
pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result<usize> { pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usize, usize)> {
let mut i = 0; let mut i = 0;
let count = cmp::min(NUM_RCVMMSGS, packets.len()); let count = cmp::min(NUM_RCVMMSGS, packets.len());
let mut total_size = 0;
for p in packets.iter_mut().take(count) { for p in packets.iter_mut().take(count) {
p.meta.size = 0; p.meta.size = 0;
match socket.recv_from(&mut p.data) { match socket.recv_from(&mut p.data) {
@ -21,6 +22,7 @@ pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result<usize
return Err(e); return Err(e);
} }
Ok((nrecv, from)) => { Ok((nrecv, from)) => {
total_size += nrecv;
p.meta.size = nrecv; p.meta.size = nrecv;
p.meta.set_addr(&from); p.meta.set_addr(&from);
if i == 0 { if i == 0 {
@ -30,11 +32,11 @@ pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result<usize
} }
i += 1; i += 1;
} }
Ok(i) Ok((total_size, i))
} }
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result<usize> { pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usize, usize)> {
use libc::{ use libc::{
c_void, iovec, mmsghdr, recvmmsg, sockaddr_in, socklen_t, time_t, timespec, MSG_WAITFORONE, c_void, iovec, mmsghdr, recvmmsg, sockaddr_in, socklen_t, time_t, timespec, MSG_WAITFORONE,
}; };
@ -65,6 +67,7 @@ pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result<usize>
tv_nsec: 0, tv_nsec: 0,
}; };
let mut total_size = 0;
let npkts = let npkts =
match unsafe { recvmmsg(sock_fd, &mut hdrs[0], count as u32, MSG_WAITFORONE, &mut ts) } { match unsafe { recvmmsg(sock_fd, &mut hdrs[0], count as u32, MSG_WAITFORONE, &mut ts) } {
-1 => return Err(io::Error::last_os_error()), -1 => return Err(io::Error::last_os_error()),
@ -72,6 +75,7 @@ pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result<usize>
for i in 0..n as usize { for i in 0..n as usize {
let mut p = &mut packets[i]; let mut p = &mut packets[i];
p.meta.size = hdrs[i].msg_len as usize; p.meta.size = hdrs[i].msg_len as usize;
total_size += p.meta.size;
let inet_addr = InetAddr::V4(addr[i]); let inet_addr = InetAddr::V4(addr[i]);
p.meta.set_addr(&inet_addr.to_std()); p.meta.set_addr(&inet_addr.to_std());
} }
@ -79,7 +83,7 @@ pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result<usize>
} }
}; };
Ok(npkts) Ok((total_size, npkts))
} }
#[cfg(test)] #[cfg(test)]
@ -101,7 +105,7 @@ mod tests {
} }
let mut packets = vec![Packet::default(); NUM_RCVMMSGS]; let mut packets = vec![Packet::default(); NUM_RCVMMSGS];
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap(); let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
assert_eq!(sent, recv); assert_eq!(sent, recv);
for i in 0..recv { for i in 0..recv {
assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE); assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE);
@ -122,14 +126,14 @@ mod tests {
} }
let mut packets = vec![Packet::default(); NUM_RCVMMSGS * 2]; let mut packets = vec![Packet::default(); NUM_RCVMMSGS * 2];
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap(); let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
assert_eq!(NUM_RCVMMSGS, recv); assert_eq!(NUM_RCVMMSGS, recv);
for i in 0..recv { for i in 0..recv {
assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE); assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE);
assert_eq!(packets[i].meta.addr(), saddr); assert_eq!(packets[i].meta.addr(), saddr);
} }
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap(); let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
assert_eq!(sent - NUM_RCVMMSGS, recv); assert_eq!(sent - NUM_RCVMMSGS, recv);
for i in 0..recv { for i in 0..recv {
assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE); assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE);
@ -153,7 +157,7 @@ mod tests {
let start = Instant::now(); let start = Instant::now();
let mut packets = vec![Packet::default(); NUM_RCVMMSGS * 2]; let mut packets = vec![Packet::default(); NUM_RCVMMSGS * 2];
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap(); let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
assert_eq!(NUM_RCVMMSGS, recv); assert_eq!(NUM_RCVMMSGS, recv);
for i in 0..recv { for i in 0..recv {
assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE); assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE);
@ -190,7 +194,7 @@ mod tests {
let mut packets = vec![Packet::default(); NUM_RCVMMSGS * 2]; let mut packets = vec![Packet::default(); NUM_RCVMMSGS * 2];
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap(); let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
assert_eq!(NUM_RCVMMSGS, recv); assert_eq!(NUM_RCVMMSGS, recv);
for i in 0..sent1 { for i in 0..sent1 {
assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE); assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE);
@ -202,7 +206,7 @@ mod tests {
assert_eq!(packets[i].meta.addr(), saddr2); assert_eq!(packets[i].meta.addr(), saddr2);
} }
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap(); let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
assert_eq!(sent1 + sent2 - NUM_RCVMMSGS, recv); assert_eq!(sent1 + sent2 - NUM_RCVMMSGS, recv);
for i in 0..recv { for i in 0..recv {
assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE); assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE);

View File

@ -178,9 +178,12 @@ mod test {
fn get_msgs(r: PacketReceiver, num: &mut usize) -> Result<()> { fn get_msgs(r: PacketReceiver, num: &mut usize) -> Result<()> {
for _ in 0..10 { for _ in 0..10 {
let m = r.recv_timeout(Duration::new(1, 0))?; let m = r.recv_timeout(Duration::new(1, 0));
if m.is_err() {
continue;
}
*num -= m.packets.len(); *num -= m.unwrap().packets.len();
if *num == 0 { if *num == 0 {
break; break;