diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index b1cc79fbc4..c060d5246e 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -115,7 +115,7 @@ pub const DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS: u64 = 60_000;
 /// Minimum serialized size of a Protocol::PullResponse packet.
 const PULL_RESPONSE_MIN_SERIALIZED_SIZE: usize = 161;
 // Limit number of unique pubkeys in the crds table.
-const CRDS_UNIQUE_PUBKEY_CAPACITY: usize = 4096;
+pub(crate) const CRDS_UNIQUE_PUBKEY_CAPACITY: usize = 4096;
 /// Minimum stake that a node should have so that its CRDS values are
 /// propagated through gossip (few types are exempted).
 const MIN_STAKE_FOR_GOSSIP: u64 = solana_sdk::native_token::LAMPORTS_PER_SOL;
@@ -3481,6 +3481,7 @@ mod tests {
     };
     use itertools::izip;
     use rand::seq::SliceRandom;
+    use serial_test::serial;
     use solana_ledger::shred::Shredder;
     use solana_sdk::signature::{Keypair, Signer};
     use solana_vote_program::{vote_instruction, vote_state::Vote};
@@ -4757,4 +4758,54 @@ mod tests {
             }
         }
     }
+
+    #[test]
+    #[serial]
+    fn test_pull_request_time_pruning() {
+        let node = Node::new_localhost();
+        let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(node.info));
+        let entrypoint_pubkey = solana_sdk::pubkey::new_rand();
+        let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp());
+        cluster_info.set_entrypoint(entrypoint);
+
+        let mut rng = rand::thread_rng();
+        let shred_version = cluster_info.my_shred_version();
+        let mut peers: Vec<Pubkey> = vec![];
+
+        const NO_ENTRIES: usize = 20000;
+        let data: Vec<_> = repeat_with(|| {
+            let keypair = Keypair::new();
+            peers.push(keypair.pubkey());
+            let mut rand_ci = ContactInfo::new_rand(&mut rng, Some(keypair.pubkey()));
+            rand_ci.shred_version = shred_version;
+            rand_ci.wallclock = timestamp();
+            CrdsValue::new_signed(CrdsData::ContactInfo(rand_ci), &keypair)
+        })
+        .take(NO_ENTRIES)
+        .collect();
+        let timeouts = cluster_info.gossip.read().unwrap().make_timeouts_test();
+        assert_eq!(
+            (0, 0, NO_ENTRIES),
+            cluster_info.handle_pull_response(&entrypoint_pubkey, data, &timeouts)
+        );
+
+        let now = timestamp();
+        for peer in peers {
+            cluster_info
+                .gossip
+                .write()
+                .unwrap()
+                .mark_pull_request_creation_time(&peer, now);
+        }
+        assert_eq!(
+            cluster_info
+                .gossip
+                .read()
+                .unwrap()
+                .pull
+                .pull_request_time
+                .len(),
+            CRDS_UNIQUE_PUBKEY_CAPACITY
+        );
+    }
 }
diff --git a/core/src/crds.rs b/core/src/crds.rs
index 18a0dd6d64..31daecc03b 100644
--- a/core/src/crds.rs
+++ b/core/src/crds.rs
@@ -264,6 +264,11 @@ impl Crds {
             .map(move |i| self.table.index(*i))
     }

+    /// Returns number of known pubkeys (network size).
+    pub(crate) fn num_nodes(&self) -> usize {
+        self.records.len()
+    }
+
     pub fn len(&self) -> usize {
         self.table.len()
     }
diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs
index ab0399ca0d..d7a45dc524 100644
--- a/core/src/crds_gossip.rs
+++ b/core/src/crds_gossip.rs
@@ -207,7 +207,7 @@ impl CrdsGossip {
             gossip_validators,
             &self.id,
             self.shred_version,
-            self.pull.pull_request_time.len(),
+            self.crds.num_nodes(),
             CRDS_GOSSIP_NUM_ACTIVE,
         )
     }
