Pull streamer out into its own module. (#8917)

automerge
This commit is contained in:
anatoly yakovenko
2020-03-17 23:30:23 -07:00
committed by GitHub
parent e37a4823f1
commit 9cedeb0a8d
34 changed files with 121 additions and 69 deletions

26
streamer/Cargo.toml Normal file
View File

@ -0,0 +1,26 @@
[package]
name = "solana-streamer"
version = "1.1.0"
description = "Solana Streamer"
authors = ["Solana Maintainers <maintainers@solana.com>"]
repository = "https://github.com/solana-labs/solana"
license = "Apache-2.0"
homepage = "https://solana.com/"
edition = "2018"
[dependencies]
log = "0.4.8"
solana-metrics = { path = "../metrics", version = "1.1.0" }
solana-sdk = { path = "../sdk", version = "1.1.0" }
thiserror = "1.0"
solana-measure = { path = "../measure", version = "1.1.0" }
solana-logger = { path = "../logger", version = "1.1.0" }
libc = "0.2.67"
nix = "0.17.0"
solana-perf = { path = "../perf", version = "1.1.0" }
[dev-dependencies]
[lib]
crate-type = ["lib"]
name = "solana_streamer"

10
streamer/src/lib.rs Normal file
View File

@ -0,0 +1,10 @@
pub mod packet;
pub mod recvmmsg;
pub mod sendmmsg;
pub mod streamer;
#[macro_use]
extern crate log;
#[macro_use]
extern crate solana_metrics;

159
streamer/src/packet.rs Normal file
View File

@ -0,0 +1,159 @@
//! The `packet` module defines data structures and methods to pull data from the network.
use crate::recvmmsg::{recv_mmsg, NUM_RCVMMSGS};
pub use solana_perf::packet::{
limited_deserialize, to_packets, to_packets_chunked, Packets, PacketsRecycler, NUM_PACKETS,
PACKETS_BATCH_SIZE, PACKETS_PER_BATCH,
};
use solana_metrics::inc_new_counter_debug;
pub use solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE};
use std::{io::Result, net::UdpSocket, time::Instant};
pub fn recv_from(obj: &mut Packets, socket: &UdpSocket, max_wait_ms: usize) -> Result<usize> {
let mut i = 0;
//DOCUMENTED SIDE-EFFECT
//Performance out of the IO without poll
// * block on the socket until it's readable
// * set the socket to non blocking
// * read until it fails
// * set it back to blocking before returning
socket.set_nonblocking(false)?;
trace!("receiving on {}", socket.local_addr().unwrap());
let start = Instant::now();
loop {
obj.packets.resize(
std::cmp::min(i + NUM_RCVMMSGS, PACKETS_PER_BATCH),
Packet::default(),
);
match recv_mmsg(socket, &mut obj.packets[i..]) {
Err(_) if i > 0 => {
if start.elapsed().as_millis() > 1 {
break;
}
}
Err(e) => {
trace!("recv_from err {:?}", e);
return Err(e);
}
Ok((_, npkts)) => {
if i == 0 {
socket.set_nonblocking(true)?;
}
trace!("got {} packets", npkts);
i += npkts;
// Try to batch into big enough buffers
// will cause less re-shuffling later on.
if start.elapsed().as_millis() > max_wait_ms as u128 || i >= PACKETS_PER_BATCH {
break;
}
}
}
}
obj.packets.truncate(i);
inc_new_counter_debug!("packets-recv_count", i);
Ok(i)
}
pub fn send_to(obj: &Packets, socket: &UdpSocket) -> Result<()> {
for p in &obj.packets {
let a = p.meta.addr();
socket.send_to(&p.data[..p.meta.size], &a)?;
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use std::io;
use std::io::Write;
use std::net::{SocketAddr, UdpSocket};
#[test]
fn test_packets_set_addr() {
// test that the address is actually being updated
let send_addr: SocketAddr = "127.0.0.1:123".parse().unwrap();
let packets = vec![Packet::default()];
let mut msgs = Packets::new(packets);
msgs.set_addr(&send_addr);
assert_eq!(SocketAddr::from(msgs.packets[0].meta.addr()), send_addr);
}
#[test]
pub fn packet_send_recv() {
solana_logger::setup();
let recv_socket = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = recv_socket.local_addr().unwrap();
let send_socket = UdpSocket::bind("127.0.0.1:0").expect("bind");
let saddr = send_socket.local_addr().unwrap();
let mut p = Packets::default();
p.packets.resize(10, Packet::default());
for m in p.packets.iter_mut() {
m.meta.set_addr(&addr);
m.meta.size = PACKET_DATA_SIZE;
}
send_to(&p, &send_socket).unwrap();
let recvd = recv_from(&mut p, &recv_socket, 1).unwrap();
assert_eq!(recvd, p.packets.len());
for m in &p.packets {
assert_eq!(m.meta.size, PACKET_DATA_SIZE);
assert_eq!(m.meta.addr(), saddr);
}
}
#[test]
pub fn debug_trait() {
write!(io::sink(), "{:?}", Packet::default()).unwrap();
write!(io::sink(), "{:?}", Packets::default()).unwrap();
}
#[test]
fn test_packet_partial_eq() {
let mut p1 = Packet::default();
let mut p2 = Packet::default();
p1.meta.size = 1;
p1.data[0] = 0;
p2.meta.size = 1;
p2.data[0] = 0;
assert!(p1 == p2);
p2.data[0] = 4;
assert!(p1 != p2);
}
#[test]
fn test_packet_resize() {
solana_logger::setup();
let recv_socket = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = recv_socket.local_addr().unwrap();
let send_socket = UdpSocket::bind("127.0.0.1:0").expect("bind");
let mut p = Packets::default();
p.packets.resize(PACKETS_PER_BATCH, Packet::default());
// Should only get PACKETS_PER_BATCH packets per iteration even
// if a lot more were sent, and regardless of packet size
for _ in 0..2 * PACKETS_PER_BATCH {
let mut p = Packets::default();
p.packets.resize(1, Packet::default());
for m in p.packets.iter_mut() {
m.meta.set_addr(&addr);
m.meta.size = 1;
}
send_to(&p, &send_socket).unwrap();
}
let recvd = recv_from(&mut p, &recv_socket, 100).unwrap();
// Check we only got PACKETS_PER_BATCH packets
assert_eq!(recvd, PACKETS_PER_BATCH);
assert_eq!(p.packets.capacity(), PACKETS_PER_BATCH);
}
}

261
streamer/src/recvmmsg.rs Normal file
View File

@ -0,0 +1,261 @@
//! The `recvmmsg` module provides recvmmsg() API implementation
use crate::packet::Packet;
pub use solana_perf::packet::NUM_RCVMMSGS;
use std::cmp;
use std::io;
use std::net::UdpSocket;
#[cfg(not(target_os = "linux"))]
pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usize, usize)> {
let mut i = 0;
let count = cmp::min(NUM_RCVMMSGS, packets.len());
let mut total_size = 0;
for p in packets.iter_mut().take(count) {
p.meta.size = 0;
match socket.recv_from(&mut p.data) {
Err(_) if i > 0 => {
break;
}
Err(e) => {
return Err(e);
}
Ok((nrecv, from)) => {
total_size += nrecv;
p.meta.size = nrecv;
p.meta.set_addr(&from);
if i == 0 {
socket.set_nonblocking(true)?;
}
}
}
i += 1;
}
Ok((total_size, i))
}
#[cfg(target_os = "linux")]
pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usize, usize)> {
use libc::{
c_void, iovec, mmsghdr, recvmmsg, sockaddr_in, socklen_t, time_t, timespec, MSG_WAITFORONE,
};
use nix::sys::socket::InetAddr;
use std::mem;
use std::os::unix::io::AsRawFd;
let mut hdrs: [mmsghdr; NUM_RCVMMSGS] = unsafe { mem::zeroed() };
let mut iovs: [iovec; NUM_RCVMMSGS] = unsafe { mem::zeroed() };
let mut addr: [sockaddr_in; NUM_RCVMMSGS] = unsafe { mem::zeroed() };
let addrlen = mem::size_of_val(&addr) as socklen_t;
let sock_fd = sock.as_raw_fd();
let count = cmp::min(iovs.len(), packets.len());
for i in 0..count {
iovs[i].iov_base = packets[i].data.as_mut_ptr() as *mut c_void;
iovs[i].iov_len = packets[i].data.len();
hdrs[i].msg_hdr.msg_name = &mut addr[i] as *mut _ as *mut _;
hdrs[i].msg_hdr.msg_namelen = addrlen;
hdrs[i].msg_hdr.msg_iov = &mut iovs[i];
hdrs[i].msg_hdr.msg_iovlen = 1;
}
let mut ts = timespec {
tv_sec: 1 as time_t,
tv_nsec: 0,
};
let mut total_size = 0;
let npkts =
match unsafe { recvmmsg(sock_fd, &mut hdrs[0], count as u32, MSG_WAITFORONE, &mut ts) } {
-1 => return Err(io::Error::last_os_error()),
n => {
for i in 0..n as usize {
let mut p = &mut packets[i];
p.meta.size = hdrs[i].msg_len as usize;
total_size += p.meta.size;
let inet_addr = InetAddr::V4(addr[i]);
p.meta.set_addr(&inet_addr.to_std());
}
n as usize
}
};
Ok((total_size, npkts))
}
#[cfg(test)]
mod tests {
use crate::packet::PACKET_DATA_SIZE;
use crate::recvmmsg::*;
use std::time::{Duration, Instant};
const TEST_NUM_MSGS: usize = 32;
#[test]
pub fn test_recv_mmsg_one_iter() {
let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = reader.local_addr().unwrap();
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
let saddr = sender.local_addr().unwrap();
let sent = TEST_NUM_MSGS - 1;
for _ in 0..sent {
let data = [0; PACKET_DATA_SIZE];
sender.send_to(&data[..], &addr).unwrap();
}
let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
assert_eq!(sent, recv);
for i in 0..recv {
assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE);
assert_eq!(packets[i].meta.addr(), saddr);
}
}
#[test]
pub fn test_recv_mmsg_multi_iter() {
let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = reader.local_addr().unwrap();
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
let saddr = sender.local_addr().unwrap();
let sent = TEST_NUM_MSGS + 10;
for _ in 0..sent {
let data = [0; PACKET_DATA_SIZE];
sender.send_to(&data[..], &addr).unwrap();
}
let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
assert_eq!(TEST_NUM_MSGS, recv);
for i in 0..recv {
assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE);
assert_eq!(packets[i].meta.addr(), saddr);
}
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
assert_eq!(sent - TEST_NUM_MSGS, recv);
for i in 0..recv {
assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE);
assert_eq!(packets[i].meta.addr(), saddr);
}
}
#[test]
pub fn test_recv_mmsg_multi_iter_timeout() {
let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = reader.local_addr().unwrap();
reader.set_read_timeout(Some(Duration::new(5, 0))).unwrap();
reader.set_nonblocking(false).unwrap();
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
let saddr = sender.local_addr().unwrap();
let sent = TEST_NUM_MSGS;
for _ in 0..sent {
let data = [0; PACKET_DATA_SIZE];
sender.send_to(&data[..], &addr).unwrap();
}
let start = Instant::now();
let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
assert_eq!(TEST_NUM_MSGS, recv);
for i in 0..recv {
assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE);
assert_eq!(packets[i].meta.addr(), saddr);
}
reader.set_nonblocking(true).unwrap();
let _recv = recv_mmsg(&reader, &mut packets[..]);
assert!(start.elapsed().as_secs() < 5);
}
#[test]
pub fn test_recv_mmsg_multi_addrs() {
let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = reader.local_addr().unwrap();
let sender1 = UdpSocket::bind("127.0.0.1:0").expect("bind");
let saddr1 = sender1.local_addr().unwrap();
let sent1 = TEST_NUM_MSGS - 1;
let sender2 = UdpSocket::bind("127.0.0.1:0").expect("bind");
let saddr2 = sender2.local_addr().unwrap();
let sent2 = TEST_NUM_MSGS + 1;
for _ in 0..sent1 {
let data = [0; PACKET_DATA_SIZE];
sender1.send_to(&data[..], &addr).unwrap();
}
for _ in 0..sent2 {
let data = [0; PACKET_DATA_SIZE];
sender2.send_to(&data[..], &addr).unwrap();
}
let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
assert_eq!(TEST_NUM_MSGS, recv);
for i in 0..sent1 {
assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE);
assert_eq!(packets[i].meta.addr(), saddr1);
}
for i in sent1..recv {
assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE);
assert_eq!(packets[i].meta.addr(), saddr2);
}
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
assert_eq!(sent1 + sent2 - TEST_NUM_MSGS, recv);
for i in 0..recv {
assert_eq!(packets[i].meta.size, PACKET_DATA_SIZE);
assert_eq!(packets[i].meta.addr(), saddr2);
}
}
#[cfg(target_os = "linux")]
#[test]
pub fn test_recv_mmsg_batch_size() {
let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = reader.local_addr().unwrap();
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
const TEST_BATCH_SIZE: usize = 64;
let sent = TEST_BATCH_SIZE;
let mut elapsed_in_max_batch = 0;
(0..1000).for_each(|_| {
for _ in 0..sent {
let data = [0; PACKET_DATA_SIZE];
sender.send_to(&data[..], &addr).unwrap();
}
let mut packets = vec![Packet::default(); TEST_BATCH_SIZE];
let now = Instant::now();
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
elapsed_in_max_batch += now.elapsed().as_nanos();
assert_eq!(TEST_BATCH_SIZE, recv);
});
let mut elapsed_in_small_batch = 0;
(0..1000).for_each(|_| {
for _ in 0..sent {
let data = [0; PACKET_DATA_SIZE];
sender.send_to(&data[..], &addr).unwrap();
}
let mut packets = vec![Packet::default(); 4];
let mut recv = 0;
let now = Instant::now();
while let Ok(num) = recv_mmsg(&reader, &mut packets[..]) {
recv += num.1;
if recv >= TEST_BATCH_SIZE {
break;
}
}
elapsed_in_small_batch += now.elapsed().as_nanos();
assert_eq!(TEST_BATCH_SIZE, recv);
});
assert!(elapsed_in_max_batch <= elapsed_in_small_batch);
}
}

