makes crds fields private (#13703)

Crds fields should maintain several invariants between themselves, so
exposing them as public fields can be bug prone. In addition these
invariants are asserted on every write:
https://github.com/solana-labs/solana/blob/9668dd85d/core/src/crds.rs#L138-L154
https://github.com/solana-labs/solana/blob/9668dd85d/core/src/crds.rs#L239-L262
which adds extra instructions and is not optimal. Should these fields be
private the asserts will be redundant.
This commit is contained in:
behzad nouri
2020-11-19 20:57:40 +00:00
committed by GitHub
parent 397cf726fc
commit b58f69297f
5 changed files with 57 additions and 36 deletions

View File

@ -996,7 +996,6 @@ impl ClusterInfo {
let (labels, txs): (Vec<CrdsValueLabel>, Vec<Transaction>) = self let (labels, txs): (Vec<CrdsValueLabel>, Vec<Transaction>) = self
.time_gossip_read_lock("get_votes", &self.stats.get_votes) .time_gossip_read_lock("get_votes", &self.stats.get_votes)
.crds .crds
.table
.iter() .iter()
.filter(|(_, x)| x.insert_timestamp > since) .filter(|(_, x)| x.insert_timestamp > since)
.filter_map(|(label, x)| { .filter_map(|(label, x)| {
@ -1013,7 +1012,6 @@ impl ClusterInfo {
pub fn get_snapshot_hash(&self, slot: Slot) -> Vec<(Pubkey, Hash)> { pub fn get_snapshot_hash(&self, slot: Slot) -> Vec<(Pubkey, Hash)> {
self.time_gossip_read_lock("get_snapshot_hash", &self.stats.get_snapshot_hash) self.time_gossip_read_lock("get_snapshot_hash", &self.stats.get_snapshot_hash)
.crds .crds
.table
.values() .values()
.filter_map(|x| x.value.snapshot_hash()) .filter_map(|x| x.value.snapshot_hash())
.filter_map(|x| { .filter_map(|x| {
@ -1033,7 +1031,6 @@ impl ClusterInfo {
{ {
self.time_gossip_read_lock("get_accounts_hash", &self.stats.get_accounts_hash) self.time_gossip_read_lock("get_accounts_hash", &self.stats.get_accounts_hash)
.crds .crds
.table
.get(&CrdsValueLabel::AccountsHashes(*pubkey)) .get(&CrdsValueLabel::AccountsHashes(*pubkey))
.map(|x| &x.value.accounts_hash().unwrap().hashes) .map(|x| &x.value.accounts_hash().unwrap().hashes)
.map(map) .map(map)
@ -1047,7 +1044,6 @@ impl ClusterInfo {
.read() .read()
.unwrap() .unwrap()
.crds .crds
.table
.get(&CrdsValueLabel::SnapshotHashes(*pubkey)) .get(&CrdsValueLabel::SnapshotHashes(*pubkey))
.map(|x| &x.value.snapshot_hash().unwrap().hashes) .map(|x| &x.value.snapshot_hash().unwrap().hashes)
.map(map) .map(map)
@ -1066,7 +1062,6 @@ impl ClusterInfo {
.read() .read()
.unwrap() .unwrap()
.crds .crds
.table
.get(&CrdsValueLabel::LowestSlot(*pubkey)) .get(&CrdsValueLabel::LowestSlot(*pubkey))
.filter(|x| { .filter(|x| {
since since
@ -1082,7 +1077,6 @@ impl ClusterInfo {
.read() .read()
.unwrap() .unwrap()
.crds .crds
.table
.values() .values()
.filter(|x| { .filter(|x| {
since since
@ -1102,7 +1096,6 @@ impl ClusterInfo {
.read() .read()
.unwrap() .unwrap()
.crds .crds
.table
.get(&CrdsValueLabel::Version(*pubkey)) .get(&CrdsValueLabel::Version(*pubkey))
.map(|x| x.value.version()) .map(|x| x.value.version())
.flatten() .flatten()
@ -1113,7 +1106,6 @@ impl ClusterInfo {
.read() .read()
.unwrap() .unwrap()
.crds .crds
.table
.get(&CrdsValueLabel::LegacyVersion(*pubkey)) .get(&CrdsValueLabel::LegacyVersion(*pubkey))
.map(|x| x.value.legacy_version()) .map(|x| x.value.legacy_version())
.flatten() .flatten()
@ -2603,7 +2595,7 @@ impl ClusterInfo {
let (table_size, purged_values_size, failed_inserts_size) = { let (table_size, purged_values_size, failed_inserts_size) = {
let r_gossip = self.gossip.read().unwrap(); let r_gossip = self.gossip.read().unwrap();
( (
r_gossip.crds.table.len(), r_gossip.crds.len(),
r_gossip.pull.purged_values.len(), r_gossip.pull.purged_values.len(),
r_gossip.pull.failed_inserts.len(), r_gossip.pull.failed_inserts.len(),
) )
@ -2873,7 +2865,7 @@ impl ClusterInfo {
debug!( debug!(
"{}: run_listen timeout, table size: {}", "{}: run_listen timeout, table size: {}",
self.id(), self.id(),
r_gossip.crds.table.len() r_gossip.crds.len()
); );
} }
thread_mem_usage::datapoint("solana-listen"); thread_mem_usage::datapoint("solana-listen");

View File

@ -28,7 +28,7 @@ use crate::contact_info::ContactInfo;
use crate::crds_shards::CrdsShards; use crate::crds_shards::CrdsShards;
use crate::crds_value::{CrdsData, CrdsValue, CrdsValueLabel}; use crate::crds_value::{CrdsData, CrdsValue, CrdsValueLabel};
use bincode::serialize; use bincode::serialize;
use indexmap::map::{Entry, IndexMap}; use indexmap::map::{rayon::ParValues, Entry, IndexMap, Iter, Values};
use indexmap::set::IndexSet; use indexmap::set::IndexSet;
use rayon::{prelude::*, ThreadPool}; use rayon::{prelude::*, ThreadPool};
use solana_sdk::hash::{hash, Hash}; use solana_sdk::hash::{hash, Hash};
@ -44,9 +44,9 @@ const CRDS_SHARDS_BITS: u32 = 8;
#[derive(Clone)] #[derive(Clone)]
pub struct Crds { pub struct Crds {
/// Stores the map of labels and values /// Stores the map of labels and values
pub table: IndexMap<CrdsValueLabel, VersionedCrdsValue>, table: IndexMap<CrdsValueLabel, VersionedCrdsValue>,
pub num_inserts: usize, pub num_inserts: usize, // Only used in tests.
pub shards: CrdsShards, shards: CrdsShards,
// Indices of all crds values which are node ContactInfo. // Indices of all crds values which are node ContactInfo.
nodes: IndexSet<usize>, nodes: IndexSet<usize>,
} }
@ -137,9 +137,9 @@ impl Crds {
match self.table.entry(label) { match self.table.entry(label) {
Entry::Vacant(entry) => { Entry::Vacant(entry) => {
let entry_index = entry.index(); let entry_index = entry.index();
assert!(self.shards.insert(entry_index, &new_value)); self.shards.insert(entry_index, &new_value);
if let CrdsData::ContactInfo(_) = new_value.value.data { if let CrdsData::ContactInfo(_) = new_value.value.data {
assert!(self.nodes.insert(entry_index)); self.nodes.insert(entry_index);
} }
entry.insert(new_value); entry.insert(new_value);
self.num_inserts += 1; self.num_inserts += 1;
@ -147,8 +147,8 @@ impl Crds {
} }
Entry::Occupied(mut entry) if *entry.get() < new_value => { Entry::Occupied(mut entry) if *entry.get() < new_value => {
let index = entry.index(); let index = entry.index();
assert!(self.shards.remove(index, entry.get())); self.shards.remove(index, entry.get());
assert!(self.shards.insert(index, &new_value)); self.shards.insert(index, &new_value);
self.num_inserts += 1; self.num_inserts += 1;
Ok(Some(entry.insert(new_value))) Ok(Some(entry.insert(new_value)))
} }
@ -178,6 +178,10 @@ impl Crds {
self.table.get(label) self.table.get(label)
} }
pub fn get(&self, label: &CrdsValueLabel) -> Option<&VersionedCrdsValue> {
self.table.get(label)
}
pub fn get_contact_info(&self, pubkey: &Pubkey) -> Option<&ContactInfo> { pub fn get_contact_info(&self, pubkey: &Pubkey) -> Option<&ContactInfo> {
let label = CrdsValueLabel::ContactInfo(*pubkey); let label = CrdsValueLabel::ContactInfo(*pubkey);
self.table.get(&label)?.value.contact_info() self.table.get(&label)?.value.contact_info()
@ -196,6 +200,38 @@ impl Crds {
}) })
} }
pub fn len(&self) -> usize {
self.table.len()
}
pub fn is_empty(&self) -> bool {
self.table.is_empty()
}
pub fn iter(&self) -> Iter<'_, CrdsValueLabel, VersionedCrdsValue> {
self.table.iter()
}
pub fn values(&self) -> Values<'_, CrdsValueLabel, VersionedCrdsValue> {
self.table.values()
}
pub fn par_values(&self) -> ParValues<'_, CrdsValueLabel, VersionedCrdsValue> {
self.table.par_values()
}
/// Returns all crds values which the first 'mask_bits'
/// of their hash value is equal to 'mask'.
pub fn filter_bitmask(
&self,
mask: u64,
mask_bits: u32,
) -> impl Iterator<Item = &VersionedCrdsValue> {
self.shards
.find(mask, mask_bits)
.map(move |i| self.table.index(i))
}
fn update_label_timestamp(&mut self, id: &CrdsValueLabel, now: u64) { fn update_label_timestamp(&mut self, id: &CrdsValueLabel, now: u64) {
if let Some(e) = self.table.get_mut(id) { if let Some(e) = self.table.get_mut(id) {
e.local_timestamp = cmp::max(e.local_timestamp, now); e.local_timestamp = cmp::max(e.local_timestamp, now);
@ -238,9 +274,9 @@ impl Crds {
pub fn remove(&mut self, key: &CrdsValueLabel) -> Option<VersionedCrdsValue> { pub fn remove(&mut self, key: &CrdsValueLabel) -> Option<VersionedCrdsValue> {
let (index, _, value) = self.table.swap_remove_full(key)?; let (index, _, value) = self.table.swap_remove_full(key)?;
assert!(self.shards.remove(index, &value)); self.shards.remove(index, &value);
if let CrdsData::ContactInfo(_) = value.value.data { if let CrdsData::ContactInfo(_) = value.value.data {
assert!(self.nodes.swap_remove(&index)); self.nodes.swap_remove(&index);
} }
// If index == self.table.len(), then the removed entry was the last // If index == self.table.len(), then the removed entry was the last
// entry in the table, in which case no other keys were modified. // entry in the table, in which case no other keys were modified.
@ -250,11 +286,11 @@ impl Crds {
let size = self.table.len(); let size = self.table.len();
if index < size { if index < size {
let value = self.table.index(index); let value = self.table.index(index);
assert!(self.shards.remove(size, value)); self.shards.remove(size, value);
assert!(self.shards.insert(index, value)); self.shards.insert(index, value);
if let CrdsData::ContactInfo(_) = value.value.data { if let CrdsData::ContactInfo(_) = value.value.data {
assert!(self.nodes.swap_remove(&size)); self.nodes.swap_remove(&size);
assert!(self.nodes.insert(index)); self.nodes.insert(index);
} }
} }
Some(value) Some(value)

View File

@ -24,7 +24,6 @@ use std::cmp;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::convert::TryInto; use std::convert::TryInto;
use std::ops::Index;
pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000; pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
// The maximum age of a value received over pull responses // The maximum age of a value received over pull responses
@ -451,12 +450,11 @@ impl CrdsGossipPull {
const PAR_MIN_LENGTH: usize = 512; const PAR_MIN_LENGTH: usize = 512;
let num = cmp::max( let num = cmp::max(
CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS,
crds.table.len() + self.purged_values.len() + self.failed_inserts.len(), crds.len() + self.purged_values.len() + self.failed_inserts.len(),
); );
let filters = CrdsFilterSet::new(num, bloom_size); let filters = CrdsFilterSet::new(num, bloom_size);
thread_pool.install(|| { thread_pool.install(|| {
crds.table crds.par_values()
.par_values()
.with_min_len(PAR_MIN_LENGTH) .with_min_len(PAR_MIN_LENGTH)
.map(|v| v.value_hash) .map(|v| v.value_hash)
.chain( .chain(
@ -499,10 +497,8 @@ impl CrdsGossipPull {
return vec![]; return vec![];
} }
let caller_wallclock = caller_wallclock.checked_add(jitter).unwrap_or(0); let caller_wallclock = caller_wallclock.checked_add(jitter).unwrap_or(0);
crds.shards crds.filter_bitmask(filter.mask, filter.mask_bits)
.find(filter.mask, filter.mask_bits) .filter_map(|item| {
.filter_map(|index| {
let item = crds.table.index(index);
debug_assert!(filter.test_mask(&item.value_hash)); debug_assert!(filter.test_mask(&item.value_hash));
//skip values that are too new //skip values that are too new
if item.value.wallclock() > caller_wallclock { if item.value.wallclock() > caller_wallclock {
@ -874,7 +870,6 @@ mod test {
let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE); let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE);
assert_eq!(filters.len(), 32); assert_eq!(filters.len(), 32);
let hash_values: Vec<_> = crds let hash_values: Vec<_> = crds
.table
.values() .values()
.map(|v| v.value_hash) .map(|v| v.value_hash)
.chain( .chain(

View File

@ -21,13 +21,11 @@ impl CrdsShards {
} }
} }
#[must_use]
pub fn insert(&mut self, index: usize, value: &VersionedCrdsValue) -> bool { pub fn insert(&mut self, index: usize, value: &VersionedCrdsValue) -> bool {
let hash = CrdsFilter::hash_as_u64(&value.value_hash); let hash = CrdsFilter::hash_as_u64(&value.value_hash);
self.shard_mut(hash).insert(index, hash).is_none() self.shard_mut(hash).insert(index, hash).is_none()
} }
#[must_use]
pub fn remove(&mut self, index: usize, value: &VersionedCrdsValue) -> bool { pub fn remove(&mut self, index: usize, value: &VersionedCrdsValue) -> bool {
let hash = CrdsFilter::hash_as_u64(&value.value_hash); let hash = CrdsFilter::hash_as_u64(&value.value_hash);
self.shard_mut(hash).swap_remove(&index).is_some() self.shard_mut(hash).swap_remove(&index).is_some()

View File

@ -498,7 +498,7 @@ fn network_run_pull(
} }
let total: usize = network_values let total: usize = network_values
.par_iter() .par_iter()
.map(|v| v.lock().unwrap().crds.table.len()) .map(|v| v.lock().unwrap().crds.len())
.sum(); .sum();
convergance = total as f64 / ((num * num) as f64); convergance = total as f64 / ((num * num) as f64);
if convergance > max_convergance { if convergance > max_convergance {