From e9a993fb59d9496a1b67049d0cfa44ee1f620a62 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Sun, 26 Sep 2021 20:35:13 +0000 Subject: [PATCH] allows sendmmsg api taking owned values (as well as references) (#18999) (#20226) Current signature of api in sendmmsg requires a slice of inner references: https://github.com/solana-labs/solana/blob/fe1ee4980/streamer/src/sendmmsg.rs#L130-L152 That forces the call-site to convert owned values to references even though doing so is redundant and adds an extra level of indirection: https://github.com/solana-labs/solana/blob/fe1ee4980/core/src/repair_service.rs#L291 This commit expands the api using AsRef and Borrow traits to allow calling the method with owned values (as well as references like before). (cherry picked from commit 049fb0417f3e4a21181f159ccc146d63265794d3) Co-authored-by: behzad nouri --- core/src/broadcast_stage.rs | 2 +- core/src/repair_service.rs | 1 - gossip/src/cluster_info.rs | 4 +-- streamer/src/sendmmsg.rs | 51 +++++++++++++++++++++++++++---------- 4 files changed, 41 insertions(+), 17 deletions(-) diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 14b5106375..316edfb9dc 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -411,7 +411,7 @@ pub fn broadcast_shreds( let seed = shred.seed(Some(self_pubkey), &root_bank); let node = cluster_nodes.get_broadcast_peer(seed)?; if socket_addr_space.check(&node.tvu) { - Some((&shred.payload[..], &node.tvu)) + Some((&shred.payload, node.tvu)) } else { None } diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index 09e997e593..7aa70da19e 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -300,7 +300,6 @@ impl RepairService { }) .collect() }; - let batch: Vec<(&[u8], &SocketAddr)> = batch.iter().map(|(v, s)| (&v[..], s)).collect(); build_repairs_batch_elapsed.stop(); let mut batch_send_repairs_elapsed = Measure::start("batch_send_repairs_elapsed"); diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 35cb9722ef..4f81701101 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -1281,13 +1281,13 @@ impl ClusterInfo { let dests: Vec<_> = if forwarded { peers .iter() - .map(|peer| &peer.tvu_forwards) + .map(|peer| peer.tvu_forwards) .filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space)) .collect() } else { peers .iter() - .map(|peer| &peer.tvu) + .map(|peer| peer.tvu) .filter(|addr| socket_addr_space.check(addr)) .collect() }; diff --git a/streamer/src/sendmmsg.rs b/streamer/src/sendmmsg.rs index d1e4a5875f..7f2526b5b4 100644 --- a/streamer/src/sendmmsg.rs +++ b/streamer/src/sendmmsg.rs @@ -1,8 +1,14 @@ //! The `sendmmsg` module provides sendmmsg() API implementation -use std::io; -use std::net::{SocketAddr, UdpSocket}; -use thiserror::Error; +use { + std::{ + borrow::Borrow, + io, + iter::repeat, + net::{SocketAddr, UdpSocket}, + }, + thiserror::Error, +}; #[derive(Debug, Error)] pub enum SendPktsError { @@ -12,11 +18,15 @@ pub enum SendPktsError { } #[cfg(not(target_os = "linux"))] -pub fn batch_send(sock: &UdpSocket, packets: &[(&[u8], &SocketAddr)]) -> Result<(), SendPktsError> { +pub fn batch_send(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError> +where + S: Borrow, + T: AsRef<[u8]>, +{ let mut num_failed = 0; let mut erropt = None; for (p, a) in packets { - if let Err(e) = sock.send_to(p, *a) { + if let Err(e) = sock.send_to(p.as_ref(), a.borrow()) { num_failed += 1; if erropt.is_none() { erropt = Some(e); @@ -128,7 +138,11 @@ fn sendmmsg_retry(sock: &UdpSocket, hdrs: &mut Vec) -> Result<(), SendP } #[cfg(target_os = "linux")] -pub fn batch_send(sock: &UdpSocket, packets: &[(&[u8], &SocketAddr)]) -> Result<(), SendPktsError> { +pub fn batch_send(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError> +where + S: Borrow, + T: AsRef<[u8]>, +{ // The vectors are allocated with capacity, as later code inserts elements // at specific indices, and uses the address of the vector index in hdrs let mut iovs: Vec = Vec::with_capacity(packets.len()); @@ -136,19 +150,30 @@ pub fn batch_send(sock: &UdpSocket, packets: &[(&[u8], &SocketAddr)]) -> Result< let mut hdrs: Vec = Vec::with_capacity(packets.len()); for (pkt, dest) in packets.iter() { - mmsghdr_for_packet(pkt, dest, &mut iovs, &mut addrs, &mut hdrs); + mmsghdr_for_packet( + pkt.as_ref(), + dest.borrow(), + &mut iovs, + &mut addrs, + &mut hdrs, + ); } sendmmsg_retry(sock, &mut hdrs) } -pub fn multi_target_send( +pub fn multi_target_send( sock: &UdpSocket, - packet: &[u8], - dests: &[&SocketAddr], -) -> Result<(), SendPktsError> { - let pkts: Vec<_> = dests.iter().map(|addr| (packet, *addr)).collect(); - batch_send(sock, &pkts[..]) + packet: T, + dests: &[S], +) -> Result<(), SendPktsError> +where + S: Borrow, + T: AsRef<[u8]>, +{ + let dests = dests.iter().map(Borrow::borrow); + let pkts: Vec<_> = repeat(&packet).zip(dests).collect(); + batch_send(sock, &pkts) } #[cfg(test)]