@@ -341,7 +341,7 @@ impl CrdsGossip {
         Self {
             crds: self.crds.clone(),
             push: self.push.mock_clone(),
-            pull: self.pull.clone(),
+            pull: self.pull.mock_clone(),
             ..*self
         }
     }
diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs
index 0ad6d045e3..c9ada0d5a8 100644
--- a/core/src/crds_gossip_pull.rs
+++ b/core/src/crds_gossip_pull.rs
@@ -9,12 +9,16 @@
 //! with random hash functions. So each subsequent request will have a different distribution
 //! of false positives.

-use crate::contact_info::ContactInfo;
-use crate::crds::{Crds, VersionedCrdsValue};
-use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS};
-use crate::crds_gossip_error::CrdsGossipError;
-use crate::crds_value::{CrdsValue, CrdsValueLabel};
+use crate::{
+    cluster_info::CRDS_UNIQUE_PUBKEY_CAPACITY,
+    contact_info::ContactInfo,
+    crds::{Crds, VersionedCrdsValue},
+    crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS},
+    crds_gossip_error::CrdsGossipError,
+    crds_value::{CrdsValue, CrdsValueLabel},
+};
 use itertools::Itertools;
+use lru::LruCache;
 use rand::distributions::{Distribution, WeightedIndex};
 use rand::Rng;
 use rayon::{prelude::*, ThreadPool};
@@ -168,10 +172,9 @@ pub struct ProcessPullStats {
     pub timeout_count: usize,
 }

