diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index e04dc6500e..8a0112251f 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -29,6 +29,9 @@ use bincode::{serialize, serialized_size}; use compression::prelude::*; use core::cmp; use itertools::Itertools; +use rayon::iter::IntoParallelIterator; +use rayon::iter::ParallelIterator; +use rayon::ThreadPool; use solana_ledger::{bank_forks::BankForks, staking_utils}; use solana_measure::thread_mem_usage; use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_error}; @@ -37,6 +40,7 @@ use solana_net_utils::{ multi_bind_in_range, PortRange, }; use solana_perf::packet::{to_packets_with_destination, Packets, PacketsRecycler}; +use solana_rayon_threadlimit::get_thread_count; use solana_sdk::{ clock::{Slot, DEFAULT_MS_PER_SLOT}, pubkey::Pubkey, @@ -68,6 +72,10 @@ pub const MAX_BLOOM_SIZE: usize = 1018; const MAX_PROTOCOL_PAYLOAD_SIZE: u64 = PACKET_DATA_SIZE as u64 - MAX_PROTOCOL_HEADER_SIZE; /// The largest protocol header size const MAX_PROTOCOL_HEADER_SIZE: u64 = 214; +/// A hard limit on incoming gossip messages +/// Chosen to be able to handle 1Gbps of pure gossip traffic +/// 128MB/PACKET_DATA_SIZE +const MAX_GOSSIP_TRAFFIC: usize = 128_000_000 / PACKET_DATA_SIZE; #[derive(Debug, PartialEq, Eq)] pub enum ClusterInfoError { @@ -1424,10 +1432,22 @@ impl ClusterInfo { bank_forks: Option<&Arc>>, requests_receiver: &PacketReceiver, response_sender: &PacketSender, + thread_pool: &ThreadPool, ) -> Result<()> { //TODO cache connections let timeout = Duration::new(1, 0); - let reqs = requests_receiver.recv_timeout(timeout)?; + let mut requests = vec![requests_receiver.recv_timeout(timeout)?]; + let mut num_requests = requests.last().unwrap().packets.len(); + while let Ok(more_reqs) = requests_receiver.try_recv() { + if num_requests >= MAX_GOSSIP_TRAFFIC { + continue; + } + num_requests += more_reqs.packets.len(); + requests.push(more_reqs) + } + if num_requests >= MAX_GOSSIP_TRAFFIC { + warn!("Too much gossip traffic, ignoring some messages"); + } let epoch_ms; let stakes: HashMap<_, _> = match bank_forks { Some(ref bank_forks) => { @@ -1443,8 +1463,13 @@ impl ClusterInfo { HashMap::new() } }; + let sender = response_sender.clone(); + thread_pool.install(|| { + requests.into_par_iter().for_each_with(sender, |s, reqs| { + Self::handle_packets(obj, &recycler, &stakes, reqs, s, epoch_ms) + }); + }); - Self::handle_packets(obj, &recycler, &stakes, reqs, response_sender, epoch_ms); Ok(()) } pub fn listen( @@ -1458,26 +1483,33 @@ impl ClusterInfo { let recycler = PacketsRecycler::default(); Builder::new() .name("solana-listen".to_string()) - .spawn(move || loop { - let e = Self::run_listen( - &me, - &recycler, - bank_forks.as_ref(), - &requests_receiver, - &response_sender, - ); - if exit.load(Ordering::Relaxed) { - return; - } - if e.is_err() { - let me = me.read().unwrap(); - debug!( - "{}: run_listen timeout, table size: {}", - me.gossip.id, - me.gossip.crds.table.len() + .spawn(move || { + let thread_pool = rayon::ThreadPoolBuilder::new() + .num_threads(get_thread_count()) + .build() + .unwrap(); + loop { + let e = Self::run_listen( + &me, + &recycler, + bank_forks.as_ref(), + &requests_receiver, + &response_sender, + &thread_pool, ); + if exit.load(Ordering::Relaxed) { + return; + } + if e.is_err() { + let me = me.read().unwrap(); + debug!( + "{}: run_listen timeout, table size: {}", + me.gossip.id, + me.gossip.crds.table.len() + ); + } + thread_mem_usage::datapoint("solana-listen"); } - thread_mem_usage::datapoint("solana-listen"); }) .unwrap() }