249
streamer/src/sendmmsg.rs Normal file
View File

@ -0,0 +1,249 @@
//! The `sendmmsg` module provides sendmmsg() API implementation
use std::io;
use std::net::{SocketAddr, UdpSocket};
#[cfg(not(target_os = "linux"))]
pub fn send_mmsg(sock: &UdpSocket, packets: &mut [(Vec<u8>, &SocketAddr)]) -> io::Result<usize> {
let count = packets.len();
for (p, a) in packets {
sock.send_to(p, *a)?;
}
Ok(count)
}
#[cfg(target_os = "linux")]
use libc::{iovec, mmsghdr, sockaddr_in, sockaddr_in6};
#[cfg(target_os = "linux")]
fn mmsghdr_for_packet(
packet: &mut [u8],
dest: &SocketAddr,
index: usize,
addr_in_len: u32,
addr_in6_len: u32,
iovs: &mut Vec<iovec>,
addr_in: &mut Vec<sockaddr_in>,
addr_in6: &mut Vec<sockaddr_in6>,
) -> mmsghdr {
use libc::c_void;
use nix::sys::socket::InetAddr;
use std::mem;
iovs.push(iovec {
iov_base: packet.as_mut_ptr() as *mut c_void,
iov_len: packet.len(),
});
let mut hdr: mmsghdr = unsafe { mem::zeroed() };
hdr.msg_hdr.msg_iov = &mut iovs[index];
hdr.msg_hdr.msg_iovlen = 1;
hdr.msg_len = packet.len() as u32;
match InetAddr::from_std(dest) {
InetAddr::V4(addr) => {
addr_in.insert(index, addr);
hdr.msg_hdr.msg_name = &mut addr_in[index] as *mut _ as *mut _;
hdr.msg_hdr.msg_namelen = addr_in_len;
}
InetAddr::V6(addr) => {
addr_in6.insert(index, addr);
hdr.msg_hdr.msg_name = &mut addr_in6[index] as *mut _ as *mut _;
hdr.msg_hdr.msg_namelen = addr_in6_len;
}
};
hdr
}
#[cfg(target_os = "linux")]
pub fn send_mmsg(sock: &UdpSocket, packets: &mut [(Vec<u8>, &SocketAddr)]) -> io::Result<usize> {
use libc::{sendmmsg, socklen_t};
use std::mem;
use std::os::unix::io::AsRawFd;
// The vectors are allocated with capacity, as later code inserts elements
// at specific indices, and uses the address of the vector index in hdrs
let mut iovs: Vec<iovec> = Vec::with_capacity(packets.len());
let mut addr_in: Vec<sockaddr_in> = Vec::with_capacity(packets.len());
let mut addr_in6: Vec<sockaddr_in6> = Vec::with_capacity(packets.len());
let addr_in_len = mem::size_of_val(&addr_in) as socklen_t;
let addr_in6_len = mem::size_of_val(&addr_in6) as socklen_t;
let sock_fd = sock.as_raw_fd();
let mut hdrs: Vec<mmsghdr> = packets
.iter_mut()
.enumerate()
.map(|(i, (packet, dest))| {
mmsghdr_for_packet(
packet,
dest,
i,
addr_in_len as u32,
addr_in6_len as u32,
&mut iovs,
&mut addr_in,
&mut addr_in6,
)
})
.collect();
let npkts = match unsafe { sendmmsg(sock_fd, &mut hdrs[0], packets.len() as u32, 0) } {
-1 => return Err(io::Error::last_os_error()),
n => n as usize,
};
Ok(npkts)
}
#[cfg(not(target_os = "linux"))]
pub fn multicast(sock: &UdpSocket, packet: &mut [u8], dests: &[&SocketAddr]) -> io::Result<usize> {
let count = dests.len();
for a in dests {
sock.send_to(packet, a)?;
}
Ok(count)
}
#[cfg(target_os = "linux")]
pub fn multicast(sock: &UdpSocket, packet: &mut [u8], dests: &[&SocketAddr]) -> io::Result<usize> {
use libc::{sendmmsg, socklen_t};
use std::mem;
use std::os::unix::io::AsRawFd;
// The vectors are allocated with capacity, as later code inserts elements
// at specific indices, and uses the address of the vector index in hdrs
let mut iovs: Vec<iovec> = Vec::with_capacity(dests.len());
let mut addr_in: Vec<sockaddr_in> = Vec::with_capacity(dests.len());
let mut addr_in6: Vec<sockaddr_in6> = Vec::with_capacity(dests.len());
let addr_in_len = mem::size_of_val(&addr_in) as socklen_t;
let addr_in6_len = mem::size_of_val(&addr_in6) as socklen_t;
let sock_fd = sock.as_raw_fd();
let mut hdrs: Vec<mmsghdr> = dests
.iter()
.enumerate()
.map(|(i, dest)| {
mmsghdr_for_packet(
packet,
dest,
i,
addr_in_len as u32,
addr_in6_len as u32,
&mut iovs,
&mut addr_in,
&mut addr_in6,
)
})
.collect();
let npkts = match unsafe { sendmmsg(sock_fd, &mut hdrs[0], dests.len() as u32, 0) } {
-1 => return Err(io::Error::last_os_error()),
n => n as usize,
};
Ok(npkts)
}
#[cfg(test)]
mod tests {
use crate::packet::Packet;
use crate::recvmmsg::recv_mmsg;
use crate::sendmmsg::{multicast, send_mmsg};
use solana_sdk::packet::PACKET_DATA_SIZE;
use std::net::UdpSocket;
#[test]
pub fn test_send_mmsg_one_dest() {
let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = reader.local_addr().unwrap();
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
let mut packets: Vec<_> = (0..32)
.map(|_| (vec![0u8; PACKET_DATA_SIZE], &addr))
.collect();
let sent = send_mmsg(&sender, &mut packets).ok();
assert_eq!(sent, Some(32));
let mut packets = vec![Packet::default(); 32];
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
assert_eq!(32, recv);
}
#[test]
pub fn test_send_mmsg_multi_dest() {
let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = reader.local_addr().unwrap();
let reader2 = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr2 = reader2.local_addr().unwrap();
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
let mut packets: Vec<_> = (0..32)
.map(|i| {
if i < 16 {
(vec![0u8; PACKET_DATA_SIZE], &addr)
} else {
(vec![0u8; PACKET_DATA_SIZE], &addr2)
}
})
.collect();
let sent = send_mmsg(&sender, &mut packets).ok();
assert_eq!(sent, Some(32));
let mut packets = vec![Packet::default(); 32];
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
assert_eq!(16, recv);
let mut packets = vec![Packet::default(); 32];
let recv = recv_mmsg(&reader2, &mut packets[..]).unwrap().1;
assert_eq!(16, recv);
}
#[test]
pub fn test_multicast_msg() {
let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = reader.local_addr().unwrap();
let reader2 = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr2 = reader2.local_addr().unwrap();
let reader3 = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr3 = reader3.local_addr().unwrap();
let reader4 = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr4 = reader4.local_addr().unwrap();
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
let mut packet = Packet::default();
let sent = multicast(
&sender,
&mut packet.data[..packet.meta.size],
&[&addr, &addr2, &addr3, &addr4],
)
.ok();
assert_eq!(sent, Some(4));
let mut packets = vec![Packet::default(); 32];
let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1;
assert_eq!(1, recv);
let mut packets = vec![Packet::default(); 32];
let recv = recv_mmsg(&reader2, &mut packets[..]).unwrap().1;
assert_eq!(1, recv);
let mut packets = vec![Packet::default(); 32];
let recv = recv_mmsg(&reader3, &mut packets[..]).unwrap().1;
assert_eq!(1, recv);
let mut packets = vec![Packet::default(); 32];
let recv = recv_mmsg(&reader4, &mut packets[..]).unwrap().1;
assert_eq!(1, recv);
}
}

