diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 2bb0ba5605..7a219dc3ea 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -23,7 +23,7 @@ use solana_poh::poh_recorder::WorkingBankEntry; use solana_runtime::{bank::Bank, bank_forks::BankForks}; use solana_sdk::timing::timestamp; use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair}; -use solana_streamer::sendmmsg::send_mmsg; +use solana_streamer::sendmmsg::{batch_send, SendPktsError}; use std::sync::atomic::AtomicU64; use std::{ collections::HashMap, @@ -402,10 +402,11 @@ pub fn broadcast_shreds( self_pubkey: Pubkey, bank_forks: &Arc>, ) -> Result<()> { + let mut result = Ok(()); let broadcast_len = cluster_nodes.num_peers(); if broadcast_len == 0 { update_peer_stats(1, 1, last_datapoint_submit); - return Ok(()); + return result; } let mut shred_select = Measure::start("shred_select"); let root_bank = bank_forks.read().unwrap().root_bank(); @@ -414,24 +415,20 @@ pub fn broadcast_shreds( .filter_map(|shred| { let seed = shred.seed(Some(self_pubkey), &root_bank); let node = cluster_nodes.get_broadcast_peer(seed)?; - Some((&shred.payload, &node.tvu)) + Some((&shred.payload[..], &node.tvu)) }) .collect(); shred_select.stop(); transmit_stats.shred_select += shred_select.as_us(); - let mut sent = 0; let mut send_mmsg_time = Measure::start("send_mmsg"); - while sent < packets.len() { - match send_mmsg(s, &packets[sent..]) { - Ok(n) => sent += n, - Err(e) => { - return Err(Error::Io(e)); - } - } + if let Err(SendPktsError::IoError(ioerr, num_failed)) = batch_send(s, &packets[..]) { + transmit_stats.dropped_packets += num_failed; + result = Err(Error::Io(ioerr)); } send_mmsg_time.stop(); transmit_stats.send_mmsg_elapsed += send_mmsg_time.as_us(); + transmit_stats.total_packets += packets.len(); let num_live_peers = cluster_nodes.num_peers_live(timestamp()) as i64; update_peer_stats( @@ -439,7 +436,7 @@ pub fn broadcast_shreds( broadcast_len as i64 + 1, last_datapoint_submit, ); - Ok(()) + result } #[cfg(test)] diff --git a/core/src/broadcast_stage/broadcast_metrics.rs b/core/src/broadcast_stage/broadcast_metrics.rs index b29ace0791..364da3d79a 100644 --- a/core/src/broadcast_stage/broadcast_metrics.rs +++ b/core/src/broadcast_stage/broadcast_metrics.rs @@ -19,6 +19,8 @@ pub struct TransmitShredsStats { pub get_peers_elapsed: u64, pub shred_select: u64, pub num_shreds: usize, + pub total_packets: usize, + pub dropped_packets: usize, } impl BroadcastStats for TransmitShredsStats { @@ -28,6 +30,8 @@ impl BroadcastStats for TransmitShredsStats { self.get_peers_elapsed += new_stats.get_peers_elapsed; self.num_shreds += new_stats.num_shreds; self.shred_select += new_stats.shred_select; + self.total_packets += new_stats.total_packets; + self.dropped_packets += new_stats.dropped_packets; } fn report_stats(&mut self, slot: Slot, slot_start: Instant) { datapoint_info!( @@ -45,6 +49,8 @@ impl BroadcastStats for TransmitShredsStats { ("get_peers_elapsed", self.get_peers_elapsed as i64, i64), ("num_shreds", self.num_shreds as i64, i64), ("shred_select", self.shred_select as i64, i64), + ("total_packets", self.total_packets as i64, i64), + ("dropped_packets", self.dropped_packets as i64, i64), ); } } @@ -173,6 +179,8 @@ mod test { send_mmsg_elapsed: 3, shred_select: 4, num_shreds: 5, + total_packets: 6, + dropped_packets: 7, }, &Some(BroadcastShredBatchInfo { slot: 0, @@ -190,14 +198,18 @@ mod test { assert_eq!(slot_0_stats.broadcast_shred_stats.send_mmsg_elapsed, 3); assert_eq!(slot_0_stats.broadcast_shred_stats.shred_select, 4); assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 5); + assert_eq!(slot_0_stats.broadcast_shred_stats.total_packets, 6); + assert_eq!(slot_0_stats.broadcast_shred_stats.dropped_packets, 7); slot_broadcast_stats.update( &TransmitShredsStats { - transmit_elapsed: 7, - get_peers_elapsed: 8, - send_mmsg_elapsed: 9, - shred_select: 10, - num_shreds: 11, + transmit_elapsed: 11, + get_peers_elapsed: 12, + send_mmsg_elapsed: 13, + shred_select: 14, + num_shreds: 15, + total_packets: 16, + dropped_packets: 17, }, &None, ); @@ -211,6 +223,8 @@ mod test { assert_eq!(slot_0_stats.broadcast_shred_stats.send_mmsg_elapsed, 3); assert_eq!(slot_0_stats.broadcast_shred_stats.shred_select, 4); assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 5); + assert_eq!(slot_0_stats.broadcast_shred_stats.total_packets, 6); + assert_eq!(slot_0_stats.broadcast_shred_stats.dropped_packets, 7); // If another batch is given, then total number of batches == num_expected_batches == 2, // so the batch should be purged from the HashMap @@ -221,6 +235,8 @@ mod test { send_mmsg_elapsed: 1, shred_select: 1, num_shreds: 1, + total_packets: 1, + dropped_packets: 1, }, &Some(BroadcastShredBatchInfo { slot: 0, diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index b8d4df85a7..544c09983a 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -63,7 +63,7 @@ use { }, solana_streamer::{ packet, - sendmmsg::multicast, + sendmmsg::{multi_target_send, SendPktsError}, streamer::{PacketReceiver, PacketSender}, }, solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY, @@ -1239,27 +1239,17 @@ impl ClusterInfo { } else { peers.iter().map(|peer| &peer.tvu).collect() }; - let mut dests = &dests[..]; let data = &packet.data[..packet.meta.size]; - while !dests.is_empty() { - match multicast(s, data, dests) { - Ok(n) => dests = &dests[n..], - Err(err) => { - inc_new_counter_error!("cluster_info-retransmit-send_to_error", dests.len(), 1); - error!("retransmit multicast: {:?}", err); - break; - } - } - } - let mut errs = 0; - for dest in dests { - if let Err(err) = s.send_to(data, dest) { - error!("retransmit send: {}, {:?}", dest, err); - errs += 1; - } - } - if errs != 0 { - inc_new_counter_error!("cluster_info-retransmit-error", errs, 1); + + if let Err(SendPktsError::IoError(ioerr, num_failed)) = multi_target_send(s, data, &dests) { + inc_new_counter_info!("cluster_info-retransmit-packets", dests.len(), 1); + inc_new_counter_error!("cluster_info-retransmit-error", num_failed, 1); + error!( + "retransmit_to multi_target_send error: {:?}, {}/{} packets failed", + ioerr, + num_failed, + dests.len(), + ); } } diff --git a/streamer/src/sendmmsg.rs b/streamer/src/sendmmsg.rs index d64af9c8ca..d1e4a5875f 100644 --- a/streamer/src/sendmmsg.rs +++ b/streamer/src/sendmmsg.rs @@ -2,159 +2,165 @@ use std::io; use std::net::{SocketAddr, UdpSocket}; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum SendPktsError { + /// IO Error during send: first error, num failed packets + #[error("IO Error, some packets could not be sent")] + IoError(io::Error, usize), +} #[cfg(not(target_os = "linux"))] -pub fn send_mmsg(sock: &UdpSocket, packets: &[(&Vec, &SocketAddr)]) -> io::Result { - let count = packets.len(); +pub fn batch_send(sock: &UdpSocket, packets: &[(&[u8], &SocketAddr)]) -> Result<(), SendPktsError> { + let mut num_failed = 0; + let mut erropt = None; for (p, a) in packets { - sock.send_to(p, *a)?; + if let Err(e) = sock.send_to(p, *a) { + num_failed += 1; + if erropt.is_none() { + erropt = Some(e); + } + } } - Ok(count) + if let Some(err) = erropt { + Err(SendPktsError::IoError(err, num_failed)) + } else { + Ok(()) + } } #[cfg(target_os = "linux")] -use libc::{iovec, mmsghdr, sockaddr_in, sockaddr_in6}; +use libc::{iovec, mmsghdr, sockaddr_in, sockaddr_in6, sockaddr_storage}; #[cfg(target_os = "linux")] fn mmsghdr_for_packet( packet: &[u8], dest: &SocketAddr, - index: usize, - addr_in_len: u32, - addr_in6_len: u32, iovs: &mut Vec, - addr_in: &mut Vec, - addr_in6: &mut Vec, -) -> mmsghdr { + addrs: &mut Vec, + hdrs: &mut Vec, +) { use libc::c_void; use nix::sys::socket::InetAddr; use std::mem; + const SIZE_OF_SOCKADDR_IN: usize = mem::size_of::(); + const SIZE_OF_SOCKADDR_IN6: usize = mem::size_of::(); + + let index = hdrs.len(); + + debug_assert!(index < hdrs.capacity()); + debug_assert!(index < addrs.capacity()); + debug_assert!(index < iovs.capacity()); + debug_assert_eq!(hdrs.len(), addrs.len()); + debug_assert_eq!(hdrs.len(), iovs.len()); + iovs.push(iovec { iov_base: packet.as_ptr() as *mut c_void, iov_len: packet.len(), }); - let mut hdr: mmsghdr = unsafe { mem::zeroed() }; - hdr.msg_hdr.msg_iov = &mut iovs[index]; - hdr.msg_hdr.msg_iovlen = 1; - hdr.msg_len = packet.len() as u32; + let hdr: mmsghdr = unsafe { mem::zeroed() }; + hdrs.push(hdr); + + let addr_storage: sockaddr_storage = unsafe { mem::zeroed() }; + addrs.push(addr_storage); + + debug_assert!(index < hdrs.len()); + + hdrs[index].msg_hdr.msg_iov = &mut iovs[index]; + hdrs[index].msg_hdr.msg_iovlen = 1; match InetAddr::from_std(dest) { InetAddr::V4(addr) => { - let index = addr_in.len(); - addr_in.push(addr); - hdr.msg_hdr.msg_name = &mut addr_in[index] as *mut _ as *mut _; - hdr.msg_hdr.msg_namelen = addr_in_len; + unsafe { + core::ptr::write(&mut addrs[index] as *mut _ as *mut _, addr); + } + hdrs[index].msg_hdr.msg_name = &mut addrs[index] as *mut _ as *mut _; + hdrs[index].msg_hdr.msg_namelen = SIZE_OF_SOCKADDR_IN as u32; } InetAddr::V6(addr) => { - let index = addr_in6.len(); - addr_in6.push(addr); - hdr.msg_hdr.msg_name = &mut addr_in6[index] as *mut _ as *mut _; - hdr.msg_hdr.msg_namelen = addr_in6_len; + unsafe { + core::ptr::write(&mut addrs[index] as *mut _ as *mut _, addr); + } + hdrs[index].msg_hdr.msg_name = &mut addrs[index] as *mut _ as *mut _; + hdrs[index].msg_hdr.msg_namelen = SIZE_OF_SOCKADDR_IN6 as u32; } }; - hdr } #[cfg(target_os = "linux")] -pub fn send_mmsg(sock: &UdpSocket, packets: &[(&Vec, &SocketAddr)]) -> io::Result { - use libc::{sendmmsg, socklen_t}; - use std::mem; +fn sendmmsg_retry(sock: &UdpSocket, hdrs: &mut Vec) -> Result<(), SendPktsError> { + use libc::sendmmsg; use std::os::unix::io::AsRawFd; + let sock_fd = sock.as_raw_fd(); + let mut total_sent = 0; + let mut erropt = None; + + let mut pkts = &mut hdrs[..]; + while !pkts.is_empty() { + let npkts = match unsafe { sendmmsg(sock_fd, &mut pkts[0], pkts.len() as u32, 0) } { + -1 => { + if erropt.is_none() { + erropt = Some(io::Error::last_os_error()); + } + // skip over the failing packet + 1_usize + } + n => { + // if we fail to send all packets we advance to the failing + // packet and retry in order to capture the error code + total_sent += n as usize; + n as usize + } + }; + pkts = &mut pkts[npkts..]; + } + + if let Some(err) = erropt { + Err(SendPktsError::IoError(err, hdrs.len() - total_sent)) + } else { + Ok(()) + } +} + +#[cfg(target_os = "linux")] +pub fn batch_send(sock: &UdpSocket, packets: &[(&[u8], &SocketAddr)]) -> Result<(), SendPktsError> { // The vectors are allocated with capacity, as later code inserts elements // at specific indices, and uses the address of the vector index in hdrs let mut iovs: Vec = Vec::with_capacity(packets.len()); - let mut addr_in: Vec = Vec::with_capacity(packets.len()); - let mut addr_in6: Vec = Vec::with_capacity(packets.len()); + let mut addrs: Vec = Vec::with_capacity(packets.len()); + let mut hdrs: Vec = Vec::with_capacity(packets.len()); - let addr_in_len = mem::size_of_val(&addr_in) as socklen_t; - let addr_in6_len = mem::size_of_val(&addr_in6) as socklen_t; - let sock_fd = sock.as_raw_fd(); - - let mut hdrs: Vec = packets - .iter() - .enumerate() - .map(|(i, (packet, dest))| { - mmsghdr_for_packet( - packet, - dest, - i, - addr_in_len as u32, - addr_in6_len as u32, - &mut iovs, - &mut addr_in, - &mut addr_in6, - ) - }) - .collect(); - - let npkts = match unsafe { sendmmsg(sock_fd, &mut hdrs[0], packets.len() as u32, 0) } { - -1 => return Err(io::Error::last_os_error()), - n => n as usize, - }; - Ok(npkts) -} - -#[cfg(not(target_os = "linux"))] -pub fn multicast(sock: &UdpSocket, packet: &[u8], dests: &[&SocketAddr]) -> io::Result { - let count = dests.len(); - for a in dests { - sock.send_to(packet, a)?; + for (pkt, dest) in packets.iter() { + mmsghdr_for_packet(pkt, dest, &mut iovs, &mut addrs, &mut hdrs); } - Ok(count) + sendmmsg_retry(sock, &mut hdrs) } -#[cfg(target_os = "linux")] -pub fn multicast(sock: &UdpSocket, packet: &[u8], dests: &[&SocketAddr]) -> io::Result { - use libc::{sendmmsg, socklen_t}; - use std::mem; - use std::os::unix::io::AsRawFd; - - // The vectors are allocated with capacity, as later code inserts elements - // at specific indices, and uses the address of the vector index in hdrs - let mut iovs: Vec = Vec::with_capacity(dests.len()); - let mut addr_in: Vec = Vec::with_capacity(dests.len()); - let mut addr_in6: Vec = Vec::with_capacity(dests.len()); - - let addr_in_len = mem::size_of_val(&addr_in) as socklen_t; - let addr_in6_len = mem::size_of_val(&addr_in6) as socklen_t; - let sock_fd = sock.as_raw_fd(); - - let mut hdrs: Vec = dests - .iter() - .enumerate() - .map(|(i, dest)| { - mmsghdr_for_packet( - packet, - dest, - i, - addr_in_len as u32, - addr_in6_len as u32, - &mut iovs, - &mut addr_in, - &mut addr_in6, - ) - }) - .collect(); - - let npkts = match unsafe { sendmmsg(sock_fd, &mut hdrs[0], dests.len() as u32, 0) } { - -1 => return Err(io::Error::last_os_error()), - n => n as usize, - }; - Ok(npkts) +pub fn multi_target_send( + sock: &UdpSocket, + packet: &[u8], + dests: &[&SocketAddr], +) -> Result<(), SendPktsError> { + let pkts: Vec<_> = dests.iter().map(|addr| (packet, *addr)).collect(); + batch_send(sock, &pkts[..]) } #[cfg(test)] mod tests { use crate::packet::Packet; use crate::recvmmsg::recv_mmsg; - use crate::sendmmsg::{multicast, send_mmsg}; + use crate::sendmmsg::{batch_send, multi_target_send, SendPktsError}; use solana_sdk::packet::PACKET_DATA_SIZE; - use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; + use std::{ + io::ErrorKind, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}, + }; #[test] pub fn test_send_mmsg_one_dest() { @@ -163,10 +169,10 @@ mod tests { let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect(); - let packet_refs: Vec<_> = packets.iter().map(|p| (p, &addr)).collect(); + let packet_refs: Vec<_> = packets.iter().map(|p| (&p[..], &addr)).collect(); - let sent = send_mmsg(&sender, &packet_refs).ok(); - assert_eq!(sent, Some(32)); + let sent = batch_send(&sender, &packet_refs[..]).ok(); + assert_eq!(sent, Some(())); let mut packets = vec![Packet::default(); 32]; let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; @@ -187,11 +193,17 @@ mod tests { let packet_refs: Vec<_> = packets .iter() .enumerate() - .map(|(i, p)| if i < 16 { (p, &addr) } else { (p, &addr2) }) + .map(|(i, p)| { + if i < 16 { + (&p[..], &addr) + } else { + (&p[..], &addr2) + } + }) .collect(); - let sent = send_mmsg(&sender, &packet_refs).ok(); - assert_eq!(sent, Some(32)); + let sent = batch_send(&sender, &packet_refs[..]).ok(); + assert_eq!(sent, Some(())); let mut packets = vec![Packet::default(); 32]; let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; @@ -220,13 +232,13 @@ mod tests { let packet = Packet::default(); - let sent = multicast( + let sent = multi_target_send( &sender, &packet.data[..packet.meta.size], &[&addr, &addr2, &addr3, &addr4], ) .ok(); - assert_eq!(sent, Some(4)); + assert_eq!(sent, Some(())); let mut packets = vec![Packet::default(); 32]; let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; @@ -246,12 +258,108 @@ mod tests { } #[test] - fn test_message_header_from_packet() { - let packets: Vec<_> = (0..2).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect(); - let ip4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); - let ip6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), 8080); - let packet_refs: Vec<_> = vec![(&packets[0], &ip4), (&packets[1], &ip6)]; - let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); - send_mmsg(&sender, &packet_refs).unwrap(); + fn test_intermediate_failures_mismatched_bind() { + let packets: Vec<_> = (0..3).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect(); + let ip4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080); + let ip6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 8080); + let packet_refs: Vec<_> = vec![ + (&packets[0][..], &ip4), + (&packets[1][..], &ip6), + (&packets[2][..], &ip4), + ]; + let dest_refs: Vec<_> = vec![&ip4, &ip6, &ip4]; + + let sender = UdpSocket::bind("0.0.0.0:0").expect("bind"); + if let Err(SendPktsError::IoError(_, num_failed)) = batch_send(&sender, &packet_refs[..]) { + assert_eq!(num_failed, 1); + } + if let Err(SendPktsError::IoError(_, num_failed)) = + multi_target_send(&sender, &packets[0], &dest_refs) + { + assert_eq!(num_failed, 1); + } + } + + #[test] + fn test_intermediate_failures_unreachable_address() { + let packets: Vec<_> = (0..5).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect(); + let ipv4local = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080); + let ipv4broadcast = SocketAddr::new(IpAddr::V4(Ipv4Addr::BROADCAST), 8080); + let sender = UdpSocket::bind("0.0.0.0:0").expect("bind"); + + // test intermediate failures for batch_send + let packet_refs: Vec<_> = vec![ + (&packets[0][..], &ipv4local), + (&packets[1][..], &ipv4broadcast), + (&packets[2][..], &ipv4local), + (&packets[3][..], &ipv4broadcast), + (&packets[4][..], &ipv4local), + ]; + if let Err(SendPktsError::IoError(ioerror, num_failed)) = + batch_send(&sender, &packet_refs[..]) + { + assert!(matches!(ioerror.kind(), ErrorKind::PermissionDenied)); + assert_eq!(num_failed, 2); + } + + // test leading and trailing failures for batch_send + let packet_refs: Vec<_> = vec![ + (&packets[0][..], &ipv4broadcast), + (&packets[1][..], &ipv4local), + (&packets[2][..], &ipv4broadcast), + (&packets[3][..], &ipv4local), + (&packets[4][..], &ipv4broadcast), + ]; + if let Err(SendPktsError::IoError(ioerror, num_failed)) = + batch_send(&sender, &packet_refs[..]) + { + assert!(matches!(ioerror.kind(), ErrorKind::PermissionDenied)); + assert_eq!(num_failed, 3); + } + + // test consecutive intermediate failures for batch_send + let packet_refs: Vec<_> = vec![ + (&packets[0][..], &ipv4local), + (&packets[1][..], &ipv4local), + (&packets[2][..], &ipv4broadcast), + (&packets[3][..], &ipv4broadcast), + (&packets[4][..], &ipv4local), + ]; + if let Err(SendPktsError::IoError(ioerror, num_failed)) = + batch_send(&sender, &packet_refs[..]) + { + assert!(matches!(ioerror.kind(), ErrorKind::PermissionDenied)); + assert_eq!(num_failed, 2); + } + + // test intermediate failures for multi_target_send + let dest_refs: Vec<_> = vec![ + &ipv4local, + &ipv4broadcast, + &ipv4local, + &ipv4broadcast, + &ipv4local, + ]; + if let Err(SendPktsError::IoError(ioerror, num_failed)) = + multi_target_send(&sender, &packets[0], &dest_refs) + { + assert!(matches!(ioerror.kind(), ErrorKind::PermissionDenied)); + assert_eq!(num_failed, 2); + } + + // test leading and trailing failures for multi_target_send + let dest_refs: Vec<_> = vec![ + &ipv4broadcast, + &ipv4local, + &ipv4broadcast, + &ipv4local, + &ipv4broadcast, + ]; + if let Err(SendPktsError::IoError(ioerror, num_failed)) = + multi_target_send(&sender, &packets[0], &dest_refs) + { + assert!(matches!(ioerror.kind(), ErrorKind::PermissionDenied)); + assert_eq!(num_failed, 3); + } } }