diff --git a/Cargo.lock b/Cargo.lock index 4d04110b67..de1f2c7d86 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2512,7 +2512,7 @@ dependencies = [ "libc", "redox_syscall", "rustc_version", - "smallvec 0.6.13", + "smallvec 0.6.14", "winapi 0.3.8", ] @@ -2526,7 +2526,7 @@ dependencies = [ "cloudabi 0.0.3", "libc", "redox_syscall", - "smallvec 1.4.2", + "smallvec 1.6.1", "winapi 0.3.8", ] @@ -2541,7 +2541,7 @@ dependencies = [ "instant", "libc", "redox_syscall", - "smallvec 1.4.2", + "smallvec 1.6.1", "winapi 0.3.8", ] @@ -3053,7 +3053,7 @@ checksum = "a415a013dd7c5d4221382329a5a3482566da675737494935cbbbcdec04662f9d" dependencies = [ "cc", "libc", - "smallvec 1.4.2", + "smallvec 1.6.1", ] [[package]] @@ -3215,14 +3215,9 @@ dependencies = [ [[package]] name = "rustversion" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9bdc5e856e51e685846fb6c13a1f5e5432946c2c90501bdc76a1319f19e29da" -dependencies = [ - "proc-macro2 1.0.24", - "quote 1.0.6", - "syn 1.0.48", -] +checksum = "cb5d2a036dc6d2d8fd16fde3498b04306e29bd193bf306a57427019b823d5acd" [[package]] name = "ryu" @@ -3532,18 +3527,18 @@ checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" [[package]] name = "smallvec" -version = "0.6.13" +version = "0.6.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7b0758c52e15a8b5e3691eae6cc559f08eee9406e548a4477ba4e67770a82b6" +checksum = "b97fcaeba89edba30f044a10c6a3cc39df9c3f17d7cd829dd1446cab35f890e0" dependencies = [ "maybe-uninit", ] [[package]] name = "smallvec" -version = "1.4.2" +version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbee7696b84bbf3d89a1c2eccff0850e3047ed46bfcd2e92c29a2d074d57e252" +checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e" [[package]] name = "smpl_jwt" @@ -3972,6 +3967,7 @@ dependencies = [ "regex", "reqwest", "rustc_version", + "rustversion", "serde", "serde_derive", "serde_json", @@ -6253,7 +6249,7 @@ version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5479532badd04e128284890390c1e876ef7a993d0570b3597ae43dfa1d59afa4" dependencies = [ - "smallvec 1.4.2", + "smallvec 1.6.1", ] [[package]] diff --git a/core/Cargo.toml b/core/Cargo.toml index ee2e204901..fc2ac95b02 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -45,6 +45,7 @@ rand_chacha = "0.2.2" raptorq = "1.4.2" rayon = "1.4.1" regex = "1.3.9" +rustversion = "1.0.4" serde = "1.0.112" serde_derive = "1.0.103" serde_json = "1.0.56" diff --git a/core/src/crds.rs b/core/src/crds.rs index 1e785386a9..162bc1e1da 100644 --- a/core/src/crds.rs +++ b/core/src/crds.rs @@ -40,6 +40,11 @@ use std::collections::{hash_map, HashMap}; use std::ops::{Index, IndexMut}; const CRDS_SHARDS_BITS: u32 = 8; +// Limit number of crds values associated with each unique pubkey. This +// excludes crds values which by label design are limited per each pubkey. +// TODO: Find the right value for this once duplicate shreds and corresponding +// votes are broadcasted over gossip. +const MAX_CRDS_VALUES_PER_PUBKEY: usize = 512; #[derive(Clone)] pub struct Crds { @@ -267,21 +272,59 @@ impl Crds { now: u64, timeouts: &HashMap, ) -> Vec { + #[rustversion::before(1.49.0)] + fn select_nth(xs: &mut Vec, _nth: usize) { + xs.sort_unstable(); + } + #[rustversion::since(1.49.0)] + fn select_nth(xs: &mut Vec, nth: usize) { + xs.select_nth_unstable(nth); + } let default_timeout = *timeouts .get(&Pubkey::default()) .expect("must have default timeout"); - thread_pool.install(|| { - self.table - .par_iter() - .with_min_len(1024) - .filter_map(|(k, v)| { - let timeout = timeouts.get(&k.pubkey()).unwrap_or(&default_timeout); - if v.local_timestamp.saturating_add(*timeout) <= now { - Some(k.clone()) - } else { + // Given an index of all crd values associated with a pubkey, + // returns crds labels of old values to be evicted. + let evict = |pubkey, index: &IndexSet| { + let timeout = *timeouts.get(pubkey).unwrap_or(&default_timeout); + let mut old_labels = Vec::new(); + // Buffer of crds values to be evicted based on their wallclock. + let mut recent_unlimited_labels: Vec<(u64 /*wallclock*/, usize /*index*/)> = index + .into_iter() + .filter_map(|ix| { + let (label, value) = self.table.get_index(*ix).unwrap(); + if value.local_timestamp.saturating_add(timeout) <= now { + old_labels.push(label.clone()); None + } else { + match label.value_space() { + Some(_) => None, + None => Some((value.value.wallclock(), *ix)), + } } }) + .collect(); + // Number of values to discard from the buffer: + let nth = recent_unlimited_labels + .len() + .saturating_sub(MAX_CRDS_VALUES_PER_PUBKEY); + // Partition on wallclock to discard the older ones. + if nth > 0 && nth < recent_unlimited_labels.len() { + select_nth(&mut recent_unlimited_labels, nth); + } + old_labels.extend( + recent_unlimited_labels + .split_at(nth) + .0 + .iter() + .map(|(_ /*wallclock*/, ix)| self.table.get_index(*ix).unwrap().0.clone()), + ); + old_labels + }; + thread_pool.install(|| { + self.records + .par_iter() + .flat_map(|(pubkey, index)| evict(pubkey, index)) .collect() }) } @@ -328,10 +371,10 @@ impl Crds { #[cfg(test)] mod test { use super::*; - use crate::contact_info::ContactInfo; + use crate::{contact_info::ContactInfo, crds_value::NodeInstance}; use rand::{thread_rng, Rng}; use rayon::ThreadPoolBuilder; - use std::iter::repeat_with; + use std::{collections::HashSet, iter::repeat_with}; #[test] fn test_insert() { @@ -445,6 +488,41 @@ mod test { vec![val.label()] ); } + + #[test] + fn test_find_old_records_unlimited() { + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); + let mut rng = thread_rng(); + let now = 1_610_034_423_000; + let pubkey = Pubkey::new_unique(); + let mut crds = Crds::default(); + let mut timeouts = HashMap::new(); + timeouts.insert(Pubkey::default(), 1); + timeouts.insert(pubkey, 180); + for _ in 0..1024 { + let wallclock = now - rng.gen_range(0, 240); + let val = NodeInstance::new(&mut rng, pubkey, wallclock); + let val = CrdsData::NodeInstance(val); + let val = CrdsValue::new_unsigned(val); + assert_eq!(crds.insert(val, now), Ok(None)); + } + let now = now + 1; + let labels = crds.find_old_labels(&thread_pool, now, &timeouts); + assert_eq!(crds.table.len() - labels.len(), MAX_CRDS_VALUES_PER_PUBKEY); + let max_wallclock = labels + .iter() + .map(|label| crds.lookup(label).unwrap().wallclock()) + .max() + .unwrap(); + assert!(max_wallclock > now - 180); + let labels: HashSet<_> = labels.into_iter().collect(); + for (label, value) in crds.table.iter() { + if !labels.contains(label) { + assert!(max_wallclock <= value.value.wallclock()); + } + } + } + #[test] fn test_remove_default() { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs index 5454668ea4..61573c1217 100644 --- a/core/src/crds_value.rs +++ b/core/src/crds_value.rs @@ -422,6 +422,22 @@ impl CrdsValueLabel { CrdsValueLabel::NodeInstance(p, _ /*token*/) => *p, } } + + /// Returns number of possible distinct labels of the same type for + /// a fixed pubkey, and None if that is practically unlimited. + pub(crate) fn value_space(&self) -> Option { + match self { + CrdsValueLabel::ContactInfo(_) => Some(1), + CrdsValueLabel::Vote(_, _) => Some(MAX_VOTES as usize), + CrdsValueLabel::LowestSlot(_) => Some(1), + CrdsValueLabel::SnapshotHashes(_) => Some(1), + CrdsValueLabel::EpochSlots(_, _) => Some(MAX_EPOCH_SLOTS as usize), + CrdsValueLabel::AccountsHashes(_) => Some(1), + CrdsValueLabel::LegacyVersion(_) => Some(1), + CrdsValueLabel::Version(_) => Some(1), + CrdsValueLabel::NodeInstance(_, _) => None, + } + } } impl CrdsValue { diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index 719850f6a4..eca8f1e210 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -1306,7 +1306,7 @@ dependencies = [ "libc", "redox_syscall", "rustc_version", - "smallvec 0.6.13", + "smallvec 0.6.14", "winapi 0.3.8", ] @@ -1815,18 +1815,18 @@ checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" [[package]] name = "smallvec" -version = "0.6.13" +version = "0.6.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7b0758c52e15a8b5e3691eae6cc559f08eee9406e548a4477ba4e67770a82b6" +checksum = "b97fcaeba89edba30f044a10c6a3cc39df9c3f17d7cd829dd1446cab35f890e0" dependencies = [ "maybe-uninit", ] [[package]] name = "smallvec" -version = "1.4.2" +version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbee7696b84bbf3d89a1c2eccff0850e3047ed46bfcd2e92c29a2d074d57e252" +checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e" [[package]] name = "socket2" @@ -2846,7 +2846,7 @@ version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5479532badd04e128284890390c1e876ef7a993d0570b3597ae43dfa1d59afa4" dependencies = [ - "smallvec 1.4.2", + "smallvec 1.6.1", ] [[package]]