Process Gossip in parallel and add an upper limit (#8328)

This commit is contained in:
Sagar Dhawan 2020-02-19 21:31:55 -06:00 committed by GitHub
parent 3e96d59359
commit 221866f74e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -29,6 +29,9 @@ use bincode::{serialize, serialized_size};
use compression::prelude::*; use compression::prelude::*;
use core::cmp; use core::cmp;
use itertools::Itertools; use itertools::Itertools;
use rayon::iter::IntoParallelIterator;
use rayon::iter::ParallelIterator;
use rayon::ThreadPool;
use solana_ledger::{bank_forks::BankForks, staking_utils}; use solana_ledger::{bank_forks::BankForks, staking_utils};
use solana_measure::thread_mem_usage; use solana_measure::thread_mem_usage;
use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_error}; use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_error};
@ -37,6 +40,7 @@ use solana_net_utils::{
multi_bind_in_range, PortRange, multi_bind_in_range, PortRange,
}; };
use solana_perf::packet::{to_packets_with_destination, Packets, PacketsRecycler}; use solana_perf::packet::{to_packets_with_destination, Packets, PacketsRecycler};
use solana_rayon_threadlimit::get_thread_count;
use solana_sdk::{ use solana_sdk::{
clock::{Slot, DEFAULT_MS_PER_SLOT}, clock::{Slot, DEFAULT_MS_PER_SLOT},
pubkey::Pubkey, pubkey::Pubkey,
@ -68,6 +72,10 @@ pub const MAX_BLOOM_SIZE: usize = 1018;
const MAX_PROTOCOL_PAYLOAD_SIZE: u64 = PACKET_DATA_SIZE as u64 - MAX_PROTOCOL_HEADER_SIZE; const MAX_PROTOCOL_PAYLOAD_SIZE: u64 = PACKET_DATA_SIZE as u64 - MAX_PROTOCOL_HEADER_SIZE;
/// The largest protocol header size /// The largest protocol header size
const MAX_PROTOCOL_HEADER_SIZE: u64 = 214; const MAX_PROTOCOL_HEADER_SIZE: u64 = 214;
/// A hard limit on incoming gossip messages
/// Chosen to be able to handle 1Gbps of pure gossip traffic
/// 128MB/PACKET_DATA_SIZE
const MAX_GOSSIP_TRAFFIC: usize = 128_000_000 / PACKET_DATA_SIZE;
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
pub enum ClusterInfoError { pub enum ClusterInfoError {
@ -1424,10 +1432,22 @@ impl ClusterInfo {
bank_forks: Option<&Arc<RwLock<BankForks>>>, bank_forks: Option<&Arc<RwLock<BankForks>>>,
requests_receiver: &PacketReceiver, requests_receiver: &PacketReceiver,
response_sender: &PacketSender, response_sender: &PacketSender,
thread_pool: &ThreadPool,
) -> Result<()> { ) -> Result<()> {
//TODO cache connections //TODO cache connections
let timeout = Duration::new(1, 0); let timeout = Duration::new(1, 0);
let reqs = requests_receiver.recv_timeout(timeout)?; let mut requests = vec![requests_receiver.recv_timeout(timeout)?];
let mut num_requests = requests.last().unwrap().packets.len();
while let Ok(more_reqs) = requests_receiver.try_recv() {
if num_requests >= MAX_GOSSIP_TRAFFIC {
continue;
}
num_requests += more_reqs.packets.len();
requests.push(more_reqs)
}
if num_requests >= MAX_GOSSIP_TRAFFIC {
warn!("Too much gossip traffic, ignoring some messages");
}
let epoch_ms; let epoch_ms;
let stakes: HashMap<_, _> = match bank_forks { let stakes: HashMap<_, _> = match bank_forks {
Some(ref bank_forks) => { Some(ref bank_forks) => {
@ -1443,8 +1463,13 @@ impl ClusterInfo {
HashMap::new() HashMap::new()
} }
}; };
let sender = response_sender.clone();
thread_pool.install(|| {
requests.into_par_iter().for_each_with(sender, |s, reqs| {
Self::handle_packets(obj, &recycler, &stakes, reqs, s, epoch_ms)
});
});
Self::handle_packets(obj, &recycler, &stakes, reqs, response_sender, epoch_ms);
Ok(()) Ok(())
} }
pub fn listen( pub fn listen(
@ -1458,26 +1483,33 @@ impl ClusterInfo {
let recycler = PacketsRecycler::default(); let recycler = PacketsRecycler::default();
Builder::new() Builder::new()
.name("solana-listen".to_string()) .name("solana-listen".to_string())
.spawn(move || loop { .spawn(move || {
let e = Self::run_listen( let thread_pool = rayon::ThreadPoolBuilder::new()
&me, .num_threads(get_thread_count())
&recycler, .build()
bank_forks.as_ref(), .unwrap();
&requests_receiver, loop {
&response_sender, let e = Self::run_listen(
); &me,
if exit.load(Ordering::Relaxed) { &recycler,
return; bank_forks.as_ref(),
} &requests_receiver,
if e.is_err() { &response_sender,
let me = me.read().unwrap(); &thread_pool,
debug!(
"{}: run_listen timeout, table size: {}",
me.gossip.id,
me.gossip.crds.table.len()
); );
if exit.load(Ordering::Relaxed) {
return;
}
if e.is_err() {
let me = me.read().unwrap();
debug!(
"{}: run_listen timeout, table size: {}",
me.gossip.id,
me.gossip.crds.table.len()
);
}
thread_mem_usage::datapoint("solana-listen");
} }
thread_mem_usage::datapoint("solana-listen");
}) })
.unwrap() .unwrap()
} }