* checks for duplicate validator instances using gossip (cherry picked from commit8cd5eb9863
) # Conflicts: # core/src/cluster_info.rs # core/src/crds_value.rs # core/src/result.rs * pushes node-instance along with version early in gossip (cherry picked from commit542198180a
) # Conflicts: # core/src/cluster_info.rs * removes RwLock on ClusterInfo.instance (cherry picked from commit895d7d6a65
) # Conflicts: # core/src/cluster_info.rs * std::process::exit to kill all threads (cherry picked from commit1d267eae6b
) * removes backport merge conflicts Co-authored-by: behzad nouri <behzadnouri@gmail.com>
This commit is contained in:
@ -18,8 +18,8 @@ use crate::{
|
|||||||
crds_gossip_error::CrdsGossipError,
|
crds_gossip_error::CrdsGossipError,
|
||||||
crds_gossip_pull::{CrdsFilter, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
|
crds_gossip_pull::{CrdsFilter, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
|
||||||
crds_value::{
|
crds_value::{
|
||||||
self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot, SnapshotHash,
|
self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot, NodeInstance,
|
||||||
Version, Vote, MAX_WALLCLOCK,
|
SnapshotHash, Version, Vote, MAX_WALLCLOCK,
|
||||||
},
|
},
|
||||||
data_budget::DataBudget,
|
data_budget::DataBudget,
|
||||||
epoch_slots::EpochSlots,
|
epoch_slots::EpochSlots,
|
||||||
@ -260,6 +260,7 @@ pub struct ClusterInfo {
|
|||||||
id: Pubkey,
|
id: Pubkey,
|
||||||
stats: GossipStats,
|
stats: GossipStats,
|
||||||
socket: UdpSocket,
|
socket: UdpSocket,
|
||||||
|
instance: NodeInstance,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for ClusterInfo {
|
impl Default for ClusterInfo {
|
||||||
@ -363,7 +364,7 @@ pub fn make_accounts_hashes_message(
|
|||||||
type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;
|
type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;
|
||||||
|
|
||||||
// TODO These messages should go through the gpu pipeline for spam filtering
|
// TODO These messages should go through the gpu pipeline for spam filtering
|
||||||
#[frozen_abi(digest = "CXtFSsGZga9KPs22QhkTmeBd2KsJe34xdXrbRuW34xiS")]
|
#[frozen_abi(digest = "GrTTbzFi3Psb1Mn5tdxj5B1szC2647PYZv2Zdhevxrtz")]
|
||||||
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
|
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
|
||||||
#[allow(clippy::large_enum_variant)]
|
#[allow(clippy::large_enum_variant)]
|
||||||
enum Protocol {
|
enum Protocol {
|
||||||
@ -493,6 +494,7 @@ impl ClusterInfo {
|
|||||||
id,
|
id,
|
||||||
stats: GossipStats::default(),
|
stats: GossipStats::default(),
|
||||||
socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
|
socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
|
||||||
|
instance: NodeInstance::new(id, timestamp()),
|
||||||
};
|
};
|
||||||
{
|
{
|
||||||
let mut gossip = me.gossip.write().unwrap();
|
let mut gossip = me.gossip.write().unwrap();
|
||||||
@ -520,6 +522,7 @@ impl ClusterInfo {
|
|||||||
id: *new_id,
|
id: *new_id,
|
||||||
stats: GossipStats::default(),
|
stats: GossipStats::default(),
|
||||||
socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
|
socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
|
||||||
|
instance: NodeInstance::new(*new_id, timestamp()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -540,11 +543,16 @@ impl ClusterInfo {
|
|||||||
) {
|
) {
|
||||||
let now = timestamp();
|
let now = timestamp();
|
||||||
self.my_contact_info.write().unwrap().wallclock = now;
|
self.my_contact_info.write().unwrap().wallclock = now;
|
||||||
let entry =
|
let entries: Vec<_> = vec![
|
||||||
CrdsValue::new_signed(CrdsData::ContactInfo(self.my_contact_info()), &self.keypair);
|
CrdsData::ContactInfo(self.my_contact_info()),
|
||||||
|
CrdsData::NodeInstance(self.instance.with_wallclock(now)),
|
||||||
|
]
|
||||||
|
.into_iter()
|
||||||
|
.map(|v| CrdsValue::new_signed(v, &self.keypair))
|
||||||
|
.collect();
|
||||||
let mut w_gossip = self.gossip.write().unwrap();
|
let mut w_gossip = self.gossip.write().unwrap();
|
||||||
w_gossip.refresh_push_active_set(stakes, gossip_validators);
|
w_gossip.refresh_push_active_set(stakes, gossip_validators);
|
||||||
w_gossip.process_push_message(&self.id(), vec![entry], now);
|
w_gossip.process_push_message(&self.id(), entries, now);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO kill insert_info, only used by tests
|
// TODO kill insert_info, only used by tests
|
||||||
@ -1720,9 +1728,14 @@ impl ClusterInfo {
|
|||||||
let mut last_contact_info_trace = timestamp();
|
let mut last_contact_info_trace = timestamp();
|
||||||
let mut adopt_shred_version = self.my_shred_version() == 0;
|
let mut adopt_shred_version = self.my_shred_version() == 0;
|
||||||
let recycler = PacketsRecycler::default();
|
let recycler = PacketsRecycler::default();
|
||||||
|
let crds_data = vec![
|
||||||
let message = CrdsData::Version(Version::new(self.id()));
|
CrdsData::Version(Version::new(self.id())),
|
||||||
self.push_message(CrdsValue::new_signed(message, &self.keypair));
|
CrdsData::NodeInstance(self.instance.with_wallclock(timestamp())),
|
||||||
|
];
|
||||||
|
for value in crds_data {
|
||||||
|
let value = CrdsValue::new_signed(value, &self.keypair);
|
||||||
|
self.push_message(value);
|
||||||
|
}
|
||||||
let mut generate_pull_requests = true;
|
let mut generate_pull_requests = true;
|
||||||
loop {
|
loop {
|
||||||
let start = timestamp();
|
let start = timestamp();
|
||||||
@ -2366,7 +2379,7 @@ impl ClusterInfo {
|
|||||||
stakes: HashMap<Pubkey, u64>,
|
stakes: HashMap<Pubkey, u64>,
|
||||||
feature_set: Option<&FeatureSet>,
|
feature_set: Option<&FeatureSet>,
|
||||||
epoch_time_ms: u64,
|
epoch_time_ms: u64,
|
||||||
) {
|
) -> Result<()> {
|
||||||
let mut timer = Measure::start("process_gossip_packets_time");
|
let mut timer = Measure::start("process_gossip_packets_time");
|
||||||
let packets: Vec<_> = thread_pool.install(|| {
|
let packets: Vec<_> = thread_pool.install(|| {
|
||||||
requests
|
requests
|
||||||
@ -2381,6 +2394,16 @@ impl ClusterInfo {
|
|||||||
})
|
})
|
||||||
.collect()
|
.collect()
|
||||||
});
|
});
|
||||||
|
// Check if there is a duplicate instance of
|
||||||
|
// this node with more recent timestamp.
|
||||||
|
let check_duplicate_instance = |values: &[CrdsValue]| {
|
||||||
|
for value in values {
|
||||||
|
if self.instance.check_duplicate(value) {
|
||||||
|
return Err(Error::DuplicateNodeInstance);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
};
|
||||||
// Split packets based on their types.
|
// Split packets based on their types.
|
||||||
let mut pull_requests = vec![];
|
let mut pull_requests = vec![];
|
||||||
let mut pull_responses = vec![];
|
let mut pull_responses = vec![];
|
||||||
@ -2393,8 +2416,14 @@ impl ClusterInfo {
|
|||||||
Protocol::PullRequest(filter, caller) => {
|
Protocol::PullRequest(filter, caller) => {
|
||||||
pull_requests.push((from_addr, filter, caller))
|
pull_requests.push((from_addr, filter, caller))
|
||||||
}
|
}
|
||||||
Protocol::PullResponse(from, data) => pull_responses.push((from, data)),
|
Protocol::PullResponse(from, data) => {
|
||||||
Protocol::PushMessage(from, data) => push_messages.push((from, data)),
|
check_duplicate_instance(&data)?;
|
||||||
|
pull_responses.push((from, data));
|
||||||
|
}
|
||||||
|
Protocol::PushMessage(from, data) => {
|
||||||
|
check_duplicate_instance(&data)?;
|
||||||
|
push_messages.push((from, data));
|
||||||
|
}
|
||||||
Protocol::PruneMessage(from, data) => prune_messages.push((from, data)),
|
Protocol::PruneMessage(from, data) => prune_messages.push((from, data)),
|
||||||
Protocol::PingMessage(ping) => ping_messages.push((from_addr, ping)),
|
Protocol::PingMessage(ping) => ping_messages.push((from_addr, ping)),
|
||||||
Protocol::PongMessage(pong) => pong_messages.push((from_addr, pong)),
|
Protocol::PongMessage(pong) => pong_messages.push((from_addr, pong)),
|
||||||
@ -2416,6 +2445,7 @@ impl ClusterInfo {
|
|||||||
self.stats
|
self.stats
|
||||||
.process_gossip_packets_time
|
.process_gossip_packets_time
|
||||||
.add_measure(&mut timer);
|
.add_measure(&mut timer);
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process messages from the network
|
/// Process messages from the network
|
||||||
@ -2467,7 +2497,7 @@ impl ClusterInfo {
|
|||||||
stakes,
|
stakes,
|
||||||
feature_set.as_deref(),
|
feature_set.as_deref(),
|
||||||
epoch_time_ms,
|
epoch_time_ms,
|
||||||
);
|
)?;
|
||||||
|
|
||||||
self.print_reset_stats(last_print);
|
self.print_reset_stats(last_print);
|
||||||
|
|
||||||
@ -2697,26 +2727,37 @@ impl ClusterInfo {
|
|||||||
.build()
|
.build()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let mut last_print = Instant::now();
|
let mut last_print = Instant::now();
|
||||||
loop {
|
while !exit.load(Ordering::Relaxed) {
|
||||||
let e = self.run_listen(
|
if let Err(err) = self.run_listen(
|
||||||
&recycler,
|
&recycler,
|
||||||
bank_forks.as_ref(),
|
bank_forks.as_ref(),
|
||||||
&requests_receiver,
|
&requests_receiver,
|
||||||
&response_sender,
|
&response_sender,
|
||||||
&thread_pool,
|
&thread_pool,
|
||||||
&mut last_print,
|
&mut last_print,
|
||||||
);
|
) {
|
||||||
if exit.load(Ordering::Relaxed) {
|
match err {
|
||||||
return;
|
Error::RecvTimeoutError(_) => {
|
||||||
}
|
let table_size = self.gossip.read().unwrap().crds.table.len();
|
||||||
if e.is_err() {
|
|
||||||
let r_gossip = self.gossip.read().unwrap();
|
|
||||||
debug!(
|
debug!(
|
||||||
"{}: run_listen timeout, table size: {}",
|
"{}: run_listen timeout, table size: {}",
|
||||||
self.id(),
|
self.id(),
|
||||||
r_gossip.crds.table.len()
|
table_size,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
Error::DuplicateNodeInstance => {
|
||||||
|
error!(
|
||||||
|
"duplicate running instances of the same validator node: {}",
|
||||||
|
self.id()
|
||||||
|
);
|
||||||
|
exit.store(true, Ordering::Relaxed);
|
||||||
|
// TODO: Pass through ValidatorExit here so
|
||||||
|
// that this will exit cleanly.
|
||||||
|
std::process::exit(1);
|
||||||
|
}
|
||||||
|
_ => error!("gossip run_listen failed: {}", err),
|
||||||
|
}
|
||||||
|
}
|
||||||
thread_mem_usage::datapoint("solana-listen");
|
thread_mem_usage::datapoint("solana-listen");
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -124,6 +124,15 @@ impl ContactInfo {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// New random ContactInfo for tests and simulations.
|
||||||
|
#[cfg(test)]
|
||||||
|
pub(crate) fn new_rand<R: rand::Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> Self {
|
||||||
|
let delay = 10 * 60 * 1000; // 10 minutes
|
||||||
|
let now = timestamp() - delay + rng.gen_range(0, 2 * delay);
|
||||||
|
let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand);
|
||||||
|
ContactInfo::new_localhost(&pubkey, now)
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
/// ContactInfo with multicast addresses for adversarial testing.
|
/// ContactInfo with multicast addresses for adversarial testing.
|
||||||
pub fn new_multicast() -> Self {
|
pub fn new_multicast() -> Self {
|
||||||
|
@ -248,7 +248,7 @@ impl CrdsGossipPush {
|
|||||||
for i in start..(start + push_fanout) {
|
for i in start..(start + push_fanout) {
|
||||||
let index = i % self.active_set.len();
|
let index = i % self.active_set.len();
|
||||||
let (peer, filter) = self.active_set.get_index(index).unwrap();
|
let (peer, filter) = self.active_set.get_index(index).unwrap();
|
||||||
if !filter.contains(&origin) {
|
if !filter.contains(&origin) || value.should_force_push(peer) {
|
||||||
trace!("new_push_messages insert {} {:?}", *peer, value);
|
trace!("new_push_messages insert {} {:?}", *peer, value);
|
||||||
push_messages.entry(*peer).or_default().push(value.clone());
|
push_messages.entry(*peer).or_default().push(value.clone());
|
||||||
num_pushes += 1;
|
num_pushes += 1;
|
||||||
|
@ -2,6 +2,7 @@ use crate::contact_info::ContactInfo;
|
|||||||
use crate::deprecated;
|
use crate::deprecated;
|
||||||
use crate::epoch_slots::EpochSlots;
|
use crate::epoch_slots::EpochSlots;
|
||||||
use bincode::{serialize, serialized_size};
|
use bincode::{serialize, serialized_size};
|
||||||
|
use rand::Rng;
|
||||||
use solana_sdk::sanitize::{Sanitize, SanitizeError};
|
use solana_sdk::sanitize::{Sanitize, SanitizeError};
|
||||||
use solana_sdk::timing::timestamp;
|
use solana_sdk::timing::timestamp;
|
||||||
use solana_sdk::{
|
use solana_sdk::{
|
||||||
@ -77,6 +78,7 @@ pub enum CrdsData {
|
|||||||
EpochSlots(EpochSlotsIndex, EpochSlots),
|
EpochSlots(EpochSlotsIndex, EpochSlots),
|
||||||
LegacyVersion(LegacyVersion),
|
LegacyVersion(LegacyVersion),
|
||||||
Version(Version),
|
Version(Version),
|
||||||
|
NodeInstance(NodeInstance),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Sanitize for CrdsData {
|
impl Sanitize for CrdsData {
|
||||||
@ -105,6 +107,7 @@ impl Sanitize for CrdsData {
|
|||||||
}
|
}
|
||||||
CrdsData::LegacyVersion(version) => version.sanitize(),
|
CrdsData::LegacyVersion(version) => version.sanitize(),
|
||||||
CrdsData::Version(version) => version.sanitize(),
|
CrdsData::Version(version) => version.sanitize(),
|
||||||
|
CrdsData::NodeInstance(node) => node.sanitize(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -254,6 +257,55 @@ impl Version {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, PartialEq, AbiExample, Deserialize, Serialize)]
|
||||||
|
pub struct NodeInstance {
|
||||||
|
from: Pubkey,
|
||||||
|
wallclock: u64,
|
||||||
|
timestamp: u64, // Timestamp when the instance was created.
|
||||||
|
token: u64, // Randomly generated value at node instantiation.
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NodeInstance {
|
||||||
|
pub fn new(pubkey: Pubkey, now: u64) -> Self {
|
||||||
|
Self {
|
||||||
|
from: pubkey,
|
||||||
|
wallclock: now,
|
||||||
|
timestamp: now,
|
||||||
|
token: rand::thread_rng().gen(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clones the value with an updated wallclock.
|
||||||
|
pub fn with_wallclock(&self, now: u64) -> Self {
|
||||||
|
Self {
|
||||||
|
wallclock: now,
|
||||||
|
..*self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns true if the crds-value is a duplicate instance
|
||||||
|
// of this node, with a more recent timestamp.
|
||||||
|
pub fn check_duplicate(&self, other: &CrdsValue) -> bool {
|
||||||
|
match &other.data {
|
||||||
|
CrdsData::NodeInstance(other) => {
|
||||||
|
self.token != other.token
|
||||||
|
&& self.timestamp <= other.timestamp
|
||||||
|
&& self.from == other.from
|
||||||
|
}
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Sanitize for NodeInstance {
|
||||||
|
fn sanitize(&self) -> Result<(), SanitizeError> {
|
||||||
|
if self.wallclock >= MAX_WALLCLOCK {
|
||||||
|
return Err(SanitizeError::ValueOutOfBounds);
|
||||||
|
}
|
||||||
|
self.from.sanitize()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Type of the replicated value
|
/// Type of the replicated value
|
||||||
/// These are labels for values in a record that is associated with `Pubkey`
|
/// These are labels for values in a record that is associated with `Pubkey`
|
||||||
#[derive(PartialEq, Hash, Eq, Clone, Debug)]
|
#[derive(PartialEq, Hash, Eq, Clone, Debug)]
|
||||||
@ -266,6 +318,7 @@ pub enum CrdsValueLabel {
|
|||||||
AccountsHashes(Pubkey),
|
AccountsHashes(Pubkey),
|
||||||
LegacyVersion(Pubkey),
|
LegacyVersion(Pubkey),
|
||||||
Version(Pubkey),
|
Version(Pubkey),
|
||||||
|
NodeInstance(Pubkey),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Display for CrdsValueLabel {
|
impl fmt::Display for CrdsValueLabel {
|
||||||
@ -279,6 +332,7 @@ impl fmt::Display for CrdsValueLabel {
|
|||||||
CrdsValueLabel::AccountsHashes(_) => write!(f, "AccountsHashes({})", self.pubkey()),
|
CrdsValueLabel::AccountsHashes(_) => write!(f, "AccountsHashes({})", self.pubkey()),
|
||||||
CrdsValueLabel::LegacyVersion(_) => write!(f, "LegacyVersion({})", self.pubkey()),
|
CrdsValueLabel::LegacyVersion(_) => write!(f, "LegacyVersion({})", self.pubkey()),
|
||||||
CrdsValueLabel::Version(_) => write!(f, "Version({})", self.pubkey()),
|
CrdsValueLabel::Version(_) => write!(f, "Version({})", self.pubkey()),
|
||||||
|
CrdsValueLabel::NodeInstance(_) => write!(f, "NodeInstance({})", self.pubkey()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -294,6 +348,7 @@ impl CrdsValueLabel {
|
|||||||
CrdsValueLabel::AccountsHashes(p) => *p,
|
CrdsValueLabel::AccountsHashes(p) => *p,
|
||||||
CrdsValueLabel::LegacyVersion(p) => *p,
|
CrdsValueLabel::LegacyVersion(p) => *p,
|
||||||
CrdsValueLabel::Version(p) => *p,
|
CrdsValueLabel::Version(p) => *p,
|
||||||
|
CrdsValueLabel::NodeInstance(p) => *p,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -335,6 +390,7 @@ impl CrdsValue {
|
|||||||
CrdsData::EpochSlots(_, p) => p.wallclock,
|
CrdsData::EpochSlots(_, p) => p.wallclock,
|
||||||
CrdsData::LegacyVersion(version) => version.wallclock,
|
CrdsData::LegacyVersion(version) => version.wallclock,
|
||||||
CrdsData::Version(version) => version.wallclock,
|
CrdsData::Version(version) => version.wallclock,
|
||||||
|
CrdsData::NodeInstance(node) => node.wallclock,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn pubkey(&self) -> Pubkey {
|
pub fn pubkey(&self) -> Pubkey {
|
||||||
@ -347,6 +403,7 @@ impl CrdsValue {
|
|||||||
CrdsData::EpochSlots(_, p) => p.from,
|
CrdsData::EpochSlots(_, p) => p.from,
|
||||||
CrdsData::LegacyVersion(version) => version.from,
|
CrdsData::LegacyVersion(version) => version.from,
|
||||||
CrdsData::Version(version) => version.from,
|
CrdsData::Version(version) => version.from,
|
||||||
|
CrdsData::NodeInstance(node) => node.from,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn label(&self) -> CrdsValueLabel {
|
pub fn label(&self) -> CrdsValueLabel {
|
||||||
@ -359,6 +416,7 @@ impl CrdsValue {
|
|||||||
CrdsData::EpochSlots(ix, _) => CrdsValueLabel::EpochSlots(*ix, self.pubkey()),
|
CrdsData::EpochSlots(ix, _) => CrdsValueLabel::EpochSlots(*ix, self.pubkey()),
|
||||||
CrdsData::LegacyVersion(_) => CrdsValueLabel::LegacyVersion(self.pubkey()),
|
CrdsData::LegacyVersion(_) => CrdsValueLabel::LegacyVersion(self.pubkey()),
|
||||||
CrdsData::Version(_) => CrdsValueLabel::Version(self.pubkey()),
|
CrdsData::Version(_) => CrdsValueLabel::Version(self.pubkey()),
|
||||||
|
CrdsData::NodeInstance(_) => CrdsValueLabel::NodeInstance(self.pubkey()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn contact_info(&self) -> Option<&ContactInfo> {
|
pub fn contact_info(&self) -> Option<&ContactInfo> {
|
||||||
@ -432,6 +490,7 @@ impl CrdsValue {
|
|||||||
CrdsValueLabel::AccountsHashes(*key),
|
CrdsValueLabel::AccountsHashes(*key),
|
||||||
CrdsValueLabel::LegacyVersion(*key),
|
CrdsValueLabel::LegacyVersion(*key),
|
||||||
CrdsValueLabel::Version(*key),
|
CrdsValueLabel::Version(*key),
|
||||||
|
CrdsValueLabel::NodeInstance(*key),
|
||||||
];
|
];
|
||||||
labels.extend((0..MAX_VOTES).map(|ix| CrdsValueLabel::Vote(ix, *key)));
|
labels.extend((0..MAX_VOTES).map(|ix| CrdsValueLabel::Vote(ix, *key)));
|
||||||
labels.extend((0..MAX_EPOCH_SLOTS).map(|ix| CrdsValueLabel::EpochSlots(ix, *key)));
|
labels.extend((0..MAX_EPOCH_SLOTS).map(|ix| CrdsValueLabel::EpochSlots(ix, *key)));
|
||||||
@ -469,6 +528,15 @@ impl CrdsValue {
|
|||||||
.vote_index()
|
.vote_index()
|
||||||
.expect("all values must be votes")
|
.expect("all values must be votes")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns true if, regardless of prunes, this crds-value
|
||||||
|
/// should be pushed to the receiving node.
|
||||||
|
pub fn should_force_push(&self, peer: &Pubkey) -> bool {
|
||||||
|
match &self.data {
|
||||||
|
CrdsData::NodeInstance(node) => node.from == *peer,
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@ -482,7 +550,7 @@ mod test {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_labels() {
|
fn test_labels() {
|
||||||
let mut hits = [false; 6 + MAX_VOTES as usize + MAX_EPOCH_SLOTS as usize];
|
let mut hits = [false; 7 + MAX_VOTES as usize + MAX_EPOCH_SLOTS as usize];
|
||||||
// this method should cover all the possible labels
|
// this method should cover all the possible labels
|
||||||
for v in &CrdsValue::record_labels(&Pubkey::default()) {
|
for v in &CrdsValue::record_labels(&Pubkey::default()) {
|
||||||
match v {
|
match v {
|
||||||
@ -492,9 +560,10 @@ mod test {
|
|||||||
CrdsValueLabel::AccountsHashes(_) => hits[3] = true,
|
CrdsValueLabel::AccountsHashes(_) => hits[3] = true,
|
||||||
CrdsValueLabel::LegacyVersion(_) => hits[4] = true,
|
CrdsValueLabel::LegacyVersion(_) => hits[4] = true,
|
||||||
CrdsValueLabel::Version(_) => hits[5] = true,
|
CrdsValueLabel::Version(_) => hits[5] = true,
|
||||||
CrdsValueLabel::Vote(ix, _) => hits[*ix as usize + 6] = true,
|
CrdsValueLabel::NodeInstance(_) => hits[6] = true,
|
||||||
|
CrdsValueLabel::Vote(ix, _) => hits[*ix as usize + 7] = true,
|
||||||
CrdsValueLabel::EpochSlots(ix, _) => {
|
CrdsValueLabel::EpochSlots(ix, _) => {
|
||||||
hits[*ix as usize + MAX_VOTES as usize + 6] = true
|
hits[*ix as usize + MAX_VOTES as usize + 7] = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -669,4 +738,80 @@ mod test {
|
|||||||
assert!(!value.verify());
|
assert!(!value.verify());
|
||||||
serialize_deserialize_value(value, correct_keypair);
|
serialize_deserialize_value(value, correct_keypair);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_check_duplicate_instance() {
|
||||||
|
fn make_crds_value(node: NodeInstance) -> CrdsValue {
|
||||||
|
CrdsValue::new_unsigned(CrdsData::NodeInstance(node))
|
||||||
|
}
|
||||||
|
let now = timestamp();
|
||||||
|
let mut rng = rand::thread_rng();
|
||||||
|
let pubkey = Pubkey::new_unique();
|
||||||
|
let node = NodeInstance::new(pubkey, now);
|
||||||
|
// Same token is not a duplicate.
|
||||||
|
assert!(!node.check_duplicate(&make_crds_value(NodeInstance {
|
||||||
|
from: pubkey,
|
||||||
|
wallclock: now + 1,
|
||||||
|
timestamp: now + 1,
|
||||||
|
token: node.token,
|
||||||
|
})));
|
||||||
|
// Older timestamp is not a duplicate.
|
||||||
|
assert!(!node.check_duplicate(&make_crds_value(NodeInstance {
|
||||||
|
from: pubkey,
|
||||||
|
wallclock: now + 1,
|
||||||
|
timestamp: now - 1,
|
||||||
|
token: rng.gen(),
|
||||||
|
})));
|
||||||
|
// Updated wallclock is not a duplicate.
|
||||||
|
let other = node.with_wallclock(now + 8);
|
||||||
|
assert_eq!(
|
||||||
|
other,
|
||||||
|
NodeInstance {
|
||||||
|
from: pubkey,
|
||||||
|
wallclock: now + 8,
|
||||||
|
timestamp: now,
|
||||||
|
token: node.token,
|
||||||
|
}
|
||||||
|
);
|
||||||
|
assert!(!node.check_duplicate(&make_crds_value(other)));
|
||||||
|
// Duplicate instance.
|
||||||
|
assert!(node.check_duplicate(&make_crds_value(NodeInstance {
|
||||||
|
from: pubkey,
|
||||||
|
wallclock: 0,
|
||||||
|
timestamp: now,
|
||||||
|
token: rng.gen(),
|
||||||
|
})));
|
||||||
|
// Different pubkey is not a duplicate.
|
||||||
|
assert!(!node.check_duplicate(&make_crds_value(NodeInstance {
|
||||||
|
from: Pubkey::new_unique(),
|
||||||
|
wallclock: now + 1,
|
||||||
|
timestamp: now + 1,
|
||||||
|
token: rng.gen(),
|
||||||
|
})));
|
||||||
|
// Differnt crds value is not a duplicate.
|
||||||
|
assert!(
|
||||||
|
!node.check_duplicate(&CrdsValue::new_unsigned(CrdsData::ContactInfo(
|
||||||
|
ContactInfo::new_rand(&mut rng, Some(pubkey))
|
||||||
|
)))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_should_force_push() {
|
||||||
|
let mut rng = rand::thread_rng();
|
||||||
|
let pubkey = Pubkey::new_unique();
|
||||||
|
assert!(
|
||||||
|
!CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_rand(
|
||||||
|
&mut rng,
|
||||||
|
Some(pubkey),
|
||||||
|
)))
|
||||||
|
.should_force_push(&pubkey)
|
||||||
|
);
|
||||||
|
let node = CrdsValue::new_unsigned(CrdsData::NodeInstance(NodeInstance::new(
|
||||||
|
pubkey,
|
||||||
|
timestamp(),
|
||||||
|
)));
|
||||||
|
assert!(node.should_force_push(&pubkey));
|
||||||
|
assert!(!node.should_force_push(&Pubkey::new_unique()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -31,6 +31,7 @@ pub enum Error {
|
|||||||
BlockstoreError(blockstore::BlockstoreError),
|
BlockstoreError(blockstore::BlockstoreError),
|
||||||
FsExtra(fs_extra::error::Error),
|
FsExtra(fs_extra::error::Error),
|
||||||
SnapshotError(snapshot_utils::SnapshotError),
|
SnapshotError(snapshot_utils::SnapshotError),
|
||||||
|
DuplicateNodeInstance,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type Result<T> = std::result::Result<T, Error>;
|
pub type Result<T> = std::result::Result<T, Error>;
|
||||||
|
Reference in New Issue
Block a user