diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs index 3abe0e58f2..4be6f0f877 100644 --- a/core/src/crds_gossip_push.rs +++ b/core/src/crds_gossip_push.rs @@ -37,6 +37,9 @@ pub const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500; pub const CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT: f64 = 0.15; pub const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 2; +// 10 minutes +const MAX_PUSHED_TO_TIMEOUT_MS: u64 = 10 * 60 * 1000; + #[derive(Clone)] pub struct CrdsGossipPush { /// max bytes per message @@ -50,6 +53,8 @@ pub struct CrdsGossipPush { /// This cache represents a lagging view of which validators /// currently have this node in their `active_set` received_cache: HashMap>, + last_pushed_to: HashMap, + last_pushed_to_cleanup_ts: u64, pub num_active: usize, pub push_fanout: usize, pub msg_timeout: u64, @@ -67,6 +72,8 @@ impl Default for CrdsGossipPush { active_set: IndexMap::new(), push_messages: HashMap::new(), received_cache: HashMap::new(), + last_pushed_to: HashMap::new(), + last_pushed_to_cleanup_ts: 0, num_active: CRDS_GOSSIP_NUM_ACTIVE, push_fanout: CRDS_GOSSIP_PUSH_FANOUT, msg_timeout: CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS, @@ -254,6 +261,15 @@ impl CrdsGossipPush { self.push_messages.remove(&v.label()); } } + + for target_pubkey in push_messages.keys() { + *self.last_pushed_to.entry(*target_pubkey).or_insert(0) = now; + } + if now - self.last_pushed_to_cleanup_ts > MAX_PUSHED_TO_TIMEOUT_MS { + self.last_pushed_to + .retain(|_id, timestamp| now - *timestamp > MAX_PUSHED_TO_TIMEOUT_MS); + self.last_pushed_to_cleanup_ts = now; + } push_messages } @@ -357,10 +373,10 @@ impl CrdsGossipPush { gossip_validators.contains(&info.id) }) }) - .map(|(info, value)| { + .map(|(info, _value)| { let max_weight = f32::from(u16::max_value()) - 1.0; - let last_updated: u64 = value.local_timestamp; - let since = ((timestamp() - last_updated) / 1024) as u32; + let last_pushed_to: u64 = *self.last_pushed_to.get(&info.id).unwrap_or(&0); + let since = ((timestamp() - last_pushed_to) / 1024) as u32; let stake = get_stake(&info.id, stakes); let weight = get_weight(max_weight, since, stake); (weight, info) @@ -576,9 +592,10 @@ mod test { } #[test] fn test_active_set_refresh_with_bank() { + solana_logger::setup(); let time = timestamp() - 1024; //make sure there's at least a 1 second delay let mut crds = Crds::default(); - let push = CrdsGossipPush::default(); + let mut push = CrdsGossipPush::default(); let mut stakes = HashMap::new(); for i in 1..=100 { let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( @@ -588,6 +605,7 @@ mod test { let id = peer.label().pubkey(); crds.insert(peer.clone(), time).unwrap(); stakes.insert(id, i * 100); + push.last_pushed_to.insert(id, time); } let mut options = push.push_options(&crds, &Pubkey::default(), 0, &stakes, None); assert!(!options.is_empty());