Rename Packets to PacketBatch (#21794)

This commit is contained in:
Justin Starry
2021-12-11 09:44:15 -05:00
committed by GitHub
parent 379e3ec848
commit 254ef3e7b6
27 changed files with 720 additions and 673 deletions

View File

@@ -9,13 +9,13 @@ use {
};
pub use {
solana_perf::packet::{
limited_deserialize, to_packets_chunked, Packets, PacketsRecycler, NUM_PACKETS,
limited_deserialize, to_packet_batches, PacketBatch, PacketBatchRecycler, NUM_PACKETS,
PACKETS_PER_BATCH,
},
solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE},
};
pub fn recv_from(obj: &mut Packets, socket: &UdpSocket, max_wait_ms: u64) -> Result<usize> {
pub fn recv_from(batch: &mut PacketBatch, socket: &UdpSocket, max_wait_ms: u64) -> Result<usize> {
let mut i = 0;
//DOCUMENTED SIDE-EFFECT
//Performance out of the IO without poll
@@ -27,11 +27,11 @@ pub fn recv_from(obj: &mut Packets, socket: &UdpSocket, max_wait_ms: u64) -> Res
trace!("receiving on {}", socket.local_addr().unwrap());
let start = Instant::now();
loop {
obj.packets.resize(
batch.packets.resize(
std::cmp::min(i + NUM_RCVMMSGS, PACKETS_PER_BATCH),
Packet::default(),
);
match recv_mmsg(socket, &mut obj.packets[i..]) {
match recv_mmsg(socket, &mut batch.packets[i..]) {
Err(_) if i > 0 => {
if start.elapsed().as_millis() as u64 > max_wait_ms {
break;
@@ -55,17 +55,17 @@ pub fn recv_from(obj: &mut Packets, socket: &UdpSocket, max_wait_ms: u64) -> Res
}
}
}
obj.packets.truncate(i);
batch.packets.truncate(i);
inc_new_counter_debug!("packets-recv_count", i);
Ok(i)
}
pub fn send_to(
obj: &Packets,
batch: &PacketBatch,
socket: &UdpSocket,
socket_addr_space: &SocketAddrSpace,
) -> Result<()> {
for p in &obj.packets {
for p in &batch.packets {
let addr = p.meta.addr();
if socket_addr_space.check(&addr) {
socket.send_to(&p.data[..p.meta.size], &addr)?;
@@ -90,9 +90,9 @@ mod tests {
// test that the address is actually being updated
let send_addr: SocketAddr = "127.0.0.1:123".parse().unwrap();
let packets = vec![Packet::default()];
let mut msgs = Packets::new(packets);
msgs.set_addr(&send_addr);
assert_eq!(msgs.packets[0].meta.addr(), send_addr);
let mut packet_batch = PacketBatch::new(packets);
packet_batch.set_addr(&send_addr);
assert_eq!(packet_batch.packets[0].meta.addr(), send_addr);
}
#[test]
@@ -102,21 +102,21 @@ mod tests {
let addr = recv_socket.local_addr().unwrap();
let send_socket = UdpSocket::bind("127.0.0.1:0").expect("bind");
let saddr = send_socket.local_addr().unwrap();
let mut p = Packets::default();
let mut batch = PacketBatch::default();
p.packets.resize(10, Packet::default());
batch.packets.resize(10, Packet::default());
for m in p.packets.iter_mut() {
for m in batch.packets.iter_mut() {
m.meta.set_addr(&addr);
m.meta.size = PACKET_DATA_SIZE;
}
send_to(&p, &send_socket, &SocketAddrSpace::Unspecified).unwrap();
send_to(&batch, &send_socket, &SocketAddrSpace::Unspecified).unwrap();
let recvd = recv_from(&mut p, &recv_socket, 1).unwrap();
let recvd = recv_from(&mut batch, &recv_socket, 1).unwrap();
assert_eq!(recvd, p.packets.len());
assert_eq!(recvd, batch.packets.len());
for m in &p.packets {
for m in &batch.packets {
assert_eq!(m.meta.size, PACKET_DATA_SIZE);
assert_eq!(m.meta.addr(), saddr);
}
@@ -125,7 +125,7 @@ mod tests {
#[test]
pub fn debug_trait() {
write!(io::sink(), "{:?}", Packet::default()).unwrap();
write!(io::sink(), "{:?}", Packets::default()).unwrap();
write!(io::sink(), "{:?}", PacketBatch::default()).unwrap();
}
#[test]
@@ -151,25 +151,25 @@ mod tests {
let recv_socket = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = recv_socket.local_addr().unwrap();
let send_socket = UdpSocket::bind("127.0.0.1:0").expect("bind");
let mut p = Packets::default();
p.packets.resize(PACKETS_PER_BATCH, Packet::default());
let mut batch = PacketBatch::default();
batch.packets.resize(PACKETS_PER_BATCH, Packet::default());
// Should only get PACKETS_PER_BATCH packets per iteration even
// if a lot more were sent, and regardless of packet size
for _ in 0..2 * PACKETS_PER_BATCH {
let mut p = Packets::default();
p.packets.resize(1, Packet::default());
for m in p.packets.iter_mut() {
let mut batch = PacketBatch::default();
batch.packets.resize(1, Packet::default());
for m in batch.packets.iter_mut() {
m.meta.set_addr(&addr);
m.meta.size = 1;
}
send_to(&p, &send_socket, &SocketAddrSpace::Unspecified).unwrap();
send_to(&batch, &send_socket, &SocketAddrSpace::Unspecified).unwrap();
}
let recvd = recv_from(&mut p, &recv_socket, 100).unwrap();
let recvd = recv_from(&mut batch, &recv_socket, 100).unwrap();
// Check we only got PACKETS_PER_BATCH packets
assert_eq!(recvd, PACKETS_PER_BATCH);
assert_eq!(p.packets.capacity(), PACKETS_PER_BATCH);
assert_eq!(batch.packets.capacity(), PACKETS_PER_BATCH);
}
}

View File

@@ -3,7 +3,7 @@
use {
crate::{
packet::{self, send_to, Packets, PacketsRecycler, PACKETS_PER_BATCH},
packet::{self, send_to, PacketBatch, PacketBatchRecycler, PACKETS_PER_BATCH},
recvmmsg::NUM_RCVMMSGS,
socket::SocketAddrSpace,
},
@@ -21,8 +21,8 @@ use {
thiserror::Error,
};
pub type PacketReceiver = Receiver<Packets>;
pub type PacketSender = Sender<Packets>;
pub type PacketBatchReceiver = Receiver<PacketBatch>;
pub type PacketBatchSender = Sender<PacketBatch>;
#[derive(Error, Debug)]
pub enum StreamerError {
@@ -33,7 +33,7 @@ pub enum StreamerError {
RecvTimeout(#[from] RecvTimeoutError),
#[error("send packets error")]
Send(#[from] SendError<Packets>),
Send(#[from] SendError<PacketBatch>),
}
pub type Result<T> = std::result::Result<T, StreamerError>;
@@ -41,8 +41,8 @@ pub type Result<T> = std::result::Result<T, StreamerError>;
fn recv_loop(
sock: &UdpSocket,
exit: Arc<AtomicBool>,
channel: &PacketSender,
recycler: &PacketsRecycler,
channel: &PacketBatchSender,
recycler: &PacketBatchRecycler,
name: &'static str,
coalesce_ms: u64,
use_pinned_memory: bool,
@@ -52,10 +52,10 @@ fn recv_loop(
let mut now = Instant::now();
let mut num_max_received = 0; // Number of times maximum packets were received
loop {
let mut msgs = if use_pinned_memory {
Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, name)
let mut packet_batch = if use_pinned_memory {
PacketBatch::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, name)
} else {
Packets::with_capacity(PACKETS_PER_BATCH)
PacketBatch::with_capacity(PACKETS_PER_BATCH)
};
loop {
// Check for exit signal, even if socket is busy
@@ -63,14 +63,14 @@ fn recv_loop(
if exit.load(Ordering::Relaxed) {
return Ok(());
}
if let Ok(len) = packet::recv_from(&mut msgs, sock, coalesce_ms) {
if let Ok(len) = packet::recv_from(&mut packet_batch, sock, coalesce_ms) {
if len == NUM_RCVMMSGS {
num_max_received += 1;
}
recv_count += len;
call_count += 1;
if len > 0 {
channel.send(msgs)?;
channel.send(packet_batch)?;
}
break;
}
@@ -94,8 +94,8 @@ fn recv_loop(
pub fn receiver(
sock: Arc<UdpSocket>,
exit: &Arc<AtomicBool>,
packet_sender: PacketSender,
recycler: PacketsRecycler,
packet_sender: PacketBatchSender,
recycler: PacketBatchRecycler,
name: &'static str,
coalesce_ms: u64,
use_pinned_memory: bool,
@@ -121,36 +121,42 @@ pub fn receiver(
fn recv_send(
sock: &UdpSocket,
r: &PacketReceiver,
r: &PacketBatchReceiver,
socket_addr_space: &SocketAddrSpace,
) -> Result<()> {
let timer = Duration::new(1, 0);
let msgs = r.recv_timeout(timer)?;
send_to(&msgs, sock, socket_addr_space)?;
let packet_batch = r.recv_timeout(timer)?;
send_to(&packet_batch, sock, socket_addr_space)?;
Ok(())
}
pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec<Packets>, usize, Duration)> {
pub fn recv_packet_batches(
recvr: &PacketBatchReceiver,
) -> Result<(Vec<PacketBatch>, usize, Duration)> {
let timer = Duration::new(1, 0);
let msgs = recvr.recv_timeout(timer)?;
let packet_batch = recvr.recv_timeout(timer)?;
let recv_start = Instant::now();
trace!("got msgs");
let mut len = msgs.packets.len();
let mut batch = vec![msgs];
while let Ok(more) = recvr.try_recv() {
trace!("got more msgs");
len += more.packets.len();
batch.push(more);
trace!("got packets");
let mut num_packets = packet_batch.packets.len();
let mut packet_batches = vec![packet_batch];
while let Ok(packet_batch) = recvr.try_recv() {
trace!("got more packets");
num_packets += packet_batch.packets.len();
packet_batches.push(packet_batch);
}
let recv_duration = recv_start.elapsed();
trace!("batch len {}", batch.len());
Ok((batch, len, recv_duration))
trace!(
"packet batches len: {}, num packets: {}",
packet_batches.len(),
num_packets
);
Ok((packet_batches, num_packets, recv_duration))
}
pub fn responder(
name: &'static str,
sock: Arc<UdpSocket>,
r: PacketReceiver,
r: PacketBatchReceiver,
socket_addr_space: SocketAddrSpace,
) -> JoinHandle<()> {
Builder::new()
@@ -187,7 +193,7 @@ mod test {
use {
super::*,
crate::{
packet::{Packet, Packets, PACKET_DATA_SIZE},
packet::{Packet, PacketBatch, PACKET_DATA_SIZE},
streamer::{receiver, responder},
},
solana_perf::recycler::Recycler,
@@ -204,16 +210,16 @@ mod test {
},
};
fn get_msgs(r: PacketReceiver, num: &mut usize) {
fn get_packet_batches(r: PacketBatchReceiver, num_packets: &mut usize) {
for _ in 0..10 {
let m = r.recv_timeout(Duration::new(1, 0));
if m.is_err() {
let packet_batch_res = r.recv_timeout(Duration::new(1, 0));
if packet_batch_res.is_err() {
continue;
}
*num -= m.unwrap().packets.len();
*num_packets -= packet_batch_res.unwrap().packets.len();
if *num == 0 {
if *num_packets == 0 {
break;
}
}
@@ -222,7 +228,7 @@ mod test {
#[test]
fn streamer_debug() {
write!(io::sink(), "{:?}", Packet::default()).unwrap();
write!(io::sink(), "{:?}", Packets::default()).unwrap();
write!(io::sink(), "{:?}", PacketBatch::default()).unwrap();
}
#[test]
fn streamer_send_test() {
@@ -250,23 +256,23 @@ mod test {
r_responder,
SocketAddrSpace::Unspecified,
);
let mut msgs = Packets::default();
let mut packet_batch = PacketBatch::default();
for i in 0..5 {
let mut b = Packet::default();
let mut p = Packet::default();
{
b.data[0] = i as u8;
b.meta.size = PACKET_DATA_SIZE;
b.meta.set_addr(&addr);
p.data[0] = i as u8;
p.meta.size = PACKET_DATA_SIZE;
p.meta.set_addr(&addr);
}
msgs.packets.push(b);
packet_batch.packets.push(p);
}
s_responder.send(msgs).expect("send");
s_responder.send(packet_batch).expect("send");
t_responder
};
let mut num = 5;
get_msgs(r_reader, &mut num);
assert_eq!(num, 0);
let mut packets_remaining = 5;
get_packet_batches(r_reader, &mut packets_remaining);
assert_eq!(packets_remaining, 0);
exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join");
t_responder.join().expect("join");