retains one node-instance per pubkey (#17187)
The crds table currently retains up to 32 node-instance values per pubkey. This is because, if there are multiple running instances of the same node, we want gossip to propagate the node-instance values associated with each instance; therefore the corresponding label/key includes the randomly generated token in addition to the pubkey: https://github.com/solana-labs/solana/blob/9c42a89a4/core/src/crds_value.rs#L448 https://github.com/solana-labs/solana/pull/14037 As a result, the number of such values per pubkey is effectively unbounded, requiring the custom mitigations implemented in: https://github.com/solana-labs/solana/pull/14467 but these values still take redundant extra memory and bandwidth. This commit instead retains only one node-instance per pubkey by extending the crds value override logic. If a crds value is of type node-instance, it will always override an existing one with the same key if it has a more recent start timestamp (not wallclock). As a result, gossip will always propagate the node-instance with the more recent timestamp. Since the check_duplicate logic will stop the node with the older timestamp, this change should preserve existing functionality.
This commit is contained in:
@ -454,10 +454,10 @@ fn retain_staked(values: &mut Vec<CrdsValue>, stakes: &HashMap<Pubkey, u64>) {
|
||||
// Otherwise unstaked voting nodes will show up with no version in
|
||||
// the various dashboards.
|
||||
CrdsData::Version(_) => true,
|
||||
CrdsData::NodeInstance(_) => true,
|
||||
CrdsData::LowestSlot(_, _)
|
||||
| CrdsData::AccountsHashes(_)
|
||||
| CrdsData::LegacyVersion(_)
|
||||
| CrdsData::NodeInstance(_)
|
||||
| CrdsData::DuplicateShred(_, _) => {
|
||||
let stake = stakes.get(&value.pubkey()).copied();
|
||||
stake.unwrap_or_default() >= MIN_STAKE_FOR_GOSSIP
|
||||
|
134
core/src/crds.rs
134
core/src/crds.rs
@ -40,9 +40,6 @@ use std::{
|
||||
};
|
||||
|
||||
const CRDS_SHARDS_BITS: u32 = 8;
|
||||
// Limit number of crds values associated with each unique pubkey. This
|
||||
// excludes crds values which by label design are limited per each pubkey.
|
||||
const MAX_CRDS_VALUES_PER_PUBKEY: usize = 32;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Crds {
|
||||
@ -128,6 +125,14 @@ impl Default for Crds {
|
||||
// Both values should have the same key/label.
|
||||
fn overrides(value: &CrdsValue, other: &VersionedCrdsValue) -> bool {
|
||||
assert_eq!(value.label(), other.value.label(), "labels mismatch!");
|
||||
// Node instances are special cased so that if there are two running
|
||||
// instances of the same node, the more recent start is propagated through
|
||||
// gossip regardless of wallclocks.
|
||||
if let CrdsData::NodeInstance(value) = &value.data {
|
||||
if let Some(out) = value.overrides(&other.value) {
|
||||
return out;
|
||||
}
|
||||
}
|
||||
match value.wallclock().cmp(&other.value.wallclock()) {
|
||||
Ordering::Less => false,
|
||||
Ordering::Greater => true,
|
||||
@ -372,9 +377,7 @@ impl Crds {
|
||||
None => 0,
|
||||
}
|
||||
};
|
||||
let mut old_labels = Vec::new();
|
||||
// Buffer of crds values to be evicted based on their wallclock.
|
||||
let mut recent_unlimited_labels: Vec<(u64 /*wallclock*/, usize /*index*/)> = index
|
||||
index
|
||||
.into_iter()
|
||||
.filter_map(|ix| {
|
||||
let (label, value) = self.table.get_index(*ix).unwrap();
|
||||
@ -383,32 +386,12 @@ impl Crds {
|
||||
.max(local_timestamp)
|
||||
.saturating_add(timeout);
|
||||
if expiry_timestamp <= now {
|
||||
old_labels.push(label.clone());
|
||||
None
|
||||
Some(label.clone())
|
||||
} else {
|
||||
match label.value_space() {
|
||||
Some(_) => None,
|
||||
None => Some((value.value.wallclock(), *ix)),
|
||||
}
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
// Number of values to discard from the buffer:
|
||||
let nth = recent_unlimited_labels
|
||||
.len()
|
||||
.saturating_sub(MAX_CRDS_VALUES_PER_PUBKEY);
|
||||
// Partition on wallclock to discard the older ones.
|
||||
if nth > 0 && nth < recent_unlimited_labels.len() {
|
||||
recent_unlimited_labels.select_nth_unstable(nth);
|
||||
}
|
||||
old_labels.extend(
|
||||
recent_unlimited_labels
|
||||
.split_at(nth)
|
||||
.0
|
||||
.iter()
|
||||
.map(|(_ /*wallclock*/, ix)| self.table.get_index(*ix).unwrap().0.clone()),
|
||||
);
|
||||
old_labels
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
thread_pool.install(|| {
|
||||
self.records
|
||||
@ -541,7 +524,8 @@ mod test {
|
||||
contact_info::ContactInfo,
|
||||
crds_value::{new_rand_timestamp, NodeInstance},
|
||||
};
|
||||
use rand::{thread_rng, Rng};
|
||||
use rand::{thread_rng, Rng, SeedableRng};
|
||||
use rand_chacha::ChaChaRng;
|
||||
use rayon::ThreadPoolBuilder;
|
||||
use solana_sdk::signature::{Keypair, Signer};
|
||||
use std::{collections::HashSet, iter::repeat_with};
|
||||
@ -616,6 +600,62 @@ mod test {
|
||||
assert_eq!(crds.table[&val2.label()].local_timestamp, 3);
|
||||
assert_eq!(crds.table[&val2.label()].ordinal, 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_upsert_node_instance() {
|
||||
const SEED: [u8; 32] = [0x42; 32];
|
||||
let mut rng = ChaChaRng::from_seed(SEED);
|
||||
fn make_crds_value(node: NodeInstance) -> CrdsValue {
|
||||
CrdsValue::new_unsigned(CrdsData::NodeInstance(node))
|
||||
}
|
||||
let now = 1_620_838_767_000;
|
||||
let mut crds = Crds::default();
|
||||
let pubkey = Pubkey::new_unique();
|
||||
let node = NodeInstance::new(&mut rng, pubkey, now);
|
||||
let node = make_crds_value(node);
|
||||
assert_eq!(crds.insert(node, now), Ok(None));
|
||||
// A node-instance with a different key should insert fine even with
|
||||
// older timestamps.
|
||||
let other = NodeInstance::new(&mut rng, Pubkey::new_unique(), now - 1);
|
||||
let other = make_crds_value(other);
|
||||
assert_eq!(crds.insert(other, now), Ok(None));
|
||||
// A node-instance with older timestamp should fail to insert, even if
|
||||
// the wallclock is more recent.
|
||||
let other = NodeInstance::new(&mut rng, pubkey, now - 1);
|
||||
let other = other.with_wallclock(now + 1);
|
||||
let other = make_crds_value(other);
|
||||
let value_hash = hash(&serialize(&other).unwrap());
|
||||
assert_eq!(
|
||||
crds.insert(other, now),
|
||||
Err(CrdsError::InsertFailed(value_hash))
|
||||
);
|
||||
// A node instance with the same timestamp should insert only if the
|
||||
// random token is larger.
|
||||
let mut num_overrides = 0;
|
||||
for _ in 0..100 {
|
||||
let other = NodeInstance::new(&mut rng, pubkey, now);
|
||||
let other = make_crds_value(other);
|
||||
let value_hash = hash(&serialize(&other).unwrap());
|
||||
match crds.insert(other, now) {
|
||||
Ok(Some(_)) => num_overrides += 1,
|
||||
Err(CrdsError::InsertFailed(x)) => assert_eq!(x, value_hash),
|
||||
_ => panic!(),
|
||||
}
|
||||
}
|
||||
assert_eq!(num_overrides, 5);
|
||||
// A node instance with larger timestamp should insert regardless of
|
||||
// its token value.
|
||||
for k in 1..10 {
|
||||
let other = NodeInstance::new(&mut rng, pubkey, now + k);
|
||||
let other = other.with_wallclock(now - 1);
|
||||
let other = make_crds_value(other);
|
||||
match crds.insert(other, now) {
|
||||
Ok(Some(_)) => (),
|
||||
_ => panic!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_find_old_records_default() {
|
||||
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
|
||||
@ -662,40 +702,6 @@ mod test {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_find_old_records_unlimited() {
|
||||
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
|
||||
let mut rng = thread_rng();
|
||||
let now = 1_610_034_423_000;
|
||||
let pubkey = Pubkey::new_unique();
|
||||
let mut crds = Crds::default();
|
||||
let mut timeouts = HashMap::new();
|
||||
timeouts.insert(Pubkey::default(), 1);
|
||||
timeouts.insert(pubkey, 180);
|
||||
for _ in 0..1024 {
|
||||
let wallclock = now - rng.gen_range(0, 240);
|
||||
let val = NodeInstance::new(&mut rng, pubkey, wallclock);
|
||||
let val = CrdsData::NodeInstance(val);
|
||||
let val = CrdsValue::new_unsigned(val);
|
||||
assert_eq!(crds.insert(val, now), Ok(None));
|
||||
}
|
||||
let now = now + 1;
|
||||
let labels = crds.find_old_labels(&thread_pool, now, &timeouts);
|
||||
assert_eq!(crds.table.len() - labels.len(), MAX_CRDS_VALUES_PER_PUBKEY);
|
||||
let max_wallclock = labels
|
||||
.iter()
|
||||
.map(|label| crds.lookup(label).unwrap().wallclock())
|
||||
.max()
|
||||
.unwrap();
|
||||
assert!(max_wallclock > now - 180);
|
||||
let labels: HashSet<_> = labels.into_iter().collect();
|
||||
for (label, value) in crds.table.iter() {
|
||||
if !labels.contains(label) {
|
||||
assert!(max_wallclock <= value.value.wallclock());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_remove_default() {
|
||||
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
|
||||
|
@ -20,6 +20,7 @@ use solana_sdk::{
|
||||
use solana_vote_program::vote_transaction::parse_vote_transaction;
|
||||
use std::{
|
||||
borrow::{Borrow, Cow},
|
||||
cmp::Ordering,
|
||||
collections::{hash_map::Entry, BTreeSet, HashMap},
|
||||
fmt,
|
||||
};
|
||||
@ -405,7 +406,7 @@ impl NodeInstance {
|
||||
}
|
||||
|
||||
// Clones the value with an updated wallclock.
|
||||
pub fn with_wallclock(&self, now: u64) -> Self {
|
||||
pub(crate) fn with_wallclock(&self, now: u64) -> Self {
|
||||
Self {
|
||||
wallclock: now,
|
||||
..*self
|
||||
@ -414,7 +415,7 @@ impl NodeInstance {
|
||||
|
||||
// Returns true if the crds-value is a duplicate instance
|
||||
// of this node, with a more recent timestamp.
|
||||
pub fn check_duplicate(&self, other: &CrdsValue) -> bool {
|
||||
pub(crate) fn check_duplicate(&self, other: &CrdsValue) -> bool {
|
||||
match &other.data {
|
||||
CrdsData::NodeInstance(other) => {
|
||||
self.token != other.token
|
||||
@ -424,6 +425,26 @@ impl NodeInstance {
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
// Returns None if tokens are the same or other is not a node-instance from
|
||||
// the same owner. Otherwise returns true if self has more recent timestamp
|
||||
// than other, and so overrides it.
|
||||
pub(crate) fn overrides(&self, other: &CrdsValue) -> Option<bool> {
|
||||
let other = match &other.data {
|
||||
CrdsData::NodeInstance(other) => other,
|
||||
_ => return None,
|
||||
};
|
||||
if self.token == other.token || self.from != other.from {
|
||||
return None;
|
||||
}
|
||||
match self.timestamp.cmp(&other.timestamp) {
|
||||
Ordering::Less => Some(false),
|
||||
Ordering::Greater => Some(true),
|
||||
// Ties should be broken in a deterministic way across the cluster,
|
||||
// so that nodes propagate the same value through gossip.
|
||||
Ordering::Equal => Some(other.token < self.token),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Sanitize for NodeInstance {
|
||||
@ -445,7 +466,7 @@ pub enum CrdsValueLabel {
|
||||
AccountsHashes(Pubkey),
|
||||
LegacyVersion(Pubkey),
|
||||
Version(Pubkey),
|
||||
NodeInstance(Pubkey, u64 /*token*/),
|
||||
NodeInstance(Pubkey),
|
||||
DuplicateShred(DuplicateShredIndex, Pubkey),
|
||||
}
|
||||
|
||||
@ -460,7 +481,7 @@ impl fmt::Display for CrdsValueLabel {
|
||||
CrdsValueLabel::AccountsHashes(_) => write!(f, "AccountsHashes({})", self.pubkey()),
|
||||
CrdsValueLabel::LegacyVersion(_) => write!(f, "LegacyVersion({})", self.pubkey()),
|
||||
CrdsValueLabel::Version(_) => write!(f, "Version({})", self.pubkey()),
|
||||
CrdsValueLabel::NodeInstance(pk, token) => write!(f, "NodeInstance({}, {})", pk, token),
|
||||
CrdsValueLabel::NodeInstance(pk) => write!(f, "NodeInstance({})", pk),
|
||||
CrdsValueLabel::DuplicateShred(ix, pk) => write!(f, "DuplicateShred({}, {})", ix, pk),
|
||||
}
|
||||
}
|
||||
@ -477,27 +498,10 @@ impl CrdsValueLabel {
|
||||
CrdsValueLabel::AccountsHashes(p) => *p,
|
||||
CrdsValueLabel::LegacyVersion(p) => *p,
|
||||
CrdsValueLabel::Version(p) => *p,
|
||||
CrdsValueLabel::NodeInstance(p, _ /*token*/) => *p,
|
||||
CrdsValueLabel::NodeInstance(p) => *p,
|
||||
CrdsValueLabel::DuplicateShred(_, p) => *p,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns number of possible distinct labels of the same type for
|
||||
/// a fixed pubkey, and None if that is practically unlimited.
|
||||
pub(crate) fn value_space(&self) -> Option<usize> {
|
||||
match self {
|
||||
CrdsValueLabel::ContactInfo(_) => Some(1),
|
||||
CrdsValueLabel::Vote(_, _) => Some(MAX_VOTES as usize),
|
||||
CrdsValueLabel::LowestSlot(_) => Some(1),
|
||||
CrdsValueLabel::SnapshotHashes(_) => Some(1),
|
||||
CrdsValueLabel::EpochSlots(_, _) => Some(MAX_EPOCH_SLOTS as usize),
|
||||
CrdsValueLabel::AccountsHashes(_) => Some(1),
|
||||
CrdsValueLabel::LegacyVersion(_) => Some(1),
|
||||
CrdsValueLabel::Version(_) => Some(1),
|
||||
CrdsValueLabel::NodeInstance(_, _) => None,
|
||||
CrdsValueLabel::DuplicateShred(_, _) => Some(MAX_DUPLICATE_SHREDS as usize),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CrdsValue {
|
||||
@ -570,7 +574,7 @@ impl CrdsValue {
|
||||
CrdsData::EpochSlots(ix, _) => CrdsValueLabel::EpochSlots(*ix, self.pubkey()),
|
||||
CrdsData::LegacyVersion(_) => CrdsValueLabel::LegacyVersion(self.pubkey()),
|
||||
CrdsData::Version(_) => CrdsValueLabel::Version(self.pubkey()),
|
||||
CrdsData::NodeInstance(node) => CrdsValueLabel::NodeInstance(node.from, node.token),
|
||||
CrdsData::NodeInstance(node) => CrdsValueLabel::NodeInstance(node.from),
|
||||
CrdsData::DuplicateShred(ix, shred) => CrdsValueLabel::DuplicateShred(*ix, shred.from),
|
||||
}
|
||||
}
|
||||
@ -931,7 +935,7 @@ mod test {
|
||||
token: rng.gen(),
|
||||
..node
|
||||
};
|
||||
assert_ne!(
|
||||
assert_eq!(
|
||||
make_crds_value(node).label(),
|
||||
make_crds_value(other).label()
|
||||
);
|
||||
@ -946,20 +950,31 @@ mod test {
|
||||
let mut rng = rand::thread_rng();
|
||||
let pubkey = Pubkey::new_unique();
|
||||
let node = NodeInstance::new(&mut rng, pubkey, now);
|
||||
let node_crds = make_crds_value(node.clone());
|
||||
// Same token is not a duplicate.
|
||||
assert!(!node.check_duplicate(&make_crds_value(NodeInstance {
|
||||
let other = NodeInstance {
|
||||
from: pubkey,
|
||||
wallclock: now + 1,
|
||||
timestamp: now + 1,
|
||||
token: node.token,
|
||||
})));
|
||||
};
|
||||
let other_crds = make_crds_value(other.clone());
|
||||
assert!(!node.check_duplicate(&other_crds));
|
||||
assert!(!other.check_duplicate(&node_crds));
|
||||
assert_eq!(node.overrides(&other_crds), None);
|
||||
assert_eq!(other.overrides(&node_crds), None);
|
||||
// Older timestamp is not a duplicate.
|
||||
assert!(!node.check_duplicate(&make_crds_value(NodeInstance {
|
||||
let other = NodeInstance {
|
||||
from: pubkey,
|
||||
wallclock: now + 1,
|
||||
timestamp: now - 1,
|
||||
token: rng.gen(),
|
||||
})));
|
||||
};
|
||||
let other_crds = make_crds_value(other.clone());
|
||||
assert!(!node.check_duplicate(&other_crds));
|
||||
assert!(other.check_duplicate(&node_crds));
|
||||
assert_eq!(node.overrides(&other_crds), Some(true));
|
||||
assert_eq!(other.overrides(&node_crds), Some(false));
|
||||
// Updated wallclock is not a duplicate.
|
||||
let other = node.with_wallclock(now + 8);
|
||||
assert_eq!(
|
||||
@ -971,27 +986,56 @@ mod test {
|
||||
token: node.token,
|
||||
}
|
||||
);
|
||||
assert!(!node.check_duplicate(&make_crds_value(other)));
|
||||
// Duplicate instance.
|
||||
assert!(node.check_duplicate(&make_crds_value(NodeInstance {
|
||||
from: pubkey,
|
||||
wallclock: 0,
|
||||
timestamp: now,
|
||||
token: rng.gen(),
|
||||
})));
|
||||
let other_crds = make_crds_value(other.clone());
|
||||
assert!(!node.check_duplicate(&other_crds));
|
||||
assert!(!other.check_duplicate(&node_crds));
|
||||
assert_eq!(node.overrides(&other_crds), None);
|
||||
assert_eq!(other.overrides(&node_crds), None);
|
||||
// Duplicate instance; tied timestamp.
|
||||
for _ in 0..10 {
|
||||
let other = NodeInstance {
|
||||
from: pubkey,
|
||||
wallclock: 0,
|
||||
timestamp: now,
|
||||
token: rng.gen(),
|
||||
};
|
||||
let other_crds = make_crds_value(other.clone());
|
||||
assert!(node.check_duplicate(&other_crds));
|
||||
assert!(other.check_duplicate(&node_crds));
|
||||
assert_eq!(node.overrides(&other_crds), Some(other.token < node.token));
|
||||
assert_eq!(other.overrides(&node_crds), Some(node.token < other.token));
|
||||
}
|
||||
// Duplicate instance; more recent timestamp.
|
||||
for _ in 0..10 {
|
||||
let other = NodeInstance {
|
||||
from: pubkey,
|
||||
wallclock: 0,
|
||||
timestamp: now + 1,
|
||||
token: rng.gen(),
|
||||
};
|
||||
let other_crds = make_crds_value(other.clone());
|
||||
assert!(node.check_duplicate(&other_crds));
|
||||
assert!(!other.check_duplicate(&node_crds));
|
||||
assert_eq!(node.overrides(&other_crds), Some(false));
|
||||
assert_eq!(other.overrides(&node_crds), Some(true));
|
||||
}
|
||||
// Different pubkey is not a duplicate.
|
||||
assert!(!node.check_duplicate(&make_crds_value(NodeInstance {
|
||||
let other = NodeInstance {
|
||||
from: Pubkey::new_unique(),
|
||||
wallclock: now + 1,
|
||||
timestamp: now + 1,
|
||||
token: rng.gen(),
|
||||
})));
|
||||
};
|
||||
let other_crds = make_crds_value(other.clone());
|
||||
assert!(!node.check_duplicate(&other_crds));
|
||||
assert!(!other.check_duplicate(&node_crds));
|
||||
assert_eq!(node.overrides(&other_crds), None);
|
||||
assert_eq!(other.overrides(&node_crds), None);
|
||||
// Different crds value is not a duplicate.
|
||||
assert!(
|
||||
!node.check_duplicate(&CrdsValue::new_unsigned(CrdsData::ContactInfo(
|
||||
ContactInfo::new_rand(&mut rng, Some(pubkey))
|
||||
)))
|
||||
);
|
||||
let other = ContactInfo::new_rand(&mut rng, Some(pubkey));
|
||||
let other = CrdsValue::new_unsigned(CrdsData::ContactInfo(other));
|
||||
assert!(!node.check_duplicate(&other));
|
||||
assert_eq!(node.overrides(&other), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
Reference in New Issue
Block a user