-#[derive(Clone)]
 pub struct CrdsGossipPull {
     /// timestamp of last request
-    pub pull_request_time: HashMap<Pubkey, u64>,
+    pub(crate) pull_request_time: LruCache<Pubkey, u64>,
     /// hash and insert time
     pub purged_values: VecDeque<(Hash, u64)>,
     // Hash value and record time (ms) of the pull responses which failed to be
@@ -188,7 +191,7 @@ impl Default for CrdsGossipPull {
     fn default() -> Self {
         Self {
             purged_values: VecDeque::new(),
-            pull_request_time: HashMap::new(),
+            pull_request_time: LruCache::new(CRDS_UNIQUE_PUBKEY_CAPACITY),
             failed_inserts: VecDeque::new(),
             crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
             msg_timeout: CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
@@ -263,8 +266,12 @@ impl CrdsGossipPull {
             })
             .map(|item| {
                 let max_weight = f32::from(u16::max_value()) - 1.0;
-                let req_time: u64 = *self.pull_request_time.get(&item.id).unwrap_or(&0);
-                let since = ((now - req_time) / 1024) as u32;
+                let req_time: u64 = self
+                    .pull_request_time
+                    .peek(&item.id)
+                    .copied()
+                    .unwrap_or_default();
+                let since = (now.saturating_sub(req_time).min(3600 * 1000) / 1024) as u32;
                 let stake = get_stake(&item.id, stakes);
                 let weight = get_weight(max_weight, since, stake);
                 (weight, item)
@@ -277,7 +284,7 @@ impl CrdsGossipPull {
     /// It's important to use the local nodes request creation time as the weight
     /// instead of the response received time otherwise failed nodes will increase their weight.
     pub fn mark_pull_request_creation_time(&mut self, from: &Pubkey, now: u64) {
-        self.pull_request_time.insert(*from, now);
+        self.pull_request_time.put(*from, now);
     }

     /// Store an old hash in the purged values set
@@ -606,6 +613,20 @@ impl CrdsGossipPull {
             stats.success,
         )
     }
+
+    // Only for tests and simulations.
+    pub(crate) fn mock_clone(&self) -> Self {
+        let mut pull_request_time = LruCache::new(self.pull_request_time.cap());
+        for (k, v) in self.pull_request_time.iter().rev() {
+            pull_request_time.put(*k, *v);
+        }
+        Self {
+            pull_request_time,
+            purged_values: self.purged_values.clone(),
+            failed_inserts: self.failed_inserts.clone(),
+            ..*self
+        }
+    }
 }
 #[cfg(test)]
 mod test {
@@ -617,8 +638,12 @@ mod test {
     use rand::thread_rng;
     use rayon::ThreadPoolBuilder;
     use solana_perf::test_tx::test_tx;
-    use solana_sdk::hash::{hash, HASH_BYTES};
-    use solana_sdk::packet::PACKET_DATA_SIZE;
+    use solana_sdk::{
+        hash::{hash, HASH_BYTES},
+        packet::PACKET_DATA_SIZE,
+        timing::timestamp,
+    };
+    use std::iter::repeat_with;

     #[test]
     fn test_hash_as_u64() {
@@ -1009,6 +1034,41 @@ mod test {
         }
     }

+    #[test]
+    fn test_pull_request_time() {
+        const NUM_REPS: usize = 2 * CRDS_UNIQUE_PUBKEY_CAPACITY;
+        let mut rng = rand::thread_rng();
+        let pubkeys: Vec<_> = repeat_with(Pubkey::new_unique).take(NUM_REPS).collect();
+        let mut node = CrdsGossipPull::default();
+        let mut requests = HashMap::new();
+        let now = timestamp();
+        for k in 0..NUM_REPS {
+            let pubkey = pubkeys[rng.gen_range(0, pubkeys.len())];
+            let now = now + k as u64;
+            node.mark_pull_request_creation_time(&pubkey, now);
+            *requests.entry(pubkey).or_default() = now;
+        }
+        assert!(node.pull_request_time.len() <= CRDS_UNIQUE_PUBKEY_CAPACITY);
+        // Assert that timestamps match most recent request.
+        for (pk, ts) in &node.pull_request_time {
+            assert_eq!(*ts, requests[pk]);
+        }
+        // Assert that most recent pull timestamps are maintained.
+        let max_ts = requests
+            .iter()
+            .filter(|(pk, _)| !node.pull_request_time.contains(*pk))
+            .map(|(_, ts)| *ts)
+            .max()
+            .unwrap();
+        let min_ts = requests
+            .iter()
+            .filter(|(pk, _)| node.pull_request_time.contains(*pk))
+            .map(|(_, ts)| *ts)
+            .min()
+            .unwrap();
+        assert!(max_ts <= min_ts);
+    }
+
     #[test]
     fn test_generate_pull_responses() {
         let thread_pool = ThreadPoolBuilder::new().build().unwrap();
diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs
index 64a09875b3..4495aa4e2a 100644
--- a/core/src/crds_gossip_push.rs
+++ b/core/src/crds_gossip_push.rs
@@ -9,6 +9,7 @@
 //! 2. The prune set is stored in a Bloom filter.

 use crate::{
+    cluster_info::CRDS_UNIQUE_PUBKEY_CAPACITY,
     contact_info::ContactInfo,
     crds::{Crds, VersionedCrdsValue},
     crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS},
@@ -19,6 +20,7 @@ use crate::{
 use bincode::serialized_size;
 use indexmap::map::IndexMap;
 use itertools::Itertools;
+use lru::LruCache;
 use rand::{seq::SliceRandom, Rng};
 use solana_runtime::bloom::{AtomicBloom, Bloom};
 use solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::timestamp};
@@ -39,9 +41,6 @@ pub const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 3;
 // Do not push to peers which have not been updated for this long.
 const PUSH_ACTIVE_TIMEOUT_MS: u64 = 60_000;

-// 10 minutes
-const MAX_PUSHED_TO_TIMEOUT_MS: u64 = 10 * 60 * 1000;
-
 pub struct CrdsGossipPush {
     /// max bytes per message
     pub max_bytes: usize,
@@ -54,8 +53,7 @@ pub struct CrdsGossipPush {
     /// This cache represents a lagging view of which validators
     /// currently have this node in their `active_set`
    received_cache: HashMap<Pubkey, HashMap<Pubkey, (bool, u64)>>,
-    last_pushed_to: HashMap<Pubkey, u64>,
-    last_pushed_to_cleanup_ts: u64,
+    last_pushed_to: LruCache<Pubkey, u64>,
     pub num_active: usize,
     pub push_fanout: usize,
     pub msg_timeout: u64,
@@ -73,8 +71,7 @@ impl Default for CrdsGossipPush {
             active_set: IndexMap::new(),
             push_messages: HashMap::new(),
             received_cache: HashMap::new(),
-            last_pushed_to: HashMap::new(),
-            last_pushed_to_cleanup_ts: 0,
+            last_pushed_to: LruCache::new(CRDS_UNIQUE_PUBKEY_CAPACITY),
             num_active: CRDS_GOSSIP_NUM_ACTIVE,
             push_fanout: CRDS_GOSSIP_PUSH_FANOUT,
             msg_timeout: CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS,
@@ -269,13 +266,8 @@ impl CrdsGossipPush {
        for label in labels {
            self.push_messages.remove(&label);
        }
-        for target_pubkey in push_messages.keys() {
-            *self.last_pushed_to.entry(*target_pubkey).or_insert(0) = now;
-        }
-        if now - self.last_pushed_to_cleanup_ts > MAX_PUSHED_TO_TIMEOUT_MS {
-            self.last_pushed_to
-                .retain(|_id, timestamp| now - *timestamp > MAX_PUSHED_TO_TIMEOUT_MS);
-            self.last_pushed_to_cleanup_ts = now;
+        for target_pubkey in push_messages.keys().copied() {
+            self.last_pushed_to.put(target_pubkey, now);
        }
        push_messages
    }
@@ -395,8 +387,12 @@ impl CrdsGossipPush {
                })
            })
            .map(|info| {
-                let last_pushed_to: u64 = *self.last_pushed_to.get(&info.id).unwrap_or(&0);
-                let since = (now.saturating_sub(last_pushed_to) / 1024) as u32;
+                let last_pushed_to = self
+                    .last_pushed_to
+                    .peek(&info.id)
+                    .copied()
+                    .unwrap_or_default();
+                let since = (now.saturating_sub(last_pushed_to).min(3600 * 1000) / 1024) as u32;
                let stake = get_stake(&info.id, stakes);
                let weight = get_weight(max_weight, since, stake);
                (weight, info)
@@ -423,15 +419,20 @@ impl CrdsGossipPush {

    // Only for tests and simulations.
    pub(crate) fn mock_clone(&self) -> Self {
-        let mut active_set = IndexMap::<Pubkey, AtomicBloom<Pubkey>>::new();
-        for (k, v) in &self.active_set {
-            active_set.insert(*k, v.mock_clone());
+        let active_set = self
+            .active_set
+            .iter()
+            .map(|(k, v)| (*k, v.mock_clone()))
+            .collect();
+        let mut last_pushed_to = LruCache::new(self.last_pushed_to.cap());
+        for (k, v) in self.last_pushed_to.iter().rev() {
+            last_pushed_to.put(*k, *v);
        }
        Self {
            active_set,
            push_messages: self.push_messages.clone(),
            received_cache: self.received_cache.clone(),
-            last_pushed_to: self.last_pushed_to.clone(),
+            last_pushed_to,
            ..*self
        }
    }
@@ -641,7 +642,7 @@ mod test {
            let id = peer.label().pubkey();
            crds.insert(peer.clone(), time).unwrap();
            stakes.insert(id, i * 100);
-            push.last_pushed_to.insert(id, time);
+            push.last_pushed_to.put(id, time);
        }
        let mut options = push.push_options(&crds, &Pubkey::default(), 0, &stakes, None);
        assert!(!options.is_empty());