Fix repair dos (#9056)

This commit is contained in:
sakridge
2020-04-01 06:48:35 -07:00
committed by Michael Vines
parent c9030660d6
commit a15fa4840c

View File

@ -9,6 +9,7 @@ use crate::{
use bincode::serialize; use bincode::serialize;
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use solana_ledger::blockstore::Blockstore; use solana_ledger::blockstore::Blockstore;
use solana_measure::measure::Measure;
use solana_measure::thread_mem_usage; use solana_measure::thread_mem_usage;
use solana_metrics::{datapoint_debug, inc_new_counter_debug}; use solana_metrics::{datapoint_debug, inc_new_counter_debug};
use solana_perf::packet::{Packets, PacketsRecycler}; use solana_perf::packet::{Packets, PacketsRecycler};
@ -184,12 +185,33 @@ impl ServeRepair {
blockstore: Option<&Arc<Blockstore>>, blockstore: Option<&Arc<Blockstore>>,
requests_receiver: &PacketReceiver, requests_receiver: &PacketReceiver,
response_sender: &PacketSender, response_sender: &PacketSender,
max_packets: &mut usize,
) -> Result<()> { ) -> Result<()> {
//TODO cache connections //TODO cache connections
let timeout = Duration::new(1, 0); let timeout = Duration::new(1, 0);
let reqs = requests_receiver.recv_timeout(timeout)?; let mut reqs_v = vec![requests_receiver.recv_timeout(timeout)?];
let mut total_packets = reqs_v[0].packets.len();
Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender); while let Ok(more) = requests_receiver.try_recv() {
total_packets += more.packets.len();
if total_packets < *max_packets {
// Drop the rest in the channel in case of dos
reqs_v.push(more);
}
}
let mut time = Measure::start("repair::handle_packets");
for reqs in reqs_v {
Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender);
}
time.stop();
if total_packets >= *max_packets {
if time.as_ms() > 1000 {
*max_packets = (*max_packets * 9) / 10;
} else {
*max_packets = (*max_packets * 10) / 9;
}
}
Ok(()) Ok(())
} }
@ -204,22 +226,26 @@ impl ServeRepair {
let recycler = PacketsRecycler::default(); let recycler = PacketsRecycler::default();
Builder::new() Builder::new()
.name("solana-repair-listen".to_string()) .name("solana-repair-listen".to_string())
.spawn(move || loop { .spawn(move || {
let result = Self::run_listen( let mut max_packets = 1024;
&me, loop {
&recycler, let result = Self::run_listen(
blockstore.as_ref(), &me,
&requests_receiver, &recycler,
&response_sender, blockstore.as_ref(),
); &requests_receiver,
match result { &response_sender,
Err(Error::RecvTimeoutError(_)) | Ok(_) => {} &mut max_packets,
Err(err) => info!("repair listener error: {:?}", err), );
}; match result {
if exit.load(Ordering::Relaxed) { Err(Error::RecvTimeoutError(_)) | Ok(_) => {}
return; Err(err) => info!("repair listener error: {:?}", err),
};
if exit.load(Ordering::Relaxed) {
return;
}
thread_mem_usage::datapoint("solana-repair-listen");
} }
thread_mem_usage::datapoint("solana-repair-listen");
}) })
.unwrap() .unwrap()
} }