From d2e98cb531a0f25996c3e4257c27cc8a867dea09 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 19 May 2021 22:50:42 +0000 Subject: [PATCH] prunes received-cache only once per unique owner's key (#17039) (#17337) (cherry picked from commit 0e646d10bb92cf379c524ccf13192ca7a5d875ee) Co-authored-by: behzad nouri --- core/src/cluster_info.rs | 9 ++-- core/src/crds_gossip.rs | 32 ++++++++------- core/src/crds_gossip_push.rs | 80 +++++++++++++++++------------------- core/tests/crds_gossip.rs | 10 ++--- 4 files changed, 63 insertions(+), 68 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index bd565e19ee..b4330712db 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -2423,8 +2423,9 @@ impl ClusterInfo { self.stats .skip_push_message_shred_version .add_relaxed(num_crds_values - num_filtered_crds_values); - // Update crds values and obtain updated keys. - let updated_labels: Vec<_> = { + // Origins' pubkeys of updated crds values. + // TODO: Should this also include origins of new crds values? + let origins: HashSet<_> = { let mut gossip = self.time_gossip_write_lock("process_push", &self.stats.process_push_message); let now = timestamp(); @@ -2433,13 +2434,13 @@ impl ClusterInfo { .flat_map(|(from, crds_values)| { gossip.process_push_message(&from, crds_values, now) }) - .map(|v| v.value.label()) + .map(|v| v.value.pubkey()) .collect() }; // Generate prune messages. 
let prunes = self .time_gossip_write_lock("prune_received_cache", &self.stats.prune_received_cache) - .prune_received_cache(updated_labels, stakes); + .prune_received_cache(origins, stakes); let prunes: Vec<(Pubkey /*from*/, Vec<Pubkey> /*origins*/)> = prunes .into_iter() .flat_map(|(from, prunes)| { diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index a6ee16eb78..98a63e13dd 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -10,10 +10,11 @@ use crate::{ crds_gossip_error::CrdsGossipError, crds_gossip_pull::{CrdsFilter, CrdsGossipPull, ProcessPullStats}, crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE}, - crds_value::{CrdsData, CrdsValue, CrdsValueLabel}, + crds_value::{CrdsData, CrdsValue}, duplicate_shred::{self, DuplicateShredIndex, LeaderScheduleFn, MAX_DUPLICATE_SHREDS}, ping_pong::PingCache, }; +use itertools::Itertools; use rayon::ThreadPool; use solana_ledger::shred::Shred; use solana_sdk::{ @@ -80,21 +81,24 @@ impl CrdsGossip { } /// remove redundant paths in the network - pub fn prune_received_cache( + pub fn prune_received_cache<I>( &mut self, - labels: Vec<CrdsValueLabel>, + origins: I, // Unique pubkeys of crds values' owners. 
stakes: &HashMap<Pubkey, u64>, - ) -> HashMap<Pubkey, HashSet<Pubkey>> { - let id = &self.id; - let push = &mut self.push; - let mut prune_map: HashMap<Pubkey, HashSet<Pubkey>> = HashMap::new(); - for origin in labels.iter().map(|k| k.pubkey()) { - let peers = push.prune_received_cache(id, &origin, stakes); - for from in peers { - prune_map.entry(from).or_default().insert(origin); - } - } - prune_map + ) -> HashMap<Pubkey, Vec<Pubkey>> + where + I: IntoIterator<Item = Pubkey>, + { + let self_pubkey = self.id; + origins + .into_iter() + .flat_map(|origin| { + self.push + .prune_received_cache(&self_pubkey, &origin, stakes) + .into_iter() + .zip(std::iter::repeat(origin)) + }) + .into_group_map() } pub fn new_push_messages( diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs index 8df54067d9..f0b3b95e85 100644 --- a/core/src/crds_gossip_push.rs +++ b/core/src/crds_gossip_push.rs @@ -53,7 +53,10 @@ pub struct CrdsGossipPush { /// bool indicates it has been pruned. /// This cache represents a lagging view of which validators /// currently have this node in their `active_set` - received_cache: HashMap<Pubkey, HashMap<Pubkey, (bool, u64)>>, + received_cache: HashMap< + Pubkey, // origin/owner + HashMap<Pubkey, (bool, u64)>, + >, last_pushed_to: LruCache<Pubkey, u64>, pub num_active: usize, pub push_fanout: usize, @@ -102,67 +105,58 @@ impl CrdsGossipPush { ) -> Vec<Pubkey> { let origin_stake = stakes.get(origin).unwrap_or(&0); let self_stake = stakes.get(self_pubkey).unwrap_or(&0); - let cache = self.received_cache.get(origin); - if cache.is_none() { - return Vec::new(); - } - let peers = cache.unwrap(); - + let peers = match self.received_cache.get_mut(origin) { + None => return Vec::default(), + Some(peers) => peers, + }; let peer_stake_total: u64 = peers .iter() - .filter(|v| !(v.1).0) - .map(|v| stakes.get(v.0).unwrap_or(&0)) + .filter(|(_, (pruned, _))| !pruned) + .filter_map(|(peer, _)| stakes.get(peer)) .sum(); let prune_stake_threshold = Self::prune_stake_threshold(*self_stake, *origin_stake); if peer_stake_total < prune_stake_threshold { return Vec::new(); } - - let staked_peers: Vec<(Pubkey, u64)> = peers - 
.iter() - .filter(|v| !(v.1).0) - .filter_map(|p| stakes.get(p.0).map(|s| (*p.0, *s))) - .filter(|(_, s)| *s > 0) - .collect(); - - let mut seed = [0; 32]; - rand::thread_rng().fill(&mut seed[..]); - let shuffle = weighted_shuffle( - &staked_peers.iter().map(|(_, stake)| *stake).collect_vec(), - seed, - ); - + let shuffled_staked_peers = { + let peers: Vec<_> = peers + .iter() + .filter(|(_, (pruned, _))| !pruned) + .filter_map(|(peer, _)| Some((*peer, *stakes.get(peer)?))) + .filter(|(_, stake)| *stake > 0) + .collect(); + let mut seed = [0; 32]; + rand::thread_rng().fill(&mut seed[..]); + let weights: Vec<_> = peers.iter().map(|(_, stake)| *stake).collect(); + weighted_shuffle(&weights, seed) + .into_iter() + .map(move |i| peers[i]) + }; let mut keep = HashSet::new(); let mut peer_stake_sum = 0; keep.insert(*origin); - for next in shuffle { - let (next_peer, next_stake) = staked_peers[next]; - if next_peer == *origin { + for (peer, stake) in shuffled_staked_peers { + if peer == *origin { continue; } - keep.insert(next_peer); - peer_stake_sum += next_stake; + keep.insert(peer); + peer_stake_sum += stake; if peer_stake_sum >= prune_stake_threshold && keep.len() >= CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES { break; } } - - let pruned_peers: Vec<Pubkey> = peers + for (peer, (pruned, _)) in peers.iter_mut() { + if !*pruned && !keep.contains(peer) { + *pruned = true; + } + } + peers .keys() - .filter(|p| !keep.contains(p)) - .cloned() - .collect(); - pruned_peers.iter().for_each(|p| { - self.received_cache - .get_mut(origin) - .unwrap() - .get_mut(p) - .unwrap() - .0 = true; - }); - pruned_peers + .filter(|peer| !keep.contains(peer)) + .copied() + .collect() } fn wallclock_window(&self, now: u64) -> impl RangeBounds<u64> { diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index 656332bfab..d455d0a30c 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -357,15 +357,11 @@ fn network_run_push( }) .unwrap(); - let updated_labels: Vec<_> = - 
updated.into_iter().map(|u| u.value.label()).collect(); + let origins: HashSet<_> = + updated.into_iter().map(|u| u.value.pubkey()).collect(); let prunes_map = network .get(&to) - .map(|node| { - node.lock() - .unwrap() - .prune_received_cache(updated_labels, &stakes) - }) + .map(|node| node.lock().unwrap().prune_received_cache(origins, &stakes)) .unwrap(); for (from, prune_set) in prunes_map {