allows sendmmsg api taking owned values (as well as references) (#18999)
Current signature of api in sendmmsg requires a slice of inner references: https://github.com/solana-labs/solana/blob/fe1ee4980/streamer/src/sendmmsg.rs#L130-L152 That forces the call-site to convert owned values to references even though doing so is redundant and adds an extra level of indirection: https://github.com/solana-labs/solana/blob/fe1ee4980/core/src/repair_service.rs#L291 This commit expands the api using AsRef and Borrow traits to allow calling the method with owned values (as well as references like before).
This commit is contained in:
		@@ -414,7 +414,7 @@ pub fn broadcast_shreds(
 | 
			
		||||
            let seed = shred.seed(Some(self_pubkey), &root_bank);
 | 
			
		||||
            let node = cluster_nodes.get_broadcast_peer(seed)?;
 | 
			
		||||
            if socket_addr_space.check(&node.tvu) {
 | 
			
		||||
                Some((&shred.payload[..], &node.tvu))
 | 
			
		||||
                Some((&shred.payload, node.tvu))
 | 
			
		||||
            } else {
 | 
			
		||||
                None
 | 
			
		||||
            }
 | 
			
		||||
 
 | 
			
		||||
@@ -288,7 +288,6 @@ impl RepairService {
 | 
			
		||||
                    })
 | 
			
		||||
                    .collect()
 | 
			
		||||
            };
 | 
			
		||||
            let batch: Vec<(&[u8], &SocketAddr)> = batch.iter().map(|(v, s)| (&v[..], s)).collect();
 | 
			
		||||
            build_repairs_batch_elapsed.stop();
 | 
			
		||||
 | 
			
		||||
            let mut batch_send_repairs_elapsed = Measure::start("batch_send_repairs_elapsed");
 | 
			
		||||
 
 | 
			
		||||
@@ -1250,13 +1250,13 @@ impl ClusterInfo {
 | 
			
		||||
        let dests: Vec<_> = if forwarded {
 | 
			
		||||
            peers
 | 
			
		||||
                .iter()
 | 
			
		||||
                .map(|peer| &peer.tvu_forwards)
 | 
			
		||||
                .map(|peer| peer.tvu_forwards)
 | 
			
		||||
                .filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
 | 
			
		||||
                .collect()
 | 
			
		||||
        } else {
 | 
			
		||||
            peers
 | 
			
		||||
                .iter()
 | 
			
		||||
                .map(|peer| &peer.tvu)
 | 
			
		||||
                .map(|peer| peer.tvu)
 | 
			
		||||
                .filter(|addr| socket_addr_space.check(addr))
 | 
			
		||||
                .collect()
 | 
			
		||||
        };
 | 
			
		||||
 
 | 
			
		||||
@@ -1,8 +1,14 @@
 | 
			
		||||
//! The `sendmmsg` module provides sendmmsg() API implementation
 | 
			
		||||
 | 
			
		||||
use std::io;
 | 
			
		||||
use std::net::{SocketAddr, UdpSocket};
 | 
			
		||||
use thiserror::Error;
 | 
			
		||||
use {
 | 
			
		||||
    std::{
 | 
			
		||||
        borrow::Borrow,
 | 
			
		||||
        io,
 | 
			
		||||
        iter::repeat,
 | 
			
		||||
        net::{SocketAddr, UdpSocket},
 | 
			
		||||
    },
 | 
			
		||||
    thiserror::Error,
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
#[derive(Debug, Error)]
 | 
			
		||||
pub enum SendPktsError {
 | 
			
		||||
@@ -12,11 +18,15 @@ pub enum SendPktsError {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[cfg(not(target_os = "linux"))]
 | 
			
		||||
pub fn batch_send(sock: &UdpSocket, packets: &[(&[u8], &SocketAddr)]) -> Result<(), SendPktsError> {
 | 
			
		||||
pub fn batch_send<S, T>(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError>
 | 
			
		||||
where
 | 
			
		||||
    S: Borrow<SocketAddr>,
 | 
			
		||||
    T: AsRef<[u8]>,
 | 
			
		||||
{
 | 
			
		||||
    let mut num_failed = 0;
 | 
			
		||||
    let mut erropt = None;
 | 
			
		||||
    for (p, a) in packets {
 | 
			
		||||
        if let Err(e) = sock.send_to(p, *a) {
 | 
			
		||||
        if let Err(e) = sock.send_to(p.as_ref(), a.borrow()) {
 | 
			
		||||
            num_failed += 1;
 | 
			
		||||
            if erropt.is_none() {
 | 
			
		||||
                erropt = Some(e);
 | 
			
		||||
@@ -128,7 +138,11 @@ fn sendmmsg_retry(sock: &UdpSocket, hdrs: &mut Vec<mmsghdr>) -> Result<(), SendP
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[cfg(target_os = "linux")]
 | 
			
		||||
pub fn batch_send(sock: &UdpSocket, packets: &[(&[u8], &SocketAddr)]) -> Result<(), SendPktsError> {
 | 
			
		||||
pub fn batch_send<S, T>(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError>
 | 
			
		||||
where
 | 
			
		||||
    S: Borrow<SocketAddr>,
 | 
			
		||||
    T: AsRef<[u8]>,
 | 
			
		||||
{
 | 
			
		||||
    // The vectors are allocated with capacity, as later code inserts elements
 | 
			
		||||
    // at specific indices, and uses the address of the vector index in hdrs
 | 
			
		||||
    let mut iovs: Vec<iovec> = Vec::with_capacity(packets.len());
 | 
			
		||||
@@ -136,19 +150,30 @@ pub fn batch_send(sock: &UdpSocket, packets: &[(&[u8], &SocketAddr)]) -> Result<
 | 
			
		||||
    let mut hdrs: Vec<mmsghdr> = Vec::with_capacity(packets.len());
 | 
			
		||||
 | 
			
		||||
    for (pkt, dest) in packets.iter() {
 | 
			
		||||
        mmsghdr_for_packet(pkt, dest, &mut iovs, &mut addrs, &mut hdrs);
 | 
			
		||||
        mmsghdr_for_packet(
 | 
			
		||||
            pkt.as_ref(),
 | 
			
		||||
            dest.borrow(),
 | 
			
		||||
            &mut iovs,
 | 
			
		||||
            &mut addrs,
 | 
			
		||||
            &mut hdrs,
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    sendmmsg_retry(sock, &mut hdrs)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub fn multi_target_send(
 | 
			
		||||
pub fn multi_target_send<S, T>(
 | 
			
		||||
    sock: &UdpSocket,
 | 
			
		||||
    packet: &[u8],
 | 
			
		||||
    dests: &[&SocketAddr],
 | 
			
		||||
) -> Result<(), SendPktsError> {
 | 
			
		||||
    let pkts: Vec<_> = dests.iter().map(|addr| (packet, *addr)).collect();
 | 
			
		||||
    batch_send(sock, &pkts[..])
 | 
			
		||||
    packet: T,
 | 
			
		||||
    dests: &[S],
 | 
			
		||||
) -> Result<(), SendPktsError>
 | 
			
		||||
where
 | 
			
		||||
    S: Borrow<SocketAddr>,
 | 
			
		||||
    T: AsRef<[u8]>,
 | 
			
		||||
{
 | 
			
		||||
    let dests = dests.iter().map(Borrow::borrow);
 | 
			
		||||
    let pkts: Vec<_> = repeat(&packet).zip(dests).collect();
 | 
			
		||||
    batch_send(sock, &pkts)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user