The crds table retains up to 32 node-instance values per pubkey. This
is because, if there are multiple running instances of the same node,
we want gossip to propagate the node-instance values associated with
both instances; therefore the corresponding label/key includes the
randomly generated token in addition to the pubkey:
https://github.com/solana-labs/solana/blob/9c42a89a4/core/src/crds_value.rs#L448
https://github.com/solana-labs/solana/pull/14037
As a result, the number of such values per pubkey is effectively
unbounded, requiring the custom mitigations implemented in:
https://github.com/solana-labs/solana/pull/14467
but still consuming redundant extra memory and bandwidth.
This commit instead retains only one node-instance per pubkey by
extending the crds value override logic. If a crds value is of type
node-instance, it will always override an existing one with the same key
if it has a more recent starting timestamp (not wallclock). As a result,
gossip will always propagate the node-instance with the more recent
timestamp. Since the check_duplicate logic will stop the node with the older
timestamp, this change should preserve existing functionality.
(cherry picked from commit 0aa7824884)
Co-authored-by: behzad nouri <behzadnouri@gmail.com>
			
			
This commit is contained in:
		| @@ -444,10 +444,10 @@ fn retain_staked(values: &mut Vec<CrdsValue>, stakes: &HashMap<Pubkey, u64>) { | ||||
|             // Otherwise unstaked voting nodes will show up with no version in | ||||
|             // the various dashboards. | ||||
|             CrdsData::Version(_) => true, | ||||
|             CrdsData::NodeInstance(_) => true, | ||||
|             CrdsData::LowestSlot(_, _) | ||||
|             | CrdsData::AccountsHashes(_) | ||||
|             | CrdsData::LegacyVersion(_) | ||||
|             | CrdsData::NodeInstance(_) | ||||
|             | CrdsData::DuplicateShred(_, _) => { | ||||
|                 let stake = stakes.get(&value.pubkey()).copied(); | ||||
|                 stake.unwrap_or_default() >= MIN_STAKE_FOR_GOSSIP | ||||
|   | ||||
							
								
								
									
										134
									
								
								core/src/crds.rs
									
									
									
									
									
								
							
							
						
						
									
										134
									
								
								core/src/crds.rs
									
									
									
									
									
								
							| @@ -40,9 +40,6 @@ use std::{ | ||||
| }; | ||||
|  | ||||
| const CRDS_SHARDS_BITS: u32 = 8; | ||||
| // Limit number of crds values associated with each unique pubkey. This | ||||
| // excludes crds values which by label design are limited per each pubkey. | ||||
| const MAX_CRDS_VALUES_PER_PUBKEY: usize = 32; | ||||
|  | ||||
| #[derive(Clone)] | ||||
| pub struct Crds { | ||||
| @@ -126,6 +123,14 @@ impl Default for Crds { | ||||
| // Both values should have the same key/label. | ||||
| fn overrides(value: &CrdsValue, other: &VersionedCrdsValue) -> bool { | ||||
|     assert_eq!(value.label(), other.value.label(), "labels mismatch!"); | ||||
|     // Node instances are special cased so that if there are two running | ||||
|     // instances of the same node, the more recent start is propagated through | ||||
|     // gossip regardless of wallclocks. | ||||
|     if let CrdsData::NodeInstance(value) = &value.data { | ||||
|         if let Some(out) = value.overrides(&other.value) { | ||||
|             return out; | ||||
|         } | ||||
|     } | ||||
|     match value.wallclock().cmp(&other.value.wallclock()) { | ||||
|         Ordering::Less => false, | ||||
|         Ordering::Greater => true, | ||||
| @@ -370,9 +375,7 @@ impl Crds { | ||||
|                     None => 0, | ||||
|                 } | ||||
|             }; | ||||
|             let mut old_labels = Vec::new(); | ||||
|             // Buffer of crds values to be evicted based on their wallclock. | ||||
|             let mut recent_unlimited_labels: Vec<(u64 /*wallclock*/, usize /*index*/)> = index | ||||
|             index | ||||
|                 .into_iter() | ||||
|                 .filter_map(|ix| { | ||||
|                     let (label, value) = self.table.get_index(*ix).unwrap(); | ||||
| @@ -381,32 +384,12 @@ impl Crds { | ||||
|                         .max(local_timestamp) | ||||
|                         .saturating_add(timeout); | ||||
|                     if expiry_timestamp <= now { | ||||
|                         old_labels.push(label.clone()); | ||||
|                         None | ||||
|                         Some(label.clone()) | ||||
|                     } else { | ||||
|                         match label.value_space() { | ||||
|                             Some(_) => None, | ||||
|                             None => Some((value.value.wallclock(), *ix)), | ||||
|                         } | ||||
|                         None | ||||
|                     } | ||||
|                 }) | ||||
|                 .collect(); | ||||
|             // Number of values to discard from the buffer: | ||||
|             let nth = recent_unlimited_labels | ||||
|                 .len() | ||||
|                 .saturating_sub(MAX_CRDS_VALUES_PER_PUBKEY); | ||||
|             // Partition on wallclock to discard the older ones. | ||||
|             if nth > 0 && nth < recent_unlimited_labels.len() { | ||||
|                 recent_unlimited_labels.select_nth_unstable(nth); | ||||
|             } | ||||
|             old_labels.extend( | ||||
|                 recent_unlimited_labels | ||||
|                     .split_at(nth) | ||||
|                     .0 | ||||
|                     .iter() | ||||
|                     .map(|(_ /*wallclock*/, ix)| self.table.get_index(*ix).unwrap().0.clone()), | ||||
|             ); | ||||
|             old_labels | ||||
|                 .collect::<Vec<_>>() | ||||
|         }; | ||||
|         thread_pool.install(|| { | ||||
|             self.records | ||||
| @@ -539,7 +522,8 @@ mod test { | ||||
|         contact_info::ContactInfo, | ||||
|         crds_value::{new_rand_timestamp, NodeInstance}, | ||||
|     }; | ||||
|     use rand::{thread_rng, Rng}; | ||||
|     use rand::{thread_rng, Rng, SeedableRng}; | ||||
|     use rand_chacha::ChaChaRng; | ||||
|     use rayon::ThreadPoolBuilder; | ||||
|     use solana_sdk::signature::{Keypair, Signer}; | ||||
|     use std::{collections::HashSet, iter::repeat_with}; | ||||
| @@ -614,6 +598,62 @@ mod test { | ||||
|         assert_eq!(crds.table[&val2.label()].local_timestamp, 3); | ||||
|         assert_eq!(crds.table[&val2.label()].ordinal, 2); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn test_upsert_node_instance() { | ||||
|         const SEED: [u8; 32] = [0x42; 32]; | ||||
|         let mut rng = ChaChaRng::from_seed(SEED); | ||||
|         fn make_crds_value(node: NodeInstance) -> CrdsValue { | ||||
|             CrdsValue::new_unsigned(CrdsData::NodeInstance(node)) | ||||
|         } | ||||
|         let now = 1_620_838_767_000; | ||||
|         let mut crds = Crds::default(); | ||||
|         let pubkey = Pubkey::new_unique(); | ||||
|         let node = NodeInstance::new(&mut rng, pubkey, now); | ||||
|         let node = make_crds_value(node); | ||||
|         assert_eq!(crds.insert(node, now), Ok(None)); | ||||
|         // A node-instance with a different key should insert fine even with | ||||
|         // older timestamps. | ||||
|         let other = NodeInstance::new(&mut rng, Pubkey::new_unique(), now - 1); | ||||
|         let other = make_crds_value(other); | ||||
|         assert_eq!(crds.insert(other, now), Ok(None)); | ||||
|         // A node-instance with older timestamp should fail to insert, even if | ||||
|         // the wallclock is more recent. | ||||
|         let other = NodeInstance::new(&mut rng, pubkey, now - 1); | ||||
|         let other = other.with_wallclock(now + 1); | ||||
|         let other = make_crds_value(other); | ||||
|         let value_hash = hash(&serialize(&other).unwrap()); | ||||
|         assert_eq!( | ||||
|             crds.insert(other, now), | ||||
|             Err(CrdsError::InsertFailed(value_hash)) | ||||
|         ); | ||||
|         // A node instance with the same timestamp should insert only if the | ||||
|         // random token is larger. | ||||
|         let mut num_overrides = 0; | ||||
|         for _ in 0..100 { | ||||
|             let other = NodeInstance::new(&mut rng, pubkey, now); | ||||
|             let other = make_crds_value(other); | ||||
|             let value_hash = hash(&serialize(&other).unwrap()); | ||||
|             match crds.insert(other, now) { | ||||
|                 Ok(Some(_)) => num_overrides += 1, | ||||
|                 Err(CrdsError::InsertFailed(x)) => assert_eq!(x, value_hash), | ||||
|                 _ => panic!(), | ||||
|             } | ||||
|         } | ||||
|         assert_eq!(num_overrides, 5); | ||||
|         // A node instance with larger timestamp should insert regardless of | ||||
|         // its token value. | ||||
|         for k in 1..10 { | ||||
|             let other = NodeInstance::new(&mut rng, pubkey, now + k); | ||||
|             let other = other.with_wallclock(now - 1); | ||||
|             let other = make_crds_value(other); | ||||
|             match crds.insert(other, now) { | ||||
|                 Ok(Some(_)) => (), | ||||
|                 _ => panic!(), | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn test_find_old_records_default() { | ||||
|         let thread_pool = ThreadPoolBuilder::new().build().unwrap(); | ||||
| @@ -660,40 +700,6 @@ mod test { | ||||
|         ); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn test_find_old_records_unlimited() { | ||||
|         let thread_pool = ThreadPoolBuilder::new().build().unwrap(); | ||||
|         let mut rng = thread_rng(); | ||||
|         let now = 1_610_034_423_000; | ||||
|         let pubkey = Pubkey::new_unique(); | ||||
|         let mut crds = Crds::default(); | ||||
|         let mut timeouts = HashMap::new(); | ||||
|         timeouts.insert(Pubkey::default(), 1); | ||||
|         timeouts.insert(pubkey, 180); | ||||
|         for _ in 0..1024 { | ||||
|             let wallclock = now - rng.gen_range(0, 240); | ||||
|             let val = NodeInstance::new(&mut rng, pubkey, wallclock); | ||||
|             let val = CrdsData::NodeInstance(val); | ||||
|             let val = CrdsValue::new_unsigned(val); | ||||
|             assert_eq!(crds.insert(val, now), Ok(None)); | ||||
|         } | ||||
|         let now = now + 1; | ||||
|         let labels = crds.find_old_labels(&thread_pool, now, &timeouts); | ||||
|         assert_eq!(crds.table.len() - labels.len(), MAX_CRDS_VALUES_PER_PUBKEY); | ||||
|         let max_wallclock = labels | ||||
|             .iter() | ||||
|             .map(|label| crds.lookup(label).unwrap().wallclock()) | ||||
|             .max() | ||||
|             .unwrap(); | ||||
|         assert!(max_wallclock > now - 180); | ||||
|         let labels: HashSet<_> = labels.into_iter().collect(); | ||||
|         for (label, value) in crds.table.iter() { | ||||
|             if !labels.contains(label) { | ||||
|                 assert!(max_wallclock <= value.value.wallclock()); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn test_remove_default() { | ||||
|         let thread_pool = ThreadPoolBuilder::new().build().unwrap(); | ||||
|   | ||||
| @@ -20,6 +20,7 @@ use solana_sdk::{ | ||||
| use solana_vote_program::vote_transaction::parse_vote_transaction; | ||||
| use std::{ | ||||
|     borrow::{Borrow, Cow}, | ||||
|     cmp::Ordering, | ||||
|     collections::{hash_map::Entry, BTreeSet, HashMap}, | ||||
|     fmt, | ||||
| }; | ||||
| @@ -405,7 +406,7 @@ impl NodeInstance { | ||||
|     } | ||||
|  | ||||
|     // Clones the value with an updated wallclock. | ||||
|     pub fn with_wallclock(&self, now: u64) -> Self { | ||||
|     pub(crate) fn with_wallclock(&self, now: u64) -> Self { | ||||
|         Self { | ||||
|             wallclock: now, | ||||
|             ..*self | ||||
| @@ -414,7 +415,7 @@ impl NodeInstance { | ||||
|  | ||||
|     // Returns true if the crds-value is a duplicate instance | ||||
|     // of this node, with a more recent timestamp. | ||||
|     pub fn check_duplicate(&self, other: &CrdsValue) -> bool { | ||||
|     pub(crate) fn check_duplicate(&self, other: &CrdsValue) -> bool { | ||||
|         match &other.data { | ||||
|             CrdsData::NodeInstance(other) => { | ||||
|                 self.token != other.token | ||||
| @@ -424,6 +425,26 @@ impl NodeInstance { | ||||
|             _ => false, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     // Returns None if tokens are the same or other is not a node-instance from | ||||
|     // the same owner. Otherwise returns true if self has more recent timestamp | ||||
|     // than other, and so overrides it. | ||||
|     pub(crate) fn overrides(&self, other: &CrdsValue) -> Option<bool> { | ||||
|         let other = match &other.data { | ||||
|             CrdsData::NodeInstance(other) => other, | ||||
|             _ => return None, | ||||
|         }; | ||||
|         if self.token == other.token || self.from != other.from { | ||||
|             return None; | ||||
|         } | ||||
|         match self.timestamp.cmp(&other.timestamp) { | ||||
|             Ordering::Less => Some(false), | ||||
|             Ordering::Greater => Some(true), | ||||
|             // Ties should be broken in a deterministic way across the cluster, | ||||
|             // so that nodes propagate the same value through gossip. | ||||
|             Ordering::Equal => Some(other.token < self.token), | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Sanitize for NodeInstance { | ||||
| @@ -445,7 +466,7 @@ pub enum CrdsValueLabel { | ||||
|     AccountsHashes(Pubkey), | ||||
|     LegacyVersion(Pubkey), | ||||
|     Version(Pubkey), | ||||
|     NodeInstance(Pubkey, u64 /*token*/), | ||||
|     NodeInstance(Pubkey), | ||||
|     DuplicateShred(DuplicateShredIndex, Pubkey), | ||||
| } | ||||
|  | ||||
| @@ -460,7 +481,7 @@ impl fmt::Display for CrdsValueLabel { | ||||
|             CrdsValueLabel::AccountsHashes(_) => write!(f, "AccountsHashes({})", self.pubkey()), | ||||
|             CrdsValueLabel::LegacyVersion(_) => write!(f, "LegacyVersion({})", self.pubkey()), | ||||
|             CrdsValueLabel::Version(_) => write!(f, "Version({})", self.pubkey()), | ||||
|             CrdsValueLabel::NodeInstance(pk, token) => write!(f, "NodeInstance({}, {})", pk, token), | ||||
|             CrdsValueLabel::NodeInstance(pk) => write!(f, "NodeInstance({})", pk), | ||||
|             CrdsValueLabel::DuplicateShred(ix, pk) => write!(f, "DuplicateShred({}, {})", ix, pk), | ||||
|         } | ||||
|     } | ||||
| @@ -477,27 +498,10 @@ impl CrdsValueLabel { | ||||
|             CrdsValueLabel::AccountsHashes(p) => *p, | ||||
|             CrdsValueLabel::LegacyVersion(p) => *p, | ||||
|             CrdsValueLabel::Version(p) => *p, | ||||
|             CrdsValueLabel::NodeInstance(p, _ /*token*/) => *p, | ||||
|             CrdsValueLabel::NodeInstance(p) => *p, | ||||
|             CrdsValueLabel::DuplicateShred(_, p) => *p, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /// Returns number of possible distinct labels of the same type for | ||||
|     /// a fixed pubkey, and None if that is practically unlimited. | ||||
|     pub(crate) fn value_space(&self) -> Option<usize> { | ||||
|         match self { | ||||
|             CrdsValueLabel::ContactInfo(_) => Some(1), | ||||
|             CrdsValueLabel::Vote(_, _) => Some(MAX_VOTES as usize), | ||||
|             CrdsValueLabel::LowestSlot(_) => Some(1), | ||||
|             CrdsValueLabel::SnapshotHashes(_) => Some(1), | ||||
|             CrdsValueLabel::EpochSlots(_, _) => Some(MAX_EPOCH_SLOTS as usize), | ||||
|             CrdsValueLabel::AccountsHashes(_) => Some(1), | ||||
|             CrdsValueLabel::LegacyVersion(_) => Some(1), | ||||
|             CrdsValueLabel::Version(_) => Some(1), | ||||
|             CrdsValueLabel::NodeInstance(_, _) => None, | ||||
|             CrdsValueLabel::DuplicateShred(_, _) => Some(MAX_DUPLICATE_SHREDS as usize), | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl CrdsValue { | ||||
| @@ -570,7 +574,7 @@ impl CrdsValue { | ||||
|             CrdsData::EpochSlots(ix, _) => CrdsValueLabel::EpochSlots(*ix, self.pubkey()), | ||||
|             CrdsData::LegacyVersion(_) => CrdsValueLabel::LegacyVersion(self.pubkey()), | ||||
|             CrdsData::Version(_) => CrdsValueLabel::Version(self.pubkey()), | ||||
|             CrdsData::NodeInstance(node) => CrdsValueLabel::NodeInstance(node.from, node.token), | ||||
|             CrdsData::NodeInstance(node) => CrdsValueLabel::NodeInstance(node.from), | ||||
|             CrdsData::DuplicateShred(ix, shred) => CrdsValueLabel::DuplicateShred(*ix, shred.from), | ||||
|         } | ||||
|     } | ||||
| @@ -931,7 +935,7 @@ mod test { | ||||
|             token: rng.gen(), | ||||
|             ..node | ||||
|         }; | ||||
|         assert_ne!( | ||||
|         assert_eq!( | ||||
|             make_crds_value(node).label(), | ||||
|             make_crds_value(other).label() | ||||
|         ); | ||||
| @@ -946,20 +950,31 @@ mod test { | ||||
|         let mut rng = rand::thread_rng(); | ||||
|         let pubkey = Pubkey::new_unique(); | ||||
|         let node = NodeInstance::new(&mut rng, pubkey, now); | ||||
|         let node_crds = make_crds_value(node.clone()); | ||||
|         // Same token is not a duplicate. | ||||
|         assert!(!node.check_duplicate(&make_crds_value(NodeInstance { | ||||
|         let other = NodeInstance { | ||||
|             from: pubkey, | ||||
|             wallclock: now + 1, | ||||
|             timestamp: now + 1, | ||||
|             token: node.token, | ||||
|         }))); | ||||
|         }; | ||||
|         let other_crds = make_crds_value(other.clone()); | ||||
|         assert!(!node.check_duplicate(&other_crds)); | ||||
|         assert!(!other.check_duplicate(&node_crds)); | ||||
|         assert_eq!(node.overrides(&other_crds), None); | ||||
|         assert_eq!(other.overrides(&node_crds), None); | ||||
|         // Older timestamp is not a duplicate. | ||||
|         assert!(!node.check_duplicate(&make_crds_value(NodeInstance { | ||||
|         let other = NodeInstance { | ||||
|             from: pubkey, | ||||
|             wallclock: now + 1, | ||||
|             timestamp: now - 1, | ||||
|             token: rng.gen(), | ||||
|         }))); | ||||
|         }; | ||||
|         let other_crds = make_crds_value(other.clone()); | ||||
|         assert!(!node.check_duplicate(&other_crds)); | ||||
|         assert!(other.check_duplicate(&node_crds)); | ||||
|         assert_eq!(node.overrides(&other_crds), Some(true)); | ||||
|         assert_eq!(other.overrides(&node_crds), Some(false)); | ||||
|         // Updated wallclock is not a duplicate. | ||||
|         let other = node.with_wallclock(now + 8); | ||||
|         assert_eq!( | ||||
| @@ -971,27 +986,56 @@ mod test { | ||||
|                 token: node.token, | ||||
|             } | ||||
|         ); | ||||
|         assert!(!node.check_duplicate(&make_crds_value(other))); | ||||
|         // Duplicate instance. | ||||
|         assert!(node.check_duplicate(&make_crds_value(NodeInstance { | ||||
|             from: pubkey, | ||||
|             wallclock: 0, | ||||
|             timestamp: now, | ||||
|             token: rng.gen(), | ||||
|         }))); | ||||
|         let other_crds = make_crds_value(other.clone()); | ||||
|         assert!(!node.check_duplicate(&other_crds)); | ||||
|         assert!(!other.check_duplicate(&node_crds)); | ||||
|         assert_eq!(node.overrides(&other_crds), None); | ||||
|         assert_eq!(other.overrides(&node_crds), None); | ||||
|         // Duplicate instance; tied timestamp. | ||||
|         for _ in 0..10 { | ||||
|             let other = NodeInstance { | ||||
|                 from: pubkey, | ||||
|                 wallclock: 0, | ||||
|                 timestamp: now, | ||||
|                 token: rng.gen(), | ||||
|             }; | ||||
|             let other_crds = make_crds_value(other.clone()); | ||||
|             assert!(node.check_duplicate(&other_crds)); | ||||
|             assert!(other.check_duplicate(&node_crds)); | ||||
|             assert_eq!(node.overrides(&other_crds), Some(other.token < node.token)); | ||||
|             assert_eq!(other.overrides(&node_crds), Some(node.token < other.token)); | ||||
|         } | ||||
|         // Duplicate instance; more recent timestamp. | ||||
|         for _ in 0..10 { | ||||
|             let other = NodeInstance { | ||||
|                 from: pubkey, | ||||
|                 wallclock: 0, | ||||
|                 timestamp: now + 1, | ||||
|                 token: rng.gen(), | ||||
|             }; | ||||
|             let other_crds = make_crds_value(other.clone()); | ||||
|             assert!(node.check_duplicate(&other_crds)); | ||||
|             assert!(!other.check_duplicate(&node_crds)); | ||||
|             assert_eq!(node.overrides(&other_crds), Some(false)); | ||||
|             assert_eq!(other.overrides(&node_crds), Some(true)); | ||||
|         } | ||||
|         // Different pubkey is not a duplicate. | ||||
|         assert!(!node.check_duplicate(&make_crds_value(NodeInstance { | ||||
|         let other = NodeInstance { | ||||
|             from: Pubkey::new_unique(), | ||||
|             wallclock: now + 1, | ||||
|             timestamp: now + 1, | ||||
|             token: rng.gen(), | ||||
|         }))); | ||||
|         }; | ||||
|         let other_crds = make_crds_value(other.clone()); | ||||
|         assert!(!node.check_duplicate(&other_crds)); | ||||
|         assert!(!other.check_duplicate(&node_crds)); | ||||
|         assert_eq!(node.overrides(&other_crds), None); | ||||
|         assert_eq!(other.overrides(&node_crds), None); | ||||
|         // Different crds value is not a duplicate. | ||||
|         assert!( | ||||
|             !node.check_duplicate(&CrdsValue::new_unsigned(CrdsData::ContactInfo( | ||||
|                 ContactInfo::new_rand(&mut rng, Some(pubkey)) | ||||
|             ))) | ||||
|         ); | ||||
|         let other = ContactInfo::new_rand(&mut rng, Some(pubkey)); | ||||
|         let other = CrdsValue::new_unsigned(CrdsData::ContactInfo(other)); | ||||
|         assert!(!node.check_duplicate(&other)); | ||||
|         assert_eq!(node.overrides(&other), None); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|   | ||||
		Reference in New Issue
	
	Block a user