@ -8,6 +8,7 @@ use crate::{
|
|||||||
};
|
};
|
||||||
use bincode::serialize;
|
use bincode::serialize;
|
||||||
use solana_ledger::blockstore::Blockstore;
|
use solana_ledger::blockstore::Blockstore;
|
||||||
|
use solana_measure::measure::Measure;
|
||||||
use solana_measure::thread_mem_usage;
|
use solana_measure::thread_mem_usage;
|
||||||
use solana_metrics::{datapoint_debug, inc_new_counter_debug};
|
use solana_metrics::{datapoint_debug, inc_new_counter_debug};
|
||||||
use solana_perf::packet::{limited_deserialize, Packet, Packets, PacketsRecycler};
|
use solana_perf::packet::{limited_deserialize, Packet, Packets, PacketsRecycler};
|
||||||
@ -50,6 +51,7 @@ impl RepairType {
|
|||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct ServeRepairStats {
|
pub struct ServeRepairStats {
|
||||||
pub total_packets: usize,
|
pub total_packets: usize,
|
||||||
|
pub dropped_packets: usize,
|
||||||
pub processed: usize,
|
pub processed: usize,
|
||||||
pub self_repair: usize,
|
pub self_repair: usize,
|
||||||
pub window_index: usize,
|
pub window_index: usize,
|
||||||
@ -196,13 +198,39 @@ impl ServeRepair {
|
|||||||
requests_receiver: &PacketReceiver,
|
requests_receiver: &PacketReceiver,
|
||||||
response_sender: &PacketSender,
|
response_sender: &PacketSender,
|
||||||
stats: &mut ServeRepairStats,
|
stats: &mut ServeRepairStats,
|
||||||
|
max_packets: &mut usize,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
//TODO cache connections
|
//TODO cache connections
|
||||||
let timeout = Duration::new(1, 0);
|
let timeout = Duration::new(1, 0);
|
||||||
let reqs = requests_receiver.recv_timeout(timeout)?;
|
let mut reqs_v = vec![requests_receiver.recv_timeout(timeout)?];
|
||||||
stats.total_packets += reqs.packets.len();
|
let mut total_packets = reqs_v[0].packets.len();
|
||||||
|
|
||||||
Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender, stats);
|
let mut dropped_packets = 0;
|
||||||
|
while let Ok(more) = requests_receiver.try_recv() {
|
||||||
|
total_packets += more.packets.len();
|
||||||
|
if total_packets < *max_packets {
|
||||||
|
// Drop the rest in the channel in case of dos
|
||||||
|
reqs_v.push(more);
|
||||||
|
} else {
|
||||||
|
dropped_packets += more.packets.len();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
stats.dropped_packets += dropped_packets;
|
||||||
|
stats.total_packets += total_packets;
|
||||||
|
|
||||||
|
let mut time = Measure::start("repair::handle_packets");
|
||||||
|
for reqs in reqs_v {
|
||||||
|
Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender, stats);
|
||||||
|
}
|
||||||
|
time.stop();
|
||||||
|
if total_packets >= *max_packets {
|
||||||
|
if time.as_ms() > 1000 {
|
||||||
|
*max_packets = (*max_packets * 9) / 10;
|
||||||
|
} else {
|
||||||
|
*max_packets = (*max_packets * 10) / 9;
|
||||||
|
}
|
||||||
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -216,6 +244,9 @@ impl ServeRepair {
|
|||||||
inc_new_counter_debug!("serve_repair-handle-repair--eq", stats.self_repair);
|
inc_new_counter_debug!("serve_repair-handle-repair--eq", stats.self_repair);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inc_new_counter_info!("serve_repair-total_packets", stats.total_packets);
|
||||||
|
inc_new_counter_info!("serve_repair-dropped_packets", stats.dropped_packets);
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"repair_listener: total_packets: {} passed: {}",
|
"repair_listener: total_packets: {} passed: {}",
|
||||||
stats.total_packets, stats.processed
|
stats.total_packets, stats.processed
|
||||||
@ -245,6 +276,7 @@ impl ServeRepair {
|
|||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
let mut last_print = Instant::now();
|
let mut last_print = Instant::now();
|
||||||
let mut stats = ServeRepairStats::default();
|
let mut stats = ServeRepairStats::default();
|
||||||
|
let mut max_packets = 1024;
|
||||||
loop {
|
loop {
|
||||||
let result = Self::run_listen(
|
let result = Self::run_listen(
|
||||||
&me,
|
&me,
|
||||||
@ -253,6 +285,7 @@ impl ServeRepair {
|
|||||||
&requests_receiver,
|
&requests_receiver,
|
||||||
&response_sender,
|
&response_sender,
|
||||||
&mut stats,
|
&mut stats,
|
||||||
|
&mut max_packets,
|
||||||
);
|
);
|
||||||
match result {
|
match result {
|
||||||
Err(Error::RecvTimeoutError(_)) | Ok(_) => {}
|
Err(Error::RecvTimeoutError(_)) | Ok(_) => {}
|
||||||
|
Reference in New Issue
Block a user