Add Accounts hash consistency halting (#8772) (#8889)

* Accounts hash consistency halting

* Add option to inject account hash faults for testing.

Enable option in local cluster test to see that node halts.
This commit is contained in:
sakridge
2020-03-16 14:29:44 -07:00
committed by GitHub
parent e2cfc513eb
commit 1cc66f0cd7
11 changed files with 763 additions and 384 deletions

670
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,196 @@
// Service to verify accounts hashes with other trusted validator nodes.
//
// Each interval, publish the snapshot hash, which is the full accounts state
// hash, on gossip. Monitor gossip for messages from validators in the
// --trusted-validators set and halt the node if a mismatch is detected.
use crate::cluster_info::ClusterInfo;
use solana_ledger::{
snapshot_package::SnapshotPackage, snapshot_package::SnapshotPackageReceiver,
snapshot_package::SnapshotPackageSender,
};
use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey};
use std::collections::{HashMap, HashSet};
use std::{
sync::{
atomic::{AtomicBool, Ordering},
mpsc::RecvTimeoutError,
Arc, RwLock,
},
thread::{self, Builder, JoinHandle},
time::Duration,
};
/// Service that publishes this node's accounts hashes on gossip and, when
/// enabled, requests a node halt if a trusted validator published a different
/// hash for the same slot.
pub struct AccountsHashVerifier {
    /// Handle of the `solana-accounts-hash` worker thread.
    t_accounts_hash_verifier: JoinHandle<()>,
}

impl AccountsHashVerifier {
    /// Spawns the worker thread.
    ///
    /// The thread polls `snapshot_package_receiver` with a one second timeout
    /// and hands every received package to `process_snapshot`. It stops when
    /// `exit` is set or when the sending side of the channel is disconnected.
    ///
    /// * `snapshot_package_sender` - optional downstream channel each package
    ///   is forwarded to after it has been processed.
    /// * `trusted_validators` - validators whose gossiped accounts hashes are
    ///   compared against the local ones.
    /// * `halt_on_trusted_validators_accounts_hash_mismatch` - when `true`, a
    ///   detected mismatch sets `exit`.
    /// * `fault_injection_rate_slots` - testing aid; `0` disables it.
    ///
    /// # Panics
    ///
    /// Panics if the thread cannot be spawned.
    pub fn new(
        snapshot_package_receiver: SnapshotPackageReceiver,
        snapshot_package_sender: Option<SnapshotPackageSender>,
        exit: &Arc<AtomicBool>,
        cluster_info: &Arc<RwLock<ClusterInfo>>,
        trusted_validators: Option<HashSet<Pubkey>>,
        halt_on_trusted_validators_accounts_hash_mismatch: bool,
        fault_injection_rate_slots: u64,
    ) -> Self {
        let exit = exit.clone();
        let cluster_info = cluster_info.clone();
        let t_accounts_hash_verifier = Builder::new()
            .name("solana-accounts-hash".to_string())
            .spawn(move || {
                // (slot, hash) pairs published so far, oldest first.
                let mut hashes = vec![];
                while !exit.load(Ordering::Relaxed) {
                    match snapshot_package_receiver.recv_timeout(Duration::from_secs(1)) {
                        Ok(snapshot_package) => {
                            Self::process_snapshot(
                                snapshot_package,
                                &cluster_info,
                                &trusted_validators,
                                halt_on_trusted_validators_accounts_hash_mismatch,
                                &snapshot_package_sender,
                                &mut hashes,
                                &exit,
                                fault_injection_rate_slots,
                            );
                        }
                        Err(RecvTimeoutError::Disconnected) => break,
                        // Nothing arrived in time; loop around to re-check `exit`.
                        Err(RecvTimeoutError::Timeout) => (),
                    }
                }
            })
            .unwrap();
        Self {
            t_accounts_hash_verifier,
        }
    }

    /// Handles one snapshot package.
    ///
    /// 1. Records the package's `(root, hash)` in `hashes` (or a deliberately
    ///    corrupted hash when fault injection applies to this root).
    /// 2. If halting is enabled, compares all recorded hashes with those
    ///    gossiped by the trusted validators and sets `exit` on a conflict.
    /// 3. Forwards the package to `snapshot_package_sender`, if any.
    /// 4. Publishes the recorded hashes on gossip.
    fn process_snapshot(
        snapshot_package: SnapshotPackage,
        cluster_info: &Arc<RwLock<ClusterInfo>>,
        trusted_validators: &Option<HashSet<Pubkey>>,
        halt_on_trusted_validator_accounts_hash_mismatch: bool,
        snapshot_package_sender: &Option<SnapshotPackageSender>,
        hashes: &mut Vec<(Slot, Hash)>,
        exit: &Arc<AtomicBool>,
        fault_injection_rate_slots: u64,
    ) {
        // Copied out up front because the package itself is moved into the
        // downstream channel further below.
        let root = snapshot_package.root;

        let inject_fault =
            fault_injection_rate_slots != 0 && root % fault_injection_rate_slots == 0;
        let published_hash = if inject_fault {
            // For testing, publish an invalid hash to gossip.
            use rand::{thread_rng, Rng};
            use solana_sdk::hash::extend_and_hash;
            warn!("inserting fault at slot: {}", root);
            let rand = thread_rng().gen_range(0, 10);
            extend_and_hash(&snapshot_package.hash, &[rand])
        } else {
            snapshot_package.hash
        };
        // NOTE(review): `hashes` is never trimmed, while
        // `ClusterInfo::push_accounts_hashes` ignores vectors longer than
        // `MAX_SNAPSHOT_HASHES`. Once that many packages have been seen the
        // node would stop publishing; consider dropping the oldest entries.
        hashes.push((root, published_hash));

        if halt_on_trusted_validator_accounts_hash_mismatch {
            // Later entries win if a slot was recorded more than once.
            let mut slot_to_hash: HashMap<Slot, Hash> = hashes.iter().cloned().collect();
            if Self::should_halt(cluster_info, trusted_validators, &mut slot_to_hash) {
                exit.store(true, Ordering::Relaxed);
            }
        }

        if let Some(sender) = snapshot_package_sender.as_ref() {
            // Forwarding stays best-effort, but a failure is no longer silent.
            if sender.send(snapshot_package).is_err() {
                warn!("failed to forward snapshot package for slot: {}", root);
            }
        }

        cluster_info
            .write()
            .unwrap()
            .push_accounts_hashes(hashes.clone());
    }

