Increase batch size for recvmmsg() (#6260)
* Increase batch size for recvmmsg() * fix tests * new test
This commit is contained in:
@ -5,7 +5,7 @@ use std::cmp;
|
|||||||
use std::io;
|
use std::io;
|
||||||
use std::net::UdpSocket;
|
use std::net::UdpSocket;
|
||||||
|
|
||||||
pub const NUM_RCVMMSGS: usize = 16;
|
pub const NUM_RCVMMSGS: usize = 128;
|
||||||
|
|
||||||
#[cfg(not(target_os = "linux"))]
|
#[cfg(not(target_os = "linux"))]
|
||||||
pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usize, usize)> {
|
pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usize, usize)> {
|
||||||
@ -92,19 +92,20 @@ mod tests {
|
|||||||
use crate::recvmmsg::*;
|
use crate::recvmmsg::*;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
const TEST_NUM_MSGS: usize = 32;
|
||||||
#[test]
|
#[test]
|
||||||
pub fn test_recv_mmsg_one_iter() {
|
pub fn test_recv_mmsg_one_iter() {
|
||||||
let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
let addr = reader.local_addr().unwrap();
|
let addr = reader.local_addr().unwrap();
|
||||||
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
let saddr = sender.local_addr().unwrap();
|
let saddr = sender.local_addr().unwrap();
|
||||||
let sent = NUM_RCVMMSGS - 1;
|
let sent = TEST_NUM_MSGS - 1;
|
||||||
for _ in 0..sent {
|
for _ in 0..sent {
|
||||||
let data = [0; PACKET_DATA_SIZE];
|
let data = [0; PACKET_DATA_SIZE];
|
||||||
sender.send_to(&data[..], &addr).unwrap();
|
sender.send_to(&data[..], &addr).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut packets = vec![Packet::default(); NUM_RCVMMSGS];
|
let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
|
||||||
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
|
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
|
||||||
assert_eq!(sent, recv);
|
assert_eq!(sent, recv);
|
||||||
for i in 0..recv {
|
for i in 0..recv {
|
||||||
@ -119,22 +120,22 @@ mod tests {
|
|||||||
let addr = reader.local_addr().unwrap();
|
let addr = reader.local_addr().unwrap();
|
||||||
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
let saddr = sender.local_addr().unwrap();
|
let saddr = sender.local_addr().unwrap();
|
||||||
let sent = NUM_RCVMMSGS + 10;
|
let sent = TEST_NUM_MSGS + 10;
|
||||||
for _ in 0..sent {
|
for _ in 0..sent {
|
||||||
let data = [0; PACKET_DATA_SIZE];
|
let data = [0; PACKET_DATA_SIZE];
|
||||||
sender.send_to(&data[..], &addr).unwrap();
|
sender.send_to(&data[..], &addr).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut packets = vec![Packet::default(); NUM_RCVMMSGS * 2];
|
let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
|
||||||
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
|
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
|
||||||
assert_eq!(NUM_RCVMMSGS, recv);
|
assert_eq!(TEST_NUM_MSGS, recv);
|
||||||
for i in 0..recv {
|
for i in 0..recv {
|
||||||
assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE);
|
assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE);
|
||||||
assert_eq!(packets[i].meta.addr(), saddr);
|
assert_eq!(packets[i].meta.addr(), saddr);
|
||||||
}
|
}
|
||||||
|
|
||||||
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
|
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
|
||||||
assert_eq!(sent - NUM_RCVMMSGS, recv);
|
assert_eq!(sent - TEST_NUM_MSGS, recv);
|
||||||
for i in 0..recv {
|
for i in 0..recv {
|
||||||
assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE);
|
assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE);
|
||||||
assert_eq!(packets[i].meta.addr(), saddr);
|
assert_eq!(packets[i].meta.addr(), saddr);
|
||||||
@ -149,16 +150,16 @@ mod tests {
|
|||||||
reader.set_nonblocking(false).unwrap();
|
reader.set_nonblocking(false).unwrap();
|
||||||
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
let saddr = sender.local_addr().unwrap();
|
let saddr = sender.local_addr().unwrap();
|
||||||
let sent = NUM_RCVMMSGS;
|
let sent = TEST_NUM_MSGS;
|
||||||
for _ in 0..sent {
|
for _ in 0..sent {
|
||||||
let data = [0; PACKET_DATA_SIZE];
|
let data = [0; PACKET_DATA_SIZE];
|
||||||
sender.send_to(&data[..], &addr).unwrap();
|
sender.send_to(&data[..], &addr).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
let mut packets = vec![Packet::default(); NUM_RCVMMSGS * 2];
|
let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
|
||||||
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
|
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
|
||||||
assert_eq!(NUM_RCVMMSGS, recv);
|
assert_eq!(TEST_NUM_MSGS, recv);
|
||||||
for i in 0..recv {
|
for i in 0..recv {
|
||||||
assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE);
|
assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE);
|
||||||
assert_eq!(packets[i].meta.addr(), saddr);
|
assert_eq!(packets[i].meta.addr(), saddr);
|
||||||
@ -176,11 +177,11 @@ mod tests {
|
|||||||
|
|
||||||
let sender1 = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
let sender1 = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
let saddr1 = sender1.local_addr().unwrap();
|
let saddr1 = sender1.local_addr().unwrap();
|
||||||
let sent1 = NUM_RCVMMSGS - 1;
|
let sent1 = TEST_NUM_MSGS - 1;
|
||||||
|
|
||||||
let sender2 = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
let sender2 = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
let saddr2 = sender2.local_addr().unwrap();
|
let saddr2 = sender2.local_addr().unwrap();
|
||||||
let sent2 = NUM_RCVMMSGS + 1;
|
let sent2 = TEST_NUM_MSGS + 1;
|
||||||
|
|
||||||
for _ in 0..sent1 {
|
for _ in 0..sent1 {
|
||||||
let data = [0; PACKET_DATA_SIZE];
|
let data = [0; PACKET_DATA_SIZE];
|
||||||
@ -192,10 +193,10 @@ mod tests {
|
|||||||
sender2.send_to(&data[..], &addr).unwrap();
|
sender2.send_to(&data[..], &addr).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut packets = vec![Packet::default(); NUM_RCVMMSGS * 2];
|
let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
|
||||||
|
|
||||||
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
|
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
|
||||||
assert_eq!(NUM_RCVMMSGS, recv);
|
assert_eq!(TEST_NUM_MSGS, recv);
|
||||||
for i in 0..sent1 {
|
for i in 0..sent1 {
|
||||||
assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE);
|
assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE);
|
||||||
assert_eq!(packets[i].meta.addr(), saddr1);
|
assert_eq!(packets[i].meta.addr(), saddr1);
|
||||||
@ -207,10 +208,48 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
|
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
|
||||||
assert_eq!(sent1 + sent2 - NUM_RCVMMSGS, recv);
|
assert_eq!(sent1 + sent2 - TEST_NUM_MSGS, recv);
|
||||||
for i in 0..recv {
|
for i in 0..recv {
|
||||||
assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE);
|
assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE);
|
||||||
assert_eq!(packets[i].meta.addr(), saddr2);
|
assert_eq!(packets[i].meta.addr(), saddr2);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(target_os = "linux")]
|
||||||
|
#[test]
|
||||||
|
pub fn test_recv_mmsg_batch_size() {
|
||||||
|
let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
|
let addr = reader.local_addr().unwrap();
|
||||||
|
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
|
let sent = TEST_NUM_MSGS;
|
||||||
|
for _ in 0..sent {
|
||||||
|
let data = [0; PACKET_DATA_SIZE];
|
||||||
|
sender.send_to(&data[..], &addr).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
let now = Instant::now();
|
||||||
|
let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
|
||||||
|
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
|
||||||
|
assert_eq!(TEST_NUM_MSGS, recv);
|
||||||
|
let elapsed_in_max_batch = now.elapsed().as_millis();
|
||||||
|
|
||||||
|
for _ in 0..sent {
|
||||||
|
let data = [0; PACKET_DATA_SIZE];
|
||||||
|
sender.send_to(&data[..], &addr).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
let now = Instant::now();
|
||||||
|
let mut packets = vec![Packet::default(); TEST_NUM_MSGS / 4];
|
||||||
|
let mut recv = 0;
|
||||||
|
while let Ok(num) = recv_mmsg(&reader, &mut packets[..]) {
|
||||||
|
recv += num.1;
|
||||||
|
if recv >= TEST_NUM_MSGS {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert_eq!(TEST_NUM_MSGS, recv);
|
||||||
|
let elapsed_in_small_batch = now.elapsed().as_millis();
|
||||||
|
|
||||||
|
assert!(elapsed_in_max_batch <= elapsed_in_small_batch);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -52,7 +52,7 @@ impl ShredFetchStage {
|
|||||||
&exit,
|
&exit,
|
||||||
sender.clone(),
|
sender.clone(),
|
||||||
recycler.clone(),
|
recycler.clone(),
|
||||||
"blob_fetch_stage",
|
"shred_fetch_stage",
|
||||||
)
|
)
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -63,7 +63,7 @@ impl ShredFetchStage {
|
|||||||
&exit,
|
&exit,
|
||||||
forward_sender.clone(),
|
forward_sender.clone(),
|
||||||
recycler.clone(),
|
recycler.clone(),
|
||||||
"blob_fetch_stage",
|
"shred_fetch_stage",
|
||||||
)
|
)
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -2,6 +2,7 @@
|
|||||||
//!
|
//!
|
||||||
|
|
||||||
use crate::packet::{Blob, Packets, PacketsRecycler, SharedBlobs, PACKETS_PER_BATCH};
|
use crate::packet::{Blob, Packets, PacketsRecycler, SharedBlobs, PACKETS_PER_BATCH};
|
||||||
|
use crate::recvmmsg::NUM_RCVMMSGS;
|
||||||
use crate::result::{Error, Result};
|
use crate::result::{Error, Result};
|
||||||
use solana_sdk::timing::duration_as_ms;
|
use solana_sdk::timing::duration_as_ms;
|
||||||
use std::net::UdpSocket;
|
use std::net::UdpSocket;
|
||||||
@ -23,6 +24,10 @@ fn recv_loop(
|
|||||||
recycler: &PacketsRecycler,
|
recycler: &PacketsRecycler,
|
||||||
name: &'static str,
|
name: &'static str,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
|
let mut recv_count = 0;
|
||||||
|
let mut call_count = 0;
|
||||||
|
let mut now = Instant::now();
|
||||||
|
let mut num_max_received = 0; // Number of times maximum packets were received
|
||||||
loop {
|
loop {
|
||||||
let mut msgs = Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, name);
|
let mut msgs = Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, name);
|
||||||
loop {
|
loop {
|
||||||
@ -31,11 +36,29 @@ fn recv_loop(
|
|||||||
if exit.load(Ordering::Relaxed) {
|
if exit.load(Ordering::Relaxed) {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
if let Ok(_len) = msgs.recv_from(sock) {
|
if let Ok(len) = msgs.recv_from(sock) {
|
||||||
|
if len == NUM_RCVMMSGS {
|
||||||
|
num_max_received += 1;
|
||||||
|
}
|
||||||
|
recv_count += len;
|
||||||
|
call_count += 1;
|
||||||
channel.send(msgs)?;
|
channel.send(msgs)?;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if recv_count > 1024 {
|
||||||
|
datapoint_info!(
|
||||||
|
"receiver-stats",
|
||||||
|
("received", recv_count as i64, i64),
|
||||||
|
("call_count", i64::from(call_count), i64),
|
||||||
|
("elapsed", now.elapsed().as_millis() as i64, i64),
|
||||||
|
("max_received", i64::from(num_max_received), i64),
|
||||||
|
);
|
||||||
|
recv_count = 0;
|
||||||
|
call_count = 0;
|
||||||
|
now = Instant::now();
|
||||||
|
num_max_received = 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user