removes raw indexing from streamer (#19183)
Raw indexing is verbose and error-prone. This same code had an indexing bug causing validator nodes panic just a few months ago: https://github.com/solana-labs/solana/commit/482b8c6be
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -5807,6 +5807,7 @@ dependencies = [
|
|||||||
name = "solana-streamer"
|
name = "solana-streamer"
|
||||||
version = "1.8.0"
|
version = "1.8.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"itertools 0.10.1",
|
||||||
"libc",
|
"libc",
|
||||||
"log 0.4.14",
|
"log 0.4.14",
|
||||||
"nix",
|
"nix",
|
||||||
|
@ -10,6 +10,7 @@ documentation = "https://docs.rs/solana-streamer"
|
|||||||
edition = "2018"
|
edition = "2018"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
itertools = "0.10.1"
|
||||||
log = "0.4.14"
|
log = "0.4.14"
|
||||||
solana-metrics = { path = "../metrics", version = "=1.8.0" }
|
solana-metrics = { path = "../metrics", version = "=1.8.0" }
|
||||||
solana-sdk = { path = "../sdk", version = "=1.8.0" }
|
solana-sdk = { path = "../sdk", version = "=1.8.0" }
|
||||||
|
@ -1,10 +1,17 @@
|
|||||||
//! The `recvmmsg` module provides recvmmsg() API implementation
|
//! The `recvmmsg` module provides recvmmsg() API implementation
|
||||||
|
|
||||||
use crate::packet::Packet;
|
|
||||||
pub use solana_perf::packet::NUM_RCVMMSGS;
|
pub use solana_perf::packet::NUM_RCVMMSGS;
|
||||||
use std::cmp;
|
use {
|
||||||
use std::io;
|
crate::packet::Packet,
|
||||||
use std::net::UdpSocket;
|
std::{cmp, io, net::UdpSocket},
|
||||||
|
};
|
||||||
|
#[cfg(target_os = "linux")]
|
||||||
|
use {
|
||||||
|
itertools::izip,
|
||||||
|
libc::{iovec, mmsghdr, sockaddr_storage, socklen_t, AF_INET, AF_INET6, MSG_WAITFORONE},
|
||||||
|
nix::sys::socket::InetAddr,
|
||||||
|
std::{mem, os::unix::io::AsRawFd},
|
||||||
|
};
|
||||||
|
|
||||||
#[cfg(not(target_os = "linux"))]
|
#[cfg(not(target_os = "linux"))]
|
||||||
pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usize, usize)> {
|
pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usize, usize)> {
|
||||||
@ -34,76 +41,79 @@ pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usiz
|
|||||||
Ok((total_size, i))
|
Ok((total_size, i))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(target_os = "linux")]
|
||||||
|
fn cast_socket_addr(addr: &sockaddr_storage, hdr: &mmsghdr) -> Option<InetAddr> {
|
||||||
|
use libc::{sa_family_t, sockaddr_in, sockaddr_in6};
|
||||||
|
const SOCKADDR_IN_SIZE: usize = std::mem::size_of::<sockaddr_in>();
|
||||||
|
const SOCKADDR_IN6_SIZE: usize = std::mem::size_of::<sockaddr_in6>();
|
||||||
|
if addr.ss_family == AF_INET as sa_family_t
|
||||||
|
&& hdr.msg_hdr.msg_namelen == SOCKADDR_IN_SIZE as socklen_t
|
||||||
|
{
|
||||||
|
let addr = addr as *const _ as *const sockaddr_in;
|
||||||
|
return Some(unsafe { InetAddr::V4(*addr) });
|
||||||
|
}
|
||||||
|
if addr.ss_family == AF_INET6 as sa_family_t
|
||||||
|
&& hdr.msg_hdr.msg_namelen == SOCKADDR_IN6_SIZE as socklen_t
|
||||||
|
{
|
||||||
|
let addr = addr as *const _ as *const sockaddr_in6;
|
||||||
|
return Some(unsafe { InetAddr::V6(*addr) });
|
||||||
|
}
|
||||||
|
error!(
|
||||||
|
"recvmmsg unexpected ss_family:{} msg_namelen:{}",
|
||||||
|
addr.ss_family, hdr.msg_hdr.msg_namelen
|
||||||
|
);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(target_os = "linux")]
|
#[cfg(target_os = "linux")]
|
||||||
#[allow(clippy::uninit_assumed_init)]
|
#[allow(clippy::uninit_assumed_init)]
|
||||||
pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usize, usize)> {
|
pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usize, usize)> {
|
||||||
use libc::{
|
const SOCKADDR_STORAGE_SIZE: usize = mem::size_of::<sockaddr_storage>();
|
||||||
c_void, iovec, mmsghdr, recvmmsg, sa_family_t, sockaddr_in, sockaddr_in6, sockaddr_storage,
|
|
||||||
socklen_t, timespec, AF_INET, AF_INET6, MSG_WAITFORONE,
|
|
||||||
};
|
|
||||||
use nix::sys::socket::InetAddr;
|
|
||||||
use std::mem;
|
|
||||||
use std::os::unix::io::AsRawFd;
|
|
||||||
|
|
||||||
const SOCKADDR_STORAGE_SIZE: socklen_t = mem::size_of::<sockaddr_storage>() as socklen_t;
|
|
||||||
const SOCKADDR_IN_SIZE: socklen_t = mem::size_of::<sockaddr_in>() as socklen_t;
|
|
||||||
const SOCKADDR_IN6_SIZE: socklen_t = mem::size_of::<sockaddr_in6>() as socklen_t;
|
|
||||||
|
|
||||||
let mut hdrs: [mmsghdr; NUM_RCVMMSGS] = unsafe { mem::zeroed() };
|
let mut hdrs: [mmsghdr; NUM_RCVMMSGS] = unsafe { mem::zeroed() };
|
||||||
let mut iovs: [iovec; NUM_RCVMMSGS] = unsafe { mem::MaybeUninit::uninit().assume_init() };
|
let mut iovs: [iovec; NUM_RCVMMSGS] = unsafe { mem::MaybeUninit::uninit().assume_init() };
|
||||||
let mut addr: [sockaddr_storage; NUM_RCVMMSGS] = unsafe { mem::zeroed() };
|
let mut addrs: [sockaddr_storage; NUM_RCVMMSGS] = unsafe { mem::zeroed() };
|
||||||
|
|
||||||
let sock_fd = sock.as_raw_fd();
|
let sock_fd = sock.as_raw_fd();
|
||||||
let count = cmp::min(iovs.len(), packets.len());
|
let count = cmp::min(iovs.len(), packets.len());
|
||||||
|
|
||||||
for i in 0..count {
|
for (packet, hdr, iov, addr) in
|
||||||
iovs[i].iov_base = packets[i].data.as_mut_ptr() as *mut c_void;
|
izip!(packets.iter_mut(), &mut hdrs, &mut iovs, &mut addrs).take(count)
|
||||||
iovs[i].iov_len = packets[i].data.len();
|
{
|
||||||
|
*iov = iovec {
|
||||||
hdrs[i].msg_hdr.msg_name = &mut addr[i] as *mut _ as *mut _;
|
iov_base: packet.data.as_mut_ptr() as *mut libc::c_void,
|
||||||
hdrs[i].msg_hdr.msg_namelen = SOCKADDR_STORAGE_SIZE;
|
iov_len: packet.data.len(),
|
||||||
hdrs[i].msg_hdr.msg_iov = &mut iovs[i];
|
};
|
||||||
hdrs[i].msg_hdr.msg_iovlen = 1;
|
hdr.msg_hdr.msg_name = addr as *mut _ as *mut _;
|
||||||
|
hdr.msg_hdr.msg_namelen = SOCKADDR_STORAGE_SIZE as socklen_t;
|
||||||
|
hdr.msg_hdr.msg_iov = iov;
|
||||||
|
hdr.msg_hdr.msg_iovlen = 1;
|
||||||
}
|
}
|
||||||
let mut ts = timespec {
|
let mut ts = libc::timespec {
|
||||||
tv_sec: 1,
|
tv_sec: 1,
|
||||||
tv_nsec: 0,
|
tv_nsec: 0,
|
||||||
};
|
};
|
||||||
|
let nrecv =
|
||||||
let mut total_size = 0;
|
unsafe { libc::recvmmsg(sock_fd, &mut hdrs[0], count as u32, MSG_WAITFORONE, &mut ts) };
|
||||||
let npkts =
|
if nrecv < 0 {
|
||||||
match unsafe { recvmmsg(sock_fd, &mut hdrs[0], count as u32, MSG_WAITFORONE, &mut ts) } {
|
return Err(io::Error::last_os_error());
|
||||||
-1 => return Err(io::Error::last_os_error()),
|
|
||||||
n => {
|
|
||||||
let mut pkt_idx: usize = 0;
|
|
||||||
for i in 0..n as usize {
|
|
||||||
let inet_addr = if addr[i].ss_family == AF_INET as sa_family_t
|
|
||||||
&& hdrs[i].msg_hdr.msg_namelen == SOCKADDR_IN_SIZE
|
|
||||||
{
|
|
||||||
let a: *const sockaddr_in = &addr[i] as *const _ as *const _;
|
|
||||||
unsafe { InetAddr::V4(*a) }
|
|
||||||
} else if addr[i].ss_family == AF_INET6 as sa_family_t
|
|
||||||
&& hdrs[i].msg_hdr.msg_namelen == SOCKADDR_IN6_SIZE
|
|
||||||
{
|
|
||||||
let a: *const sockaddr_in6 = &addr[i] as *const _ as *const _;
|
|
||||||
unsafe { InetAddr::V6(*a) }
|
|
||||||
} else {
|
|
||||||
error!(
|
|
||||||
"recvmmsg unexpected ss_family:{} msg_namelen:{}",
|
|
||||||
addr[i].ss_family, hdrs[i].msg_hdr.msg_namelen
|
|
||||||
);
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
let mut p = &mut packets[pkt_idx];
|
|
||||||
p.meta.size = hdrs[i].msg_len as usize;
|
|
||||||
total_size += p.meta.size;
|
|
||||||
p.meta.set_addr(&inet_addr.to_std());
|
|
||||||
pkt_idx += 1;
|
|
||||||
}
|
}
|
||||||
pkt_idx
|
let mut npkts = 0;
|
||||||
}
|
addrs
|
||||||
};
|
.iter()
|
||||||
|
.zip(hdrs)
|
||||||
|
.take(nrecv as usize)
|
||||||
|
.filter_map(|(addr, hdr)| {
|
||||||
|
let addr = cast_socket_addr(addr, &hdr)?.to_std();
|
||||||
|
Some((addr, hdr))
|
||||||
|
})
|
||||||
|
.zip(packets.iter_mut())
|
||||||
|
.for_each(|((addr, hdr), pkt)| {
|
||||||
|
pkt.meta.size = hdr.msg_len as usize;
|
||||||
|
pkt.meta.set_addr(&addr);
|
||||||
|
npkts += 1;
|
||||||
|
});
|
||||||
|
let total_size = packets.iter().take(npkts).map(|pkt| pkt.meta.size).sum();
|
||||||
Ok((total_size, npkts))
|
Ok((total_size, npkts))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,5 +1,12 @@
|
|||||||
//! The `sendmmsg` module provides sendmmsg() API implementation
|
//! The `sendmmsg` module provides sendmmsg() API implementation
|
||||||
|
|
||||||
|
#[cfg(target_os = "linux")]
|
||||||
|
use {
|
||||||
|
itertools::izip,
|
||||||
|
libc::{iovec, mmsghdr, sockaddr_in, sockaddr_in6, sockaddr_storage},
|
||||||
|
nix::sys::socket::InetAddr,
|
||||||
|
std::os::unix::io::AsRawFd,
|
||||||
|
};
|
||||||
use {
|
use {
|
||||||
std::{
|
std::{
|
||||||
borrow::Borrow,
|
borrow::Borrow,
|
||||||
@ -41,78 +48,50 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(target_os = "linux")]
|
|
||||||
use libc::{iovec, mmsghdr, sockaddr_in, sockaddr_in6, sockaddr_storage};
|
|
||||||
|
|
||||||
#[cfg(target_os = "linux")]
|
#[cfg(target_os = "linux")]
|
||||||
fn mmsghdr_for_packet(
|
fn mmsghdr_for_packet(
|
||||||
packet: &[u8],
|
packet: &[u8],
|
||||||
dest: &SocketAddr,
|
dest: &SocketAddr,
|
||||||
iovs: &mut Vec<iovec>,
|
iov: &mut iovec,
|
||||||
addrs: &mut Vec<sockaddr_storage>,
|
addr: &mut sockaddr_storage,
|
||||||
hdrs: &mut Vec<mmsghdr>,
|
hdr: &mut mmsghdr,
|
||||||
) {
|
) {
|
||||||
use libc::c_void;
|
const SIZE_OF_SOCKADDR_IN: usize = std::mem::size_of::<sockaddr_in>();
|
||||||
use nix::sys::socket::InetAddr;
|
const SIZE_OF_SOCKADDR_IN6: usize = std::mem::size_of::<sockaddr_in6>();
|
||||||
use std::mem;
|
|
||||||
|
|
||||||
const SIZE_OF_SOCKADDR_IN: usize = mem::size_of::<sockaddr_in>();
|
*iov = iovec {
|
||||||
const SIZE_OF_SOCKADDR_IN6: usize = mem::size_of::<sockaddr_in6>();
|
iov_base: packet.as_ptr() as *mut libc::c_void,
|
||||||
|
|
||||||
let index = hdrs.len();
|
|
||||||
|
|
||||||
debug_assert!(index < hdrs.capacity());
|
|
||||||
debug_assert!(index < addrs.capacity());
|
|
||||||
debug_assert!(index < iovs.capacity());
|
|
||||||
debug_assert_eq!(hdrs.len(), addrs.len());
|
|
||||||
debug_assert_eq!(hdrs.len(), iovs.len());
|
|
||||||
|
|
||||||
iovs.push(iovec {
|
|
||||||
iov_base: packet.as_ptr() as *mut c_void,
|
|
||||||
iov_len: packet.len(),
|
iov_len: packet.len(),
|
||||||
});
|
};
|
||||||
|
hdr.msg_hdr.msg_iov = iov;
|
||||||
let hdr: mmsghdr = unsafe { mem::zeroed() };
|
hdr.msg_hdr.msg_iovlen = 1;
|
||||||
hdrs.push(hdr);
|
hdr.msg_hdr.msg_name = addr as *mut _ as *mut _;
|
||||||
|
|
||||||
let addr_storage: sockaddr_storage = unsafe { mem::zeroed() };
|
|
||||||
addrs.push(addr_storage);
|
|
||||||
|
|
||||||
debug_assert!(index < hdrs.len());
|
|
||||||
|
|
||||||
hdrs[index].msg_hdr.msg_iov = &mut iovs[index];
|
|
||||||
hdrs[index].msg_hdr.msg_iovlen = 1;
|
|
||||||
|
|
||||||
match InetAddr::from_std(dest) {
|
match InetAddr::from_std(dest) {
|
||||||
InetAddr::V4(addr) => {
|
InetAddr::V4(dest) => {
|
||||||
unsafe {
|
unsafe {
|
||||||
core::ptr::write(&mut addrs[index] as *mut _ as *mut _, addr);
|
std::ptr::write(addr as *mut _ as *mut _, dest);
|
||||||
}
|
}
|
||||||
hdrs[index].msg_hdr.msg_name = &mut addrs[index] as *mut _ as *mut _;
|
hdr.msg_hdr.msg_namelen = SIZE_OF_SOCKADDR_IN as u32;
|
||||||
hdrs[index].msg_hdr.msg_namelen = SIZE_OF_SOCKADDR_IN as u32;
|
|
||||||
}
|
}
|
||||||
InetAddr::V6(addr) => {
|
InetAddr::V6(dest) => {
|
||||||
unsafe {
|
unsafe {
|
||||||
core::ptr::write(&mut addrs[index] as *mut _ as *mut _, addr);
|
std::ptr::write(addr as *mut _ as *mut _, dest);
|
||||||
}
|
}
|
||||||
hdrs[index].msg_hdr.msg_name = &mut addrs[index] as *mut _ as *mut _;
|
hdr.msg_hdr.msg_namelen = SIZE_OF_SOCKADDR_IN6 as u32;
|
||||||
hdrs[index].msg_hdr.msg_namelen = SIZE_OF_SOCKADDR_IN6 as u32;
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(target_os = "linux")]
|
#[cfg(target_os = "linux")]
|
||||||
fn sendmmsg_retry(sock: &UdpSocket, hdrs: &mut Vec<mmsghdr>) -> Result<(), SendPktsError> {
|
fn sendmmsg_retry(sock: &UdpSocket, hdrs: &mut Vec<mmsghdr>) -> Result<(), SendPktsError> {
|
||||||
use libc::sendmmsg;
|
|
||||||
use std::os::unix::io::AsRawFd;
|
|
||||||
|
|
||||||
let sock_fd = sock.as_raw_fd();
|
let sock_fd = sock.as_raw_fd();
|
||||||
let mut total_sent = 0;
|
let mut total_sent = 0;
|
||||||
let mut erropt = None;
|
let mut erropt = None;
|
||||||
|
|
||||||
let mut pkts = &mut hdrs[..];
|
let mut pkts = &mut hdrs[..];
|
||||||
while !pkts.is_empty() {
|
while !pkts.is_empty() {
|
||||||
let npkts = match unsafe { sendmmsg(sock_fd, &mut pkts[0], pkts.len() as u32, 0) } {
|
let npkts = match unsafe { libc::sendmmsg(sock_fd, &mut pkts[0], pkts.len() as u32, 0) } {
|
||||||
-1 => {
|
-1 => {
|
||||||
if erropt.is_none() {
|
if erropt.is_none() {
|
||||||
erropt = Some(io::Error::last_os_error());
|
erropt = Some(io::Error::last_os_error());
|
||||||
@ -143,22 +122,14 @@ where
|
|||||||
S: Borrow<SocketAddr>,
|
S: Borrow<SocketAddr>,
|
||||||
T: AsRef<[u8]>,
|
T: AsRef<[u8]>,
|
||||||
{
|
{
|
||||||
// The vectors are allocated with capacity, as later code inserts elements
|
let size = packets.len();
|
||||||
// at specific indices, and uses the address of the vector index in hdrs
|
#[allow(clippy::uninit_assumed_init)]
|
||||||
let mut iovs: Vec<iovec> = Vec::with_capacity(packets.len());
|
let mut iovs = vec![unsafe { std::mem::MaybeUninit::uninit().assume_init() }; size];
|
||||||
let mut addrs: Vec<sockaddr_storage> = Vec::with_capacity(packets.len());
|
let mut addrs = vec![unsafe { std::mem::zeroed() }; size];
|
||||||
let mut hdrs: Vec<mmsghdr> = Vec::with_capacity(packets.len());
|
let mut hdrs = vec![unsafe { std::mem::zeroed() }; size];
|
||||||
|
for ((pkt, dest), hdr, iov, addr) in izip!(packets, &mut hdrs, &mut iovs, &mut addrs) {
|
||||||
for (pkt, dest) in packets.iter() {
|
mmsghdr_for_packet(pkt.as_ref(), dest.borrow(), iov, addr, hdr);
|
||||||
mmsghdr_for_packet(
|
|
||||||
pkt.as_ref(),
|
|
||||||
dest.borrow(),
|
|
||||||
&mut iovs,
|
|
||||||
&mut addrs,
|
|
||||||
&mut hdrs,
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
sendmmsg_retry(sock, &mut hdrs)
|
sendmmsg_retry(sock, &mut hdrs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user