diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 6c3f39e243..c0386f4ab6 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -454,10 +454,10 @@ fn retain_staked(values: &mut Vec, stakes: &HashMap) { // Otherwise unstaked voting nodes will show up with no version in // the various dashboards. CrdsData::Version(_) => true, + CrdsData::NodeInstance(_) => true, CrdsData::LowestSlot(_, _) | CrdsData::AccountsHashes(_) | CrdsData::LegacyVersion(_) - | CrdsData::NodeInstance(_) | CrdsData::DuplicateShred(_, _) => { let stake = stakes.get(&value.pubkey()).copied(); stake.unwrap_or_default() >= MIN_STAKE_FOR_GOSSIP diff --git a/core/src/crds.rs b/core/src/crds.rs index 0f22fc2553..4af80a7e54 100644 --- a/core/src/crds.rs +++ b/core/src/crds.rs @@ -40,9 +40,6 @@ use std::{ }; const CRDS_SHARDS_BITS: u32 = 8; -// Limit number of crds values associated with each unique pubkey. This -// excludes crds values which by label design are limited per each pubkey. -const MAX_CRDS_VALUES_PER_PUBKEY: usize = 32; #[derive(Clone)] pub struct Crds { @@ -128,6 +125,14 @@ impl Default for Crds { // Both values should have the same key/label. fn overrides(value: &CrdsValue, other: &VersionedCrdsValue) -> bool { assert_eq!(value.label(), other.value.label(), "labels mismatch!"); + // Node instances are special cased so that if there are two running + // instances of the same node, the more recent start is propagated through + // gossip regardless of wallclocks. + if let CrdsData::NodeInstance(value) = &value.data { + if let Some(out) = value.overrides(&other.value) { + return out; + } + } match value.wallclock().cmp(&other.value.wallclock()) { Ordering::Less => false, Ordering::Greater => true, @@ -372,9 +377,7 @@ impl Crds { None => 0, } }; - let mut old_labels = Vec::new(); - // Buffer of crds values to be evicted based on their wallclock. - let mut recent_unlimited_labels: Vec<(u64 /*wallclock*/, usize /*index*/)> = index + index .into_iter() .filter_map(|ix| { let (label, value) = self.table.get_index(*ix).unwrap(); @@ -383,32 +386,12 @@ impl Crds { .max(local_timestamp) .saturating_add(timeout); if expiry_timestamp <= now { - old_labels.push(label.clone()); - None + Some(label.clone()) } else { - match label.value_space() { - Some(_) => None, - None => Some((value.value.wallclock(), *ix)), - } + None } }) - .collect(); - // Number of values to discard from the buffer: - let nth = recent_unlimited_labels - .len() - .saturating_sub(MAX_CRDS_VALUES_PER_PUBKEY); - // Partition on wallclock to discard the older ones. - if nth > 0 && nth < recent_unlimited_labels.len() { - recent_unlimited_labels.select_nth_unstable(nth); - } - old_labels.extend( - recent_unlimited_labels - .split_at(nth) - .0 - .iter() - .map(|(_ /*wallclock*/, ix)| self.table.get_index(*ix).unwrap().0.clone()), - ); - old_labels + .collect::>() }; thread_pool.install(|| { self.records @@ -541,7 +524,8 @@ mod test { contact_info::ContactInfo, crds_value::{new_rand_timestamp, NodeInstance}, }; - use rand::{thread_rng, Rng}; + use rand::{thread_rng, Rng, SeedableRng}; + use rand_chacha::ChaChaRng; use rayon::ThreadPoolBuilder; use solana_sdk::signature::{Keypair, Signer}; use std::{collections::HashSet, iter::repeat_with}; @@ -616,6 +600,62 @@ mod test { assert_eq!(crds.table[&val2.label()].local_timestamp, 3); assert_eq!(crds.table[&val2.label()].ordinal, 2); } + + #[test] + fn test_upsert_node_instance() { + const SEED: [u8; 32] = [0x42; 32]; + let mut rng = ChaChaRng::from_seed(SEED); + fn make_crds_value(node: NodeInstance) -> CrdsValue { + CrdsValue::new_unsigned(CrdsData::NodeInstance(node)) + } + let now = 1_620_838_767_000; + let mut crds = Crds::default(); + let pubkey = Pubkey::new_unique(); + let node = NodeInstance::new(&mut rng, pubkey, now); + let node = make_crds_value(node); + assert_eq!(crds.insert(node, now), Ok(None)); + // A node-instance with a different key should insert fine even with + // older timestamps. + let other = NodeInstance::new(&mut rng, Pubkey::new_unique(), now - 1); + let other = make_crds_value(other); + assert_eq!(crds.insert(other, now), Ok(None)); + // A node-instance with older timestamp should fail to insert, even if + // the wallclock is more recent. + let other = NodeInstance::new(&mut rng, pubkey, now - 1); + let other = other.with_wallclock(now + 1); + let other = make_crds_value(other); + let value_hash = hash(&serialize(&other).unwrap()); + assert_eq!( + crds.insert(other, now), + Err(CrdsError::InsertFailed(value_hash)) + ); + // A node instance with the same timestamp should insert only if the + // random token is larger. + let mut num_overrides = 0; + for _ in 0..100 { + let other = NodeInstance::new(&mut rng, pubkey, now); + let other = make_crds_value(other); + let value_hash = hash(&serialize(&other).unwrap()); + match crds.insert(other, now) { + Ok(Some(_)) => num_overrides += 1, + Err(CrdsError::InsertFailed(x)) => assert_eq!(x, value_hash), + _ => panic!(), + } + } + assert_eq!(num_overrides, 5); + // A node instance with larger timestamp should insert regardless of + // its token value. + for k in 1..10 { + let other = NodeInstance::new(&mut rng, pubkey, now + k); + let other = other.with_wallclock(now - 1); + let other = make_crds_value(other); + match crds.insert(other, now) { + Ok(Some(_)) => (), + _ => panic!(), + } + } + } + #[test] fn test_find_old_records_default() { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); @@ -662,40 +702,6 @@ mod test { ); } - #[test] - fn test_find_old_records_unlimited() { - let thread_pool = ThreadPoolBuilder::new().build().unwrap(); - let mut rng = thread_rng(); - let now = 1_610_034_423_000; - let pubkey = Pubkey::new_unique(); - let mut crds = Crds::default(); - let mut timeouts = HashMap::new(); - timeouts.insert(Pubkey::default(), 1); - timeouts.insert(pubkey, 180); - for _ in 0..1024 { - let wallclock = now - rng.gen_range(0, 240); - let val = NodeInstance::new(&mut rng, pubkey, wallclock); - let val = CrdsData::NodeInstance(val); - let val = CrdsValue::new_unsigned(val); - assert_eq!(crds.insert(val, now), Ok(None)); - } - let now = now + 1; - let labels = crds.find_old_labels(&thread_pool, now, &timeouts); - assert_eq!(crds.table.len() - labels.len(), MAX_CRDS_VALUES_PER_PUBKEY); - let max_wallclock = labels - .iter() - .map(|label| crds.lookup(label).unwrap().wallclock()) - .max() - .unwrap(); - assert!(max_wallclock > now - 180); - let labels: HashSet<_> = labels.into_iter().collect(); - for (label, value) in crds.table.iter() { - if !labels.contains(label) { - assert!(max_wallclock <= value.value.wallclock()); - } - } - } - #[test] fn test_remove_default() { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs index 9fc6e4bf81..5fce1a2286 100644 --- a/core/src/crds_value.rs +++ b/core/src/crds_value.rs @@ -20,6 +20,7 @@ use solana_sdk::{ use solana_vote_program::vote_transaction::parse_vote_transaction; use std::{ borrow::{Borrow, Cow}, + cmp::Ordering, collections::{hash_map::Entry, BTreeSet, HashMap}, fmt, }; @@ -405,7 +406,7 @@ impl NodeInstance { } // Clones the value with an updated wallclock. - pub fn with_wallclock(&self, now: u64) -> Self { + pub(crate) fn with_wallclock(&self, now: u64) -> Self { Self { wallclock: now, ..*self @@ -414,7 +415,7 @@ impl NodeInstance { // Returns true if the crds-value is a duplicate instance // of this node, with a more recent timestamp. - pub fn check_duplicate(&self, other: &CrdsValue) -> bool { + pub(crate) fn check_duplicate(&self, other: &CrdsValue) -> bool { match &other.data { CrdsData::NodeInstance(other) => { self.token != other.token @@ -424,6 +425,26 @@ impl NodeInstance { _ => false, } } + + // Returns None if tokens are the same or other is not a node-instance from + // the same owner. Otherwise returns true if self has more recent timestamp + // than other, and so overrides it. + pub(crate) fn overrides(&self, other: &CrdsValue) -> Option { + let other = match &other.data { + CrdsData::NodeInstance(other) => other, + _ => return None, + }; + if self.token == other.token || self.from != other.from { + return None; + } + match self.timestamp.cmp(&other.timestamp) { + Ordering::Less => Some(false), + Ordering::Greater => Some(true), + // Ties should be broken in a deterministic way across the cluster, + // so that nodes propagate the same value through gossip. + Ordering::Equal => Some(other.token < self.token), + } + } } impl Sanitize for NodeInstance { @@ -445,7 +466,7 @@ pub enum CrdsValueLabel { AccountsHashes(Pubkey), LegacyVersion(Pubkey), Version(Pubkey), - NodeInstance(Pubkey, u64 /*token*/), + NodeInstance(Pubkey), DuplicateShred(DuplicateShredIndex, Pubkey), } @@ -460,7 +481,7 @@ impl fmt::Display for CrdsValueLabel { CrdsValueLabel::AccountsHashes(_) => write!(f, "AccountsHashes({})", self.pubkey()), CrdsValueLabel::LegacyVersion(_) => write!(f, "LegacyVersion({})", self.pubkey()), CrdsValueLabel::Version(_) => write!(f, "Version({})", self.pubkey()), - CrdsValueLabel::NodeInstance(pk, token) => write!(f, "NodeInstance({}, {})", pk, token), + CrdsValueLabel::NodeInstance(pk) => write!(f, "NodeInstance({})", pk), CrdsValueLabel::DuplicateShred(ix, pk) => write!(f, "DuplicateShred({}, {})", ix, pk), } } @@ -477,27 +498,10 @@ impl CrdsValueLabel { CrdsValueLabel::AccountsHashes(p) => *p, CrdsValueLabel::LegacyVersion(p) => *p, CrdsValueLabel::Version(p) => *p, - CrdsValueLabel::NodeInstance(p, _ /*token*/) => *p, + CrdsValueLabel::NodeInstance(p) => *p, CrdsValueLabel::DuplicateShred(_, p) => *p, } } - - /// Returns number of possible distinct labels of the same type for - /// a fixed pubkey, and None if that is practically unlimited. - pub(crate) fn value_space(&self) -> Option { - match self { - CrdsValueLabel::ContactInfo(_) => Some(1), - CrdsValueLabel::Vote(_, _) => Some(MAX_VOTES as usize), - CrdsValueLabel::LowestSlot(_) => Some(1), - CrdsValueLabel::SnapshotHashes(_) => Some(1), - CrdsValueLabel::EpochSlots(_, _) => Some(MAX_EPOCH_SLOTS as usize), - CrdsValueLabel::AccountsHashes(_) => Some(1), - CrdsValueLabel::LegacyVersion(_) => Some(1), - CrdsValueLabel::Version(_) => Some(1), - CrdsValueLabel::NodeInstance(_, _) => None, - CrdsValueLabel::DuplicateShred(_, _) => Some(MAX_DUPLICATE_SHREDS as usize), - } - } } impl CrdsValue { @@ -570,7 +574,7 @@ impl CrdsValue { CrdsData::EpochSlots(ix, _) => CrdsValueLabel::EpochSlots(*ix, self.pubkey()), CrdsData::LegacyVersion(_) => CrdsValueLabel::LegacyVersion(self.pubkey()), CrdsData::Version(_) => CrdsValueLabel::Version(self.pubkey()), - CrdsData::NodeInstance(node) => CrdsValueLabel::NodeInstance(node.from, node.token), + CrdsData::NodeInstance(node) => CrdsValueLabel::NodeInstance(node.from), CrdsData::DuplicateShred(ix, shred) => CrdsValueLabel::DuplicateShred(*ix, shred.from), } } @@ -931,7 +935,7 @@ mod test { token: rng.gen(), ..node }; - assert_ne!( + assert_eq!( make_crds_value(node).label(), make_crds_value(other).label() ); @@ -946,20 +950,31 @@ mod test { let mut rng = rand::thread_rng(); let pubkey = Pubkey::new_unique(); let node = NodeInstance::new(&mut rng, pubkey, now); + let node_crds = make_crds_value(node.clone()); // Same token is not a duplicate. - assert!(!node.check_duplicate(&make_crds_value(NodeInstance { + let other = NodeInstance { from: pubkey, wallclock: now + 1, timestamp: now + 1, token: node.token, - }))); + }; + let other_crds = make_crds_value(other.clone()); + assert!(!node.check_duplicate(&other_crds)); + assert!(!other.check_duplicate(&node_crds)); + assert_eq!(node.overrides(&other_crds), None); + assert_eq!(other.overrides(&node_crds), None); // Older timestamp is not a duplicate. - assert!(!node.check_duplicate(&make_crds_value(NodeInstance { + let other = NodeInstance { from: pubkey, wallclock: now + 1, timestamp: now - 1, token: rng.gen(), - }))); + }; + let other_crds = make_crds_value(other.clone()); + assert!(!node.check_duplicate(&other_crds)); + assert!(other.check_duplicate(&node_crds)); + assert_eq!(node.overrides(&other_crds), Some(true)); + assert_eq!(other.overrides(&node_crds), Some(false)); // Updated wallclock is not a duplicate. let other = node.with_wallclock(now + 8); assert_eq!( @@ -971,27 +986,56 @@ mod test { token: node.token, } ); - assert!(!node.check_duplicate(&make_crds_value(other))); - // Duplicate instance. - assert!(node.check_duplicate(&make_crds_value(NodeInstance { - from: pubkey, - wallclock: 0, - timestamp: now, - token: rng.gen(), - }))); + let other_crds = make_crds_value(other.clone()); + assert!(!node.check_duplicate(&other_crds)); + assert!(!other.check_duplicate(&node_crds)); + assert_eq!(node.overrides(&other_crds), None); + assert_eq!(other.overrides(&node_crds), None); + // Duplicate instance; tied timestamp. + for _ in 0..10 { + let other = NodeInstance { + from: pubkey, + wallclock: 0, + timestamp: now, + token: rng.gen(), + }; + let other_crds = make_crds_value(other.clone()); + assert!(node.check_duplicate(&other_crds)); + assert!(other.check_duplicate(&node_crds)); + assert_eq!(node.overrides(&other_crds), Some(other.token < node.token)); + assert_eq!(other.overrides(&node_crds), Some(node.token < other.token)); + } + // Duplicate instance; more recent timestamp. + for _ in 0..10 { + let other = NodeInstance { + from: pubkey, + wallclock: 0, + timestamp: now + 1, + token: rng.gen(), + }; + let other_crds = make_crds_value(other.clone()); + assert!(node.check_duplicate(&other_crds)); + assert!(!other.check_duplicate(&node_crds)); + assert_eq!(node.overrides(&other_crds), Some(false)); + assert_eq!(other.overrides(&node_crds), Some(true)); + } // Different pubkey is not a duplicate. - assert!(!node.check_duplicate(&make_crds_value(NodeInstance { + let other = NodeInstance { from: Pubkey::new_unique(), wallclock: now + 1, timestamp: now + 1, token: rng.gen(), - }))); + }; + let other_crds = make_crds_value(other.clone()); + assert!(!node.check_duplicate(&other_crds)); + assert!(!other.check_duplicate(&node_crds)); + assert_eq!(node.overrides(&other_crds), None); + assert_eq!(other.overrides(&node_crds), None); // Differnt crds value is not a duplicate. - assert!( - !node.check_duplicate(&CrdsValue::new_unsigned(CrdsData::ContactInfo( - ContactInfo::new_rand(&mut rng, Some(pubkey)) - ))) - ); + let other = ContactInfo::new_rand(&mut rng, Some(pubkey)); + let other = CrdsValue::new_unsigned(CrdsData::ContactInfo(other)); + assert!(!node.check_duplicate(&other)); + assert_eq!(node.overrides(&other), None); } #[test]