allows sendmmsg api taking owned values (as well as references) (#18999) (#20226)

Current signature of api in sendmmsg requires a slice of inner
references:
https://github.com/solana-labs/solana/blob/fe1ee4980/streamer/src/sendmmsg.rs#L130-L152

That forces the call-site to convert owned values to references even
though doing so is redundant and adds an extra level of indirection:
https://github.com/solana-labs/solana/blob/fe1ee4980/core/src/repair_service.rs#L291

This commit expands the api using AsRef and Borrow traits to allow
calling the method with owned values (as well as references like
before).

(cherry picked from commit 049fb0417f)

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
This commit is contained in:
mergify[bot]
2021-09-26 20:35:13 +00:00
committed by GitHub
parent 88177d33fd
commit e9a993fb59
4 changed files with 41 additions and 17 deletions

View File

@ -411,7 +411,7 @@ pub fn broadcast_shreds(
let seed = shred.seed(Some(self_pubkey), &root_bank);
let node = cluster_nodes.get_broadcast_peer(seed)?;
if socket_addr_space.check(&node.tvu) {
Some((&shred.payload[..], &node.tvu))
Some((&shred.payload, node.tvu))
} else {
None
}

View File

@ -300,7 +300,6 @@ impl RepairService {
})
.collect()
};
let batch: Vec<(&[u8], &SocketAddr)> = batch.iter().map(|(v, s)| (&v[..], s)).collect();
build_repairs_batch_elapsed.stop();
let mut batch_send_repairs_elapsed = Measure::start("batch_send_repairs_elapsed");

View File

@ -1281,13 +1281,13 @@ impl ClusterInfo {
let dests: Vec<_> = if forwarded {
peers
.iter()
.map(|peer| &peer.tvu_forwards)
.map(|peer| peer.tvu_forwards)
.filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
.collect()
} else {
peers
.iter()
.map(|peer| &peer.tvu)
.map(|peer| peer.tvu)
.filter(|addr| socket_addr_space.check(addr))
.collect()
};

View File

@ -1,8 +1,14 @@
//! The `sendmmsg` module provides sendmmsg() API implementation
use std::io;
use std::net::{SocketAddr, UdpSocket};
use thiserror::Error;
use {
std::{
borrow::Borrow,
io,
iter::repeat,
net::{SocketAddr, UdpSocket},
},
thiserror::Error,
};
#[derive(Debug, Error)]
pub enum SendPktsError {
@ -12,11 +18,15 @@ pub enum SendPktsError {
}
#[cfg(not(target_os = "linux"))]
pub fn batch_send(sock: &UdpSocket, packets: &[(&[u8], &SocketAddr)]) -> Result<(), SendPktsError> {
pub fn batch_send<S, T>(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError>
where
S: Borrow<SocketAddr>,
T: AsRef<[u8]>,
{
let mut num_failed = 0;
let mut erropt = None;
for (p, a) in packets {
if let Err(e) = sock.send_to(p, *a) {
if let Err(e) = sock.send_to(p.as_ref(), a.borrow()) {
num_failed += 1;
if erropt.is_none() {
erropt = Some(e);
@ -128,7 +138,11 @@ fn sendmmsg_retry(sock: &UdpSocket, hdrs: &mut Vec<mmsghdr>) -> Result<(), SendP
}
#[cfg(target_os = "linux")]
pub fn batch_send(sock: &UdpSocket, packets: &[(&[u8], &SocketAddr)]) -> Result<(), SendPktsError> {
pub fn batch_send<S, T>(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError>
where
S: Borrow<SocketAddr>,
T: AsRef<[u8]>,
{
// The vectors are allocated with capacity, as later code inserts elements
// at specific indices, and uses the address of the vector index in hdrs
let mut iovs: Vec<iovec> = Vec::with_capacity(packets.len());
@ -136,19 +150,30 @@ pub fn batch_send(sock: &UdpSocket, packets: &[(&[u8], &SocketAddr)]) -> Result<
let mut hdrs: Vec<mmsghdr> = Vec::with_capacity(packets.len());
for (pkt, dest) in packets.iter() {
mmsghdr_for_packet(pkt, dest, &mut iovs, &mut addrs, &mut hdrs);
mmsghdr_for_packet(
pkt.as_ref(),
dest.borrow(),
&mut iovs,
&mut addrs,
&mut hdrs,
);
}
sendmmsg_retry(sock, &mut hdrs)
}
pub fn multi_target_send(
pub fn multi_target_send<S, T>(
sock: &UdpSocket,
packet: &[u8],
dests: &[&SocketAddr],
) -> Result<(), SendPktsError> {
let pkts: Vec<_> = dests.iter().map(|addr| (packet, *addr)).collect();
batch_send(sock, &pkts[..])
packet: T,
dests: &[S],
) -> Result<(), SendPktsError>
where
S: Borrow<SocketAddr>,
T: AsRef<[u8]>,
{
let dests = dests.iter().map(Borrow::borrow);
let pkts: Vec<_> = repeat(&packet).zip(dests).collect();
batch_send(sock, &pkts)
}
#[cfg(test)]