diff --git a/Cargo.lock b/Cargo.lock index 711442c5d5..39a4d8e256 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5465,6 +5465,7 @@ dependencies = [ name = "solana-streamer" version = "1.7.13" dependencies = [ + "itertools 0.10.1", "libc", "log 0.4.11", "nix 0.19.0", diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index 76651b0689..055088c88f 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -10,6 +10,7 @@ documentation = "https://docs.rs/solana-streamer" edition = "2018" [dependencies] +itertools = "0.10.1" log = "0.4.11" solana-metrics = { path = "../metrics", version = "=1.7.13" } solana-sdk = { path = "../sdk", version = "=1.7.13" } diff --git a/streamer/src/recvmmsg.rs b/streamer/src/recvmmsg.rs index 32b2f4c4e1..64eb3e86b4 100644 --- a/streamer/src/recvmmsg.rs +++ b/streamer/src/recvmmsg.rs @@ -1,10 +1,17 @@ //! The `recvmmsg` module provides recvmmsg() API implementation -use crate::packet::Packet; pub use solana_perf::packet::NUM_RCVMMSGS; -use std::cmp; -use std::io; -use std::net::UdpSocket; +use { + crate::packet::Packet, + std::{cmp, io, net::UdpSocket}, +}; +#[cfg(target_os = "linux")] +use { + itertools::izip, + libc::{iovec, mmsghdr, sockaddr_storage, socklen_t, AF_INET, AF_INET6, MSG_WAITFORONE}, + nix::sys::socket::InetAddr, + std::{mem, os::unix::io::AsRawFd}, +}; #[cfg(not(target_os = "linux"))] pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usize, usize)> { @@ -34,76 +41,79 @@ pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usiz Ok((total_size, i)) } +#[cfg(target_os = "linux")] +fn cast_socket_addr(addr: &sockaddr_storage, hdr: &mmsghdr) -> Option { + use libc::{sa_family_t, sockaddr_in, sockaddr_in6}; + const SOCKADDR_IN_SIZE: usize = std::mem::size_of::(); + const SOCKADDR_IN6_SIZE: usize = std::mem::size_of::(); + if addr.ss_family == AF_INET as sa_family_t + && hdr.msg_hdr.msg_namelen == SOCKADDR_IN_SIZE as socklen_t + { + let addr = addr as *const _ as *const sockaddr_in; + return Some(unsafe { InetAddr::V4(*addr) }); + } + if addr.ss_family == AF_INET6 as sa_family_t + && hdr.msg_hdr.msg_namelen == SOCKADDR_IN6_SIZE as socklen_t + { + let addr = addr as *const _ as *const sockaddr_in6; + return Some(unsafe { InetAddr::V6(*addr) }); + } + error!( + "recvmmsg unexpected ss_family:{} msg_namelen:{}", + addr.ss_family, hdr.msg_hdr.msg_namelen + ); + None +} + #[cfg(target_os = "linux")] #[allow(clippy::uninit_assumed_init)] pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result<(usize, usize)> { - use libc::{ - c_void, iovec, mmsghdr, recvmmsg, sa_family_t, sockaddr_in, sockaddr_in6, sockaddr_storage, - socklen_t, timespec, AF_INET, AF_INET6, MSG_WAITFORONE, - }; - use nix::sys::socket::InetAddr; - use std::mem; - use std::os::unix::io::AsRawFd; - - const SOCKADDR_STORAGE_SIZE: socklen_t = mem::size_of::() as socklen_t; - const SOCKADDR_IN_SIZE: socklen_t = mem::size_of::() as socklen_t; - const SOCKADDR_IN6_SIZE: socklen_t = mem::size_of::() as socklen_t; + const SOCKADDR_STORAGE_SIZE: usize = mem::size_of::(); let mut hdrs: [mmsghdr; NUM_RCVMMSGS] = unsafe { mem::zeroed() }; let mut iovs: [iovec; NUM_RCVMMSGS] = unsafe { mem::MaybeUninit::uninit().assume_init() }; - let mut addr: [sockaddr_storage; NUM_RCVMMSGS] = unsafe { mem::zeroed() }; + let mut addrs: [sockaddr_storage; NUM_RCVMMSGS] = unsafe { mem::zeroed() }; let sock_fd = sock.as_raw_fd(); let count = cmp::min(iovs.len(), packets.len()); - for i in 0..count { - iovs[i].iov_base = packets[i].data.as_mut_ptr() as *mut c_void; - iovs[i].iov_len = packets[i].data.len(); - - hdrs[i].msg_hdr.msg_name = &mut addr[i] as *mut _ as *mut _; - hdrs[i].msg_hdr.msg_namelen = SOCKADDR_STORAGE_SIZE; - hdrs[i].msg_hdr.msg_iov = &mut iovs[i]; - hdrs[i].msg_hdr.msg_iovlen = 1; + for (packet, hdr, iov, addr) in + izip!(packets.iter_mut(), &mut hdrs, &mut iovs, &mut addrs).take(count) + { + *iov = iovec { + iov_base: packet.data.as_mut_ptr() as *mut libc::c_void, + iov_len: packet.data.len(), + }; + hdr.msg_hdr.msg_name = addr as *mut _ as *mut _; + hdr.msg_hdr.msg_namelen = SOCKADDR_STORAGE_SIZE as socklen_t; + hdr.msg_hdr.msg_iov = iov; + hdr.msg_hdr.msg_iovlen = 1; } - let mut ts = timespec { + let mut ts = libc::timespec { tv_sec: 1, tv_nsec: 0, }; - - let mut total_size = 0; - let npkts = - match unsafe { recvmmsg(sock_fd, &mut hdrs[0], count as u32, MSG_WAITFORONE, &mut ts) } { - -1 => return Err(io::Error::last_os_error()), - n => { - let mut pkt_idx: usize = 0; - for i in 0..n as usize { - let inet_addr = if addr[i].ss_family == AF_INET as sa_family_t - && hdrs[i].msg_hdr.msg_namelen == SOCKADDR_IN_SIZE - { - let a: *const sockaddr_in = &addr[i] as *const _ as *const _; - unsafe { InetAddr::V4(*a) } - } else if addr[i].ss_family == AF_INET6 as sa_family_t - && hdrs[i].msg_hdr.msg_namelen == SOCKADDR_IN6_SIZE - { - let a: *const sockaddr_in6 = &addr[i] as *const _ as *const _; - unsafe { InetAddr::V6(*a) } - } else { - error!( - "recvmmsg unexpected ss_family:{} msg_namelen:{}", - addr[i].ss_family, hdrs[i].msg_hdr.msg_namelen - ); - continue; - }; - let mut p = &mut packets[pkt_idx]; - p.meta.size = hdrs[i].msg_len as usize; - total_size += p.meta.size; - p.meta.set_addr(&inet_addr.to_std()); - pkt_idx += 1; - } - pkt_idx - } - }; - + let nrecv = + unsafe { libc::recvmmsg(sock_fd, &mut hdrs[0], count as u32, MSG_WAITFORONE, &mut ts) }; + if nrecv < 0 { + return Err(io::Error::last_os_error()); + } + let mut npkts = 0; + addrs + .iter() + .zip(&hdrs) + .take(nrecv as usize) + .filter_map(|(addr, hdr)| { + let addr = cast_socket_addr(addr, &hdr)?.to_std(); + Some((addr, hdr)) + }) + .zip(packets.iter_mut()) + .for_each(|((addr, hdr), pkt)| { + pkt.meta.size = hdr.msg_len as usize; + pkt.meta.set_addr(&addr); + npkts += 1; + }); + let total_size = packets.iter().take(npkts).map(|pkt| pkt.meta.size).sum(); Ok((total_size, npkts)) } diff --git a/streamer/src/sendmmsg.rs b/streamer/src/sendmmsg.rs index 7f2526b5b4..e3ce61713e 100644 --- a/streamer/src/sendmmsg.rs +++ b/streamer/src/sendmmsg.rs @@ -1,5 +1,12 @@ //! The `sendmmsg` module provides sendmmsg() API implementation +#[cfg(target_os = "linux")] +use { + itertools::izip, + libc::{iovec, mmsghdr, sockaddr_in, sockaddr_in6, sockaddr_storage}, + nix::sys::socket::InetAddr, + std::os::unix::io::AsRawFd, +}; use { std::{ borrow::Borrow, @@ -41,78 +48,50 @@ where } } -#[cfg(target_os = "linux")] -use libc::{iovec, mmsghdr, sockaddr_in, sockaddr_in6, sockaddr_storage}; - #[cfg(target_os = "linux")] fn mmsghdr_for_packet( packet: &[u8], dest: &SocketAddr, - iovs: &mut Vec, - addrs: &mut Vec, - hdrs: &mut Vec, + iov: &mut iovec, + addr: &mut sockaddr_storage, + hdr: &mut mmsghdr, ) { - use libc::c_void; - use nix::sys::socket::InetAddr; - use std::mem; + const SIZE_OF_SOCKADDR_IN: usize = std::mem::size_of::(); + const SIZE_OF_SOCKADDR_IN6: usize = std::mem::size_of::(); - const SIZE_OF_SOCKADDR_IN: usize = mem::size_of::(); - const SIZE_OF_SOCKADDR_IN6: usize = mem::size_of::(); - - let index = hdrs.len(); - - debug_assert!(index < hdrs.capacity()); - debug_assert!(index < addrs.capacity()); - debug_assert!(index < iovs.capacity()); - debug_assert_eq!(hdrs.len(), addrs.len()); - debug_assert_eq!(hdrs.len(), iovs.len()); - - iovs.push(iovec { - iov_base: packet.as_ptr() as *mut c_void, + *iov = iovec { + iov_base: packet.as_ptr() as *mut libc::c_void, iov_len: packet.len(), - }); - - let hdr: mmsghdr = unsafe { mem::zeroed() }; - hdrs.push(hdr); - - let addr_storage: sockaddr_storage = unsafe { mem::zeroed() }; - addrs.push(addr_storage); - - debug_assert!(index < hdrs.len()); - - hdrs[index].msg_hdr.msg_iov = &mut iovs[index]; - hdrs[index].msg_hdr.msg_iovlen = 1; + }; + hdr.msg_hdr.msg_iov = iov; + hdr.msg_hdr.msg_iovlen = 1; + hdr.msg_hdr.msg_name = addr as *mut _ as *mut _; match InetAddr::from_std(dest) { - InetAddr::V4(addr) => { + InetAddr::V4(dest) => { unsafe { - core::ptr::write(&mut addrs[index] as *mut _ as *mut _, addr); + std::ptr::write(addr as *mut _ as *mut _, dest); } - hdrs[index].msg_hdr.msg_name = &mut addrs[index] as *mut _ as *mut _; - hdrs[index].msg_hdr.msg_namelen = SIZE_OF_SOCKADDR_IN as u32; + hdr.msg_hdr.msg_namelen = SIZE_OF_SOCKADDR_IN as u32; } - InetAddr::V6(addr) => { + InetAddr::V6(dest) => { unsafe { - core::ptr::write(&mut addrs[index] as *mut _ as *mut _, addr); + std::ptr::write(addr as *mut _ as *mut _, dest); } - hdrs[index].msg_hdr.msg_name = &mut addrs[index] as *mut _ as *mut _; - hdrs[index].msg_hdr.msg_namelen = SIZE_OF_SOCKADDR_IN6 as u32; + hdr.msg_hdr.msg_namelen = SIZE_OF_SOCKADDR_IN6 as u32; } }; } #[cfg(target_os = "linux")] fn sendmmsg_retry(sock: &UdpSocket, hdrs: &mut Vec) -> Result<(), SendPktsError> { - use libc::sendmmsg; - use std::os::unix::io::AsRawFd; - let sock_fd = sock.as_raw_fd(); let mut total_sent = 0; let mut erropt = None; let mut pkts = &mut hdrs[..]; while !pkts.is_empty() { - let npkts = match unsafe { sendmmsg(sock_fd, &mut pkts[0], pkts.len() as u32, 0) } { + let npkts = match unsafe { libc::sendmmsg(sock_fd, &mut pkts[0], pkts.len() as u32, 0) } { -1 => { if erropt.is_none() { erropt = Some(io::Error::last_os_error()); @@ -143,22 +122,14 @@ where S: Borrow, T: AsRef<[u8]>, { - // The vectors are allocated with capacity, as later code inserts elements - // at specific indices, and uses the address of the vector index in hdrs - let mut iovs: Vec = Vec::with_capacity(packets.len()); - let mut addrs: Vec = Vec::with_capacity(packets.len()); - let mut hdrs: Vec = Vec::with_capacity(packets.len()); - - for (pkt, dest) in packets.iter() { - mmsghdr_for_packet( - pkt.as_ref(), - dest.borrow(), - &mut iovs, - &mut addrs, - &mut hdrs, - ); + let size = packets.len(); + #[allow(clippy::uninit_assumed_init)] + let mut iovs = vec![unsafe { std::mem::MaybeUninit::uninit().assume_init() }; size]; + let mut addrs = vec![unsafe { std::mem::zeroed() }; size]; + let mut hdrs = vec![unsafe { std::mem::zeroed() }; size]; + for ((pkt, dest), hdr, iov, addr) in izip!(packets, &mut hdrs, &mut iovs, &mut addrs) { + mmsghdr_for_packet(pkt.as_ref(), dest.borrow(), iov, addr, hdr); } - sendmmsg_retry(sock, &mut hdrs) }