prunes received-cache only once per unique owner's key (#17039) (#17337)

(cherry picked from commit 0e646d10bb)

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
This commit is contained in:
mergify[bot]
2021-05-19 22:50:42 +00:00
committed by GitHub
parent 32681e2739
commit d2e98cb531
4 changed files with 63 additions and 68 deletions

View File

@@ -2423,8 +2423,9 @@ impl ClusterInfo {
self.stats self.stats
.skip_push_message_shred_version .skip_push_message_shred_version
.add_relaxed(num_crds_values - num_filtered_crds_values); .add_relaxed(num_crds_values - num_filtered_crds_values);
// Update crds values and obtain updated keys. // Origins' pubkeys of updated crds values.
let updated_labels: Vec<_> = { // TODO: Should this also include origins of new crds values?
let origins: HashSet<_> = {
let mut gossip = let mut gossip =
self.time_gossip_write_lock("process_push", &self.stats.process_push_message); self.time_gossip_write_lock("process_push", &self.stats.process_push_message);
let now = timestamp(); let now = timestamp();
@@ -2433,13 +2434,13 @@ impl ClusterInfo {
.flat_map(|(from, crds_values)| { .flat_map(|(from, crds_values)| {
gossip.process_push_message(&from, crds_values, now) gossip.process_push_message(&from, crds_values, now)
}) })
.map(|v| v.value.label()) .map(|v| v.value.pubkey())
.collect() .collect()
}; };
// Generate prune messages. // Generate prune messages.
let prunes = self let prunes = self
.time_gossip_write_lock("prune_received_cache", &self.stats.prune_received_cache) .time_gossip_write_lock("prune_received_cache", &self.stats.prune_received_cache)
.prune_received_cache(updated_labels, stakes); .prune_received_cache(origins, stakes);
let prunes: Vec<(Pubkey /*from*/, Vec<Pubkey> /*origins*/)> = prunes let prunes: Vec<(Pubkey /*from*/, Vec<Pubkey> /*origins*/)> = prunes
.into_iter() .into_iter()
.flat_map(|(from, prunes)| { .flat_map(|(from, prunes)| {

View File

@@ -10,10 +10,11 @@ use crate::{
crds_gossip_error::CrdsGossipError, crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{CrdsFilter, CrdsGossipPull, ProcessPullStats}, crds_gossip_pull::{CrdsFilter, CrdsGossipPull, ProcessPullStats},
crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE}, crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE},
crds_value::{CrdsData, CrdsValue, CrdsValueLabel}, crds_value::{CrdsData, CrdsValue},
duplicate_shred::{self, DuplicateShredIndex, LeaderScheduleFn, MAX_DUPLICATE_SHREDS}, duplicate_shred::{self, DuplicateShredIndex, LeaderScheduleFn, MAX_DUPLICATE_SHREDS},
ping_pong::PingCache, ping_pong::PingCache,
}; };
use itertools::Itertools;
use rayon::ThreadPool; use rayon::ThreadPool;
use solana_ledger::shred::Shred; use solana_ledger::shred::Shred;
use solana_sdk::{ use solana_sdk::{
@@ -80,21 +81,24 @@ impl CrdsGossip {
} }
/// remove redundant paths in the network /// remove redundant paths in the network
pub fn prune_received_cache( pub fn prune_received_cache<I>(
&mut self, &mut self,
labels: Vec<CrdsValueLabel>, origins: I, // Unique pubkeys of crds values' owners.
stakes: &HashMap<Pubkey, u64>, stakes: &HashMap<Pubkey, u64>,
) -> HashMap<Pubkey, HashSet<Pubkey>> { ) -> HashMap</*gossip peer:*/ Pubkey, /*origins:*/ Vec<Pubkey>>
let id = &self.id; where
let push = &mut self.push; I: IntoIterator<Item = Pubkey>,
let mut prune_map: HashMap<Pubkey, HashSet<_>> = HashMap::new(); {
for origin in labels.iter().map(|k| k.pubkey()) { let self_pubkey = self.id;
let peers = push.prune_received_cache(id, &origin, stakes); origins
for from in peers { .into_iter()
prune_map.entry(from).or_default().insert(origin); .flat_map(|origin| {
} self.push
} .prune_received_cache(&self_pubkey, &origin, stakes)
prune_map .into_iter()
.zip(std::iter::repeat(origin))
})
.into_group_map()
} }
pub fn new_push_messages( pub fn new_push_messages(

View File

@@ -53,7 +53,10 @@ pub struct CrdsGossipPush {
/// bool indicates it has been pruned. /// bool indicates it has been pruned.
/// This cache represents a lagging view of which validators /// This cache represents a lagging view of which validators
/// currently have this node in their `active_set` /// currently have this node in their `active_set`
received_cache: HashMap<Pubkey, HashMap<Pubkey, (bool, u64)>>, received_cache: HashMap<
Pubkey, // origin/owner
HashMap</*gossip peer:*/ Pubkey, (/*pruned:*/ bool, /*timestamp:*/ u64)>,
>,
last_pushed_to: LruCache<Pubkey, u64>, last_pushed_to: LruCache<Pubkey, u64>,
pub num_active: usize, pub num_active: usize,
pub push_fanout: usize, pub push_fanout: usize,
@@ -102,67 +105,58 @@ impl CrdsGossipPush {
) -> Vec<Pubkey> { ) -> Vec<Pubkey> {
let origin_stake = stakes.get(origin).unwrap_or(&0); let origin_stake = stakes.get(origin).unwrap_or(&0);
let self_stake = stakes.get(self_pubkey).unwrap_or(&0); let self_stake = stakes.get(self_pubkey).unwrap_or(&0);
let cache = self.received_cache.get(origin); let peers = match self.received_cache.get_mut(origin) {
if cache.is_none() { None => return Vec::default(),
return Vec::new(); Some(peers) => peers,
} };
let peers = cache.unwrap();
let peer_stake_total: u64 = peers let peer_stake_total: u64 = peers
.iter() .iter()
.filter(|v| !(v.1).0) .filter(|(_, (pruned, _))| !pruned)
.map(|v| stakes.get(v.0).unwrap_or(&0)) .filter_map(|(peer, _)| stakes.get(peer))
.sum(); .sum();
let prune_stake_threshold = Self::prune_stake_threshold(*self_stake, *origin_stake); let prune_stake_threshold = Self::prune_stake_threshold(*self_stake, *origin_stake);
if peer_stake_total < prune_stake_threshold { if peer_stake_total < prune_stake_threshold {
return Vec::new(); return Vec::new();
} }
let shuffled_staked_peers = {
let staked_peers: Vec<(Pubkey, u64)> = peers let peers: Vec<_> = peers
.iter() .iter()
.filter(|v| !(v.1).0) .filter(|(_, (pruned, _))| !pruned)
.filter_map(|p| stakes.get(p.0).map(|s| (*p.0, *s))) .filter_map(|(peer, _)| Some((*peer, *stakes.get(peer)?)))
.filter(|(_, s)| *s > 0) .filter(|(_, stake)| *stake > 0)
.collect(); .collect();
let mut seed = [0; 32]; let mut seed = [0; 32];
rand::thread_rng().fill(&mut seed[..]); rand::thread_rng().fill(&mut seed[..]);
let shuffle = weighted_shuffle( let weights: Vec<_> = peers.iter().map(|(_, stake)| *stake).collect();
&staked_peers.iter().map(|(_, stake)| *stake).collect_vec(), weighted_shuffle(&weights, seed)
seed, .into_iter()
); .map(move |i| peers[i])
};
let mut keep = HashSet::new(); let mut keep = HashSet::new();
let mut peer_stake_sum = 0; let mut peer_stake_sum = 0;
keep.insert(*origin); keep.insert(*origin);
for next in shuffle { for (peer, stake) in shuffled_staked_peers {
let (next_peer, next_stake) = staked_peers[next]; if peer == *origin {
if next_peer == *origin {
continue; continue;
} }
keep.insert(next_peer); keep.insert(peer);
peer_stake_sum += next_stake; peer_stake_sum += stake;
if peer_stake_sum >= prune_stake_threshold if peer_stake_sum >= prune_stake_threshold
&& keep.len() >= CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES && keep.len() >= CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES
{ {
break; break;
} }
} }
for (peer, (pruned, _)) in peers.iter_mut() {
let pruned_peers: Vec<Pubkey> = peers if !*pruned && !keep.contains(peer) {
*pruned = true;
}
}
peers
.keys() .keys()
.filter(|p| !keep.contains(p)) .filter(|peer| !keep.contains(peer))
.cloned() .copied()
.collect(); .collect()
pruned_peers.iter().for_each(|p| {
self.received_cache
.get_mut(origin)
.unwrap()
.get_mut(p)
.unwrap()
.0 = true;
});
pruned_peers
} }
fn wallclock_window(&self, now: u64) -> impl RangeBounds<u64> { fn wallclock_window(&self, now: u64) -> impl RangeBounds<u64> {

View File

@@ -357,15 +357,11 @@ fn network_run_push(
}) })
.unwrap(); .unwrap();
let updated_labels: Vec<_> = let origins: HashSet<_> =
updated.into_iter().map(|u| u.value.label()).collect(); updated.into_iter().map(|u| u.value.pubkey()).collect();
let prunes_map = network let prunes_map = network
.get(&to) .get(&to)
.map(|node| { .map(|node| node.lock().unwrap().prune_received_cache(origins, &stakes))
node.lock()
.unwrap()
.prune_received_cache(updated_labels, &stakes)
})
.unwrap(); .unwrap();
for (from, prune_set) in prunes_map { for (from, prune_set) in prunes_map {