* limits number of crds values associated with a pubkey (#14467)
(cherry picked from commit 766195dded)
* updates smallvec
Co-authored-by: behzad nouri <behzadnouri@gmail.com>
			
			
This commit is contained in:
		
							
								
								
									
										1
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										1
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							| @@ -4026,6 +4026,7 @@ dependencies = [ | ||||
|  "regex", | ||||
|  "reqwest", | ||||
|  "rustc_version", | ||||
|  "rustversion", | ||||
|  "serde", | ||||
|  "serde_bytes", | ||||
|  "serde_derive", | ||||
|   | ||||
| @@ -45,6 +45,7 @@ rand_chacha = "0.2.2" | ||||
| raptorq = "1.4.2" | ||||
| rayon = "1.4.1" | ||||
| regex = "1.3.9" | ||||
| rustversion = "1.0.4" | ||||
| serde = "1.0.112" | ||||
| serde_bytes = "0.11" | ||||
| serde_derive = "1.0.103" | ||||
|   | ||||
							
								
								
									
										101
									
								
								core/src/crds.rs
									
									
									
									
									
								
							
							
						
						
									
										101
									
								
								core/src/crds.rs
									
									
									
									
									
								
							| @@ -40,6 +40,11 @@ use std::collections::{hash_map, HashMap}; | ||||
| use std::ops::{Index, IndexMut}; | ||||
|  | ||||
| const CRDS_SHARDS_BITS: u32 = 8; | ||||
| // Limit number of crds values associated with each unique pubkey. This | ||||
| // excludes crds values which by label design are limited per each pubkey. | ||||
| // TODO: Find the right value for this once duplicate shreds and corresponding | ||||
| // votes are broadcasted over gossip. | ||||
| const MAX_CRDS_VALUES_PER_PUBKEY: usize = 512; | ||||
|  | ||||
| #[derive(Clone)] | ||||
| pub struct Crds { | ||||
| @@ -275,22 +280,59 @@ impl Crds { | ||||
|         now: u64, | ||||
|         timeouts: &HashMap<Pubkey, u64>, | ||||
|     ) -> Vec<CrdsValueLabel> { | ||||
|         // TODO: need custom logic for purging duplicate shreds. | ||||
|         #[rustversion::before(1.49.0)] | ||||
|         fn select_nth<T: Ord>(xs: &mut Vec<T>, _nth: usize) { | ||||
|             xs.sort_unstable(); | ||||
|         } | ||||
|         #[rustversion::since(1.49.0)] | ||||
|         fn select_nth<T: Ord>(xs: &mut Vec<T>, nth: usize) { | ||||
|             xs.select_nth_unstable(nth); | ||||
|         } | ||||
|         let default_timeout = *timeouts | ||||
|             .get(&Pubkey::default()) | ||||
|             .expect("must have default timeout"); | ||||
|         thread_pool.install(|| { | ||||
|             self.table | ||||
|                 .par_iter() | ||||
|                 .with_min_len(1024) | ||||
|                 .filter_map(|(k, v)| { | ||||
|                     let timeout = timeouts.get(&k.pubkey()).unwrap_or(&default_timeout); | ||||
|                     if v.local_timestamp.saturating_add(*timeout) <= now { | ||||
|                         Some(k.clone()) | ||||
|                     } else { | ||||
|         // Given an index of all crd values associated with a pubkey, | ||||
|         // returns crds labels of old values to be evicted. | ||||
|         let evict = |pubkey, index: &IndexSet<usize>| { | ||||
|             let timeout = *timeouts.get(pubkey).unwrap_or(&default_timeout); | ||||
|             let mut old_labels = Vec::new(); | ||||
|             // Buffer of crds values to be evicted based on their wallclock. | ||||
|             let mut recent_unlimited_labels: Vec<(u64 /*wallclock*/, usize /*index*/)> = index | ||||
|                 .into_iter() | ||||
|                 .filter_map(|ix| { | ||||
|                     let (label, value) = self.table.get_index(*ix).unwrap(); | ||||
|                     if value.local_timestamp.saturating_add(timeout) <= now { | ||||
|                         old_labels.push(label.clone()); | ||||
|                         None | ||||
|                     } else { | ||||
|                         match label.value_space() { | ||||
|                             Some(_) => None, | ||||
|                             None => Some((value.value.wallclock(), *ix)), | ||||
|                         } | ||||
|                     } | ||||
|                 }) | ||||
|                 .collect(); | ||||
|             // Number of values to discard from the buffer: | ||||
|             let nth = recent_unlimited_labels | ||||
|                 .len() | ||||
|                 .saturating_sub(MAX_CRDS_VALUES_PER_PUBKEY); | ||||
|             // Partition on wallclock to discard the older ones. | ||||
|             if nth > 0 && nth < recent_unlimited_labels.len() { | ||||
|                 select_nth(&mut recent_unlimited_labels, nth); | ||||
|             } | ||||
|             old_labels.extend( | ||||
|                 recent_unlimited_labels | ||||
|                     .split_at(nth) | ||||
|                     .0 | ||||
|                     .iter() | ||||
|                     .map(|(_ /*wallclock*/, ix)| self.table.get_index(*ix).unwrap().0.clone()), | ||||
|             ); | ||||
|             old_labels | ||||
|         }; | ||||
|         thread_pool.install(|| { | ||||
|             self.records | ||||
|                 .par_iter() | ||||
|                 .flat_map(|(pubkey, index)| evict(pubkey, index)) | ||||
|                 .collect() | ||||
|         }) | ||||
|     } | ||||
| @@ -350,10 +392,10 @@ impl Crds { | ||||
| #[cfg(test)] | ||||
| mod test { | ||||
|     use super::*; | ||||
|     use crate::contact_info::ContactInfo; | ||||
|     use crate::{contact_info::ContactInfo, crds_value::NodeInstance}; | ||||
|     use rand::{thread_rng, Rng}; | ||||
|     use rayon::ThreadPoolBuilder; | ||||
|     use std::iter::repeat_with; | ||||
|     use std::{collections::HashSet, iter::repeat_with}; | ||||
|  | ||||
|     #[test] | ||||
|     fn test_insert() { | ||||
| @@ -467,6 +509,41 @@ mod test { | ||||
|             vec![val.label()] | ||||
|         ); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn test_find_old_records_unlimited() { | ||||
|         let thread_pool = ThreadPoolBuilder::new().build().unwrap(); | ||||
|         let mut rng = thread_rng(); | ||||
|         let now = 1_610_034_423_000; | ||||
|         let pubkey = Pubkey::new_unique(); | ||||
|         let mut crds = Crds::default(); | ||||
|         let mut timeouts = HashMap::new(); | ||||
|         timeouts.insert(Pubkey::default(), 1); | ||||
|         timeouts.insert(pubkey, 180); | ||||
|         for _ in 0..1024 { | ||||
|             let wallclock = now - rng.gen_range(0, 240); | ||||
|             let val = NodeInstance::new(&mut rng, pubkey, wallclock); | ||||
|             let val = CrdsData::NodeInstance(val); | ||||
|             let val = CrdsValue::new_unsigned(val); | ||||
|             assert_eq!(crds.insert(val, now), Ok(None)); | ||||
|         } | ||||
|         let now = now + 1; | ||||
|         let labels = crds.find_old_labels(&thread_pool, now, &timeouts); | ||||
|         assert_eq!(crds.table.len() - labels.len(), MAX_CRDS_VALUES_PER_PUBKEY); | ||||
|         let max_wallclock = labels | ||||
|             .iter() | ||||
|             .map(|label| crds.lookup(label).unwrap().wallclock()) | ||||
|             .max() | ||||
|             .unwrap(); | ||||
|         assert!(max_wallclock > now - 180); | ||||
|         let labels: HashSet<_> = labels.into_iter().collect(); | ||||
|         for (label, value) in crds.table.iter() { | ||||
|             if !labels.contains(label) { | ||||
|                 assert!(max_wallclock <= value.value.wallclock()); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn test_remove_default() { | ||||
|         let thread_pool = ThreadPoolBuilder::new().build().unwrap(); | ||||
|   | ||||
| @@ -428,6 +428,23 @@ impl CrdsValueLabel { | ||||
|             CrdsValueLabel::DuplicateShred(_, p) => *p, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /// Returns number of possible distinct labels of the same type for | ||||
|     /// a fixed pubkey, and None if that is practically unlimited. | ||||
|     pub(crate) fn value_space(&self) -> Option<usize> { | ||||
|         match self { | ||||
|             CrdsValueLabel::ContactInfo(_) => Some(1), | ||||
|             CrdsValueLabel::Vote(_, _) => Some(MAX_VOTES as usize), | ||||
|             CrdsValueLabel::LowestSlot(_) => Some(1), | ||||
|             CrdsValueLabel::SnapshotHashes(_) => Some(1), | ||||
|             CrdsValueLabel::EpochSlots(_, _) => Some(MAX_EPOCH_SLOTS as usize), | ||||
|             CrdsValueLabel::AccountsHashes(_) => Some(1), | ||||
|             CrdsValueLabel::LegacyVersion(_) => Some(1), | ||||
|             CrdsValueLabel::Version(_) => Some(1), | ||||
|             CrdsValueLabel::NodeInstance(_, _) => None, | ||||
|             CrdsValueLabel::DuplicateShred(_, _) => None, | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl CrdsValue { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user