diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 2db3ceedfd..47ec43e3fb 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -114,6 +114,8 @@ pub const DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS: u64 = 10_000; pub const DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS: u64 = 60_000; /// Minimum serialized size of a Protocol::PullResponse packet. const PULL_RESPONSE_MIN_SERIALIZED_SIZE: usize = 161; +// Limit number of unique pubkeys in the crds table. +const CRDS_UNIQUE_PUBKEY_CAPACITY: usize = 4096; #[derive(Debug, PartialEq, Eq)] pub enum ClusterInfoError { @@ -285,6 +287,8 @@ struct GossipStats { prune_message_len: Counter, pull_request_ping_pong_check_failed_count: Counter, purge: Counter, + trim_crds_table_failed: Counter, + trim_crds_table_purged_values_count: Counter, epoch_slots_lookup: Counter, new_pull_requests: Counter, new_pull_requests_count: Counter, @@ -603,16 +607,6 @@ impl ClusterInfo { self.contact_debug_interval = new; } - pub fn update_contact_info(&self, modify: F) - where - F: FnOnce(&mut ContactInfo), - { - let my_id = self.id(); - modify(&mut self.my_contact_info.write().unwrap()); - assert_eq!(self.my_contact_info.read().unwrap().id, my_id); - self.insert_self() - } - fn push_self( &self, stakes: &HashMap, @@ -1726,6 +1720,7 @@ impl ClusterInfo { stakes: &HashMap, generate_pull_requests: bool, ) -> Vec<(SocketAddr, Protocol)> { + self.trim_crds_table(CRDS_UNIQUE_PUBKEY_CAPACITY, &stakes); let mut pulls: Vec<_> = if generate_pull_requests { self.new_pull_requests(&thread_pool, gossip_validators, stakes) } else { @@ -1839,6 +1834,39 @@ impl ClusterInfo { inc_new_counter_info!("cluster_info-purge-count", num_purged); } + // Trims the CRDS table by dropping all values associated with the pubkeys + // with the lowest stake, so that the number of unique pubkeys are bounded. + fn trim_crds_table(&self, cap: usize, stakes: &HashMap) { + if !self.gossip.read().unwrap().crds.should_trim(cap) { + return; + } + let keep: Vec<_> = self + .entrypoints + .read() + .unwrap() + .iter() + .map(|k| k.id) + .chain(std::iter::once(self.id)) + .collect(); + let mut gossip = self.gossip.write().unwrap(); + match gossip.crds.trim(cap, &keep, stakes) { + Err(err) => { + self.stats.trim_crds_table_failed.add_relaxed(1); + error!("crds table trim failed: {:?}", err); + } + Ok(purged_values) => { + self.stats + .trim_crds_table_purged_values_count + .add_relaxed(purged_values.len() as u64); + gossip.pull.purged_values.extend( + purged_values + .into_iter() + .map(|v| (v.value_hash, v.local_timestamp)), + ); + } + } + } + /// randomly pick a node and ask them for updates asynchronously pub fn gossip( self: Arc, @@ -2663,6 +2691,7 @@ impl ClusterInfo { response_sender, ); self.handle_batch_pull_responses(pull_responses, thread_pool, &stakes, epoch_time_ms); + self.trim_crds_table(CRDS_UNIQUE_PUBKEY_CAPACITY, &stakes); self.handle_batch_pong_messages(pong_messages, Instant::now()); self.handle_batch_pull_requests( pull_requests, @@ -3010,6 +3039,16 @@ impl ClusterInfo { self.stats.packets_sent_push_messages_count.clear(), i64 ), + ( + "trim_crds_table_failed", + self.stats.trim_crds_table_failed.clear(), + i64 + ), + ( + "trim_crds_table_purged_values_count", + self.stats.trim_crds_table_purged_values_count.clear(), + i64 + ), ); *last_print = Instant::now(); @@ -3718,40 +3757,6 @@ mod tests { .lookup(&label) .is_some()); } - #[test] - #[should_panic] - fn test_update_contact_info() { - let d = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp()); - let cluster_info = ClusterInfo::new_with_invalid_keypair(d); - let entry_label = CrdsValueLabel::ContactInfo(cluster_info.id()); - assert!(cluster_info - .gossip - .read() - .unwrap() - .crds - .lookup(&entry_label) - .is_some()); - - let now = timestamp(); - cluster_info.update_contact_info(|ci| ci.wallclock = now); - assert_eq!( - cluster_info - .gossip - .read() - .unwrap() - .crds - .lookup(&entry_label) - .unwrap() - .contact_info() - .unwrap() - .wallclock, - now - ); - - // Inserting Contactinfo with different pubkey should panic, - // and update should fail - cluster_info.update_contact_info(|ci| ci.id = solana_sdk::pubkey::new_rand()) - } fn assert_in_range(x: u16, range: (u16, u16)) { assert!(x >= range.0); diff --git a/core/src/cluster_slots.rs b/core/src/cluster_slots.rs index e2d2e06d74..77adc37177 100644 --- a/core/src/cluster_slots.rs +++ b/core/src/cluster_slots.rs @@ -78,9 +78,10 @@ impl ClusterSlots { { let mut cluster_slots = self.cluster_slots.write().unwrap(); *cluster_slots = cluster_slots.split_off(&(root + 1)); - // Trimming is done at 2x size so that amortized it has a constant - // cost. The slots furthest away from the root are discarded. - if cluster_slots.len() > 2 * CLUSTER_SLOTS_TRIM_SIZE { + // Allow 10% overshoot so that the computation cost is amortized + // down. The slots furthest away from the root are discarded. + if 10 * cluster_slots.len() > 11 * CLUSTER_SLOTS_TRIM_SIZE { + warn!("trimming cluster slots"); let key = *cluster_slots.keys().nth(CLUSTER_SLOTS_TRIM_SIZE).unwrap(); cluster_slots.split_off(&key); } diff --git a/core/src/crds.rs b/core/src/crds.rs index b3ef01294a..0cdef2d1ef 100644 --- a/core/src/crds.rs +++ b/core/src/crds.rs @@ -61,6 +61,7 @@ pub struct Crds { #[derive(PartialEq, Debug)] pub enum CrdsError { InsertFailed, + UnknownStakes, } /// This structure stores some local metadata associated with the CrdsValue @@ -425,6 +426,62 @@ impl Crds { } Some(value) } + + /// Returns true if the number of unique pubkeys in the table exceeds the + /// given capacity (plus some margin). + /// Allows skipping unnecessary calls to trim without obtaining a write + /// lock on gossip. + pub(crate) fn should_trim(&self, cap: usize) -> bool { + // Allow 10% overshoot so that the computation cost is amortized down. + 10 * self.records.len() > 11 * cap + } + + /// Trims the table by dropping all values associated with the pubkeys with + /// the lowest stake, so that the number of unique pubkeys are bounded. + pub(crate) fn trim( + &mut self, + cap: usize, // Capacity hint for number of unique pubkeys. + // Set of pubkeys to never drop. + // e.g. trusted validators, self pubkey, ... + keep: &[Pubkey], + stakes: &HashMap, + ) -> Result, CrdsError> { + if self.should_trim(cap) { + let size = self.records.len().saturating_sub(cap); + self.drop(size, keep, stakes) + } else { + Ok(Vec::default()) + } + } + + // Drops 'size' many pubkeys with the lowest stake. + fn drop( + &mut self, + size: usize, + keep: &[Pubkey], + stakes: &HashMap, + ) -> Result, CrdsError> { + if stakes.is_empty() { + return Err(CrdsError::UnknownStakes); + } + let mut keys: Vec<_> = self + .records + .keys() + .map(|k| (stakes.get(k).copied().unwrap_or_default(), *k)) + .collect(); + if size < keys.len() { + keys.select_nth_unstable(size); + } + let keys: Vec<_> = keys + .into_iter() + .take(size) + .map(|(_, k)| k) + .filter(|k| !keep.contains(k)) + .flat_map(|k| &self.records[&k]) + .map(|k| self.table.get_index(*k).unwrap().0.clone()) + .collect(); + Ok(keys.iter().map(|k| self.remove(k).unwrap()).collect()) + } } #[cfg(test)] @@ -433,6 +490,7 @@ mod test { use crate::{contact_info::ContactInfo, crds_value::NodeInstance}; use rand::{thread_rng, Rng}; use rayon::ThreadPoolBuilder; + use solana_sdk::signature::Signer; use std::{collections::HashSet, iter::repeat_with}; #[test] @@ -813,6 +871,56 @@ mod test { assert!(crds.records.is_empty()); } + #[test] + fn test_drop() { + fn num_unique_pubkeys<'a, I>(values: I) -> usize + where + I: IntoIterator, + { + values + .into_iter() + .map(|v| v.value.pubkey()) + .collect::>() + .len() + } + let mut rng = thread_rng(); + let keypairs: Vec<_> = repeat_with(Keypair::new).take(64).collect(); + let stakes = keypairs + .iter() + .map(|k| (k.pubkey(), rng.gen_range(0, 1000))) + .collect(); + let mut crds = Crds::default(); + for _ in 0..2048 { + let keypair = &keypairs[rng.gen_range(0, keypairs.len())]; + let value = VersionedCrdsValue::new_rand(&mut rng, Some(keypair)); + let _ = crds.insert_versioned(value); + } + let num_values = crds.table.len(); + let num_pubkeys = num_unique_pubkeys(crds.table.values()); + assert!(!crds.should_trim(num_pubkeys)); + assert!(crds.should_trim(num_pubkeys * 5 / 6)); + let purged = crds.drop(16, &[], &stakes).unwrap(); + assert_eq!(purged.len() + crds.table.len(), num_values); + assert_eq!(num_unique_pubkeys(&purged), 16); + assert_eq!(num_unique_pubkeys(crds.table.values()), num_pubkeys - 16); + let attach_stake = |v: &VersionedCrdsValue| { + let pk = v.value.pubkey(); + (stakes[&pk], pk) + }; + assert!( + purged.iter().map(attach_stake).max().unwrap() + < crds.table.values().map(attach_stake).min().unwrap() + ); + let purged = purged + .into_iter() + .map(|v| v.value.pubkey()) + .collect::>(); + for (k, v) in crds.table { + assert!(!purged.contains(&k.pubkey())); + assert!(!purged.contains(&v.value.pubkey())); + } + } + #[test] fn test_remove_staked() { let thread_pool = ThreadPoolBuilder::new().build().unwrap();