Decouple accounts hash calculation from snapshot hash (#9507)
This commit is contained in:
@ -6,8 +6,8 @@
|
||||
|
||||
use crate::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES};
|
||||
use solana_ledger::{
|
||||
snapshot_package::SnapshotPackage, snapshot_package::SnapshotPackageReceiver,
|
||||
snapshot_package::SnapshotPackageSender,
|
||||
snapshot_package::AccountsPackage, snapshot_package::AccountsPackageReceiver,
|
||||
snapshot_package::AccountsPackageSender,
|
||||
};
|
||||
use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
@ -27,13 +27,14 @@ pub struct AccountsHashVerifier {
|
||||
|
||||
impl AccountsHashVerifier {
|
||||
pub fn new(
|
||||
snapshot_package_receiver: SnapshotPackageReceiver,
|
||||
snapshot_package_sender: Option<SnapshotPackageSender>,
|
||||
accounts_package_receiver: AccountsPackageReceiver,
|
||||
accounts_package_sender: Option<AccountsPackageSender>,
|
||||
exit: &Arc<AtomicBool>,
|
||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||
trusted_validators: Option<HashSet<Pubkey>>,
|
||||
halt_on_trusted_validators_accounts_hash_mismatch: bool,
|
||||
fault_injection_rate_slots: u64,
|
||||
snapshot_interval_slots: u64,
|
||||
) -> Self {
|
||||
let exit = exit.clone();
|
||||
let cluster_info = cluster_info.clone();
|
||||
@ -46,17 +47,18 @@ impl AccountsHashVerifier {
|
||||
break;
|
||||
}
|
||||
|
||||
match snapshot_package_receiver.recv_timeout(Duration::from_secs(1)) {
|
||||
Ok(snapshot_package) => {
|
||||
Self::process_snapshot(
|
||||
snapshot_package,
|
||||
match accounts_package_receiver.recv_timeout(Duration::from_secs(1)) {
|
||||
Ok(accounts_package) => {
|
||||
Self::process_accounts_package(
|
||||
accounts_package,
|
||||
&cluster_info,
|
||||
&trusted_validators,
|
||||
halt_on_trusted_validators_accounts_hash_mismatch,
|
||||
&snapshot_package_sender,
|
||||
&accounts_package_sender,
|
||||
&mut hashes,
|
||||
&exit,
|
||||
fault_injection_rate_slots,
|
||||
snapshot_interval_slots,
|
||||
);
|
||||
}
|
||||
Err(RecvTimeoutError::Disconnected) => break,
|
||||
@ -70,28 +72,29 @@ impl AccountsHashVerifier {
|
||||
}
|
||||
}
|
||||
|
||||
fn process_snapshot(
|
||||
snapshot_package: SnapshotPackage,
|
||||
fn process_accounts_package(
|
||||
accounts_package: AccountsPackage,
|
||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||
trusted_validators: &Option<HashSet<Pubkey>>,
|
||||
halt_on_trusted_validator_accounts_hash_mismatch: bool,
|
||||
snapshot_package_sender: &Option<SnapshotPackageSender>,
|
||||
accounts_package_sender: &Option<AccountsPackageSender>,
|
||||
hashes: &mut Vec<(Slot, Hash)>,
|
||||
exit: &Arc<AtomicBool>,
|
||||
fault_injection_rate_slots: u64,
|
||||
snapshot_interval_slots: u64,
|
||||
) {
|
||||
if fault_injection_rate_slots != 0
|
||||
&& snapshot_package.root % fault_injection_rate_slots == 0
|
||||
&& accounts_package.root % fault_injection_rate_slots == 0
|
||||
{
|
||||
// For testing, publish an invalid hash to gossip.
|
||||
use rand::{thread_rng, Rng};
|
||||
use solana_sdk::hash::extend_and_hash;
|
||||
warn!("inserting fault at slot: {}", snapshot_package.root);
|
||||
warn!("inserting fault at slot: {}", accounts_package.root);
|
||||
let rand = thread_rng().gen_range(0, 10);
|
||||
let hash = extend_and_hash(&snapshot_package.hash, &[rand]);
|
||||
hashes.push((snapshot_package.root, hash));
|
||||
let hash = extend_and_hash(&accounts_package.hash, &[rand]);
|
||||
hashes.push((accounts_package.root, hash));
|
||||
} else {
|
||||
hashes.push((snapshot_package.root, snapshot_package.hash));
|
||||
hashes.push((accounts_package.root, accounts_package.hash));
|
||||
}
|
||||
|
||||
while hashes.len() > MAX_SNAPSHOT_HASHES {
|
||||
@ -107,8 +110,11 @@ impl AccountsHashVerifier {
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
if let Some(sender) = snapshot_package_sender.as_ref() {
|
||||
if sender.send(snapshot_package).is_err() {}
|
||||
|
||||
if accounts_package.block_height % snapshot_interval_slots == 0 {
|
||||
if let Some(sender) = accounts_package_sender.as_ref() {
|
||||
if sender.send(accounts_package).is_err() {}
|
||||
}
|
||||
}
|
||||
|
||||
cluster_info
|
||||
@ -225,8 +231,9 @@ mod tests {
|
||||
let mut hashes = vec![];
|
||||
for i in 0..MAX_SNAPSHOT_HASHES + 1 {
|
||||
let snapshot_links = TempDir::new().unwrap();
|
||||
let snapshot_package = SnapshotPackage {
|
||||
let accounts_package = AccountsPackage {
|
||||
hash: hash(&[i as u8]),
|
||||
block_height: 100 + i as u64,
|
||||
root: 100 + i as u64,
|
||||
slot_deltas: vec![],
|
||||
snapshot_links,
|
||||
@ -235,8 +242,8 @@ mod tests {
|
||||
compression: CompressionType::Bzip2,
|
||||
};
|
||||
|
||||
AccountsHashVerifier::process_snapshot(
|
||||
snapshot_package,
|
||||
AccountsHashVerifier::process_accounts_package(
|
||||
accounts_package,
|
||||
&cluster_info,
|
||||
&Some(trusted_validators.clone()),
|
||||
false,
|
||||
@ -244,6 +251,7 @@ mod tests {
|
||||
&mut hashes,
|
||||
&exit,
|
||||
0,
|
||||
100,
|
||||
);
|
||||
}
|
||||
let cluster_info_r = cluster_info.read().unwrap();
|
||||
|
@ -20,7 +20,7 @@ use solana_ledger::{
|
||||
blockstore_processor::{self, BlockstoreProcessorError, TransactionStatusSender},
|
||||
entry::VerifyRecyclers,
|
||||
leader_schedule_cache::LeaderScheduleCache,
|
||||
snapshot_package::SnapshotPackageSender,
|
||||
snapshot_package::AccountsPackageSender,
|
||||
};
|
||||
use solana_measure::thread_mem_usage;
|
||||
use solana_metrics::inc_new_counter_info;
|
||||
@ -97,7 +97,7 @@ pub struct ReplayStageConfig {
|
||||
pub subscriptions: Arc<RpcSubscriptions>,
|
||||
pub leader_schedule_cache: Arc<LeaderScheduleCache>,
|
||||
pub latest_root_senders: Vec<Sender<Slot>>,
|
||||
pub accounts_hash_sender: Option<SnapshotPackageSender>,
|
||||
pub accounts_hash_sender: Option<AccountsPackageSender>,
|
||||
pub block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
|
||||
pub transaction_status_sender: Option<TransactionStatusSender>,
|
||||
pub rewards_recorder_sender: Option<RewardsRecorderSender>,
|
||||
@ -690,7 +690,7 @@ impl ReplayStage {
|
||||
leader_schedule_cache: &Arc<LeaderScheduleCache>,
|
||||
root_bank_sender: &Sender<Vec<Arc<Bank>>>,
|
||||
lockouts_sender: &Sender<CommitmentAggregationData>,
|
||||
accounts_hash_sender: &Option<SnapshotPackageSender>,
|
||||
accounts_hash_sender: &Option<AccountsPackageSender>,
|
||||
latest_root_senders: &[Sender<Slot>],
|
||||
all_pubkeys: &mut HashSet<Rc<Pubkey>>,
|
||||
subscriptions: &Arc<RpcSubscriptions>,
|
||||
@ -1485,7 +1485,7 @@ impl ReplayStage {
|
||||
new_root: u64,
|
||||
bank_forks: &RwLock<BankForks>,
|
||||
progress: &mut ProgressMap,
|
||||
accounts_hash_sender: &Option<SnapshotPackageSender>,
|
||||
accounts_hash_sender: &Option<AccountsPackageSender>,
|
||||
all_pubkeys: &mut HashSet<Rc<Pubkey>>,
|
||||
) {
|
||||
let old_epoch = bank_forks.read().unwrap().root_bank().epoch();
|
||||
|
@ -1,5 +1,5 @@
|
||||
use crate::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES};
|
||||
use solana_ledger::{snapshot_package::SnapshotPackageReceiver, snapshot_utils};
|
||||
use solana_ledger::{snapshot_package::AccountsPackageReceiver, snapshot_utils};
|
||||
use solana_sdk::{clock::Slot, hash::Hash};
|
||||
use std::{
|
||||
sync::{
|
||||
@ -17,7 +17,7 @@ pub struct SnapshotPackagerService {
|
||||
|
||||
impl SnapshotPackagerService {
|
||||
pub fn new(
|
||||
snapshot_package_receiver: SnapshotPackageReceiver,
|
||||
snapshot_package_receiver: AccountsPackageReceiver,
|
||||
starting_snapshot_hash: Option<(Slot, Hash)>,
|
||||
exit: &Arc<AtomicBool>,
|
||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||
@ -86,7 +86,7 @@ mod tests {
|
||||
use bincode::serialize_into;
|
||||
use solana_ledger::bank_forks::CompressionType;
|
||||
use solana_ledger::{
|
||||
snapshot_package::SnapshotPackage,
|
||||
snapshot_package::AccountsPackage,
|
||||
snapshot_utils::{self, SNAPSHOT_STATUS_CACHE_FILE_NAME},
|
||||
};
|
||||
use solana_runtime::{
|
||||
@ -172,7 +172,8 @@ mod tests {
|
||||
&(42, Hash::default()),
|
||||
&CompressionType::Bzip2,
|
||||
);
|
||||
let snapshot_package = SnapshotPackage::new(
|
||||
let snapshot_package = AccountsPackage::new(
|
||||
5,
|
||||
5,
|
||||
vec![],
|
||||
link_snapshots_dir,
|
||||
|
@ -26,7 +26,7 @@ use solana_ledger::{
|
||||
bank_forks::BankForks,
|
||||
blockstore::{Blockstore, CompletedSlotsReceiver},
|
||||
blockstore_processor::TransactionStatusSender,
|
||||
snapshot_package::SnapshotPackageSender,
|
||||
snapshot_package::AccountsPackageSender,
|
||||
};
|
||||
use solana_sdk::{
|
||||
pubkey::Pubkey,
|
||||
@ -98,7 +98,7 @@ impl Tvu {
|
||||
cfg: Option<Arc<AtomicBool>>,
|
||||
transaction_status_sender: Option<TransactionStatusSender>,
|
||||
rewards_recorder_sender: Option<RewardsRecorderSender>,
|
||||
snapshot_package_sender: Option<SnapshotPackageSender>,
|
||||
snapshot_package_sender: Option<AccountsPackageSender>,
|
||||
vote_tracker: Arc<VoteTracker>,
|
||||
retransmit_slots_sender: RetransmitSlotsSender,
|
||||
tvu_config: TvuConfig,
|
||||
@ -165,6 +165,14 @@ impl Tvu {
|
||||
|
||||
let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel();
|
||||
|
||||
let snapshot_interval_slots = {
|
||||
if let Some(config) = bank_forks.read().unwrap().snapshot_config() {
|
||||
config.snapshot_interval_slots
|
||||
} else {
|
||||
std::u64::MAX
|
||||
}
|
||||
};
|
||||
info!("snapshot_interval_slots: {}", snapshot_interval_slots);
|
||||
let (accounts_hash_sender, accounts_hash_receiver) = channel();
|
||||
let accounts_hash_verifier = AccountsHashVerifier::new(
|
||||
accounts_hash_receiver,
|
||||
@ -174,6 +182,7 @@ impl Tvu {
|
||||
tvu_config.trusted_validators.clone(),
|
||||
tvu_config.halt_on_trusted_validators_accounts_hash_mismatch,
|
||||
tvu_config.accounts_hash_fault_injection_slots,
|
||||
snapshot_interval_slots,
|
||||
);
|
||||
|
||||
let replay_stage_config = ReplayStageConfig {
|
||||
|
@ -81,6 +81,7 @@ pub struct ValidatorConfig {
|
||||
pub accounts_hash_fault_injection_slots: u64, // 0 = no fault injection
|
||||
pub frozen_accounts: Vec<Pubkey>,
|
||||
pub no_rocksdb_compaction: bool,
|
||||
pub accounts_hash_interval_slots: u64,
|
||||
}
|
||||
|
||||
impl Default for ValidatorConfig {
|
||||
@ -107,6 +108,7 @@ impl Default for ValidatorConfig {
|
||||
accounts_hash_fault_injection_slots: 0,
|
||||
frozen_accounts: vec![],
|
||||
no_rocksdb_compaction: false,
|
||||
accounts_hash_interval_slots: std::u64::MAX,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -622,6 +624,7 @@ fn new_banks_from_blockstore(
|
||||
leader_schedule_cache.set_fixed_leader_schedule(config.fixed_leader_schedule.clone());
|
||||
|
||||
bank_forks.set_snapshot_config(config.snapshot_config.clone());
|
||||
bank_forks.set_accounts_hash_interval_slots(config.accounts_hash_interval_slots);
|
||||
|
||||
(
|
||||
genesis_config,
|
||||
|
@ -38,7 +38,7 @@ mod tests {
|
||||
genesis_config_info: GenesisConfigInfo,
|
||||
}
|
||||
|
||||
fn setup_snapshot_test(snapshot_interval_slots: usize) -> SnapshotTestConfig {
|
||||
fn setup_snapshot_test(snapshot_interval_slots: u64) -> SnapshotTestConfig {
|
||||
let accounts_dir = TempDir::new().unwrap();
|
||||
let snapshot_dir = TempDir::new().unwrap();
|
||||
let snapshot_output_path = TempDir::new().unwrap();
|
||||
@ -50,6 +50,7 @@ mod tests {
|
||||
);
|
||||
bank0.freeze();
|
||||
let mut bank_forks = BankForks::new(0, bank0);
|
||||
bank_forks.accounts_hash_interval_slots = snapshot_interval_slots;
|
||||
|
||||
let snapshot_config = SnapshotConfig {
|
||||
snapshot_interval_slots,
|
||||
@ -265,7 +266,7 @@ mod tests {
|
||||
};
|
||||
|
||||
bank_forks
|
||||
.generate_snapshot(slot, &vec![], &package_sender)
|
||||
.generate_accounts_package(slot, &vec![], &package_sender)
|
||||
.unwrap();
|
||||
|
||||
if slot == saved_slot as u64 {
|
||||
@ -371,7 +372,7 @@ mod tests {
|
||||
let (snapshot_sender, _snapshot_receiver) = channel();
|
||||
// Make sure this test never clears bank.slots_since_snapshot
|
||||
let mut snapshot_test_config =
|
||||
setup_snapshot_test(add_root_interval * num_set_roots * 2);
|
||||
setup_snapshot_test((add_root_interval * num_set_roots * 2) as u64);
|
||||
let mut current_bank = snapshot_test_config.bank_forks[0].clone();
|
||||
let snapshot_sender = Some(snapshot_sender);
|
||||
for _ in 0..num_set_roots {
|
||||
|
Reference in New Issue
Block a user