limits number of crds values associated with a pubkey (bp #14467) (#14489)

* limits number of crds values associated with a pubkey (#14467)

(cherry picked from commit 766195dded)

# Conflicts:
#	core/src/crds.rs

* removes backport merge conflicts

* updates smallvec

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
This commit is contained in:
mergify[bot]
2021-01-08 21:27:00 +00:00
committed by GitHub
parent 1ab4c616be
commit cbcbc3f1c8
5 changed files with 124 additions and 33 deletions

28
Cargo.lock generated
View File

@ -2512,7 +2512,7 @@ dependencies = [
"libc", "libc",
"redox_syscall", "redox_syscall",
"rustc_version", "rustc_version",
"smallvec 0.6.13", "smallvec 0.6.14",
"winapi 0.3.8", "winapi 0.3.8",
] ]
@ -2526,7 +2526,7 @@ dependencies = [
"cloudabi 0.0.3", "cloudabi 0.0.3",
"libc", "libc",
"redox_syscall", "redox_syscall",
"smallvec 1.4.2", "smallvec 1.6.1",
"winapi 0.3.8", "winapi 0.3.8",
] ]
@ -2541,7 +2541,7 @@ dependencies = [
"instant", "instant",
"libc", "libc",
"redox_syscall", "redox_syscall",
"smallvec 1.4.2", "smallvec 1.6.1",
"winapi 0.3.8", "winapi 0.3.8",
] ]
@ -3053,7 +3053,7 @@ checksum = "a415a013dd7c5d4221382329a5a3482566da675737494935cbbbcdec04662f9d"
dependencies = [ dependencies = [
"cc", "cc",
"libc", "libc",
"smallvec 1.4.2", "smallvec 1.6.1",
] ]
[[package]] [[package]]
@ -3215,14 +3215,9 @@ dependencies = [
[[package]] [[package]]
name = "rustversion" name = "rustversion"
version = "1.0.3" version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9bdc5e856e51e685846fb6c13a1f5e5432946c2c90501bdc76a1319f19e29da" checksum = "cb5d2a036dc6d2d8fd16fde3498b04306e29bd193bf306a57427019b823d5acd"
dependencies = [
"proc-macro2 1.0.24",
"quote 1.0.6",
"syn 1.0.48",
]
[[package]] [[package]]
name = "ryu" name = "ryu"
@ -3532,18 +3527,18 @@ checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
[[package]] [[package]]
name = "smallvec" name = "smallvec"
version = "0.6.13" version = "0.6.14"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7b0758c52e15a8b5e3691eae6cc559f08eee9406e548a4477ba4e67770a82b6" checksum = "b97fcaeba89edba30f044a10c6a3cc39df9c3f17d7cd829dd1446cab35f890e0"
dependencies = [ dependencies = [
"maybe-uninit", "maybe-uninit",
] ]
[[package]] [[package]]
name = "smallvec" name = "smallvec"
version = "1.4.2" version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbee7696b84bbf3d89a1c2eccff0850e3047ed46bfcd2e92c29a2d074d57e252" checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
[[package]] [[package]]
name = "smpl_jwt" name = "smpl_jwt"
@ -3972,6 +3967,7 @@ dependencies = [
"regex", "regex",
"reqwest", "reqwest",
"rustc_version", "rustc_version",
"rustversion",
"serde", "serde",
"serde_derive", "serde_derive",
"serde_json", "serde_json",
@ -6253,7 +6249,7 @@ version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5479532badd04e128284890390c1e876ef7a993d0570b3597ae43dfa1d59afa4" checksum = "5479532badd04e128284890390c1e876ef7a993d0570b3597ae43dfa1d59afa4"
dependencies = [ dependencies = [
"smallvec 1.4.2", "smallvec 1.6.1",
] ]
[[package]] [[package]]

View File

@ -45,6 +45,7 @@ rand_chacha = "0.2.2"
raptorq = "1.4.2" raptorq = "1.4.2"
rayon = "1.4.1" rayon = "1.4.1"
regex = "1.3.9" regex = "1.3.9"
rustversion = "1.0.4"
serde = "1.0.112" serde = "1.0.112"
serde_derive = "1.0.103" serde_derive = "1.0.103"
serde_json = "1.0.56" serde_json = "1.0.56"

View File

@ -40,6 +40,11 @@ use std::collections::{hash_map, HashMap};
use std::ops::{Index, IndexMut}; use std::ops::{Index, IndexMut};
const CRDS_SHARDS_BITS: u32 = 8; const CRDS_SHARDS_BITS: u32 = 8;
// Limit number of crds values associated with each unique pubkey. This
// excludes crds values which by label design are limited per each pubkey.
// TODO: Find the right value for this once duplicate shreds and corresponding
// votes are broadcasted over gossip.
const MAX_CRDS_VALUES_PER_PUBKEY: usize = 512;
#[derive(Clone)] #[derive(Clone)]
pub struct Crds { pub struct Crds {
@ -267,21 +272,59 @@ impl Crds {
now: u64, now: u64,
timeouts: &HashMap<Pubkey, u64>, timeouts: &HashMap<Pubkey, u64>,
) -> Vec<CrdsValueLabel> { ) -> Vec<CrdsValueLabel> {
#[rustversion::before(1.49.0)]
fn select_nth<T: Ord>(xs: &mut Vec<T>, _nth: usize) {
xs.sort_unstable();
}
#[rustversion::since(1.49.0)]
fn select_nth<T: Ord>(xs: &mut Vec<T>, nth: usize) {
xs.select_nth_unstable(nth);
}
let default_timeout = *timeouts let default_timeout = *timeouts
.get(&Pubkey::default()) .get(&Pubkey::default())
.expect("must have default timeout"); .expect("must have default timeout");
thread_pool.install(|| { // Given an index of all crd values associated with a pubkey,
self.table // returns crds labels of old values to be evicted.
.par_iter() let evict = |pubkey, index: &IndexSet<usize>| {
.with_min_len(1024) let timeout = *timeouts.get(pubkey).unwrap_or(&default_timeout);
.filter_map(|(k, v)| { let mut old_labels = Vec::new();
let timeout = timeouts.get(&k.pubkey()).unwrap_or(&default_timeout); // Buffer of crds values to be evicted based on their wallclock.
if v.local_timestamp.saturating_add(*timeout) <= now { let mut recent_unlimited_labels: Vec<(u64 /*wallclock*/, usize /*index*/)> = index
Some(k.clone()) .into_iter()
} else { .filter_map(|ix| {
let (label, value) = self.table.get_index(*ix).unwrap();
if value.local_timestamp.saturating_add(timeout) <= now {
old_labels.push(label.clone());
None None
} else {
match label.value_space() {
Some(_) => None,
None => Some((value.value.wallclock(), *ix)),
}
} }
}) })
.collect();
// Number of values to discard from the buffer:
let nth = recent_unlimited_labels
.len()
.saturating_sub(MAX_CRDS_VALUES_PER_PUBKEY);
// Partition on wallclock to discard the older ones.
if nth > 0 && nth < recent_unlimited_labels.len() {
select_nth(&mut recent_unlimited_labels, nth);
}
old_labels.extend(
recent_unlimited_labels
.split_at(nth)
.0
.iter()
.map(|(_ /*wallclock*/, ix)| self.table.get_index(*ix).unwrap().0.clone()),
);
old_labels
};
thread_pool.install(|| {
self.records
.par_iter()
.flat_map(|(pubkey, index)| evict(pubkey, index))
.collect() .collect()
}) })
} }
@ -328,10 +371,10 @@ impl Crds {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;
use crate::contact_info::ContactInfo; use crate::{contact_info::ContactInfo, crds_value::NodeInstance};
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use rayon::ThreadPoolBuilder; use rayon::ThreadPoolBuilder;
use std::iter::repeat_with; use std::{collections::HashSet, iter::repeat_with};
#[test] #[test]
fn test_insert() { fn test_insert() {
@ -445,6 +488,41 @@ mod test {
vec![val.label()] vec![val.label()]
); );
} }
#[test]
fn test_find_old_records_unlimited() {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
let mut rng = thread_rng();
let now = 1_610_034_423_000;
let pubkey = Pubkey::new_unique();
let mut crds = Crds::default();
let mut timeouts = HashMap::new();
timeouts.insert(Pubkey::default(), 1);
timeouts.insert(pubkey, 180);
for _ in 0..1024 {
let wallclock = now - rng.gen_range(0, 240);
let val = NodeInstance::new(&mut rng, pubkey, wallclock);
let val = CrdsData::NodeInstance(val);
let val = CrdsValue::new_unsigned(val);
assert_eq!(crds.insert(val, now), Ok(None));
}
let now = now + 1;
let labels = crds.find_old_labels(&thread_pool, now, &timeouts);
assert_eq!(crds.table.len() - labels.len(), MAX_CRDS_VALUES_PER_PUBKEY);
let max_wallclock = labels
.iter()
.map(|label| crds.lookup(label).unwrap().wallclock())
.max()
.unwrap();
assert!(max_wallclock > now - 180);
let labels: HashSet<_> = labels.into_iter().collect();
for (label, value) in crds.table.iter() {
if !labels.contains(label) {
assert!(max_wallclock <= value.value.wallclock());
}
}
}
#[test] #[test]
fn test_remove_default() { fn test_remove_default() {
let thread_pool = ThreadPoolBuilder::new().build().unwrap(); let thread_pool = ThreadPoolBuilder::new().build().unwrap();

View File

@ -422,6 +422,22 @@ impl CrdsValueLabel {
CrdsValueLabel::NodeInstance(p, _ /*token*/) => *p, CrdsValueLabel::NodeInstance(p, _ /*token*/) => *p,
} }
} }
/// Returns number of possible distinct labels of the same type for
/// a fixed pubkey, and None if that is practically unlimited.
pub(crate) fn value_space(&self) -> Option<usize> {
match self {
CrdsValueLabel::ContactInfo(_) => Some(1),
CrdsValueLabel::Vote(_, _) => Some(MAX_VOTES as usize),
CrdsValueLabel::LowestSlot(_) => Some(1),
CrdsValueLabel::SnapshotHashes(_) => Some(1),
CrdsValueLabel::EpochSlots(_, _) => Some(MAX_EPOCH_SLOTS as usize),
CrdsValueLabel::AccountsHashes(_) => Some(1),
CrdsValueLabel::LegacyVersion(_) => Some(1),
CrdsValueLabel::Version(_) => Some(1),
CrdsValueLabel::NodeInstance(_, _) => None,
}
}
} }
impl CrdsValue { impl CrdsValue {

View File

@ -1306,7 +1306,7 @@ dependencies = [
"libc", "libc",
"redox_syscall", "redox_syscall",
"rustc_version", "rustc_version",
"smallvec 0.6.13", "smallvec 0.6.14",
"winapi 0.3.8", "winapi 0.3.8",
] ]
@ -1815,18 +1815,18 @@ checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
[[package]] [[package]]
name = "smallvec" name = "smallvec"
version = "0.6.13" version = "0.6.14"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7b0758c52e15a8b5e3691eae6cc559f08eee9406e548a4477ba4e67770a82b6" checksum = "b97fcaeba89edba30f044a10c6a3cc39df9c3f17d7cd829dd1446cab35f890e0"
dependencies = [ dependencies = [
"maybe-uninit", "maybe-uninit",
] ]
[[package]] [[package]]
name = "smallvec" name = "smallvec"
version = "1.4.2" version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbee7696b84bbf3d89a1c2eccff0850e3047ed46bfcd2e92c29a2d074d57e252" checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
[[package]] [[package]]
name = "socket2" name = "socket2"
@ -2846,7 +2846,7 @@ version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5479532badd04e128284890390c1e876ef7a993d0570b3597ae43dfa1d59afa4" checksum = "5479532badd04e128284890390c1e876ef7a993d0570b3597ae43dfa1d59afa4"
dependencies = [ dependencies = [
"smallvec 1.4.2", "smallvec 1.6.1",
] ]
[[package]] [[package]]