checks for duplicate validator instances using gossip (bp #14018) (#14028)

* checks for duplicate validator instances using gossip

(cherry picked from commit 8cd5eb9863)

# Conflicts:
#	core/src/cluster_info.rs

* pushes node-instance along with version early in gossip

(cherry picked from commit 542198180a)

* removes RwLock on ClusterInfo.instance

(cherry picked from commit 895d7d6a65)

# Conflicts:
#	core/src/cluster_info.rs

* std::process::exit to kill all threads

(cherry picked from commit 1d267eae6b)

* removes backport merge conflicts

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
This commit is contained in:
mergify[bot]
2020-12-09 13:04:57 -08:00
committed by GitHub
parent c20e74a248
commit 07191dc224
4 changed files with 224 additions and 38 deletions

View File

@ -18,8 +18,8 @@ use crate::{
crds_gossip_error::CrdsGossipError, crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{CrdsFilter, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}, crds_gossip_pull::{CrdsFilter, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
crds_value::{ crds_value::{
self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot, SnapshotHash, self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot, NodeInstance,
Version, Vote, MAX_WALLCLOCK, SnapshotHash, Version, Vote, MAX_WALLCLOCK,
}, },
data_budget::DataBudget, data_budget::DataBudget,
epoch_slots::EpochSlots, epoch_slots::EpochSlots,
@ -298,6 +298,7 @@ pub struct ClusterInfo {
stats: GossipStats, stats: GossipStats,
socket: UdpSocket, socket: UdpSocket,
local_message_pending_push_queue: RwLock<Vec<(CrdsValue, u64)>>, local_message_pending_push_queue: RwLock<Vec<(CrdsValue, u64)>>,
instance: NodeInstance,
} }
impl Default for ClusterInfo { impl Default for ClusterInfo {
@ -422,7 +423,7 @@ pub fn make_accounts_hashes_message(
type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>; type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;
// TODO These messages should go through the gpu pipeline for spam filtering // TODO These messages should go through the gpu pipeline for spam filtering
#[frozen_abi(digest = "8L3mKuv292LTa3XFCGNVdaFihWnsgYE4hf941p9gqUxF")] #[frozen_abi(digest = "6PpTdBvyX37y5ERokb8DejgKobpsuTbFJC39f8Eqz7Vy")]
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)] #[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
enum Protocol { enum Protocol {
@ -553,6 +554,7 @@ impl ClusterInfo {
stats: GossipStats::default(), stats: GossipStats::default(),
socket: UdpSocket::bind("0.0.0.0:0").unwrap(), socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
local_message_pending_push_queue: RwLock::new(vec![]), local_message_pending_push_queue: RwLock::new(vec![]),
instance: NodeInstance::new(id, timestamp()),
}; };
{ {
let mut gossip = me.gossip.write().unwrap(); let mut gossip = me.gossip.write().unwrap();
@ -586,6 +588,7 @@ impl ClusterInfo {
.unwrap() .unwrap()
.clone(), .clone(),
), ),
instance: NodeInstance::new(*new_id, timestamp()),
} }
} }
@ -606,16 +609,24 @@ impl ClusterInfo {
) { ) {
let now = timestamp(); let now = timestamp();
self.my_contact_info.write().unwrap().wallclock = now; self.my_contact_info.write().unwrap().wallclock = now;
let entry = let entries: Vec<_> = vec![
CrdsValue::new_signed(CrdsData::ContactInfo(self.my_contact_info()), &self.keypair); CrdsData::ContactInfo(self.my_contact_info()),
CrdsData::NodeInstance(self.instance.with_wallclock(now)),
]
.into_iter()
.map(|v| CrdsValue::new_signed(v, &self.keypair))
.collect();
{
let mut local_message_pending_push_queue =
self.local_message_pending_push_queue.write().unwrap();
for entry in entries {
local_message_pending_push_queue.push((entry, now));
}
}
self.gossip self.gossip
.write() .write()
.unwrap() .unwrap()
.refresh_push_active_set(stakes, gossip_validators); .refresh_push_active_set(stakes, gossip_validators);
self.local_message_pending_push_queue
.write()
.unwrap()
.push((entry, now));
} }
// TODO kill insert_info, only used by tests // TODO kill insert_info, only used by tests
@ -1789,9 +1800,14 @@ impl ClusterInfo {
let mut last_contact_info_trace = timestamp(); let mut last_contact_info_trace = timestamp();
let mut adopt_shred_version = self.my_shred_version() == 0; let mut adopt_shred_version = self.my_shred_version() == 0;
let recycler = PacketsRecycler::default(); let recycler = PacketsRecycler::default();
let crds_data = vec![
let message = CrdsData::Version(Version::new(self.id())); CrdsData::Version(Version::new(self.id())),
self.push_message(CrdsValue::new_signed(message, &self.keypair)); CrdsData::NodeInstance(self.instance.with_wallclock(timestamp())),
];
for value in crds_data {
let value = CrdsValue::new_signed(value, &self.keypair);
self.push_message(value);
}
let mut generate_pull_requests = true; let mut generate_pull_requests = true;
loop { loop {
let start = timestamp(); let start = timestamp();
@ -2487,8 +2503,8 @@ impl ClusterInfo {
stakes: HashMap<Pubkey, u64>, stakes: HashMap<Pubkey, u64>,
feature_set: Option<&FeatureSet>, feature_set: Option<&FeatureSet>,
epoch_time_ms: u64, epoch_time_ms: u64,
) { ) -> Result<()> {
let mut timer = Measure::start("process_gossip_packets_time"); let _st = ScopedTimer::from(&self.stats.process_gossip_packets_time);
let packets: Vec<_> = thread_pool.install(|| { let packets: Vec<_> = thread_pool.install(|| {
packets packets
.into_par_iter() .into_par_iter()
@ -2501,6 +2517,16 @@ impl ClusterInfo {
}) })
.collect() .collect()
}); });
// Check if there is a duplicate instance of
// this node with more recent timestamp.
let check_duplicate_instance = |values: &[CrdsValue]| {
for value in values {
if self.instance.check_duplicate(value) {
return Err(Error::DuplicateNodeInstance);
}
}
Ok(())
};
// Split packets based on their types. // Split packets based on their types.
let mut pull_requests = vec![]; let mut pull_requests = vec![];
let mut pull_responses = vec![]; let mut pull_responses = vec![];
@ -2513,8 +2539,14 @@ impl ClusterInfo {
Protocol::PullRequest(filter, caller) => { Protocol::PullRequest(filter, caller) => {
pull_requests.push((from_addr, filter, caller)) pull_requests.push((from_addr, filter, caller))
} }
Protocol::PullResponse(from, data) => pull_responses.push((from, data)), Protocol::PullResponse(from, data) => {
Protocol::PushMessage(from, data) => push_messages.push((from, data)), check_duplicate_instance(&data)?;
pull_responses.push((from, data));
}
Protocol::PushMessage(from, data) => {
check_duplicate_instance(&data)?;
push_messages.push((from, data));
}
Protocol::PruneMessage(from, data) => prune_messages.push((from, data)), Protocol::PruneMessage(from, data) => prune_messages.push((from, data)),
Protocol::PingMessage(ping) => ping_messages.push((from_addr, ping)), Protocol::PingMessage(ping) => ping_messages.push((from_addr, ping)),
Protocol::PongMessage(pong) => pong_messages.push((from_addr, pong)), Protocol::PongMessage(pong) => pong_messages.push((from_addr, pong)),
@ -2539,9 +2571,7 @@ impl ClusterInfo {
response_sender, response_sender,
feature_set, feature_set,
); );
self.stats Ok(())
.process_gossip_packets_time
.add_measure(&mut timer);
} }
/// Process messages from the network /// Process messages from the network
@ -2588,7 +2618,7 @@ impl ClusterInfo {
stakes, stakes,
feature_set.as_deref(), feature_set.as_deref(),
epoch_time_ms, epoch_time_ms,
); )?;
self.print_reset_stats(last_print); self.print_reset_stats(last_print);
@ -2853,25 +2883,36 @@ impl ClusterInfo {
.build() .build()
.unwrap(); .unwrap();
let mut last_print = Instant::now(); let mut last_print = Instant::now();
loop { while !exit.load(Ordering::Relaxed) {
let e = self.run_listen( if let Err(err) = self.run_listen(
&recycler, &recycler,
bank_forks.as_ref(), bank_forks.as_ref(),
&requests_receiver, &requests_receiver,
&response_sender, &response_sender,
&thread_pool, &thread_pool,
&mut last_print, &mut last_print,
); ) {
if exit.load(Ordering::Relaxed) { match err {
return; Error::RecvTimeoutError(_) => {
} let table_size = self.gossip.read().unwrap().crds.len();
if e.is_err() { debug!(
let r_gossip = self.gossip.read().unwrap(); "{}: run_listen timeout, table size: {}",
debug!( self.id(),
"{}: run_listen timeout, table size: {}", table_size,
self.id(), );
r_gossip.crds.len() }
); Error::DuplicateNodeInstance => {
error!(
"duplicate running instances of the same validator node: {}",
self.id()
);
exit.store(true, Ordering::Relaxed);
// TODO: Pass through ValidatorExit here so
// that this will exit cleanly.
std::process::exit(1);
}
_ => error!("gossip run_listen failed: {}", err),
}
} }
thread_mem_usage::datapoint("solana-listen"); thread_mem_usage::datapoint("solana-listen");
} }

View File

@ -247,7 +247,7 @@ impl CrdsGossipPush {
for i in start..(start + push_fanout) { for i in start..(start + push_fanout) {
let index = i % self.active_set.len(); let index = i % self.active_set.len();
let (peer, filter) = self.active_set.get_index(index).unwrap(); let (peer, filter) = self.active_set.get_index(index).unwrap();
if !filter.contains(&origin) { if !filter.contains(&origin) || value.should_force_push(peer) {
trace!("new_push_messages insert {} {:?}", *peer, value); trace!("new_push_messages insert {} {:?}", *peer, value);
push_messages.entry(*peer).or_default().push(value.clone()); push_messages.entry(*peer).or_default().push(value.clone());
num_pushes += 1; num_pushes += 1;

View File

@ -79,6 +79,7 @@ pub enum CrdsData {
EpochSlots(EpochSlotsIndex, EpochSlots), EpochSlots(EpochSlotsIndex, EpochSlots),
LegacyVersion(LegacyVersion), LegacyVersion(LegacyVersion),
Version(Version), Version(Version),
NodeInstance(NodeInstance),
} }
impl Sanitize for CrdsData { impl Sanitize for CrdsData {
@ -107,6 +108,7 @@ impl Sanitize for CrdsData {
} }
CrdsData::LegacyVersion(version) => version.sanitize(), CrdsData::LegacyVersion(version) => version.sanitize(),
CrdsData::Version(version) => version.sanitize(), CrdsData::Version(version) => version.sanitize(),
CrdsData::NodeInstance(node) => node.sanitize(),
} }
} }
} }
@ -323,6 +325,55 @@ impl Version {
} }
} }
/// Gossip record identifying one running instance of a node.
///
/// `from` is the pubkey the record is attributed to and `wallclock` is the
/// value reported as the crds-value wallclock. `timestamp` and `token` are
/// both fixed when the instance is constructed and are what
/// `check_duplicate` compares to tell two instances of the same pubkey apart.
#[derive(Clone, Debug, PartialEq, AbiExample, Deserialize, Serialize)]
pub struct NodeInstance {
from: Pubkey,
wallclock: u64,
timestamp: u64, // Timestamp when the instance was created.
token: u64, // Randomly generated value at node instantiation.
}
impl NodeInstance {
    /// Builds a new instance record attributed to `pubkey`.
    ///
    /// Both `wallclock` and `timestamp` start out as `now`; `token` is drawn
    /// from the thread-local random number generator.
    pub fn new(pubkey: Pubkey, now: u64) -> Self {
        let token = rand::thread_rng().gen();
        Self {
            from: pubkey,
            wallclock: now,
            timestamp: now,
            token,
        }
    }

