diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index 4ac711ced3..81535c8d2c 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -3,8 +3,9 @@ use { crate::{ - packet::{self, send_to, PacketBatch, PacketBatchRecycler, PACKETS_PER_BATCH}, + packet::{self, PacketBatch, PacketBatchRecycler, PACKETS_PER_BATCH}, recvmmsg::NUM_RCVMMSGS, + sendmmsg::{batch_send, SendPktsError}, socket::SocketAddrSpace, }, crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender}, @@ -37,6 +38,9 @@ pub enum StreamerError { #[error("send packets error")] Send(#[from] SendError), + + #[error(transparent)] + SendPktsError(#[from] SendPktsError), } pub type Result = std::result::Result; @@ -242,7 +246,13 @@ fn recv_send( if let Some(stats) = stats { packet_batch.packets.iter().for_each(|p| stats.record(p)); } - send_to(&packet_batch, sock, socket_addr_space)?; + let packets = packet_batch.packets.iter().filter_map(|pkt| { + let addr = pkt.meta.addr(); + socket_addr_space + .check(&addr) + .then(|| (&pkt.data[..pkt.meta.size], addr)) + }); + batch_send(sock, &packets.collect::>())?; Ok(()) }