From 788296047ad141bf4889f02e934d139289343ce8 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Tue, 8 Oct 2019 09:54:49 -0700 Subject: [PATCH] Increase batch size for recvmmsg() (#6260) * Increase batch size for recvmmsg() * fix tests * new test --- core/src/recvmmsg.rs | 69 +++++++++++++++++++++++++++-------- core/src/shred_fetch_stage.rs | 4 +- core/src/streamer.rs | 25 ++++++++++++- 3 files changed, 80 insertions(+), 18 deletions(-) diff --git a/core/src/recvmmsg.rs b/core/src/recvmmsg.rs index 99d5dc83ee..203c364cca 100644 --- a/core/src/recvmmsg.rs +++ b/core/src/recvmmsg.rs @@ -5,7 +5,7 @@ use std::cmp; use std::io; use std::net::UdpSocket; -pub const NUM_RCVMMSGS: usize = 16; +pub const NUM_RCVMMSGS: usize = 128; #[cfg(not(target_os = "linux"))] pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usize, usize)> { @@ -92,19 +92,20 @@ mod tests { use crate::recvmmsg::*; use std::time::{Duration, Instant}; + const TEST_NUM_MSGS: usize = 32; #[test] pub fn test_recv_mmsg_one_iter() { let reader = UdpSocket::bind("127.0.0.1:0").expect("bind"); let addr = reader.local_addr().unwrap(); let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); let saddr = sender.local_addr().unwrap(); - let sent = NUM_RCVMMSGS - 1; + let sent = TEST_NUM_MSGS - 1; for _ in 0..sent { let data = [0; PACKET_DATA_SIZE]; sender.send_to(&data[..], &addr).unwrap(); } - let mut packets = vec![Packet::default(); NUM_RCVMMSGS]; + let mut packets = vec![Packet::default(); TEST_NUM_MSGS]; let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; assert_eq!(sent, recv); for i in 0..recv { @@ -119,22 +120,22 @@ mod tests { let addr = reader.local_addr().unwrap(); let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); let saddr = sender.local_addr().unwrap(); - let sent = NUM_RCVMMSGS + 10; + let sent = TEST_NUM_MSGS + 10; for _ in 0..sent { let data = [0; PACKET_DATA_SIZE]; sender.send_to(&data[..], &addr).unwrap(); } - let mut packets = vec![Packet::default(); NUM_RCVMMSGS * 2]; + let mut packets = vec![Packet::default(); TEST_NUM_MSGS]; let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; - assert_eq!(NUM_RCVMMSGS, recv); + assert_eq!(TEST_NUM_MSGS, recv); for i in 0..recv { assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE); assert_eq!(packets[i].meta.addr(), saddr); } let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; - assert_eq!(sent - NUM_RCVMMSGS, recv); + assert_eq!(sent - TEST_NUM_MSGS, recv); for i in 0..recv { assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE); assert_eq!(packets[i].meta.addr(), saddr); @@ -149,16 +150,16 @@ mod tests { reader.set_nonblocking(false).unwrap(); let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); let saddr = sender.local_addr().unwrap(); - let sent = NUM_RCVMMSGS; + let sent = TEST_NUM_MSGS; for _ in 0..sent { let data = [0; PACKET_DATA_SIZE]; sender.send_to(&data[..], &addr).unwrap(); } let start = Instant::now(); - let mut packets = vec![Packet::default(); NUM_RCVMMSGS * 2]; + let mut packets = vec![Packet::default(); TEST_NUM_MSGS]; let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; - assert_eq!(NUM_RCVMMSGS, recv); + assert_eq!(TEST_NUM_MSGS, recv); for i in 0..recv { assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE); assert_eq!(packets[i].meta.addr(), saddr); @@ -176,11 +177,11 @@ mod tests { let sender1 = UdpSocket::bind("127.0.0.1:0").expect("bind"); let saddr1 = sender1.local_addr().unwrap(); - let sent1 = NUM_RCVMMSGS - 1; + let sent1 = TEST_NUM_MSGS - 1; let sender2 = UdpSocket::bind("127.0.0.1:0").expect("bind"); let saddr2 = sender2.local_addr().unwrap(); - let sent2 = NUM_RCVMMSGS + 1; + let sent2 = TEST_NUM_MSGS + 1; for _ in 0..sent1 { let data = [0; PACKET_DATA_SIZE]; @@ -192,10 +193,10 @@ mod tests { sender2.send_to(&data[..], &addr).unwrap(); } - let mut packets = vec![Packet::default(); NUM_RCVMMSGS * 2]; + let mut packets = vec![Packet::default(); TEST_NUM_MSGS]; let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; - assert_eq!(NUM_RCVMMSGS, recv); + assert_eq!(TEST_NUM_MSGS, recv); for i in 0..sent1 { assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE); assert_eq!(packets[i].meta.addr(), saddr1); @@ -207,10 +208,48 @@ mod tests { } let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; - assert_eq!(sent1 + sent2 - NUM_RCVMMSGS, recv); + assert_eq!(sent1 + sent2 - TEST_NUM_MSGS, recv); for i in 0..recv { assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE); assert_eq!(packets[i].meta.addr(), saddr2); } } + + #[cfg(target_os = "linux")] + #[test] + pub fn test_recv_mmsg_batch_size() { + let reader = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let addr = reader.local_addr().unwrap(); + let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let sent = TEST_NUM_MSGS; + for _ in 0..sent { + let data = [0; PACKET_DATA_SIZE]; + sender.send_to(&data[..], &addr).unwrap(); + } + + let now = Instant::now(); + let mut packets = vec![Packet::default(); TEST_NUM_MSGS]; + let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; + assert_eq!(TEST_NUM_MSGS, recv); + let elapsed_in_max_batch = now.elapsed().as_millis(); + + for _ in 0..sent { + let data = [0; PACKET_DATA_SIZE]; + sender.send_to(&data[..], &addr).unwrap(); + } + + let now = Instant::now(); + let mut packets = vec![Packet::default(); TEST_NUM_MSGS / 4]; + let mut recv = 0; + while let Ok(num) = recv_mmsg(&reader, &mut packets[..]) { + recv += num.1; + if recv >= TEST_NUM_MSGS { + break; + } + } + assert_eq!(TEST_NUM_MSGS, recv); + let elapsed_in_small_batch = now.elapsed().as_millis(); + + assert!(elapsed_in_max_batch <= elapsed_in_small_batch); + } } diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index 778b922026..bc4c9289c3 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -52,7 +52,7 @@ impl ShredFetchStage { &exit, sender.clone(), recycler.clone(), - "blob_fetch_stage", + "shred_fetch_stage", ) }); @@ -63,7 +63,7 @@ impl ShredFetchStage { &exit, forward_sender.clone(), recycler.clone(), - "blob_fetch_stage", + "shred_fetch_stage", ) }); diff --git a/core/src/streamer.rs b/core/src/streamer.rs index ef773df5bd..e69240c7f6 100644 --- a/core/src/streamer.rs +++ b/core/src/streamer.rs @@ -2,6 +2,7 @@ //! use crate::packet::{Blob, Packets, PacketsRecycler, SharedBlobs, PACKETS_PER_BATCH}; +use crate::recvmmsg::NUM_RCVMMSGS; use crate::result::{Error, Result}; use solana_sdk::timing::duration_as_ms; use std::net::UdpSocket; @@ -23,6 +24,10 @@ fn recv_loop( recycler: &PacketsRecycler, name: &'static str, ) -> Result<()> { + let mut recv_count = 0; + let mut call_count = 0; + let mut now = Instant::now(); + let mut num_max_received = 0; // Number of times maximum packets were received loop { let mut msgs = Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, name); loop { @@ -31,11 +36,29 @@ fn recv_loop( if exit.load(Ordering::Relaxed) { return Ok(()); } - if let Ok(_len) = msgs.recv_from(sock) { + if let Ok(len) = msgs.recv_from(sock) { + if len == NUM_RCVMMSGS { + num_max_received += 1; + } + recv_count += len; + call_count += 1; channel.send(msgs)?; break; } } + if recv_count > 1024 { + datapoint_info!( + "receiver-stats", + ("received", recv_count as i64, i64), + ("call_count", i64::from(call_count), i64), + ("elapsed", now.elapsed().as_millis() as i64, i64), + ("max_received", i64::from(num_max_received), i64), + ); + recv_count = 0; + call_count = 0; + now = Instant::now(); + num_max_received = 0; + } } }