diff --git a/core/benches/crds_gossip_pull.rs b/core/benches/crds_gossip_pull.rs index 44c351f054..6d7d991342 100644 --- a/core/benches/crds_gossip_pull.rs +++ b/core/benches/crds_gossip_pull.rs @@ -29,13 +29,8 @@ fn bench_hash_as_u64(bencher: &mut Bencher) { fn bench_build_crds_filters(bencher: &mut Bencher) { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); let mut rng = thread_rng(); - let mut crds_gossip_pull = CrdsGossipPull::default(); + let crds_gossip_pull = CrdsGossipPull::default(); let mut crds = Crds::default(); - for _ in 0..50_000 { - crds_gossip_pull - .purged_values - .push_back((solana_sdk::hash::new_rand(&mut rng), rng.gen())); - } let mut num_inserts = 0; for _ in 0..90_000 { if crds diff --git a/core/benches/crds_shards.rs b/core/benches/crds_shards.rs index 22e94603e7..abeb7762a2 100644 --- a/core/benches/crds_shards.rs +++ b/core/benches/crds_shards.rs @@ -19,7 +19,7 @@ fn new_test_crds_value(rng: &mut R) -> VersionedCrdsValue { let label = value.label(); let mut crds = Crds::default(); crds.insert(value, timestamp()).unwrap(); - crds.remove(&label).unwrap() + crds.get(&label).cloned().unwrap() } fn bench_crds_shards_find(bencher: &mut Bencher, num_values: usize, mask_bits: u32) { diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index c1fb5b2351..4d7385d288 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1745,18 +1745,15 @@ impl ClusterInfo { .chain(std::iter::once(self.id)) .collect(); let mut gossip = self.gossip.write().unwrap(); - match gossip.crds.trim(cap, &keep, stakes) { + match gossip.crds.trim(cap, &keep, stakes, timestamp()) { Err(err) => { self.stats.trim_crds_table_failed.add_relaxed(1); error!("crds table trim failed: {:?}", err); } - Ok(purged_values) => { - let now = timestamp(); + Ok(num_purged) => { self.stats .trim_crds_table_purged_values_count - .add_relaxed(purged_values.len() as u64); - let purged_values = purged_values.into_iter().map(|v| (v.value_hash, now)); - gossip.pull.purged_values.extend(purged_values); + .add_relaxed(num_purged as u64); } } } @@ -2380,8 +2377,7 @@ impl ClusterInfo { self.stats .skip_push_message_shred_version .add_relaxed(num_crds_values - num_filtered_crds_values); - // Origins' pubkeys of updated crds values. - // TODO: Should this also include origins of new crds values? + // Origins' pubkeys of upserted crds values. let origins: HashSet<_> = { let mut gossip = self.time_gossip_write_lock("process_push", &self.stats.process_push_message); @@ -2391,7 +2387,6 @@ impl ClusterInfo { .flat_map(|(from, crds_values)| { gossip.process_push_message(&from, crds_values, now) }) - .map(|v| v.value.pubkey()) .collect() }; // Generate prune messages. diff --git a/core/src/cluster_info_metrics.rs b/core/src/cluster_info_metrics.rs index 2bdb5e00c3..832e335ea6 100644 --- a/core/src/cluster_info_metrics.rs +++ b/core/src/cluster_info_metrics.rs @@ -128,7 +128,7 @@ pub(crate) fn submit_gossip_stats( ( gossip.crds.len(), gossip.crds.num_nodes(), - gossip.pull.purged_values.len(), + gossip.crds.num_purged(), gossip.pull.failed_inserts.len(), ) }; diff --git a/core/src/crds.rs b/core/src/crds.rs index 480c953a81..51596671ce 100644 --- a/core/src/crds.rs +++ b/core/src/crds.rs @@ -35,7 +35,7 @@ use solana_sdk::hash::{hash, Hash}; use solana_sdk::pubkey::Pubkey; use std::{ cmp::Ordering, - collections::{hash_map, BTreeMap, HashMap}, + collections::{hash_map, BTreeMap, HashMap, VecDeque}, ops::{Bound, Index, IndexMut}, }; @@ -56,13 +56,13 @@ pub struct Crds { records: HashMap>, // Indices of all entries keyed by insert order. entries: BTreeMap, + // Hash of recently purged values. + purged: VecDeque<(Hash, u64 /*timestamp*/)>, } #[derive(PartialEq, Debug)] pub enum CrdsError { - // Hash of the crds value which failed to insert should be recorded in - // failed_inserts to be excluded from the next pull-request. - InsertFailed(Hash), + InsertFailed, UnknownStakes, } @@ -116,6 +116,7 @@ impl Default for Crds { epoch_slots: BTreeMap::default(), records: HashMap::default(), entries: BTreeMap::default(), + purged: VecDeque::default(), } } } @@ -156,14 +157,10 @@ impl Crds { } } - pub fn insert( - &mut self, - value: CrdsValue, - local_timestamp: u64, - ) -> Result, CrdsError> { + pub fn insert(&mut self, value: CrdsValue, now: u64) -> Result<(), CrdsError> { let label = value.label(); let pubkey = value.pubkey(); - let value = VersionedCrdsValue::new(value, self.cursor, local_timestamp); + let value = VersionedCrdsValue::new(value, self.cursor, now); match self.table.entry(label) { Entry::Vacant(entry) => { let entry_index = entry.index(); @@ -184,7 +181,7 @@ impl Crds { self.records.entry(pubkey).or_default().insert(entry_index); self.cursor.consume(value.ordinal); entry.insert(value); - Ok(None) + Ok(()) } Entry::Occupied(mut entry) if overrides(&value.value, entry.get()) => { let entry_index = entry.index(); @@ -207,15 +204,20 @@ impl Crds { // does not need to be updated. debug_assert_eq!(entry.get().value.pubkey(), pubkey); self.cursor.consume(value.ordinal); - Ok(Some(entry.insert(value))) + self.purged.push_back((entry.get().value_hash, now)); + entry.insert(value); + Ok(()) } - _ => { + Entry::Occupied(entry) => { trace!( "INSERT FAILED data: {} new.wallclock: {}", value.value.label(), value.value.wallclock(), ); - Err(CrdsError::InsertFailed(value.value_hash)) + if entry.get().value_hash != value.value_hash { + self.purged.push_back((value.value_hash, now)); + } + Err(CrdsError::InsertFailed) } } } @@ -324,6 +326,24 @@ impl Crds { self.table.par_values() } + pub(crate) fn num_purged(&self) -> usize { + self.purged.len() + } + + pub(crate) fn purged(&self) -> impl IndexedParallelIterator + '_ { + self.purged.par_iter().map(|(hash, _)| *hash) + } + + /// Drops purged value hashes with timestamp less than the given one. + pub(crate) fn trim_purged(&mut self, timestamp: u64) { + let count = self + .purged + .iter() + .take_while(|(_, ts)| *ts < timestamp) + .count(); + self.purged.drain(..count); + } + /// Returns all crds values which the first 'mask_bits' /// of their hash value is equal to 'mask'. pub fn filter_bitmask( @@ -402,8 +422,12 @@ impl Crds { }) } - pub fn remove(&mut self, key: &CrdsValueLabel) -> Option { - let (index, _ /*label*/, value) = self.table.swap_remove_full(key)?; + pub fn remove(&mut self, key: &CrdsValueLabel, now: u64) { + let (index, _ /*label*/, value) = match self.table.swap_remove_full(key) { + Some(entry) => entry, + None => return, + }; + self.purged.push_back((value.value_hash, now)); self.shards.remove(index, &value); match value.value.data { CrdsData::ContactInfo(_) => { @@ -457,7 +481,6 @@ impl Crds { records.swap_remove(&size); records.insert(index); } - Some(value) } /// Returns true if the number of unique pubkeys in the table exceeds the @@ -478,12 +501,13 @@ impl Crds { // e.g. trusted validators, self pubkey, ... keep: &[Pubkey], stakes: &HashMap, - ) -> Result, CrdsError> { + now: u64, + ) -> Result { if self.should_trim(cap) { let size = self.records.len().saturating_sub(cap); - self.drop(size, keep, stakes) + self.drop(size, keep, stakes, now) } else { - Ok(Vec::default()) + Ok(0) } } @@ -493,7 +517,8 @@ impl Crds { size: usize, keep: &[Pubkey], stakes: &HashMap, - ) -> Result, CrdsError> { + now: u64, + ) -> Result { if stakes.is_empty() { return Err(CrdsError::UnknownStakes); } @@ -513,7 +538,10 @@ impl Crds { .flat_map(|k| &self.records[&k]) .map(|k| self.table.get_index(*k).unwrap().0.clone()) .collect(); - Ok(keys.iter().map(|k| self.remove(k).unwrap()).collect()) + for key in &keys { + self.remove(key, now); + } + Ok(keys.len()) } } @@ -534,7 +562,7 @@ mod test { fn test_insert() { let mut crds = Crds::default(); let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); - assert_eq!(crds.insert(val.clone(), 0).ok(), Some(None)); + assert_eq!(crds.insert(val.clone(), 0), Ok(())); assert_eq!(crds.table.len(), 1); assert!(crds.table.contains_key(&val.label())); assert_eq!(crds.table[&val.label()].local_timestamp, 0); @@ -543,12 +571,9 @@ mod test { fn test_update_old() { let mut crds = Crds::default(); let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); - let value_hash = hash(&serialize(&val).unwrap()); - assert_eq!(crds.insert(val.clone(), 0), Ok(None)); - assert_eq!( - crds.insert(val.clone(), 1), - Err(CrdsError::InsertFailed(value_hash)) - ); + assert_eq!(crds.insert(val.clone(), 0), Ok(())); + assert_eq!(crds.insert(val.clone(), 1), Err(CrdsError::InsertFailed)); + assert!(crds.purged.is_empty()); assert_eq!(crds.table[&val.label()].local_timestamp, 0); } #[test] @@ -558,15 +583,14 @@ mod test { &Pubkey::default(), 0, ))); - assert_matches!(crds.insert(original.clone(), 0), Ok(_)); + let value_hash = hash(&serialize(&original).unwrap()); + assert_matches!(crds.insert(original, 0), Ok(())); let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &Pubkey::default(), 1, ))); - assert_eq!( - crds.insert(val.clone(), 1).unwrap().unwrap().value, - original - ); + assert_eq!(crds.insert(val.clone(), 1), Ok(())); + assert_eq!(*crds.purged.back().unwrap(), (value_hash, 1)); assert_eq!(crds.table[&val.label()].local_timestamp, 1); } #[test] @@ -576,12 +600,13 @@ mod test { &Pubkey::default(), 0, ))); - assert_eq!(crds.insert(val.clone(), 0), Ok(None)); + assert_eq!(crds.insert(val.clone(), 0), Ok(())); assert_eq!(crds.table[&val.label()].ordinal, 0); let val2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); + let value_hash = hash(&serialize(&val2).unwrap()); assert_eq!(val2.label().pubkey(), val.label().pubkey()); - assert_matches!(crds.insert(val2.clone(), 0), Ok(Some(_))); + assert_eq!(crds.insert(val2.clone(), 0), Ok(())); crds.update_record_timestamp(&val.label().pubkey(), 2); assert_eq!(crds.table[&val.label()].local_timestamp, 2); @@ -596,7 +621,8 @@ mod test { let mut ci = ContactInfo::default(); ci.wallclock += 1; let val3 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci)); - assert_matches!(crds.insert(val3, 3), Ok(Some(_))); + assert_eq!(crds.insert(val3, 3), Ok(())); + assert_eq!(*crds.purged.back().unwrap(), (value_hash, 3)); assert_eq!(crds.table[&val2.label()].local_timestamp, 3); assert_eq!(crds.table[&val2.label()].ordinal, 2); } @@ -613,22 +639,20 @@ mod test { let pubkey = Pubkey::new_unique(); let node = NodeInstance::new(&mut rng, pubkey, now); let node = make_crds_value(node); - assert_eq!(crds.insert(node, now), Ok(None)); + assert_eq!(crds.insert(node, now), Ok(())); // A node-instance with a different key should insert fine even with // older timestamps. let other = NodeInstance::new(&mut rng, Pubkey::new_unique(), now - 1); let other = make_crds_value(other); - assert_eq!(crds.insert(other, now), Ok(None)); + assert_eq!(crds.insert(other, now), Ok(())); // A node-instance with older timestamp should fail to insert, even if // the wallclock is more recent. let other = NodeInstance::new(&mut rng, pubkey, now - 1); let other = other.with_wallclock(now + 1); let other = make_crds_value(other); let value_hash = hash(&serialize(&other).unwrap()); - assert_eq!( - crds.insert(other, now), - Err(CrdsError::InsertFailed(value_hash)) - ); + assert_eq!(crds.insert(other, now), Err(CrdsError::InsertFailed)); + assert_eq!(*crds.purged.back().unwrap(), (value_hash, now)); // A node instance with the same timestamp should insert only if the // random token is larger. let mut num_overrides = 0; @@ -637,8 +661,10 @@ mod test { let other = make_crds_value(other); let value_hash = hash(&serialize(&other).unwrap()); match crds.insert(other, now) { - Ok(Some(_)) => num_overrides += 1, - Err(CrdsError::InsertFailed(x)) => assert_eq!(x, value_hash), + Ok(()) => num_overrides += 1, + Err(CrdsError::InsertFailed) => { + assert_eq!(*crds.purged.back().unwrap(), (value_hash, now)) + } _ => panic!(), } } @@ -650,7 +676,7 @@ mod test { let other = other.with_wallclock(now - 1); let other = make_crds_value(other); match crds.insert(other, now) { - Ok(Some(_)) => (), + Ok(()) => (), _ => panic!(), } } @@ -661,7 +687,7 @@ mod test { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); let mut crds = Crds::default(); let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); - assert_eq!(crds.insert(val.clone(), 1), Ok(None)); + assert_eq!(crds.insert(val.clone(), 1), Ok(())); let mut set = HashMap::new(); set.insert(Pubkey::default(), 0); assert!(crds.find_old_labels(&thread_pool, 0, &set).is_empty()); @@ -684,7 +710,7 @@ mod test { let mut timeouts = HashMap::new(); let val = CrdsValue::new_rand(&mut rng, None); timeouts.insert(Pubkey::default(), 3); - assert_eq!(crds.insert(val.clone(), 0), Ok(None)); + assert_eq!(crds.insert(val.clone(), 0), Ok(())); assert!(crds.find_old_labels(&thread_pool, 2, &timeouts).is_empty()); timeouts.insert(val.pubkey(), 1); assert_eq!( @@ -714,7 +740,7 @@ mod test { crds.find_old_labels(&thread_pool, 2, &set), vec![val.label()] ); - crds.remove(&val.label()); + crds.remove(&val.label(), /*now=*/ 0); assert!(crds.find_old_labels(&thread_pool, 2, &set).is_empty()); } #[test] @@ -722,7 +748,7 @@ mod test { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); let mut crds = Crds::default(); let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); - assert_eq!(crds.insert(val.clone(), 1), Ok(None)); + assert_eq!(crds.insert(val.clone(), 1), Ok(())); let mut set = HashMap::new(); //now < timestamp set.insert(Pubkey::default(), 0); @@ -760,27 +786,19 @@ mod test { let keypairs: Vec<_> = std::iter::repeat_with(Keypair::new).take(256).collect(); let mut rng = thread_rng(); let mut num_inserts = 0; - let mut num_overrides = 0; for _ in 0..4096 { let keypair = &keypairs[rng.gen_range(0, keypairs.len())]; let value = CrdsValue::new_rand(&mut rng, Some(keypair)); let local_timestamp = new_rand_timestamp(&mut rng); - match crds.insert(value, local_timestamp) { - Ok(None) => { - num_inserts += 1; - check_crds_shards(&crds); - } - Ok(Some(_)) => { - num_inserts += 1; - num_overrides += 1; - check_crds_shards(&crds); - } - Err(_) => (), + if let Ok(()) = crds.insert(value, local_timestamp) { + num_inserts += 1; + check_crds_shards(&crds); } } assert_eq!(num_inserts, crds.cursor.0 as usize); assert!(num_inserts > 700); - assert!(num_overrides > 500); + assert!(crds.num_purged() > 500); + assert_eq!(crds.num_purged() + crds.table.len(), 4096); assert!(crds.table.len() > 200); assert!(num_inserts > crds.table.len()); check_crds_shards(&crds); @@ -788,7 +806,7 @@ mod test { while !crds.table.is_empty() { let index = rng.gen_range(0, crds.table.len()); let key = crds.table.get_index(index).unwrap().0.clone(); - crds.remove(&key); + crds.remove(&key, /*now=*/ 0); check_crds_shards(&crds); } } @@ -922,20 +940,12 @@ mod test { let keypairs: Vec<_> = repeat_with(Keypair::new).take(128).collect(); let mut crds = Crds::default(); let mut num_inserts = 0; - let mut num_overrides = 0; for k in 0..4096 { let keypair = &keypairs[rng.gen_range(0, keypairs.len())]; let value = CrdsValue::new_rand(&mut rng, Some(keypair)); let local_timestamp = new_rand_timestamp(&mut rng); - match crds.insert(value, local_timestamp) { - Ok(None) => { - num_inserts += 1; - } - Ok(Some(_)) => { - num_inserts += 1; - num_overrides += 1; - } - Err(_) => (), + if let Ok(()) = crds.insert(value, local_timestamp) { + num_inserts += 1; } if k % 16 == 0 { check_crds_value_indices(&mut rng, &crds); @@ -943,8 +953,9 @@ mod test { } assert_eq!(num_inserts, crds.cursor.0 as usize); assert!(num_inserts > 700); - assert!(num_overrides > 500); + assert!(crds.num_purged() > 500); assert!(crds.table.len() > 200); + assert_eq!(crds.num_purged() + crds.table.len(), 4096); assert!(num_inserts > crds.table.len()); let (num_nodes, num_votes, num_epoch_slots) = check_crds_value_indices(&mut rng, &crds); assert!(num_nodes * 3 < crds.table.len()); @@ -959,7 +970,7 @@ mod test { while !crds.table.is_empty() { let index = rng.gen_range(0, crds.table.len()); let key = crds.table.get_index(index).unwrap().0.clone(); - crds.remove(&key); + crds.remove(&key, /*now=*/ 0); if crds.table.len() % 16 == 0 { check_crds_value_indices(&mut rng, &crds); } @@ -998,7 +1009,7 @@ mod test { while !crds.table.is_empty() { let index = rng.gen_range(0, crds.table.len()); let key = crds.table.get_index(index).unwrap().0.clone(); - crds.remove(&key); + crds.remove(&key, /*now=*/ 0); if crds.table.len() % 64 == 0 { check_crds_records(&crds); } @@ -1007,6 +1018,7 @@ mod test { } #[test] + #[allow(clippy::needless_collect)] fn test_drop() { fn num_unique_pubkeys<'a, I>(values: I) -> usize where @@ -1035,7 +1047,15 @@ mod test { let num_pubkeys = num_unique_pubkeys(crds.table.values()); assert!(!crds.should_trim(num_pubkeys)); assert!(crds.should_trim(num_pubkeys * 5 / 6)); - let purged = crds.drop(16, &[], &stakes).unwrap(); + let values: Vec<_> = crds.table.values().cloned().collect(); + crds.drop(16, &[], &stakes, /*now=*/ 0).unwrap(); + let purged: Vec<_> = { + let purged: HashSet<_> = crds.purged.iter().map(|(hash, _)| hash).copied().collect(); + values + .into_iter() + .filter(|v| purged.contains(&v.value_hash)) + .collect() + }; assert_eq!(purged.len() + crds.table.len(), num_values); assert_eq!(num_unique_pubkeys(&purged), 16); assert_eq!(num_unique_pubkeys(crds.table.values()), num_pubkeys - 16); @@ -1072,7 +1092,7 @@ mod test { crds.find_old_labels(&thread_pool, 2, &set), vec![val.label()] ); - crds.remove(&val.label()); + crds.remove(&val.label(), /*now=*/ 0); assert!(crds.find_old_labels(&thread_pool, 2, &set).is_empty()); } diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index 2177fc990a..5a2d08a644 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -6,7 +6,7 @@ use crate::{ cluster_info::Ping, contact_info::ContactInfo, - crds::{Crds, VersionedCrdsValue}, + crds::Crds, crds_gossip_error::CrdsGossipError, crds_gossip_pull::{CrdsFilter, CrdsGossipPull, ProcessPullStats}, crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE}, @@ -59,21 +59,21 @@ impl CrdsGossip { } /// process a push message to the network + /// Returns origins' pubkeys of upserted values. pub fn process_push_message( &mut self, from: &Pubkey, values: Vec, now: u64, - ) -> Vec { + ) -> Vec { values .into_iter() - .filter_map(|val| { - let old = self - .push + .flat_map(|val| { + let origin = val.pubkey(); + self.push .process_push_message(&mut self.crds, from, val, now) .ok()?; - self.pull.record_old_hash(old.as_ref()?.value_hash, now); - old + Some(origin) }) .collect() } @@ -325,10 +325,8 @@ impl CrdsGossip { .pull .purge_active(thread_pool, &mut self.crds, now, &timeouts); } - if now > 5 * self.pull.crds_timeout { - let min = now - 5 * self.pull.crds_timeout; - self.pull.purge_purged(min); - } + self.crds + .trim_purged(now.saturating_sub(5 * self.pull.crds_timeout)); self.pull.purge_failed_inserts(now); rv } diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index bf5c04f830..30171cdb0e 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -12,7 +12,7 @@ use crate::{ cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY}, contact_info::ContactInfo, - crds::{Crds, CrdsError}, + crds::Crds, crds_gossip::{get_stake, get_weight}, crds_gossip_error::CrdsGossipError, crds_value::CrdsValue, @@ -182,8 +182,6 @@ pub struct ProcessPullStats { pub struct CrdsGossipPull { /// timestamp of last request pub(crate) pull_request_time: LruCache, - /// hash and insert time - pub purged_values: VecDeque<(Hash, u64)>, // Hash value and record time (ms) of the pull responses which failed to be // inserted in crds table; Preserved to stop the sender to send back the // same outdated payload again by adding them to the filter for the next @@ -197,7 +195,6 @@ pub struct CrdsGossipPull { impl Default for CrdsGossipPull { fn default() -> Self { Self { - purged_values: VecDeque::new(), pull_request_time: LruCache::new(CRDS_UNIQUE_PUBKEY_CAPACITY), failed_inserts: VecDeque::new(), crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, @@ -321,21 +318,14 @@ impl CrdsGossipPull { self.pull_request_time.put(from, now); } - /// Store an old hash in the purged values set - pub fn record_old_hash(&mut self, hash: Hash, timestamp: u64) { - self.purged_values.push_back((hash, timestamp)) - } - /// process a pull request pub fn process_pull_requests(&mut self, crds: &mut Crds, callers: I, now: u64) where I: IntoIterator, { for caller in callers { - let key = caller.label().pubkey(); - if let Ok(Some(val)) = crds.insert(caller, now) { - self.purged_values.push_back((val.value_hash, now)); - } + let key = caller.pubkey(); + let _ = crds.insert(caller, now); crds.update_record_timestamp(&key, now); } } @@ -409,32 +399,20 @@ impl CrdsGossipPull { from: &Pubkey, responses: Vec, responses_expired_timeout: Vec, - mut failed_inserts: Vec, + failed_inserts: Vec, now: u64, stats: &mut ProcessPullStats, ) { let mut owners = HashSet::new(); for response in responses_expired_timeout { - match crds.insert(response, now) { - Ok(None) => (), - Ok(Some(old)) => self.purged_values.push_back((old.value_hash, now)), - Err(CrdsError::InsertFailed(value_hash)) => failed_inserts.push(value_hash), - Err(CrdsError::UnknownStakes) => (), - } + let _ = crds.insert(response, now); } for response in responses { let owner = response.pubkey(); - match crds.insert(response, now) { - Err(CrdsError::InsertFailed(value_hash)) => failed_inserts.push(value_hash), - Err(CrdsError::UnknownStakes) => (), - Ok(old) => { - stats.success += 1; - self.num_pulls += 1; - owners.insert(owner); - if let Some(val) = old { - self.purged_values.push_back((val.value_hash, now)) - } - } + if let Ok(()) = crds.insert(response, now) { + stats.success += 1; + self.num_pulls += 1; + owners.insert(owner); } } owners.insert(*from); @@ -472,19 +450,14 @@ impl CrdsGossipPull { const MIN_NUM_BLOOM_ITEMS: usize = 512; #[cfg(not(debug_assertions))] const MIN_NUM_BLOOM_ITEMS: usize = 65_536; - let num_items = crds.len() + self.purged_values.len() + self.failed_inserts.len(); + let num_items = crds.len() + crds.num_purged() + self.failed_inserts.len(); let num_items = MIN_NUM_BLOOM_ITEMS.max(num_items); let filters = CrdsFilterSet::new(num_items, bloom_size); thread_pool.install(|| { crds.par_values() .with_min_len(PAR_MIN_LENGTH) .map(|v| v.value_hash) - .chain( - self.purged_values - .par_iter() - .with_min_len(PAR_MIN_LENGTH) - .map(|(v, _)| *v), - ) + .chain(crds.purged().with_min_len(PAR_MIN_LENGTH)) .chain( self.failed_inserts .par_iter() @@ -576,7 +549,6 @@ impl CrdsGossipPull { } /// Purge values from the crds that are older then `active_timeout` - /// The value_hash of an active item is put into self.purged_values queue pub fn purge_active( &mut self, thread_pool: &ThreadPool, @@ -584,25 +556,11 @@ impl CrdsGossipPull { now: u64, timeouts: &HashMap, ) -> usize { - let num_purged_values = self.purged_values.len(); - self.purged_values.extend( - crds.find_old_labels(thread_pool, now, timeouts) - .into_iter() - .filter_map(|label| { - let val = crds.remove(&label)?; - Some((val.value_hash, now)) - }), - ); - self.purged_values.len() - num_purged_values - } - /// Purge values from the `self.purged_values` queue that are older then purge_timeout - pub fn purge_purged(&mut self, min_ts: u64) { - let cnt = self - .purged_values - .iter() - .take_while(|v| v.1 < min_ts) - .count(); - self.purged_values.drain(..cnt); + let labels = crds.find_old_labels(thread_pool, now, timeouts); + for label in &labels { + crds.remove(label, now); + } + labels.len() } /// For legacy tests @@ -642,7 +600,6 @@ impl CrdsGossipPull { } Self { pull_request_time, - purged_values: self.purged_values.clone(), failed_inserts: self.failed_inserts.clone(), ..*self } @@ -655,7 +612,7 @@ pub(crate) mod tests { use crate::contact_info::ContactInfo; use crate::crds_value::{CrdsData, Vote}; use itertools::Itertools; - use rand::thread_rng; + use rand::{seq::SliceRandom, thread_rng}; use rayon::ThreadPoolBuilder; use solana_perf::test_tx::test_tx; use solana_sdk::{ @@ -906,37 +863,23 @@ pub(crate) mod tests { fn test_build_crds_filter() { let mut rng = thread_rng(); let thread_pool = ThreadPoolBuilder::new().build().unwrap(); - let mut crds_gossip_pull = CrdsGossipPull::default(); + let crds_gossip_pull = CrdsGossipPull::default(); let mut crds = Crds::default(); - for _ in 0..10_000 { - crds_gossip_pull - .purged_values - .push_back((solana_sdk::hash::new_rand(&mut rng), rng.gen())); - } + let keypairs: Vec<_> = repeat_with(Keypair::new).take(10_000).collect(); let mut num_inserts = 0; - for _ in 0..20_000 { - if crds - .insert(CrdsValue::new_rand(&mut rng, None), rng.gen()) - .is_ok() - { + for _ in 0..40_000 { + let keypair = keypairs.choose(&mut rng).unwrap(); + let value = CrdsValue::new_rand(&mut rng, Some(keypair)); + if crds.insert(value, rng.gen()).is_ok() { num_inserts += 1; } } - assert_eq!(num_inserts, 20_000); + assert!(num_inserts > 30_000, "num inserts: {}", num_inserts); let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE); assert_eq!(filters.len(), MIN_NUM_BLOOM_FILTERS.max(32)); - let hash_values: Vec<_> = crds - .values() - .map(|v| v.value_hash) - .chain( - crds_gossip_pull - .purged_values - .iter() - .map(|(value_hash, _)| value_hash) - .cloned(), - ) - .collect(); - assert_eq!(hash_values.len(), 10_000 + 20_000); + let purged: Vec<_> = thread_pool.install(|| crds.purged().collect()); + let hash_values: Vec<_> = crds.values().map(|v| v.value_hash).chain(purged).collect(); + assert_eq!(hash_values.len(), 40_000); let mut false_positives = 0; for hash_value in hash_values { let mut num_hits = 0; @@ -951,7 +894,7 @@ pub(crate) mod tests { } assert_eq!(num_hits, 1); } - assert!(false_positives < 50_000, "fp: {}", false_positives); + assert!(false_positives < 150_000, "fp: {}", false_positives); } #[test] @@ -1407,7 +1350,7 @@ pub(crate) mod tests { assert_eq!(node_crds.lookup(&node_label).unwrap().label(), node_label); assert_eq!(node_crds.lookup_versioned(&old.label()), None); - assert_eq!(node.purged_values.len(), 1); + assert_eq!(node_crds.num_purged(), 1); for _ in 0..30 { // there is a chance of a false positive with bloom filters // assert that purged value is still in the set @@ -1417,8 +1360,8 @@ pub(crate) mod tests { } // purge the value - node.purge_purged(node.crds_timeout + 1); - assert_eq!(node.purged_values.len(), 0); + node_crds.trim_purged(node.crds_timeout + 1); + assert_eq!(node_crds.num_purged(), 0); } #[test] #[allow(clippy::float_cmp)] diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs index b6a914df86..8775ea60a4 100644 --- a/core/src/crds_gossip_push.rs +++ b/core/src/crds_gossip_push.rs @@ -11,7 +11,7 @@ use crate::{ cluster_info::CRDS_UNIQUE_PUBKEY_CAPACITY, contact_info::ContactInfo, - crds::{Crds, Cursor, VersionedCrdsValue}, + crds::{Crds, Cursor}, crds_gossip::{get_stake, get_weight}, crds_gossip_error::CrdsGossipError, crds_value::CrdsValue, @@ -169,7 +169,7 @@ impl CrdsGossipPush { from: &Pubkey, value: CrdsValue, now: u64, - ) -> Result, CrdsGossipError> { + ) -> Result<(), CrdsGossipError> { self.num_total += 1; if !self.wallclock_window(now).contains(&value.wallclock()) { return Err(CrdsGossipError::PushMessageTimeout); @@ -457,7 +457,7 @@ mod test { // push a new message assert_eq!( push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0), - Ok(None) + Ok(()) ); assert_eq!(crds.lookup(&label), Some(&value)); @@ -478,7 +478,7 @@ mod test { // push a new message assert_eq!( push.process_push_message(&mut crds, &Pubkey::default(), value, 0), - Ok(None) + Ok(()) ); // push an old version @@ -522,19 +522,16 @@ mod test { // push a new message assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), value_old.clone(), 0), - Ok(None) + push.process_push_message(&mut crds, &Pubkey::default(), value_old, 0), + Ok(()) ); // push an old version ci.wallclock = 1; let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci)); assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), value, 0) - .unwrap() - .unwrap() - .value, - value_old + push.process_push_message(&mut crds, &Pubkey::default(), value, 0), + Ok(()) ); } #[test] @@ -555,7 +552,7 @@ mod test { 0, ))); - assert_eq!(crds.insert(value1.clone(), now), Ok(None)); + assert_eq!(crds.insert(value1.clone(), now), Ok(())); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); assert!(push.active_set.get(&value1.label().pubkey()).is_some()); @@ -564,7 +561,7 @@ mod test { 0, ))); assert!(push.active_set.get(&value2.label().pubkey()).is_none()); - assert_eq!(crds.insert(value2.clone(), now), Ok(None)); + assert_eq!(crds.insert(value2.clone(), now), Ok(())); for _ in 0..30 { push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); if push.active_set.get(&value2.label().pubkey()).is_some() { @@ -577,7 +574,7 @@ mod test { let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo( ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0), )); - assert_eq!(crds.insert(value2.clone(), now), Ok(None)); + assert_eq!(crds.insert(value2.clone(), now), Ok(())); } push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); assert_eq!(push.active_set.len(), push.num_active); @@ -735,7 +732,7 @@ mod test { &solana_sdk::pubkey::new_rand(), 0, ))); - assert_eq!(crds.insert(peer.clone(), now), Ok(None)); + assert_eq!(crds.insert(peer.clone(), now), Ok(())); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( @@ -746,7 +743,7 @@ mod test { expected.insert(peer.label().pubkey(), vec![new_msg.clone()]); assert_eq!( push.process_push_message(&mut crds, &Pubkey::default(), new_msg, 0), - Ok(None) + Ok(()) ); assert_eq!(push.active_set.len(), 1); assert_eq!(push.new_push_messages(&crds, 0), expected); @@ -765,11 +762,11 @@ mod test { CrdsValue::new_unsigned(CrdsData::ContactInfo(peer)) }) .collect(); - assert_eq!(crds.insert(peers[0].clone(), now), Ok(None)); - assert_eq!(crds.insert(peers[1].clone(), now), Ok(None)); + assert_eq!(crds.insert(peers[0].clone(), now), Ok(())); + assert_eq!(crds.insert(peers[1].clone(), now), Ok(())); assert_eq!( push.process_push_message(&mut crds, &Pubkey::default(), peers[2].clone(), now), - Ok(None) + Ok(()) ); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); @@ -792,7 +789,7 @@ mod test { &solana_sdk::pubkey::new_rand(), 0, ))); - assert_eq!(crds.insert(peer.clone(), 0), Ok(None)); + assert_eq!(crds.insert(peer.clone(), 0), Ok(())); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( @@ -802,7 +799,7 @@ mod test { let expected = HashMap::new(); assert_eq!( push.process_push_message(&mut crds, &Pubkey::default(), new_msg.clone(), 0), - Ok(None) + Ok(()) ); push.process_prune_msg( &self_id, @@ -819,7 +816,7 @@ mod test { &solana_sdk::pubkey::new_rand(), 0, ))); - assert_eq!(crds.insert(peer, 0), Ok(None)); + assert_eq!(crds.insert(peer, 0), Ok(())); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); @@ -828,7 +825,7 @@ mod test { let expected = HashMap::new(); assert_eq!( push.process_push_message(&mut crds, &Pubkey::default(), new_msg, 1), - Ok(None) + Ok(()) ); assert_eq!(push.new_push_messages(&crds, 0), expected); } @@ -844,7 +841,7 @@ mod test { // push a new message assert_eq!( push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0), - Ok(None) + Ok(()) ); assert_eq!(crds.lookup(&label), Some(&value)); diff --git a/core/src/crds_shards.rs b/core/src/crds_shards.rs index 006d0e2995..74bef44b85 100644 --- a/core/src/crds_shards.rs +++ b/core/src/crds_shards.rs @@ -140,7 +140,7 @@ mod test { let label = value.label(); let mut crds = Crds::default(); crds.insert(value, timestamp()).unwrap(); - crds.remove(&label).unwrap() + crds.get(&label).cloned().unwrap() } // Returns true if the first mask_bits most significant bits of hash is the diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index ad93e39f68..732f7eb9bb 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -351,17 +351,14 @@ fn network_run_push( for (to, msgs) in push_messages { bytes += serialized_size(&msgs).unwrap() as usize; num_msgs += 1; - let updated = network + let origins: HashSet<_> = network .get(&to) - .map(|node| { - node.lock() - .unwrap() - .process_push_message(&from, msgs.clone(), now) - }) - .unwrap(); - - let origins: HashSet<_> = - updated.into_iter().map(|u| u.value.pubkey()).collect(); + .unwrap() + .lock() + .unwrap() + .process_push_message(&from, msgs.clone(), now) + .into_iter() + .collect(); let prunes_map = network .get(&to) .map(|node| node.lock().unwrap().prune_received_cache(origins, &stakes))