From a78c3c07f29ef2e09180348a76e689d20e9cef6a Mon Sep 17 00:00:00 2001
From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com>
Date: Wed, 8 Sep 2021 17:09:35 +0000
Subject: [PATCH] filters crds values in parallel when responding to gossip
 pull-requests (backport #18877) (#19450)

* filters crds values in parallel when responding to gossip pull-requests (#18877)

When responding to gossip pull-requests, filter_crds_values takes a lot of
time while holding onto read-lock:
https://github.com/solana-labs/solana/blob/f51d64868/gossip/src/crds_gossip_pull.rs#L509-L566

This commit will filter-crds-values in parallel using rayon thread-pools.

(cherry picked from commit f1198fc6d5ac1f9131d2b044a03fad524e5616e2)

* removes backport merge conflicts

Co-authored-by: behzad nouri
---
 core/src/cluster_info.rs     |  33 ++++---
 core/src/crds_gossip.rs      |   3 +-
 core/src/crds_gossip_pull.rs | 168 ++++++++++++++++++++---------------
 core/src/gossip_service.rs   |  14 +--
 core/tests/crds_gossip.rs    |   5 +-
 5 files changed, 126 insertions(+), 97 deletions(-)

diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index 7a17ba8672..5c3b278576 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -1778,9 +1778,8 @@ impl ClusterInfo {
         bank_forks: Option<Arc<RwLock<BankForks>>>,
         sender: PacketSender,
         gossip_validators: Option<HashSet<Pubkey>>,
-        exit: &Arc<AtomicBool>,
+        exit: Arc<AtomicBool>,
     ) -> JoinHandle<()> {
-        let exit = exit.clone();
         let thread_pool = ThreadPoolBuilder::new()
             .num_threads(std::cmp::min(get_thread_count(), 8))
             .thread_name(|i| format!("ClusterInfo::gossip-{}", i))
@@ -1963,8 +1962,13 @@ impl ClusterInfo {
             self.stats
                 .pull_requests_count
                 .add_relaxed(requests.len() as u64);
-            let response =
-                self.handle_pull_requests(recycler, requests, stakes, require_stake_for_gossip);
+            let response = self.handle_pull_requests(
+                thread_pool,
+                recycler,
+                requests,
+                stakes,
+                require_stake_for_gossip,
+            );
             if !response.is_empty() {
                 self.stats
                     .packets_sent_pull_responses_count
@@ -2034,6 +2038,7 @@ impl ClusterInfo {
     // and tries to send back to them the values it detects are missing.
     fn handle_pull_requests(
         &self,
+        thread_pool: &ThreadPool,
         recycler: &PacketsRecycler,
         requests: Vec<PullData>,
         stakes: &HashMap<Pubkey, u64>,
@@ -2065,7 +2070,7 @@ impl ClusterInfo {
                 "generate_pull_responses",
                 &self.stats.generate_pull_responses,
             )
-            .generate_pull_responses(&caller_and_filters, output_size_limit, now);
+            .generate_pull_responses(thread_pool, &caller_and_filters, output_size_limit, now);
         if require_stake_for_gossip {
             for resp in &mut pull_responses {
                 retain_staked(resp, stakes);
@@ -2706,6 +2711,9 @@ impl ClusterInfo {
                 match self.run_socket_consume(&receiver, &sender, &thread_pool) {
                     Err(Error::RecvTimeoutError(RecvTimeoutError::Disconnected)) => break,
                     Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
+                    // A send operation can only fail if the receiving end of a
+                    // channel is disconnected.
+                    Err(Error::SendError) => break,
                     Err(err) => error!("gossip consume: {}", err),
                     Ok(()) => (),
                 }
@@ -2721,20 +2729,19 @@ impl ClusterInfo {
         requests_receiver: Receiver<Vec<(/*from:*/ SocketAddr, Protocol)>>,
         response_sender: PacketSender,
         should_check_duplicate_instance: bool,
-        exit: &Arc<AtomicBool>,
+        exit: Arc<AtomicBool>,
     ) -> JoinHandle<()> {
-        let exit = exit.clone();
         let recycler =
             PacketsRecycler::new_without_limit("cluster-info-listen-recycler-shrink-stats");
+        let mut last_print = Instant::now();
+        let thread_pool = ThreadPoolBuilder::new()
+            .num_threads(get_thread_count().min(8))
+            .thread_name(|i| format!("sol-gossip-work-{}", i))
+            .build()
+            .unwrap();
         Builder::new()
             .name("solana-listen".to_string())
             .spawn(move || {
-                let thread_pool = ThreadPoolBuilder::new()
-                    .num_threads(std::cmp::min(get_thread_count(), 8))
-                    .thread_name(|i| format!("sol-gossip-work-{}", i))
-                    .build()
-                    .unwrap();
-                let mut last_print = Instant::now();
                 while !exit.load(Ordering::Relaxed) {
                     if let Err(err) = self.run_listen(
                         &recycler,
diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs
index 5a2d08a644..a8ee3bc9c1 100644
--- a/core/src/crds_gossip.rs
+++ b/core/src/crds_gossip.rs
@@ -254,12 +254,13 @@ impl CrdsGossip {
 
     pub fn generate_pull_responses(
         &self,
+        thread_pool: &ThreadPool,
         filters: &[(CrdsValue, CrdsFilter)],
         output_size_limit: usize, // Limit number of crds values returned.
         now: u64,
     ) -> Vec<Vec<CrdsValue>> {
         self.pull
-            .generate_pull_responses(&self.crds, filters, output_size_limit, now)
+            .generate_pull_responses(thread_pool, &self.crds, filters, output_size_limit, now)
     }
 
     pub fn filter_pull_responses(
diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs
index 98a6bcd486..abeaa8557d 100644
--- a/core/src/crds_gossip_pull.rs
+++ b/core/src/crds_gossip_pull.rs
@@ -9,32 +9,38 @@
 //! with random hash functions. So each subsequent request will have a different distribution
 //! of false positives.
 
-use crate::{
-    cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY},
-    contact_info::ContactInfo,
-    crds::Crds,
-    crds_gossip::{get_stake, get_weight},
-    crds_gossip_error::CrdsGossipError,
-    crds_value::CrdsValue,
-    ping_pong::PingCache,
-};
-use itertools::Itertools;
-use lru::LruCache;
-use rand::distributions::{Distribution, WeightedIndex};
-use rand::Rng;
-use rayon::{prelude::*, ThreadPool};
-use solana_runtime::bloom::{AtomicBloom, Bloom};
-use solana_sdk::{
-    hash::{hash, Hash},
-    pubkey::Pubkey,
-    signature::{Keypair, Signer},
-};
-use std::{
-    collections::{HashMap, HashSet, VecDeque},
-    convert::TryInto,
-    net::SocketAddr,
-    sync::Mutex,
-    time::{Duration, Instant},
+use {
+    crate::{
+        cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY},
+        contact_info::ContactInfo,
+        crds::Crds,
+        crds_gossip::{get_stake, get_weight},
+        crds_gossip_error::CrdsGossipError,
+        crds_value::CrdsValue,
+        ping_pong::PingCache,
+    },
+    lru::LruCache,
+    rand::{
+        distributions::{Distribution, WeightedIndex},
+        Rng,
+    },
+    rayon::{prelude::*, ThreadPool},
+    solana_runtime::bloom::{AtomicBloom, Bloom},
+    solana_sdk::{
+        hash::{hash, Hash},
+        pubkey::Pubkey,
+        signature::{Keypair, Signer},
+    },
+    std::{
+        collections::{HashMap, HashSet, VecDeque},
+        convert::TryInto,
+        net::SocketAddr,
+        sync::{
+            atomic::{AtomicI64, AtomicUsize, Ordering},
+            Mutex,
+        },
+        time::{Duration, Instant},
+    },
 };
 
 pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
@@ -333,12 +339,13 @@ impl CrdsGossipPull {
     /// Create gossip responses to pull requests
     pub fn generate_pull_responses(
         &self,
+        thread_pool: &ThreadPool,
         crds: &Crds,
         requests: &[(CrdsValue, CrdsFilter)],
         output_size_limit: usize, // Limit number of crds values returned.
         now: u64,
     ) -> Vec<Vec<CrdsValue>> {
-        self.filter_crds_values(crds, requests, output_size_limit, now)
+        self.filter_crds_values(thread_pool, crds, requests, output_size_limit, now)
     }
 
     // Checks if responses should be inserted and
@@ -472,9 +479,10 @@ impl CrdsGossipPull {
     /// filter values that fail the bloom filter up to max_bytes
     fn filter_crds_values(
         &self,
+        thread_pool: &ThreadPool,
         crds: &Crds,
         filters: &[(CrdsValue, CrdsFilter)],
-        mut output_size_limit: usize, // Limit number of crds values returned.
+        output_size_limit: usize, // Limit number of crds values returned.
         now: u64,
     ) -> Vec<Vec<CrdsValue>> {
         let msg_timeout = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
@@ -482,46 +490,53 @@ impl CrdsGossipPull {
         //skip filters from callers that are too old
         let caller_wallclock_window =
             now.saturating_sub(msg_timeout)..now.saturating_add(msg_timeout);
-        let mut dropped_requests = 0;
-        let mut total_skipped = 0;
-        let ret: Vec<_> = filters
-            .iter()
-            .map(|(caller, filter)| {
-                if output_size_limit == 0 {
-                    return None;
-                }
-                let caller_wallclock = caller.wallclock();
-                if !caller_wallclock_window.contains(&caller_wallclock) {
-                    dropped_requests += 1;
-                    return Some(vec![]);
-                }
-                let caller_wallclock = caller_wallclock.checked_add(jitter).unwrap_or(0);
-                let out: Vec<_> = crds
-                    .filter_bitmask(filter.mask, filter.mask_bits)
-                    .filter_map(|item| {
-                        debug_assert!(filter.test_mask(&item.value_hash));
-                        //skip values that are too new
-                        if item.value.wallclock() > caller_wallclock {
-                            total_skipped += 1;
-                            None
-                        } else if filter.filter_contains(&item.value_hash) {
-                            None
-                        } else {
-                            Some(item.value.clone())
-                        }
-                    })
-                    .take(output_size_limit)
-                    .collect();
-                output_size_limit -= out.len();
-                Some(out)
-            })
-            .while_some()
-            .collect();
+        let dropped_requests = AtomicUsize::default();
+        let total_skipped = AtomicUsize::default();
+        let output_size_limit = output_size_limit.try_into().unwrap_or(i64::MAX);
+        let output_size_limit = AtomicI64::new(output_size_limit);
+        let apply_filter = |caller: &CrdsValue, filter: &CrdsFilter| {
+            if output_size_limit.load(Ordering::Relaxed) <= 0 {
+                return Vec::default();
+            }
+            let caller_wallclock = caller.wallclock();
+            if !caller_wallclock_window.contains(&caller_wallclock) {
+                dropped_requests.fetch_add(1, Ordering::Relaxed);
+                return Vec::default();
+            }
+            let caller_wallclock = caller_wallclock.checked_add(jitter).unwrap_or(0);
+            let out: Vec<_> = crds
+                .filter_bitmask(filter.mask, filter.mask_bits)
+                .filter_map(|item| {
+                    debug_assert!(filter.test_mask(&item.value_hash));
+                    //skip values that are too new
+                    if item.value.wallclock() > caller_wallclock {
+                        total_skipped.fetch_add(1, Ordering::Relaxed);
+                        None
+                    } else if filter.filter_contains(&item.value_hash) {
+                        None
+                    } else {
+                        Some(item.value.clone())
+                    }
+                })
+                .take(output_size_limit.load(Ordering::Relaxed).max(0) as usize)
+                .collect();
+            output_size_limit.fetch_sub(out.len() as i64, Ordering::Relaxed);
+            out
+        };
+        let ret: Vec<_> = thread_pool.install(|| {
+            filters
+                .par_iter()
+                .map(|(caller, filter)| apply_filter(caller, filter))
+                .collect()
+        });
         inc_new_counter_info!(
             "gossip_filter_crds_values-dropped_requests",
-            dropped_requests + filters.len() - ret.len()
+            dropped_requests.into_inner()
+        );
+        inc_new_counter_info!(
+            "gossip_filter_crds_values-dropped_values",
+            total_skipped.into_inner()
         );
-        inc_new_counter_info!("gossip_filter_crds_values-dropped_values", total_skipped);
         ret
     }
 
@@ -1098,10 +1113,11 @@ pub(crate) mod tests {
         let (_, filters) = req.unwrap();
         let mut filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
         let rsp = dest.generate_pull_responses(
+            &thread_pool,
             &dest_crds,
             &filters,
-            /*output_size_limit=*/ usize::MAX,
-            0,
+            usize::MAX, // output_size_limit
+            0, // now
         );
         assert_eq!(rsp[0].len(), 0);
 
@@ -1116,10 +1132,11 @@ pub(crate) mod tests {
 
         //should skip new value since caller is to old
         let rsp = dest.generate_pull_responses(
+            &thread_pool,
            &dest_crds,
            &filters,
-            /*output_size_limit=*/ usize::MAX,
-            CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
+            usize::MAX, // output_size_limit
+            CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, // now
         );
         assert_eq!(rsp[0].len(), 0);
         assert_eq!(filters.len(), MIN_NUM_BLOOM_FILTERS);
@@ -1134,9 +1151,10 @@ pub(crate) mod tests {
                 .collect::<Vec<_>>()
         });
         let rsp = dest.generate_pull_responses(
+            &thread_pool,
             &dest_crds,
             &filters,
-            /*output_size_limit=*/ usize::MAX,
+            usize::MAX, // output_size_limit
             CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
         );
         assert_eq!(rsp.len(), 2 * MIN_NUM_BLOOM_FILTERS);
@@ -1186,10 +1204,11 @@ pub(crate) mod tests {
         let (_, filters) = req.unwrap();
         let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
         let rsp = dest.generate_pull_responses(
+            &thread_pool,
             &dest_crds,
             &filters,
-            /*output_size_limit=*/ usize::MAX,
-            0,
+            usize::MAX, // output_size_limit
+            0, // now
         );
         dest.process_pull_requests(
             &mut dest_crds,
@@ -1258,10 +1277,11 @@ pub(crate) mod tests {
         let (_, filters) = req.unwrap();
         let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
         let rsp = dest.generate_pull_responses(
+            &thread_pool,
             &dest_crds,
             &filters,
-            /*output_size_limit=*/ usize::MAX,
-            0,
+            usize::MAX, // output_size_limit
+            0, // now
         );
         dest.process_pull_requests(
             &mut dest_crds,
diff --git a/core/src/gossip_service.rs b/core/src/gossip_service.rs
index bf13a7330d..9bec97e70d 100644
--- a/core/src/gossip_service.rs
+++ b/core/src/gossip_service.rs
@@ -51,27 +51,27 @@ impl GossipService {
             "gossip_receiver",
             1,
         );
-        let (response_sender, response_receiver) = channel();
         let (consume_sender, listen_receiver) = channel();
+        // https://github.com/rust-lang/rust/issues/39364#issuecomment-634545136
+        let _consume_sender = consume_sender.clone();
         let t_socket_consume = cluster_info.clone().start_socket_consume_thread(
             request_receiver,
             consume_sender,
             exit.clone(),
         );
-        let t_listen = ClusterInfo::listen(
-            cluster_info.clone(),
+        let (response_sender, response_receiver) = channel();
+        let t_listen = cluster_info.clone().listen(
             bank_forks.clone(),
             listen_receiver,
             response_sender.clone(),
             should_check_duplicate_instance,
-            exit,
+            exit.clone(),
         );
-        let t_gossip = ClusterInfo::gossip(
-            cluster_info.clone(),
+        let t_gossip = cluster_info.clone().gossip(
             bank_forks,
             response_sender,
             gossip_validators,
-            exit,
+            exit.clone(),
         );
         // To work around:
         // https://github.com/rust-lang/rust/issues/54267
diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs
index 04339db235..081aeca2df 100644
--- a/core/tests/crds_gossip.rs
+++ b/core/tests/crds_gossip.rs
@@ -483,7 +483,7 @@ fn network_run_pull(
             .collect()
     };
     let transfered: Vec<_> = requests
-        .into_par_iter()
+        .into_iter()
         .map(|(to, filters, caller_info)| {
             let mut bytes: usize = 0;
             let mut msgs: usize = 0;
@@ -506,8 +506,9 @@ fn network_run_pull(
                 .lock()
                 .unwrap()
                 .generate_pull_responses(
+                    thread_pool,
                     &filters,
-                    /*output_size_limit=*/ usize::MAX,
+                    usize::MAX, // output_size_limit
                     now,
                 )
                 .into_iter()
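Below is a minimal, standalone sketch (not part of the patch) of the pattern the diff applies inside filter_crds_values: the sequential iterator that mutated local counters and an output budget becomes a plain closure over atomic counters, which is then run across requests with rayon's par_iter on a dedicated, bounded thread pool. The names here (Request, filter_requests, the even-number predicate) are hypothetical stand-ins for CrdsValue, the bloom-filter check, and the crds table; available_parallelism() stands in for Solana's get_thread_count(). It assumes only the rayon crate as a dependency.

use {
    rayon::{prelude::*, ThreadPoolBuilder},
    std::sync::atomic::{AtomicI64, AtomicUsize, Ordering},
};

// Hypothetical stand-in for a gossip pull-request.
struct Request {
    values: Vec<u64>,
}

fn filter_requests(requests: &[Request], output_size_limit: usize) -> Vec<Vec<u64>> {
    // Atomics replace the `mut` locals of the sequential version so the
    // closure below is Fn + Sync and can run on many threads at once.
    let dropped_requests = AtomicUsize::default();
    let output_size_limit = AtomicI64::new(i64::try_from(output_size_limit).unwrap_or(i64::MAX));
    // Per-request work; the shared budget is read and decremented with
    // relaxed atomics, so the limit is approximate under concurrency -- the
    // same trade-off the patch accepts.
    let apply_filter = |request: &Request| {
        if output_size_limit.load(Ordering::Relaxed) <= 0 {
            dropped_requests.fetch_add(1, Ordering::Relaxed);
            return Vec::default();
        }
        let out: Vec<u64> = request
            .values
            .iter()
            .copied()
            .filter(|value| *value % 2 == 0) // stand-in for the bloom-filter test
            .take(output_size_limit.load(Ordering::Relaxed).max(0) as usize)
            .collect();
        output_size_limit.fetch_sub(out.len() as i64, Ordering::Relaxed);
        out
    };
    // Dedicated pool capped at 8 threads, mirroring get_thread_count().min(8).
    let num_threads = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1)
        .min(8);
    let thread_pool = ThreadPoolBuilder::new()
        .num_threads(num_threads)
        .thread_name(|i| format!("filter-sketch-{}", i))
        .build()
        .unwrap();
    let ret = thread_pool.install(|| {
        requests
            .par_iter()
            .map(|request| apply_filter(request))
            .collect()
    });
    println!("dropped {} requests", dropped_requests.into_inner());
    ret
}

fn main() {
    let requests: Vec<Request> = (0u64..4)
        .map(|i| Request {
            values: (i..i + 10).collect(),
        })
        .collect();
    let responses = filter_requests(&requests, 16);
    assert_eq!(responses.len(), requests.len());
    println!("{:?}", responses);
}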