diff --git a/Cargo.lock b/Cargo.lock index c9a6fb6721..1c1e272f38 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4848,7 +4848,7 @@ dependencies = [ "clap", "flate2", "indexmap", - "itertools 0.9.0", + "itertools 0.10.1", "log 0.4.14", "lru", "matches", diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs index 9927689561..09e94b6f91 100644 --- a/core/src/cluster_nodes.rs +++ b/core/src/cluster_nodes.rs @@ -318,6 +318,7 @@ mod tests { super::*, rand::{seq::SliceRandom, Rng}, solana_gossip::{ + crds::GossipRoute, crds_value::{CrdsData, CrdsValue}, deprecated::{ shuffle_peers_and_index, sorted_retransmit_peers_and_stakes, @@ -384,7 +385,10 @@ mod tests { for node in nodes.iter().skip(1) { let node = CrdsData::ContactInfo(node.clone()); let node = CrdsValue::new_unsigned(node); - assert_eq!(gossip.crds.insert(node, now), Ok(())); + assert_eq!( + gossip.crds.insert(node, now, GossipRoute::LocalMessage), + Ok(()) + ); } } (nodes, stakes, cluster_info) diff --git a/gossip/Cargo.toml b/gossip/Cargo.toml index 8a97c32c58..7875d27813 100644 --- a/gossip/Cargo.toml +++ b/gossip/Cargo.toml @@ -15,7 +15,7 @@ bv = { version = "0.11.1", features = ["serde"] } clap = "2.33.1" flate2 = "1.0" indexmap = { version = "1.5", features = ["rayon"] } -itertools = "0.9.0" +itertools = "0.10.1" log = "0.4.11" lru = "0.6.1" matches = "0.1.8" diff --git a/gossip/benches/crds.rs b/gossip/benches/crds.rs index a5bfac749d..9ee78a6aa6 100644 --- a/gossip/benches/crds.rs +++ b/gossip/benches/crds.rs @@ -6,7 +6,9 @@ use { rand::{thread_rng, Rng}, rayon::ThreadPoolBuilder, solana_gossip::{ - crds::Crds, crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, crds_value::CrdsValue, + crds::{Crds, GossipRoute}, + crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, + crds_value::CrdsValue, }, solana_sdk::pubkey::Pubkey, std::collections::HashMap, @@ -21,7 +23,7 @@ fn bench_find_old_labels(bencher: &mut Bencher) { let now = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS + CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 1000; std::iter::repeat_with(|| (CrdsValue::new_rand(&mut rng, None), rng.gen_range(0, now))) .take(50_000) - .for_each(|(v, ts)| assert!(crds.insert(v, ts).is_ok())); + .for_each(|(v, ts)| assert!(crds.insert(v, ts, GossipRoute::LocalMessage).is_ok())); let mut timeouts = HashMap::new(); timeouts.insert(Pubkey::default(), CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS); bencher.iter(|| { diff --git a/gossip/benches/crds_gossip_pull.rs b/gossip/benches/crds_gossip_pull.rs index e82e9b5733..50a4897efd 100644 --- a/gossip/benches/crds_gossip_pull.rs +++ b/gossip/benches/crds_gossip_pull.rs @@ -7,7 +7,7 @@ use { rayon::ThreadPoolBuilder, solana_gossip::{ cluster_info::MAX_BLOOM_SIZE, - crds::Crds, + crds::{Crds, GossipRoute}, crds_gossip_pull::{CrdsFilter, CrdsGossipPull}, crds_value::CrdsValue, }, @@ -38,7 +38,11 @@ fn bench_build_crds_filters(bencher: &mut Bencher) { let mut num_inserts = 0; for _ in 0..90_000 { if crds - .insert(CrdsValue::new_rand(&mut rng, None), rng.gen()) + .insert( + CrdsValue::new_rand(&mut rng, None), + rng.gen(), + GossipRoute::LocalMessage, + ) .is_ok() { num_inserts += 1; diff --git a/gossip/benches/crds_shards.rs b/gossip/benches/crds_shards.rs index 04493322bd..02db41d2a1 100644 --- a/gossip/benches/crds_shards.rs +++ b/gossip/benches/crds_shards.rs @@ -5,7 +5,7 @@ extern crate test; use { rand::{thread_rng, Rng}, solana_gossip::{ - crds::{Crds, VersionedCrdsValue}, + crds::{Crds, GossipRoute, VersionedCrdsValue}, crds_shards::CrdsShards, crds_value::CrdsValue, }, @@ -20,7 +20,8 @@ fn new_test_crds_value(rng: &mut R) -> VersionedCrdsValue { let value = CrdsValue::new_rand(rng, None); let label = value.label(); let mut crds = Crds::default(); - crds.insert(value, timestamp()).unwrap(); + crds.insert(value, timestamp(), GossipRoute::LocalMessage) + .unwrap(); crds.get(&label).cloned().unwrap() } diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index a5f92b1c9a..b6d030cc4a 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -18,7 +18,7 @@ use { submit_gossip_stats, Counter, GossipStats, ScopedTimer, TimedGuard, }, contact_info::ContactInfo, - crds::{Crds, Cursor}, + crds::{Crds, Cursor, GossipRoute}, crds_gossip::CrdsGossip, crds_gossip_error::CrdsGossipError, crds_gossip_pull::{CrdsFilter, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}, @@ -492,7 +492,12 @@ impl ClusterInfo { // TODO kill insert_info, only used by tests pub fn insert_info(&self, contact_info: ContactInfo) { let value = CrdsValue::new_signed(CrdsData::ContactInfo(contact_info), &self.keypair); - let _ = self.gossip.write().unwrap().crds.insert(value, timestamp()); + let _ = + self.gossip + .write() + .unwrap() + .crds + .insert(value, timestamp(), GossipRoute::LocalMessage); } pub fn set_entrypoint(&self, entrypoint: ContactInfo) { @@ -608,7 +613,7 @@ impl ClusterInfo { let now = timestamp(); let mut gossip = self.gossip.write().unwrap(); for node in nodes { - if let Err(err) = gossip.crds.insert(node, now) { + if let Err(err) = gossip.crds.insert(node, now, GossipRoute::LocalMessage) { warn!("crds insert failed {:?}", err); } } @@ -896,7 +901,7 @@ impl ClusterInfo { let mut gossip = self.gossip.write().unwrap(); let now = timestamp(); for entry in entries { - if let Err(err) = gossip.crds.insert(entry, now) { + if let Err(err) = gossip.crds.insert(entry, now, GossipRoute::LocalMessage) { error!("push_epoch_slots failed: {:?}", err); } } @@ -959,7 +964,7 @@ impl ClusterInfo { let vote = CrdsData::Vote(vote_index, vote); let vote = CrdsValue::new_signed(vote, &self.keypair); let mut gossip = self.gossip.write().unwrap(); - if let Err(err) = gossip.crds.insert(vote, now) { + if let Err(err) = gossip.crds.insert(vote, now, GossipRoute::LocalMessage) { error!("push_vote failed: {:?}", err); } } @@ -1307,7 +1312,12 @@ impl ClusterInfo { fn insert_self(&self) { let value = CrdsValue::new_signed(CrdsData::ContactInfo(self.my_contact_info()), &self.keypair); - let _ = self.gossip.write().unwrap().crds.insert(value, timestamp()); + let _ = + self.gossip + .write() + .unwrap() + .crds + .insert(value, timestamp(), GossipRoute::LocalMessage); } // If the network entrypoint hasn't been discovered yet, add it to the crds table @@ -1468,7 +1478,7 @@ impl ClusterInfo { let mut gossip = self.gossip.write().unwrap(); let now = timestamp(); for entry in pending_push_messages { - let _ = gossip.crds.insert(entry, now); + let _ = gossip.crds.insert(entry, now, GossipRoute::LocalMessage); } } fn new_push_requests( @@ -3751,7 +3761,10 @@ mod tests { { let mut gossip = cluster_info.gossip.write().unwrap(); for entry in entries { - assert!(gossip.crds.insert(entry, /*now=*/ 0).is_ok()); + assert!(gossip + .crds + .insert(entry, /*now=*/ 0, GossipRoute::LocalMessage) + .is_ok()); } } // Should exclude other node's epoch-slot because of different @@ -4050,12 +4063,11 @@ mod tests { 0, LowestSlot::new(other_node_pubkey, peer_lowest, timestamp()), )); - let _ = cluster_info - .gossip - .write() - .unwrap() - .crds - .insert(value, timestamp()); + let _ = cluster_info.gossip.write().unwrap().crds.insert( + value, + timestamp(), + GossipRoute::LocalMessage, + ); } // only half the visible peers should be eligible to serve this repair assert_eq!(cluster_info.repair_peers(5).len(), 5); diff --git a/gossip/src/cluster_info_metrics.rs b/gossip/src/cluster_info_metrics.rs index 054e48219d..b23ed49bba 100644 --- a/gossip/src/cluster_info_metrics.rs +++ b/gossip/src/cluster_info_metrics.rs @@ -1,7 +1,8 @@ use { crate::crds_gossip::CrdsGossip, + itertools::Itertools, solana_measure::measure::Measure, - solana_sdk::pubkey::Pubkey, + solana_sdk::{clock::Slot, pubkey::Pubkey}, std::{ collections::HashMap, ops::{Deref, DerefMut}, @@ -163,9 +164,10 @@ pub(crate) fn submit_gossip_stats( gossip: &RwLock, stakes: &HashMap, ) { - let (table_size, num_nodes, num_pubkeys, purged_values_size, failed_inserts_size) = { + let (crds_stats, table_size, num_nodes, num_pubkeys, purged_values_size, failed_inserts_size) = { let gossip = gossip.read().unwrap(); ( + gossip.crds.take_stats(), gossip.crds.len(), gossip.crds.num_nodes(), gossip.crds.num_pubkeys(), @@ -449,4 +451,155 @@ pub(crate) fn submit_gossip_stats( i64 ), ); + let counts: Vec<_> = crds_stats + .pull + .counts + .iter() + .zip(crds_stats.push.counts.iter()) + .map(|(a, b)| a + b) + .collect(); + datapoint_info!( + "cluster_info_crds_stats", + ("ContactInfo", counts[0], i64), + ("ContactInfo-push", crds_stats.push.counts[0], i64), + ("ContactInfo-pull", crds_stats.pull.counts[0], i64), + ("Vote", counts[1], i64), + ("Vote-push", crds_stats.push.counts[1], i64), + ("Vote-pull", crds_stats.pull.counts[1], i64), + ("LowestSlot", counts[2], i64), + ("LowestSlot-push", crds_stats.push.counts[2], i64), + ("LowestSlot-pull", crds_stats.pull.counts[2], i64), + ("SnapshotHashes", counts[3], i64), + ("SnapshotHashes-push", crds_stats.push.counts[3], i64), + ("SnapshotHashes-pull", crds_stats.pull.counts[3], i64), + ("AccountsHashes", counts[4], i64), + ("AccountsHashes-push", crds_stats.push.counts[4], i64), + ("AccountsHashes-pull", crds_stats.pull.counts[4], i64), + ("EpochSlots", counts[5], i64), + ("EpochSlots-push", crds_stats.push.counts[5], i64), + ("EpochSlots-pull", crds_stats.pull.counts[5], i64), + ("LegacyVersion", counts[6], i64), + ("LegacyVersion-push", crds_stats.push.counts[6], i64), + ("LegacyVersion-pull", crds_stats.pull.counts[6], i64), + ("Version", counts[7], i64), + ("Version-push", crds_stats.push.counts[7], i64), + ("Version-pull", crds_stats.pull.counts[7], i64), + ("NodeInstance", counts[8], i64), + ("NodeInstance-push", crds_stats.push.counts[8], i64), + ("NodeInstance-pull", crds_stats.pull.counts[8], i64), + ("DuplicateShred", counts[9], i64), + ("DuplicateShred-push", crds_stats.push.counts[9], i64), + ("DuplicateShred-pull", crds_stats.pull.counts[9], i64), + ("IncrementalSnapshotHashes", counts[10], i64), + ( + "IncrementalSnapshotHashes-push", + crds_stats.push.counts[10], + i64 + ), + ( + "IncrementalSnapshotHashes-pull", + crds_stats.pull.counts[10], + i64 + ), + ("all", counts.iter().sum::(), i64), + ( + "all-push", + crds_stats.push.counts.iter().sum::(), + i64 + ), + ( + "all-pull", + crds_stats.pull.counts.iter().sum::(), + i64 + ), + ); + let fails: Vec<_> = crds_stats + .pull + .fails + .iter() + .zip(crds_stats.push.fails.iter()) + .map(|(a, b)| a + b) + .collect(); + datapoint_info!( + "cluster_info_crds_stats_fails", + ("ContactInfo", fails[0], i64), + ("ContactInfo-push", crds_stats.push.fails[0], i64), + ("ContactInfo-pull", crds_stats.pull.fails[0], i64), + ("Vote", fails[1], i64), + ("Vote-push", crds_stats.push.fails[1], i64), + ("Vote-pull", crds_stats.pull.fails[1], i64), + ("LowestSlot", fails[2], i64), + ("LowestSlot-push", crds_stats.push.fails[2], i64), + ("LowestSlot-pull", crds_stats.pull.fails[2], i64), + ("SnapshotHashes", fails[3], i64), + ("SnapshotHashes-push", crds_stats.push.fails[3], i64), + ("SnapshotHashes-pull", crds_stats.pull.fails[3], i64), + ("AccountsHashes", fails[4], i64), + ("AccountsHashes-push", crds_stats.push.fails[4], i64), + ("AccountsHashes-pull", crds_stats.pull.fails[4], i64), + ("EpochSlots", fails[5], i64), + ("EpochSlots-push", crds_stats.push.fails[5], i64), + ("EpochSlots-pull", crds_stats.pull.fails[5], i64), + ("LegacyVersion", fails[6], i64), + ("LegacyVersion-push", crds_stats.push.fails[6], i64), + ("LegacyVersion-pull", crds_stats.pull.fails[6], i64), + ("Version", fails[7], i64), + ("Version-push", crds_stats.push.fails[7], i64), + ("Version-pull", crds_stats.pull.fails[7], i64), + ("NodeInstance", fails[8], i64), + ("NodeInstance-push", crds_stats.push.fails[8], i64), + ("NodeInstance-pull", crds_stats.pull.fails[8], i64), + ("DuplicateShred", fails[9], i64), + ("DuplicateShred-push", crds_stats.push.fails[9], i64), + ("DuplicateShred-pull", crds_stats.pull.fails[9], i64), + ("IncrementalSnapshotHashes", fails[10], i64), + ( + "IncrementalSnapshotHashes-push", + crds_stats.push.fails[10], + i64 + ), + ( + "IncrementalSnapshotHashes-pull", + crds_stats.pull.fails[10], + i64 + ), + ("all", fails.iter().sum::(), i64), + ("all-push", crds_stats.push.fails.iter().sum::(), i64), + ("all-pull", crds_stats.pull.fails.iter().sum::(), i64), + ); + for (slot, num_votes) in &crds_stats.pull.votes { + datapoint_info!( + "cluster_info_crds_stats_votes_pull", + ("slot", *slot, i64), + ("num_votes", *num_votes, i64), + ); + } + for (slot, num_votes) in &crds_stats.push.votes { + datapoint_info!( + "cluster_info_crds_stats_votes_push", + ("slot", *slot, i64), + ("num_votes", *num_votes, i64), + ); + } + let votes: HashMap = crds_stats + .pull + .votes + .into_iter() + .map(|(slot, num_votes)| (*slot, *num_votes)) + .chain( + crds_stats + .push + .votes + .into_iter() + .map(|(slot, num_votes)| (*slot, *num_votes)), + ) + .into_grouping_map() + .aggregate(|acc, _slot, num_votes| Some(acc.unwrap_or_default() + num_votes)); + for (slot, num_votes) in votes { + datapoint_info!( + "cluster_info_crds_stats_votes", + ("slot", slot, i64), + ("num_votes", num_votes, i64), + ); + } } diff --git a/gossip/src/crds.rs b/gossip/src/crds.rs index 59bf942ded..0b22561559 100644 --- a/gossip/src/crds.rs +++ b/gossip/src/crds.rs @@ -35,9 +35,11 @@ use { map::{rayon::ParValues, Entry, IndexMap}, set::IndexSet, }, + lru::LruCache, matches::debug_assert_matches, rayon::{prelude::*, ThreadPool}, solana_sdk::{ + clock::Slot, hash::{hash, Hash}, pubkey::Pubkey, }, @@ -45,12 +47,14 @@ use { cmp::Ordering, collections::{hash_map, BTreeMap, HashMap, VecDeque}, ops::{Bound, Index, IndexMut}, + sync::Mutex, }, }; const CRDS_SHARDS_BITS: u32 = 8; +// Number of vote slots to track in an lru-cache for metrics. +const VOTE_SLOTS_METRICS_CAP: usize = 100; -#[derive(Clone)] pub struct Crds { /// Stores the map of labels and values table: IndexMap, @@ -69,6 +73,7 @@ pub struct Crds { purged: VecDeque<(Hash, u64 /*timestamp*/)>, // Mapping from nodes' pubkeys to their respective shred-version. shred_versions: HashMap, + stats: Mutex, } #[derive(PartialEq, Debug)] @@ -77,6 +82,28 @@ pub enum CrdsError { UnknownStakes, } +#[derive(Clone, Copy)] +pub enum GossipRoute { + LocalMessage, + PullRequest, + PullResponse, + PushMessage, +} + +type CrdsCountsArray = [usize; 11]; + +pub(crate) struct CrdsDataStats { + pub(crate) counts: CrdsCountsArray, + pub(crate) fails: CrdsCountsArray, + pub(crate) votes: LruCache, +} + +#[derive(Default)] +pub(crate) struct CrdsStats { + pub(crate) pull: CrdsDataStats, + pub(crate) push: CrdsDataStats, +} + /// This structure stores some local metadata associated with the CrdsValue #[derive(PartialEq, Debug, Clone)] pub struct VersionedCrdsValue { @@ -129,6 +156,7 @@ impl Default for Crds { entries: BTreeMap::default(), purged: VecDeque::default(), shred_versions: HashMap::default(), + stats: Mutex::::default(), } } } @@ -169,12 +197,18 @@ impl Crds { } } - pub fn insert(&mut self, value: CrdsValue, now: u64) -> Result<(), CrdsError> { + pub fn insert( + &mut self, + value: CrdsValue, + now: u64, + route: GossipRoute, + ) -> Result<(), CrdsError> { let label = value.label(); let pubkey = value.pubkey(); let value = VersionedCrdsValue::new(value, self.cursor, now); match self.table.entry(label) { Entry::Vacant(entry) => { + self.stats.lock().unwrap().record_insert(&value, route); let entry_index = entry.index(); self.shards.insert(entry_index, &value); match &value.value.data { @@ -197,6 +231,7 @@ impl Crds { Ok(()) } Entry::Occupied(mut entry) if overrides(&value.value, entry.get()) => { + self.stats.lock().unwrap().record_insert(&value, route); let entry_index = entry.index(); self.shards.remove(entry_index, entry.get()); self.shards.insert(entry_index, &value); @@ -228,6 +263,7 @@ impl Crds { Ok(()) } Entry::Occupied(entry) => { + self.stats.lock().unwrap().record_fail(&value, route); trace!( "INSERT FAILED data: {} new.wallclock: {}", value.value.label(), @@ -562,6 +598,88 @@ impl Crds { } Ok(keys.len()) } + + pub(crate) fn take_stats(&self) -> CrdsStats { + std::mem::take(&mut self.stats.lock().unwrap()) + } + + // Only for tests and simulations. + pub(crate) fn mock_clone(&self) -> Self { + Self { + table: self.table.clone(), + cursor: self.cursor, + shards: self.shards.clone(), + nodes: self.nodes.clone(), + votes: self.votes.clone(), + epoch_slots: self.epoch_slots.clone(), + records: self.records.clone(), + entries: self.entries.clone(), + purged: self.purged.clone(), + shred_versions: self.shred_versions.clone(), + stats: Mutex::::default(), + } + } +} + +impl Default for CrdsDataStats { + fn default() -> Self { + Self { + counts: CrdsCountsArray::default(), + fails: CrdsCountsArray::default(), + votes: LruCache::new(VOTE_SLOTS_METRICS_CAP), + } + } +} + +impl CrdsDataStats { + fn record_insert(&mut self, entry: &VersionedCrdsValue) { + self.counts[Self::ordinal(entry)] += 1; + if let CrdsData::Vote(_, vote) = &entry.value.data { + if let Some(slot) = vote.slot() { + let num_nodes = self.votes.get(&slot).copied().unwrap_or_default(); + self.votes.put(slot, num_nodes + 1); + } + } + } + + fn record_fail(&mut self, entry: &VersionedCrdsValue) { + self.fails[Self::ordinal(entry)] += 1; + } + + fn ordinal(entry: &VersionedCrdsValue) -> usize { + match &entry.value.data { + CrdsData::ContactInfo(_) => 0, + CrdsData::Vote(_, _) => 1, + CrdsData::LowestSlot(_, _) => 2, + CrdsData::SnapshotHashes(_) => 3, + CrdsData::AccountsHashes(_) => 4, + CrdsData::EpochSlots(_, _) => 5, + CrdsData::LegacyVersion(_) => 6, + CrdsData::Version(_) => 7, + CrdsData::NodeInstance(_) => 8, + CrdsData::DuplicateShred(_, _) => 9, + } + } +} + +impl CrdsStats { + fn record_insert(&mut self, entry: &VersionedCrdsValue, route: GossipRoute) { + match route { + GossipRoute::LocalMessage => (), + GossipRoute::PullRequest => (), + GossipRoute::PushMessage => self.push.record_insert(entry), + GossipRoute::PullResponse => self.pull.record_insert(entry), + } + } + + fn record_fail(&mut self, entry: &VersionedCrdsValue, route: GossipRoute) { + match route { + GossipRoute::LocalMessage => (), + GossipRoute::PullRequest => (), + GossipRoute::PushMessage => self.push.record_fail(entry), + GossipRoute::PullResponse => self.pull.record_fail(entry), + } + } } #[cfg(test)] @@ -586,7 +704,10 @@ mod tests { fn test_insert() { let mut crds = Crds::default(); let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); - assert_eq!(crds.insert(val.clone(), 0), Ok(())); + assert_eq!( + crds.insert(val.clone(), 0, GossipRoute::LocalMessage), + Ok(()) + ); assert_eq!(crds.table.len(), 1); assert!(crds.table.contains_key(&val.label())); assert_eq!(crds.table[&val.label()].local_timestamp, 0); @@ -595,8 +716,14 @@ mod tests { fn test_update_old() { let mut crds = Crds::default(); let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); - assert_eq!(crds.insert(val.clone(), 0), Ok(())); - assert_eq!(crds.insert(val.clone(), 1), Err(CrdsError::InsertFailed)); + assert_eq!( + crds.insert(val.clone(), 0, GossipRoute::LocalMessage), + Ok(()) + ); + assert_eq!( + crds.insert(val.clone(), 1, GossipRoute::LocalMessage), + Err(CrdsError::InsertFailed) + ); assert!(crds.purged.is_empty()); assert_eq!(crds.table[&val.label()].local_timestamp, 0); } @@ -608,12 +735,15 @@ mod tests { 0, ))); let value_hash = hash(&serialize(&original).unwrap()); - assert_matches!(crds.insert(original, 0), Ok(())); + assert_matches!(crds.insert(original, 0, GossipRoute::LocalMessage), Ok(())); let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &Pubkey::default(), 1, ))); - assert_eq!(crds.insert(val.clone(), 1), Ok(())); + assert_eq!( + crds.insert(val.clone(), 1, GossipRoute::LocalMessage), + Ok(()) + ); assert_eq!(*crds.purged.back().unwrap(), (value_hash, 1)); assert_eq!(crds.table[&val.label()].local_timestamp, 1); } @@ -624,13 +754,19 @@ mod tests { &Pubkey::default(), 0, ))); - assert_eq!(crds.insert(val.clone(), 0), Ok(())); + assert_eq!( + crds.insert(val.clone(), 0, GossipRoute::LocalMessage), + Ok(()) + ); assert_eq!(crds.table[&val.label()].ordinal, 0); let val2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); let value_hash = hash(&serialize(&val2).unwrap()); assert_eq!(val2.label().pubkey(), val.label().pubkey()); - assert_eq!(crds.insert(val2.clone(), 0), Ok(())); + assert_eq!( + crds.insert(val2.clone(), 0, GossipRoute::LocalMessage), + Ok(()) + ); crds.update_record_timestamp(&val.label().pubkey(), 2); assert_eq!(crds.table[&val.label()].local_timestamp, 2); @@ -645,7 +781,7 @@ mod tests { let mut ci = ContactInfo::default(); ci.wallclock += 1; let val3 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci)); - assert_eq!(crds.insert(val3, 3), Ok(())); + assert_eq!(crds.insert(val3, 3, GossipRoute::LocalMessage), Ok(())); assert_eq!(*crds.purged.back().unwrap(), (value_hash, 3)); assert_eq!(crds.table[&val2.label()].local_timestamp, 3); assert_eq!(crds.table[&val2.label()].ordinal, 2); @@ -663,19 +799,22 @@ mod tests { let pubkey = Pubkey::new_unique(); let node = NodeInstance::new(&mut rng, pubkey, now); let node = make_crds_value(node); - assert_eq!(crds.insert(node, now), Ok(())); + assert_eq!(crds.insert(node, now, GossipRoute::LocalMessage), Ok(())); // A node-instance with a different key should insert fine even with // older timestamps. let other = NodeInstance::new(&mut rng, Pubkey::new_unique(), now - 1); let other = make_crds_value(other); - assert_eq!(crds.insert(other, now), Ok(())); + assert_eq!(crds.insert(other, now, GossipRoute::LocalMessage), Ok(())); // A node-instance with older timestamp should fail to insert, even if // the wallclock is more recent. let other = NodeInstance::new(&mut rng, pubkey, now - 1); let other = other.with_wallclock(now + 1); let other = make_crds_value(other); let value_hash = hash(&serialize(&other).unwrap()); - assert_eq!(crds.insert(other, now), Err(CrdsError::InsertFailed)); + assert_eq!( + crds.insert(other, now, GossipRoute::LocalMessage), + Err(CrdsError::InsertFailed) + ); assert_eq!(*crds.purged.back().unwrap(), (value_hash, now)); // A node instance with the same timestamp should insert only if the // random token is larger. @@ -684,7 +823,7 @@ mod tests { let other = NodeInstance::new(&mut rng, pubkey, now); let other = make_crds_value(other); let value_hash = hash(&serialize(&other).unwrap()); - match crds.insert(other, now) { + match crds.insert(other, now, GossipRoute::LocalMessage) { Ok(()) => num_overrides += 1, Err(CrdsError::InsertFailed) => { assert_eq!(*crds.purged.back().unwrap(), (value_hash, now)) @@ -699,7 +838,7 @@ mod tests { let other = NodeInstance::new(&mut rng, pubkey, now + k); let other = other.with_wallclock(now - 1); let other = make_crds_value(other); - match crds.insert(other, now) { + match crds.insert(other, now, GossipRoute::LocalMessage) { Ok(()) => (), _ => panic!(), } @@ -711,7 +850,10 @@ mod tests { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); let mut crds = Crds::default(); let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); - assert_eq!(crds.insert(val.clone(), 1), Ok(())); + assert_eq!( + crds.insert(val.clone(), 1, GossipRoute::LocalMessage), + Ok(()) + ); let mut set = HashMap::new(); set.insert(Pubkey::default(), 0); assert!(crds.find_old_labels(&thread_pool, 0, &set).is_empty()); @@ -734,7 +876,10 @@ mod tests { let mut timeouts = HashMap::new(); let val = CrdsValue::new_rand(&mut rng, None); timeouts.insert(Pubkey::default(), 3); - assert_eq!(crds.insert(val.clone(), 0), Ok(())); + assert_eq!( + crds.insert(val.clone(), 0, GossipRoute::LocalMessage), + Ok(()) + ); assert!(crds.find_old_labels(&thread_pool, 2, &timeouts).is_empty()); timeouts.insert(val.pubkey(), 1); assert_eq!( @@ -757,7 +902,10 @@ mod tests { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); let mut crds = Crds::default(); let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); - assert_matches!(crds.insert(val.clone(), 1), Ok(_)); + assert_matches!( + crds.insert(val.clone(), 1, GossipRoute::LocalMessage), + Ok(_) + ); let mut set = HashMap::new(); set.insert(Pubkey::default(), 1); assert_eq!( @@ -772,7 +920,10 @@ mod tests { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); let mut crds = Crds::default(); let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); - assert_eq!(crds.insert(val.clone(), 1), Ok(())); + assert_eq!( + crds.insert(val.clone(), 1, GossipRoute::LocalMessage), + Ok(()) + ); let mut set = HashMap::new(); //now < timestamp set.insert(Pubkey::default(), 0); @@ -814,7 +965,7 @@ mod tests { let keypair = &keypairs[rng.gen_range(0, keypairs.len())]; let value = CrdsValue::new_rand(&mut rng, Some(keypair)); let local_timestamp = new_rand_timestamp(&mut rng); - if let Ok(()) = crds.insert(value, local_timestamp) { + if let Ok(()) = crds.insert(value, local_timestamp, GossipRoute::LocalMessage) { num_inserts += 1; check_crds_shards(&crds); } @@ -968,7 +1119,7 @@ mod tests { let keypair = &keypairs[rng.gen_range(0, keypairs.len())]; let value = CrdsValue::new_rand(&mut rng, Some(keypair)); let local_timestamp = new_rand_timestamp(&mut rng); - if let Ok(()) = crds.insert(value, local_timestamp) { + if let Ok(()) = crds.insert(value, local_timestamp, GossipRoute::LocalMessage) { num_inserts += 1; } if k % 16 == 0 { @@ -1022,7 +1173,7 @@ mod tests { let keypair = &keypairs[rng.gen_range(0, keypairs.len())]; let value = CrdsValue::new_rand(&mut rng, Some(keypair)); let local_timestamp = new_rand_timestamp(&mut rng); - let _ = crds.insert(value, local_timestamp); + let _ = crds.insert(value, local_timestamp, GossipRoute::LocalMessage); if k % 64 == 0 { check_crds_records(&crds); } @@ -1053,7 +1204,10 @@ mod tests { node.shred_version = 42; let node = CrdsData::ContactInfo(node); let node = CrdsValue::new_unsigned(node); - assert_eq!(crds.insert(node, timestamp()), Ok(())); + assert_eq!( + crds.insert(node, timestamp(), GossipRoute::LocalMessage), + Ok(()) + ); assert_eq!(crds.get_shred_version(&pubkey), Some(42)); // An outdated value should not update shred-version: let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey)); @@ -1061,7 +1215,10 @@ mod tests { node.shred_version = 8; let node = CrdsData::ContactInfo(node); let node = CrdsValue::new_unsigned(node); - assert_eq!(crds.insert(node, timestamp()), Err(CrdsError::InsertFailed)); + assert_eq!( + crds.insert(node, timestamp(), GossipRoute::LocalMessage), + Err(CrdsError::InsertFailed) + ); assert_eq!(crds.get_shred_version(&pubkey), Some(42)); // Update shred version: let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey)); @@ -1069,13 +1226,19 @@ mod tests { node.shred_version = 8; let node = CrdsData::ContactInfo(node); let node = CrdsValue::new_unsigned(node); - assert_eq!(crds.insert(node, timestamp()), Ok(())); + assert_eq!( + crds.insert(node, timestamp(), GossipRoute::LocalMessage), + Ok(()) + ); assert_eq!(crds.get_shred_version(&pubkey), Some(8)); // Add other crds values with the same pubkey. let val = SnapshotHash::new_rand(&mut rng, Some(pubkey)); let val = CrdsData::SnapshotHashes(val); let val = CrdsValue::new_unsigned(val); - assert_eq!(crds.insert(val, timestamp()), Ok(())); + assert_eq!( + crds.insert(val, timestamp(), GossipRoute::LocalMessage), + Ok(()) + ); assert_eq!(crds.get_shred_version(&pubkey), Some(8)); // Remove contact-info. Shred version should stay there since there // are still values associated with the pubkey. @@ -1112,7 +1275,7 @@ mod tests { let keypair = &keypairs[rng.gen_range(0, keypairs.len())]; let value = CrdsValue::new_rand(&mut rng, Some(keypair)); let local_timestamp = new_rand_timestamp(&mut rng); - let _ = crds.insert(value, local_timestamp); + let _ = crds.insert(value, local_timestamp, GossipRoute::LocalMessage); } let num_values = crds.table.len(); let num_pubkeys = num_unique_pubkeys(crds.table.values()); @@ -1153,7 +1316,10 @@ mod tests { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); let mut crds = Crds::default(); let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); - assert_matches!(crds.insert(val.clone(), 1), Ok(_)); + assert_matches!( + crds.insert(val.clone(), 1, GossipRoute::LocalMessage), + Ok(_) + ); let mut set = HashMap::new(); //default has max timeout, but pubkey should still expire diff --git a/gossip/src/crds_gossip.rs b/gossip/src/crds_gossip.rs index f45125e644..6a41a94b47 100644 --- a/gossip/src/crds_gossip.rs +++ b/gossip/src/crds_gossip.rs @@ -7,7 +7,7 @@ use { crate::{ cluster_info::Ping, contact_info::ContactInfo, - crds::Crds, + crds::{Crds, GossipRoute}, crds_gossip_error::CrdsGossipError, crds_gossip_pull::{CrdsFilter, CrdsGossipPull, ProcessPullStats}, crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE}, @@ -88,7 +88,7 @@ impl CrdsGossip { now: u64, ) -> HashMap> { for entry in pending_push_messages { - let _ = self.crds.insert(entry, now); + let _ = self.crds.insert(entry, now, GossipRoute::LocalMessage); } self.push.new_push_messages(&self.crds, now) } @@ -150,7 +150,7 @@ impl CrdsGossip { }); let now = timestamp(); for entry in entries { - if let Err(err) = self.crds.insert(entry, now) { + if let Err(err) = self.crds.insert(entry, now, GossipRoute::LocalMessage) { error!("push_duplicate_shred faild: {:?}", err); } } @@ -334,7 +334,7 @@ impl CrdsGossip { // Only for tests and simulations. pub(crate) fn mock_clone(&self) -> Self { Self { - crds: self.crds.clone(), + crds: self.crds.mock_clone(), push: self.push.mock_clone(), pull: self.pull.mock_clone(), } @@ -377,6 +377,7 @@ mod test { .insert( CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone())), 0, + GossipRoute::LocalMessage, ) .unwrap(); crds_gossip.refresh_push_active_set( diff --git a/gossip/src/crds_gossip_pull.rs b/gossip/src/crds_gossip_pull.rs index 4c67c8bbbf..8630337d61 100644 --- a/gossip/src/crds_gossip_pull.rs +++ b/gossip/src/crds_gossip_pull.rs @@ -13,7 +13,7 @@ use { crate::{ cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY}, contact_info::ContactInfo, - crds::{Crds, VersionedCrdsValue}, + crds::{Crds, GossipRoute, VersionedCrdsValue}, crds_gossip::{get_stake, get_weight}, crds_gossip_error::CrdsGossipError, crds_value::CrdsValue, @@ -333,7 +333,7 @@ impl CrdsGossipPull { { for caller in callers { let key = caller.pubkey(); - let _ = crds.insert(caller, now); + let _ = crds.insert(caller, now, GossipRoute::PullRequest); crds.update_record_timestamp(&key, now); } } @@ -413,11 +413,11 @@ impl CrdsGossipPull { ) { let mut owners = HashSet::new(); for response in responses_expired_timeout { - let _ = crds.insert(response, now); + let _ = crds.insert(response, now, GossipRoute::PullResponse); } for response in responses { let owner = response.pubkey(); - if let Ok(()) = crds.insert(response, now) { + if let Ok(()) = crds.insert(response, now, GossipRoute::PullResponse) { stats.success += 1; self.num_pulls += 1; owners.insert(owner); @@ -688,14 +688,16 @@ pub(crate) mod tests { &solana_sdk::pubkey::new_rand(), 0, ))); - crds.insert(me.clone(), 0).unwrap(); + crds.insert(me.clone(), 0, GossipRoute::LocalMessage) + .unwrap(); for i in 1..=30 { let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &solana_sdk::pubkey::new_rand(), 0, ))); let id = entry.label().pubkey(); - crds.insert(entry.clone(), 0).unwrap(); + crds.insert(entry.clone(), 0, GossipRoute::LocalMessage) + .unwrap(); stakes.insert(id, i * 100); } let now = 1024; @@ -750,10 +752,14 @@ pub(crate) mod tests { ..ContactInfo::default() })); - crds.insert(me.clone(), 0).unwrap(); - crds.insert(spy.clone(), 0).unwrap(); - crds.insert(node_123.clone(), 0).unwrap(); - crds.insert(node_456.clone(), 0).unwrap(); + crds.insert(me.clone(), 0, GossipRoute::LocalMessage) + .unwrap(); + crds.insert(spy.clone(), 0, GossipRoute::LocalMessage) + .unwrap(); + crds.insert(node_123.clone(), 0, GossipRoute::LocalMessage) + .unwrap(); + crds.insert(node_456.clone(), 0, GossipRoute::LocalMessage) + .unwrap(); // shred version 123 should ignore nodes with versions 0 and 456 let options = node @@ -811,8 +817,10 @@ pub(crate) mod tests { ..ContactInfo::default() })); - crds.insert(me.clone(), 0).unwrap(); - crds.insert(node_123.clone(), 0).unwrap(); + crds.insert(me.clone(), 0, GossipRoute::LocalMessage) + .unwrap(); + crds.insert(node_123.clone(), 0, GossipRoute::LocalMessage) + .unwrap(); // Empty gossip_validators -- will pull from nobody let mut gossip_validators = HashSet::new(); @@ -914,7 +922,10 @@ pub(crate) mod tests { for _ in 0..40_000 { let keypair = keypairs.choose(&mut rng).unwrap(); let value = CrdsValue::new_rand(&mut rng, Some(keypair)); - if crds.insert(value, rng.gen()).is_ok() { + if crds + .insert(value, rng.gen(), GossipRoute::LocalMessage) + .is_ok() + { num_inserts += 1; } } @@ -973,7 +984,7 @@ pub(crate) mod tests { Err(CrdsGossipError::NoPeers) ); - crds.insert(entry, 0).unwrap(); + crds.insert(entry, 0, GossipRoute::LocalMessage).unwrap(); assert_eq!( node.new_pull_request( &thread_pool, @@ -997,7 +1008,8 @@ pub(crate) mod tests { .unwrap() .mock_pong(new.id, new.gossip, Instant::now()); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); - crds.insert(new.clone(), now).unwrap(); + crds.insert(new.clone(), now, GossipRoute::LocalMessage) + .unwrap(); let req = node.new_pull_request( &thread_pool, &crds, @@ -1017,7 +1029,8 @@ pub(crate) mod tests { node.mark_pull_request_creation_time(new.contact_info().unwrap().id, now); let offline = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), now); let offline = CrdsValue::new_unsigned(CrdsData::ContactInfo(offline)); - crds.insert(offline, now).unwrap(); + crds.insert(offline, now, GossipRoute::LocalMessage) + .unwrap(); let req = node.new_pull_request( &thread_pool, &crds, @@ -1052,15 +1065,17 @@ pub(crate) mod tests { 0, ))); let mut node = CrdsGossipPull::default(); - crds.insert(entry, now).unwrap(); + crds.insert(entry, now, GossipRoute::LocalMessage).unwrap(); let old = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); ping_cache.mock_pong(old.id, old.gossip, Instant::now()); let old = CrdsValue::new_unsigned(CrdsData::ContactInfo(old)); - crds.insert(old.clone(), now).unwrap(); + crds.insert(old.clone(), now, GossipRoute::LocalMessage) + .unwrap(); let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); ping_cache.mock_pong(new.id, new.gossip, Instant::now()); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); - crds.insert(new.clone(), now).unwrap(); + crds.insert(new.clone(), now, GossipRoute::LocalMessage) + .unwrap(); // set request creation time to now. let now = now + 50_000; @@ -1145,11 +1160,13 @@ pub(crate) mod tests { ))); let caller = entry.clone(); let node = CrdsGossipPull::default(); - node_crds.insert(entry, 0).unwrap(); + node_crds + .insert(entry, 0, GossipRoute::LocalMessage) + .unwrap(); let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); ping_cache.mock_pong(new.id, new.gossip, Instant::now()); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); - node_crds.insert(new, 0).unwrap(); + node_crds.insert(new, 0, GossipRoute::LocalMessage).unwrap(); let mut pings = Vec::new(); let req = node.new_pull_request( &thread_pool, @@ -1183,7 +1200,11 @@ pub(crate) mod tests { CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, ))); dest_crds - .insert(new, CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS) + .insert( + new, + CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, + GossipRoute::LocalMessage, + ) .unwrap(); //should skip new value since caller is to old @@ -1232,7 +1253,9 @@ pub(crate) mod tests { ))); let caller = entry.clone(); let node = CrdsGossipPull::default(); - node_crds.insert(entry, 0).unwrap(); + node_crds + .insert(entry, 0, GossipRoute::LocalMessage) + .unwrap(); let mut ping_cache = PingCache::new( Duration::from_secs(20 * 60), // ttl 128, // capacity @@ -1240,7 +1263,7 @@ pub(crate) mod tests { let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); ping_cache.mock_pong(new.id, new.gossip, Instant::now()); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); - node_crds.insert(new, 0).unwrap(); + node_crds.insert(new, 0, GossipRoute::LocalMessage).unwrap(); let mut pings = Vec::new(); let req = node.new_pull_request( &thread_pool, @@ -1287,7 +1310,9 @@ pub(crate) mod tests { let caller = entry.clone(); let node_pubkey = entry.label().pubkey(); let mut node = CrdsGossipPull::default(); - node_crds.insert(entry, 0).unwrap(); + node_crds + .insert(entry, 0, GossipRoute::LocalMessage) + .unwrap(); let mut ping_cache = PingCache::new( Duration::from_secs(20 * 60), // ttl 128, // capacity @@ -1295,14 +1320,16 @@ pub(crate) mod tests { let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 1); ping_cache.mock_pong(new.id, new.gossip, Instant::now()); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); - node_crds.insert(new, 0).unwrap(); + node_crds.insert(new, 0, GossipRoute::LocalMessage).unwrap(); let mut dest_crds = Crds::default(); let new_id = solana_sdk::pubkey::new_rand(); let new = ContactInfo::new_localhost(&new_id, 1); ping_cache.mock_pong(new.id, new.gossip, Instant::now()); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); - dest_crds.insert(new.clone(), 0).unwrap(); + dest_crds + .insert(new.clone(), 0, GossipRoute::LocalMessage) + .unwrap(); // node contains a key from the dest node, but at an older local timestamp let same_key = ContactInfo::new_localhost(&new_id, 0); @@ -1310,7 +1337,9 @@ pub(crate) mod tests { let same_key = CrdsValue::new_unsigned(CrdsData::ContactInfo(same_key)); assert_eq!(same_key.label(), new.label()); assert!(same_key.wallclock() < new.wallclock()); - node_crds.insert(same_key.clone(), 0).unwrap(); + node_crds + .insert(same_key.clone(), 0, GossipRoute::LocalMessage) + .unwrap(); assert_eq!(node_crds.get(&same_key.label()).unwrap().local_timestamp, 0); let mut done = false; let mut pings = Vec::new(); @@ -1383,12 +1412,16 @@ pub(crate) mod tests { let node_label = entry.label(); let node_pubkey = node_label.pubkey(); let node = CrdsGossipPull::default(); - node_crds.insert(entry, 0).unwrap(); + node_crds + .insert(entry, 0, GossipRoute::LocalMessage) + .unwrap(); let old = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &solana_sdk::pubkey::new_rand(), 0, ))); - node_crds.insert(old.clone(), 0).unwrap(); + node_crds + .insert(old.clone(), 0, GossipRoute::LocalMessage) + .unwrap(); let value_hash = node_crds.get(&old.label()).unwrap().value_hash; //verify self is valid diff --git a/gossip/src/crds_gossip_push.rs b/gossip/src/crds_gossip_push.rs index 4639dca479..48feae09c0 100644 --- a/gossip/src/crds_gossip_push.rs +++ b/gossip/src/crds_gossip_push.rs @@ -12,7 +12,7 @@ use { crate::{ cluster_info::CRDS_UNIQUE_PUBKEY_CAPACITY, contact_info::ContactInfo, - crds::{Crds, Cursor}, + crds::{Crds, Cursor, GossipRoute}, crds_gossip::{get_stake, get_weight}, crds_gossip_error::CrdsGossipError, crds_value::CrdsValue, @@ -183,10 +183,11 @@ impl CrdsGossipPush { .entry(*from) .and_modify(|(_pruned, timestamp)| *timestamp = now) .or_insert((/*pruned:*/ false, now)); - crds.insert(value, now).map_err(|_| { - self.num_old += 1; - CrdsGossipError::PushMessageOldVersion - }) + crds.insert(value, now, GossipRoute::PushMessage) + .map_err(|_| { + self.num_old += 1; + CrdsGossipError::PushMessageOldVersion + }) } /// New push message to broadcast to peers. @@ -556,7 +557,10 @@ mod test { 0, ))); - assert_eq!(crds.insert(value1.clone(), now), Ok(())); + assert_eq!( + crds.insert(value1.clone(), now, GossipRoute::LocalMessage), + Ok(()) + ); push.refresh_push_active_set( &crds, &HashMap::new(), @@ -574,7 +578,10 @@ mod test { 0, ))); assert!(push.active_set.get(&value2.label().pubkey()).is_none()); - assert_eq!(crds.insert(value2.clone(), now), Ok(())); + assert_eq!( + crds.insert(value2.clone(), now, GossipRoute::LocalMessage), + Ok(()) + ); for _ in 0..30 { push.refresh_push_active_set( &crds, @@ -596,7 +603,10 @@ mod test { let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo( ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0), )); - assert_eq!(crds.insert(value2.clone(), now), Ok(())); + assert_eq!( + crds.insert(value2.clone(), now, GossipRoute::LocalMessage), + Ok(()) + ); } push.refresh_push_active_set( &crds, @@ -623,7 +633,8 @@ mod test { time, ))); let id = peer.label().pubkey(); - crds.insert(peer.clone(), time).unwrap(); + crds.insert(peer.clone(), time, GossipRoute::LocalMessage) + .unwrap(); stakes.insert(id, i * 100); push.last_pushed_to.put(id, time); } @@ -678,10 +689,14 @@ mod test { ..ContactInfo::default() })); - crds.insert(me.clone(), now).unwrap(); - crds.insert(spy.clone(), now).unwrap(); - crds.insert(node_123.clone(), now).unwrap(); - crds.insert(node_456, now).unwrap(); + crds.insert(me.clone(), now, GossipRoute::LocalMessage) + .unwrap(); + crds.insert(spy.clone(), now, GossipRoute::LocalMessage) + .unwrap(); + crds.insert(node_123.clone(), now, GossipRoute::LocalMessage) + .unwrap(); + crds.insert(node_456, now, GossipRoute::LocalMessage) + .unwrap(); // shred version 123 should ignore nodes with versions 0 and 456 let options = node @@ -735,8 +750,10 @@ mod test { ..ContactInfo::default() })); - crds.insert(me.clone(), 0).unwrap(); - crds.insert(node_123.clone(), now).unwrap(); + crds.insert(me.clone(), 0, GossipRoute::LocalMessage) + .unwrap(); + crds.insert(node_123.clone(), now, GossipRoute::LocalMessage) + .unwrap(); // Unknown pubkey in gossip_validators -- will push to nobody let mut gossip_validators = HashSet::new(); @@ -787,7 +804,10 @@ mod test { &solana_sdk::pubkey::new_rand(), 0, ))); - assert_eq!(crds.insert(peer.clone(), now), Ok(())); + assert_eq!( + crds.insert(peer.clone(), now, GossipRoute::LocalMessage), + Ok(()) + ); push.refresh_push_active_set( &crds, &HashMap::new(), @@ -826,8 +846,14 @@ mod test { CrdsValue::new_unsigned(CrdsData::ContactInfo(peer)) }) .collect(); - assert_eq!(crds.insert(peers[0].clone(), now), Ok(())); - assert_eq!(crds.insert(peers[1].clone(), now), Ok(())); + assert_eq!( + crds.insert(peers[0].clone(), now, GossipRoute::LocalMessage), + Ok(()) + ); + assert_eq!( + crds.insert(peers[1].clone(), now, GossipRoute::LocalMessage), + Ok(()) + ); assert_eq!( push.process_push_message(&mut crds, &Pubkey::default(), peers[2].clone(), now), Ok(()) @@ -862,7 +888,10 @@ mod test { &solana_sdk::pubkey::new_rand(), 0, ))); - assert_eq!(crds.insert(peer.clone(), 0), Ok(())); + assert_eq!( + crds.insert(peer.clone(), 0, GossipRoute::LocalMessage), + Ok(()) + ); push.refresh_push_active_set( &crds, &HashMap::new(), @@ -898,7 +927,7 @@ mod test { &solana_sdk::pubkey::new_rand(), 0, ))); - assert_eq!(crds.insert(peer, 0), Ok(())); + assert_eq!(crds.insert(peer, 0, GossipRoute::LocalMessage), Ok(())); push.refresh_push_active_set( &crds, &HashMap::new(), diff --git a/gossip/src/crds_shards.rs b/gossip/src/crds_shards.rs index 9412ac0a03..f962e8e2a7 100644 --- a/gossip/src/crds_shards.rs +++ b/gossip/src/crds_shards.rs @@ -134,7 +134,10 @@ where mod test { use { super::*, - crate::{crds::Crds, crds_value::CrdsValue}, + crate::{ + crds::{Crds, GossipRoute}, + crds_value::CrdsValue, + }, rand::{thread_rng, Rng}, solana_sdk::timing::timestamp, std::{collections::HashSet, iter::repeat_with, ops::Index}, @@ -144,7 +147,8 @@ mod test { let value = CrdsValue::new_rand(rng, None); let label = value.label(); let mut crds = Crds::default(); - crds.insert(value, timestamp()).unwrap(); + crds.insert(value, timestamp(), GossipRoute::LocalMessage) + .unwrap(); crds.get(&label).cloned().unwrap() } diff --git a/gossip/tests/crds_gossip.rs b/gossip/tests/crds_gossip.rs index 564b6a7e28..958f89832f 100644 --- a/gossip/tests/crds_gossip.rs +++ b/gossip/tests/crds_gossip.rs @@ -7,6 +7,7 @@ use { solana_gossip::{ cluster_info, contact_info::ContactInfo, + crds::GossipRoute, crds_gossip::*, crds_gossip_error::CrdsGossipError, crds_gossip_pull::{ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}, @@ -118,15 +119,21 @@ fn star_network_create(num: usize) -> Network { let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); let mut node = CrdsGossip::default(); - node.crds.insert(new.clone(), timestamp()).unwrap(); - node.crds.insert(entry.clone(), timestamp()).unwrap(); + node.crds + .insert(new.clone(), timestamp(), GossipRoute::LocalMessage) + .unwrap(); + node.crds + .insert(entry.clone(), timestamp(), GossipRoute::LocalMessage) + .unwrap(); let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node))); (new.label().pubkey(), node) }) .collect(); let mut node = CrdsGossip::default(); let id = entry.label().pubkey(); - node.crds.insert(entry, timestamp()).unwrap(); + node.crds + .insert(entry, timestamp(), GossipRoute::LocalMessage) + .unwrap(); let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node))); network.insert(id, node); Network::new(network) @@ -138,15 +145,23 @@ fn rstar_network_create(num: usize) -> Network { let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); let mut origin = CrdsGossip::default(); let id = entry.label().pubkey(); - origin.crds.insert(entry, timestamp()).unwrap(); + origin + .crds + .insert(entry, timestamp(), GossipRoute::LocalMessage) + .unwrap(); let mut network: HashMap<_, _> = (1..num) .map(|_| { let node_keypair = Arc::new(Keypair::new()); let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); let mut node = CrdsGossip::default(); - node.crds.insert(new.clone(), timestamp()).unwrap(); - origin.crds.insert(new.clone(), timestamp()).unwrap(); + node.crds + .insert(new.clone(), timestamp(), GossipRoute::LocalMessage) + .unwrap(); + origin + .crds + .insert(new.clone(), timestamp(), GossipRoute::LocalMessage) + .unwrap(); let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node))); (new.label().pubkey(), node) }) @@ -163,7 +178,9 @@ fn ring_network_create(num: usize) -> Network { let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); let mut node = CrdsGossip::default(); - node.crds.insert(new.clone(), timestamp()).unwrap(); + node.crds + .insert(new.clone(), timestamp(), GossipRoute::LocalMessage) + .unwrap(); let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node))); (new.label().pubkey(), node) }) @@ -181,7 +198,7 @@ fn ring_network_create(num: usize) -> Network { end.lock() .unwrap() .crds - .insert(start_info, timestamp()) + .insert(start_info, timestamp(), GossipRoute::LocalMessage) .unwrap(); } Network::new(network) @@ -195,7 +212,9 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network { let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); let mut node = CrdsGossip::default(); - node.crds.insert(new.clone(), timestamp()).unwrap(); + node.crds + .insert(new.clone(), timestamp(), GossipRoute::LocalMessage) + .unwrap(); let node = Node::staked( node_keypair, contact_info, @@ -220,7 +239,9 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network { let mut end = end.lock().unwrap(); if keys[k] != *end_pubkey { let start_info = start_entries[k].clone(); - end.crds.insert(start_info, timestamp()).unwrap(); + end.crds + .insert(start_info, timestamp(), GossipRoute::LocalMessage) + .unwrap(); } } } @@ -706,6 +727,7 @@ fn test_prune_errors() { .insert( CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone())), 0, + GossipRoute::LocalMessage, ) .unwrap(); crds_gossip.refresh_push_active_set( diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index 531ca37335..42813e51b9 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -474,6 +474,7 @@ mod tests { crate::rpc::create_validator_exit, solana_gossip::{ contact_info::ContactInfo, + crds::GossipRoute, crds_value::{CrdsData, CrdsValue, SnapshotHash}, }, solana_ledger::{ @@ -759,6 +760,7 @@ mod tests { ], ))), 1, + GossipRoute::LocalMessage, ) .unwrap(); assert_eq!(rm.health_check(), "ok"); @@ -775,6 +777,7 @@ mod tests { vec![(1000 + health_check_slot_distance - 1, Hash::default())], ))), 1, + GossipRoute::LocalMessage, ) .unwrap(); assert_eq!(rm.health_check(), "ok"); @@ -791,6 +794,7 @@ mod tests { vec![(1000 + health_check_slot_distance, Hash::default())], ))), 1, + GossipRoute::LocalMessage, ) .unwrap(); assert_eq!(rm.health_check(), "behind");