diff --git a/core/src/crds.rs b/core/src/crds.rs index 544f523250..1e785386a9 100644 --- a/core/src/crds.rs +++ b/core/src/crds.rs @@ -36,8 +36,8 @@ use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Keypair; use solana_sdk::timing::timestamp; use std::cmp; -use std::collections::HashMap; -use std::ops::Index; +use std::collections::{hash_map, HashMap}; +use std::ops::{Index, IndexMut}; const CRDS_SHARDS_BITS: u32 = 8; @@ -49,6 +49,8 @@ pub struct Crds { shards: CrdsShards, // Indices of all crds values which are node ContactInfo. nodes: IndexSet, + // Indices of all crds values associated with a node. + records: HashMap>, } #[derive(PartialEq, Debug)] @@ -107,6 +109,7 @@ impl Default for Crds { num_inserts: 0, shards: CrdsShards::new(CRDS_SHARDS_BITS), nodes: IndexSet::default(), + records: HashMap::default(), } } } @@ -141,6 +144,10 @@ impl Crds { if let CrdsData::ContactInfo(_) = new_value.value.data { self.nodes.insert(entry_index); } + self.records + .entry(new_value.value.pubkey()) + .or_default() + .insert(entry_index); entry.insert(new_value); self.num_inserts += 1; Ok(None) @@ -150,6 +157,9 @@ impl Crds { self.shards.remove(index, entry.get()); self.shards.insert(index, &new_value); self.num_inserts += 1; + // As long as the pubkey does not change, self.records + // does not need to be updated. + debug_assert_eq!(entry.get().value.pubkey(), new_value.value.pubkey()); Ok(Some(entry.insert(new_value))) } _ => { @@ -237,16 +247,15 @@ impl Crds { .map(move |i| self.table.index(i)) } - fn update_label_timestamp(&mut self, id: &CrdsValueLabel, now: u64) { - if let Some(e) = self.table.get_mut(id) { - e.local_timestamp = cmp::max(e.local_timestamp, now); - } - } - /// Update the timestamp's of all the labels that are associated with Pubkey pub fn update_record_timestamp(&mut self, pubkey: &Pubkey, now: u64) { - for label in CrdsValue::record_labels(*pubkey) { - self.update_label_timestamp(&label, now); + if let Some(indices) = self.records.get(pubkey) { + for index in indices { + let entry = self.table.index_mut(*index); + if entry.local_timestamp < now { + entry.local_timestamp = now; + } + } } } @@ -278,11 +287,21 @@ impl Crds { } pub fn remove(&mut self, key: &CrdsValueLabel) -> Option { - let (index, _, value) = self.table.swap_remove_full(key)?; + let (index, _ /*label*/, value) = self.table.swap_remove_full(key)?; self.shards.remove(index, &value); if let CrdsData::ContactInfo(_) = value.value.data { self.nodes.swap_remove(&index); } + // Remove the index from records associated with the value's pubkey. + let pubkey = value.value.pubkey(); + let mut records_entry = match self.records.entry(pubkey) { + hash_map::Entry::Vacant(_) => panic!("this should not happen!"), + hash_map::Entry::Occupied(entry) => entry, + }; + records_entry.get_mut().swap_remove(&index); + if records_entry.get().is_empty() { + records_entry.remove(); + } // If index == self.table.len(), then the removed entry was the last // entry in the table, in which case no other keys were modified. // Otherwise, the previously last element in the table is now moved to @@ -297,6 +316,10 @@ impl Crds { self.nodes.swap_remove(&size); self.nodes.insert(index); } + let pubkey = value.value.pubkey(); + let records = self.records.get_mut(&pubkey).unwrap(); + records.swap_remove(&size); + records.insert(index); } Some(value) } @@ -308,6 +331,7 @@ mod test { use crate::contact_info::ContactInfo; use rand::{thread_rng, Rng}; use rayon::ThreadPoolBuilder; + use std::iter::repeat_with; #[test] fn test_insert() { @@ -353,8 +377,6 @@ mod test { ))); assert_eq!(crds.insert(val.clone(), 0), Ok(None)); - crds.update_label_timestamp(&val.label(), 1); - assert_eq!(crds.table[&val.label()].local_timestamp, 1); assert_eq!(crds.table[&val.label()].insert_timestamp, 0); let val2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); @@ -562,6 +584,45 @@ mod test { } } + #[test] + fn test_crds_records() { + fn check_crds_records(crds: &Crds) { + assert_eq!( + crds.table.len(), + crds.records.values().map(IndexSet::len).sum::() + ); + for (pubkey, indices) in &crds.records { + for index in indices { + let value = crds.table.index(*index); + assert_eq!(*pubkey, value.value.pubkey()); + } + } + } + let mut rng = thread_rng(); + let keypairs: Vec<_> = repeat_with(Keypair::new).take(128).collect(); + let mut crds = Crds::default(); + for k in 0..4096 { + let keypair = &keypairs[rng.gen_range(0, keypairs.len())]; + let value = VersionedCrdsValue::new_rand(&mut rng, Some(keypair)); + let _ = crds.insert_versioned(value); + if k % 64 == 0 { + check_crds_records(&crds); + } + } + assert!(crds.records.len() > 96); + assert!(crds.records.len() <= keypairs.len()); + // Remove values one by one and assert that records stay valid. + while !crds.table.is_empty() { + let index = rng.gen_range(0, crds.table.len()); + let key = crds.table.get_index(index).unwrap().0.clone(); + crds.remove(&key); + if crds.table.len() % 64 == 0 { + check_crds_records(&crds); + } + } + assert!(crds.records.is_empty()); + } + #[test] fn test_remove_staked() { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs index 74e59c0b60..5454668ea4 100644 --- a/core/src/crds_value.rs +++ b/core/src/crds_value.rs @@ -557,25 +557,6 @@ impl CrdsValue { } } - /// Return all the possible labels for a record identified by Pubkey. - /// Excludes NodeInstance, which is pushed priodically, and does not need - /// to update its local-timestmap. - pub fn record_labels(key: Pubkey) -> impl Iterator { - const CRDS_VALUE_LABEL_STUBS: [fn(Pubkey) -> CrdsValueLabel; 6] = [ - CrdsValueLabel::ContactInfo, - CrdsValueLabel::LowestSlot, - CrdsValueLabel::SnapshotHashes, - CrdsValueLabel::AccountsHashes, - CrdsValueLabel::LegacyVersion, - CrdsValueLabel::Version, - ]; - CRDS_VALUE_LABEL_STUBS - .iter() - .map(move |f| (f)(key)) - .chain((0..MAX_VOTES).map(move |ix| CrdsValueLabel::Vote(ix, key))) - .chain((0..MAX_EPOCH_SLOTS).map(move |ix| CrdsValueLabel::EpochSlots(ix, key))) - } - /// Returns the size (in bytes) of a CrdsValue pub fn size(&self) -> u64 { serialized_size(&self).expect("unable to serialize contact info") @@ -653,27 +634,6 @@ mod test { use std::cmp::Ordering; use std::iter::repeat_with; - #[test] - fn test_labels() { - let mut hits = [false; 6 + MAX_VOTES as usize + MAX_EPOCH_SLOTS as usize]; - // this method should cover all the possible labels - for v in CrdsValue::record_labels(Pubkey::default()) { - match &v { - CrdsValueLabel::ContactInfo(_) => hits[0] = true, - CrdsValueLabel::LowestSlot(_) => hits[1] = true, - CrdsValueLabel::SnapshotHashes(_) => hits[2] = true, - CrdsValueLabel::AccountsHashes(_) => hits[3] = true, - CrdsValueLabel::LegacyVersion(_) => hits[4] = true, - CrdsValueLabel::Version(_) => hits[5] = true, - CrdsValueLabel::NodeInstance(_, _) => panic!("NodeInstance!?"), - CrdsValueLabel::Vote(ix, _) => hits[*ix as usize + 6] = true, - CrdsValueLabel::EpochSlots(ix, _) => { - hits[*ix as usize + MAX_VOTES as usize + 6] = true - } - } - } - assert!(hits.iter().all(|x| *x)); - } #[test] fn test_keys_and_values() { let v = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));