    /// Returns `true` if any trusted validator gossiped a hash that conflicts
    /// with the one in `slot_to_hash` for the same slot.
    ///
    /// Hashes for slots not yet present in `slot_to_hash` are inserted, so a
    /// disagreement between two trusted validators on such a slot is detected
    /// as well. Returns `false` when `trusted_validators` is `None`.
    fn should_halt(
        cluster_info: &Arc<RwLock<ClusterInfo>>,
        trusted_validators: &Option<HashSet<Pubkey>>,
        slot_to_hash: &mut HashMap<Slot, Hash>,
    ) -> bool {
        let trusted_validators = match trusted_validators.as_ref() {
            Some(trusted_validators) => trusted_validators,
            None => return false,
        };
        for trusted_validator in trusted_validators {
            // The read lock is taken per validator and released at the end of
            // each iteration.
            let cluster_info_r = cluster_info.read().unwrap();
            let accounts_hashes = match cluster_info_r.get_accounts_hash_for_node(trusted_validator)
            {
                Some(accounts_hashes) => accounts_hashes,
                None => continue,
            };
            for (slot, hash) in accounts_hashes {
                if let Some(reference_hash) = slot_to_hash.get(slot) {
                    if *hash != *reference_hash {
                        error!("Trusted validator {} produced conflicting hashes for slot: {} ({} != {})",
                            trusted_validator,
                            slot,
                            hash,
                            reference_hash,
                        );
                        return true;
                    }
                } else {
                    slot_to_hash.insert(*slot, *hash);
                }
            }
        }
        false
    }

    /// Waits for the worker thread to finish.
    pub fn join(self) -> thread::Result<()> {
        self.t_accounts_hash_verifier.join()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster_info::make_accounts_hashes_message;
    use crate::contact_info::ContactInfo;
    use solana_sdk::{
        hash::hash,
        signature::{Keypair, Signer},
    };

    /// `should_halt` stays `false` while the trusted set is empty and turns
    /// `true` once a trusted validator's gossiped hash for a slot differs from
    /// the locally recorded hash for that slot.
    #[test]
    fn test_should_halt() {
        let node_keypair = Keypair::new();
        let node_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
        let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
            node_info,
        )));
        let mut trusted = HashSet::new();
        let mut local_hashes = HashMap::new();

        // Nobody is trusted yet, so there is nothing to conflict with.
        let halt_without_trusted = AccountsHashVerifier::should_halt(
            &cluster_info,
            &Some(trusted.clone()),
            &mut local_hashes,
        );
        assert!(!halt_without_trusted);

        // Gossip a hash for slot 0 on behalf of a second validator.
        let trusted_keypair = Keypair::new();
        let (gossiped_hash, local_hash) = (hash(&[1]), hash(&[2]));
        let message =
            make_accounts_hashes_message(&trusted_keypair, vec![(0, gossiped_hash)]).unwrap();
        cluster_info.write().unwrap().push_message(message);

        // Record a different local hash for slot 0 and trust that validator.
        local_hashes.insert(0, local_hash);
        trusted.insert(trusted_keypair.pubkey());
        let halt_on_conflict = AccountsHashVerifier::should_halt(
            &cluster_info,
            &Some(trusted.clone()),
            &mut local_hashes,
        );
        assert!(halt_on_conflict);
    }
}

View File