    /// Returns a copy of this instance whose `wallclock` is set to `now`.
    /// The `from`, `timestamp` and `token` fields are carried over as is.
    pub fn with_wallclock(&self, now: u64) -> Self {
        let mut instance = self.clone();
        instance.wallclock = now;
        instance
    }

    /// Returns true if `other` holds a node-instance with the same `from`
    /// pubkey but a different token, whose creation timestamp is not older
    /// than this one's. Any crds-value that is not a node-instance yields
    /// false.
    pub fn check_duplicate(&self, other: &CrdsValue) -> bool {
        if let CrdsData::NodeInstance(other) = &other.data {
            let same_owner = self.from == other.from;
            let different_token = self.token != other.token;
            let not_older = self.timestamp <= other.timestamp;
            same_owner && different_token && not_older
        } else {
            false
        }
    }
}
impl Sanitize for NodeInstance {
    /// Rejects the value with `SanitizeError::ValueOutOfBounds` when its
    /// wallclock is at or beyond `MAX_WALLCLOCK`; otherwise the result is
    /// whatever sanitizing the `from` pubkey returns.
    fn sanitize(&self) -> Result<(), SanitizeError> {
        if self.wallclock < MAX_WALLCLOCK {
            self.from.sanitize()
        } else {
            Err(SanitizeError::ValueOutOfBounds)
        }
    }
}
/// Type of the replicated value /// Type of the replicated value
/// These are labels for values in a record that is associated with `Pubkey` /// These are labels for values in a record that is associated with `Pubkey`
#[derive(PartialEq, Hash, Eq, Clone, Debug)] #[derive(PartialEq, Hash, Eq, Clone, Debug)]
@ -335,6 +386,7 @@ pub enum CrdsValueLabel {
AccountsHashes(Pubkey), AccountsHashes(Pubkey),
LegacyVersion(Pubkey), LegacyVersion(Pubkey),
Version(Pubkey), Version(Pubkey),
NodeInstance(Pubkey),
} }
impl fmt::Display for CrdsValueLabel { impl fmt::Display for CrdsValueLabel {
@ -348,6 +400,7 @@ impl fmt::Display for CrdsValueLabel {
CrdsValueLabel::AccountsHashes(_) => write!(f, "AccountsHashes({})", self.pubkey()), CrdsValueLabel::AccountsHashes(_) => write!(f, "AccountsHashes({})", self.pubkey()),
CrdsValueLabel::LegacyVersion(_) => write!(f, "LegacyVersion({})", self.pubkey()), CrdsValueLabel::LegacyVersion(_) => write!(f, "LegacyVersion({})", self.pubkey()),
CrdsValueLabel::Version(_) => write!(f, "Version({})", self.pubkey()), CrdsValueLabel::Version(_) => write!(f, "Version({})", self.pubkey()),
CrdsValueLabel::NodeInstance(_) => write!(f, "NodeInstance({})", self.pubkey()),
} }
} }
} }
@ -363,6 +416,7 @@ impl CrdsValueLabel {
CrdsValueLabel::AccountsHashes(p) => *p, CrdsValueLabel::AccountsHashes(p) => *p,
CrdsValueLabel::LegacyVersion(p) => *p, CrdsValueLabel::LegacyVersion(p) => *p,
CrdsValueLabel::Version(p) => *p, CrdsValueLabel::Version(p) => *p,
CrdsValueLabel::NodeInstance(p) => *p,
} }
} }
} }
@ -409,6 +463,7 @@ impl CrdsValue {
CrdsData::EpochSlots(_, p) => p.wallclock, CrdsData::EpochSlots(_, p) => p.wallclock,
CrdsData::LegacyVersion(version) => version.wallclock, CrdsData::LegacyVersion(version) => version.wallclock,
CrdsData::Version(version) => version.wallclock, CrdsData::Version(version) => version.wallclock,
CrdsData::NodeInstance(node) => node.wallclock,
} }
} }
pub fn pubkey(&self) -> Pubkey { pub fn pubkey(&self) -> Pubkey {
@ -421,6 +476,7 @@ impl CrdsValue {
CrdsData::EpochSlots(_, p) => p.from, CrdsData::EpochSlots(_, p) => p.from,
CrdsData::LegacyVersion(version) => version.from, CrdsData::LegacyVersion(version) => version.from,
CrdsData::Version(version) => version.from, CrdsData::Version(version) => version.from,
CrdsData::NodeInstance(node) => node.from,
} }
} }
pub fn label(&self) -> CrdsValueLabel { pub fn label(&self) -> CrdsValueLabel {
@ -433,6 +489,7 @@ impl CrdsValue {
CrdsData::EpochSlots(ix, _) => CrdsValueLabel::EpochSlots(*ix, self.pubkey()), CrdsData::EpochSlots(ix, _) => CrdsValueLabel::EpochSlots(*ix, self.pubkey()),
CrdsData::LegacyVersion(_) => CrdsValueLabel::LegacyVersion(self.pubkey()), CrdsData::LegacyVersion(_) => CrdsValueLabel::LegacyVersion(self.pubkey()),
CrdsData::Version(_) => CrdsValueLabel::Version(self.pubkey()), CrdsData::Version(_) => CrdsValueLabel::Version(self.pubkey()),
CrdsData::NodeInstance(_) => CrdsValueLabel::NodeInstance(self.pubkey()),
} }
} }
pub fn contact_info(&self) -> Option<&ContactInfo> { pub fn contact_info(&self) -> Option<&ContactInfo> {
@ -499,13 +556,14 @@ impl CrdsValue {
/// Return all the possible labels for a record identified by Pubkey. /// Return all the possible labels for a record identified by Pubkey.
pub fn record_labels(key: Pubkey) -> impl Iterator<Item = CrdsValueLabel> { pub fn record_labels(key: Pubkey) -> impl Iterator<Item = CrdsValueLabel> {
const CRDS_VALUE_LABEL_STUBS: [fn(Pubkey) -> CrdsValueLabel; 6] = [ const CRDS_VALUE_LABEL_STUBS: [fn(Pubkey) -> CrdsValueLabel; 7] = [
CrdsValueLabel::ContactInfo, CrdsValueLabel::ContactInfo,
CrdsValueLabel::LowestSlot, CrdsValueLabel::LowestSlot,
CrdsValueLabel::SnapshotHashes, CrdsValueLabel::SnapshotHashes,
CrdsValueLabel::AccountsHashes, CrdsValueLabel::AccountsHashes,
CrdsValueLabel::LegacyVersion, CrdsValueLabel::LegacyVersion,
CrdsValueLabel::Version, CrdsValueLabel::Version,
CrdsValueLabel::NodeInstance,
]; ];
CRDS_VALUE_LABEL_STUBS CRDS_VALUE_LABEL_STUBS
.iter() .iter()
@ -545,6 +603,15 @@ impl CrdsValue {
.vote_index() .vote_index()
.expect("all values must be votes") .expect("all values must be votes")
} }
/// Tells whether this crds-value must be pushed to `peer` even if
/// prunes would otherwise filter it out. This holds only for a
/// node-instance value whose `from` pubkey equals `peer`.
pub fn should_force_push(&self, peer: &Pubkey) -> bool {
    if let CrdsData::NodeInstance(node) = &self.data {
        node.from == *peer
    } else {
        false
    }
}
} }
/// Filters out an iterator of crds values, returning /// Filters out an iterator of crds values, returning
@ -584,7 +651,7 @@ mod test {
#[test] #[test]
fn test_labels() { fn test_labels() {
let mut hits = [false; 6 + MAX_VOTES as usize + MAX_EPOCH_SLOTS as usize]; let mut hits = [false; 7 + MAX_VOTES as usize + MAX_EPOCH_SLOTS as usize];
// this method should cover all the possible labels // this method should cover all the possible labels
for v in CrdsValue::record_labels(Pubkey::default()) { for v in CrdsValue::record_labels(Pubkey::default()) {
match &v { match &v {
@ -594,9 +661,10 @@ mod test {
CrdsValueLabel::AccountsHashes(_) => hits[3] = true, CrdsValueLabel::AccountsHashes(_) => hits[3] = true,
CrdsValueLabel::LegacyVersion(_) => hits[4] = true, CrdsValueLabel::LegacyVersion(_) => hits[4] = true,
CrdsValueLabel::Version(_) => hits[5] = true, CrdsValueLabel::Version(_) => hits[5] = true,
CrdsValueLabel::Vote(ix, _) => hits[*ix as usize + 6] = true, CrdsValueLabel::NodeInstance(_) => hits[6] = true,
CrdsValueLabel::Vote(ix, _) => hits[*ix as usize + 7] = true,
CrdsValueLabel::EpochSlots(ix, _) => { CrdsValueLabel::EpochSlots(ix, _) => {
hits[*ix as usize + MAX_VOTES as usize + 6] = true hits[*ix as usize + MAX_VOTES as usize + 7] = true
} }
} }
} }
@ -806,4 +874,80 @@ mod test {
// cannot be more than 5 times number of keys. // cannot be more than 5 times number of keys.
assert!(currents.len() <= keys.len() * 5); assert!(currents.len() <= keys.len() * 5);
} }
#[test]
fn test_check_duplicate_instance() {
    // Wraps a node-instance into an unsigned crds-value.
    let wrap = |instance: NodeInstance| CrdsValue::new_unsigned(CrdsData::NodeInstance(instance));
    let now = timestamp();
    let mut rng = rand::thread_rng();
    let pubkey = Pubkey::new_unique();
    let node = NodeInstance::new(pubkey, now);

    // Same token is not a duplicate.
    let same_token = NodeInstance {
        from: pubkey,
        wallclock: now + 1,
        timestamp: now + 1,
        token: node.token,
    };
    assert!(!node.check_duplicate(&wrap(same_token)));

    // Older timestamp is not a duplicate.
    let older = NodeInstance {
        from: pubkey,
        wallclock: now + 1,
        timestamp: now - 1,
        token: rng.gen(),
    };
    assert!(!node.check_duplicate(&wrap(older)));

    // Updated wallclock is not a duplicate.
    let refreshed = node.with_wallclock(now + 8);
    let expected = NodeInstance {
        from: pubkey,
        wallclock: now + 8,
        timestamp: now,
        token: node.token,
    };
    assert_eq!(refreshed, expected);
    assert!(!node.check_duplicate(&wrap(refreshed)));

    // Duplicate instance.
    let duplicate = NodeInstance {
        from: pubkey,
        wallclock: 0,
        timestamp: now,
        token: rng.gen(),
    };
    assert!(node.check_duplicate(&wrap(duplicate)));

    // Different pubkey is not a duplicate.
    let stranger = NodeInstance {
        from: Pubkey::new_unique(),
        wallclock: now + 1,
        timestamp: now + 1,
        token: rng.gen(),
    };
    assert!(!node.check_duplicate(&wrap(stranger)));

    // Different crds value is not a duplicate.
    let contact_info = ContactInfo::new_rand(&mut rng, Some(pubkey));
    let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info));
    assert!(!node.check_duplicate(&value));
}
#[test]
fn test_should_force_push() {
    let mut rng = rand::thread_rng();
    let pubkey = Pubkey::new_unique();

    // A contact-info value is not force pushed, even to its own pubkey.
    let contact_info = ContactInfo::new_rand(&mut rng, Some(pubkey));
    let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info));
    assert!(!value.should_force_push(&pubkey));

    // A node-instance value is force pushed only to the matching pubkey.
    let instance = NodeInstance::new(pubkey, timestamp());
    let node = CrdsValue::new_unsigned(CrdsData::NodeInstance(instance));
    assert!(node.should_force_push(&pubkey));
    let stranger = Pubkey::new_unique();
    assert!(!node.should_force_push(&stranger));
}
} }

View File

@ -32,6 +32,7 @@ pub enum Error {
FsExtra(fs_extra::error::Error), FsExtra(fs_extra::error::Error),
SnapshotError(snapshot_utils::SnapshotError), SnapshotError(snapshot_utils::SnapshotError),
WeightedIndexError(rand::distributions::weighted::WeightedError), WeightedIndexError(rand::distributions::weighted::WeightedError),
DuplicateNodeInstance,
} }
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;