Look at repair peers

This commit is contained in:
Stephen Akridge
2020-06-12 19:39:05 -07:00
parent f13498b428
commit 0013bfff4e
2 changed files with 17 additions and 1 deletions

View File

@@ -207,6 +207,7 @@ impl ServeRepair {
response_sender: &PacketSender,
stats: &mut ServeRepairStats,
max_packets: &mut usize,
repairs_by_peer: &mut HashMap<String, usize>,
) -> Result<()> {
//TODO cache connections
let timeout = Duration::new(1, 0);
@@ -229,7 +230,15 @@ impl ServeRepair {
let mut time = Measure::start("repair::handle_packets");
for reqs in reqs_v {
Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender, stats);
Self::handle_packets(
obj,
&recycler,
blockstore,
reqs,
response_sender,
stats,
repairs_by_peer,
);
}
time.stop();
if total_packets >= *max_packets {
@@ -285,6 +294,7 @@ impl ServeRepair {
let mut last_print = Instant::now();
let mut stats = ServeRepairStats::default();
let mut max_packets = 1024;
let mut repairs_by_peer = HashMap::new();
loop {
let result = Self::run_listen(
&me,
@@ -294,6 +304,7 @@ impl ServeRepair {
&response_sender,
&mut stats,
&mut max_packets,
&mut repairs_by_peer,
);
match result {
Err(Error::RecvTimeoutError(_)) | Ok(_) => {}
@@ -305,6 +316,8 @@ impl ServeRepair {
if last_print.elapsed().as_secs() > 2 {
Self::report_reset_stats(&me, &mut stats);
last_print = Instant::now();
info!("repairs_by_peer: {:#?}", repairs_by_peer);
repairs_by_peer.clear();
}
thread_mem_usage::datapoint("solana-repair-listen");
}
@@ -319,12 +332,14 @@ impl ServeRepair {
packets: Packets,
response_sender: &PacketSender,
stats: &mut ServeRepairStats,
repairs_by_peer: &mut HashMap<String, usize>,
) {
// iter over the packets
let allocated = thread_mem_usage::Allocatedp::default();
packets.packets.iter().for_each(|packet| {
let start = allocated.get();
let from_addr = packet.meta.addr();
*repairs_by_peer.entry(from_addr.to_string()).or_insert(0) += 1;
limited_deserialize(&packet.data[..packet.meta.size])
.into_iter()
.for_each(|request| {

View File

@@ -133,6 +133,7 @@ pub fn responder(name: &'static str, sock: Arc<UdpSocket>, r: PacketReceiver) ->
match e {
StreamerError::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
StreamerError::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
StreamerError::IO(_) => (),
_ => info!("{} responder error: {:?}", name, e),
}
}