@ -48,7 +48,7 @@ use solana_sdk::timing::duration_as_s;
use solana_sdk::{ use solana_sdk::{
clock::{Slot, DEFAULT_MS_PER_SLOT}, clock::{Slot, DEFAULT_MS_PER_SLOT},
pubkey::Pubkey, pubkey::Pubkey,
signature::{Keypair, Signable, Signature}, signature::{Keypair, Signable, Signature, Signer},
timing::{duration_as_ms, timestamp}, timing::{duration_as_ms, timestamp},
transaction::Transaction, transaction::Transaction,
}; };
@ -180,6 +180,14 @@ struct PullData {
pub filter: CrdsFilter, pub filter: CrdsFilter,
} }
/// Builds an `AccountsHashes` gossip value attributed to `keypair`'s public
/// key, carrying `accounts_hashes`, and signs it with `keypair`.
///
/// The result is always `Some`.
pub fn make_accounts_hashes_message(
    keypair: &Keypair,
    accounts_hashes: Vec<(Slot, Hash)>,
) -> Option<CrdsValue> {
    let snapshot_hash = SnapshotHash::new(keypair.pubkey(), accounts_hashes);
    let signed_value = CrdsValue::new_signed(CrdsData::AccountsHashes(snapshot_hash), keypair);
    Some(signed_value)
}
// TODO These messages should go through the gpu pipeline for spam filtering // TODO These messages should go through the gpu pipeline for spam filtering
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
@ -443,22 +451,36 @@ impl ClusterInfo {
.process_push_message(&self.id(), vec![entry], now); .process_push_message(&self.id(), vec![entry], now);
} }
pub fn push_snapshot_hashes(&mut self, snapshot_hashes: Vec<(Slot, Hash)>) { pub fn push_message(&mut self, message: CrdsValue) {
if snapshot_hashes.len() > MAX_SNAPSHOT_HASHES { let now = message.wallclock();
let id = message.pubkey();
self.gossip.process_push_message(&id, vec![message], now);
}
pub fn push_accounts_hashes(&mut self, accounts_hashes: Vec<(Slot, Hash)>) {
if accounts_hashes.len() > MAX_SNAPSHOT_HASHES {
warn!( warn!(
"snapshot_hashes too large, ignored: {}", "accounts hashes too large, ignored: {}",
snapshot_hashes.len() accounts_hashes.len(),
); );
return; return;
} }
let now = timestamp(); let message = CrdsData::AccountsHashes(SnapshotHash::new(self.id(), accounts_hashes));
let entry = CrdsValue::new_signed( self.push_message(CrdsValue::new_signed(message, &self.keypair));
CrdsData::SnapshotHash(SnapshotHash::new(self.id(), snapshot_hashes, now)), }
&self.keypair,
pub fn push_snapshot_hashes(&mut self, snapshot_hashes: Vec<(Slot, Hash)>) {
if snapshot_hashes.len() > MAX_SNAPSHOT_HASHES {
warn!(
"snapshot hashes too large, ignored: {}",
snapshot_hashes.len(),
); );
self.gossip return;
.process_push_message(&self.id(), vec![entry], now); }
let message = CrdsData::SnapshotHashes(SnapshotHash::new(self.id(), snapshot_hashes));
self.push_message(CrdsValue::new_signed(message, &self.keypair));
} }
pub fn push_vote(&mut self, tower_index: usize, vote: Transaction) { pub fn push_vote(&mut self, tower_index: usize, vote: Transaction) {
@ -518,11 +540,19 @@ impl ClusterInfo {
.collect() .collect()
} }
/// Looks up the `AccountsHashes` record stored in the gossip table for
/// `pubkey` and returns its `(slot, hash)` list, or `None` when the table
/// holds no such record.
pub fn get_accounts_hash_for_node(&self, pubkey: &Pubkey) -> Option<&Vec<(Slot, Hash)>> {
    let label = CrdsValueLabel::AccountsHashes(*pubkey);
    let entry = self.gossip.crds.table.get(&label)?;
    // The record was found under an `AccountsHashes` label, so its payload is
    // expected to be the matching variant.
    Some(&entry.value.accounts_hash().unwrap().hashes)
}
pub fn get_snapshot_hash_for_node(&self, pubkey: &Pubkey) -> Option<&Vec<(Slot, Hash)>> { pub fn get_snapshot_hash_for_node(&self, pubkey: &Pubkey) -> Option<&Vec<(Slot, Hash)>> {
self.gossip self.gossip
.crds .crds
.table .table
.get(&CrdsValueLabel::SnapshotHash(*pubkey)) .get(&CrdsValueLabel::SnapshotHashes(*pubkey))
.map(|x| &x.value.snapshot_hash().unwrap().hashes) .map(|x| &x.value.snapshot_hash().unwrap().hashes)
} }

View File

@ -1,5 +1,6 @@
use crate::contact_info::ContactInfo; use crate::contact_info::ContactInfo;
use bincode::{serialize, serialized_size}; use bincode::{serialize, serialized_size};
use solana_sdk::timing::timestamp;
use solana_sdk::{ use solana_sdk::{
clock::Slot, clock::Slot,
hash::Hash, hash::Hash,
@ -62,7 +63,8 @@ pub enum CrdsData {
ContactInfo(ContactInfo), ContactInfo(ContactInfo),
Vote(VoteIndex, Vote), Vote(VoteIndex, Vote),
EpochSlots(EpochSlotIndex, EpochSlots), EpochSlots(EpochSlotIndex, EpochSlots),
SnapshotHash(SnapshotHash), SnapshotHashes(SnapshotHash),
AccountsHashes(SnapshotHash),
} }
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
@ -93,11 +95,11 @@ pub struct SnapshotHash {
} }
impl SnapshotHash { impl SnapshotHash {
pub fn new(from: Pubkey, hashes: Vec<(Slot, Hash)>, wallclock: u64) -> Self { pub fn new(from: Pubkey, hashes: Vec<(Slot, Hash)>) -> Self {
Self { Self {
from, from,
hashes, hashes,
wallclock, wallclock: timestamp(),
} }
} }
} }
@ -156,7 +158,8 @@ pub enum CrdsValueLabel {
ContactInfo(Pubkey), ContactInfo(Pubkey),
Vote(VoteIndex, Pubkey), Vote(VoteIndex, Pubkey),
EpochSlots(Pubkey), EpochSlots(Pubkey),
SnapshotHash(Pubkey), SnapshotHashes(Pubkey),
AccountsHashes(Pubkey),
} }
impl fmt::Display for CrdsValueLabel { impl fmt::Display for CrdsValueLabel {
@ -165,7 +168,8 @@ impl fmt::Display for CrdsValueLabel {
CrdsValueLabel::ContactInfo(_) => write!(f, "ContactInfo({})", self.pubkey()), CrdsValueLabel::ContactInfo(_) => write!(f, "ContactInfo({})", self.pubkey()),
CrdsValueLabel::Vote(ix, _) => write!(f, "Vote({}, {})", ix, self.pubkey()), CrdsValueLabel::Vote(ix, _) => write!(f, "Vote({}, {})", ix, self.pubkey()),
CrdsValueLabel::EpochSlots(_) => write!(f, "EpochSlots({})", self.pubkey()), CrdsValueLabel::EpochSlots(_) => write!(f, "EpochSlots({})", self.pubkey()),
CrdsValueLabel::SnapshotHash(_) => write!(f, "SnapshotHash({})", self.pubkey()), CrdsValueLabel::SnapshotHashes(_) => write!(f, "SnapshotHashes({})", self.pubkey()),
CrdsValueLabel::AccountsHashes(_) => write!(f, "AccountsHashes({})", self.pubkey()),
} }
} }
} }
@ -176,7 +180,8 @@ impl CrdsValueLabel {
CrdsValueLabel::ContactInfo(p) => *p, CrdsValueLabel::ContactInfo(p) => *p,
CrdsValueLabel::Vote(_, p) => *p, CrdsValueLabel::Vote(_, p) => *p,
CrdsValueLabel::EpochSlots(p) => *p, CrdsValueLabel::EpochSlots(p) => *p,
CrdsValueLabel::SnapshotHash(p) => *p, CrdsValueLabel::SnapshotHashes(p) => *p,
CrdsValueLabel::AccountsHashes(p) => *p,
} }
} }
} }
@ -202,7 +207,8 @@ impl CrdsValue {
CrdsData::ContactInfo(contact_info) => contact_info.wallclock, CrdsData::ContactInfo(contact_info) => contact_info.wallclock,
CrdsData::Vote(_, vote) => vote.wallclock, CrdsData::Vote(_, vote) => vote.wallclock,
CrdsData::EpochSlots(_, vote) => vote.wallclock, CrdsData::EpochSlots(_, vote) => vote.wallclock,
CrdsData::SnapshotHash(hash) => hash.wallclock, CrdsData::SnapshotHashes(hash) => hash.wallclock,
CrdsData::AccountsHashes(hash) => hash.wallclock,
} }
} }
pub fn pubkey(&self) -> Pubkey { pub fn pubkey(&self) -> Pubkey {
@ -210,7 +216,8 @@ impl CrdsValue {
CrdsData::ContactInfo(contact_info) => contact_info.id, CrdsData::ContactInfo(contact_info) => contact_info.id,
CrdsData::Vote(_, vote) => vote.from, CrdsData::Vote(_, vote) => vote.from,
CrdsData::EpochSlots(_, slots) => slots.from, CrdsData::EpochSlots(_, slots) => slots.from,
CrdsData::SnapshotHash(hash) => hash.from, CrdsData::SnapshotHashes(hash) => hash.from,
CrdsData::AccountsHashes(hash) => hash.from,
} }
} }
pub fn label(&self) -> CrdsValueLabel { pub fn label(&self) -> CrdsValueLabel {
@ -218,7 +225,8 @@ impl CrdsValue {
CrdsData::ContactInfo(_) => CrdsValueLabel::ContactInfo(self.pubkey()), CrdsData::ContactInfo(_) => CrdsValueLabel::ContactInfo(self.pubkey()),
CrdsData::Vote(ix, _) => CrdsValueLabel::Vote(*ix, self.pubkey()), CrdsData::Vote(ix, _) => CrdsValueLabel::Vote(*ix, self.pubkey()),
CrdsData::EpochSlots(_, _) => CrdsValueLabel::EpochSlots(self.pubkey()), CrdsData::EpochSlots(_, _) => CrdsValueLabel::EpochSlots(self.pubkey()),
CrdsData::SnapshotHash(_) => CrdsValueLabel::SnapshotHash(self.pubkey()), CrdsData::SnapshotHashes(_) => CrdsValueLabel::SnapshotHashes(self.pubkey()),
CrdsData::AccountsHashes(_) => CrdsValueLabel::AccountsHashes(self.pubkey()),
} }
} }
pub fn contact_info(&self) -> Option<&ContactInfo> { pub fn contact_info(&self) -> Option<&ContactInfo> {
@ -250,7 +258,14 @@ impl CrdsValue {
pub fn snapshot_hash(&self) -> Option<&SnapshotHash> { pub fn snapshot_hash(&self) -> Option<&SnapshotHash> {
match &self.data { match &self.data {
CrdsData::SnapshotHash(slots) => Some(slots), CrdsData::SnapshotHashes(slots) => Some(slots),
_ => None,
}
}
pub fn accounts_hash(&self) -> Option<&SnapshotHash> {
match &self.data {
CrdsData::AccountsHashes(slots) => Some(slots),
_ => None, _ => None,
} }
} }
@ -260,7 +275,8 @@ impl CrdsValue {
let mut labels = vec![ let mut labels = vec![
CrdsValueLabel::ContactInfo(*key), CrdsValueLabel::ContactInfo(*key),
CrdsValueLabel::EpochSlots(*key), CrdsValueLabel::EpochSlots(*key),
CrdsValueLabel::SnapshotHash(*key), CrdsValueLabel::SnapshotHashes(*key),
CrdsValueLabel::AccountsHashes(*key),
]; ];
labels.extend((0..MAX_VOTES).map(|ix| CrdsValueLabel::Vote(ix, *key))); labels.extend((0..MAX_VOTES).map(|ix| CrdsValueLabel::Vote(ix, *key)));
labels labels
@ -310,14 +326,15 @@ mod test {
#[test] #[test]
fn test_labels() { fn test_labels() {
let mut hits = [false; 3 + MAX_VOTES as usize]; let mut hits = [false; 4 + MAX_VOTES as usize];
// this method should cover all the possible labels // this method should cover all the possible labels
for v in &CrdsValue::record_labels(&Pubkey::default()) { for v in &CrdsValue::record_labels(&Pubkey::default()) {
match v { match v {
CrdsValueLabel::ContactInfo(_) => hits[0] = true, CrdsValueLabel::ContactInfo(_) => hits[0] = true,
CrdsValueLabel::EpochSlots(_) => hits[1] = true, CrdsValueLabel::EpochSlots(_) => hits[1] = true,
CrdsValueLabel::SnapshotHash(_) => hits[2] = true, CrdsValueLabel::SnapshotHashes(_) => hits[2] = true,
CrdsValueLabel::Vote(ix, _) => hits[*ix as usize + 3] = true, CrdsValueLabel::AccountsHashes(_) => hits[3] = true,
CrdsValueLabel::Vote(ix, _) => hits[*ix as usize + 4] = true,
} }
} }
assert!(hits.iter().all(|x| *x)); assert!(hits.iter().all(|x| *x));

View File

@ -5,6 +5,7 @@
//! command-line tools to spin up validators and a Rust library //! command-line tools to spin up validators and a Rust library
//! //!
pub mod accounts_hash_verifier;
pub mod banking_stage; pub mod banking_stage;
pub mod broadcast_stage; pub mod broadcast_stage;
pub mod cluster_info_vote_listener; pub mod cluster_info_vote_listener;

View File

@ -76,7 +76,7 @@ pub struct ReplayStageConfig {
pub leader_schedule_cache: Arc<LeaderScheduleCache>, pub leader_schedule_cache: Arc<LeaderScheduleCache>,
pub slot_full_senders: Vec<Sender<(u64, Pubkey)>>, pub slot_full_senders: Vec<Sender<(u64, Pubkey)>>,
pub latest_root_senders: Vec<Sender<Slot>>, pub latest_root_senders: Vec<Sender<Slot>>,
pub snapshot_package_sender: Option<SnapshotPackageSender>, pub accounts_hash_sender: Option<SnapshotPackageSender>,
pub block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>, pub block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
pub transaction_status_sender: Option<TransactionStatusSender>, pub transaction_status_sender: Option<TransactionStatusSender>,
pub rewards_recorder_sender: Option<RewardsRecorderSender>, pub rewards_recorder_sender: Option<RewardsRecorderSender>,
@ -179,7 +179,7 @@ impl ReplayStage {
leader_schedule_cache, leader_schedule_cache,
slot_full_senders, slot_full_senders,
latest_root_senders, latest_root_senders,
snapshot_package_sender, accounts_hash_sender,
block_commitment_cache, block_commitment_cache,
transaction_status_sender, transaction_status_sender,
rewards_recorder_sender, rewards_recorder_sender,
@ -334,7 +334,7 @@ impl ReplayStage {
&root_bank_sender, &root_bank_sender,
total_staked, total_staked,
&lockouts_sender, &lockouts_sender,
&snapshot_package_sender, &accounts_hash_sender,
&latest_root_senders, &latest_root_senders,
)?; )?;
} }
@ -605,7 +605,7 @@ impl ReplayStage {
root_bank_sender: &Sender<Vec<Arc<Bank>>>, root_bank_sender: &Sender<Vec<Arc<Bank>>>,
total_staked: u64, total_staked: u64,
lockouts_sender: &Sender<CommitmentAggregationData>, lockouts_sender: &Sender<CommitmentAggregationData>,
snapshot_package_sender: &Option<SnapshotPackageSender>, accounts_hash_sender: &Option<SnapshotPackageSender>,
latest_root_senders: &[Sender<Slot>], latest_root_senders: &[Sender<Slot>],
) -> Result<()> { ) -> Result<()> {
if bank.is_empty() { if bank.is_empty() {
@ -632,7 +632,7 @@ impl ReplayStage {
blockstore blockstore
.set_roots(&rooted_slots) .set_roots(&rooted_slots)
.expect("Ledger set roots failed"); .expect("Ledger set roots failed");
Self::handle_new_root(new_root, &bank_forks, progress, snapshot_package_sender); Self::handle_new_root(new_root, &bank_forks, progress, accounts_hash_sender);
latest_root_senders.iter().for_each(|s| { latest_root_senders.iter().for_each(|s| {
if let Err(e) = s.send(new_root) { if let Err(e) = s.send(new_root) {
trace!("latest root send failed: {:?}", e); trace!("latest root send failed: {:?}", e);
@ -959,12 +959,12 @@ impl ReplayStage {
new_root: u64, new_root: u64,
bank_forks: &RwLock<BankForks>, bank_forks: &RwLock<BankForks>,
progress: &mut HashMap<u64, ForkProgress>, progress: &mut HashMap<u64, ForkProgress>,
snapshot_package_sender: &Option<SnapshotPackageSender>, accounts_hash_sender: &Option<SnapshotPackageSender>,
) { ) {
bank_forks bank_forks
.write() .write()
.unwrap() .unwrap()
.set_root(new_root, snapshot_package_sender); .set_root(new_root, accounts_hash_sender);
let r_bank_forks = bank_forks.read().unwrap(); let r_bank_forks = bank_forks.read().unwrap();
progress.retain(|k, _| r_bank_forks.get(*k).is_some()); progress.retain(|k, _| r_bank_forks.get(*k).is_some());
} }

View File

@ -2,6 +2,7 @@
//! validation pipeline in software. //! validation pipeline in software.
use crate::{ use crate::{
accounts_hash_verifier::AccountsHashVerifier,
blockstream_service::BlockstreamService, blockstream_service::BlockstreamService,
cluster_info::ClusterInfo, cluster_info::ClusterInfo,
commitment::BlockCommitmentCache, commitment::BlockCommitmentCache,
@ -28,6 +29,7 @@ use solana_sdk::{
pubkey::Pubkey, pubkey::Pubkey,
signature::{Keypair, Signer}, signature::{Keypair, Signer},
}; };
use std::collections::HashSet;
use std::{ use std::{
net::UdpSocket, net::UdpSocket,
path::PathBuf, path::PathBuf,
@ -47,6 +49,7 @@ pub struct Tvu {
blockstream_service: Option<BlockstreamService>, blockstream_service: Option<BlockstreamService>,
ledger_cleanup_service: Option<LedgerCleanupService>, ledger_cleanup_service: Option<LedgerCleanupService>,
storage_stage: StorageStage, storage_stage: StorageStage,
accounts_hash_verifier: AccountsHashVerifier,
} }
pub struct Sockets { pub struct Sockets {
@ -56,6 +59,16 @@ pub struct Sockets {
pub forwards: Vec<UdpSocket>, pub forwards: Vec<UdpSocket>,
} }
/// Optional settings for `Tvu::new`, grouped into one struct so they do not
/// each need their own positional argument. `Default` yields `None`/`false`/`0`
/// for every field.
#[derive(Default)]
pub struct TvuConfig {
/// When `Some`, `Tvu::new` creates a `LedgerCleanupService`; when `None`, no
/// cleanup service is created.
pub max_ledger_slots: Option<u64>,
/// When `false`, `Tvu::new` builds its sigverify stage via
/// `SigVerifyStage::new(...)`; the `true` branch is not shown in this hunk
/// (presumably a verification-disabled variant — confirm).
pub sigverify_disabled: bool,
/// Shred version handed on by `Tvu::new` to one of the stages it constructs.
pub shred_version: u16,
/// Forwarded to `AccountsHashVerifier::new`; when `true`, a conflicting
/// accounts hash from a trusted validator sets the exit flag.
pub halt_on_trusted_validators_accounts_hash_mismatch: bool,
/// Validators whose gossiped accounts hashes are checked; forwarded to
/// `AccountsHashVerifier::new`.
pub trusted_validators: Option<HashSet<Pubkey>>,
/// Forwarded to `AccountsHashVerifier::new` as the fault injection rate;
/// `0` means no fault injection.
pub accounts_hash_fault_injection_slots: u64,
}
impl Tvu { impl Tvu {
/// This service receives messages from a leader in the network and processes the transactions /// This service receives messages from a leader in the network and processes the transactions
/// on the bank state. /// on the bank state.
@ -74,7 +87,6 @@ impl Tvu {
blockstore: Arc<Blockstore>, blockstore: Arc<Blockstore>,
storage_state: &StorageState, storage_state: &StorageState,
blockstream_unix_socket: Option<&PathBuf>, blockstream_unix_socket: Option<&PathBuf>,
max_ledger_slots: Option<u64>,
ledger_signal_receiver: Receiver<bool>, ledger_signal_receiver: Receiver<bool>,
subscriptions: &Arc<RpcSubscriptions>, subscriptions: &Arc<RpcSubscriptions>,
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: &Arc<Mutex<PohRecorder>>,
@ -82,12 +94,11 @@ impl Tvu {
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
completed_slots_receiver: CompletedSlotsReceiver, completed_slots_receiver: CompletedSlotsReceiver,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>, block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
sigverify_disabled: bool,
cfg: Option<Arc<AtomicBool>>, cfg: Option<Arc<AtomicBool>>,
shred_version: u16,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
rewards_recorder_sender: Option<RewardsRecorderSender>, rewards_recorder_sender: Option<RewardsRecorderSender>,
snapshot_package_sender: Option<SnapshotPackageSender>, snapshot_package_sender: Option<SnapshotPackageSender>,
tvu_config: TvuConfig,
) -> Self { ) -> Self {
let keypair: Arc<Keypair> = cluster_info let keypair: Arc<Keypair> = cluster_info
.read() .read()
@ -117,7 +128,7 @@ impl Tvu {
); );
let (verified_sender, verified_receiver) = unbounded(); let (verified_sender, verified_receiver) = unbounded();
let sigverify_stage = if !sigverify_disabled { let sigverify_stage = if !tvu_config.sigverify_disabled {
SigVerifyStage::new( SigVerifyStage::new(
fetch_receiver, fetch_receiver,
verified_sender, verified_sender,
@ -143,12 +154,23 @@ impl Tvu {
completed_slots_receiver, completed_slots_receiver,
*bank_forks.read().unwrap().working_bank().epoch_schedule(), *bank_forks.read().unwrap().working_bank().epoch_schedule(),
cfg, cfg,
shred_version, tvu_config.shred_version,
); );
let (blockstream_slot_sender, blockstream_slot_receiver) = channel(); let (blockstream_slot_sender, blockstream_slot_receiver) = channel();
let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel(); let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel();
let (accounts_hash_sender, accounts_hash_receiver) = channel();
let accounts_hash_verifier = AccountsHashVerifier::new(
accounts_hash_receiver,
snapshot_package_sender,
exit,
cluster_info,
tvu_config.trusted_validators.clone(),
tvu_config.halt_on_trusted_validators_accounts_hash_mismatch,
tvu_config.accounts_hash_fault_injection_slots,
);
let replay_stage_config = ReplayStageConfig { let replay_stage_config = ReplayStageConfig {
my_pubkey: keypair.pubkey(), my_pubkey: keypair.pubkey(),
vote_account: *vote_account, vote_account: *vote_account,
@ -158,7 +180,7 @@ impl Tvu {
leader_schedule_cache: leader_schedule_cache.clone(), leader_schedule_cache: leader_schedule_cache.clone(),
slot_full_senders: vec![blockstream_slot_sender], slot_full_senders: vec![blockstream_slot_sender],
latest_root_senders: vec![ledger_cleanup_slot_sender], latest_root_senders: vec![ledger_cleanup_slot_sender],
snapshot_package_sender, accounts_hash_sender: Some(accounts_hash_sender),
block_commitment_cache, block_commitment_cache,
transaction_status_sender, transaction_status_sender,
rewards_recorder_sender, rewards_recorder_sender,
@ -185,7 +207,7 @@ impl Tvu {
None None
}; };
let ledger_cleanup_service = max_ledger_slots.map(|max_ledger_slots| { let ledger_cleanup_service = tvu_config.max_ledger_slots.map(|max_ledger_slots| {
LedgerCleanupService::new( LedgerCleanupService::new(
ledger_cleanup_slot_receiver, ledger_cleanup_slot_receiver,
blockstore.clone(), blockstore.clone(),
@ -213,6 +235,7 @@ impl Tvu {
blockstream_service, blockstream_service,
ledger_cleanup_service, ledger_cleanup_service,
storage_stage, storage_stage,
accounts_hash_verifier,
} }
} }
@ -228,6 +251,7 @@ impl Tvu {
self.ledger_cleanup_service.unwrap().join()?; self.ledger_cleanup_service.unwrap().join()?;
} }
self.replay_stage.join()?; self.replay_stage.join()?;
self.accounts_hash_verifier.join()?;
Ok(()) Ok(())
} }
} }
@ -288,7 +312,6 @@ pub mod tests {
blockstore, blockstore,
&StorageState::default(), &StorageState::default(),
None, None,
None,
l_receiver, l_receiver,
&Arc::new(RpcSubscriptions::new(&exit)), &Arc::new(RpcSubscriptions::new(&exit)),
&poh_recorder, &poh_recorder,
@ -296,12 +319,11 @@ pub mod tests {
&exit, &exit,
completed_slots_receiver, completed_slots_receiver,
block_commitment_cache, block_commitment_cache,
false,
None,
0,
None, None,
None, None,
None, None,
None,
TvuConfig::default(),
); );
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
tvu.join().unwrap(); tvu.join().unwrap();

View File

@ -20,7 +20,7 @@ use crate::{
storage_stage::StorageState, storage_stage::StorageState,
tpu::Tpu, tpu::Tpu,
transaction_status_service::TransactionStatusService, transaction_status_service::TransactionStatusService,
tvu::{Sockets, Tvu}, tvu::{Sockets, Tvu, TvuConfig},
}; };
use crossbeam_channel::unbounded; use crossbeam_channel::unbounded;
use solana_ledger::{ use solana_ledger::{
@ -76,6 +76,8 @@ pub struct ValidatorConfig {
pub wait_for_supermajority: Option<Slot>, pub wait_for_supermajority: Option<Slot>,
pub new_hard_forks: Option<Vec<Slot>>, pub new_hard_forks: Option<Vec<Slot>>,
pub trusted_validators: Option<HashSet<Pubkey>>, // None = trust all pub trusted_validators: Option<HashSet<Pubkey>>, // None = trust all
pub halt_on_trusted_validators_accounts_hash_mismatch: bool,
pub accounts_hash_fault_injection_slots: u64, // 0 = no fault injection
} }
impl Default for ValidatorConfig { impl Default for ValidatorConfig {
@ -99,6 +101,8 @@ impl Default for ValidatorConfig {
wait_for_supermajority: None, wait_for_supermajority: None,
new_hard_forks: None, new_hard_forks: None,
trusted_validators: None, trusted_validators: None,
halt_on_trusted_validators_accounts_hash_mismatch: false,
accounts_hash_fault_injection_slots: 0,
} }
} }
} }
@ -416,7 +420,6 @@ impl Validator {
blockstore.clone(), blockstore.clone(),
&storage_state, &storage_state,
config.blockstream_unix_socket.as_ref(), config.blockstream_unix_socket.as_ref(),
config.max_ledger_slots,
ledger_signal_receiver, ledger_signal_receiver,
&subscriptions, &subscriptions,
&poh_recorder, &poh_recorder,
@ -424,12 +427,19 @@ impl Validator {
&exit, &exit,
completed_slots_receiver, completed_slots_receiver,
block_commitment_cache, block_commitment_cache,
config.dev_sigverify_disabled,
config.enable_partition.clone(), config.enable_partition.clone(),
node.info.shred_version,
transaction_status_sender.clone(), transaction_status_sender.clone(),
rewards_recorder_sender, rewards_recorder_sender,
snapshot_package_sender, snapshot_package_sender,
TvuConfig {
max_ledger_slots: config.max_ledger_slots,
sigverify_disabled: config.dev_sigverify_disabled,
halt_on_trusted_validators_accounts_hash_mismatch: config
.halt_on_trusted_validators_accounts_hash_mismatch,
shred_version: node.info.shred_version,
trusted_validators: config.trusted_validators.clone(),
accounts_hash_fault_injection_slots: config.accounts_hash_fault_injection_slots,
},
); );
if config.dev_sigverify_disabled { if config.dev_sigverify_disabled {

View File

@ -606,6 +606,95 @@ fn test_softlaunch_operating_mode() {
} }
} }
// Integration test: a validator started with
// `halt_on_trusted_validators_accounts_hash_mismatch` must drop out of the
// cluster when the node it trusts publishes a faulty accounts hash.
#[test]
#[serial]
fn test_consistency_halt() {
solana_logger::setup();
let snapshot_interval_slots = 20;
let num_account_paths = 1;
// Create a cluster whose leader publishes a deliberately wrong accounts hash
// for every root slot that is a multiple of 40.
let mut leader_snapshot_test_config =
setup_snapshot_validator_config(snapshot_interval_slots, num_account_paths);
leader_snapshot_test_config
.validator_config
.accounts_hash_fault_injection_slots = 40;
let validator_stake = 10_000;
let config = ClusterConfig {
node_stakes: vec![validator_stake],
cluster_lamports: 100_000,
validator_configs: vec![leader_snapshot_test_config.validator_config.clone()],
..ClusterConfig::default()
};
let mut cluster = LocalCluster::new(&config);
// Give the single-node cluster time to start before querying gossip.
sleep(Duration::from_millis(5000));
let (cluster_nodes, _) = discover_cluster(&cluster.entry_point_info.gossip, 1).unwrap();
info!("num_nodes: {}", cluster_nodes.len());
// Add a second validator that trusts the leader and has halting enabled; it
// should halt once it detects a mismatch.
let mut validator_snapshot_test_config =
setup_snapshot_validator_config(snapshot_interval_slots, num_account_paths);
let mut trusted_validators = HashSet::new();
trusted_validators.insert(cluster_nodes[0].id);
validator_snapshot_test_config
.validator_config
.trusted_validators = Some(trusted_validators);
validator_snapshot_test_config
.validator_config
.halt_on_trusted_validators_accounts_hash_mismatch = true;
warn!("adding a validator");
cluster.add_validator(
&validator_snapshot_test_config.validator_config,
validator_stake as u64,
Arc::new(Keypair::new()),
);
// Both nodes must be visible on gossip before the halt is awaited.
let num_nodes = 2;
assert_eq!(
discover_cluster(&cluster.entry_point_info.gossip, num_nodes)
.unwrap()
.0
.len(),
num_nodes
);
// Poll once per second until discovery of two nodes fails or returns fewer
// than two nodes, which is taken as evidence that the second validator halted.
let mut encountered_error = false;
loop {
let discover = discover_cluster(&cluster.entry_point_info.gossip, 2);
match discover {
Err(_) => {
encountered_error = true;
break;
}
Ok(nodes) => {
if nodes.0.len() < 2 {
encountered_error = true;
break;
}
info!("checking cluster for fewer nodes.. {:?}", nodes.0.len());
}
}
let client = cluster
.get_validator_client(&cluster.entry_point_info.id)
.unwrap();
if let Ok(slot) = client.get_slot() {
// Give up after slot 210: leaving the loop here keeps
// `encountered_error` false, so the final assertion fails the test.
if slot > 210 {
break;
}
info!("slot: {}", slot);
}
sleep(Duration::from_millis(1000));
}
assert!(encountered_error);
}
#[allow(unused_attributes)] #[allow(unused_attributes)]
#[test] #[test]
#[serial] #[serial]

View File

@ -145,6 +145,9 @@ while [[ -n $1 ]]; do
elif [[ $1 = --trusted-validator ]]; then elif [[ $1 = --trusted-validator ]]; then
args+=("$1" "$2") args+=("$1" "$2")
shift 2 shift 2
elif [[ $1 = --halt-on-trusted-validators-accounts-hash-mismatch ]]; then
args+=("$1")
shift
elif [[ $1 = -h ]]; then elif [[ $1 = -h ]]; then
usage "$@" usage "$@"
else else

View File

@ -851,6 +851,13 @@ pub fn main() {
.validator(solana_net_utils::is_host) .validator(solana_net_utils::is_host)
.help("IP address to bind the RPC port [default: use --bind-address]"), .help("IP address to bind the RPC port [default: use --bind-address]"),
) )
.arg(
clap::Arg::with_name("halt_on_trusted_validators_accounts_hash_mismatch")
.long("halt-on-trusted-validators-accounts-hash-mismatch")
.requires("trusted_validators")
.takes_value(false)
.help("Abort the validator if a bank hash mismatch is detected within trusted validator set"),
)
.get_matches(); .get_matches();
let identity_keypair = Arc::new(keypair_of(&matches, "identity").unwrap_or_else(Keypair::new)); let identity_keypair = Arc::new(keypair_of(&matches, "identity").unwrap_or_else(Keypair::new));
@ -1009,6 +1016,10 @@ pub fn main() {
validator_config.max_ledger_slots = Some(limit_ledger_size); validator_config.max_ledger_slots = Some(limit_ledger_size);
} }
if matches.is_present("halt_on_trusted_validators_accounts_hash_mismatch") {
validator_config.halt_on_trusted_validators_accounts_hash_mismatch = true;
}
if matches.value_of("signer_addr").is_some() { if matches.value_of("signer_addr").is_some() {
warn!("--vote-signer-address ignored"); warn!("--vote-signer-address ignored");
} }