Process Gossip in parallel and add an upper limit (#8328) (#8345)

automerge
Author: mergify[bot]
Date: 2020-02-19 21:54:52 -08:00
Committed by: GitHub
Parent: 5fbddd5894
Commit: c3ac85828b


@@ -28,6 +28,9 @@ use crate::{
use bincode::{serialize, serialized_size};
use core::cmp;
use itertools::Itertools;
use rayon::iter::IntoParallelIterator;
use rayon::iter::ParallelIterator;
use rayon::ThreadPool;
use solana_ledger::{bank_forks::BankForks, staking_utils};
use solana_measure::thread_mem_usage;
use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_error};
@@ -36,6 +39,7 @@ use solana_net_utils::{
multi_bind_in_range, PortRange,
};
use solana_perf::packet::{to_packets_with_destination, Packets, PacketsRecycler};
use solana_rayon_threadlimit::get_thread_count;
use solana_sdk::{
clock::{Slot, DEFAULT_MS_PER_SLOT},
pubkey::Pubkey,
@@ -67,6 +71,10 @@ pub const MAX_BLOOM_SIZE: usize = 1018;
const MAX_PROTOCOL_PAYLOAD_SIZE: u64 = PACKET_DATA_SIZE as u64 - MAX_PROTOCOL_HEADER_SIZE;
/// The largest protocol header size
const MAX_PROTOCOL_HEADER_SIZE: u64 = 214;
/// A hard limit on incoming gossip messages
/// Chosen to be able to handle 1Gbps of pure gossip traffic
/// 128MB/PACKET_DATA_SIZE
const MAX_GOSSIP_TRAFFIC: usize = 128_000_000 / PACKET_DATA_SIZE;
#[derive(Debug, PartialEq, Eq)]
pub enum ClusterInfoError {
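For context, a rough sketch of what the new cap works out to numerically, assuming PACKET_DATA_SIZE is the 1232-byte value from solana_sdk::packet (1280 bytes minus IPv6 and fragment headers); the constants below are recomputed locally for illustration and are not part of the diff.

```rust
// Illustrative only: recomputes the cap assuming
// PACKET_DATA_SIZE = 1280 - 40 - 8 = 1232 bytes (solana_sdk::packet).
const PACKET_DATA_SIZE: usize = 1280 - 40 - 8;
const MAX_GOSSIP_TRAFFIC: usize = 128_000_000 / PACKET_DATA_SIZE;

fn main() {
    // Roughly 103,896 packets may be queued per run_listen pass before
    // further gossip packets are dropped.
    println!("MAX_GOSSIP_TRAFFIC = {} packets", MAX_GOSSIP_TRAFFIC);
}
```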
@@ -1348,10 +1356,22 @@ impl ClusterInfo {
bank_forks: Option<&Arc<RwLock<BankForks>>>,
requests_receiver: &PacketReceiver,
response_sender: &PacketSender,
thread_pool: &ThreadPool,
) -> Result<()> {
//TODO cache connections
let timeout = Duration::new(1, 0);
let reqs = requests_receiver.recv_timeout(timeout)?;
let mut requests = vec![requests_receiver.recv_timeout(timeout)?];
let mut num_requests = requests.last().unwrap().packets.len();
while let Ok(more_reqs) = requests_receiver.try_recv() {
if num_requests >= MAX_GOSSIP_TRAFFIC {
continue;
}
num_requests += more_reqs.packets.len();
requests.push(more_reqs)
}
if num_requests >= MAX_GOSSIP_TRAFFIC {
warn!("Too much gossip traffic, ignoring some messages");
}
let epoch_ms;
let stakes: HashMap<_, _> = match bank_forks {
Some(ref bank_forks) => {
@@ -1367,8 +1387,13 @@
HashMap::new()
}
};
let sender = response_sender.clone();
thread_pool.install(|| {
requests.into_par_iter().for_each_with(sender, |s, reqs| {
Self::handle_packets(obj, &recycler, &stakes, reqs, s, epoch_ms)
});
});
Self::handle_packets(obj, &recycler, &stakes, reqs, response_sender, epoch_ms);
Ok(())
}
pub fn listen(
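As a standalone sketch of the batching pattern run_listen now uses: block on a one-second recv_timeout for the first batch, greedily pick up whatever else is already queued with try_recv while counting against a cap, and once the cap is hit keep draining but drop the excess so the channel does not back up. The sketch below uses std::sync::mpsc and plain Vec<u8> batches in place of PacketReceiver/Packets, so the types and names are assumptions for illustration, not the crate's API.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::time::Duration;

// Standalone sketch of the drain-with-cap pattern from run_listen, using
// Vec<u8> "packet batches" instead of Packets.
fn drain_with_cap(rx: &Receiver<Vec<u8>>, cap: usize) -> Vec<Vec<u8>> {
    // Block up to one second for the first batch, as the original does.
    let mut batches = match rx.recv_timeout(Duration::new(1, 0)) {
        Ok(first) => vec![first],
        Err(_) => return Vec::new(),
    };
    let mut num_items = batches.last().unwrap().len();
    // Greedily pick up whatever else is already queued.
    while let Ok(more) = rx.try_recv() {
        if num_items >= cap {
            continue; // keep draining, but drop the excess
        }
        num_items += more.len();
        batches.push(more);
    }
    if num_items >= cap {
        eprintln!("too much traffic, ignoring some messages");
    }
    batches
}

fn main() {
    let (tx, rx) = channel();
    for i in 0..10u8 {
        tx.send(vec![i; 4]).unwrap(); // queue ten 4-item batches up front
    }
    // With a cap of 16 items, only the first four batches are kept;
    // the remaining six are drained and discarded.
    let kept = drain_with_cap(&rx, 16);
    println!("kept {} batches", kept.len());
}
```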
@@ -1382,26 +1407,33 @@
let recycler = PacketsRecycler::default();
Builder::new()
.name("solana-listen".to_string())
.spawn(move || loop {
let e = Self::run_listen(
&me,
&recycler,
bank_forks.as_ref(),
&requests_receiver,
&response_sender,
);
if exit.load(Ordering::Relaxed) {
return;
}
if e.is_err() {
let me = me.read().unwrap();
debug!(
"{}: run_listen timeout, table size: {}",
me.gossip.id,
me.gossip.crds.table.len()
.spawn(move || {
let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.build()
.unwrap();
loop {
let e = Self::run_listen(
&me,
&recycler,
bank_forks.as_ref(),
&requests_receiver,
&response_sender,
&thread_pool,
);
if exit.load(Ordering::Relaxed) {
return;
}
if e.is_err() {
let me = me.read().unwrap();
debug!(
"{}: run_listen timeout, table size: {}",
me.gossip.id,
me.gossip.crds.table.len()
);
}
thread_mem_usage::datapoint("solana-listen");
}
thread_mem_usage::datapoint("solana-listen");
})
.unwrap()
}
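Finally, a self-contained sketch of the fan-out that thread_pool.install, into_par_iter, and for_each_with give the listen thread: each rayon worker receives its own clone of the response sender and handles one packet batch. The four-thread pool, the Vec<u8> batches, and the usize "responses" are placeholders; the real code sizes the pool with solana_rayon_threadlimit::get_thread_count() and calls Self::handle_packets for each batch.

```rust
use rayon::prelude::*;
use std::sync::mpsc::channel;

fn main() {
    // Dedicated pool, analogous to the one built in the listen thread.
    let thread_pool = rayon::ThreadPoolBuilder::new()
        .num_threads(4)
        .build()
        .unwrap();

    let (response_sender, response_receiver) = channel::<usize>();
    let requests: Vec<Vec<u8>> = vec![vec![1, 2, 3], vec![4, 5], vec![6]];

    thread_pool.install(|| {
        // for_each_with clones the sender for each worker, so every batch
        // can be handled and answered in parallel.
        requests.into_par_iter().for_each_with(response_sender, |s, reqs| {
            // Stand-in for Self::handle_packets: send one response per batch.
            s.send(reqs.len()).unwrap();
        });
    });

    // The sender was moved into the parallel iterator and dropped when it
    // finished, so this loop drains every response and then terminates.
    for len in response_receiver {
        println!("handled a batch of {} packets", len);
    }
}
```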