diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index 0610333e2e..3b42e35b9f 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -207,6 +207,7 @@ impl ServeRepair { response_sender: &PacketSender, stats: &mut ServeRepairStats, max_packets: &mut usize, + repairs_by_peer: &mut HashMap, ) -> Result<()> { //TODO cache connections let timeout = Duration::new(1, 0); @@ -229,7 +230,15 @@ impl ServeRepair { let mut time = Measure::start("repair::handle_packets"); for reqs in reqs_v { - Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender, stats); + Self::handle_packets( + obj, + &recycler, + blockstore, + reqs, + response_sender, + stats, + repairs_by_peer, + ); } time.stop(); if total_packets >= *max_packets { @@ -285,6 +294,7 @@ impl ServeRepair { let mut last_print = Instant::now(); let mut stats = ServeRepairStats::default(); let mut max_packets = 1024; + let mut repairs_by_peer = HashMap::new(); loop { let result = Self::run_listen( &me, @@ -294,6 +304,7 @@ impl ServeRepair { &response_sender, &mut stats, &mut max_packets, + &mut repairs_by_peer, ); match result { Err(Error::RecvTimeoutError(_)) | Ok(_) => {} @@ -305,6 +316,8 @@ impl ServeRepair { if last_print.elapsed().as_secs() > 2 { Self::report_reset_stats(&me, &mut stats); last_print = Instant::now(); + info!("repairs_by_peer: {:#?}", repairs_by_peer); + repairs_by_peer.clear(); } thread_mem_usage::datapoint("solana-repair-listen"); } @@ -319,12 +332,14 @@ impl ServeRepair { packets: Packets, response_sender: &PacketSender, stats: &mut ServeRepairStats, + repairs_by_peer: &mut HashMap, ) { // iter over the packets let allocated = thread_mem_usage::Allocatedp::default(); packets.packets.iter().for_each(|packet| { let start = allocated.get(); let from_addr = packet.meta.addr(); + *repairs_by_peer.entry(from_addr.to_string()).or_insert(0) += 1; limited_deserialize(&packet.data[..packet.meta.size]) .into_iter() .for_each(|request| { diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index 2181a71be9..393f762d0a 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -133,6 +133,7 @@ pub fn responder(name: &'static str, sock: Arc, r: PacketReceiver) -> match e { StreamerError::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, StreamerError::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + StreamerError::IO(_) => (), _ => info!("{} responder error: {:?}", name, e), } }