From b58f69297fa771263694ddda5c517d41f8997a00 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Thu, 19 Nov 2020 20:57:40 +0000 Subject: [PATCH] makes crds fields private (#13703) Crds fields should maintain several invariants between themselves, so exposing them as public fields can be bug prone. In addition these invariants are asserted on every write: https://github.com/solana-labs/solana/blob/9668dd85d/core/src/crds.rs#L138-L154 https://github.com/solana-labs/solana/blob/9668dd85d/core/src/crds.rs#L239-L262 which adds extra instructions and is not optimal. Should these fields be private the asserts will be redundant. --- core/src/cluster_info.rs | 12 ++----- core/src/crds.rs | 64 ++++++++++++++++++++++++++++-------- core/src/crds_gossip_pull.rs | 13 +++----- core/src/crds_shards.rs | 2 -- core/tests/crds_gossip.rs | 2 +- 5 files changed, 57 insertions(+), 36 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 0dc8975b13..b0057dbd8f 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -996,7 +996,6 @@ impl ClusterInfo { let (labels, txs): (Vec, Vec) = self .time_gossip_read_lock("get_votes", &self.stats.get_votes) .crds - .table .iter() .filter(|(_, x)| x.insert_timestamp > since) .filter_map(|(label, x)| { @@ -1013,7 +1012,6 @@ impl ClusterInfo { pub fn get_snapshot_hash(&self, slot: Slot) -> Vec<(Pubkey, Hash)> { self.time_gossip_read_lock("get_snapshot_hash", &self.stats.get_snapshot_hash) .crds - .table .values() .filter_map(|x| x.value.snapshot_hash()) .filter_map(|x| { @@ -1033,7 +1031,6 @@ impl ClusterInfo { { self.time_gossip_read_lock("get_accounts_hash", &self.stats.get_accounts_hash) .crds - .table .get(&CrdsValueLabel::AccountsHashes(*pubkey)) .map(|x| &x.value.accounts_hash().unwrap().hashes) .map(map) @@ -1047,7 +1044,6 @@ impl ClusterInfo { .read() .unwrap() .crds - .table .get(&CrdsValueLabel::SnapshotHashes(*pubkey)) .map(|x| &x.value.snapshot_hash().unwrap().hashes) .map(map) @@ -1066,7 +1062,6 
@@ impl ClusterInfo { .read() .unwrap() .crds - .table .get(&CrdsValueLabel::LowestSlot(*pubkey)) .filter(|x| { since @@ -1082,7 +1077,6 @@ impl ClusterInfo { .read() .unwrap() .crds - .table .values() .filter(|x| { since @@ -1102,7 +1096,6 @@ impl ClusterInfo { .read() .unwrap() .crds - .table .get(&CrdsValueLabel::Version(*pubkey)) .map(|x| x.value.version()) .flatten() @@ -1113,7 +1106,6 @@ impl ClusterInfo { .read() .unwrap() .crds - .table .get(&CrdsValueLabel::LegacyVersion(*pubkey)) .map(|x| x.value.legacy_version()) .flatten() @@ -2603,7 +2595,7 @@ impl ClusterInfo { let (table_size, purged_values_size, failed_inserts_size) = { let r_gossip = self.gossip.read().unwrap(); ( - r_gossip.crds.table.len(), + r_gossip.crds.len(), r_gossip.pull.purged_values.len(), r_gossip.pull.failed_inserts.len(), ) @@ -2873,7 +2865,7 @@ impl ClusterInfo { debug!( "{}: run_listen timeout, table size: {}", self.id(), - r_gossip.crds.table.len() + r_gossip.crds.len() ); } thread_mem_usage::datapoint("solana-listen"); diff --git a/core/src/crds.rs b/core/src/crds.rs index 00f88115b5..7c9c2a1516 100644 --- a/core/src/crds.rs +++ b/core/src/crds.rs @@ -28,7 +28,7 @@ use crate::contact_info::ContactInfo; use crate::crds_shards::CrdsShards; use crate::crds_value::{CrdsData, CrdsValue, CrdsValueLabel}; use bincode::serialize; -use indexmap::map::{Entry, IndexMap}; +use indexmap::map::{rayon::ParValues, Entry, IndexMap, Iter, Values}; use indexmap::set::IndexSet; use rayon::{prelude::*, ThreadPool}; use solana_sdk::hash::{hash, Hash}; @@ -44,9 +44,9 @@ const CRDS_SHARDS_BITS: u32 = 8; #[derive(Clone)] pub struct Crds { /// Stores the map of labels and values - pub table: IndexMap<CrdsValueLabel, VersionedCrdsValue>, - pub num_inserts: usize, - pub shards: CrdsShards, + table: IndexMap<CrdsValueLabel, VersionedCrdsValue>, + pub num_inserts: usize, // Only used in tests. + shards: CrdsShards, // Indices of all crds values which are node ContactInfo. 
nodes: IndexSet<usize>, } @@ -137,9 +137,9 @@ impl Crds { match self.table.entry(label) { Entry::Vacant(entry) => { let entry_index = entry.index(); - assert!(self.shards.insert(entry_index, &new_value)); + self.shards.insert(entry_index, &new_value); if let CrdsData::ContactInfo(_) = new_value.value.data { - assert!(self.nodes.insert(entry_index)); + self.nodes.insert(entry_index); } entry.insert(new_value); self.num_inserts += 1; @@ -147,8 +147,8 @@ impl Crds { } Entry::Occupied(mut entry) if *entry.get() < new_value => { let index = entry.index(); - assert!(self.shards.remove(index, entry.get())); - assert!(self.shards.insert(index, &new_value)); + self.shards.remove(index, entry.get()); + self.shards.insert(index, &new_value); self.num_inserts += 1; Ok(Some(entry.insert(new_value))) } @@ -178,6 +178,10 @@ impl Crds { self.table.get(label) } + pub fn get(&self, label: &CrdsValueLabel) -> Option<&VersionedCrdsValue> { + self.table.get(label) + } + pub fn get_contact_info(&self, pubkey: &Pubkey) -> Option<&ContactInfo> { let label = CrdsValueLabel::ContactInfo(*pubkey); self.table.get(&label)?.value.contact_info() @@ -196,6 +200,38 @@ impl Crds { }) } + pub fn len(&self) -> usize { + self.table.len() + } + + pub fn is_empty(&self) -> bool { + self.table.is_empty() + } + + pub fn iter(&self) -> Iter<'_, CrdsValueLabel, VersionedCrdsValue> { + self.table.iter() + } + + pub fn values(&self) -> Values<'_, CrdsValueLabel, VersionedCrdsValue> { + self.table.values() + } + + pub fn par_values(&self) -> ParValues<'_, CrdsValueLabel, VersionedCrdsValue> { + self.table.par_values() + } + + /// Returns all crds values which the first 'mask_bits' + /// of their hash value is equal to 'mask'. 
+ pub fn filter_bitmask( + &self, + mask: u64, + mask_bits: u32, + ) -> impl Iterator<Item = &VersionedCrdsValue> { + self.shards + .find(mask, mask_bits) + .map(move |i| self.table.index(i)) + } + fn update_label_timestamp(&mut self, id: &CrdsValueLabel, now: u64) { if let Some(e) = self.table.get_mut(id) { e.local_timestamp = cmp::max(e.local_timestamp, now); @@ -238,9 +274,9 @@ impl Crds { pub fn remove(&mut self, key: &CrdsValueLabel) -> Option<VersionedCrdsValue> { let (index, _, value) = self.table.swap_remove_full(key)?; - assert!(self.shards.remove(index, &value)); + self.shards.remove(index, &value); if let CrdsData::ContactInfo(_) = value.value.data { - assert!(self.nodes.swap_remove(&index)); + self.nodes.swap_remove(&index); } // If index == self.table.len(), then the removed entry was the last // entry in the table, in which case no other keys were modified. @@ -250,11 +286,11 @@ impl Crds { let size = self.table.len(); if index < size { let value = self.table.index(index); - assert!(self.shards.remove(size, value)); - assert!(self.shards.insert(index, value)); + self.shards.remove(size, value); + self.shards.insert(index, value); if let CrdsData::ContactInfo(_) = value.value.data { - assert!(self.nodes.swap_remove(&size)); - assert!(self.nodes.insert(index)); + self.nodes.swap_remove(&size); + self.nodes.insert(index); } } Some(value) diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index ca0dd300a0..7fc3d16c1d 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -24,7 +24,6 @@ use std::cmp; use std::collections::VecDeque; use std::collections::{HashMap, HashSet}; use std::convert::TryInto; -use std::ops::Index; pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000; // The maximum age of a value received over pull responses @@ -451,12 +450,11 @@ impl CrdsGossipPull { const PAR_MIN_LENGTH: usize = 512; let num = cmp::max( CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS, - crds.table.len() + self.purged_values.len() + self.failed_inserts.len(), + crds.len() + 
self.purged_values.len() + self.failed_inserts.len(), ); let filters = CrdsFilterSet::new(num, bloom_size); thread_pool.install(|| { - crds.table - .par_values() + crds.par_values() .with_min_len(PAR_MIN_LENGTH) .map(|v| v.value_hash) .chain( @@ -499,10 +497,8 @@ impl CrdsGossipPull { return vec![]; } let caller_wallclock = caller_wallclock.checked_add(jitter).unwrap_or(0); - crds.shards - .find(filter.mask, filter.mask_bits) - .filter_map(|index| { - let item = crds.table.index(index); + crds.filter_bitmask(filter.mask, filter.mask_bits) + .filter_map(|item| { debug_assert!(filter.test_mask(&item.value_hash)); //skip values that are too new if item.value.wallclock() > caller_wallclock { @@ -874,7 +870,6 @@ mod test { let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE); assert_eq!(filters.len(), 32); let hash_values: Vec<_> = crds - .table .values() .map(|v| v.value_hash) .chain( diff --git a/core/src/crds_shards.rs b/core/src/crds_shards.rs index 9ff54628a0..8b0fc5df87 100644 --- a/core/src/crds_shards.rs +++ b/core/src/crds_shards.rs @@ -21,13 +21,11 @@ impl CrdsShards { } } - #[must_use] pub fn insert(&mut self, index: usize, value: &VersionedCrdsValue) -> bool { let hash = CrdsFilter::hash_as_u64(&value.value_hash); self.shard_mut(hash).insert(index, hash).is_none() } - #[must_use] pub fn remove(&mut self, index: usize, value: &VersionedCrdsValue) -> bool { let hash = CrdsFilter::hash_as_u64(&value.value_hash); self.shard_mut(hash).swap_remove(&index).is_some() diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index 96df3679db..6a3167a0e5 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -498,7 +498,7 @@ fn network_run_pull( } let total: usize = network_values .par_iter() - .map(|v| v.lock().unwrap().crds.table.len()) + .map(|v| v.lock().unwrap().crds.len()) .sum(); convergance = total as f64 / ((num * num) as f64); if convergance > max_convergance {