diff --git a/core/benches/crds.rs b/core/benches/crds.rs index c415febfee..cafd82fdb9 100644 --- a/core/benches/crds.rs +++ b/core/benches/crds.rs @@ -17,7 +17,7 @@ fn bench_find_old_labels(bencher: &mut Bencher) { let mut rng = thread_rng(); let mut crds = Crds::default(); let now = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS + CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 1000; - std::iter::repeat_with(|| (CrdsValue::new_rand(&mut rng), rng.gen_range(0, now))) + std::iter::repeat_with(|| (CrdsValue::new_rand(&mut rng, None), rng.gen_range(0, now))) .take(50_000) .for_each(|(v, ts)| assert!(crds.insert(v, ts).is_ok())); let mut timeouts = HashMap::new(); diff --git a/core/benches/crds_gossip_pull.rs b/core/benches/crds_gossip_pull.rs index 33f85fb3b7..44c351f054 100644 --- a/core/benches/crds_gossip_pull.rs +++ b/core/benches/crds_gossip_pull.rs @@ -39,7 +39,7 @@ fn bench_build_crds_filters(bencher: &mut Bencher) { let mut num_inserts = 0; for _ in 0..90_000 { if crds - .insert(CrdsValue::new_rand(&mut rng), rng.gen()) + .insert(CrdsValue::new_rand(&mut rng, None), rng.gen()) .is_ok() { num_inserts += 1; diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index a4e7b05cdf..c7bb9f26d5 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -624,15 +624,13 @@ impl ClusterInfo { &self, gossip_addr: &SocketAddr, ) -> Option { - for versioned_value in self.gossip.read().unwrap().crds.table.values() { - if let Some(contact_info) = CrdsValue::contact_info(&versioned_value.value) { - if contact_info.gossip == *gossip_addr { - return Some(contact_info.clone()); - } - } - } - - None + self.gossip + .read() + .unwrap() + .crds + .get_nodes_contact_info() + .find(|peer| peer.gossip == *gossip_addr) + .cloned() } pub fn my_contact_info(&self) -> ContactInfo { @@ -1105,9 +1103,7 @@ impl ClusterInfo { .read() .unwrap() .crds - .table - .values() - .filter_map(|x| x.value.contact_info()) + .get_nodes_contact_info() .filter(|x| x.id != self.id() && ContactInfo::is_valid_address(&x.rpc)) .cloned() .collect() @@ -1119,13 +1115,8 @@ impl ClusterInfo { .read() .unwrap() .crds - .table - .values() - .filter_map(|x| { - x.value - .contact_info() - .map(|ci| (ci.clone(), x.local_timestamp)) - }) + .get_nodes() + .map(|x| (x.value.contact_info().unwrap().clone(), x.local_timestamp)) .collect() } @@ -1135,9 +1126,7 @@ impl ClusterInfo { .read() .unwrap() .crds - .table - .values() - .filter_map(|x| x.value.contact_info()) + .get_nodes_contact_info() // shred_version not considered for gossip peers (ie, spy nodes do not set shred_version) .filter(|x| x.id != me && ContactInfo::is_valid_address(&x.gossip)) .cloned() @@ -1148,9 +1137,7 @@ impl ClusterInfo { pub fn all_tvu_peers(&self) -> Vec { self.time_gossip_read_lock("all_tvu_peers", &self.stats.all_tvu_peers) .crds - .table - .values() - .filter_map(|x| x.value.contact_info()) + .get_nodes_contact_info() .filter(|x| ContactInfo::is_valid_address(&x.tvu) && x.id != self.id()) .cloned() .collect() @@ -1160,9 +1147,7 @@ impl ClusterInfo { pub fn tvu_peers(&self) -> Vec { self.time_gossip_read_lock("tvu_peers", &self.stats.tvu_peers) .crds - .table - .values() - .filter_map(|x| x.value.contact_info()) + .get_nodes_contact_info() .filter(|x| { ContactInfo::is_valid_address(&x.tvu) && x.id != self.id() @@ -1176,9 +1161,7 @@ impl ClusterInfo { pub fn retransmit_peers(&self) -> Vec { self.time_gossip_read_lock("retransmit_peers", &self.stats.retransmit_peers) .crds - .table - .values() - .filter_map(|x| x.value.contact_info()) + .get_nodes_contact_info() .filter(|x| { x.id != self.id() && x.shred_version == self.my_shred_version() @@ -1294,9 +1277,7 @@ impl ClusterInfo { .read() .unwrap() .crds - .table - .values() - .filter_map(|x| x.value.contact_info()) + .get_nodes_contact_info() .filter(|x| x.id != self.id() && ContactInfo::is_valid_address(&x.tpu)) .cloned() .collect() @@ -1494,14 +1475,8 @@ impl ClusterInfo { let found_entrypoint = self .time_gossip_read_lock("entrypoint", &self.stats.entrypoint) .crds - .table - .iter() - .any(|(_, v)| { - v.value - .contact_info() - .map(|ci| ci.gossip == entrypoint.gossip) - .unwrap_or(false) - }); + .get_nodes_contact_info() + .any(|node| node.gossip == entrypoint.gossip); !found_entrypoint } } diff --git a/core/src/contact_info.rs b/core/src/contact_info.rs index b12f42ca5c..ea43093f7e 100644 --- a/core/src/contact_info.rs +++ b/core/src/contact_info.rs @@ -124,6 +124,14 @@ impl ContactInfo { } } + /// New random ContactInfo for tests and simulations. + pub(crate) fn new_rand(rng: &mut R, pubkey: Option) -> Self { + let delay = 10 * 60 * 1000; // 10 minutes + let now = timestamp() - delay + rng.gen_range(0, 2 * delay); + let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand); + ContactInfo::new_localhost(&pubkey, now) + } + #[cfg(test)] /// ContactInfo with multicast addresses for adversarial testing. pub fn new_multicast() -> Self { diff --git a/core/src/crds.rs b/core/src/crds.rs index 24d5561a78..00f88115b5 100644 --- a/core/src/crds.rs +++ b/core/src/crds.rs @@ -26,12 +26,15 @@ use crate::contact_info::ContactInfo; use crate::crds_shards::CrdsShards; -use crate::crds_value::{CrdsValue, CrdsValueLabel}; +use crate::crds_value::{CrdsData, CrdsValue, CrdsValueLabel}; use bincode::serialize; use indexmap::map::{Entry, IndexMap}; +use indexmap::set::IndexSet; use rayon::{prelude::*, ThreadPool}; use solana_sdk::hash::{hash, Hash}; use solana_sdk::pubkey::Pubkey; +use solana_sdk::signature::Keypair; +use solana_sdk::timing::timestamp; use std::cmp; use std::collections::HashMap; use std::ops::Index; @@ -44,6 +47,8 @@ pub struct Crds { pub table: IndexMap, pub num_inserts: usize, pub shards: CrdsShards, + // Indices of all crds values which are node ContactInfo. + nodes: IndexSet, } #[derive(PartialEq, Debug)] @@ -86,14 +91,22 @@ impl VersionedCrdsValue { value_hash, } } + + /// New random VersionedCrdsValue for tests and simulations. + pub fn new_rand(rng: &mut R, keypair: Option<&Keypair>) -> Self { + let delay = 10 * 60 * 1000; // 10 minutes + let now = timestamp() - delay + rng.gen_range(0, 2 * delay); + Self::new(now, CrdsValue::new_rand(rng, keypair)) + } } impl Default for Crds { fn default() -> Self { Crds { - table: IndexMap::new(), + table: IndexMap::default(), num_inserts: 0, shards: CrdsShards::new(CRDS_SHARDS_BITS), + nodes: IndexSet::default(), } } } @@ -123,7 +136,11 @@ impl Crds { let label = new_value.value.label(); match self.table.entry(label) { Entry::Vacant(entry) => { - assert!(self.shards.insert(entry.index(), &new_value)); + let entry_index = entry.index(); + assert!(self.shards.insert(entry_index, &new_value)); + if let CrdsData::ContactInfo(_) = new_value.value.data { + assert!(self.nodes.insert(entry_index)); + } entry.insert(new_value); self.num_inserts += 1; Ok(None) @@ -166,6 +183,19 @@ impl Crds { self.table.get(&label)?.value.contact_info() } + /// Returns all entries which are ContactInfo. + pub fn get_nodes(&self) -> impl Iterator { + self.nodes.iter().map(move |i| self.table.index(*i)) + } + + /// Returns ContactInfo of all known nodes. + pub fn get_nodes_contact_info(&self) -> impl Iterator { + self.get_nodes().map(|v| match &v.value.data { + CrdsData::ContactInfo(info) => info, + _ => panic!("this should not happen!"), + }) + } + fn update_label_timestamp(&mut self, id: &CrdsValueLabel, now: u64) { if let Some(e) = self.table.get_mut(id) { e.local_timestamp = cmp::max(e.local_timestamp, now); @@ -209,12 +239,23 @@ impl Crds { pub fn remove(&mut self, key: &CrdsValueLabel) -> Option { let (index, _, value) = self.table.swap_remove_full(key)?; assert!(self.shards.remove(index, &value)); - // The previously last element in the table is now moved to the - // 'index' position. Shards need to be updated accordingly. - if index < self.table.len() { + if let CrdsData::ContactInfo(_) = value.value.data { + assert!(self.nodes.swap_remove(&index)); + } + // If index == self.table.len(), then the removed entry was the last + // entry in the table, in which case no other keys were modified. + // Otherwise, the previously last element in the table is now moved to + // the 'index' position; and so shards and nodes need to be updated + // accordingly. + let size = self.table.len(); + if index < size { let value = self.table.index(index); - assert!(self.shards.remove(self.table.len(), value)); + assert!(self.shards.remove(size, value)); assert!(self.shards.insert(index, value)); + if let CrdsData::ContactInfo(_) = value.value.data { + assert!(self.nodes.swap_remove(&size)); + assert!(self.nodes.insert(index)); + } } Some(value) } @@ -224,7 +265,6 @@ impl Crds { mod test { use super::*; use crate::contact_info::ContactInfo; - use crate::crds_value::CrdsData; use rand::{thread_rng, Rng}; use rayon::ThreadPoolBuilder; @@ -323,7 +363,7 @@ mod test { let mut rng = thread_rng(); let mut crds = Crds::default(); let mut timeouts = HashMap::new(); - let val = CrdsValue::new_rand(&mut rng); + let val = CrdsValue::new_rand(&mut rng, None); timeouts.insert(Pubkey::default(), 3); assert_eq!(crds.insert(val.clone(), 0), Ok(None)); assert!(crds.find_old_labels(&thread_pool, 2, &timeouts).is_empty()); @@ -397,27 +437,29 @@ mod test { } let mut crds = Crds::default(); - let pubkeys: Vec<_> = std::iter::repeat_with(solana_sdk::pubkey::new_rand) - .take(256) - .collect(); + let keypairs: Vec<_> = std::iter::repeat_with(Keypair::new).take(256).collect(); let mut rng = thread_rng(); let mut num_inserts = 0; + let mut num_overrides = 0; for _ in 0..4096 { - let pubkey = pubkeys[rng.gen_range(0, pubkeys.len())]; - let value = VersionedCrdsValue::new( - rng.gen(), // local_timestamp - CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( - &pubkey, - rng.gen(), // now - ))), - ); - if crds.insert_versioned(value).is_ok() { - check_crds_shards(&crds); - num_inserts += 1; + let keypair = &keypairs[rng.gen_range(0, keypairs.len())]; + let value = VersionedCrdsValue::new_rand(&mut rng, Some(keypair)); + match crds.insert_versioned(value) { + Ok(None) => { + num_inserts += 1; + check_crds_shards(&crds); + } + Ok(Some(_)) => { + num_inserts += 1; + num_overrides += 1; + check_crds_shards(&crds); + } + Err(_) => (), } } assert_eq!(num_inserts, crds.num_inserts); assert!(num_inserts > 700); + assert!(num_overrides > 500); assert!(crds.table.len() > 200); assert!(num_inserts > crds.table.len()); check_crds_shards(&crds); @@ -430,6 +472,55 @@ mod test { } } + #[test] + fn test_crds_nodes() { + fn check_crds_nodes(crds: &Crds) -> usize { + let num_nodes = crds + .table + .values() + .filter(|value| matches!(value.value.data, CrdsData::ContactInfo(_))) + .count(); + assert_eq!(num_nodes, crds.get_nodes_contact_info().count()); + num_nodes + } + let mut rng = thread_rng(); + let keypairs: Vec<_> = std::iter::repeat_with(Keypair::new).take(256).collect(); + let mut crds = Crds::default(); + let mut num_inserts = 0; + let mut num_overrides = 0; + for _ in 0..4096 { + let keypair = &keypairs[rng.gen_range(0, keypairs.len())]; + let value = VersionedCrdsValue::new_rand(&mut rng, Some(keypair)); + match crds.insert_versioned(value) { + Ok(None) => { + num_inserts += 1; + check_crds_nodes(&crds); + } + Ok(Some(_)) => { + num_inserts += 1; + num_overrides += 1; + check_crds_nodes(&crds); + } + Err(_) => (), + } + } + assert_eq!(num_inserts, crds.num_inserts); + assert!(num_inserts > 700); + assert!(num_overrides > 500); + assert!(crds.table.len() > 200); + assert!(num_inserts > crds.table.len()); + let num_nodes = check_crds_nodes(&crds); + assert!(num_nodes * 3 < crds.table.len()); + assert!(num_nodes > 150); + // Remove values one by one and assert that nodes indices stay valid. + while !crds.table.is_empty() { + let index = rng.gen_range(0, crds.table.len()); + let key = crds.table.get_index(index).unwrap().0.clone(); + crds.remove(&key); + check_crds_nodes(&crds); + } + } + #[test] fn test_remove_staked() { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 45e419ddb3..ca0dd300a0 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -242,10 +242,9 @@ impl CrdsGossipPull { ) -> Vec<(f32, &'a ContactInfo)> { let mut rng = rand::thread_rng(); let active_cutoff = now.saturating_sub(PULL_ACTIVE_TIMEOUT_MS); - crds.table - .values() + crds.get_nodes() .filter_map(|value| { - let info = value.value.contact_info()?; + let info = value.value.contact_info().unwrap(); // Stop pulling from nodes which have not been active recently. if value.local_timestamp < active_cutoff { // In order to mitigate eclipse attack, for staked nodes @@ -865,7 +864,7 @@ mod test { let mut num_inserts = 0; for _ in 0..20_000 { if crds - .insert(CrdsValue::new_rand(&mut rng), rng.gen()) + .insert(CrdsValue::new_rand(&mut rng, None), rng.gen()) .is_ok() { num_inserts += 1; diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs index 88429462cb..edf9b028a9 100644 --- a/core/src/crds_gossip_push.rs +++ b/core/src/crds_gossip_push.rs @@ -377,10 +377,9 @@ impl CrdsGossipPush { let mut rng = rand::thread_rng(); let max_weight = u16::MAX as f32 - 1.0; let active_cutoff = now.saturating_sub(PUSH_ACTIVE_TIMEOUT_MS); - crds.table - .values() + crds.get_nodes() .filter_map(|value| { - let info = value.value.contact_info()?; + let info = value.value.contact_info().unwrap(); // Stop pushing to nodes which have not been active recently. if value.local_timestamp < active_cutoff { // In order to mitigate eclipse attack, for staked nodes diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs index bc800a3c29..195c3004d4 100644 --- a/core/src/crds_value.rs +++ b/core/src/crds_value.rs @@ -1,14 +1,16 @@ +use crate::cluster_info::MAX_SNAPSHOT_HASHES; use crate::contact_info::ContactInfo; use crate::deprecated; use crate::epoch_slots::EpochSlots; use bincode::{serialize, serialized_size}; +use rand::Rng; use solana_sdk::sanitize::{Sanitize, SanitizeError}; use solana_sdk::timing::timestamp; use solana_sdk::{ clock::Slot, hash::Hash, - pubkey::Pubkey, - signature::{Keypair, Signable, Signature}, + pubkey::{self, Pubkey}, + signature::{Keypair, Signable, Signature, Signer}, transaction::Transaction, }; use std::{ @@ -109,6 +111,29 @@ impl Sanitize for CrdsData { } } +/// Random timestamp for tests and benchmarks. +fn new_rand_timestamp(rng: &mut R) -> u64 { + let delay = 10 * 60 * 1000; // 10 minutes + timestamp() - delay + rng.gen_range(0, 2 * delay) +} + +impl CrdsData { + /// New random CrdsData for tests and benchmarks. + fn new_rand(rng: &mut R, pubkey: Option) -> CrdsData { + let kind = rng.gen_range(0, 5); + // TODO: Implement other kinds of CrdsData here. + // TODO: Assign ranges to each arm proportional to their frequency in + // the mainnet crds table. + match kind { + 0 => CrdsData::ContactInfo(ContactInfo::new_rand(rng, pubkey)), + 1 => CrdsData::LowestSlot(rng.gen(), LowestSlot::new_rand(rng, pubkey)), + 2 => CrdsData::SnapshotHashes(SnapshotHash::new_rand(rng, pubkey)), + 3 => CrdsData::AccountsHashes(SnapshotHash::new_rand(rng, pubkey)), + _ => CrdsData::Version(Version::new_rand(rng, pubkey)), + } + } +} + #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, AbiExample)] pub struct SnapshotHash { pub from: Pubkey, @@ -138,6 +163,23 @@ impl SnapshotHash { wallclock: timestamp(), } } + + /// New random SnapshotHash for tests and benchmarks. + pub(crate) fn new_rand(rng: &mut R, pubkey: Option) -> Self { + let num_hashes = rng.gen_range(0, MAX_SNAPSHOT_HASHES) + 1; + let hashes = std::iter::repeat_with(|| { + let slot = 47825632 + rng.gen_range(0, 512); + let hash = solana_sdk::hash::new_rand(rng); + (slot, hash) + }) + .take(num_hashes) + .collect(); + Self { + from: pubkey.unwrap_or_else(pubkey::new_rand), + hashes, + wallclock: new_rand_timestamp(rng), + } + } } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, AbiExample)] pub struct LowestSlot { @@ -160,6 +202,18 @@ impl LowestSlot { wallclock, } } + + /// New random LowestSlot for tests and benchmarks. + fn new_rand(rng: &mut R, pubkey: Option) -> Self { + Self { + from: pubkey.unwrap_or_else(pubkey::new_rand), + root: rng.gen(), + lowest: rng.gen(), + slots: BTreeSet::default(), + stash: Vec::default(), + wallclock: new_rand_timestamp(rng), + } + } } impl Sanitize for LowestSlot { @@ -252,6 +306,21 @@ impl Version { version: solana_version::Version::default(), } } + + /// New random Version for tests and benchmarks. + fn new_rand(rng: &mut R, pubkey: Option) -> Self { + Self { + from: pubkey.unwrap_or_else(pubkey::new_rand), + wallclock: new_rand_timestamp(rng), + version: solana_version::Version { + major: rng.gen(), + minor: rng.gen(), + patch: rng.gen(), + commit: Some(rng.gen()), + feature_set: rng.gen(), + }, + } + } } /// Type of the replicated value @@ -312,14 +381,19 @@ impl CrdsValue { value } - /// New random crds value for tests and benchmarks. - pub fn new_rand(rng: &mut R) -> CrdsValue - where - R: rand::Rng, - { - let now = rng.gen(); - let contact_info = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), now); - Self::new_signed(CrdsData::ContactInfo(contact_info), &Keypair::new()) + /// New random CrdsValue for tests and benchmarks. + pub fn new_rand(rng: &mut R, keypair: Option<&Keypair>) -> CrdsValue { + match keypair { + None => { + let keypair = Keypair::new(); + let data = CrdsData::new_rand(rng, Some(keypair.pubkey())); + Self::new_signed(data, &keypair) + } + Some(keypair) => { + let data = CrdsData::new_rand(rng, Some(keypair.pubkey())); + Self::new_signed(data, keypair) + } + } } /// Totally unsecure unverifiable wallclock of the node that generated this message