diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index 70d8adb5d9..7aaed931f5 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -83,7 +83,7 @@ fn main() -> Result<()> { let (s_reader, r_reader) = channel(); read_channels.push(r_reader); - read_threads.push(receiver(Arc::new(read), &exit, s_reader, "bench-streamer")); + read_threads.push(receiver(Arc::new(read), &exit, s_reader)); } let t_producer1 = producer(&addr, exit.clone()); diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index bb7391303e..fab6152165 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -44,7 +44,7 @@ impl FetchStage { ) -> Self { let tpu_threads = sockets .into_iter() - .map(|socket| streamer::receiver(socket, &exit, sender.clone(), "fetch-stage")); + .map(|socket| streamer::receiver(socket, &exit, sender.clone())); let tpu_via_blobs_threads = tpu_via_blobs_sockets .into_iter() diff --git a/core/src/packet.rs b/core/src/packet.rs index 1bf6f810eb..68802f4276 100644 --- a/core/src/packet.rs +++ b/core/src/packet.rs @@ -65,7 +65,7 @@ impl fmt::Debug for Packet { impl Default for Packet { fn default() -> Packet { Packet { - data: [0u8; PACKET_DATA_SIZE], + data: unsafe { std::mem::uninitialized() }, meta: Meta::default(), } } @@ -126,7 +126,7 @@ pub struct Packets { impl Default for Packets { fn default() -> Packets { Packets { - packets: vec![Packet::default(); NUM_PACKETS], + packets: Vec::with_capacity(NUM_RCVMMSGS), } } } @@ -208,8 +208,7 @@ pub enum BlobError { } impl Packets { - fn run_read_from(&mut self, socket: &UdpSocket) -> Result { - self.packets.resize(NUM_PACKETS, Packet::default()); + pub fn recv_from(&mut self, socket: &UdpSocket) -> Result { let mut i = 0; //DOCUMENTED SIDE-EFFECT //Performance out of the IO without poll @@ -220,11 +219,10 @@ impl Packets { socket.set_nonblocking(false)?; trace!("receiving on {}", socket.local_addr().unwrap()); loop { + self.packets.resize(i + NUM_RCVMMSGS, Packet::default()); match recv_mmsg(socket, &mut self.packets[i..]) { Err(_) if i > 0 => { - inc_new_counter_info!("packets-recv_count", i); - debug!("got {:?} messages on {}", i, socket.local_addr().unwrap()); - return Ok(i); + break; } Err(e) => { trace!("recv_from err {:?}", e); @@ -237,19 +235,16 @@ impl Packets { trace!("got {} packets", npkts); i += npkts; if npkts != NUM_RCVMMSGS || i >= 1024 { - inc_new_counter_info!("packets-recv_count", i); - return Ok(i); + break; } } } } + self.packets.truncate(i); + inc_new_counter_info!("packets-recv_count", i); + Ok(i) } - pub fn recv_from(&mut self, socket: &UdpSocket) -> Result<()> { - let sz = self.run_read_from(socket)?; - self.packets.resize(sz, Packet::default()); - debug!("recv_from: {}", sz); - Ok(()) - } + pub fn send_to(&self, socket: &UdpSocket) -> Result<()> { for p in &self.packets { let a = p.meta.addr(); @@ -615,19 +610,26 @@ mod tests { #[test] pub fn packet_send_recv() { - let reader = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let addr = reader.local_addr().unwrap(); - let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let saddr = sender.local_addr().unwrap(); - let p = SharedPackets::default(); - p.write().unwrap().packets.resize(10, Packet::default()); - for m in p.write().unwrap().packets.iter_mut() { + solana_logger::setup(); + let recv_socket = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let addr = recv_socket.local_addr().unwrap(); + let send_socket = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let saddr = send_socket.local_addr().unwrap(); + let mut p = Packets::default(); + + p.packets.resize(10, Packet::default()); + + for m in p.packets.iter_mut() { m.meta.set_addr(&addr); m.meta.size = PACKET_DATA_SIZE; } - p.read().unwrap().send_to(&sender).unwrap(); - p.write().unwrap().recv_from(&reader).unwrap(); - for m in p.write().unwrap().packets.iter_mut() { + p.send_to(&send_socket).unwrap(); + + let recvd = p.recv_from(&recv_socket).unwrap(); + + assert_eq!(recvd, p.packets.len()); + + for m in p.packets { assert_eq!(m.meta.size, PACKET_DATA_SIZE); assert_eq!(m.meta.addr(), saddr); } diff --git a/core/src/replicator.rs b/core/src/replicator.rs index a065ee420e..ea65b3c117 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -129,12 +129,7 @@ fn create_request_processor( let (s_reader, r_reader) = channel(); let (s_responder, r_responder) = channel(); let storage_socket = Arc::new(socket); - let t_receiver = receiver( - storage_socket.clone(), - exit, - s_reader, - "replicator-receiver", - ); + let t_receiver = receiver(storage_socket.clone(), exit, s_reader); thread_handles.push(t_receiver); let t_responder = responder("replicator-responder", storage_socket.clone(), r_responder); diff --git a/core/src/streamer.rs b/core/src/streamer.rs index fd30dc7471..e98cba0b35 100644 --- a/core/src/streamer.rs +++ b/core/src/streamer.rs @@ -6,7 +6,6 @@ use crate::packet::{ }; use crate::result::{Error, Result}; use bincode; -use solana_metrics::{influxdb, submit}; use solana_sdk::timing::duration_as_ms; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; @@ -20,28 +19,17 @@ pub type PacketSender = Sender; pub type BlobSender = Sender; pub type BlobReceiver = Receiver; -fn recv_loop( - sock: &UdpSocket, - exit: Arc, - channel: &PacketSender, - channel_tag: &'static str, -) -> Result<()> { +fn recv_loop(sock: &UdpSocket, exit: Arc, channel: &PacketSender) -> Result<()> { loop { - let msgs = SharedPackets::default(); + let mut msgs = Packets::default(); loop { // Check for exit signal, even if socket is busy // (for instance the leader trasaction socket) if exit.load(Ordering::Relaxed) { return Ok(()); } - if msgs.write().unwrap().recv_from(sock).is_ok() { - let len = msgs.read().unwrap().packets.len(); - submit( - influxdb::Point::new(channel_tag) - .add_field("count", influxdb::Value::Integer(len as i64)) - .to_owned(), - ); - channel.send(msgs)?; + if let Ok(_len) = msgs.recv_from(sock) { + channel.send(Arc::new(RwLock::new(msgs)))?; break; } } @@ -52,7 +40,6 @@ pub fn receiver( sock: Arc, exit: &Arc, packet_sender: PacketSender, - sender_tag: &'static str, ) -> JoinHandle<()> { let res = sock.set_read_timeout(Some(Duration::new(1, 0))); if res.is_err() { @@ -62,7 +49,7 @@ pub fn receiver( Builder::new() .name("solana-receiver".to_string()) .spawn(move || { - let _ = recv_loop(&sock, exit, &packet_sender, sender_tag); + let _ = recv_loop(&sock, exit, &packet_sender); }) .unwrap() } @@ -236,7 +223,7 @@ mod test { let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); let exit = Arc::new(AtomicBool::new(false)); let (s_reader, r_reader) = channel(); - let t_receiver = receiver(Arc::new(read), &exit, s_reader, "streamer-test"); + let t_receiver = receiver(Arc::new(read), &exit, s_reader); let t_responder = { let (s_responder, r_responder) = channel(); let t_responder = responder("streamer_send_test", Arc::new(send), r_responder); diff --git a/metrics/src/counter.rs b/metrics/src/counter.rs index 2002a5a3fc..eb2e036c9e 100644 --- a/metrics/src/counter.rs +++ b/metrics/src/counter.rs @@ -38,6 +38,13 @@ macro_rules! inc_counter { }; } +#[macro_export] +macro_rules! inc_counter_info { + ($name:expr, $count:expr) => { + unsafe { $name.inc(log::Level::Info, $count) }; + }; +} + #[macro_export] macro_rules! inc_new_counter { ($name:expr, $count:expr, $level:expr, $lograte:expr) => {{ @@ -89,7 +96,7 @@ impl Counter { self.lograte.store(lograte, Ordering::Relaxed); } if times % lograte == 0 && times > 0 && log_enabled!(level) { - info!( + log!(level, "COUNTER:{{\"name\": \"{}\", \"counts\": {}, \"samples\": {}, \"now\": {}, \"events\": {}}}", self.name, counts + events,