Crds values of nodes with different shred versions are creeping into
the gossip table, resulting in runtime issues such as the one addressed in:
https://github.com/solana-labs/solana/pull/17899
This commit works towards enforcing more checks and filtering based on
shred version by adding the necessary mapping and API to the gossip table.
Once populated, the pubkey->shred-version mapping persists as long as there
are any values associated with the pubkey.
(cherry picked from commit 5a99fa3790)
Co-authored-by: behzad nouri <behzadnouri@gmail.com>
This commit is contained in:
@ -18,6 +18,7 @@ indexmap = { version = "1.5", features = ["rayon"] }
|
|||||||
itertools = "0.9.0"
|
itertools = "0.9.0"
|
||||||
log = "0.4.11"
|
log = "0.4.11"
|
||||||
lru = "0.6.1"
|
lru = "0.6.1"
|
||||||
|
matches = "0.1.8"
|
||||||
num-traits = "0.2"
|
num-traits = "0.2"
|
||||||
rand = "0.7.0"
|
rand = "0.7.0"
|
||||||
rand_chacha = "0.2.2"
|
rand_chacha = "0.2.2"
|
||||||
|
@ -1173,16 +1173,13 @@ impl ClusterInfo {
|
|||||||
/// Returns epoch-slots inserted since the given cursor.
|
/// Returns epoch-slots inserted since the given cursor.
|
||||||
/// Excludes entries from nodes with unkown or different shred version.
|
/// Excludes entries from nodes with unkown or different shred version.
|
||||||
pub fn get_epoch_slots(&self, cursor: &mut Cursor) -> Vec<EpochSlots> {
|
pub fn get_epoch_slots(&self, cursor: &mut Cursor) -> Vec<EpochSlots> {
|
||||||
let self_shred_version = self.my_shred_version();
|
let self_shred_version = Some(self.my_shred_version());
|
||||||
let gossip = self.gossip.read().unwrap();
|
let gossip = self.gossip.read().unwrap();
|
||||||
let entries = gossip.crds.get_epoch_slots(cursor);
|
let entries = gossip.crds.get_epoch_slots(cursor);
|
||||||
entries
|
entries
|
||||||
.filter(
|
.filter(|entry| {
|
||||||
|entry| match gossip.crds.get_contact_info(entry.value.pubkey()) {
|
gossip.crds.get_shred_version(&entry.value.pubkey()) == self_shred_version
|
||||||
Some(node) => node.shred_version == self_shred_version,
|
})
|
||||||
None => false,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
.map(|entry| match &entry.value.data {
|
.map(|entry| match &entry.value.data {
|
||||||
CrdsData::EpochSlots(_, slots) => slots.clone(),
|
CrdsData::EpochSlots(_, slots) => slots.clone(),
|
||||||
_ => panic!("this should not happen!"),
|
_ => panic!("this should not happen!"),
|
||||||
@ -2213,9 +2210,10 @@ impl ClusterInfo {
|
|||||||
) -> (usize, usize, usize) {
|
) -> (usize, usize, usize) {
|
||||||
let len = crds_values.len();
|
let len = crds_values.len();
|
||||||
trace!("PullResponse me: {} from: {} len={}", self.id, from, len);
|
trace!("PullResponse me: {} from: {} len={}", self.id, from, len);
|
||||||
let shred_version = self
|
let shred_version = {
|
||||||
.lookup_contact_info(from, |ci| ci.shred_version)
|
let gossip = self.gossip.read().unwrap();
|
||||||
.unwrap_or(0);
|
gossip.crds.get_shred_version(from).unwrap_or_default()
|
||||||
|
};
|
||||||
Self::filter_by_shred_version(
|
Self::filter_by_shred_version(
|
||||||
from,
|
from,
|
||||||
&mut crds_values,
|
&mut crds_values,
|
||||||
@ -2365,10 +2363,7 @@ impl ClusterInfo {
|
|||||||
let gossip = self.gossip.read().unwrap();
|
let gossip = self.gossip.read().unwrap();
|
||||||
messages
|
messages
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(from, _)| match gossip.crds.get_contact_info(*from) {
|
.map(|(from, _)| gossip.crds.get_shred_version(from).unwrap_or_default())
|
||||||
None => 0,
|
|
||||||
Some(info) => info.shred_version,
|
|
||||||
})
|
|
||||||
.collect()
|
.collect()
|
||||||
};
|
};
|
||||||
// Filter out data if the origin has different shred version.
|
// Filter out data if the origin has different shred version.
|
||||||
|
@ -35,6 +35,7 @@ use {
|
|||||||
map::{rayon::ParValues, Entry, IndexMap},
|
map::{rayon::ParValues, Entry, IndexMap},
|
||||||
set::IndexSet,
|
set::IndexSet,
|
||||||
},
|
},
|
||||||
|
matches::debug_assert_matches,
|
||||||
rayon::{prelude::*, ThreadPool},
|
rayon::{prelude::*, ThreadPool},
|
||||||
solana_sdk::{
|
solana_sdk::{
|
||||||
hash::{hash, Hash},
|
hash::{hash, Hash},
|
||||||
@ -66,6 +67,8 @@ pub struct Crds {
|
|||||||
entries: BTreeMap<u64 /*insert order*/, usize /*index*/>,
|
entries: BTreeMap<u64 /*insert order*/, usize /*index*/>,
|
||||||
// Hash of recently purged values.
|
// Hash of recently purged values.
|
||||||
purged: VecDeque<(Hash, u64 /*timestamp*/)>,
|
purged: VecDeque<(Hash, u64 /*timestamp*/)>,
|
||||||
|
// Mapping from nodes' pubkeys to their respective shred-version.
|
||||||
|
shred_versions: HashMap<Pubkey, u16>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(PartialEq, Debug)]
|
#[derive(PartialEq, Debug)]
|
||||||
@ -125,6 +128,7 @@ impl Default for Crds {
|
|||||||
records: HashMap::default(),
|
records: HashMap::default(),
|
||||||
entries: BTreeMap::default(),
|
entries: BTreeMap::default(),
|
||||||
purged: VecDeque::default(),
|
purged: VecDeque::default(),
|
||||||
|
shred_versions: HashMap::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -173,9 +177,10 @@ impl Crds {
|
|||||||
Entry::Vacant(entry) => {
|
Entry::Vacant(entry) => {
|
||||||
let entry_index = entry.index();
|
let entry_index = entry.index();
|
||||||
self.shards.insert(entry_index, &value);
|
self.shards.insert(entry_index, &value);
|
||||||
match value.value.data {
|
match &value.value.data {
|
||||||
CrdsData::ContactInfo(_) => {
|
CrdsData::ContactInfo(node) => {
|
||||||
self.nodes.insert(entry_index);
|
self.nodes.insert(entry_index);
|
||||||
|
self.shred_versions.insert(pubkey, node.shred_version);
|
||||||
}
|
}
|
||||||
CrdsData::Vote(_, _) => {
|
CrdsData::Vote(_, _) => {
|
||||||
self.votes.insert(value.ordinal, entry_index);
|
self.votes.insert(value.ordinal, entry_index);
|
||||||
@ -195,7 +200,13 @@ impl Crds {
|
|||||||
let entry_index = entry.index();
|
let entry_index = entry.index();
|
||||||
self.shards.remove(entry_index, entry.get());
|
self.shards.remove(entry_index, entry.get());
|
||||||
self.shards.insert(entry_index, &value);
|
self.shards.insert(entry_index, &value);
|
||||||
match value.value.data {
|
match &value.value.data {
|
||||||
|
CrdsData::ContactInfo(node) => {
|
||||||
|
self.shred_versions.insert(pubkey, node.shred_version);
|
||||||
|
// self.nodes does not need to be updated since the
|
||||||
|
// entry at this index was and stays contact-info.
|
||||||
|
debug_assert_matches!(entry.get().value.data, CrdsData::ContactInfo(_));
|
||||||
|
}
|
||||||
CrdsData::Vote(_, _) => {
|
CrdsData::Vote(_, _) => {
|
||||||
self.votes.remove(&entry.get().ordinal);
|
self.votes.remove(&entry.get().ordinal);
|
||||||
self.votes.insert(value.ordinal, entry_index);
|
self.votes.insert(value.ordinal, entry_index);
|
||||||
@ -239,6 +250,10 @@ impl Crds {
|
|||||||
self.table.get(&label)?.value.contact_info()
|
self.table.get(&label)?.value.contact_info()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn get_shred_version(&self, pubkey: &Pubkey) -> Option<u16> {
|
||||||
|
self.shred_versions.get(pubkey).copied()
|
||||||
|
}
|
||||||
|
|
||||||
pub fn get_lowest_slot(&self, pubkey: Pubkey) -> Option<&LowestSlot> {
|
pub fn get_lowest_slot(&self, pubkey: Pubkey) -> Option<&LowestSlot> {
|
||||||
let lable = CrdsValueLabel::LowestSlot(pubkey);
|
let lable = CrdsValueLabel::LowestSlot(pubkey);
|
||||||
self.table.get(&lable)?.value.lowest_slot()
|
self.table.get(&lable)?.value.lowest_slot()
|
||||||
@ -449,6 +464,7 @@ impl Crds {
|
|||||||
records_entry.get_mut().swap_remove(&index);
|
records_entry.get_mut().swap_remove(&index);
|
||||||
if records_entry.get().is_empty() {
|
if records_entry.get().is_empty() {
|
||||||
records_entry.remove();
|
records_entry.remove();
|
||||||
|
self.shred_versions.remove(&pubkey);
|
||||||
}
|
}
|
||||||
// If index == self.table.len(), then the removed entry was the last
|
// If index == self.table.len(), then the removed entry was the last
|
||||||
// entry in the table, in which case no other keys were modified.
|
// entry in the table, in which case no other keys were modified.
|
||||||
@ -544,17 +560,20 @@ impl Crds {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod tests {
|
||||||
use {
|
use {
|
||||||
super::*,
|
super::*,
|
||||||
crate::{
|
crate::{
|
||||||
contact_info::ContactInfo,
|
contact_info::ContactInfo,
|
||||||
crds_value::{new_rand_timestamp, NodeInstance},
|
crds_value::{new_rand_timestamp, NodeInstance, SnapshotHash},
|
||||||
},
|
},
|
||||||
rand::{thread_rng, Rng, SeedableRng},
|
rand::{thread_rng, Rng, SeedableRng},
|
||||||
rand_chacha::ChaChaRng,
|
rand_chacha::ChaChaRng,
|
||||||
rayon::ThreadPoolBuilder,
|
rayon::ThreadPoolBuilder,
|
||||||
solana_sdk::signature::{Keypair, Signer},
|
solana_sdk::{
|
||||||
|
signature::{Keypair, Signer},
|
||||||
|
timing::timestamp,
|
||||||
|
},
|
||||||
std::{collections::HashSet, iter::repeat_with},
|
std::{collections::HashSet, iter::repeat_with},
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -1017,6 +1036,53 @@ mod test {
|
|||||||
assert!(crds.records.is_empty());
|
assert!(crds.records.is_empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_get_shred_version() {
|
||||||
|
let mut rng = rand::thread_rng();
|
||||||
|
let pubkey = Pubkey::new_unique();
|
||||||
|
let mut crds = Crds::default();
|
||||||
|
assert_eq!(crds.get_shred_version(&pubkey), None);
|
||||||
|
// Initial insertion of a node with shred version:
|
||||||
|
let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey));
|
||||||
|
let wallclock = node.wallclock;
|
||||||
|
node.shred_version = 42;
|
||||||
|
let node = CrdsData::ContactInfo(node);
|
||||||
|
let node = CrdsValue::new_unsigned(node);
|
||||||
|
assert_eq!(crds.insert(node, timestamp()), Ok(()));
|
||||||
|
assert_eq!(crds.get_shred_version(&pubkey), Some(42));
|
||||||
|
// An outdated value should not update shred-version:
|
||||||
|
let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey));
|
||||||
|
node.wallclock = wallclock - 1; // outdated.
|
||||||
|
node.shred_version = 8;
|
||||||
|
let node = CrdsData::ContactInfo(node);
|
||||||
|
let node = CrdsValue::new_unsigned(node);
|
||||||
|
assert_eq!(crds.insert(node, timestamp()), Err(CrdsError::InsertFailed));
|
||||||
|
assert_eq!(crds.get_shred_version(&pubkey), Some(42));
|
||||||
|
// Update shred version:
|
||||||
|
let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey));
|
||||||
|
node.wallclock = wallclock + 1; // so that it overrides the prev one.
|
||||||
|
node.shred_version = 8;
|
||||||
|
let node = CrdsData::ContactInfo(node);
|
||||||
|
let node = CrdsValue::new_unsigned(node);
|
||||||
|
assert_eq!(crds.insert(node, timestamp()), Ok(()));
|
||||||
|
assert_eq!(crds.get_shred_version(&pubkey), Some(8));
|
||||||
|
// Add other crds values with the same pubkey.
|
||||||
|
let val = SnapshotHash::new_rand(&mut rng, Some(pubkey));
|
||||||
|
let val = CrdsData::SnapshotHashes(val);
|
||||||
|
let val = CrdsValue::new_unsigned(val);
|
||||||
|
assert_eq!(crds.insert(val, timestamp()), Ok(()));
|
||||||
|
assert_eq!(crds.get_shred_version(&pubkey), Some(8));
|
||||||
|
// Remove contact-info. Shred version should stay there since there
|
||||||
|
// are still values associated with the pubkey.
|
||||||
|
crds.remove(&CrdsValueLabel::ContactInfo(pubkey), timestamp());
|
||||||
|
assert_eq!(crds.get_contact_info(pubkey), None);
|
||||||
|
assert_eq!(crds.get_shred_version(&pubkey), Some(8));
|
||||||
|
// Remove the remaining entry with the same pubkey.
|
||||||
|
crds.remove(&CrdsValueLabel::SnapshotHashes(pubkey), timestamp());
|
||||||
|
assert_eq!(crds.get_records(&pubkey).count(), 0);
|
||||||
|
assert_eq!(crds.get_shred_version(&pubkey), None);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
#[allow(clippy::needless_collect)]
|
#[allow(clippy::needless_collect)]
|
||||||
fn test_drop() {
|
fn test_drop() {
|
||||||
|
Reference in New Issue
Block a user