212
streamer/src/streamer.rs Normal file
View File

@ -0,0 +1,212 @@
//! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets.
//!
use crate::packet::{self, send_to, Packets, PacketsRecycler, PACKETS_PER_BATCH};
use crate::recvmmsg::NUM_RCVMMSGS;
use solana_measure::thread_mem_usage;
use solana_sdk::timing::duration_as_ms;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError, SendError, Sender};
use std::sync::Arc;
use std::thread::{Builder, JoinHandle};
use std::time::{Duration, Instant};
use thiserror::Error;
pub type PacketReceiver = Receiver<Packets>;
pub type PacketSender = Sender<Packets>;
#[derive(Error, Debug)]
pub enum StreamerError {
#[error("I/O error")]
IO(#[from] std::io::Error),
#[error("receive timeout error")]
RecvTimeoutError(#[from] RecvTimeoutError),
#[error("send packets error")]
SendError(#[from] SendError<Packets>),
}
pub type Result<T> = std::result::Result<T, StreamerError>;
fn recv_loop(
sock: &UdpSocket,
exit: Arc<AtomicBool>,
channel: &PacketSender,
recycler: &PacketsRecycler,
name: &'static str,
) -> Result<()> {
let mut recv_count = 0;
let mut call_count = 0;
let mut now = Instant::now();
let mut num_max_received = 0; // Number of times maximum packets were received
loop {
let mut msgs = Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, name);
loop {
// Check for exit signal, even if socket is busy
// (for instance the leader transaction socket)
if exit.load(Ordering::Relaxed) {
return Ok(());
}
if let Ok(len) = packet::recv_from(&mut msgs, sock, 1) {
if len == NUM_RCVMMSGS {
num_max_received += 1;
}
recv_count += len;
call_count += 1;
if len > 0 {
channel.send(msgs)?;
}
break;
}
}
if recv_count > 1024 {
datapoint_debug!(
name,
("received", recv_count as i64, i64),
("call_count", i64::from(call_count), i64),
("elapsed", now.elapsed().as_millis() as i64, i64),
("max_received", i64::from(num_max_received), i64),
);
recv_count = 0;
call_count = 0;
num_max_received = 0;
}
now = Instant::now();
}
}
pub fn receiver(
sock: Arc<UdpSocket>,
exit: &Arc<AtomicBool>,
packet_sender: PacketSender,
recycler: PacketsRecycler,
name: &'static str,
) -> JoinHandle<()> {
let res = sock.set_read_timeout(Some(Duration::new(1, 0)));
if res.is_err() {
panic!("streamer::receiver set_read_timeout error");
}
let exit = exit.clone();
Builder::new()
.name("solana-receiver".to_string())
.spawn(move || {
thread_mem_usage::datapoint(name);
let _ = recv_loop(&sock, exit, &packet_sender, &recycler.clone(), name);
})
.unwrap()
}
fn recv_send(sock: &UdpSocket, r: &PacketReceiver) -> Result<()> {
let timer = Duration::new(1, 0);
let msgs = r.recv_timeout(timer)?;
send_to(&msgs, sock)?;
Ok(())
}
pub fn recv_batch(recvr: &PacketReceiver, max_batch: usize) -> Result<(Vec<Packets>, usize, u64)> {
let timer = Duration::new(1, 0);
let msgs = recvr.recv_timeout(timer)?;
let recv_start = Instant::now();
trace!("got msgs");
let mut len = msgs.packets.len();
let mut batch = vec![msgs];
while let Ok(more) = recvr.try_recv() {
trace!("got more msgs");
len += more.packets.len();
batch.push(more);
if len > max_batch {
break;
}
}
trace!("batch len {}", batch.len());
Ok((batch, len, duration_as_ms(&recv_start.elapsed())))
}
pub fn responder(name: &'static str, sock: Arc<UdpSocket>, r: PacketReceiver) -> JoinHandle<()> {
Builder::new()
.name(format!("solana-responder-{}", name))
.spawn(move || loop {
thread_mem_usage::datapoint(name);
if let Err(e) = recv_send(&sock, &r) {
match e {
StreamerError::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
StreamerError::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => info!("{} responder error: {:?}", name, e),
}
}
})
.unwrap()
}
#[cfg(test)]
mod test {
use super::*;
use crate::packet::{Packet, Packets, PACKET_DATA_SIZE};
use crate::streamer::{receiver, responder};
use solana_perf::recycler::Recycler;
use std::io;
use std::io::Write;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::time::Duration;
fn get_msgs(r: PacketReceiver, num: &mut usize) -> Result<()> {
for _ in 0..10 {
let m = r.recv_timeout(Duration::new(1, 0));
if m.is_err() {
continue;
}
*num -= m.unwrap().packets.len();
if *num == 0 {
break;
}
}
Ok(())
}
#[test]
fn streamer_debug() {
write!(io::sink(), "{:?}", Packet::default()).unwrap();
write!(io::sink(), "{:?}", Packets::default()).unwrap();
}
#[test]
fn streamer_send_test() {
let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
let addr = read.local_addr().unwrap();
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));
let (s_reader, r_reader) = channel();
let t_receiver = receiver(Arc::new(read), &exit, s_reader, Recycler::default(), "test");
let t_responder = {
let (s_responder, r_responder) = channel();
let t_responder = responder("streamer_send_test", Arc::new(send), r_responder);
let mut msgs = Packets::default();
for i in 0..5 {
let mut b = Packet::default();
{
b.data[0] = i as u8;
b.meta.size = PACKET_DATA_SIZE;
b.meta.set_addr(&addr);
}
msgs.packets.push(b);
}
s_responder.send(msgs).expect("send");
t_responder
};
let mut num = 5;
get_msgs(r_reader, &mut num).expect("get_msgs");
assert_eq!(num, 0);
exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join");
t_responder.join().expect("join");
}
}