From 364781366a26b861da3ab2640dad7b70a759e46d Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Thu, 10 Oct 2019 19:38:48 -0700 Subject: [PATCH] Use sendmmsg for broadcasting shreds (#6325) * Replace packet with slice of data in sendmmsg * fixes * fix bench --- core/benches/cluster_info.rs | 3 +- .../fail_entry_verification_broadcast_run.rs | 2 +- .../broadcast_stage/standard_broadcast_run.rs | 2 +- core/src/cluster_info.rs | 31 ++++++---- core/src/sendmmsg.rs | 60 ++++++++----------- 5 files changed, 49 insertions(+), 49 deletions(-) diff --git a/core/benches/cluster_info.rs b/core/benches/cluster_info.rs index 6e21b932dd..207c9a0d5c 100644 --- a/core/benches/cluster_info.rs +++ b/core/benches/cluster_info.rs @@ -32,8 +32,9 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) { stakes.insert(id, thread_rng().gen_range(1, NUM_PEERS) as u64); } bencher.iter(move || { + let shreds = shreds.clone(); cluster_info - .broadcast_shreds(&socket, &shreds, &seeds, Some(&stakes)) + .broadcast_shreds(&socket, shreds, &seeds, Some(&stakes)) .unwrap(); }); } diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index 14bd2519d6..6c1b37e71d 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -74,7 +74,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { // Broadcast data cluster_info.read().unwrap().broadcast_shreds( sock, - &all_shred_bufs, + all_shred_bufs, &all_seeds, stakes.as_ref(), )?; diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index f21f612262..fd0310fe6d 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -201,7 +201,7 @@ impl StandardBroadcastRun { cluster_info.read().unwrap().broadcast_shreds( sock, - &all_shred_bufs, + all_shred_bufs, &all_seeds, stakes.as_ref(), )?; diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 613ccd6f14..49ad5fb3a1 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -22,7 +22,7 @@ use crate::crds_value::{CrdsValue, CrdsValueLabel, EpochSlots, Vote}; use crate::packet::{to_shared_blob, Blob, Packet, SharedBlob}; use crate::repair_service::RepairType; use crate::result::{Error, Result}; -use crate::sendmmsg::multicast; +use crate::sendmmsg::{multicast, send_mmsg}; use crate::staking_utils; use crate::streamer::{BlobReceiver, BlobSender}; use crate::weighted_shuffle::{weighted_best, weighted_shuffle}; @@ -709,27 +709,36 @@ impl ClusterInfo { pub fn broadcast_shreds( &self, s: &UdpSocket, - shreds: &[Vec], + shreds: Vec>, seeds: &[[u8; 32]], stakes: Option<&HashMap>, ) -> Result<()> { - let mut last_err = Ok(()); let (peers, peers_and_stakes) = self.sorted_tvu_peers_and_stakes(stakes); let broadcast_len = peers_and_stakes.len(); if broadcast_len == 0 { datapoint_info!("cluster_info-num_nodes", ("count", 1, i64)); return Ok(()); } - shreds.iter().zip(seeds).for_each(|(shred, seed)| { - let broadcast_index = weighted_best(&peers_and_stakes, ChaChaRng::from_seed(*seed)); + let mut packets: Vec<_> = shreds + .into_iter() + .zip(seeds) + .map(|(shred, seed)| { + let broadcast_index = weighted_best(&peers_and_stakes, ChaChaRng::from_seed(*seed)); - if let Err(e) = s.send_to(shred, &peers[broadcast_index].tvu) { - trace!("{}: broadcast result {:?}", self.id(), e); - last_err = Err(e); + (shred, &peers[broadcast_index].tvu) + }) + .collect(); + + let mut sent = 0; + while sent < packets.len() { + match send_mmsg(s, &mut packets[sent..]) { + Ok(n) => sent += n, + Err(e) => { + return Err(Error::IO(e)); + } } - }); + } - last_err?; datapoint_debug!("cluster_info-num_nodes", ("count", broadcast_len + 1, i64)); Ok(()) } @@ -753,7 +762,7 @@ impl ClusterInfo { let mut sent = 0; while sent < dests.len() { - match multicast(s, packet, &dests[sent..]) { + match multicast(s, &mut packet.data[..packet.meta.size], &dests[sent..]) { Ok(n) => sent += n, Err(e) => { inc_new_counter_error!( diff --git a/core/src/sendmmsg.rs b/core/src/sendmmsg.rs index 9b5298b0ef..e43a4adc1c 100644 --- a/core/src/sendmmsg.rs +++ b/core/src/sendmmsg.rs @@ -1,15 +1,13 @@ //! The `sendmmsg` module provides sendmmsg() API implementation -use crate::packet::Packet; use std::io; use std::net::{SocketAddr, UdpSocket}; #[cfg(not(target_os = "linux"))] -pub fn send_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result { +pub fn send_mmsg(sock: &UdpSocket, packets: &mut [(Vec, &SocketAddr)]) -> io::Result { let count = packets.len(); - for p in packets { - let a = p.meta.addr(); - sock.send_to(&p.data[..p.meta.size], &a)?; + for (p, a) in packets { + sock.send_to(p, *a)?; } Ok(count) @@ -20,7 +18,7 @@ use libc::{iovec, mmsghdr, sockaddr_in, sockaddr_in6}; #[cfg(target_os = "linux")] fn mmsghdr_for_packet( - packet: &mut Packet, + packet: &mut [u8], dest: &SocketAddr, index: usize, addr_in_len: u32, @@ -34,14 +32,14 @@ fn mmsghdr_for_packet( use std::mem; iovs.push(iovec { - iov_base: packet.data.as_mut_ptr() as *mut c_void, - iov_len: packet.data.len(), + iov_base: packet.as_mut_ptr() as *mut c_void, + iov_len: packet.len(), }); let mut hdr: mmsghdr = unsafe { mem::zeroed() }; hdr.msg_hdr.msg_iov = &mut iovs[index]; hdr.msg_hdr.msg_iovlen = 1; - hdr.msg_len = packet.data.len() as u32; + hdr.msg_len = packet.len() as u32; match InetAddr::from_std(dest) { InetAddr::V4(addr) => { @@ -57,8 +55,9 @@ fn mmsghdr_for_packet( }; hdr } + #[cfg(target_os = "linux")] -pub fn send_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result { +pub fn send_mmsg(sock: &UdpSocket, packets: &mut [(Vec, &SocketAddr)]) -> io::Result { use libc::{sendmmsg, socklen_t}; use std::mem; use std::os::unix::io::AsRawFd; @@ -76,10 +75,10 @@ pub fn send_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result let mut hdrs: Vec = packets .iter_mut() .enumerate() - .map(|(i, packet)| { + .map(|(i, (packet, dest))| { mmsghdr_for_packet( packet, - &packet.meta.addr(), + dest, i, addr_in_len as u32, addr_in6_len as u32, @@ -98,25 +97,17 @@ pub fn send_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result } #[cfg(not(target_os = "linux"))] -pub fn multicast( - sock: &UdpSocket, - packet: &mut Packet, - dests: &[&SocketAddr], -) -> io::Result { +pub fn multicast(sock: &UdpSocket, packet: &mut [u8], dests: &[&SocketAddr]) -> io::Result { let count = dests.len(); for a in dests { - sock.send_to(&packet.data[..packet.meta.size], a)?; + sock.send_to(packet, a)?; } Ok(count) } #[cfg(target_os = "linux")] -pub fn multicast( - sock: &UdpSocket, - packet: &mut Packet, - dests: &[&SocketAddr], -) -> io::Result { +pub fn multicast(sock: &UdpSocket, packet: &mut [u8], dests: &[&SocketAddr]) -> io::Result { use libc::{sendmmsg, socklen_t}; use std::mem; use std::os::unix::io::AsRawFd; @@ -160,6 +151,7 @@ mod tests { use crate::packet::Packet; use crate::recvmmsg::recv_mmsg; use crate::sendmmsg::{multicast, send_mmsg}; + use solana_sdk::packet::PACKET_DATA_SIZE; use std::net::UdpSocket; #[test] @@ -168,12 +160,8 @@ mod tests { let addr = reader.local_addr().unwrap(); let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let mut packets: Vec = (0..32) - .map(|_| { - let mut p = Packet::default(); - p.meta.set_addr(&addr); - p - }) + let mut packets: Vec<_> = (0..32) + .map(|_| (vec![0u8; PACKET_DATA_SIZE], &addr)) .collect(); let sent = send_mmsg(&sender, &mut packets); @@ -194,15 +182,13 @@ mod tests { let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let mut packets: Vec = (0..32) + let mut packets: Vec<_> = (0..32) .map(|i| { - let mut p = Packet::default(); if i < 16 { - p.meta.set_addr(&addr); + (vec![0u8; PACKET_DATA_SIZE], &addr) } else { - p.meta.set_addr(&addr2); + (vec![0u8; PACKET_DATA_SIZE], &addr2) } - p }) .collect(); @@ -236,7 +222,11 @@ mod tests { let mut packet = Packet::default(); - let sent = multicast(&sender, &mut packet, &[&addr, &addr2, &addr3, &addr4]); + let sent = multicast( + &sender, + &mut packet.data[..packet.meta.size], + &[&addr, &addr2, &addr3, &addr4], + ); assert_matches!(sent, Ok(4)); let mut packets = vec![Packet::default(); 32];