Use sendmmsg for broadcasting shreds (#6325)

* Replace packet with slice of data in sendmmsg

* fixes

* fix bench
This commit is contained in:
Pankaj Garg
2019-10-10 19:38:48 -07:00
committed by GitHub
parent fa64a0b367
commit 364781366a
5 changed files with 49 additions and 49 deletions

View File

@ -32,8 +32,9 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
stakes.insert(id, thread_rng().gen_range(1, NUM_PEERS) as u64); stakes.insert(id, thread_rng().gen_range(1, NUM_PEERS) as u64);
} }
bencher.iter(move || { bencher.iter(move || {
let shreds = shreds.clone();
cluster_info cluster_info
.broadcast_shreds(&socket, &shreds, &seeds, Some(&stakes)) .broadcast_shreds(&socket, shreds, &seeds, Some(&stakes))
.unwrap(); .unwrap();
}); });
} }

View File

@ -74,7 +74,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
// Broadcast data // Broadcast data
cluster_info.read().unwrap().broadcast_shreds( cluster_info.read().unwrap().broadcast_shreds(
sock, sock,
&all_shred_bufs, all_shred_bufs,
&all_seeds, &all_seeds,
stakes.as_ref(), stakes.as_ref(),
)?; )?;

View File

@ -201,7 +201,7 @@ impl StandardBroadcastRun {
cluster_info.read().unwrap().broadcast_shreds( cluster_info.read().unwrap().broadcast_shreds(
sock, sock,
&all_shred_bufs, all_shred_bufs,
&all_seeds, &all_seeds,
stakes.as_ref(), stakes.as_ref(),
)?; )?;

View File

@ -22,7 +22,7 @@ use crate::crds_value::{CrdsValue, CrdsValueLabel, EpochSlots, Vote};
use crate::packet::{to_shared_blob, Blob, Packet, SharedBlob}; use crate::packet::{to_shared_blob, Blob, Packet, SharedBlob};
use crate::repair_service::RepairType; use crate::repair_service::RepairType;
use crate::result::{Error, Result}; use crate::result::{Error, Result};
use crate::sendmmsg::multicast; use crate::sendmmsg::{multicast, send_mmsg};
use crate::staking_utils; use crate::staking_utils;
use crate::streamer::{BlobReceiver, BlobSender}; use crate::streamer::{BlobReceiver, BlobSender};
use crate::weighted_shuffle::{weighted_best, weighted_shuffle}; use crate::weighted_shuffle::{weighted_best, weighted_shuffle};
@ -709,27 +709,36 @@ impl ClusterInfo {
pub fn broadcast_shreds( pub fn broadcast_shreds(
&self, &self,
s: &UdpSocket, s: &UdpSocket,
shreds: &[Vec<u8>], shreds: Vec<Vec<u8>>,
seeds: &[[u8; 32]], seeds: &[[u8; 32]],
stakes: Option<&HashMap<Pubkey, u64>>, stakes: Option<&HashMap<Pubkey, u64>>,
) -> Result<()> { ) -> Result<()> {
let mut last_err = Ok(());
let (peers, peers_and_stakes) = self.sorted_tvu_peers_and_stakes(stakes); let (peers, peers_and_stakes) = self.sorted_tvu_peers_and_stakes(stakes);
let broadcast_len = peers_and_stakes.len(); let broadcast_len = peers_and_stakes.len();
if broadcast_len == 0 { if broadcast_len == 0 {
datapoint_info!("cluster_info-num_nodes", ("count", 1, i64)); datapoint_info!("cluster_info-num_nodes", ("count", 1, i64));
return Ok(()); return Ok(());
} }
shreds.iter().zip(seeds).for_each(|(shred, seed)| { let mut packets: Vec<_> = shreds
let broadcast_index = weighted_best(&peers_and_stakes, ChaChaRng::from_seed(*seed)); .into_iter()
.zip(seeds)
.map(|(shred, seed)| {
let broadcast_index = weighted_best(&peers_and_stakes, ChaChaRng::from_seed(*seed));
if let Err(e) = s.send_to(shred, &peers[broadcast_index].tvu) { (shred, &peers[broadcast_index].tvu)
trace!("{}: broadcast result {:?}", self.id(), e); })
last_err = Err(e); .collect();
let mut sent = 0;
while sent < packets.len() {
match send_mmsg(s, &mut packets[sent..]) {
Ok(n) => sent += n,
Err(e) => {
return Err(Error::IO(e));
}
} }
}); }
last_err?;
datapoint_debug!("cluster_info-num_nodes", ("count", broadcast_len + 1, i64)); datapoint_debug!("cluster_info-num_nodes", ("count", broadcast_len + 1, i64));
Ok(()) Ok(())
} }
@ -753,7 +762,7 @@ impl ClusterInfo {
let mut sent = 0; let mut sent = 0;
while sent < dests.len() { while sent < dests.len() {
match multicast(s, packet, &dests[sent..]) { match multicast(s, &mut packet.data[..packet.meta.size], &dests[sent..]) {
Ok(n) => sent += n, Ok(n) => sent += n,
Err(e) => { Err(e) => {
inc_new_counter_error!( inc_new_counter_error!(

View File

@ -1,15 +1,13 @@
//! The `sendmmsg` module provides sendmmsg() API implementation //! The `sendmmsg` module provides sendmmsg() API implementation
use crate::packet::Packet;
use std::io; use std::io;
use std::net::{SocketAddr, UdpSocket}; use std::net::{SocketAddr, UdpSocket};
#[cfg(not(target_os = "linux"))] #[cfg(not(target_os = "linux"))]
pub fn send_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result<usize> { pub fn send_mmsg(sock: &UdpSocket, packets: &mut [(Vec<u8>, &SocketAddr)]) -> io::Result<usize> {
let count = packets.len(); let count = packets.len();
for p in packets { for (p, a) in packets {
let a = p.meta.addr(); sock.send_to(p, *a)?;
sock.send_to(&p.data[..p.meta.size], &a)?;
} }
Ok(count) Ok(count)
@ -20,7 +18,7 @@ use libc::{iovec, mmsghdr, sockaddr_in, sockaddr_in6};
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
fn mmsghdr_for_packet( fn mmsghdr_for_packet(
packet: &mut Packet, packet: &mut [u8],
dest: &SocketAddr, dest: &SocketAddr,
index: usize, index: usize,
addr_in_len: u32, addr_in_len: u32,
@ -34,14 +32,14 @@ fn mmsghdr_for_packet(
use std::mem; use std::mem;
iovs.push(iovec { iovs.push(iovec {
iov_base: packet.data.as_mut_ptr() as *mut c_void, iov_base: packet.as_mut_ptr() as *mut c_void,
iov_len: packet.data.len(), iov_len: packet.len(),
}); });
let mut hdr: mmsghdr = unsafe { mem::zeroed() }; let mut hdr: mmsghdr = unsafe { mem::zeroed() };
hdr.msg_hdr.msg_iov = &mut iovs[index]; hdr.msg_hdr.msg_iov = &mut iovs[index];
hdr.msg_hdr.msg_iovlen = 1; hdr.msg_hdr.msg_iovlen = 1;
hdr.msg_len = packet.data.len() as u32; hdr.msg_len = packet.len() as u32;
match InetAddr::from_std(dest) { match InetAddr::from_std(dest) {
InetAddr::V4(addr) => { InetAddr::V4(addr) => {
@ -57,8 +55,9 @@ fn mmsghdr_for_packet(
}; };
hdr hdr
} }
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
pub fn send_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result<usize> { pub fn send_mmsg(sock: &UdpSocket, packets: &mut [(Vec<u8>, &SocketAddr)]) -> io::Result<usize> {
use libc::{sendmmsg, socklen_t}; use libc::{sendmmsg, socklen_t};
use std::mem; use std::mem;
use std::os::unix::io::AsRawFd; use std::os::unix::io::AsRawFd;
@ -76,10 +75,10 @@ pub fn send_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result<usize>
let mut hdrs: Vec<mmsghdr> = packets let mut hdrs: Vec<mmsghdr> = packets
.iter_mut() .iter_mut()
.enumerate() .enumerate()
.map(|(i, packet)| { .map(|(i, (packet, dest))| {
mmsghdr_for_packet( mmsghdr_for_packet(
packet, packet,
&packet.meta.addr(), dest,
i, i,
addr_in_len as u32, addr_in_len as u32,
addr_in6_len as u32, addr_in6_len as u32,
@ -98,25 +97,17 @@ pub fn send_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result<usize>
} }
#[cfg(not(target_os = "linux"))] #[cfg(not(target_os = "linux"))]
pub fn multicast( pub fn multicast(sock: &UdpSocket, packet: &mut [u8], dests: &[&SocketAddr]) -> io::Result<usize> {
sock: &UdpSocket,
packet: &mut Packet,
dests: &[&SocketAddr],
) -> io::Result<usize> {
let count = dests.len(); let count = dests.len();
for a in dests { for a in dests {
sock.send_to(&packet.data[..packet.meta.size], a)?; sock.send_to(packet, a)?;
} }
Ok(count) Ok(count)
} }
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
pub fn multicast( pub fn multicast(sock: &UdpSocket, packet: &mut [u8], dests: &[&SocketAddr]) -> io::Result<usize> {
sock: &UdpSocket,
packet: &mut Packet,
dests: &[&SocketAddr],
) -> io::Result<usize> {
use libc::{sendmmsg, socklen_t}; use libc::{sendmmsg, socklen_t};
use std::mem; use std::mem;
use std::os::unix::io::AsRawFd; use std::os::unix::io::AsRawFd;
@ -160,6 +151,7 @@ mod tests {
use crate::packet::Packet; use crate::packet::Packet;
use crate::recvmmsg::recv_mmsg; use crate::recvmmsg::recv_mmsg;
use crate::sendmmsg::{multicast, send_mmsg}; use crate::sendmmsg::{multicast, send_mmsg};
use solana_sdk::packet::PACKET_DATA_SIZE;
use std::net::UdpSocket; use std::net::UdpSocket;
#[test] #[test]
@ -168,12 +160,8 @@ mod tests {
let addr = reader.local_addr().unwrap(); let addr = reader.local_addr().unwrap();
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
let mut packets: Vec<Packet> = (0..32) let mut packets: Vec<_> = (0..32)
.map(|_| { .map(|_| (vec![0u8; PACKET_DATA_SIZE], &addr))
let mut p = Packet::default();
p.meta.set_addr(&addr);
p
})
.collect(); .collect();
let sent = send_mmsg(&sender, &mut packets); let sent = send_mmsg(&sender, &mut packets);
@ -194,15 +182,13 @@ mod tests {
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
let mut packets: Vec<Packet> = (0..32) let mut packets: Vec<_> = (0..32)
.map(|i| { .map(|i| {
let mut p = Packet::default();
if i < 16 { if i < 16 {
p.meta.set_addr(&addr); (vec![0u8; PACKET_DATA_SIZE], &addr)
} else { } else {
p.meta.set_addr(&addr2); (vec![0u8; PACKET_DATA_SIZE], &addr2)
} }
p
}) })
.collect(); .collect();
@ -236,7 +222,11 @@ mod tests {
let mut packet = Packet::default(); let mut packet = Packet::default();
let sent = multicast(&sender, &mut packet, &[&addr, &addr2, &addr3, &addr4]); let sent = multicast(
&sender,
&mut packet.data[..packet.meta.size],
&[&addr, &addr2, &addr3, &addr4],
);
assert_matches!(sent, Ok(4)); assert_matches!(sent, Ok(4));
let mut packets = vec![Packet::default(); 32]; let mut packets = vec![Packet::default(); 32];