adds metrics tracking gossip crds writes and votes (backport #20953) (#20982)

* adds metrics tracking crds writes and votes (#20953)

(cherry picked from commit 1297a13586)

# Conflicts:
#	core/src/cluster_nodes.rs
#	gossip/benches/crds_shards.rs
#	gossip/src/cluster_info.rs
#	gossip/src/cluster_info_metrics.rs
#	gossip/src/crds_entry.rs
#	gossip/src/crds_gossip.rs
#	gossip/src/crds_gossip_pull.rs
#	gossip/src/crds_gossip_push.rs
#	gossip/src/crds_shards.rs
#	gossip/tests/crds_gossip.rs
#	rpc/src/rpc_service.rs

* updates itertools version in gossip

* removes backport merge conflicts

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
This commit is contained in:
mergify[bot]
2021-10-26 17:41:45 +00:00
committed by GitHub
parent 6baad8e239
commit 8986bd301c
15 changed files with 554 additions and 119 deletions

2
Cargo.lock generated
View File

@ -4848,7 +4848,7 @@ dependencies = [
"clap",
"flate2",
"indexmap",
"itertools 0.9.0",
"itertools 0.10.1",
"log 0.4.14",
"lru",
"matches",

View File

@ -318,6 +318,7 @@ mod tests {
super::*,
rand::{seq::SliceRandom, Rng},
solana_gossip::{
crds::GossipRoute,
crds_value::{CrdsData, CrdsValue},
deprecated::{
shuffle_peers_and_index, sorted_retransmit_peers_and_stakes,
@ -384,7 +385,10 @@ mod tests {
for node in nodes.iter().skip(1) {
let node = CrdsData::ContactInfo(node.clone());
let node = CrdsValue::new_unsigned(node);
assert_eq!(gossip.crds.insert(node, now), Ok(()));
assert_eq!(
gossip.crds.insert(node, now, GossipRoute::LocalMessage),
Ok(())
);
}
}
(nodes, stakes, cluster_info)

View File

@ -15,7 +15,7 @@ bv = { version = "0.11.1", features = ["serde"] }
clap = "2.33.1"
flate2 = "1.0"
indexmap = { version = "1.5", features = ["rayon"] }
itertools = "0.9.0"
itertools = "0.10.1"
log = "0.4.11"
lru = "0.6.1"
matches = "0.1.8"

View File

@ -6,7 +6,9 @@ use {
rand::{thread_rng, Rng},
rayon::ThreadPoolBuilder,
solana_gossip::{
crds::Crds, crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, crds_value::CrdsValue,
crds::{Crds, GossipRoute},
crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
crds_value::CrdsValue,
},
solana_sdk::pubkey::Pubkey,
std::collections::HashMap,
@ -21,7 +23,7 @@ fn bench_find_old_labels(bencher: &mut Bencher) {
let now = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS + CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 1000;
std::iter::repeat_with(|| (CrdsValue::new_rand(&mut rng, None), rng.gen_range(0, now)))
.take(50_000)
.for_each(|(v, ts)| assert!(crds.insert(v, ts).is_ok()));
.for_each(|(v, ts)| assert!(crds.insert(v, ts, GossipRoute::LocalMessage).is_ok()));
let mut timeouts = HashMap::new();
timeouts.insert(Pubkey::default(), CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS);
bencher.iter(|| {

View File

@ -7,7 +7,7 @@ use {
rayon::ThreadPoolBuilder,
solana_gossip::{
cluster_info::MAX_BLOOM_SIZE,
crds::Crds,
crds::{Crds, GossipRoute},
crds_gossip_pull::{CrdsFilter, CrdsGossipPull},
crds_value::CrdsValue,
},
@ -38,7 +38,11 @@ fn bench_build_crds_filters(bencher: &mut Bencher) {
let mut num_inserts = 0;
for _ in 0..90_000 {
if crds
.insert(CrdsValue::new_rand(&mut rng, None), rng.gen())
.insert(
CrdsValue::new_rand(&mut rng, None),
rng.gen(),
GossipRoute::LocalMessage,
)
.is_ok()
{
num_inserts += 1;

View File

@ -5,7 +5,7 @@ extern crate test;
use {
rand::{thread_rng, Rng},
solana_gossip::{
crds::{Crds, VersionedCrdsValue},
crds::{Crds, GossipRoute, VersionedCrdsValue},
crds_shards::CrdsShards,
crds_value::CrdsValue,
},
@ -20,7 +20,8 @@ fn new_test_crds_value<R: Rng>(rng: &mut R) -> VersionedCrdsValue {
let value = CrdsValue::new_rand(rng, None);
let label = value.label();
let mut crds = Crds::default();
crds.insert(value, timestamp()).unwrap();
crds.insert(value, timestamp(), GossipRoute::LocalMessage)
.unwrap();
crds.get(&label).cloned().unwrap()
}

View File

@ -18,7 +18,7 @@ use {
submit_gossip_stats, Counter, GossipStats, ScopedTimer, TimedGuard,
},
contact_info::ContactInfo,
crds::{Crds, Cursor},
crds::{Crds, Cursor, GossipRoute},
crds_gossip::CrdsGossip,
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{CrdsFilter, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
@ -492,7 +492,12 @@ impl ClusterInfo {
// TODO kill insert_info, only used by tests
pub fn insert_info(&self, contact_info: ContactInfo) {
let value = CrdsValue::new_signed(CrdsData::ContactInfo(contact_info), &self.keypair);
let _ = self.gossip.write().unwrap().crds.insert(value, timestamp());
let _ =
self.gossip
.write()
.unwrap()
.crds
.insert(value, timestamp(), GossipRoute::LocalMessage);
}
pub fn set_entrypoint(&self, entrypoint: ContactInfo) {
@ -608,7 +613,7 @@ impl ClusterInfo {
let now = timestamp();
let mut gossip = self.gossip.write().unwrap();
for node in nodes {
if let Err(err) = gossip.crds.insert(node, now) {
if let Err(err) = gossip.crds.insert(node, now, GossipRoute::LocalMessage) {
warn!("crds insert failed {:?}", err);
}
}
@ -896,7 +901,7 @@ impl ClusterInfo {
let mut gossip = self.gossip.write().unwrap();
let now = timestamp();
for entry in entries {
if let Err(err) = gossip.crds.insert(entry, now) {
if let Err(err) = gossip.crds.insert(entry, now, GossipRoute::LocalMessage) {
error!("push_epoch_slots failed: {:?}", err);
}
}
@ -959,7 +964,7 @@ impl ClusterInfo {
let vote = CrdsData::Vote(vote_index, vote);
let vote = CrdsValue::new_signed(vote, &self.keypair);
let mut gossip = self.gossip.write().unwrap();
if let Err(err) = gossip.crds.insert(vote, now) {
if let Err(err) = gossip.crds.insert(vote, now, GossipRoute::LocalMessage) {
error!("push_vote failed: {:?}", err);
}
}
@ -1307,7 +1312,12 @@ impl ClusterInfo {
fn insert_self(&self) {
let value =
CrdsValue::new_signed(CrdsData::ContactInfo(self.my_contact_info()), &self.keypair);
let _ = self.gossip.write().unwrap().crds.insert(value, timestamp());
let _ =
self.gossip
.write()
.unwrap()
.crds
.insert(value, timestamp(), GossipRoute::LocalMessage);
}
// If the network entrypoint hasn't been discovered yet, add it to the crds table
@ -1468,7 +1478,7 @@ impl ClusterInfo {
let mut gossip = self.gossip.write().unwrap();
let now = timestamp();
for entry in pending_push_messages {
let _ = gossip.crds.insert(entry, now);
let _ = gossip.crds.insert(entry, now, GossipRoute::LocalMessage);
}
}
fn new_push_requests(
@ -3751,7 +3761,10 @@ mod tests {
{
let mut gossip = cluster_info.gossip.write().unwrap();
for entry in entries {
assert!(gossip.crds.insert(entry, /*now=*/ 0).is_ok());
assert!(gossip
.crds
.insert(entry, /*now=*/ 0, GossipRoute::LocalMessage)
.is_ok());
}
}
// Should exclude other node's epoch-slot because of different
@ -4050,12 +4063,11 @@ mod tests {
0,
LowestSlot::new(other_node_pubkey, peer_lowest, timestamp()),
));
let _ = cluster_info
.gossip
.write()
.unwrap()
.crds
.insert(value, timestamp());
let _ = cluster_info.gossip.write().unwrap().crds.insert(
value,
timestamp(),
GossipRoute::LocalMessage,
);
}
// only half the visible peers should be eligible to serve this repair
assert_eq!(cluster_info.repair_peers(5).len(), 5);

View File

@ -1,7 +1,8 @@
use {
crate::crds_gossip::CrdsGossip,
itertools::Itertools,
solana_measure::measure::Measure,
solana_sdk::pubkey::Pubkey,
solana_sdk::{clock::Slot, pubkey::Pubkey},
std::{
collections::HashMap,
ops::{Deref, DerefMut},
@ -163,9 +164,10 @@ pub(crate) fn submit_gossip_stats(
gossip: &RwLock<CrdsGossip>,
stakes: &HashMap<Pubkey, u64>,
) {
let (table_size, num_nodes, num_pubkeys, purged_values_size, failed_inserts_size) = {
let (crds_stats, table_size, num_nodes, num_pubkeys, purged_values_size, failed_inserts_size) = {
let gossip = gossip.read().unwrap();
(
gossip.crds.take_stats(),
gossip.crds.len(),
gossip.crds.num_nodes(),
gossip.crds.num_pubkeys(),
@ -449,4 +451,155 @@ pub(crate) fn submit_gossip_stats(
i64
),
);
let counts: Vec<_> = crds_stats
.pull
.counts
.iter()
.zip(crds_stats.push.counts.iter())
.map(|(a, b)| a + b)
.collect();
datapoint_info!(
"cluster_info_crds_stats",
("ContactInfo", counts[0], i64),
("ContactInfo-push", crds_stats.push.counts[0], i64),
("ContactInfo-pull", crds_stats.pull.counts[0], i64),
("Vote", counts[1], i64),
("Vote-push", crds_stats.push.counts[1], i64),
("Vote-pull", crds_stats.pull.counts[1], i64),
("LowestSlot", counts[2], i64),
("LowestSlot-push", crds_stats.push.counts[2], i64),
("LowestSlot-pull", crds_stats.pull.counts[2], i64),
("SnapshotHashes", counts[3], i64),
("SnapshotHashes-push", crds_stats.push.counts[3], i64),
("SnapshotHashes-pull", crds_stats.pull.counts[3], i64),
("AccountsHashes", counts[4], i64),
("AccountsHashes-push", crds_stats.push.counts[4], i64),
("AccountsHashes-pull", crds_stats.pull.counts[4], i64),
("EpochSlots", counts[5], i64),
("EpochSlots-push", crds_stats.push.counts[5], i64),
("EpochSlots-pull", crds_stats.pull.counts[5], i64),
("LegacyVersion", counts[6], i64),
("LegacyVersion-push", crds_stats.push.counts[6], i64),
("LegacyVersion-pull", crds_stats.pull.counts[6], i64),
("Version", counts[7], i64),
("Version-push", crds_stats.push.counts[7], i64),
("Version-pull", crds_stats.pull.counts[7], i64),
("NodeInstance", counts[8], i64),
("NodeInstance-push", crds_stats.push.counts[8], i64),
("NodeInstance-pull", crds_stats.pull.counts[8], i64),
("DuplicateShred", counts[9], i64),
("DuplicateShred-push", crds_stats.push.counts[9], i64),
("DuplicateShred-pull", crds_stats.pull.counts[9], i64),
("IncrementalSnapshotHashes", counts[10], i64),
(
"IncrementalSnapshotHashes-push",
crds_stats.push.counts[10],
i64
),
(
"IncrementalSnapshotHashes-pull",
crds_stats.pull.counts[10],
i64
),
("all", counts.iter().sum::<usize>(), i64),
(
"all-push",
crds_stats.push.counts.iter().sum::<usize>(),
i64
),
(
"all-pull",
crds_stats.pull.counts.iter().sum::<usize>(),
i64
),
);
let fails: Vec<_> = crds_stats
.pull
.fails
.iter()
.zip(crds_stats.push.fails.iter())
.map(|(a, b)| a + b)
.collect();
datapoint_info!(
"cluster_info_crds_stats_fails",
("ContactInfo", fails[0], i64),
("ContactInfo-push", crds_stats.push.fails[0], i64),
("ContactInfo-pull", crds_stats.pull.fails[0], i64),
("Vote", fails[1], i64),
("Vote-push", crds_stats.push.fails[1], i64),
("Vote-pull", crds_stats.pull.fails[1], i64),
("LowestSlot", fails[2], i64),
("LowestSlot-push", crds_stats.push.fails[2], i64),
("LowestSlot-pull", crds_stats.pull.fails[2], i64),
("SnapshotHashes", fails[3], i64),
("SnapshotHashes-push", crds_stats.push.fails[3], i64),
("SnapshotHashes-pull", crds_stats.pull.fails[3], i64),
("AccountsHashes", fails[4], i64),
("AccountsHashes-push", crds_stats.push.fails[4], i64),
("AccountsHashes-pull", crds_stats.pull.fails[4], i64),
("EpochSlots", fails[5], i64),
("EpochSlots-push", crds_stats.push.fails[5], i64),
("EpochSlots-pull", crds_stats.pull.fails[5], i64),
("LegacyVersion", fails[6], i64),
("LegacyVersion-push", crds_stats.push.fails[6], i64),
("LegacyVersion-pull", crds_stats.pull.fails[6], i64),
("Version", fails[7], i64),
("Version-push", crds_stats.push.fails[7], i64),
("Version-pull", crds_stats.pull.fails[7], i64),
("NodeInstance", fails[8], i64),
("NodeInstance-push", crds_stats.push.fails[8], i64),
("NodeInstance-pull", crds_stats.pull.fails[8], i64),
("DuplicateShred", fails[9], i64),
("DuplicateShred-push", crds_stats.push.fails[9], i64),
("DuplicateShred-pull", crds_stats.pull.fails[9], i64),
("IncrementalSnapshotHashes", fails[10], i64),
(
"IncrementalSnapshotHashes-push",
crds_stats.push.fails[10],
i64
),
(
"IncrementalSnapshotHashes-pull",
crds_stats.pull.fails[10],
i64
),
("all", fails.iter().sum::<usize>(), i64),
("all-push", crds_stats.push.fails.iter().sum::<usize>(), i64),
("all-pull", crds_stats.pull.fails.iter().sum::<usize>(), i64),
);
for (slot, num_votes) in &crds_stats.pull.votes {
datapoint_info!(
"cluster_info_crds_stats_votes_pull",
("slot", *slot, i64),
("num_votes", *num_votes, i64),
);
}
for (slot, num_votes) in &crds_stats.push.votes {
datapoint_info!(
"cluster_info_crds_stats_votes_push",
("slot", *slot, i64),
("num_votes", *num_votes, i64),
);
}
let votes: HashMap<Slot, usize> = crds_stats
.pull
.votes
.into_iter()
.map(|(slot, num_votes)| (*slot, *num_votes))
.chain(
crds_stats
.push
.votes
.into_iter()
.map(|(slot, num_votes)| (*slot, *num_votes)),
)
.into_grouping_map()
.aggregate(|acc, _slot, num_votes| Some(acc.unwrap_or_default() + num_votes));
for (slot, num_votes) in votes {
datapoint_info!(
"cluster_info_crds_stats_votes",
("slot", slot, i64),
("num_votes", num_votes, i64),
);
}
}

View File

@ -35,9 +35,11 @@ use {
map::{rayon::ParValues, Entry, IndexMap},
set::IndexSet,
},
lru::LruCache,
matches::debug_assert_matches,
rayon::{prelude::*, ThreadPool},
solana_sdk::{
clock::Slot,
hash::{hash, Hash},
pubkey::Pubkey,
},
@ -45,12 +47,14 @@ use {
cmp::Ordering,
collections::{hash_map, BTreeMap, HashMap, VecDeque},
ops::{Bound, Index, IndexMut},
sync::Mutex,
},
};
const CRDS_SHARDS_BITS: u32 = 8;
// Number of vote slots to track in an lru-cache for metrics.
const VOTE_SLOTS_METRICS_CAP: usize = 100;
#[derive(Clone)]
pub struct Crds {
/// Stores the map of labels and values
table: IndexMap<CrdsValueLabel, VersionedCrdsValue>,
@ -69,6 +73,7 @@ pub struct Crds {
purged: VecDeque<(Hash, u64 /*timestamp*/)>,
// Mapping from nodes' pubkeys to their respective shred-version.
shred_versions: HashMap<Pubkey, u16>,
stats: Mutex<CrdsStats>,
}
#[derive(PartialEq, Debug)]
@ -77,6 +82,28 @@ pub enum CrdsError {
UnknownStakes,
}
#[derive(Clone, Copy)]
pub enum GossipRoute {
LocalMessage,
PullRequest,
PullResponse,
PushMessage,
}
type CrdsCountsArray = [usize; 11];
pub(crate) struct CrdsDataStats {
pub(crate) counts: CrdsCountsArray,
pub(crate) fails: CrdsCountsArray,
pub(crate) votes: LruCache<Slot, /*count:*/ usize>,
}
#[derive(Default)]
pub(crate) struct CrdsStats {
pub(crate) pull: CrdsDataStats,
pub(crate) push: CrdsDataStats,
}
/// This structure stores some local metadata associated with the CrdsValue
#[derive(PartialEq, Debug, Clone)]
pub struct VersionedCrdsValue {
@ -129,6 +156,7 @@ impl Default for Crds {
entries: BTreeMap::default(),
purged: VecDeque::default(),
shred_versions: HashMap::default(),
stats: Mutex::<CrdsStats>::default(),
}
}
}
@ -169,12 +197,18 @@ impl Crds {
}
}
pub fn insert(&mut self, value: CrdsValue, now: u64) -> Result<(), CrdsError> {
pub fn insert(
&mut self,
value: CrdsValue,
now: u64,
route: GossipRoute,
) -> Result<(), CrdsError> {
let label = value.label();
let pubkey = value.pubkey();
let value = VersionedCrdsValue::new(value, self.cursor, now);
match self.table.entry(label) {
Entry::Vacant(entry) => {
self.stats.lock().unwrap().record_insert(&value, route);
let entry_index = entry.index();
self.shards.insert(entry_index, &value);
match &value.value.data {
@ -197,6 +231,7 @@ impl Crds {
Ok(())
}
Entry::Occupied(mut entry) if overrides(&value.value, entry.get()) => {
self.stats.lock().unwrap().record_insert(&value, route);
let entry_index = entry.index();
self.shards.remove(entry_index, entry.get());
self.shards.insert(entry_index, &value);
@ -228,6 +263,7 @@ impl Crds {
Ok(())
}
Entry::Occupied(entry) => {
self.stats.lock().unwrap().record_fail(&value, route);
trace!(
"INSERT FAILED data: {} new.wallclock: {}",
value.value.label(),
@ -562,6 +598,88 @@ impl Crds {
}
Ok(keys.len())
}
pub(crate) fn take_stats(&self) -> CrdsStats {
std::mem::take(&mut self.stats.lock().unwrap())
}
// Only for tests and simulations.
pub(crate) fn mock_clone(&self) -> Self {
Self {
table: self.table.clone(),
cursor: self.cursor,
shards: self.shards.clone(),
nodes: self.nodes.clone(),
votes: self.votes.clone(),
epoch_slots: self.epoch_slots.clone(),
records: self.records.clone(),
entries: self.entries.clone(),
purged: self.purged.clone(),
shred_versions: self.shred_versions.clone(),
stats: Mutex::<CrdsStats>::default(),
}
}
}
impl Default for CrdsDataStats {
fn default() -> Self {
Self {
counts: CrdsCountsArray::default(),
fails: CrdsCountsArray::default(),
votes: LruCache::new(VOTE_SLOTS_METRICS_CAP),
}
}
}
impl CrdsDataStats {
fn record_insert(&mut self, entry: &VersionedCrdsValue) {
self.counts[Self::ordinal(entry)] += 1;
if let CrdsData::Vote(_, vote) = &entry.value.data {
if let Some(slot) = vote.slot() {
let num_nodes = self.votes.get(&slot).copied().unwrap_or_default();
self.votes.put(slot, num_nodes + 1);
}
}
}
fn record_fail(&mut self, entry: &VersionedCrdsValue) {
self.fails[Self::ordinal(entry)] += 1;
}
fn ordinal(entry: &VersionedCrdsValue) -> usize {
match &entry.value.data {
CrdsData::ContactInfo(_) => 0,
CrdsData::Vote(_, _) => 1,
CrdsData::LowestSlot(_, _) => 2,
CrdsData::SnapshotHashes(_) => 3,
CrdsData::AccountsHashes(_) => 4,
CrdsData::EpochSlots(_, _) => 5,
CrdsData::LegacyVersion(_) => 6,
CrdsData::Version(_) => 7,
CrdsData::NodeInstance(_) => 8,
CrdsData::DuplicateShred(_, _) => 9,
}
}
}
impl CrdsStats {
fn record_insert(&mut self, entry: &VersionedCrdsValue, route: GossipRoute) {
match route {
GossipRoute::LocalMessage => (),
GossipRoute::PullRequest => (),
GossipRoute::PushMessage => self.push.record_insert(entry),
GossipRoute::PullResponse => self.pull.record_insert(entry),
}
}
fn record_fail(&mut self, entry: &VersionedCrdsValue, route: GossipRoute) {
match route {
GossipRoute::LocalMessage => (),
GossipRoute::PullRequest => (),
GossipRoute::PushMessage => self.push.record_fail(entry),
GossipRoute::PullResponse => self.pull.record_fail(entry),
}
}
}
#[cfg(test)]
@ -586,7 +704,10 @@ mod tests {
fn test_insert() {
let mut crds = Crds::default();
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
assert_eq!(crds.insert(val.clone(), 0), Ok(()));
assert_eq!(
crds.insert(val.clone(), 0, GossipRoute::LocalMessage),
Ok(())
);
assert_eq!(crds.table.len(), 1);
assert!(crds.table.contains_key(&val.label()));
assert_eq!(crds.table[&val.label()].local_timestamp, 0);
@ -595,8 +716,14 @@ mod tests {
fn test_update_old() {
let mut crds = Crds::default();
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
assert_eq!(crds.insert(val.clone(), 0), Ok(()));
assert_eq!(crds.insert(val.clone(), 1), Err(CrdsError::InsertFailed));
assert_eq!(
crds.insert(val.clone(), 0, GossipRoute::LocalMessage),
Ok(())
);
assert_eq!(
crds.insert(val.clone(), 1, GossipRoute::LocalMessage),
Err(CrdsError::InsertFailed)
);
assert!(crds.purged.is_empty());
assert_eq!(crds.table[&val.label()].local_timestamp, 0);
}
@ -608,12 +735,15 @@ mod tests {
0,
)));
let value_hash = hash(&serialize(&original).unwrap());
assert_matches!(crds.insert(original, 0), Ok(()));
assert_matches!(crds.insert(original, 0, GossipRoute::LocalMessage), Ok(()));
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::default(),
1,
)));
assert_eq!(crds.insert(val.clone(), 1), Ok(()));
assert_eq!(
crds.insert(val.clone(), 1, GossipRoute::LocalMessage),
Ok(())
);
assert_eq!(*crds.purged.back().unwrap(), (value_hash, 1));
assert_eq!(crds.table[&val.label()].local_timestamp, 1);
}
@ -624,13 +754,19 @@ mod tests {
&Pubkey::default(),
0,
)));
assert_eq!(crds.insert(val.clone(), 0), Ok(()));
assert_eq!(
crds.insert(val.clone(), 0, GossipRoute::LocalMessage),
Ok(())
);
assert_eq!(crds.table[&val.label()].ordinal, 0);
let val2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
let value_hash = hash(&serialize(&val2).unwrap());
assert_eq!(val2.label().pubkey(), val.label().pubkey());
assert_eq!(crds.insert(val2.clone(), 0), Ok(()));
assert_eq!(
crds.insert(val2.clone(), 0, GossipRoute::LocalMessage),
Ok(())
);
crds.update_record_timestamp(&val.label().pubkey(), 2);
assert_eq!(crds.table[&val.label()].local_timestamp, 2);
@ -645,7 +781,7 @@ mod tests {
let mut ci = ContactInfo::default();
ci.wallclock += 1;
let val3 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
assert_eq!(crds.insert(val3, 3), Ok(()));
assert_eq!(crds.insert(val3, 3, GossipRoute::LocalMessage), Ok(()));
assert_eq!(*crds.purged.back().unwrap(), (value_hash, 3));
assert_eq!(crds.table[&val2.label()].local_timestamp, 3);
assert_eq!(crds.table[&val2.label()].ordinal, 2);
@ -663,19 +799,22 @@ mod tests {
let pubkey = Pubkey::new_unique();
let node = NodeInstance::new(&mut rng, pubkey, now);
let node = make_crds_value(node);
assert_eq!(crds.insert(node, now), Ok(()));
assert_eq!(crds.insert(node, now, GossipRoute::LocalMessage), Ok(()));
// A node-instance with a different key should insert fine even with
// older timestamps.
let other = NodeInstance::new(&mut rng, Pubkey::new_unique(), now - 1);
let other = make_crds_value(other);
assert_eq!(crds.insert(other, now), Ok(()));
assert_eq!(crds.insert(other, now, GossipRoute::LocalMessage), Ok(()));
// A node-instance with older timestamp should fail to insert, even if
// the wallclock is more recent.
let other = NodeInstance::new(&mut rng, pubkey, now - 1);
let other = other.with_wallclock(now + 1);
let other = make_crds_value(other);
let value_hash = hash(&serialize(&other).unwrap());
assert_eq!(crds.insert(other, now), Err(CrdsError::InsertFailed));
assert_eq!(
crds.insert(other, now, GossipRoute::LocalMessage),
Err(CrdsError::InsertFailed)
);
assert_eq!(*crds.purged.back().unwrap(), (value_hash, now));
// A node instance with the same timestamp should insert only if the
// random token is larger.
@ -684,7 +823,7 @@ mod tests {
let other = NodeInstance::new(&mut rng, pubkey, now);
let other = make_crds_value(other);
let value_hash = hash(&serialize(&other).unwrap());
match crds.insert(other, now) {
match crds.insert(other, now, GossipRoute::LocalMessage) {
Ok(()) => num_overrides += 1,
Err(CrdsError::InsertFailed) => {
assert_eq!(*crds.purged.back().unwrap(), (value_hash, now))
@ -699,7 +838,7 @@ mod tests {
let other = NodeInstance::new(&mut rng, pubkey, now + k);
let other = other.with_wallclock(now - 1);
let other = make_crds_value(other);
match crds.insert(other, now) {
match crds.insert(other, now, GossipRoute::LocalMessage) {
Ok(()) => (),
_ => panic!(),
}
@ -711,7 +850,10 @@ mod tests {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
let mut crds = Crds::default();
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
assert_eq!(crds.insert(val.clone(), 1), Ok(()));
assert_eq!(
crds.insert(val.clone(), 1, GossipRoute::LocalMessage),
Ok(())
);
let mut set = HashMap::new();
set.insert(Pubkey::default(), 0);
assert!(crds.find_old_labels(&thread_pool, 0, &set).is_empty());
@ -734,7 +876,10 @@ mod tests {
let mut timeouts = HashMap::new();
let val = CrdsValue::new_rand(&mut rng, None);
timeouts.insert(Pubkey::default(), 3);
assert_eq!(crds.insert(val.clone(), 0), Ok(()));
assert_eq!(
crds.insert(val.clone(), 0, GossipRoute::LocalMessage),
Ok(())
);
assert!(crds.find_old_labels(&thread_pool, 2, &timeouts).is_empty());
timeouts.insert(val.pubkey(), 1);
assert_eq!(
@ -757,7 +902,10 @@ mod tests {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
let mut crds = Crds::default();
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
assert_matches!(crds.insert(val.clone(), 1), Ok(_));
assert_matches!(
crds.insert(val.clone(), 1, GossipRoute::LocalMessage),
Ok(_)
);
let mut set = HashMap::new();
set.insert(Pubkey::default(), 1);
assert_eq!(
@ -772,7 +920,10 @@ mod tests {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
let mut crds = Crds::default();
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
assert_eq!(crds.insert(val.clone(), 1), Ok(()));
assert_eq!(
crds.insert(val.clone(), 1, GossipRoute::LocalMessage),
Ok(())
);
let mut set = HashMap::new();
//now < timestamp
set.insert(Pubkey::default(), 0);
@ -814,7 +965,7 @@ mod tests {
let keypair = &keypairs[rng.gen_range(0, keypairs.len())];
let value = CrdsValue::new_rand(&mut rng, Some(keypair));
let local_timestamp = new_rand_timestamp(&mut rng);
if let Ok(()) = crds.insert(value, local_timestamp) {
if let Ok(()) = crds.insert(value, local_timestamp, GossipRoute::LocalMessage) {
num_inserts += 1;
check_crds_shards(&crds);
}
@ -968,7 +1119,7 @@ mod tests {
let keypair = &keypairs[rng.gen_range(0, keypairs.len())];
let value = CrdsValue::new_rand(&mut rng, Some(keypair));
let local_timestamp = new_rand_timestamp(&mut rng);
if let Ok(()) = crds.insert(value, local_timestamp) {
if let Ok(()) = crds.insert(value, local_timestamp, GossipRoute::LocalMessage) {
num_inserts += 1;
}
if k % 16 == 0 {
@ -1022,7 +1173,7 @@ mod tests {
let keypair = &keypairs[rng.gen_range(0, keypairs.len())];
let value = CrdsValue::new_rand(&mut rng, Some(keypair));
let local_timestamp = new_rand_timestamp(&mut rng);
let _ = crds.insert(value, local_timestamp);
let _ = crds.insert(value, local_timestamp, GossipRoute::LocalMessage);
if k % 64 == 0 {
check_crds_records(&crds);
}
@ -1053,7 +1204,10 @@ mod tests {
node.shred_version = 42;
let node = CrdsData::ContactInfo(node);
let node = CrdsValue::new_unsigned(node);
assert_eq!(crds.insert(node, timestamp()), Ok(()));
assert_eq!(
crds.insert(node, timestamp(), GossipRoute::LocalMessage),
Ok(())
);
assert_eq!(crds.get_shred_version(&pubkey), Some(42));
// An outdated value should not update shred-version:
let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey));
@ -1061,7 +1215,10 @@ mod tests {
node.shred_version = 8;
let node = CrdsData::ContactInfo(node);
let node = CrdsValue::new_unsigned(node);
assert_eq!(crds.insert(node, timestamp()), Err(CrdsError::InsertFailed));
assert_eq!(
crds.insert(node, timestamp(), GossipRoute::LocalMessage),
Err(CrdsError::InsertFailed)
);
assert_eq!(crds.get_shred_version(&pubkey), Some(42));
// Update shred version:
let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey));
@ -1069,13 +1226,19 @@ mod tests {
node.shred_version = 8;
let node = CrdsData::ContactInfo(node);
let node = CrdsValue::new_unsigned(node);
assert_eq!(crds.insert(node, timestamp()), Ok(()));
assert_eq!(
crds.insert(node, timestamp(), GossipRoute::LocalMessage),
Ok(())
);
assert_eq!(crds.get_shred_version(&pubkey), Some(8));
// Add other crds values with the same pubkey.
let val = SnapshotHash::new_rand(&mut rng, Some(pubkey));
let val = CrdsData::SnapshotHashes(val);
let val = CrdsValue::new_unsigned(val);
assert_eq!(crds.insert(val, timestamp()), Ok(()));
assert_eq!(
crds.insert(val, timestamp(), GossipRoute::LocalMessage),
Ok(())
);
assert_eq!(crds.get_shred_version(&pubkey), Some(8));
// Remove contact-info. Shred version should stay there since there
// are still values associated with the pubkey.
@ -1112,7 +1275,7 @@ mod tests {
let keypair = &keypairs[rng.gen_range(0, keypairs.len())];
let value = CrdsValue::new_rand(&mut rng, Some(keypair));
let local_timestamp = new_rand_timestamp(&mut rng);
let _ = crds.insert(value, local_timestamp);
let _ = crds.insert(value, local_timestamp, GossipRoute::LocalMessage);
}
let num_values = crds.table.len();
let num_pubkeys = num_unique_pubkeys(crds.table.values());
@ -1153,7 +1316,10 @@ mod tests {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
let mut crds = Crds::default();
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
assert_matches!(crds.insert(val.clone(), 1), Ok(_));
assert_matches!(
crds.insert(val.clone(), 1, GossipRoute::LocalMessage),
Ok(_)
);
let mut set = HashMap::new();
//default has max timeout, but pubkey should still expire

View File

@ -7,7 +7,7 @@ use {
crate::{
cluster_info::Ping,
contact_info::ContactInfo,
crds::Crds,
crds::{Crds, GossipRoute},
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{CrdsFilter, CrdsGossipPull, ProcessPullStats},
crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE},
@ -88,7 +88,7 @@ impl CrdsGossip {
now: u64,
) -> HashMap<Pubkey, Vec<CrdsValue>> {
for entry in pending_push_messages {
let _ = self.crds.insert(entry, now);
let _ = self.crds.insert(entry, now, GossipRoute::LocalMessage);
}
self.push.new_push_messages(&self.crds, now)
}
@ -150,7 +150,7 @@ impl CrdsGossip {
});
let now = timestamp();
for entry in entries {
if let Err(err) = self.crds.insert(entry, now) {
if let Err(err) = self.crds.insert(entry, now, GossipRoute::LocalMessage) {
error!("push_duplicate_shred faild: {:?}", err);
}
}
@ -334,7 +334,7 @@ impl CrdsGossip {
// Only for tests and simulations.
pub(crate) fn mock_clone(&self) -> Self {
Self {
crds: self.crds.clone(),
crds: self.crds.mock_clone(),
push: self.push.mock_clone(),
pull: self.pull.mock_clone(),
}
@ -377,6 +377,7 @@ mod test {
.insert(
CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone())),
0,
GossipRoute::LocalMessage,
)
.unwrap();
crds_gossip.refresh_push_active_set(

View File

@ -13,7 +13,7 @@ use {
crate::{
cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY},
contact_info::ContactInfo,
crds::{Crds, VersionedCrdsValue},
crds::{Crds, GossipRoute, VersionedCrdsValue},
crds_gossip::{get_stake, get_weight},
crds_gossip_error::CrdsGossipError,
crds_value::CrdsValue,
@ -333,7 +333,7 @@ impl CrdsGossipPull {
{
for caller in callers {
let key = caller.pubkey();
let _ = crds.insert(caller, now);
let _ = crds.insert(caller, now, GossipRoute::PullRequest);
crds.update_record_timestamp(&key, now);
}
}
@ -413,11 +413,11 @@ impl CrdsGossipPull {
) {
let mut owners = HashSet::new();
for response in responses_expired_timeout {
let _ = crds.insert(response, now);
let _ = crds.insert(response, now, GossipRoute::PullResponse);
}
for response in responses {
let owner = response.pubkey();
if let Ok(()) = crds.insert(response, now) {
if let Ok(()) = crds.insert(response, now, GossipRoute::PullResponse) {
stats.success += 1;
self.num_pulls += 1;
owners.insert(owner);
@ -688,14 +688,16 @@ pub(crate) mod tests {
&solana_sdk::pubkey::new_rand(),
0,
)));
crds.insert(me.clone(), 0).unwrap();
crds.insert(me.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
for i in 1..=30 {
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
0,
)));
let id = entry.label().pubkey();
crds.insert(entry.clone(), 0).unwrap();
crds.insert(entry.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
stakes.insert(id, i * 100);
}
let now = 1024;
@ -750,10 +752,14 @@ pub(crate) mod tests {
..ContactInfo::default()
}));
crds.insert(me.clone(), 0).unwrap();
crds.insert(spy.clone(), 0).unwrap();
crds.insert(node_123.clone(), 0).unwrap();
crds.insert(node_456.clone(), 0).unwrap();
crds.insert(me.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
crds.insert(spy.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
crds.insert(node_123.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
crds.insert(node_456.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
// shred version 123 should ignore nodes with versions 0 and 456
let options = node
@ -811,8 +817,10 @@ pub(crate) mod tests {
..ContactInfo::default()
}));
crds.insert(me.clone(), 0).unwrap();
crds.insert(node_123.clone(), 0).unwrap();
crds.insert(me.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
crds.insert(node_123.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
// Empty gossip_validators -- will pull from nobody
let mut gossip_validators = HashSet::new();
@ -914,7 +922,10 @@ pub(crate) mod tests {
for _ in 0..40_000 {
let keypair = keypairs.choose(&mut rng).unwrap();
let value = CrdsValue::new_rand(&mut rng, Some(keypair));
if crds.insert(value, rng.gen()).is_ok() {
if crds
.insert(value, rng.gen(), GossipRoute::LocalMessage)
.is_ok()
{
num_inserts += 1;
}
}
@ -973,7 +984,7 @@ pub(crate) mod tests {
Err(CrdsGossipError::NoPeers)
);
crds.insert(entry, 0).unwrap();
crds.insert(entry, 0, GossipRoute::LocalMessage).unwrap();
assert_eq!(
node.new_pull_request(
&thread_pool,
@ -997,7 +1008,8 @@ pub(crate) mod tests {
.unwrap()
.mock_pong(new.id, new.gossip, Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new));
crds.insert(new.clone(), now).unwrap();
crds.insert(new.clone(), now, GossipRoute::LocalMessage)
.unwrap();
let req = node.new_pull_request(
&thread_pool,
&crds,
@ -1017,7 +1029,8 @@ pub(crate) mod tests {
node.mark_pull_request_creation_time(new.contact_info().unwrap().id, now);
let offline = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), now);
let offline = CrdsValue::new_unsigned(CrdsData::ContactInfo(offline));
crds.insert(offline, now).unwrap();
crds.insert(offline, now, GossipRoute::LocalMessage)
.unwrap();
let req = node.new_pull_request(
&thread_pool,
&crds,
@ -1052,15 +1065,17 @@ pub(crate) mod tests {
0,
)));
let mut node = CrdsGossipPull::default();
crds.insert(entry, now).unwrap();
crds.insert(entry, now, GossipRoute::LocalMessage).unwrap();
let old = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
ping_cache.mock_pong(old.id, old.gossip, Instant::now());
let old = CrdsValue::new_unsigned(CrdsData::ContactInfo(old));
crds.insert(old.clone(), now).unwrap();
crds.insert(old.clone(), now, GossipRoute::LocalMessage)
.unwrap();
let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
ping_cache.mock_pong(new.id, new.gossip, Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new));
crds.insert(new.clone(), now).unwrap();
crds.insert(new.clone(), now, GossipRoute::LocalMessage)
.unwrap();
// set request creation time to now.
let now = now + 50_000;
@ -1145,11 +1160,13 @@ pub(crate) mod tests {
)));
let caller = entry.clone();
let node = CrdsGossipPull::default();
node_crds.insert(entry, 0).unwrap();
node_crds
.insert(entry, 0, GossipRoute::LocalMessage)
.unwrap();
let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
ping_cache.mock_pong(new.id, new.gossip, Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new));
node_crds.insert(new, 0).unwrap();
node_crds.insert(new, 0, GossipRoute::LocalMessage).unwrap();
let mut pings = Vec::new();
let req = node.new_pull_request(
&thread_pool,
@ -1183,7 +1200,11 @@ pub(crate) mod tests {
CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
)));
dest_crds
.insert(new, CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS)
.insert(
new,
CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
GossipRoute::LocalMessage,
)
.unwrap();
//should skip new value since caller is to old
@ -1232,7 +1253,9 @@ pub(crate) mod tests {
)));
let caller = entry.clone();
let node = CrdsGossipPull::default();
node_crds.insert(entry, 0).unwrap();
node_crds
.insert(entry, 0, GossipRoute::LocalMessage)
.unwrap();
let mut ping_cache = PingCache::new(
Duration::from_secs(20 * 60), // ttl
128, // capacity
@ -1240,7 +1263,7 @@ pub(crate) mod tests {
let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
ping_cache.mock_pong(new.id, new.gossip, Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new));
node_crds.insert(new, 0).unwrap();
node_crds.insert(new, 0, GossipRoute::LocalMessage).unwrap();
let mut pings = Vec::new();
let req = node.new_pull_request(
&thread_pool,
@ -1287,7 +1310,9 @@ pub(crate) mod tests {
let caller = entry.clone();
let node_pubkey = entry.label().pubkey();
let mut node = CrdsGossipPull::default();
node_crds.insert(entry, 0).unwrap();
node_crds
.insert(entry, 0, GossipRoute::LocalMessage)
.unwrap();
let mut ping_cache = PingCache::new(
Duration::from_secs(20 * 60), // ttl
128, // capacity
@ -1295,14 +1320,16 @@ pub(crate) mod tests {
let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 1);
ping_cache.mock_pong(new.id, new.gossip, Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new));
node_crds.insert(new, 0).unwrap();
node_crds.insert(new, 0, GossipRoute::LocalMessage).unwrap();
let mut dest_crds = Crds::default();
let new_id = solana_sdk::pubkey::new_rand();
let new = ContactInfo::new_localhost(&new_id, 1);
ping_cache.mock_pong(new.id, new.gossip, Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new));
dest_crds.insert(new.clone(), 0).unwrap();
dest_crds
.insert(new.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
// node contains a key from the dest node, but at an older local timestamp
let same_key = ContactInfo::new_localhost(&new_id, 0);
@ -1310,7 +1337,9 @@ pub(crate) mod tests {
let same_key = CrdsValue::new_unsigned(CrdsData::ContactInfo(same_key));
assert_eq!(same_key.label(), new.label());
assert!(same_key.wallclock() < new.wallclock());
node_crds.insert(same_key.clone(), 0).unwrap();
node_crds
.insert(same_key.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
assert_eq!(node_crds.get(&same_key.label()).unwrap().local_timestamp, 0);
let mut done = false;
let mut pings = Vec::new();
@ -1383,12 +1412,16 @@ pub(crate) mod tests {
let node_label = entry.label();
let node_pubkey = node_label.pubkey();
let node = CrdsGossipPull::default();
node_crds.insert(entry, 0).unwrap();
node_crds
.insert(entry, 0, GossipRoute::LocalMessage)
.unwrap();
let old = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
0,
)));
node_crds.insert(old.clone(), 0).unwrap();
node_crds
.insert(old.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
let value_hash = node_crds.get(&old.label()).unwrap().value_hash;
//verify self is valid

View File

@ -12,7 +12,7 @@ use {
crate::{
cluster_info::CRDS_UNIQUE_PUBKEY_CAPACITY,
contact_info::ContactInfo,
crds::{Crds, Cursor},
crds::{Crds, Cursor, GossipRoute},
crds_gossip::{get_stake, get_weight},
crds_gossip_error::CrdsGossipError,
crds_value::CrdsValue,
@ -183,7 +183,8 @@ impl CrdsGossipPush {
.entry(*from)
.and_modify(|(_pruned, timestamp)| *timestamp = now)
.or_insert((/*pruned:*/ false, now));
crds.insert(value, now).map_err(|_| {
crds.insert(value, now, GossipRoute::PushMessage)
.map_err(|_| {
self.num_old += 1;
CrdsGossipError::PushMessageOldVersion
})
@ -556,7 +557,10 @@ mod test {
0,
)));
assert_eq!(crds.insert(value1.clone(), now), Ok(()));
assert_eq!(
crds.insert(value1.clone(), now, GossipRoute::LocalMessage),
Ok(())
);
push.refresh_push_active_set(
&crds,
&HashMap::new(),
@ -574,7 +578,10 @@ mod test {
0,
)));
assert!(push.active_set.get(&value2.label().pubkey()).is_none());
assert_eq!(crds.insert(value2.clone(), now), Ok(()));
assert_eq!(
crds.insert(value2.clone(), now, GossipRoute::LocalMessage),
Ok(())
);
for _ in 0..30 {
push.refresh_push_active_set(
&crds,
@ -596,7 +603,10 @@ mod test {
let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(
ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0),
));
assert_eq!(crds.insert(value2.clone(), now), Ok(()));
assert_eq!(
crds.insert(value2.clone(), now, GossipRoute::LocalMessage),
Ok(())
);
}
push.refresh_push_active_set(
&crds,
@ -623,7 +633,8 @@ mod test {
time,
)));
let id = peer.label().pubkey();
crds.insert(peer.clone(), time).unwrap();
crds.insert(peer.clone(), time, GossipRoute::LocalMessage)
.unwrap();
stakes.insert(id, i * 100);
push.last_pushed_to.put(id, time);
}
@ -678,10 +689,14 @@ mod test {
..ContactInfo::default()
}));
crds.insert(me.clone(), now).unwrap();
crds.insert(spy.clone(), now).unwrap();
crds.insert(node_123.clone(), now).unwrap();
crds.insert(node_456, now).unwrap();
crds.insert(me.clone(), now, GossipRoute::LocalMessage)
.unwrap();
crds.insert(spy.clone(), now, GossipRoute::LocalMessage)
.unwrap();
crds.insert(node_123.clone(), now, GossipRoute::LocalMessage)
.unwrap();
crds.insert(node_456, now, GossipRoute::LocalMessage)
.unwrap();
// shred version 123 should ignore nodes with versions 0 and 456
let options = node
@ -735,8 +750,10 @@ mod test {
..ContactInfo::default()
}));
crds.insert(me.clone(), 0).unwrap();
crds.insert(node_123.clone(), now).unwrap();
crds.insert(me.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
crds.insert(node_123.clone(), now, GossipRoute::LocalMessage)
.unwrap();
// Unknown pubkey in gossip_validators -- will push to nobody
let mut gossip_validators = HashSet::new();
@ -787,7 +804,10 @@ mod test {
&solana_sdk::pubkey::new_rand(),
0,
)));
assert_eq!(crds.insert(peer.clone(), now), Ok(()));
assert_eq!(
crds.insert(peer.clone(), now, GossipRoute::LocalMessage),
Ok(())
);
push.refresh_push_active_set(
&crds,
&HashMap::new(),
@ -826,8 +846,14 @@ mod test {
CrdsValue::new_unsigned(CrdsData::ContactInfo(peer))
})
.collect();
assert_eq!(crds.insert(peers[0].clone(), now), Ok(()));
assert_eq!(crds.insert(peers[1].clone(), now), Ok(()));
assert_eq!(
crds.insert(peers[0].clone(), now, GossipRoute::LocalMessage),
Ok(())
);
assert_eq!(
crds.insert(peers[1].clone(), now, GossipRoute::LocalMessage),
Ok(())
);
assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), peers[2].clone(), now),
Ok(())
@ -862,7 +888,10 @@ mod test {
&solana_sdk::pubkey::new_rand(),
0,
)));
assert_eq!(crds.insert(peer.clone(), 0), Ok(()));
assert_eq!(
crds.insert(peer.clone(), 0, GossipRoute::LocalMessage),
Ok(())
);
push.refresh_push_active_set(
&crds,
&HashMap::new(),
@ -898,7 +927,7 @@ mod test {
&solana_sdk::pubkey::new_rand(),
0,
)));
assert_eq!(crds.insert(peer, 0), Ok(()));
assert_eq!(crds.insert(peer, 0, GossipRoute::LocalMessage), Ok(()));
push.refresh_push_active_set(
&crds,
&HashMap::new(),

View File

@ -134,7 +134,10 @@ where
mod test {
use {
super::*,
crate::{crds::Crds, crds_value::CrdsValue},
crate::{
crds::{Crds, GossipRoute},
crds_value::CrdsValue,
},
rand::{thread_rng, Rng},
solana_sdk::timing::timestamp,
std::{collections::HashSet, iter::repeat_with, ops::Index},
@ -144,7 +147,8 @@ mod test {
let value = CrdsValue::new_rand(rng, None);
let label = value.label();
let mut crds = Crds::default();
crds.insert(value, timestamp()).unwrap();
crds.insert(value, timestamp(), GossipRoute::LocalMessage)
.unwrap();
crds.get(&label).cloned().unwrap()
}

View File

@ -7,6 +7,7 @@ use {
solana_gossip::{
cluster_info,
contact_info::ContactInfo,
crds::GossipRoute,
crds_gossip::*,
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
@ -118,15 +119,21 @@ fn star_network_create(num: usize) -> Network {
let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
let mut node = CrdsGossip::default();
node.crds.insert(new.clone(), timestamp()).unwrap();
node.crds.insert(entry.clone(), timestamp()).unwrap();
node.crds
.insert(new.clone(), timestamp(), GossipRoute::LocalMessage)
.unwrap();
node.crds
.insert(entry.clone(), timestamp(), GossipRoute::LocalMessage)
.unwrap();
let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node)));
(new.label().pubkey(), node)
})
.collect();
let mut node = CrdsGossip::default();
let id = entry.label().pubkey();
node.crds.insert(entry, timestamp()).unwrap();
node.crds
.insert(entry, timestamp(), GossipRoute::LocalMessage)
.unwrap();
let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node)));
network.insert(id, node);
Network::new(network)
@ -138,15 +145,23 @@ fn rstar_network_create(num: usize) -> Network {
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
let mut origin = CrdsGossip::default();
let id = entry.label().pubkey();
origin.crds.insert(entry, timestamp()).unwrap();
origin
.crds
.insert(entry, timestamp(), GossipRoute::LocalMessage)
.unwrap();
let mut network: HashMap<_, _> = (1..num)
.map(|_| {
let node_keypair = Arc::new(Keypair::new());
let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
let mut node = CrdsGossip::default();
node.crds.insert(new.clone(), timestamp()).unwrap();
origin.crds.insert(new.clone(), timestamp()).unwrap();
node.crds
.insert(new.clone(), timestamp(), GossipRoute::LocalMessage)
.unwrap();
origin
.crds
.insert(new.clone(), timestamp(), GossipRoute::LocalMessage)
.unwrap();
let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node)));
(new.label().pubkey(), node)
})
@ -163,7 +178,9 @@ fn ring_network_create(num: usize) -> Network {
let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
let mut node = CrdsGossip::default();
node.crds.insert(new.clone(), timestamp()).unwrap();
node.crds
.insert(new.clone(), timestamp(), GossipRoute::LocalMessage)
.unwrap();
let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node)));
(new.label().pubkey(), node)
})
@ -181,7 +198,7 @@ fn ring_network_create(num: usize) -> Network {
end.lock()
.unwrap()
.crds
.insert(start_info, timestamp())
.insert(start_info, timestamp(), GossipRoute::LocalMessage)
.unwrap();
}
Network::new(network)
@ -195,7 +212,9 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network {
let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
let mut node = CrdsGossip::default();
node.crds.insert(new.clone(), timestamp()).unwrap();
node.crds
.insert(new.clone(), timestamp(), GossipRoute::LocalMessage)
.unwrap();
let node = Node::staked(
node_keypair,
contact_info,
@ -220,7 +239,9 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network {
let mut end = end.lock().unwrap();
if keys[k] != *end_pubkey {
let start_info = start_entries[k].clone();
end.crds.insert(start_info, timestamp()).unwrap();
end.crds
.insert(start_info, timestamp(), GossipRoute::LocalMessage)
.unwrap();
}
}
}
@ -706,6 +727,7 @@ fn test_prune_errors() {
.insert(
CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone())),
0,
GossipRoute::LocalMessage,
)
.unwrap();
crds_gossip.refresh_push_active_set(

View File

@ -474,6 +474,7 @@ mod tests {
crate::rpc::create_validator_exit,
solana_gossip::{
contact_info::ContactInfo,
crds::GossipRoute,
crds_value::{CrdsData, CrdsValue, SnapshotHash},
},
solana_ledger::{
@ -759,6 +760,7 @@ mod tests {
],
))),
1,
GossipRoute::LocalMessage,
)
.unwrap();
assert_eq!(rm.health_check(), "ok");
@ -775,6 +777,7 @@ mod tests {
vec![(1000 + health_check_slot_distance - 1, Hash::default())],
))),
1,
GossipRoute::LocalMessage,
)
.unwrap();
assert_eq!(rm.health_check(), "ok");
@ -791,6 +794,7 @@ mod tests {
vec![(1000 + health_check_slot_distance, Hash::default())],
))),
1,
GossipRoute::LocalMessage,
)
.unwrap();
assert_eq!(rm.health_check(), "behind");