@ -207,7 +207,6 @@ impl ServeRepair {
|
|||||||
response_sender: &PacketSender,
|
response_sender: &PacketSender,
|
||||||
stats: &mut ServeRepairStats,
|
stats: &mut ServeRepairStats,
|
||||||
max_packets: &mut usize,
|
max_packets: &mut usize,
|
||||||
repairs_by_peer: &mut HashMap<String, usize>,
|
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
//TODO cache connections
|
//TODO cache connections
|
||||||
let timeout = Duration::new(1, 0);
|
let timeout = Duration::new(1, 0);
|
||||||
@ -230,15 +229,7 @@ impl ServeRepair {
|
|||||||
|
|
||||||
let mut time = Measure::start("repair::handle_packets");
|
let mut time = Measure::start("repair::handle_packets");
|
||||||
for reqs in reqs_v {
|
for reqs in reqs_v {
|
||||||
Self::handle_packets(
|
Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender, stats);
|
||||||
obj,
|
|
||||||
&recycler,
|
|
||||||
blockstore,
|
|
||||||
reqs,
|
|
||||||
response_sender,
|
|
||||||
stats,
|
|
||||||
repairs_by_peer,
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
time.stop();
|
time.stop();
|
||||||
if total_packets >= *max_packets {
|
if total_packets >= *max_packets {
|
||||||
@ -294,7 +285,6 @@ impl ServeRepair {
|
|||||||
let mut last_print = Instant::now();
|
let mut last_print = Instant::now();
|
||||||
let mut stats = ServeRepairStats::default();
|
let mut stats = ServeRepairStats::default();
|
||||||
let mut max_packets = 1024;
|
let mut max_packets = 1024;
|
||||||
let mut repairs_by_peer = HashMap::new();
|
|
||||||
loop {
|
loop {
|
||||||
let result = Self::run_listen(
|
let result = Self::run_listen(
|
||||||
&me,
|
&me,
|
||||||
@ -304,7 +294,6 @@ impl ServeRepair {
|
|||||||
&response_sender,
|
&response_sender,
|
||||||
&mut stats,
|
&mut stats,
|
||||||
&mut max_packets,
|
&mut max_packets,
|
||||||
&mut repairs_by_peer,
|
|
||||||
);
|
);
|
||||||
match result {
|
match result {
|
||||||
Err(Error::RecvTimeoutError(_)) | Ok(_) => {}
|
Err(Error::RecvTimeoutError(_)) | Ok(_) => {}
|
||||||
@ -316,8 +305,6 @@ impl ServeRepair {
|
|||||||
if last_print.elapsed().as_secs() > 2 {
|
if last_print.elapsed().as_secs() > 2 {
|
||||||
Self::report_reset_stats(&me, &mut stats);
|
Self::report_reset_stats(&me, &mut stats);
|
||||||
last_print = Instant::now();
|
last_print = Instant::now();
|
||||||
info!("repairs_by_peer: {:#?}", repairs_by_peer);
|
|
||||||
repairs_by_peer.clear();
|
|
||||||
}
|
}
|
||||||
thread_mem_usage::datapoint("solana-repair-listen");
|
thread_mem_usage::datapoint("solana-repair-listen");
|
||||||
}
|
}
|
||||||
@ -332,14 +319,12 @@ impl ServeRepair {
|
|||||||
packets: Packets,
|
packets: Packets,
|
||||||
response_sender: &PacketSender,
|
response_sender: &PacketSender,
|
||||||
stats: &mut ServeRepairStats,
|
stats: &mut ServeRepairStats,
|
||||||
repairs_by_peer: &mut HashMap<String, usize>,
|
|
||||||
) {
|
) {
|
||||||
// iter over the packets
|
// iter over the packets
|
||||||
let allocated = thread_mem_usage::Allocatedp::default();
|
let allocated = thread_mem_usage::Allocatedp::default();
|
||||||
packets.packets.iter().for_each(|packet| {
|
packets.packets.iter().for_each(|packet| {
|
||||||
let start = allocated.get();
|
let start = allocated.get();
|
||||||
let from_addr = packet.meta.addr();
|
let from_addr = packet.meta.addr();
|
||||||
*repairs_by_peer.entry(from_addr.to_string()).or_insert(0) += 1;
|
|
||||||
limited_deserialize(&packet.data[..packet.meta.size])
|
limited_deserialize(&packet.data[..packet.meta.size])
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.for_each(|request| {
|
.for_each(|request| {
|
||||||
|
@ -133,7 +133,6 @@ pub fn responder(name: &'static str, sock: Arc<UdpSocket>, r: PacketReceiver) ->
|
|||||||
match e {
|
match e {
|
||||||
StreamerError::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
StreamerError::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
||||||
StreamerError::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
StreamerError::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
||||||
StreamerError::IO(_) => (),
|
|
||||||
_ => info!("{} responder error: {:?}", name, e),
|
_ => info!("{} responder error: {:?}", name, e),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user