filters crds values in parallel when responding to gossip pull-requests (backport #18877) (#19450)

* filters crds values in parallel when responding to gossip pull-requests (#18877)

When responding to gossip pull-requests, filter_crds_values takes a lot of time
while holding onto a read-lock:
https://github.com/solana-labs/solana/blob/f51d64868/gossip/src/crds_gossip_pull.rs#L509-L566

This commit filters crds values in parallel using rayon thread-pools.

(cherry picked from commit f1198fc6d5)

* removes backport merge conflicts

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
This commit is contained in:
mergify[bot]
2021-09-08 17:09:35 +00:00
committed by GitHub
parent 53f8e58300
commit a78c3c07f2
5 changed files with 126 additions and 97 deletions

View File

@ -1778,9 +1778,8 @@ impl ClusterInfo {
bank_forks: Option<Arc<RwLock<BankForks>>>, bank_forks: Option<Arc<RwLock<BankForks>>>,
sender: PacketSender, sender: PacketSender,
gossip_validators: Option<HashSet<Pubkey>>, gossip_validators: Option<HashSet<Pubkey>>,
exit: &Arc<AtomicBool>, exit: Arc<AtomicBool>,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
let exit = exit.clone();
let thread_pool = ThreadPoolBuilder::new() let thread_pool = ThreadPoolBuilder::new()
.num_threads(std::cmp::min(get_thread_count(), 8)) .num_threads(std::cmp::min(get_thread_count(), 8))
.thread_name(|i| format!("ClusterInfo::gossip-{}", i)) .thread_name(|i| format!("ClusterInfo::gossip-{}", i))
@ -1963,8 +1962,13 @@ impl ClusterInfo {
self.stats self.stats
.pull_requests_count .pull_requests_count
.add_relaxed(requests.len() as u64); .add_relaxed(requests.len() as u64);
let response = let response = self.handle_pull_requests(
self.handle_pull_requests(recycler, requests, stakes, require_stake_for_gossip); thread_pool,
recycler,
requests,
stakes,
require_stake_for_gossip,
);
if !response.is_empty() { if !response.is_empty() {
self.stats self.stats
.packets_sent_pull_responses_count .packets_sent_pull_responses_count
@ -2034,6 +2038,7 @@ impl ClusterInfo {
// and tries to send back to them the values it detects are missing. // and tries to send back to them the values it detects are missing.
fn handle_pull_requests( fn handle_pull_requests(
&self, &self,
thread_pool: &ThreadPool,
recycler: &PacketsRecycler, recycler: &PacketsRecycler,
requests: Vec<PullData>, requests: Vec<PullData>,
stakes: &HashMap<Pubkey, u64>, stakes: &HashMap<Pubkey, u64>,
@ -2065,7 +2070,7 @@ impl ClusterInfo {
"generate_pull_responses", "generate_pull_responses",
&self.stats.generate_pull_responses, &self.stats.generate_pull_responses,
) )
.generate_pull_responses(&caller_and_filters, output_size_limit, now); .generate_pull_responses(thread_pool, &caller_and_filters, output_size_limit, now);
if require_stake_for_gossip { if require_stake_for_gossip {
for resp in &mut pull_responses { for resp in &mut pull_responses {
retain_staked(resp, stakes); retain_staked(resp, stakes);
@ -2706,6 +2711,9 @@ impl ClusterInfo {
match self.run_socket_consume(&receiver, &sender, &thread_pool) { match self.run_socket_consume(&receiver, &sender, &thread_pool) {
Err(Error::RecvTimeoutError(RecvTimeoutError::Disconnected)) => break, Err(Error::RecvTimeoutError(RecvTimeoutError::Disconnected)) => break,
Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (), Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
// A send operation can only fail if the receiving end of a
// channel is disconnected.
Err(Error::SendError) => break,
Err(err) => error!("gossip consume: {}", err), Err(err) => error!("gossip consume: {}", err),
Ok(()) => (), Ok(()) => (),
} }
@ -2721,20 +2729,19 @@ impl ClusterInfo {
requests_receiver: Receiver<Vec<(/*from:*/ SocketAddr, Protocol)>>, requests_receiver: Receiver<Vec<(/*from:*/ SocketAddr, Protocol)>>,
response_sender: PacketSender, response_sender: PacketSender,
should_check_duplicate_instance: bool, should_check_duplicate_instance: bool,
exit: &Arc<AtomicBool>, exit: Arc<AtomicBool>,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
let exit = exit.clone();
let recycler = let recycler =
PacketsRecycler::new_without_limit("cluster-info-listen-recycler-shrink-stats"); PacketsRecycler::new_without_limit("cluster-info-listen-recycler-shrink-stats");
let mut last_print = Instant::now();
let thread_pool = ThreadPoolBuilder::new()
.num_threads(get_thread_count().min(8))
.thread_name(|i| format!("sol-gossip-work-{}", i))
.build()
.unwrap();
Builder::new() Builder::new()
.name("solana-listen".to_string()) .name("solana-listen".to_string())
.spawn(move || { .spawn(move || {
let thread_pool = ThreadPoolBuilder::new()
.num_threads(std::cmp::min(get_thread_count(), 8))
.thread_name(|i| format!("sol-gossip-work-{}", i))
.build()
.unwrap();
let mut last_print = Instant::now();
while !exit.load(Ordering::Relaxed) { while !exit.load(Ordering::Relaxed) {
if let Err(err) = self.run_listen( if let Err(err) = self.run_listen(
&recycler, &recycler,

View File

@ -254,12 +254,13 @@ impl CrdsGossip {
pub fn generate_pull_responses( pub fn generate_pull_responses(
&self, &self,
thread_pool: &ThreadPool,
filters: &[(CrdsValue, CrdsFilter)], filters: &[(CrdsValue, CrdsFilter)],
output_size_limit: usize, // Limit number of crds values returned. output_size_limit: usize, // Limit number of crds values returned.
now: u64, now: u64,
) -> Vec<Vec<CrdsValue>> { ) -> Vec<Vec<CrdsValue>> {
self.pull self.pull
.generate_pull_responses(&self.crds, filters, output_size_limit, now) .generate_pull_responses(thread_pool, &self.crds, filters, output_size_limit, now)
} }
pub fn filter_pull_responses( pub fn filter_pull_responses(

View File

@ -9,32 +9,38 @@
//! with random hash functions. So each subsequent request will have a different distribution //! with random hash functions. So each subsequent request will have a different distribution
//! of false positives. //! of false positives.
use crate::{ use {
cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY}, crate::{
contact_info::ContactInfo, cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY},
crds::Crds, contact_info::ContactInfo,
crds_gossip::{get_stake, get_weight}, crds::Crds,
crds_gossip_error::CrdsGossipError, crds_gossip::{get_stake, get_weight},
crds_value::CrdsValue, crds_gossip_error::CrdsGossipError,
ping_pong::PingCache, crds_value::CrdsValue,
}; ping_pong::PingCache,
use itertools::Itertools; },
use lru::LruCache; lru::LruCache,
use rand::distributions::{Distribution, WeightedIndex}; rand::{
use rand::Rng; distributions::{Distribution, WeightedIndex},
use rayon::{prelude::*, ThreadPool}; Rng,
use solana_runtime::bloom::{AtomicBloom, Bloom}; },
use solana_sdk::{ rayon::{prelude::*, ThreadPool},
hash::{hash, Hash}, solana_runtime::bloom::{AtomicBloom, Bloom},
pubkey::Pubkey, solana_sdk::{
signature::{Keypair, Signer}, hash::{hash, Hash},
}; pubkey::Pubkey,
use std::{ signature::{Keypair, Signer},
collections::{HashMap, HashSet, VecDeque}, },
convert::TryInto, std::{
net::SocketAddr, collections::{HashMap, HashSet, VecDeque},
sync::Mutex, convert::TryInto,
time::{Duration, Instant}, net::SocketAddr,
sync::{
atomic::{AtomicI64, AtomicUsize, Ordering},
Mutex,
},
time::{Duration, Instant},
},
}; };
pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000; pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
@ -333,12 +339,13 @@ impl CrdsGossipPull {
/// Create gossip responses to pull requests /// Create gossip responses to pull requests
pub fn generate_pull_responses( pub fn generate_pull_responses(
&self, &self,
thread_pool: &ThreadPool,
crds: &Crds, crds: &Crds,
requests: &[(CrdsValue, CrdsFilter)], requests: &[(CrdsValue, CrdsFilter)],
output_size_limit: usize, // Limit number of crds values returned. output_size_limit: usize, // Limit number of crds values returned.
now: u64, now: u64,
) -> Vec<Vec<CrdsValue>> { ) -> Vec<Vec<CrdsValue>> {
self.filter_crds_values(crds, requests, output_size_limit, now) self.filter_crds_values(thread_pool, crds, requests, output_size_limit, now)
} }
// Checks if responses should be inserted and // Checks if responses should be inserted and
@ -472,9 +479,10 @@ impl CrdsGossipPull {
/// filter values that fail the bloom filter up to max_bytes /// filter values that fail the bloom filter up to max_bytes
fn filter_crds_values( fn filter_crds_values(
&self, &self,
thread_pool: &ThreadPool,
crds: &Crds, crds: &Crds,
filters: &[(CrdsValue, CrdsFilter)], filters: &[(CrdsValue, CrdsFilter)],
mut output_size_limit: usize, // Limit number of crds values returned. output_size_limit: usize, // Limit number of crds values returned.
now: u64, now: u64,
) -> Vec<Vec<CrdsValue>> { ) -> Vec<Vec<CrdsValue>> {
let msg_timeout = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS; let msg_timeout = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
@ -482,46 +490,53 @@ impl CrdsGossipPull {
//skip filters from callers that are too old //skip filters from callers that are too old
let caller_wallclock_window = let caller_wallclock_window =
now.saturating_sub(msg_timeout)..now.saturating_add(msg_timeout); now.saturating_sub(msg_timeout)..now.saturating_add(msg_timeout);
let mut dropped_requests = 0; let dropped_requests = AtomicUsize::default();
let mut total_skipped = 0; let total_skipped = AtomicUsize::default();
let ret: Vec<_> = filters let output_size_limit = output_size_limit.try_into().unwrap_or(i64::MAX);
.iter() let output_size_limit = AtomicI64::new(output_size_limit);
.map(|(caller, filter)| { let apply_filter = |caller: &CrdsValue, filter: &CrdsFilter| {
if output_size_limit == 0 { if output_size_limit.load(Ordering::Relaxed) <= 0 {
return None; return Vec::default();
} }
let caller_wallclock = caller.wallclock(); let caller_wallclock = caller.wallclock();
if !caller_wallclock_window.contains(&caller_wallclock) { if !caller_wallclock_window.contains(&caller_wallclock) {
dropped_requests += 1; dropped_requests.fetch_add(1, Ordering::Relaxed);
return Some(vec![]); return Vec::default();
} }
let caller_wallclock = caller_wallclock.checked_add(jitter).unwrap_or(0); let caller_wallclock = caller_wallclock.checked_add(jitter).unwrap_or(0);
let out: Vec<_> = crds let out: Vec<_> = crds
.filter_bitmask(filter.mask, filter.mask_bits) .filter_bitmask(filter.mask, filter.mask_bits)
.filter_map(|item| { .filter_map(|item| {
debug_assert!(filter.test_mask(&item.value_hash)); debug_assert!(filter.test_mask(&item.value_hash));
//skip values that are too new //skip values that are too new
if item.value.wallclock() > caller_wallclock { if item.value.wallclock() > caller_wallclock {
total_skipped += 1; total_skipped.fetch_add(1, Ordering::Relaxed);
None None
} else if filter.filter_contains(&item.value_hash) { } else if filter.filter_contains(&item.value_hash) {
None None
} else { } else {
Some(item.value.clone()) Some(item.value.clone())
} }
}) })
.take(output_size_limit) .take(output_size_limit.load(Ordering::Relaxed).max(0) as usize)
.collect(); .collect();
output_size_limit -= out.len(); output_size_limit.fetch_sub(out.len() as i64, Ordering::Relaxed);
Some(out) out
}) };
.while_some() let ret: Vec<_> = thread_pool.install(|| {
.collect(); filters
.par_iter()
.map(|(caller, filter)| apply_filter(caller, filter))
.collect()
});
inc_new_counter_info!( inc_new_counter_info!(
"gossip_filter_crds_values-dropped_requests", "gossip_filter_crds_values-dropped_requests",
dropped_requests + filters.len() - ret.len() dropped_requests.into_inner()
);
inc_new_counter_info!(
"gossip_filter_crds_values-dropped_values",
total_skipped.into_inner()
); );
inc_new_counter_info!("gossip_filter_crds_values-dropped_values", total_skipped);
ret ret
} }
@ -1098,10 +1113,11 @@ pub(crate) mod tests {
let (_, filters) = req.unwrap(); let (_, filters) = req.unwrap();
let mut filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); let mut filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
let rsp = dest.generate_pull_responses( let rsp = dest.generate_pull_responses(
&thread_pool,
&dest_crds, &dest_crds,
&filters, &filters,
/*output_size_limit=*/ usize::MAX, usize::MAX, // output_size_limit
0, 0, // now
); );
assert_eq!(rsp[0].len(), 0); assert_eq!(rsp[0].len(), 0);
@ -1116,10 +1132,11 @@ pub(crate) mod tests {
//should skip new value since caller is to old //should skip new value since caller is to old
let rsp = dest.generate_pull_responses( let rsp = dest.generate_pull_responses(
&thread_pool,
&dest_crds, &dest_crds,
&filters, &filters,
/*output_size_limit=*/ usize::MAX, usize::MAX, // output_size_limit
CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, // now
); );
assert_eq!(rsp[0].len(), 0); assert_eq!(rsp[0].len(), 0);
assert_eq!(filters.len(), MIN_NUM_BLOOM_FILTERS); assert_eq!(filters.len(), MIN_NUM_BLOOM_FILTERS);
@ -1134,9 +1151,10 @@ pub(crate) mod tests {
.collect::<Vec<_>>() .collect::<Vec<_>>()
}); });
let rsp = dest.generate_pull_responses( let rsp = dest.generate_pull_responses(
&thread_pool,
&dest_crds, &dest_crds,
&filters, &filters,
/*output_size_limit=*/ usize::MAX, usize::MAX, // output_size_limit
CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
); );
assert_eq!(rsp.len(), 2 * MIN_NUM_BLOOM_FILTERS); assert_eq!(rsp.len(), 2 * MIN_NUM_BLOOM_FILTERS);
@ -1186,10 +1204,11 @@ pub(crate) mod tests {
let (_, filters) = req.unwrap(); let (_, filters) = req.unwrap();
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
let rsp = dest.generate_pull_responses( let rsp = dest.generate_pull_responses(
&thread_pool,
&dest_crds, &dest_crds,
&filters, &filters,
/*output_size_limit=*/ usize::MAX, usize::MAX, // output_size_limit
0, 0, // now
); );
dest.process_pull_requests( dest.process_pull_requests(
&mut dest_crds, &mut dest_crds,
@ -1258,10 +1277,11 @@ pub(crate) mod tests {
let (_, filters) = req.unwrap(); let (_, filters) = req.unwrap();
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
let rsp = dest.generate_pull_responses( let rsp = dest.generate_pull_responses(
&thread_pool,
&dest_crds, &dest_crds,
&filters, &filters,
/*output_size_limit=*/ usize::MAX, usize::MAX, // output_size_limit
0, 0, // now
); );
dest.process_pull_requests( dest.process_pull_requests(
&mut dest_crds, &mut dest_crds,

View File

@ -51,27 +51,27 @@ impl GossipService {
"gossip_receiver", "gossip_receiver",
1, 1,
); );
let (response_sender, response_receiver) = channel();
let (consume_sender, listen_receiver) = channel(); let (consume_sender, listen_receiver) = channel();
// https://github.com/rust-lang/rust/issues/39364#issuecomment-634545136
let _consume_sender = consume_sender.clone();
let t_socket_consume = cluster_info.clone().start_socket_consume_thread( let t_socket_consume = cluster_info.clone().start_socket_consume_thread(
request_receiver, request_receiver,
consume_sender, consume_sender,
exit.clone(), exit.clone(),
); );
let t_listen = ClusterInfo::listen( let (response_sender, response_receiver) = channel();
cluster_info.clone(), let t_listen = cluster_info.clone().listen(
bank_forks.clone(), bank_forks.clone(),
listen_receiver, listen_receiver,
response_sender.clone(), response_sender.clone(),
should_check_duplicate_instance, should_check_duplicate_instance,
exit, exit.clone(),
); );
let t_gossip = ClusterInfo::gossip( let t_gossip = cluster_info.clone().gossip(
cluster_info.clone(),
bank_forks, bank_forks,
response_sender, response_sender,
gossip_validators, gossip_validators,
exit, exit.clone(),
); );
// To work around: // To work around:
// https://github.com/rust-lang/rust/issues/54267 // https://github.com/rust-lang/rust/issues/54267

View File

@ -483,7 +483,7 @@ fn network_run_pull(
.collect() .collect()
}; };
let transfered: Vec<_> = requests let transfered: Vec<_> = requests
.into_par_iter() .into_iter()
.map(|(to, filters, caller_info)| { .map(|(to, filters, caller_info)| {
let mut bytes: usize = 0; let mut bytes: usize = 0;
let mut msgs: usize = 0; let mut msgs: usize = 0;
@ -506,8 +506,9 @@ fn network_run_pull(
.lock() .lock()
.unwrap() .unwrap()
.generate_pull_responses( .generate_pull_responses(
thread_pool,
&filters, &filters,
/*output_size_limit=*/ usize::MAX, usize::MAX, // output_size_limit
now, now,
) )
.into_iter() .into_iter()